jobworker

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 18, 2020 License: MIT Imports: 9 Imported by: 0

README

jobworker

DESCRIPTION

Package jobworker provides a generic interface around message queue.

The jobworker package must be used in conjunction with some message queue connector.

list of connectors:

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrAlreadyStarted        = errors.New("already started")
	ErrQueueSettingsRequired = errors.New("queue settings required")
	ErrNoJob                 = errors.New("no job")
)
View Source
var ErrJobDuplicationDetected = fmt.Errorf("job duplication detected")
View Source
var (
	ErrPrimaryConnIsRequired = errors.New("primary conn is required")
)

Functions

func Register

func Register(name string, driver Driver)

Types

type ChangeJobVisibilityInput

type ChangeJobVisibilityInput struct {
	Job               *Job
	VisibilityTimeout int64
}

type ChangeJobVisibilityOutput

type ChangeJobVisibilityOutput struct{}

type CompleteJobInput

type CompleteJobInput struct {
	Job *Job
}

type CompleteJobOutput

type CompleteJobOutput struct{}

type Connector

type Connector interface {
	GetName() string

	ReceiveJobs(ctx context.Context, ch chan<- *Job, input *ReceiveJobsInput, opts ...func(*Option)) (*ReceiveJobsOutput, error)
	EnqueueJob(ctx context.Context, input *EnqueueJobInput, opts ...func(*Option)) (*EnqueueJobOutput, error)
	EnqueueJobBatch(ctx context.Context, input *EnqueueJobBatchInput, opts ...func(*Option)) (*EnqueueJobBatchOutput, error)
	CompleteJob(ctx context.Context, input *CompleteJobInput, opts ...func(*Option)) (*CompleteJobOutput, error)
	FailJob(ctx context.Context, input *FailJobInput, opts ...func(*Option)) (*FailJobOutput, error)
	ChangeJobVisibility(ctx context.Context, input *ChangeJobVisibilityInput, opts ...func(*Option)) (*ChangeJobVisibilityOutput, error)

	CreateQueue(ctx context.Context, input *CreateQueueInput, opts ...func(*Option)) (*CreateQueueOutput, error)
	UpdateQueue(ctx context.Context, input *UpdateQueueInput, opts ...func(*Option)) (*UpdateQueueOutput, error)

	RedriveJob(ctx context.Context, input *RedriveJobInput, opts ...func(*Option)) (*RedriveJobOutput, error)

	Close() error

	SetLogger(logger Logger)
}

func Open

func Open(driverName string, attrs map[string]interface{}) (Connector, error)

type ConnectorProvider

type ConnectorProvider struct {
	// contains filtered or unexported fields
}

func (*ConnectorProvider) Close

func (p *ConnectorProvider) Close()

func (*ConnectorProvider) GetConnectorsInPriorityOrder

func (p *ConnectorProvider) GetConnectorsInPriorityOrder() []Connector

func (*ConnectorProvider) IsDead

func (p *ConnectorProvider) IsDead(conn Connector) bool

func (*ConnectorProvider) MarkDead

func (p *ConnectorProvider) MarkDead(conn Connector)

func (*ConnectorProvider) Register

func (p *ConnectorProvider) Register(priority int, conn Connector)

func (*ConnectorProvider) SetRetrySeconds

func (p *ConnectorProvider) SetRetrySeconds(sec time.Duration)

type CreateQueueInput

type CreateQueueInput struct {
	Name       string
	Attributes map[string]interface{}
}

type CreateQueueOutput

type CreateQueueOutput struct{}

type Driver

type Driver interface {
	Open(attrs map[string]interface{}) (Connector, error)
}

type EnqueueJobBatchInput

type EnqueueJobBatchInput struct {
	Queue      string
	Id2Payload map[string]*Payload
}

type EnqueueJobBatchOutput

type EnqueueJobBatchOutput struct {
	Failed     []string // ID
	Successful []string // ID
}

