recurring

package
v0.0.0-...-83a686f
Warning

This package is not in the latest version of its module.

Published: Oct 14, 2020 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Constants

const (
	NOT_LEADER state = iota
	LEADER
)

Variables

var MockDomainRecurringTask = Task{
	ID:                 "mock",
	ScheduleExpression: "* * * * *",
	TaskDefinition: TaskDefinition{
		Queue: "q",
		Kind:  "k",
	},
	IsDeleted: false,
	LoadedAt:  nil,
	Metadata: metadata.Metadata{
		CreatedAt:  metadata.CreatedAt(MockNow),
		ModifiedAt: metadata.ModifiedAt(MockNow),
		Version: metadata.Version{
			SeqNum:      0,
			PrimaryTerm: 1,
		},
	},
}
var MockNow = time.Now().UTC()

Functions

This section is empty.

Types

type AlreadyExists

type AlreadyExists struct {
	ID task.RecurringTaskId
}

AlreadyExists is returned when the service tries to create a Task but one with the same ID already exists.

func (AlreadyExists) Error

func (e AlreadyExists) Error() string

func (AlreadyExists) Id

type BulkUpdateOtherError

type BulkUpdateOtherError struct {
	RecurringTask Task
	Result        string
}

type InvalidPersistedData

type InvalidPersistedData struct {
	PersistedData interface{}
}

InvalidPersistedData reports that persisted data is invalid.

func (InvalidPersistedData) Error

func (e InvalidPersistedData) Error() string

type InvalidVersion

type InvalidVersion struct {
	ID task.RecurringTaskId
}

InvalidVersion is returned when the version is invalid.

func (InvalidVersion) Error

func (e InvalidVersion) Error() string

func (InvalidVersion) Id

type IsDeleted

type IsDeleted bool

IsDeleted marks a Task as soft-deleted.

type LoadedAt

type LoadedAt time.Time

LoadedAt records when a given Task was last "loaded" by a scheduling process.

Note that "loaded" as a term is ... loaded here; it basically means that a Recurring Task's latest change has been "seen". For example, if it was (soft) deleted, we actually remove it from the Scheduler, but we still update LoadedAt.

This means that this field also acts as an "is_dirty" marker.

type Manager

type Manager interface {
	// Returns a function that conditionally syncs Recurring Task changes from the data store,
	// ensuring tasks that are deleted are stopped, tasks that are created or updated are scheduled
	// properly
	RecurringSyncFunc() func(ctx context.Context, isLeader leader.Checker) error

	// Returns a function that conditionally runs a full sync of Recurring Task changes from the data store
	RecurringSyncEnforceFunc() func(ctx context.Context, isLeader leader.Checker) error
}

Manager is in charge of reading recurring Tasks and scheduling them to be run, keeping things synced and updated on calls to its methods.

It exposes methods that are called by the InternalRecurringFunctionRunner at configured intervals.
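One pass of such a sync might look roughly like the sketch below. It is not the package's implementation: the `isLeader` callback, the `fakeScheduler`, and the `syncOnce` function are hypothetical, and the sketch assumes that only the leader applies changes and that deleted tasks are unscheduled while all others are (re)scheduled.

```go
package main

import "fmt"

// Task is a hypothetical, trimmed-down stand-in for recurring.Task.
type Task struct {
	ID        string
	IsDeleted bool
}

// fakeScheduler records which task IDs are currently scheduled.
type fakeScheduler struct{ scheduled map[string]bool }

func (s *fakeScheduler) Schedule(t Task) error {
	s.scheduled[t.ID] = true
	return nil
}

func (s *fakeScheduler) Unschedule(id string) bool {
	was := s.scheduled[id]
	delete(s.scheduled, id)
	return was
}

// syncOnce applies not-yet-loaded changes to the scheduler and returns the
// tasks that should then be marked as loaded. isLeader is an assumed callback.
func syncOnce(isLeader func() bool, notLoaded []Task, s *fakeScheduler) ([]Task, error) {
	if !isLeader() {
		return nil, nil // assumption: non-leaders do not touch the scheduler
	}
	seen := make([]Task, 0, len(notLoaded))
	for _, t := range notLoaded {
		if t.IsDeleted {
			s.Unschedule(t.ID)
		} else if err := s.Schedule(t); err != nil {
			return seen, err
		}
		seen = append(seen, t)
	}
	return seen, nil
}

func main() {
	s := &fakeScheduler{scheduled: map[string]bool{"old": true}}
	changes := []Task{{ID: "new"}, {ID: "old", IsDeleted: true}}
	seen, err := syncOnce(func() bool { return true }, changes, s)
	fmt.Println(len(seen), err, s.scheduled["new"], s.scheduled["old"])
}
```

In a real sync, the returned slice would be handed to something like Service.MarkLoaded so that the same changes are not picked up again.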

func NewManager

func NewManager(scheduler Scheduler, service Service) Manager

NewManager returns a new recurring Tasks Manager.

type MockRecurringTasksService

type MockRecurringTasksService struct {
	CreateCalled       uint
	CreateOverride     func() (*Task, error)
	GetCalled          uint
	GetOverride        func() (*Task, error)
	UpdateCalled       uint
	UpdateOverride     func() (*Task, error)
	DeleteCalled       uint
	DeleteOverride     func() (*Task, error)
	AllCalled          uint
	AllOverride        func() ([]Task, error)
	MarkLoadedCalled   uint
	MarkLoadedOverride func() (*MultiUpdateResult, error)
	NotLoadedCalled    uint
	NotLoadedOverride  func() ([]Task, error)
}

func (*MockRecurringTasksService) All

func (*MockRecurringTasksService) Create

func (m *MockRecurringTasksService) Create(ctx context.Context, task *NewTask) (*Task, error)

func (*MockRecurringTasksService) Delete

func (*MockRecurringTasksService) Get

func (m *MockRecurringTasksService) Get(ctx context.Context, id task.RecurringTaskId, includeSoftDeleted bool) (*Task, error)

func (*MockRecurringTasksService) MarkLoaded

func (m *MockRecurringTasksService) MarkLoaded(ctx context.Context, toMarks []Task) (*MultiUpdateResult, error)

func (*MockRecurringTasksService) NotLoaded

func (m *MockRecurringTasksService) NotLoaded(ctx context.Context) ([]Task, error)

func (*MockRecurringTasksService) Update

func (m *MockRecurringTasksService) Update(ctx context.Context, update *Task) (*Task, error)

type MultiUpdateResult

type MultiUpdateResult struct {
	Successes        []Task
	VersionConflicts []Task
	NotFounds        []Task
	Others           []BulkUpdateOtherError
}

MultiUpdateResult models a (partially) successful multi-update result.
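Because a multi update can partly succeed, callers probably want to inspect each bucket rather than rely on the returned error alone. The sketch below redeclares trimmed-down copies of the types so it is self-contained; the `failed` helper and the sample values are hypothetical.

```go
package main

import "fmt"

// Task is a hypothetical, trimmed-down stand-in for recurring.Task.
type Task struct{ ID string }

// BulkUpdateOtherError and MultiUpdateResult mirror the shapes documented above.
type BulkUpdateOtherError struct {
	RecurringTask Task
	Result        string
}

type MultiUpdateResult struct {
	Successes        []Task
	VersionConflicts []Task
	NotFounds        []Task
	Others           []BulkUpdateOtherError
}

// failed is a hypothetical helper that counts every non-successful item.
func failed(r *MultiUpdateResult) int {
	return len(r.VersionConflicts) + len(r.NotFounds) + len(r.Others)
}

func main() {
	r := &MultiUpdateResult{
		Successes:        []Task{{ID: "a"}},
		VersionConflicts: []Task{{ID: "b"}},
		Others:           []BulkUpdateOtherError{{RecurringTask: Task{ID: "c"}, Result: "error"}},
	}
	fmt.Printf("%d ok, %d failed\n", len(r.Successes), failed(r)) // 1 ok, 2 failed
}
```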

type NewTask

type NewTask struct {
	ID                          task.RecurringTaskId
	ScheduleExpression          ScheduleExpression
	TaskDefinition              TaskDefinition
	SkipIfOutstandingTasksExist bool
}

A Task that is yet to be persisted. We assume that the ScheduleExpression is valid *before* we persist it.

type NotFound

type NotFound struct {
	ID task.RecurringTaskId
}

NotFound is returned when the repo cannot find a Task by a given RecurringTaskId.

func (NotFound) Error

func (e NotFound) Error() string

func (NotFound) Id

func (e NotFound) Id() task.RecurringTaskId

type Schedule

type Schedule interface {
	Next(t time.Time) time.Time
}

type ScheduleExpression

type ScheduleExpression string

A cron-like statement. Just a typed version of whatever can be parsed by our actual scheduling infra lib...

type ScheduleParser

type ScheduleParser interface {
	Parse(spec string) (Schedule, error)
}

type Scheduler

type Scheduler interface {
	ScheduleParser

	// Schedules the given recurring Task to be inserted at intervals according
	// to the Task's ScheduleExpression.
	//
	// Note that it takes a full Task (not a ref) because the scheduler likely
	// does scheduling asynchronously, and an address is dangerous.
	//
	// This function will act like an upsert: if there already exists a Task
	// that has the same RecurringTaskId, the existing one is first unscheduled, then the new
	// one scheduled.
	//
	// Since we assume the ScheduleExpression is _valid_, there should be no
	// errors, but what the hey 🤷🏻‍♂️
	Schedule(task Task) error

	// Stops the given recurring Task from being inserted at intervals
	//
	// Returns true if it was previously scheduled, false otherwise
	Unschedule(taskId task.RecurringTaskId) bool

	// Starts the Scheduler in its own goroutine
	Start()

	// Stops the Scheduler
	Stop()
}

A thin wrapper interface around Go Cron
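The upsert behaviour of Schedule and the boolean returned by Unschedule can be sketched with an in-memory map. The `registry` type below is a hypothetical stand-in for the Scheduler's bookkeeping; it does no actual cron scheduling.

```go
package main

import "fmt"

// Task is a hypothetical, trimmed-down stand-in for recurring.Task.
type Task struct {
	ID                 string
	ScheduleExpression string
}

// registry is a hypothetical in-memory stand-in for a Scheduler's bookkeeping.
type registry struct{ entries map[string]Task }

// Unschedule removes the entry and reports whether it was previously present.
func (r *registry) Unschedule(id string) bool {
	_, ok := r.entries[id]
	delete(r.entries, id)
	return ok
}

// Schedule takes the Task by value, so later changes made by the caller
// cannot affect the stored entry.
func (r *registry) Schedule(t Task) error {
	r.Unschedule(t.ID) // upsert: drop any existing entry first
	r.entries[t.ID] = t
	return nil
}

func main() {
	r := &registry{entries: map[string]Task{}}
	_ = r.Schedule(Task{ID: "mock", ScheduleExpression: "* * * * *"})
	_ = r.Schedule(Task{ID: "mock", ScheduleExpression: "*/5 * * * *"})
	fmt.Println(len(r.entries), r.entries["mock"].ScheduleExpression) // 1 */5 * * * *
	fmt.Println(r.Unschedule("mock"), r.Unschedule("mock"))           // true false
}
```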

type Service

type Service interface {

	// Creates (persists) a NewTask, returning a Task
	//
	// An error is returned if there is already an existing Task with
	// the given id
	Create(ctx context.Context, task *NewTask) (*Task, error)

	// Retrieves a single Task, optionally returning soft-deleted tasks.
	//
	// Errors if no such Task is found that is (optionally) non-soft-deleted
	Get(ctx context.Context, id task.RecurringTaskId, includeSoftDeleted bool) (*Task, error)

	// Deletes a single Task
	//
	// Errors if no such Task is found that is non-deleted
	Delete(ctx context.Context, id task.RecurringTaskId) (*Task, error)

	// All returns all persisted RecurringTasks that have not been deleted
	//
	// Sorted by id and always reflects the realtime state of the data.
	All(ctx context.Context) ([]Task, error)

	// NotLoaded returns not-loaded (seen by recurring tasks manager) RecurringTasks.
	//
	// Note that the data returned may not be realtime.
	//
	// This is used to find tasks that have been modified but not loaded.
	NotLoaded(ctx context.Context) ([]Task, error)

	// Updates a single Task and returns the updated Task
	//
	// Also nils-out the LoadedAt field to make sure the persisted data works within
	// the expectations of how things are stored.
	//
	// An error is returned if there is no such Task, or if there was a version conflict
	Update(ctx context.Context, update *Task) (*Task, error)

	// MarkLoaded sets the LoadedAt field of multiple RecurringTasks to now
	// at once and returns a MultiUpdateResult.
	//
	// An error is returned if the update _completely_ failed.
	MarkLoaded(ctx context.Context, toMarks []Task) (*MultiUpdateResult, error)
}

type ServiceErr

type ServiceErr interface {
	error
	Id() task.RecurringTaskId
}

ServiceErr is an error interface for Service

type Task

type Task struct {
	ID                          task.RecurringTaskId
	ScheduleExpression          ScheduleExpression
	TaskDefinition              TaskDefinition
	IsDeleted                   IsDeleted
	LoadedAt                    *LoadedAt
	SkipIfOutstandingTasksExist bool
	Metadata                    metadata.Metadata
}

The way this is structured is somewhat odd; why soft deletes? Why "loadedAt"?

There is some form of leaky abstraction going on here: saving it this way allows us to easily atomically store data and query it.

Given good ES integration is an explicit goal, I *think* at this point that it's ok to leak a bit of that into the domain.

func (*Task) IntoDeleted

func (r *Task) IntoDeleted()

func (*Task) UpdateSchedule

func (r *Task) UpdateSchedule(expression ScheduleExpression)

func (*Task) UpdateTaskDefinition

func (r *Task) UpdateTaskDefinition(definition TaskDefinition)

type TaskDefinition

type TaskDefinition struct {
	Queue             queue.Name
	RetryTimes        task.RetryTimes
	Kind              task.Kind
	Priority          task.Priority
	ProcessingTimeout task.ProcessingTimeout
	Args              *task.Args
	Context           *task.Context
}

The actual recurring task that gets inserted

type WrappingErr

type WrappingErr interface {
	error
	Unwrap() error
}
