Documentation ¶
Index ¶
- func Wrap(j interface{}) job.Interface
- type DeDuplicator
- type GoCraftWorkPool
- func (gcwp *GoCraftWorkPool) CancelJob(jobID string) error
- func (gcwp *GoCraftWorkPool) Enqueue(jobName string, params models.Parameters, isUnique bool) (models.JobStats, error)
- func (gcwp *GoCraftWorkPool) GetJobStats(jobID string) (models.JobStats, error)
- func (gcwp *GoCraftWorkPool) IsKnownJob(name string) (interface{}, bool)
- func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.Parameters, cronSetting string) (models.JobStats, error)
- func (gcwp *GoCraftWorkPool) RegisterHook(jobID string, hookURL string) error
- func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error
- func (gcwp *GoCraftWorkPool) RegisterJobs(jobs map[string]interface{}) error
- func (gcwp *GoCraftWorkPool) RetryJob(jobID string) error
- func (gcwp *GoCraftWorkPool) Schedule(jobName string, params models.Parameters, runAfterSeconds uint64, ...) (models.JobStats, error)
- func (gcwp *GoCraftWorkPool) Start() error
- func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error)
- func (gcwp *GoCraftWorkPool) StopJob(jobID string) error
- func (gcwp *GoCraftWorkPool) ValidateJobParameters(jobType interface{}, params map[string]interface{}) error
- type Interface
- type MessageServer
- type RedisDeDuplicator
- type RedisJob
- type RedisPoolContext
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type DeDuplicator ¶
type DeDuplicator interface { // Check the uniqueness of the unique job and set the unique flag if it is not set yet. // // Parameters: // jobName string : name of the job // params models.Parameters : parameters of the job // // Returns: // If no unique flag and successfully set it, a nil error is returned; // otherwise, a non nil error is returned. Unique(jobName string, params models.Parameters) error // Remove the unique flag after job exiting // Parameters: // jobName string : name of the job // params models.Parameters : parameters of the job // // Returns: // If unique flag is successfully removed, a nil error is returned; // otherwise, a non nil error is returned. DelUniqueSign(jobName string, params models.Parameters) error }
DeDuplicator is designed to handle the uniqueness of the job. Once a job is declared to be unique, the job can be enqueued only if no same job (same job name and parameters) in the queue or running in progress. Adopt the same unique mechanism with the upstream framework.
type GoCraftWorkPool ¶
type GoCraftWorkPool struct {
// contains filtered or unexported fields
}
GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis.
func NewGoCraftWorkPool ¶
func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, redisPool *redis.Pool) *GoCraftWorkPool
NewGoCraftWorkPool is constructor of goCraftWorkPool. 创建一个 work pool 所需要的东西,任务队列,访问 redis 的客户端 调度器,日志清理器,消息通知服务器,
func (*GoCraftWorkPool) CancelJob ¶
func (gcwp *GoCraftWorkPool) CancelJob(jobID string) error
CancelJob will cancel the job
func (*GoCraftWorkPool) Enqueue ¶
func (gcwp *GoCraftWorkPool) Enqueue(jobName string, params models.Parameters, isUnique bool) (models.JobStats, error)
Enqueue job
func (*GoCraftWorkPool) GetJobStats ¶
func (gcwp *GoCraftWorkPool) GetJobStats(jobID string) (models.JobStats, error)
GetJobStats return the job stats of the specified enqueued job.
func (*GoCraftWorkPool) IsKnownJob ¶
func (gcwp *GoCraftWorkPool) IsKnownJob(name string) (interface{}, bool)
IsKnownJob ...
func (*GoCraftWorkPool) PeriodicallyEnqueue ¶
func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.Parameters, cronSetting string) (models.JobStats, error)
PeriodicallyEnqueue job
func (*GoCraftWorkPool) RegisterHook ¶
func (gcwp *GoCraftWorkPool) RegisterHook(jobID string, hookURL string) error
RegisterHook registers status hook url sync method
func (*GoCraftWorkPool) RegisterJob ¶
func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error
RegisterJob is used to register the job to the pool. j is the type of job 在 RegisterJob 时 就已经开始执行 job 了
func (*GoCraftWorkPool) RegisterJobs ¶
func (gcwp *GoCraftWorkPool) RegisterJobs(jobs map[string]interface{}) error
RegisterJobs is used to register multiple jobs to pool.
func (*GoCraftWorkPool) RetryJob ¶
func (gcwp *GoCraftWorkPool) RetryJob(jobID string) error
RetryJob retry the job
func (*GoCraftWorkPool) Schedule ¶
func (gcwp *GoCraftWorkPool) Schedule(jobName string, params models.Parameters, runAfterSeconds uint64, isUnique bool) (models.JobStats, error)
Schedule job
func (*GoCraftWorkPool) Start ¶
func (gcwp *GoCraftWorkPool) Start() error
Start to serve Unblock action
func (*GoCraftWorkPool) Stats ¶
func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error)
Stats of pool
func (*GoCraftWorkPool) StopJob ¶
func (gcwp *GoCraftWorkPool) StopJob(jobID string) error
StopJob will stop the job
func (*GoCraftWorkPool) ValidateJobParameters ¶
func (gcwp *GoCraftWorkPool) ValidateJobParameters(jobType interface{}, params map[string]interface{}) error
ValidateJobParameters ...
type Interface ¶
type Interface interface { // Start to serve // // Return: // error if failed to start Start() error // Register job to the pool. // // name string : job name for referring // job interface{}: job handler which must implement the job.Interface. // // Return: // error if failed to register // 在注册到 pool 的同时,也在开始执行 job RegisterJob(name string, job interface{}) error // Register multiple jobs. // // jobs map[string]interface{}: job map, key is job name and value is job handler. // // Return: // error if failed to register RegisterJobs(jobs map[string]interface{}) error // Enqueue job // // jobName string : the name of enqueuing job // params models.Parameters : parameters of enqueuing job // isUnique bool : specify if duplicated job will be discarded // // Returns: // models.JobStats: the stats of enqueuing job if succeed // error : if failed to enqueue Enqueue(jobName string, params models.Parameters, isUnique bool) (models.JobStats, error) // Schedule job to run after the specified interval (seconds). // // jobName string : the name of enqueuing job // runAfterSeconds uint64 : the waiting interval with seconds // params models.Parameters : parameters of enqueuing job // isUnique bool : specify if duplicated job will be discarded // // Returns: // models.JobStats: the stats of enqueuing job if succeed // error : if failed to enqueue // 调度 job 的执行 Schedule(jobName string, params models.Parameters, runAfterSeconds uint64, isUnique bool) (models.JobStats, error) // Schedule the job periodically running. // // jobName string : the name of enqueuing job // params models.Parameters : parameters of enqueuing job // cronSetting string : the periodic duration with cron style like '0 * * * * *' // // Returns: // models.JobStats: the stats of enqueuing job if succeed // error : if failed to enqueue // 调度 job 周期性执行 PeriodicallyEnqueue(jobName string, params models.Parameters, cronSetting string) (models.JobStats, error) // Return the status info of the pool. // // Returns: // models.JobPoolStats : the stats info of all running pools // error : failed to check // 获取 worker pool 的信息 Stats() (models.JobPoolStats, error) // Check if the job has been already registered. // // name string : name of job // // Returns: // interface{} : the job type of the known job if it's existing // bool : if the known job requires parameters // 检测 job 是否在 worker pool 中注册 IsKnownJob(name string) (interface{}, bool) // Validate the parameters of the known job // // jobType interface{} : type of known job // params map[string]interface{} : parameters of known job // // Return: // error if parameters are not valid // 验证 job 的参数 ValidateJobParameters(jobType interface{}, params map[string]interface{}) error // Get the stats of the specified job // // jobID string : ID of the enqueued job // // Returns: // models.JobStats : job stats data // error : error returned if meet any problems GetJobStats(jobID string) (models.JobStats, error) // Stop the job // // jobID string : ID of the enqueued job // // Return: // error : error returned if meet any problems StopJob(jobID string) error // Cancel the job // // jobID string : ID of the enqueued job // // Return: // error : error returned if meet any problems CancelJob(jobID string) error // Retry the job // // jobID string : ID of the enqueued job // // Return: // error : error returned if meet any problems RetryJob(jobID string) error // Register hook // // jobID string : ID of job // hookURL string : the hook url // // Return: // error : error returned if meet any problems RegisterHook(jobID string, hookURL string) error }
Interface for worker pool. More like a driver to transparent the lower queue.
type MessageServer ¶
type MessageServer struct {
// contains filtered or unexported fields
}
MessageServer implements the sub/pub mechanism via redis to do async message exchanging. 订阅/发布模式,实现了消息的异步交换 这个功能提供两种信息机制, 分别是订阅/发布到频道和订阅/发布到模式
func NewMessageServer ¶
NewMessageServer creates a new ptr of MessageServer
func (*MessageServer) Subscribe ¶
func (ms *MessageServer) Subscribe(event string, callback interface{}) error
Subscribe event with specified callback 订阅具体事件,当事件发生时调用 指定的回调函数 callback
type RedisDeDuplicator ¶
type RedisDeDuplicator struct {
// contains filtered or unexported fields
}
RedisDeDuplicator implement the DeDuplicator interface based on redis.
func NewRedisDeDuplicator ¶
func NewRedisDeDuplicator(ns string, pool *redis.Pool) *RedisDeDuplicator
NewRedisDeDuplicator is constructor of RedisDeDuplicator
func (*RedisDeDuplicator) DelUniqueSign ¶
func (rdd *RedisDeDuplicator) DelUniqueSign(jobName string, params models.Parameters) error
DelUniqueSign delete the job unique sign
func (*RedisDeDuplicator) Unique ¶
func (rdd *RedisDeDuplicator) Unique(jobName string, params models.Parameters) error
Unique checks if the job is unique and set unique flag if it is not set yet.
type RedisJob ¶
type RedisJob struct {
// contains filtered or unexported fields
}
RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool.
func NewRedisJob ¶
func NewRedisJob(j interface{}, ctx *env.Context, statsManager opm.JobStatsManager, deDuplicator DeDuplicator) *RedisJob
NewRedisJob is constructor of RedisJob
type RedisPoolContext ¶
type RedisPoolContext struct{}
RedisPoolContext ... We did not use this context to pass context info so far, just a placeholder.