type EnqueueJobInput

type EnqueueJobInput struct {
	Queue   string
	Payload *Payload
}

type EnqueueJobOutput

type EnqueueJobOutput struct {
	JobID string
}

type FailJobInput

type FailJobInput struct {
	Job *Job
}

type FailJobOutput

type FailJobOutput struct{}

type Job

type Job struct {
	Queue string

	SecID           uint64
	JobID           string
	Class           string
	ReceiptID       string
	Args            string
	DeduplicationID string
	GroupID         string
	InvisibleUntil  int64
	RetryCount      int64
	EnqueueAt       int64

	LoggerFunc func(...interface{})
	// contains filtered or unexported fields
}

func NewJob

func NewJob(queue string,
	jobID string,
	class string,
	receiptID string,
	args string,
	deduplicationID string,
	groupID string,
	invisibleUntil int64,
	retryCount int64,
	enqueueAt int64,
	conn Connector,
) *Job

func (*Job) Complete

func (j *Job) Complete(ctx context.Context) error

func (*Job) ExtendVisibility

func (j *Job) ExtendVisibility(ctx context.Context, visibilityTimeout int64) error

func (*Job) Fail

func (j *Job) Fail(ctx context.Context) error

func (*Job) GetConnName

func (j *Job) GetConnName() string

func (*Job) SetLoggerFunc

func (j *Job) SetLoggerFunc(f func(...interface{}))

type JobWorker

type JobWorker struct {
	// contains filtered or unexported fields
}

func New

func New(s *Setting) (*JobWorker, error)

func (*JobWorker) EnqueueJob

func (jw *JobWorker) EnqueueJob(ctx context.Context, input *EnqueueJobInput) error

func (*JobWorker) EnqueueJobBatch

func (jw *JobWorker) EnqueueJobBatch(ctx context.Context, input *EnqueueJobBatchInput) error

func (*JobWorker) Register

func (jw *JobWorker) Register(class string, worker Worker) bool

func (*JobWorker) RegisterFunc

func (jw *JobWorker) RegisterFunc(class string, f WorkerFunc) bool

func (*JobWorker) RegisterOnShutdown

func (jw *JobWorker) RegisterOnShutdown(f func())

func (*JobWorker) Shutdown

func (jw *JobWorker) Shutdown(ctx context.Context) error

func (*JobWorker) Work

func (jw *JobWorker) Work(s *WorkSetting) error

type Logger

type Logger interface {
	Debug(...interface{})
}

type Option

type Option struct {
}

func (*Option) ApplyOptions

func (o *Option) ApplyOptions(opts ...func(*Option))

type Payload

type Payload struct {
	Class           string
	Args            string
	DelaySeconds    int64
	DeduplicationID string
	GroupID         string
}

type ReceiveJobsInput

type ReceiveJobsInput struct {
	Queue string
}

type ReceiveJobsOutput

type ReceiveJobsOutput struct {
	NoJob bool
}

type RedriveJobInput

type RedriveJobInput struct {
	From         string
	To           string
	Target       string
	DelaySeconds int64
}

type RedriveJobOutput

type RedriveJobOutput struct{}

type Setting

type Setting struct {
	Primary   Connector
	Secondary Connector

	DeadConnectorRetryInterval int64 // Seconds

	Logger Logger
}

type UpdateQueueInput

type UpdateQueueInput struct {
	Name       string
	Attributes map[string]interface{}
}

type UpdateQueueOutput

type UpdateQueueOutput struct{}

type WorkSetting

type WorkSetting struct {
	HeartbeatInterval     int64
	OnHeartBeat           func(job *Job)
	WorkerConcurrency     int
	Queue2PollingInterval map[string]int64 // key: queue name, value; polling interval (seconds)
}

type Worker

type Worker interface {
	Work(*Job) error
}

type WorkerFunc

type WorkerFunc func(*Job) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL