job

package
v0.0.0-...-820a931 Latest
Warning

This package is not in the latest version of its module.
Published: Apr 23, 2024 License: AGPL-3.0 Imports: 39 Imported by: 31

Documentation

Overview

Package job handles the scheduling and execution of asynchronous jobs by workers. Scheduling is done by triggers. Jobs are put in queues before being processed by a worker.

Constants

const DocTypeVersionTrigger = "1"

DocTypeVersionTrigger represents the doctype version. Update this value each time the document structure is modified.

const SchedKey = "scheduling"

SchedKey is the key of the sorted set in redis used for triggers currently being executed.

const TriggersKey = "triggers"

TriggersKey is the key of the sorted set in redis used for triggers waiting to be activated.

Variables

var (
	// ErrClosed is used when using a closed system
	ErrClosed = errors.New("jobs: closed")
	// ErrNotFoundJob is used when the job could not be found
	ErrNotFoundJob = errors.New("jobs: not found")
	// ErrQueueClosed is used to indicate the queue is closed
	ErrQueueClosed = errors.New("jobs: queue is closed")
	// ErrUnknownWorker is used when the requested worker does not exist
	ErrUnknownWorker = errors.New("jobs: could not find worker")
	// ErrMessageNil is used for a nil message
	ErrMessageNil = errors.New("jobs: message is nil")
	// ErrMessageUnmarshal is used when unmarshalling a message causes an error
	ErrMessageUnmarshal = errors.New("jobs: message unmarshal")
	// ErrAbort can be used to abort the execution of the job without causing
	// errors.
	ErrAbort = errors.New("jobs: abort")

	// ErrUnknownTrigger is used when the trigger type is not recognized
	ErrUnknownTrigger = errors.New("Unknown trigger type")
	// ErrNotFoundTrigger is used when the trigger was not found
	ErrNotFoundTrigger = errors.New("Trigger with specified ID does not exist")
	// ErrMalformedTrigger is used to indicate the trigger is unparsable
	ErrMalformedTrigger = echo.NewHTTPError(http.StatusBadRequest, "Trigger unparsable")
	// ErrNotCronTrigger is used when a @cron trigger is expected, but it is
	// not the case
	ErrNotCronTrigger = errors.New("Invalid type for trigger (@cron expected)")
)
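Because these errors are package-level sentinel values, callers can test for them with errors.Is, including when the error has been wrapped with extra context. The following is a minimal sketch under stated assumptions: errAbort is a local stand-in for ErrAbort so that the example runs without importing the package, and runStep and isAbort are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// errAbort is a local stand-in for the package's ErrAbort sentinel
// (assumption: the real package is not imported in this sketch).
var errAbort = errors.New("jobs: abort")

// runStep is a hypothetical worker step that wraps the sentinel with context.
func runStep(skip bool) error {
	if skip {
		return fmt.Errorf("nothing to do: %w", errAbort)
	}
	return nil
}

// isAbort reports whether err is, or wraps, the abort sentinel.
func isAbort(err error) bool {
	return errors.Is(err, errAbort)
}

func main() {
	fmt.Println(isAbort(runStep(true)))  // true
	fmt.Println(isAbort(runStep(false))) // false
}
```

Comparing with errors.Is rather than == keeps the check valid if an intermediate layer wraps the error.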

Functions

func AddWorker

func AddWorker(conf *WorkerConfig)

AddWorker adds a new worker to the global list of available workers.

func GetCounterTypeFromWorkerType

func GetCounterTypeFromWorkerType(workerType string) (limits.CounterType, error)

GetCounterTypeFromWorkerType returns the limits.CounterType associated with the given worker type.

func GetWorkersNamesList

func GetWorkersNamesList() []string

GetWorkersNamesList returns the names of the configured workers

func SetRedisTimeoutForTest

func SetRedisTimeoutForTest()

SetRedisTimeoutForTest is used by unit tests to avoid waiting 10 seconds on cleanup.

func SystemStart

func SystemStart(b Broker, s Scheduler, workersList WorkersList) error

SystemStart initializes and starts the global jobs system with the given broker and scheduler instances and the given workers list.

Types

type AtTrigger

type AtTrigger struct {
	*TriggerInfos
	// contains filtered or unexported fields
}

AtTrigger implements the @at trigger type. It schedules a job at a specified time in the future.

func NewAtTrigger

func NewAtTrigger(infos *TriggerInfos) (*AtTrigger, error)

NewAtTrigger returns a new instance of AtTrigger given the specified options.

func NewInTrigger

func NewInTrigger(infos *TriggerInfos) (*AtTrigger, error)

NewInTrigger returns a new instance of AtTrigger given the specified options as @in.

func (*AtTrigger) CombineRequest

func (a *AtTrigger) CombineRequest() string

CombineRequest implements the CombineRequest method of the Trigger interface.

func (*AtTrigger) Infos

func (a *AtTrigger) Infos() *TriggerInfos

Infos implements the Infos method of the Trigger interface.

func (*AtTrigger) Schedule

func (a *AtTrigger) Schedule() <-chan *JobRequest

Schedule implements the Schedule method of the Trigger interface.

func (*AtTrigger) Type

func (a *AtTrigger) Type() string

Type implements the Type method of the Trigger interface.

func (*AtTrigger) Unschedule

func (a *AtTrigger) Unschedule()

Unschedule implements the Unschedule method of the Trigger interface.

type BadTriggerError

type BadTriggerError struct {
	Err error
}

BadTriggerError is an error conveying the information of a trigger that is not valid, and could be deleted.

func (BadTriggerError) Error

func (e BadTriggerError) Error() string

type Broker

type Broker interface {
	StartWorkers(workersList WorkersList) error
	ShutdownWorkers(ctx context.Context) error

	// PushJob will try to push a new job from the specified job request.
	// This method is asynchronous.
	PushJob(db prefixer.Prefixer, request *JobRequest) (*Job, error)

	// WorkerQueueLen returns the total number of elements in the queue of the specified
	// worker type.
	WorkerQueueLen(workerType string) (int, error)
	// WorkerIsReserved returns true if the given worker type is reserved
	// (i.e. clients should not push jobs to it, only the stack).
	WorkerIsReserved(workerType string) (bool, error)
	// WorkersTypes returns the list of registered workers types.
	WorkersTypes() []string
}

Broker interface is used to represent a job broker associated with a particular domain. A broker can be used to create jobs that are pushed into the job system.

This interface is matched by several implementations, including BrokerMock, a mock implementation used for the tests.

func NewMemBroker

func NewMemBroker() Broker

NewMemBroker creates a new in-memory broker system.

The in-memory implementation of the job system has the particularity that workers are actually launched by the broker at its creation.

func NewRedisBroker

func NewRedisBroker(client redis.UniversalClient) Broker

NewRedisBroker creates a new broker that will use redis to distribute the jobs among several cozy-stack processes.

type BrokerMock

type BrokerMock struct {
	mock.Mock
}

BrokerMock is a mock implementation of Broker.

func NewBrokerMock

func NewBrokerMock(t *testing.T) *BrokerMock

func (*BrokerMock) PushJob

func (m *BrokerMock) PushJob(db prefixer.Prefixer, request *JobRequest) (*Job, error)

PushJob mock method.

func (*BrokerMock) ShutdownWorkers

func (m *BrokerMock) ShutdownWorkers(ctx context.Context) error

ShutdownWorkers mock method.

func (*BrokerMock) StartWorkers

func (m *BrokerMock) StartWorkers(workersList WorkersList) error

StartWorkers mock method.

func (*BrokerMock) WorkerIsReserved

func (m *BrokerMock) WorkerIsReserved(workerType string) (bool, error)

WorkerIsReserved mock method.

func (*BrokerMock) WorkerQueueLen

func (m *BrokerMock) WorkerQueueLen(workerType string) (int, error)

WorkerQueueLen mock method.

func (*BrokerMock) WorkersTypes

func (m *BrokerMock) WorkersTypes() []string

WorkersTypes mock method.

type ClientTrigger

type ClientTrigger struct {
	*TriggerInfos
}

ClientTrigger implements the @webhook triggers. It schedules a job when an HTTP request is made at this webhook.

func NewClientTrigger

func NewClientTrigger(infos *TriggerInfos) (*ClientTrigger, error)

NewClientTrigger returns a new instance of ClientTrigger.

func (*ClientTrigger) CombineRequest

func (c *ClientTrigger) CombineRequest() string

CombineRequest implements the CombineRequest method of the Trigger interface.

func (*ClientTrigger) Infos

func (c *ClientTrigger) Infos() *TriggerInfos

Infos implements the Infos method of the Trigger interface.

func (*ClientTrigger) Schedule

func (c *ClientTrigger) Schedule() <-chan *JobRequest

Schedule implements the Schedule method of the Trigger interface.

func (*ClientTrigger) Type

func (c *ClientTrigger) Type() string

Type implements the Type method of the Trigger interface.

func (*ClientTrigger) Unschedule

func (c *ClientTrigger) Unschedule()

Unschedule implements the Unschedule method of the Trigger interface.

type CronTrigger

type CronTrigger struct {
	*TriggerInfos
	// contains filtered or unexported fields
}

CronTrigger implements the @cron trigger type. It schedules recurring jobs with the unusual but widely used cron syntax.

func NewCronTrigger

func NewCronTrigger(infos *TriggerInfos) (*CronTrigger, error)

NewCronTrigger returns a new instance of CronTrigger given the specified options.

func NewDailyTrigger

func NewDailyTrigger(infos *TriggerInfos) (*CronTrigger, error)

NewDailyTrigger returns a new instance of CronTrigger given the specified options as @daily. It will take a random hour in the possible range to spread the triggers from the same app manifest.

func NewEveryTrigger

func NewEveryTrigger(infos *TriggerInfos) (*CronTrigger, error)

NewEveryTrigger returns a new instance of CronTrigger given the specified options as @every.

func NewHourlyTrigger

func NewHourlyTrigger(infos *TriggerInfos) (*CronTrigger, error)

NewHourlyTrigger returns a new instance of CronTrigger given the specified options as @hourly. It will take a random minute in the possible range to spread the triggers from the same app manifest.

func NewMonthlyTrigger

func NewMonthlyTrigger(infos *TriggerInfos) (*CronTrigger, error)

NewMonthlyTrigger returns a new instance of CronTrigger given the specified options as @monthly. It will take a random day/hour in the possible range to spread the triggers from the same app manifest.

func NewWeeklyTrigger

func NewWeeklyTrigger(infos *TriggerInfos) (*CronTrigger, error)

NewWeeklyTrigger returns a new instance of CronTrigger given the specified options as @weekly. It will take a random day/hour in the possible range to spread the triggers from the same app manifest.

func (*CronTrigger) CombineRequest

func (c *CronTrigger) CombineRequest() string

CombineRequest implements the CombineRequest method of the Trigger interface.

func (*CronTrigger) Infos

func (c *CronTrigger) Infos() *TriggerInfos

Infos implements the Infos method of the Trigger interface.

func (*CronTrigger) NextExecution

func (c *CronTrigger) NextExecution(last time.Time) time.Time

NextExecution returns the next time when a job should be fired for this trigger

func (*CronTrigger) Schedule

func (c *CronTrigger) Schedule() <-chan *JobRequest

Schedule implements the Schedule method of the Trigger interface.

func (*CronTrigger) Type

func (c *CronTrigger) Type() string

Type implements the Type method of the Trigger interface.

func (*CronTrigger) Unschedule

func (c *CronTrigger) Unschedule()

Unschedule implements the Unschedule method of the Trigger interface.

type DumpFilePather

type DumpFilePather struct{}

DumpFilePather is a struct made for calling the Path method of a FileDoc and relying on the cached fullpath of this document (not trying to rebuild it)

func (DumpFilePather) FilePath

func (d DumpFilePather) FilePath(doc *vfs.FileDoc) (string, error)

FilePath only returns an error saying to not call this method

type Event

type Event json.RawMessage

Event is a json encoded value of a realtime.Event.

func NewEvent

func NewEvent(data *realtime.Event) (Event, error)

NewEvent returns a JSON-encoded realtime.Event.

func (Event) Unmarshal

func (e Event) Unmarshal(evt interface{}) error

Unmarshal can be used to unmarshal the encoded message value in the specified interface's type.

type EventTrigger

type EventTrigger struct {
	*TriggerInfos
	// contains filtered or unexported fields
}

EventTrigger implements Trigger for realtime triggered events

func NewEventTrigger

func NewEventTrigger(infos *TriggerInfos) (*EventTrigger, error)

NewEventTrigger returns a new instance of EventTrigger given the specified options.

func (*EventTrigger) CombineRequest

func (t *EventTrigger) CombineRequest() string

CombineRequest implements the CombineRequest method of the Trigger interface.

func (*EventTrigger) Infos

func (t *EventTrigger) Infos() *TriggerInfos

Infos implements the Infos method of the Trigger interface.

func (*EventTrigger) Schedule

func (t *EventTrigger) Schedule() <-chan *JobRequest

Schedule implements the Schedule method of the Trigger interface.

func (*EventTrigger) Type

func (t *EventTrigger) Type() string

Type implements the Type method of the Trigger interface.

func (*EventTrigger) Unschedule

func (t *EventTrigger) Unschedule()

Unschedule implements the Unschedule method of the Trigger interface.

type FrequencyKind

type FrequencyKind int

FrequencyKind is used to tell if a periodic trigger is monthly, weekly, daily, or hourly.

const (
	MonthlyKind FrequencyKind = iota
	WeeklyKind
	DailyKind
	HourlyKind
)
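Since the constants are declared with iota, their numeric values follow declaration order and the zero value of FrequencyKind is MonthlyKind. The sketch below uses a local lowercase copy of the declaration so that it runs on its own; the String method is a hypothetical helper that the source does not show.

```go
package main

import "fmt"

// frequencyKind is a local copy of the declaration above, so that this
// sketch runs without importing the package.
type frequencyKind int

const (
	monthlyKind frequencyKind = iota // 0
	weeklyKind                       // 1
	dailyKind                        // 2
	hourlyKind                       // 3
)

// String is a hypothetical helper; the source does not show one.
func (k frequencyKind) String() string {
	switch k {
	case monthlyKind:
		return "monthly"
	case weeklyKind:
		return "weekly"
	case dailyKind:
		return "daily"
	case hourlyKind:
		return "hourly"
	default:
		return "unknown"
	}
}

func main() {
	var k frequencyKind // the zero value is monthlyKind
	fmt.Println(k, int(hourlyKind))
}
```

One consequence worth keeping in mind: an uninitialized FrequencyKind field reads as monthly, not as "unset".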

type Job

type Job struct {
	JobID       string      `json:"_id,omitempty"`
	JobRev      string      `json:"_rev,omitempty"`
	Cluster     int         `json:"couch_cluster,omitempty"`
	Domain      string      `json:"domain"`
	Prefix      string      `json:"prefix,omitempty"`
	WorkerType  string      `json:"worker"`
	TriggerID   string      `json:"trigger_id,omitempty"`
	Message     Message     `json:"message"`
	Event       Event       `json:"event"`
	Payload     Payload     `json:"payload,omitempty"`
	Manual      bool        `json:"manual_execution,omitempty"`
	Debounced   bool        `json:"debounced,omitempty"`
	Options     *JobOptions `json:"options,omitempty"`
	State       State       `json:"state"`
	QueuedAt    time.Time   `json:"queued_at"`
	StartedAt   time.Time   `json:"started_at"`
	FinishedAt  time.Time   `json:"finished_at"`
	Error       string      `json:"error,omitempty"`
	ForwardLogs bool        `json:"forward_logs,omitempty"`
}

Job contains all the metadata of a job. It can be marshalled to JSON.
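The struct tags decide which keys appear in the JSON document: fields tagged omitempty are dropped when they hold their zero value, while the others are always emitted. A minimal sketch, assuming a trimmed-down local copy named jobDoc (hypothetical) that keeps only a few of the fields and their tags:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jobDoc is a hypothetical, trimmed-down copy of Job that keeps the same
// JSON tags for the fields it retains.
type jobDoc struct {
	JobID      string `json:"_id,omitempty"`
	Domain     string `json:"domain"`
	WorkerType string `json:"worker"`
	State      string `json:"state"`
	Error      string `json:"error,omitempty"`
}

// encode marshals the document and returns it as a string.
func encode(j jobDoc) (string, error) {
	b, err := json.Marshal(j)
	return string(b), err
}

func main() {
	out, err := encode(jobDoc{
		Domain:     "alice.example.net",
		WorkerType: "thumbnail",
		State:      "queued",
	})
	if err != nil {
		panic(err)
	}
	// The empty _id and error fields are omitted from the output.
	fmt.Println(out)
}
```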

func FilterByWorkerAndState

func FilterByWorkerAndState(jobs []*Job, workerType string, state State, limit int) []*Job

FilterByWorkerAndState filters a job slice by its workerType and State

func FilterJobsBeforeDate

func FilterJobsBeforeDate(jobs []*Job, date time.Time) []*Job

FilterJobsBeforeDate returns all jobs queued before the specified date.
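A filter of this kind can be sketched as a single pass over the slice that compares each job's queue time with the cutoff. This is an illustration only, not the package's implementation: the job type, filterBefore, and sampleJobs are hypothetical, and whether the real function treats a job queued exactly at the date as "before" is not stated in the source (the sketch excludes it).

```go
package main

import (
	"fmt"
	"time"
)

// job is a hypothetical stand-in holding only the fields this sketch needs.
type job struct {
	id       string
	queuedAt time.Time
}

// filterBefore keeps the jobs queued strictly before date.
func filterBefore(jobs []*job, date time.Time) []*job {
	kept := make([]*job, 0, len(jobs))
	for _, j := range jobs {
		if j.queuedAt.Before(date) {
			kept = append(kept, j)
		}
	}
	return kept
}

// sampleJobs returns one job on each side of a fixed cutoff.
func sampleJobs() ([]*job, time.Time) {
	cutoff := time.Date(2024, time.April, 23, 0, 0, 0, 0, time.UTC)
	return []*job{
		{id: "old", queuedAt: cutoff.Add(-time.Hour)},
		{id: "new", queuedAt: cutoff.Add(time.Hour)},
	}, cutoff
}

func main() {
	jobs, cutoff := sampleJobs()
	for _, j := range filterBefore(jobs, cutoff) {
		fmt.Println(j.id)
	}
}
```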

func Get

func Get(db prefixer.Prefixer, jobID string) (*Job, error)

Get returns the information about a job.

func GetAllJobs

func GetAllJobs(db prefixer.Prefixer) ([]*Job, error)

GetAllJobs returns the list of all the jobs on the given instance.

func GetJobs

func GetJobs(db prefixer.Prefixer, triggerID string, limit int) ([]*Job, error)

GetJobs returns the jobs launched by the given trigger.

func GetLastsJobs

func GetLastsJobs(jobs []*Job, workerType string) ([]*Job, error)

GetLastsJobs returns the last N jobs of each state for an instance/worker type pair.

func GetQueuedJobs

func GetQueuedJobs(db prefixer.Prefixer, workerType string) ([]*Job, error)

GetQueuedJobs returns the list of jobs whose state is "queued" or "running".

func NewJob

func NewJob(db prefixer.Prefixer, req *JobRequest) *Job

NewJob creates a new Job instance from a job request.

func (*Job) Ack

func (j *Job) Ack() error

Ack sets the job infos state to Done and sends the new job infos on the channel.

func (*Job) AckConsumed

func (j *Job) AckConsumed() error

AckConsumed sets the job infos state to Running and sends the new job infos on the channel.

func (*Job) Clone

func (j *Job) Clone() couchdb.Doc

Clone implements the couchdb.Doc interface

func (*Job) Create

func (j *Job) Create() error

Create creates the job in couchdb

func (*Job) DBCluster

func (j *Job) DBCluster() int

DBCluster implements the prefixer.Prefixer interface.

func (*Job) DBPrefix

func (j *Job) DBPrefix() string

DBPrefix implements the prefixer.Prefixer interface.

func (*Job) DocType

func (j *Job) DocType() string

DocType implements the couchdb.Doc interface

func (*Job) DomainName

func (j *Job) DomainName() string

DomainName implements the prefixer.Prefixer interface.

func (*Job) Fetch

func (j *Job) Fetch(field string) []string

Fetch implements the permission.Fetcher interface

func (*Job) ID

func (j *Job) ID() string

ID implements the couchdb.Doc interface

func (*Job) Logger

func (j *Job) Logger() *logger.Entry

Logger returns a logger associated with the job domain

func (*Job) Nack

func (j *Job) Nack(errorMessage string) error

Nack sets the job infos state to Errored, sets the specified error as the error field, and sends the new job infos on the channel.

func (*Job) Rev

func (j *Job) Rev() string

Rev implements the couchdb.Doc interface

func (*Job) SetID

func (j *Job) SetID(id string)

SetID implements the couchdb.Doc interface

func (*Job) SetRev

func (j *Job) SetRev(rev string)

SetRev implements the couchdb.Doc interface

func (*Job) Update

func (j *Job) Update() error

Update updates the job in couchdb

func (*Job) WaitUntilDone

func (j *Job) WaitUntilDone(db prefixer.Prefixer) error

WaitUntilDone will wait until the job is done. It will return an error if the job has failed. It also times out after 10 minutes.

type JobErrorCheckerHook

type JobErrorCheckerHook func(err error) bool

JobErrorCheckerHook is an optional method called at the beginning of the job execution to prevent a retry according to the previous error (especially useful in the retry loop).

type JobOptions

type JobOptions struct {
	MaxExecCount int           `json:"max_exec_count"`
	Timeout      time.Duration `json:"timeout"`
}

JobOptions struct contains the execution properties of the jobs.

type JobRequest

type JobRequest struct {
	WorkerType  string
	TriggerID   string
	Trigger     Trigger
	Message     Message
	Event       Event
	Payload     Payload
	Manual      bool
	Debounced   bool
	ForwardLogs bool
	Options     *JobOptions
}

JobRequest struct is used to represent a new job request.

func (*JobRequest) DocType

func (jr *JobRequest) DocType() string

DocType implements the permission.Getter interface

func (*JobRequest) Fetch

func (jr *JobRequest) Fetch(field string) []string

Fetch implements the permission.Fetcher interface

func (*JobRequest) ID

func (jr *JobRequest) ID() string

ID implements the permission.Getter interface

type JobSystem

type JobSystem interface {
	Broker
	Scheduler
	utils.Shutdowner
}

JobSystem is a broker and a scheduler linked together.

func System

func System() JobSystem

System returns the global job system.

type Message

type Message json.RawMessage

Message is a json encoded job message.

func NewMessage

func NewMessage(data interface{}) (Message, error)

NewMessage returns the JSON encoding of the given data.

func (Message) MarshalJSON

func (m Message) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler on Message.

func (Message) Unmarshal

func (m Message) Unmarshal(msg interface{}) error

Unmarshal can be used to unmarshal the encoded message value in the specified interface's type.
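A type defined over json.RawMessage keeps the raw bytes of the encoded value and decodes them only when the worker knows the target type. The round trip can be sketched as follows; message, newMessage, unmarshal, and thumbnailMsg are local, hypothetical stand-ins, and the real methods may handle extra cases (such as a nil message) that this sketch leaves out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a local stand-in for Message.
type message json.RawMessage

// newMessage encodes data to JSON and keeps the raw bytes.
func newMessage(data interface{}) (message, error) {
	b, err := json.Marshal(data)
	if err != nil {
		return nil, err
	}
	return message(b), nil
}

// unmarshal decodes the raw bytes into v, which must be a pointer.
func (m message) unmarshal(v interface{}) error {
	return json.Unmarshal(m, v)
}

// thumbnailMsg is a hypothetical worker message used for illustration.
type thumbnailMsg struct {
	FileID string `json:"file_id"`
}

func main() {
	m, err := newMessage(thumbnailMsg{FileID: "abc123"})
	if err != nil {
		panic(err)
	}
	var out thumbnailMsg
	if err := m.unmarshal(&out); err != nil {
		panic(err)
	}
	fmt.Println(string(m), out.FileID)
}
```

Note that a type defined this way does not inherit the methods of json.RawMessage, which is consistent with the package declaring its own MarshalJSON and UnmarshalJSON on Message.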

func (*Message) UnmarshalJSON

func (m *Message) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler on Message. It should be retro-compatible with the old Message representation { Data, Type }.

type Payload

type Payload json.RawMessage

Payload is a JSON-encoded value of a webhook payload.

func (Payload) Unmarshal

func (p Payload) Unmarshal(evt interface{}) error

Unmarshal can be used to unmarshal the encoded message value in the specified interface's type.

type PeriodicParser

type PeriodicParser struct{}

PeriodicParser can be used to parse @weekly and @monthly trigger arguments. It can parse a string like "on monday between 8am and 6pm".

func NewPeriodicParser

func NewPeriodicParser() PeriodicParser

NewPeriodicParser creates a PeriodicParser.

func (*PeriodicParser) Parse

func (p *PeriodicParser) Parse(frequency FrequencyKind, periodic string) (*PeriodicSpec, error)

Parse will transform a string like "on monday" to a PeriodicSpec, or will return an error if the format is not supported.

type PeriodicSpec

type PeriodicSpec struct {
	Frequency   FrequencyKind
	DaysOfMonth []int // empty for *, or a slice of acceptable days (1 to 31)
	DaysOfWeek  []int // a slice of acceptable days, from 0 for sunday to 6 for saturday
	AfterHour   int   // an hour between 0 and 23
	BeforeHour  int   // an hour between 1 and 24
}

PeriodicSpec is the result of a successful parsing

func NewPeriodicSpec

func NewPeriodicSpec() *PeriodicSpec

func (*PeriodicSpec) ToRandomCrontab

func (s *PeriodicSpec) ToRandomCrontab(seed string) string

ToRandomCrontab generates a crontab that satisfies the PeriodicSpec. The values are taken randomly, and the random generator uses the given seed to allow stability for a trigger, i.e. a weekly trigger must always run on the same day at the same hour.
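The stability property described here can be obtained by deriving the random source from the seed string, so that the same seed always yields the same picks. The sketch below is one way to do that and is not taken from the package: weeklySpec, pick, the FNV hashing of the seed, and the five-field crontab layout (minute, hour, day of month, month, day of week) are all assumptions.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
)

// weeklySpec is a hypothetical, trimmed-down stand-in for PeriodicSpec.
type weeklySpec struct {
	daysOfWeek []int // 0 for sunday to 6 for saturday, must not be empty
	afterHour  int   // inclusive lower bound
	beforeHour int   // exclusive upper bound, must be greater than afterHour
}

// pick draws a day, hour, and minute from a generator seeded by the string.
func (s weeklySpec) pick(seed string) (day, hour, minute int) {
	h := fnv.New64a()
	h.Write([]byte(seed)) // Write on a hash.Hash never returns an error
	rng := rand.New(rand.NewSource(int64(h.Sum64())))
	day = s.daysOfWeek[rng.Intn(len(s.daysOfWeek))]
	hour = s.afterHour + rng.Intn(s.beforeHour-s.afterHour)
	minute = rng.Intn(60)
	return day, hour, minute
}

// toRandomCrontab formats the picks as "minute hour * * day-of-week".
func (s weeklySpec) toRandomCrontab(seed string) string {
	day, hour, minute := s.pick(seed)
	return fmt.Sprintf("%d %d * * %d", minute, hour, day)
}

func main() {
	// "on monday between 8am and 6pm"
	spec := weeklySpec{daysOfWeek: []int{1}, afterHour: 8, beforeHour: 18}
	a := spec.toRandomCrontab("trigger-id")
	b := spec.toRandomCrontab("trigger-id")
	fmt.Println(a == b) // true: the same seed gives the same crontab
}
```

Using a stable identifier (for example a trigger ID) as the seed is what keeps the schedule from moving between restarts.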

type Scheduler

type Scheduler interface {
	StartScheduler(broker Broker) error
	ShutdownScheduler(ctx context.Context) error
	PollScheduler(now int64) error
	AddTrigger(trigger Trigger) error
	GetTrigger(db prefixer.Prefixer, id string) (Trigger, error)
	UpdateMessage(db prefixer.Prefixer, trigger Trigger, message json.RawMessage) error
	UpdateCron(db prefixer.Prefixer, trigger Trigger, arguments string) error
	DeleteTrigger(db prefixer.Prefixer, id string) error
	GetAllTriggers(db prefixer.Prefixer) ([]Trigger, error)
	HasTrigger(db prefixer.Prefixer, infos TriggerInfos) bool
	CleanRedis() error
	RebuildRedis(db prefixer.Prefixer) error
}

Scheduler interface is used to represent a scheduler that is responsible for listening and responding to the job requests of triggers and sending them to the broker.

func NewMemScheduler

func NewMemScheduler() Scheduler

NewMemScheduler creates a new in-memory scheduler that will load all registered triggers and schedule their work.

func NewRedisScheduler

func NewRedisScheduler(client redis.UniversalClient) Scheduler

NewRedisScheduler creates a new scheduler that uses redis to synchronize with other cozy-stack processes to schedule jobs.

type ShareGroupMessage

type ShareGroupMessage struct {
	ContactID       string           `json:"contact_id,omitempty"`
	GroupsAdded     []string         `json:"added,omitempty"`
	GroupsRemoved   []string         `json:"removed,omitempty"`
	BecomeInvitable bool             `json:"invitable,omitempty"`
	DeletedDoc      *couchdb.JSONDoc `json:"deleted_doc,omitempty"`
	RenamedGroup    *couchdb.JSONDoc `json:"renamed_group,omitempty"`
}

ShareGroupMessage is used for jobs on the share-group worker.

type ShareGroupTrigger

type ShareGroupTrigger struct {
	// contains filtered or unexported fields
}

func NewShareGroupTrigger

func NewShareGroupTrigger(broker Broker) *ShareGroupTrigger

func (*ShareGroupTrigger) Schedule

func (t *ShareGroupTrigger) Schedule()

func (*ShareGroupTrigger) Unschedule

func (t *ShareGroupTrigger) Unschedule()

type State

type State string

State represents the state of a job.

const (
	// Queued state
	Queued State = "queued"
	// Running state
	Running State = "running"
	// Done state
	Done State = "done"
	// Errored state
	Errored State = "errored"
)
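Read together with the descriptions of AckConsumed, Ack, and Nack, these constants describe a small lifecycle: a job starts as queued, becomes running once consumed, and ends as done or errored. The sketch below models only the state changes with local, hypothetical types; the real methods also send the updated job infos on a channel, which is left out here.

```go
package main

import "fmt"

// state is a local copy of State so that the sketch runs on its own.
type state string

const (
	queued  state = "queued"
	running state = "running"
	done    state = "done"
	errored state = "errored"
)

// job is a hypothetical stand-in that only tracks state and error.
type job struct {
	state state
	err   string
}

// ackConsumed marks the job as picked up by a worker.
func (j *job) ackConsumed() { j.state = running }

// ack marks the job as successfully finished.
func (j *job) ack() { j.state = done }

// nack marks the job as failed and records the error message.
func (j *job) nack(msg string) {
	j.state = errored
	j.err = msg
}

func main() {
	ok := &job{state: queued}
	ok.ackConsumed()
	ok.ack()

	failed := &job{state: queued}
	failed.ackConsumed()
	failed.nack("timeout")

	fmt.Println(ok.state, failed.state, failed.err)
}
```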

type TaskContext

type TaskContext struct {
	context.Context
	Instance *instance.Instance
	// contains filtered or unexported fields
}

TaskContext is a context.Context passed to the worker for each task execution and contains specific values from the job.

func NewTaskContext

func NewTaskContext(workerID string, job *Job, inst *instance.Instance) (*TaskContext, context.CancelFunc)

NewTaskContext returns a context.Context usable by a worker.

func (*TaskContext) Cookie

func (c *TaskContext) Cookie() interface{}

Cookie returns the cookie associated with the worker context.

func (*TaskContext) ID

func (c *TaskContext) ID() string

ID returns a unique identifier for the worker context.

func (*TaskContext) Logger

func (c *TaskContext) Logger() logger.Logger

Logger returns the logger associated with the worker context.

func (*TaskContext) Manual

func (c *TaskContext) Manual() bool

Manual returns whether the job was started manually.

func (*TaskContext) NoRetry

func (c *TaskContext) NoRetry() bool

NoRetry returns the no-retry flag.

func (*TaskContext) SetNoRetry

func (c *TaskContext) SetNoRetry()

SetNoRetry sets the no-retry flag to prevent a retry on the next execution.

func (*TaskContext) TriggerID

func (c *TaskContext) TriggerID() (string, bool)

TriggerID returns the possible trigger identifier responsible for launching the job.

func (*TaskContext) UnmarshalEvent

func (c *TaskContext) UnmarshalEvent(v interface{}) error

UnmarshalEvent unmarshals the event contained in the worker context.

func (*TaskContext) UnmarshalMessage

func (c *TaskContext) UnmarshalMessage(v interface{}) error

UnmarshalMessage unmarshals the message contained in the worker context.

func (*TaskContext) UnmarshalPayload

func (c *TaskContext) UnmarshalPayload() (map[string]interface{}, error)

UnmarshalPayload unmarshals the payload contained in the worker context.

func (*TaskContext) WithCookie

func (c *TaskContext) WithCookie(cookie interface{}) *TaskContext

WithCookie returns a clone of the context with a new cookie value.

func (*TaskContext) WithTimeout

func (c *TaskContext) WithTimeout(timeout time.Duration) (*TaskContext, context.CancelFunc)

WithTimeout returns a clone of the context with a different deadline.
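Because TaskContext embeds a context.Context, a clone with a different deadline can be built by copying the struct and swapping the embedded context for one derived with context.WithTimeout. This is a sketch of that pattern, not the package's code: taskContext, newTaskContext, and the workerID field are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// taskContext is a hypothetical stand-in that embeds a context.Context.
type taskContext struct {
	context.Context
	workerID string
}

// newTaskContext builds a root context without a deadline.
func newTaskContext(workerID string) *taskContext {
	return &taskContext{Context: context.Background(), workerID: workerID}
}

// withTimeout returns a copy whose embedded context expires after d.
// The receiver is left unchanged.
func (c *taskContext) withTimeout(d time.Duration) (*taskContext, context.CancelFunc) {
	ctx, cancel := context.WithTimeout(c.Context, d)
	clone := *c
	clone.Context = ctx
	return &clone, cancel
}

func main() {
	parent := newTaskContext("worker-1")
	child, cancel := parent.withTimeout(50 * time.Millisecond)
	defer cancel()

	_, parentHasDeadline := parent.Deadline()
	_, childHasDeadline := child.Deadline()
	fmt.Println(parentHasDeadline, childHasDeadline) // false true

	<-child.Done() // unblocks once the timeout elapses
	fmt.Println(child.Err())
}
```

The returned cancel function should always be called, typically with defer, so the timer behind the derived context is released.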

type ThumbnailTrigger

type ThumbnailTrigger struct {
	// contains filtered or unexported fields
}

func NewThumbnailTrigger

func NewThumbnailTrigger(broker Broker) *ThumbnailTrigger

func (*ThumbnailTrigger) Schedule

func (t *ThumbnailTrigger) Schedule()

func (*ThumbnailTrigger) Unschedule

func (t *ThumbnailTrigger) Unschedule()

type Trigger

type Trigger interface {
	prefixer.Prefixer
	permission.Fetcher
	Type() string
	Infos() *TriggerInfos
	// Schedule should return a channel on which the trigger can send job
	// requests when it decides to.
	Schedule() <-chan *JobRequest
	// Unschedule should be used to clean the trigger states and should close
	// the returned jobs channel.
	Unschedule()
	CombineRequest() string
}

Trigger interface is used to represent a trigger.
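The Schedule and Unschedule contract (a receive-only channel of job requests that is closed when the trigger is unscheduled) can be sketched with a one-shot, delay-based trigger. Everything below is hypothetical and only covers those two methods: jobRequest, delayTrigger, and newDelayTrigger do not exist in the package, and the worker name is made up.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// jobRequest is a hypothetical stand-in for JobRequest.
type jobRequest struct{ workerType string }

// delayTrigger is a hypothetical one-shot trigger that fires after a delay.
type delayTrigger struct {
	delay time.Duration
	done  chan struct{}
	once  sync.Once
}

func newDelayTrigger(delayMs int) *delayTrigger {
	return &delayTrigger{
		delay: time.Duration(delayMs) * time.Millisecond,
		done:  make(chan struct{}),
	}
}

// Schedule returns a channel that carries at most one request and is closed
// once the trigger has fired or has been unscheduled.
func (t *delayTrigger) Schedule() <-chan *jobRequest {
	ch := make(chan *jobRequest)
	go func() {
		defer close(ch)
		timer := time.NewTimer(t.delay)
		defer timer.Stop()
		select {
		case <-timer.C:
		case <-t.done:
			return // unscheduled before firing
		}
		select {
		case ch <- &jobRequest{workerType: "thumbnail"}:
		case <-t.done:
		}
	}()
	return ch
}

// Unschedule stops the trigger; it is safe to call more than once.
func (t *delayTrigger) Unschedule() {
	t.once.Do(func() { close(t.done) })
}

func main() {
	trig := newDelayTrigger(10)
	for req := range trig.Schedule() {
		fmt.Println("push job for worker", req.workerType)
	}
	fmt.Println("channel closed")
}
```

Closing the channel from the goroutine that sends on it avoids a send on a closed channel, and lets a consumer simply range over the result.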

func NewTrigger

func NewTrigger(db prefixer.Prefixer, infos TriggerInfos, data interface{}) (Trigger, error)

NewTrigger creates the trigger associated with the specified trigger options.

type TriggerInfos

type TriggerInfos struct {
	TID          string                 `json:"_id,omitempty"`
	TRev         string                 `json:"_rev,omitempty"`
	Cluster      int                    `json:"couch_cluster,omitempty"`
	Domain       string                 `json:"domain"`
	Prefix       string                 `json:"prefix,omitempty"`
	Type         string                 `json:"type"`
	WorkerType   string                 `json:"worker"`
	Arguments    string                 `json:"arguments"`
	Debounce     string                 `json:"debounce"`
	Options      *JobOptions            `json:"options"`
	Message      Message                `json:"message"`
	CurrentState *TriggerState          `json:"current_state,omitempty"`
	Metadata     *metadata.CozyMetadata `json:"cozyMetadata,omitempty"`
}

TriggerInfos is a struct containing all the options of a trigger.

func (*TriggerInfos) Clone

func (t *TriggerInfos) Clone() couchdb.Doc

Clone implements the couchdb.Doc interface

func (*TriggerInfos) DBCluster

func (t *TriggerInfos) DBCluster() int

DBCluster implements the prefixer.Prefixer interface.

func (*TriggerInfos) DBPrefix

func (t *TriggerInfos) DBPrefix() string

DBPrefix implements the prefixer.Prefixer interface.

func (*TriggerInfos) DocType

func (t *TriggerInfos) DocType() string

DocType implements the couchdb.Doc interface

func (*TriggerInfos) DomainName

func (t *TriggerInfos) DomainName() string

DomainName implements the prefixer.Prefixer interface.

func (*TriggerInfos) Fetch

func (t *TriggerInfos) Fetch(field string) []string

Fetch implements the permission.Fetcher interface

func (*TriggerInfos) ID

func (t *TriggerInfos) ID() string

ID implements the couchdb.Doc interface

func (*TriggerInfos) IsKonnectorTrigger

func (t *TriggerInfos) IsKonnectorTrigger() bool

func (*TriggerInfos) JobRequest

func (t *TriggerInfos) JobRequest() *JobRequest

JobRequest returns a job request associated with the scheduler information.

func (*TriggerInfos) JobRequestWithEvent

func (t *TriggerInfos) JobRequestWithEvent(event *realtime.Event) (*JobRequest, error)

JobRequestWithEvent returns a job request associated with the scheduler information and the specified realtime event.

func (*TriggerInfos) Rev

func (t *TriggerInfos) Rev() string

Rev implements the couchdb.Doc interface

func (*TriggerInfos) SetID

func (t *TriggerInfos) SetID(id string)

SetID implements the couchdb.Doc interface

func (*TriggerInfos) SetRev

func (t *TriggerInfos) SetRev(rev string)

SetRev implements the couchdb.Doc interface

type TriggerState

type TriggerState struct {
	TID                 string     `json:"trigger_id"`
	Status              State      `json:"status"`
	LastSuccess         *time.Time `json:"last_success,omitempty"`
	LastSuccessfulJobID string     `json:"last_successful_job_id,omitempty"`
	LastExecution       *time.Time `json:"last_execution,omitempty"`
	LastExecutedJobID   string     `json:"last_executed_job_id,omitempty"`
	LastFailure         *time.Time `json:"last_failure,omitempty"`
	LastFailedJobID     string     `json:"last_failed_job_id,omitempty"`
	LastError           string     `json:"last_error,omitempty"`
	LastManualExecution *time.Time `json:"last_manual_execution,omitempty"`
	LastManualJobID     string     `json:"last_manual_job_id,omitempty"`
}

TriggerState represents the current state of the trigger.

func GetTriggerState

func GetTriggerState(db prefixer.Prefixer, triggerID string) (*TriggerState, error)

GetTriggerState returns the state of the trigger, calculated from the last launched jobs.

type WebhookTrigger

type WebhookTrigger struct {
	*TriggerInfos
	// contains filtered or unexported fields
}

WebhookTrigger implements the @webhook triggers. It schedules a job when an HTTP request is made at this webhook.

func NewWebhookTrigger

func NewWebhookTrigger(infos *TriggerInfos) (*WebhookTrigger, error)

NewWebhookTrigger returns a new instance of WebhookTrigger.

func (*WebhookTrigger) CombineRequest

func (w *WebhookTrigger) CombineRequest() string

CombineRequest implements the CombineRequest method of the Trigger interface.

func (*WebhookTrigger) Fire

func (w *WebhookTrigger) Fire(payload Payload, manual bool)

Fire is called with a payload when the webhook has been requested.

func (*WebhookTrigger) Infos

func (w *WebhookTrigger) Infos() *TriggerInfos

Infos implements the Infos method of the Trigger interface.

func (*WebhookTrigger) Schedule

func (w *WebhookTrigger) Schedule() <-chan *JobRequest

Schedule implements the Schedule method of the Trigger interface.

func (*WebhookTrigger) SetCallback

func (w *WebhookTrigger) SetCallback(cb firer)

SetCallback registers a struct to be called when the webhook is fired.

func (*WebhookTrigger) Type

func (w *WebhookTrigger) Type() string

Type implements the Type method of the Trigger interface.

func (*WebhookTrigger) Unschedule

func (w *WebhookTrigger) Unschedule()

Unschedule implements the Unschedule method of the Trigger interface.

type Worker

type Worker struct {
	Type string
	Conf *WorkerConfig
	// contains filtered or unexported fields
}

Worker is a unit of work that will consume from a queue and execute the do method for each job it pulls.

func NewWorker

func NewWorker(conf *WorkerConfig) *Worker

NewWorker creates a new instance of Worker with the given configuration.

func (*Worker) Shutdown

func (w *Worker) Shutdown(ctx context.Context) error

Shutdown is used to close the worker, waiting for all tasks to end

func (*Worker) Start

func (w *Worker) Start(jobs chan *Job) error

Start is used to start the worker consumption of messages from its queue.

type WorkerBeforeHook

type WorkerBeforeHook func(job *Job) (bool, error)

WorkerBeforeHook is an optional method that is always called before the job is pushed into the queue. It can be useful to skip the job beforehand.

type WorkerCommit

type WorkerCommit func(ctx *TaskContext, errjob error) error

WorkerCommit is an optional method that is always called once after the execution of the WorkerFunc.

type WorkerConfig

type WorkerConfig struct {
	WorkerInit   WorkerInitFunc
	WorkerStart  WorkerStartFunc
	WorkerFunc   WorkerFunc
	WorkerCommit WorkerCommit
	WorkerType   string
	BeforeHook   WorkerBeforeHook
	ErrorHook    JobErrorCheckerHook
	Concurrency  int
	MaxExecCount int
	Reserved     bool // true when the clients must not push jobs for this worker
	Timeout      time.Duration
	RetryDelay   time.Duration
}

WorkerConfig is the configuration parameter of a worker defined by the job system. It contains parameters of the worker along with the worker main function that performs the work against a job's message.
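How MaxExecCount, RetryDelay, and ErrorHook might combine in a retry loop can be sketched as below. The source does not document the exact semantics, so this is a guess under explicit assumptions: workerConfig is a trimmed, hypothetical copy whose workerFunc takes no context, a true result from errorHook is taken to mean "a retry is allowed", and a maxExecCount below 1 is treated as 1.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical errors used to drive the example.
var (
	errTemporary = errors.New("temporary failure")
	errFatal     = errors.New("fatal failure")
)

// workerConfig is a hypothetical, trimmed-down stand-in for WorkerConfig.
type workerConfig struct {
	workerFunc   func() error
	errorHook    func(err error) bool // assumption: true means a retry is allowed
	maxExecCount int
	retryDelay   time.Duration
}

// run executes workerFunc until it succeeds, the hook refuses a retry, or
// maxExecCount executions have been made. It returns the number of
// executions and the last error.
func run(conf workerConfig) (int, error) {
	limit := conf.maxExecCount
	if limit < 1 {
		limit = 1 // assumption: always execute at least once
	}
	var err error
	for count := 1; count <= limit; count++ {
		if err = conf.workerFunc(); err == nil {
			return count, nil
		}
		if conf.errorHook != nil && !conf.errorHook(err) {
			return count, err
		}
		if count < limit {
			time.Sleep(conf.retryDelay)
		}
	}
	return limit, err
}

func main() {
	calls := 0
	count, err := run(workerConfig{
		workerFunc: func() error {
			calls++
			if calls < 3 {
				return errTemporary
			}
			return nil
		},
		errorHook:    func(err error) bool { return !errors.Is(err, errFatal) },
		maxExecCount: 5,
		retryDelay:   10 * time.Millisecond,
	})
	fmt.Println(count, err) // 3 <nil>
}
```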

func GetWorkersList

func GetWorkersList() ([]*WorkerConfig, error)

GetWorkersList returns a list of all activated workers, configured as defined by the configuration file.

func (*WorkerConfig) Clone

func (w *WorkerConfig) Clone() *WorkerConfig

Clone clones the worker config

type WorkerFunc

type WorkerFunc func(ctx *TaskContext) error

WorkerFunc represents the work function that a worker should implement.

type WorkerInitFunc

type WorkerInitFunc func() error

WorkerInitFunc is called at the start of the worker system, only once. It is not called before every job process. It can be useful to initialize a global variable used by the worker.

type WorkerStartFunc

type WorkerStartFunc func(ctx *TaskContext) (*TaskContext, error)

WorkerStartFunc is optionally called at the beginning of each job process and can produce a context value.

type WorkersList

type WorkersList []*WorkerConfig

WorkersList is a list of worker configurations, each of which associates a worker type with its actual configuration.
