jobgen

package
v0.0.0-...-bcffac6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 16, 2016 License: BSD-2-Clause Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
// General string identifiers and tunables for the package.
// NOTE(review): how each value is consumed is not visible in this listing;
// the per-group notes below state only what the declarations themselves show.
const (
	// Class names. PINBA_CLASS_NAME is CYCLE_CLASS_NAME with a single leading
	// backslash prepended.
	CYCLE_CLASS_NAME = "ScriptFramework\\Script_JobGenerator"
	PINBA_CLASS_NAME = "\\" + CYCLE_CLASS_NAME

	// Job type identifiers.
	// NOTE(review): presumably the allowed values of Jobs.Type — confirm.
	JOBS_TYPE_NONE      = "none"
	JOBS_TYPE_INSTANCES = "instances"
	JOBS_TYPE_RANGE     = "range"
	JOBS_TYPE_CUSTOM    = "custom"

	// Location type identifiers.
	LOCATION_TYPE_ANY  = "any"
	LOCATION_TYPE_EACH = "each"

	// Wildcard location value.
	LOCATION_ALL = "*"

	// Method name identifiers.
	METHOD_RUN         = "run"
	METHOD_INIT_JOBS   = "initJobs"
	METHOD_FINISH_JOBS = "finishJobs"

	// Run status identifiers. Note the capitalized spelling; the SQL templates
	// in this package compare run_status against 'Waiting' and 'Init' literally.
	RUN_STATUS_WAITING  = "Waiting"
	RUN_STATUS_INIT     = "Init"
	RUN_STATUS_RUNNING  = "Running"
	RUN_STATUS_FINISHED = "Finished"

	// Default location index, kept as a string ("0"), not an integer.
	DEFAULT_LOCATION_IDX = "0"

	// Five seconds, as a time.Duration.
	HOSTS_UPDATE_INTERVAL = time.Second * 5

	DELETE_IDS_KEEP_GENERATIONS = 3 // buffered channel (1) + processing (1) + selecting (1)

	// NOTE(review): hard-coded hostname; presumably only used in developer
	// debugging paths — confirm before relying on it.
	DEVELOPER_DEBUG_HOSTNAME = "www1"

	// Untyped integer limits.
	// NOTE(review): presumably a queue-length threshold and a channel buffer
	// size respectively — confirm against usage.
	SELECT_HOSTNAME_MAX_WAITING_LEN = 5
	THROTTLE_CHAN_CAPACITY          = 5
)
View Source
// Launcher-related tag, timeouts and kill-action identifiers.
const (
	PHPROXY_TAG      = "scriptframework"
	INIT_TIMEOUT_SEC = 20 // NOTE(review): unit is seconds per the _SEC suffix — confirm

	// Durations, in seconds, for which the corresponding settings fields stay
	// in effect. The field names match columns selected by QUERY_GET_NEW_SETTINGS.
	DEVELOPER_CUSTOM_PATH_TIMEOUT = 14400 // how long "developer" field has effect, in seconds
	PROFILING_TIMEOUT             = 3600  // how long "profiling_enabled" field has effect, in seconds
	DEBUG_TIMEOUT                 = 1200  // how long "debug_enabled" field has effect, in seconds

	// Kill action identifiers.
	// NOTE(review): presumably select what is done with a run queue entry when
	// its script is killed — confirm against the killer code.
	KILL_ACTION_NO_ACTION                 = "noAction"
	KILL_ACTION_DELETE_FROM_QUEUE         = "deleteFromQueue"
	KILL_ACTION_SET_WAITING               = "setWaiting"
	KILL_ACTION_LOG_SCRIPT_FINISH_INIT    = "logScriptFinishInit"
	KILL_ACTION_LOG_SCRIPT_FINISH_RUNNING = "logScriptFinishRunning"
)
View Source
// Database table names. Several of these are concatenated into the QUERY_*
// SQL templates declared in this package (for example TABLE_SCRIPT_TIMETABLE,
// TABLE_RUN_QUEUE, TABLE_SCRIPT_JOB_INFO); the rest are not referenced by any
// query visible in this listing.
const (
	TABLE_ACTION_LOG          = "ActionLog"
	TABLE_SCRIPT              = "Script"
	TABLE_ACTION_SETTINGS     = "ActionScriptSettings"
	TABLE_SCRIPT_SETTINGS     = "ScriptSettings"
	TABLE_SCRIPT_JOB_INFO     = "ScriptJobInfo"
	TABLE_SCRIPT_JOB_RESULT   = "ScriptJobResult"
	TABLE_SCRIPT_FAIL_INFO    = "ScriptFailInfo"
	TABLE_SCRIPT_RUSAGE_STATS = "ScriptRusageStats"
	TABLE_SCRIPT_TIMETABLE    = "ScriptTimetable"
	TABLE_SCRIPT_TAG          = "ScriptTag"
	TABLE_SCRIPT_FLAGS        = "ScriptFlags"
	TABLE_SERVER              = "Server"
	TABLE_SERVER_GROUP        = "ServerGroup"
	TABLE_RUN_QUEUE           = "RunQueue"
)
View Source
// SQL query templates. Tokens of the form #name# are placeholders embedded in
// the query text.
// NOTE(review): presumably callers substitute them textually before executing
// the query; the substitution code is not visible here. Several placeholders
// sit inside quoted SQL literals (e.g. '#class_name#'), so confirm that the
// substituted values are escaped.
const (
	// Selects timetable rows; timestamp columns are returned as UNIX
	// timestamps. Placeholder: #add_where# (appended after the table name).
	QUERY_SELECT_FROM_TIMETABLE = `SELECT
		id, generation_id, class_name, ` + "`repeat`" + `, retry_count, default_retry, job_data, method,
		location, UNIX_TIMESTAMP(finished_ts), finished_successfully, finish_count, UNIX_TIMESTAMP(next_launch_ts),
		UNIX_TIMESTAMP(added_to_queue_ts), token, settings_id, UNIX_TIMESTAMP(created)
		FROM ` + TABLE_SCRIPT_TIMETABLE + ` #add_where#`

	// Selects run queue rows; timestamp columns are returned as UNIX
	// timestamps. Placeholder: #where#.
	QUERY_GET_RUNQUEUE = `SELECT
		id, class_name, timetable_id, generation_id, hostname, hostname_idx, job_data, method, run_status,
		UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(waiting_ts), UNIX_TIMESTAMP(should_init_ts), init_attempts,
		UNIX_TIMESTAMP(init_ts), UNIX_TIMESTAMP(running_ts), UNIX_TIMESTAMP(max_finished_ts), UNIX_TIMESTAMP(finished_ts),
		stopped_employee_id, token, retry_attempt, settings_id
      FROM ` + TABLE_RUN_QUEUE + ` #where#`

	// Selects class_name and settings_id for every row of the Script table.
	// No placeholders.
	QUERY_SIMPLE_GET_SCRIPTS_FOR_PLATFORM = "SELECT class_name, settings_id FROM " + TABLE_SCRIPT

	// Selects settings rows by id. Placeholder: #new_settings_ids# (contents
	// of the IN(...) list).
	QUERY_GET_NEW_SETTINGS = `SELECT id, class_name, instance_count, max_time, jobs,
		next_ts_callback, ` + "`repeat`" + `, retry, ttl, repeat_job, retry_job, location, location_type, developer, max_retries,
		profiling_enabled, debug_enabled, named_params, UNIX_TIMESTAMP(created)
		FROM ` + TABLE_SCRIPT_SETTINGS + `
		WHERE id IN(#new_settings_ids#)`

	// Selects all job info rows. No placeholders and no WHERE clause.
	QUERY_GET_JOB_INFO = `SELECT generation_id, class_name, location,
		UNIX_TIMESTAMP(init_jobs_ts), UNIX_TIMESTAMP(jobs_generated_ts), UNIX_TIMESTAMP(finish_jobs_ts), UNIX_TIMESTAMP(next_generate_job_ts),
		settings_id
		FROM ` + TABLE_SCRIPT_JOB_INFO

	// Selects all script flag rows. No placeholders and no WHERE clause.
	QUERY_GET_FLAGS = `SELECT class_name,
		UNIX_TIMESTAMP(run_requested_ts), UNIX_TIMESTAMP(run_accepted_ts), UNIX_TIMESTAMP(pause_requested_ts), UNIX_TIMESTAMP(kill_requested_ts),
		kill_request_employee_id, UNIX_TIMESTAMP(run_queue_killed_ts), UNIX_TIMESTAMP(killed_ts), UNIX_TIMESTAMP(paused_ts)
		FROM ` + TABLE_SCRIPT_FLAGS

	// Selects all rusage statistics rows. No placeholders.
	QUERY_GET_SCRIPTS_RUSAGE_STATS = `SELECT class_name, real_time, sys_time, user_time, max_memory
		FROM ` + TABLE_SCRIPT_RUSAGE_STATS

	// Selects servers whose phproxyd_heartbeat_ts is within the last 15
	// seconds and whose disabled_ts is NULL, ordered by the computed
	// cpu_idle_parrots column, highest first. No placeholders.
	QUERY_GET_AVAILABLE_HOSTS = `SELECT
            hostname,
            ` + "`group`" + `,
            FLOOR(cpu_idle * cpu_parrots_per_core * cpu_cores / 100) AS cpu_idle_parrots,
            cpu_parrots_per_core,
            ROUND(cpu_idle * cpu_cores / 100, 2) AS cpu_idle_cores,
            cpu_cores,
            cpu_parasite,
            mem_total,
            mem_free,
            mem_cached,
            mem_parasite,
            swap_used,
            min_memory,
            min_memory_ratio
        FROM ` + TABLE_SERVER + `
        WHERE phproxyd_heartbeat_ts > NOW() - INTERVAL 15 SECOND AND disabled_ts IS NULL
        ORDER BY cpu_idle_parrots DESC`

	// Update queries

	// Inserts timetable rows with a fixed column list. Placeholder: #values#.
	QUERY_INSERT_INTO_TIMETABLE = `INSERT INTO ` + TABLE_SCRIPT_TIMETABLE + `
		(class_name, default_retry, ` + "`repeat`" + `, method, finished_successfully, generation_id, settings_id, location, job_data, shard_id, created, next_launch_ts)
		VALUES #values#`

	// Inserts job info rows. Placeholders: #fields#, #values#.
	QUERY_INSERT_INTO_JOB_INFO = `INSERT INTO ` + TABLE_SCRIPT_JOB_INFO + ` #fields#
		VALUES #values#`

	// Sets jobs_generated_ts to NOW(). Placeholders: #class_name#, #locations#.
	QUERY_SET_JOBS_GENERATED_TS = `UPDATE ` + TABLE_SCRIPT_JOB_INFO + `
        SET jobs_generated_ts = NOW()
        WHERE class_name = '#class_name#' AND location IN(#locations#)`

	// Deletes all job results of a class. Placeholder: #class_name#.
	QUERY_CLEAR_JOB_RESULTS = `DELETE FROM ` + TABLE_SCRIPT_JOB_RESULT + `
        WHERE class_name = '#class_name#'`

	// Deletes job results of a class restricted to given locations.
	// Placeholders: #class_name#, #locations#.
	QUERY_CLEAR_JOB_RESULTS_FOR_LOCATIONS = `DELETE FROM ` + TABLE_SCRIPT_JOB_RESULT + `
        WHERE class_name = '#class_name#' AND location IN(#locations#)`

	// Sets next_generate_job_ts, clears the four per-generation timestamps,
	// increments generation_id and sets settings_id for a class.
	// Placeholders: #next_generate_job_ts#, #settings_id#, #class_name#.
	QUERY_SET_NEXT_GENERATE_JOB_TS = `UPDATE ` + TABLE_SCRIPT_JOB_INFO + `
        SET
            next_generate_job_ts = FROM_UNIXTIME(#next_generate_job_ts#),
            jobs_generated_ts = NULL, jobs_finished_ts = NULL, init_jobs_ts = NULL, finish_jobs_ts = NULL,
            generation_id = generation_id + 1, settings_id = #settings_id#
        WHERE class_name = '#class_name#'`

	// Batch upsert form: on duplicate key the timestamp columns and
	// settings_id take the inserted values while generation_id is incremented
	// from its existing value. Placeholders: #fields#, #values#.
	QUERY_BATCH_SET_NEXT_GENERATE_JOB_TS = `INSERT INTO ` + TABLE_SCRIPT_JOB_INFO + `
		#fields#
		VALUES #values#
		ON DUPLICATE KEY UPDATE
			next_generate_job_ts = VALUES(next_generate_job_ts), jobs_generated_ts = VALUES(jobs_generated_ts),
			jobs_finished_ts = VALUES(jobs_finished_ts), init_jobs_ts = VALUES(init_jobs_ts),
			finish_jobs_ts = VALUES(finish_jobs_ts), generation_id = generation_id + 1,
			settings_id = VALUES(settings_id)`

	// Sets init_jobs_ts to NOW(). Placeholders: #class_name#, #locations#.
	QUERY_SET_INIT_JOBS_TS = `UPDATE ` + TABLE_SCRIPT_JOB_INFO + `
        SET init_jobs_ts = NOW()
        WHERE class_name = '#class_name#' AND location IN(#locations#)`

	// Sets finish_jobs_ts to NOW(). Placeholders: #class_name#, #locations#.
	QUERY_SET_FINISH_JOBS_TS = `UPDATE ` + TABLE_SCRIPT_JOB_INFO + `
        SET finish_jobs_ts = NOW()
        WHERE class_name = '#class_name#' AND location IN(#locations#)`

	// Sets max_finished_ts and stopped_employee_id on every run queue row of
	// a class. Placeholders: #ts#, #employee_id#, #class_name#.
	QUERY_SET_MAX_FINISHED_TS_NOW = `UPDATE ` + TABLE_RUN_QUEUE + `
        SET max_finished_ts = FROM_UNIXTIME(#ts#), stopped_employee_id = #employee_id#
        WHERE class_name = '#class_name#'`

	// Inserts run queue rows. Placeholders: #fields#, #values#. The template
	// has no whitespace between the table name and #fields#, nor between
	// VALUES and #values#.
	QUERY_INSERT_INTO_RUN_QUEUE = "INSERT INTO " + TABLE_RUN_QUEUE + "#fields# VALUES#values#"

	// Stamps added_to_queue_ts with NOW() on the given timetable rows, but
	// only where it is still NULL. Placeholder: #ids#.
	QUERY_LOG_ADD_TO_QUEUE = "UPDATE " + TABLE_SCRIPT_TIMETABLE + " SET added_to_queue_ts = NOW() WHERE id IN(#ids#) AND added_to_queue_ts IS NULL"

	// Updates status columns of a single timetable row. Placeholders:
	// #finished_ts#, #next_launch_ts#, #added_to_queue_ts#, #retry_count#,
	// #finish_count#, #finished_successfully#, #id#, #add_where#.
	QUERY_UPDATE_TIMETABLE_STATUS = "UPDATE " + TABLE_SCRIPT_TIMETABLE + ` SET
		finished_ts = FROM_UNIXTIME(#finished_ts#), next_launch_ts = FROM_UNIXTIME(#next_launch_ts#), added_to_queue_ts = FROM_UNIXTIME(#added_to_queue_ts#),
		retry_count = #retry_count#, finish_count = #finish_count#, finished_successfully = #finished_successfully#
		WHERE id = #id# #add_where#`

	// Deletes timetable rows by id. Placeholders: #ids#, #add_where#.
	QUERY_DELETE_FROM_TIMETABLE = "DELETE FROM " + TABLE_SCRIPT_TIMETABLE + " WHERE id IN(#ids#) #add_where#"

	// Clears run_requested_ts and run_accepted_ts for a class.
	// Placeholder: #class_name#.
	QUERY_RESET_RUN_REQUEST = `UPDATE ` + TABLE_SCRIPT_FLAGS + `
        SET run_requested_ts = NULL, run_accepted_ts = NULL
        WHERE class_name = '#class_name#'`

	// Sets run_accepted_ts to NOW() for a class. Placeholder: #class_name#.
	QUERY_SET_RUN_ACCEPTED = `UPDATE ` + TABLE_SCRIPT_FLAGS + `
        SET run_accepted_ts = NOW()
        WHERE class_name = '#class_name#'`

	// Deletes run queue rows by id, only if they are in the given status.
	// Placeholders: #ids#, #status#.
	QUERY_DELETE_FROM_QUEUE = "DELETE FROM " + TABLE_RUN_QUEUE + " WHERE id IN(#ids#) AND run_status = '#status#'"

	// Moves a run queue row from #prev_status# to #status#. #status# appears
	// twice: as the new run_status value and as the prefix of the timestamp
	// column name (#status#_ts).
	// NOTE(review): the RUN_STATUS_* values are capitalized while the columns
	// selected elsewhere are lower-case (e.g. running_ts); confirm the
	// substituted column name resolves as intended.
	// Placeholders: #status#, #id#, #prev_status#.
	QUERY_UPDATE_RUN_STATUS = "UPDATE " + TABLE_RUN_QUEUE + `
		SET run_status = '#status#', #status#_ts = NOW()
		WHERE id = #id# AND run_status = '#prev_status#'`

	// Sets a run queue row to 'Init', stamps init_ts and sets max_finished_ts
	// to created plus #max_time# seconds. Unlike QUERY_UPDATE_RUN_STATUS it
	// does not check the previous status. Placeholders: #max_time#, #id#.
	QUERY_UPDATE_RUN_STATUS_INIT = "UPDATE " + TABLE_RUN_QUEUE + `
		SET run_status = 'Init', init_ts = NOW(), max_finished_ts = created + INTERVAL #max_time# SECOND
		WHERE id = #id#`

	// Deletes 'Waiting' run queue rows of a class that were created more than
	// 30 seconds ago. Placeholder: #class_name#. The doubled QUERY_ prefix is
	// part of the exported name.
	QUERY_QUERY_CLEAR_OLD_HEARTBEATS = `DELETE FROM ` + TABLE_RUN_QUEUE + `
		WHERE class_name = '#class_name#'
		AND run_status = 'Waiting' AND created < NOW() - INTERVAL 30 SECOND`

	// Selects the ids, out of the given list, that exist in the timetable.
	// Placeholder: #ids#.
	QUERY_CHECK_TT_IDS = "SELECT id FROM " + TABLE_SCRIPT_TIMETABLE + " WHERE id IN(#ids#)"
)

language=SQL

View Source
// LAUNCHER_DB_DEBUG is a compile-time boolean switch, currently false.
// NOTE(review): presumably enables extra debug output for launcher database
// operations — confirm against usage.
const LAUNCHER_DB_DEBUG = false
View Source
// MAX_API_JOBS is an untyped integer limit of 50000.
// NOTE(review): presumably caps the number of jobs accepted through the API
// (see APIAcceptTTJobs) — confirm against usage.
const MAX_API_JOBS = 50000
View Source
// MAX_TIME_DISCREPANCY is an untyped integer threshold of 3.
// NOTE(review): the unit (presumably seconds) and what is being compared are
// not visible here — confirm against usage.
const MAX_TIME_DISCREPANCY = 3

Variables

This section is empty.

Functions

func APIAcceptTTJobs

func APIAcceptTTJobs(jobs []*thunder.RequestAddJobsJobT) ([]uint64, error)

func APILogFinish

func APILogFinish(hostname string, runId uint64, prevStatus string, success bool) (string, error)

The returned string is the JSON-encoded run queue entry row.

func APIUpdateRunStatus

func APIUpdateRunStatus(hostname string, runId uint64, prevStatus, status string) error

func GenerateJobsCycle

func GenerateJobsCycle()

func GetKillerThreadsList

func GetKillerThreadsList() []string

func GetLauncherThreadsList

func GetLauncherThreadsList() []string

func SelectRunQueue

func SelectRunQueue() (map[string][]*RunQueueEntry, error)

func Setup

func Setup(config common.FullConfig)

func WriteLogsThread

func WriteLogsThread(filename string)

Types

type DebugPrintRequest

// DebugPrintRequest carries two boolean flags and a channel of *JobGenState.
// NOTE(review): presumably Waiting and Added select which JobGenState lists
// should be filled in, and the result is delivered on RespCh — confirm
// against the dispatcher code.
type DebugPrintRequest struct {
	Waiting bool
	Added   bool

	RespCh chan *JobGenState
}

type DispatcherData

type DispatcherData struct {
	// contains filtered or unexported fields
}

type DispatcherThreadDescr

// DispatcherThreadDescr is a (class name, location) pair. It is the element
// type of the slice returned by GetDispatcherThreadsList.
type DispatcherThreadDescr struct {
	ClassName string
	Location  string
}

func GetDispatcherThreadsList

func GetDispatcherThreadsList() []DispatcherThreadDescr

type FinishEvent

type FinishEvent struct {
	// contains filtered or unexported fields
}

type FinishResult

// FinishResult is a JSON-serializable record; every field carries a
// snake_case json tag. Resource usage and run details are nested as
// FinishResultRusage and FinishResultRunInfo values.
// NOTE(review): presumably describes the outcome of one finished run queue
// entry — confirm against the code that produces it.
type FinishResult struct {
	Id           uint64              `json:"id"`
	TimetableId  uint64              `json:"timetable_id"`
	ClassName    string              `json:"class_name"`
	Hostname     string              `json:"hostname"`
	Success      bool                `json:"success"`
	PrevStatus   string              `json:"prev_status"`
	Rusage       FinishResultRusage  `json:"rusage"`
	ProfilingUrl string              `json:"profiling_url"`
	Initial      bool                `json:"initial"`
	Timestamp    uint64              `json:"timestamp"`
	RunInfo      FinishResultRunInfo `json:"run_info"`
}

type FinishResultRunInfo

// FinishResultRunInfo is the JSON form of a run queue row. Its json keys are
// the same set of names as the columns selected by QUERY_GET_RUNQUEUE.
// The timestamp fields are declared as interface{}.
// NOTE(review): presumably so that a missing timestamp can be encoded as
// null — confirm against the code that fills them.
type FinishResultRunInfo struct {
	Id                uint64      `json:"id"`
	TimetableId       uint64      `json:"timetable_id"`
	GenerationId      uint64      `json:"generation_id"`
	Hostname          string      `json:"hostname"`
	HostnameIdx       uint32      `json:"hostname_idx"`
	ClassName         string      `json:"class_name"`
	JobData           string      `json:"job_data"`
	Method            string      `json:"method"`
	RunStatus         string      `json:"run_status"`
	Created           interface{} `json:"created"`
	WaitingTs         interface{} `json:"waiting_ts"`
	ShouldInitTs      interface{} `json:"should_init_ts"`
	InitAttempts      uint32      `json:"init_attempts"`
	InitTs            interface{} `json:"init_ts"`
	RunningTs         interface{} `json:"running_ts"`
	MaxFinishedTs     interface{} `json:"max_finished_ts"`
	FinishedTs        interface{} `json:"finished_ts"`
	StoppedEmployeeId int64       `json:"stopped_employee_id"`
	Token             string      `json:"token"`
	RetryAttempt      uint32      `json:"retry_attempt"`
	SettingsId        uint64      `json:"settings_id"`
}

type FinishResultRusage

// FinishResultRusage holds resource usage figures in JSON-serializable form:
// three floating-point times and an unsigned maximum memory value.
// NOTE(review): units (presumably seconds and bytes) are not stated in this
// listing — confirm with the producer.
type FinishResultRusage struct {
	UserTime  float64 `json:"user_time"`
	SysTime   float64 `json:"sys_time"`
	RealTime  float64 `json:"real_time"`
	MaxMemory uint64  `json:"max_memory"`
}

type FlagEntry

type FlagEntry struct {
	// contains filtered or unexported fields
}

type JobGenState

// JobGenState groups two lists of timetable entries, Added and Waiting, plus
// a raw response string. It is the result type of GetDispatcherJobs and the
// payload type of DebugPrintRequest.RespCh.
// NOTE(review): the content of RawResp is not visible here — confirm.
type JobGenState struct {
	Added   []*TimetableEntry
	Waiting []*TimetableEntry

	RawResp string
}

func GetDispatcherJobs

func GetDispatcherJobs(className, location string) (*JobGenState, error)

type JobInfoEntry

type JobInfoEntry struct {
	// contains filtered or unexported fields
}

type Jobs

// Jobs describes a job configuration: a type string, an integer Min/Max pair
// and two boolean flags.
// NOTE(review): Type is presumably one of the JOBS_TYPE_* constants, and
// Min/Max presumably bound the range for JOBS_TYPE_RANGE — confirm.
// Have_finish_jobs keeps its underscore spelling because it is an exported
// field name.
type Jobs struct {
	Type             string
	Min              int
	Max              int
	Have_finish_jobs bool
	Temporary        bool
}

type JobsCountRequest

// JobsCountRequest carries a single channel of int.
// NOTE(review): presumably the receiver replies with a job count on RespCh —
// confirm against the code that handles the request.
type JobsCountRequest struct {
	RespCh chan int
}

type KillRequest

// KillRequest carries the channel on which the outcome of the request is
// reported as an error value.
type KillRequest struct {
	ResCh chan error // response for request (success or failure)
}

type LauncherData

type LauncherData struct {
	// contains filtered or unexported fields
}

type LauncherDebugPrintRequest

// LauncherDebugPrintRequest carries a single channel of *LauncherState.
// NOTE(review): presumably the launcher replies with its current state on
// RespCh — confirm against the launcher code.
type LauncherDebugPrintRequest struct {
	RespCh chan *LauncherState
}

type LauncherLogFinishRequest

// LauncherLogFinishRequest has exported fields with the same names and types
// as the parameters of APILogFinish (hostname, runId, prevStatus, success).
// Further fields are hidden in this listing.
type LauncherLogFinishRequest struct {
	Hostname   string
	RunId      uint64
	PrevStatus string
	Success    bool
	// contains filtered or unexported fields
}

type LauncherLogFinishResponse

type LauncherLogFinishResponse struct {
	// contains filtered or unexported fields
}

type LauncherState

// LauncherState groups run queue entries into four lists whose names match
// the RUN_STATUS_* values (Waiting, Init, Running, Finished), plus a raw
// response string. It is the result type of GetLauncherJobs.
// NOTE(review): the content of RawResp is not visible here — confirm.
type LauncherState struct {
	Waiting  []*RunQueueEntry
	Init     []*RunQueueEntry
	Running  []*RunQueueEntry
	Finished []*RunQueueEntry

	RawResp string
}

func GetLauncherJobs

func GetLauncherJobs(hostname string) (*LauncherState, error)

type LauncherUpdateStatusRequest

// LauncherUpdateStatusRequest has exported fields with the same names and
// types as the parameters of APIUpdateRunStatus (hostname, runId, prevStatus,
// status). Further fields are hidden in this listing.
type LauncherUpdateStatusRequest struct {
	Hostname   string
	RunId      uint64
	PrevStatus string
	Status     string
	// contains filtered or unexported fields
}

type LoadStateFunc

type LoadStateFunc struct {
	// contains filtered or unexported fields
}

type NewJobs

type NewJobs struct {
	// contains filtered or unexported fields
}

type NextGenParams

// NextGenParams pairs a location string with a pointer to its JobInfoEntry.
// NOTE(review): presumably the input for scheduling the next job generation
// of that location — confirm against usage.
type NextGenParams struct {
	Location string
	JobInfo  *JobInfoEntry
}

type NextTsCallback

// NextTsCallback holds a callback name and a string-to-string settings map.
// NOTE(review): presumably the decoded form of the next_ts_callback column
// selected by QUERY_GET_NEW_SETTINGS — confirm against the decoding code.
type NextTsCallback struct {
	Callback string
	Settings map[string]string
}

type RunQueueEntry

// RunQueueEntry is the element type of the lists in LauncherState and of the
// map values returned by SelectRunQueue. Only four fields are exported;
// further fields are hidden in this listing.
// NOTE(review): RunStatus presumably holds one of the RUN_STATUS_* values —
// confirm.
type RunQueueEntry struct {
	Id        uint64
	ClassName string

	JobData string

	RunStatus string
	// contains filtered or unexported fields
}

type ScriptEntry

type ScriptEntry struct {
	// contains filtered or unexported fields
}

type ScriptRusageEntry

type ScriptRusageEntry struct {
	// contains filtered or unexported fields
}

type ScriptSettings

type ScriptSettings struct {
	// contains filtered or unexported fields
}

type ServerInfo

type ServerInfo struct {
	// contains filtered or unexported fields
}

func (*ServerInfo) String

func (entry *ServerInfo) String() string

type TTPriorityQueue

// TTPriorityQueue is a slice of *TimetableEntry. Together with its Len, Less,
// Swap, Push and Pop methods it provides the heap.Interface method set.
// NOTE(review): the ordering defined by Less is not visible in this listing.
type TTPriorityQueue []*TimetableEntry

TTPriorityQueue implements heap.Interface and holds *TimetableEntry items.

func (TTPriorityQueue) Len

func (pq TTPriorityQueue) Len() int

func (TTPriorityQueue) Less

func (pq TTPriorityQueue) Less(i, j int) bool

func (*TTPriorityQueue) Pop

func (pq *TTPriorityQueue) Pop() interface{}

func (*TTPriorityQueue) Push

func (pq *TTPriorityQueue) Push(x interface{})

func (TTPriorityQueue) Swap

func (pq TTPriorityQueue) Swap(i, j int)

type TimetableEntry

// TimetableEntry is the element type of TTPriorityQueue and of the lists in
// JobGenState. Only JobData and NextLaunchTs are exported; further fields are
// hidden in this listing.
// NOTE(review): NextLaunchTs is nullable (sql.NullInt64) and presumably holds
// the UNIX timestamp selected as UNIX_TIMESTAMP(next_launch_ts) by
// QUERY_SELECT_FROM_TIMETABLE — confirm against the scanning code.
type TimetableEntry struct {
	JobData string

	NextLaunchTs sql.NullInt64
	// contains filtered or unexported fields
}

func (*TimetableEntry) String1

func (p *TimetableEntry) String1() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL