Documentation ¶
Index ¶
- Constants
- Variables
- type AlreadyExists
- type BulkUpdateOtherError
- type InvalidPersistedData
- type InvalidVersion
- type IsDeleted
- type LoadedAt
- type Manager
- type MockRecurringTasksService
- func (m *MockRecurringTasksService) All(ctx context.Context) ([]Task, error)
- func (m *MockRecurringTasksService) Create(ctx context.Context, task *NewTask) (*Task, error)
- func (m *MockRecurringTasksService) Delete(ctx context.Context, id task.RecurringTaskId) (*Task, error)
- func (m *MockRecurringTasksService) Get(ctx context.Context, id task.RecurringTaskId, includeSoftDeleted bool) (*Task, error)
- func (m *MockRecurringTasksService) MarkLoaded(ctx context.Context, toMarks []Task) (*MultiUpdateResult, error)
- func (m *MockRecurringTasksService) NotLoaded(ctx context.Context) ([]Task, error)
- func (m *MockRecurringTasksService) Update(ctx context.Context, update *Task) (*Task, error)
- type MultiUpdateResult
- type NewTask
- type NotFound
- type Schedule
- type ScheduleExpression
- type ScheduleParser
- type Scheduler
- type Service
- type ServiceErr
- type Task
- type TaskDefinition
- type WrappingErr
Constants ¶
const (
	NOT_LEADER state = iota
	LEADER
)
Variables ¶
var MockDomainRecurringTask = Task{
	ID:                 "mock",
	ScheduleExpression: "* * * * *",
	TaskDefinition: TaskDefinition{
		Queue: "q",
		Kind:  "k",
	},
	IsDeleted: false,
	LoadedAt:  nil,
	Metadata: metadata.Metadata{
		CreatedAt:  metadata.CreatedAt(MockNow),
		ModifiedAt: metadata.ModifiedAt(MockNow),
		Version: metadata.Version{
			SeqNum:      0,
			PrimaryTerm: 1,
		},
	},
}
var MockNow = time.Now().UTC()
Functions ¶
This section is empty.
Types ¶
type AlreadyExists ¶
type AlreadyExists struct {
ID task.RecurringTaskId
}
AlreadyExists is returned when the service tries to create a Task, but one with the same ID already exists.
func (AlreadyExists) Error ¶
func (e AlreadyExists) Error() string
func (AlreadyExists) Id ¶
func (e AlreadyExists) Id() task.RecurringTaskId
type BulkUpdateOtherError ¶
type InvalidPersistedData ¶
type InvalidPersistedData struct {
PersistedData interface{}
}
InvalidPersistedData is returned when persisted data is invalid; PersistedData holds the offending value.
func (InvalidPersistedData) Error ¶
func (e InvalidPersistedData) Error() string
type InvalidVersion ¶
type InvalidVersion struct {
ID task.RecurringTaskId
}
InvalidVersion is returned when the version is invalid.
func (InvalidVersion) Error ¶
func (e InvalidVersion) Error() string
func (InvalidVersion) Id ¶
func (e InvalidVersion) Id() task.RecurringTaskId
type LoadedAt ¶
When a given Task was last "loaded" by a scheduling process.
Note that "loaded" as a term is ... loaded here; it basically means that a Recurring Task's latest change has been "seen". For example, if a Task was (soft) deleted, we remove it from the Scheduler, but we still update LoadedAt.
This means that this field also acts as an "is_dirty" marker.
type Manager ¶
type Manager interface {
	// Returns a function that conditionally syncs Recurring Task changes from the data store,
	// ensuring tasks that are deleted are stopped, tasks that are created or updated are scheduled
	// properly
	RecurringSyncFunc() func(ctx context.Context, isLeader leader.Checker) error

	// Returns a function that conditionally runs a full sync of Recurring Task changes from the data store
	RecurringSyncEnforceFunc() func(ctx context.Context, isLeader leader.Checker) error
}
Manager is in charge of reading recurring Tasks and scheduling them to be run, keeping things synced and updated on calls to its methods.
It exposes methods that are called by the InternalRecurringFunctionRunner at configured intervals.
func NewManager ¶
Returns a new recurring Tasks Manager
type MockRecurringTasksService ¶
type MockRecurringTasksService struct {
	CreateCalled   uint
	CreateOverride func() (*Task, error)

	GetCalled   uint
	GetOverride func() (*Task, error)

	UpdateCalled   uint
	UpdateOverride func() (*Task, error)

	DeleteCalled   uint
	DeleteOverride func() (*Task, error)

	AllCalled   uint
	AllOverride func() ([]Task, error)

	MarkLoadedCalled   uint
	MarkLoadedOverride func() (*MultiUpdateResult, error)

	NotLoadedCalled   uint
	NotLoadedOverride func() ([]Task, error)
}
func (*MockRecurringTasksService) All ¶
func (m *MockRecurringTasksService) All(ctx context.Context) ([]Task, error)
func (*MockRecurringTasksService) Delete ¶
func (m *MockRecurringTasksService) Delete(ctx context.Context, id task.RecurringTaskId) (*Task, error)
func (*MockRecurringTasksService) Get ¶
func (m *MockRecurringTasksService) Get(ctx context.Context, id task.RecurringTaskId, includeSoftDeleted bool) (*Task, error)
func (*MockRecurringTasksService) MarkLoaded ¶
func (m *MockRecurringTasksService) MarkLoaded(ctx context.Context, toMarks []Task) (*MultiUpdateResult, error)
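The Called counters and Override hooks suggest a simple call-counting mock. The method bodies are not shown in this documentation, so the following is a sketch of how one such method might behave, using a trimmed stand-in type (MockService) and an assumed fallback when no override is set.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Task is a trimmed stand-in for the package's Task type.
type Task struct{ ID string }

// MockService is a stand-in for MockRecurringTasksService, reduced to Get.
type MockService struct {
	GetCalled   uint
	GetOverride func() (*Task, error)
}

// Get counts the call, then returns the override's result if one is set.
// The fallback (echoing back a Task with the requested id) is an ASSUMPTION.
func (m *MockService) Get(ctx context.Context, id string, includeSoftDeleted bool) (*Task, error) {
	m.GetCalled++
	if m.GetOverride != nil {
		return m.GetOverride()
	}
	return &Task{ID: id}, nil
}

// errBoom is a hypothetical error used to simulate a failing data store.
var errBoom = errors.New("boom")

func main() {
	m := &MockService{
		GetOverride: func() (*Task, error) { return nil, errBoom },
	}
	_, err := m.Get(context.Background(), "mock", false)
	fmt.Println(m.GetCalled, err) // 1 boom
}
```

In a test, the counter lets you assert how often the code under test hit the service, while the override forces a specific result or error.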
type MultiUpdateResult ¶
type MultiUpdateResult struct {
	Successes        []Task
	VersionConflicts []Task
	NotFounds        []Task
	Others           []BulkUpdateOtherError
}
MultiUpdateResult models a (partially) successful multi-update result.
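Because a multi-update can succeed for some Tasks and fail for others, callers typically inspect each bucket instead of relying on the returned error alone. A minimal sketch with local stand-in types follows; the failures helper is hypothetical, and BulkUpdateOtherError is left empty because its fields are not shown here.

```go
package main

import "fmt"

// Task is a trimmed stand-in for the package's Task type.
type Task struct{ ID string }

// BulkUpdateOtherError is a stand-in; its real fields are not documented above.
type BulkUpdateOtherError struct{}

// MultiUpdateResult mirrors the documented struct.
type MultiUpdateResult struct {
	Successes        []Task
	VersionConflicts []Task
	NotFounds        []Task
	Others           []BulkUpdateOtherError
}

// failures is a hypothetical helper that counts every entry that was
// not updated, regardless of the reason.
func failures(r *MultiUpdateResult) int {
	return len(r.VersionConflicts) + len(r.NotFounds) + len(r.Others)
}

func main() {
	res := &MultiUpdateResult{
		Successes:        []Task{{ID: "a"}, {ID: "b"}},
		VersionConflicts: []Task{{ID: "c"}},
	}
	if n := failures(res); n > 0 {
		fmt.Printf("%d updated, %d failed\n", len(res.Successes), n) // 2 updated, 1 failed
	}
}
```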
type NewTask ¶
type NewTask struct {
	ID                          task.RecurringTaskId
	ScheduleExpression          ScheduleExpression
	TaskDefinition              TaskDefinition
	SkipIfOutstandingTasksExist bool
}
A Task that is yet to be persisted. We assume that the ScheduleExpression is valid *before* we persist it.
type NotFound ¶
type NotFound struct {
ID task.RecurringTaskId
}
NotFound is returned when the repo cannot find a Task by a given RecurringTaskId.
func (NotFound) Id ¶
func (e NotFound) Id() task.RecurringTaskId
type ScheduleExpression ¶
type ScheduleExpression string
A cron-like statement. Just a typed version of whatever can be parsed by our actual scheduling infra lib...
type ScheduleParser ¶
type Scheduler ¶
type Scheduler interface {
	ScheduleParser

	// Schedules the given recurring Task to be inserted at intervals according
	// to the Task's ScheduleExpression.
	//
	// Note that it takes a full Task (not a ref) because the scheduler likely
	// does scheduling asynchronously, and an address is dangerous.
	//
	// This function will act like an upsert: if there already exists a Task
	// that has the same RecurringTaskId, the existing one is first unscheduled, then the new
	// one scheduled.
	//
	// Since we assume the ScheduleExpression is _valid_, there should be no
	// errors, but what the hey 🤷🏻♂️
	Schedule(task Task) error

	// Stops the given recurring Task from being inserted at intervals
	//
	// Returns true if it was previously scheduled, false otherwise
	Unschedule(taskId task.RecurringTaskId) bool

	// Starts the Scheduler in its own goroutine
	Start()

	// Stops the Scheduler
	Stop()
}
A thin wrapper interface around Go Cron
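The upsert behaviour of Schedule and the boolean returned by Unschedule can be illustrated with an in-memory stand-in. This sketch does not use the real cron library and never runs anything; memScheduler, its entries map, and the trimmed Task type are hypothetical names introduced only for illustration.

```go
package main

import "fmt"

// Task is a trimmed stand-in for the package's Task type.
type Task struct {
	ID                 string
	ScheduleExpression string
}

// memScheduler is a hypothetical in-memory stand-in that only records
// which Tasks are scheduled.
type memScheduler struct {
	entries map[string]Task
}

func newMemScheduler() *memScheduler {
	return &memScheduler{entries: map[string]Task{}}
}

// Schedule acts like an upsert: any existing entry with the same ID is
// unscheduled first, then the new Task (taken by value) is stored.
func (s *memScheduler) Schedule(t Task) error {
	s.Unschedule(t.ID)
	s.entries[t.ID] = t
	return nil
}

// Unschedule removes the entry and reports whether it was previously scheduled.
func (s *memScheduler) Unschedule(id string) bool {
	_, ok := s.entries[id]
	delete(s.entries, id)
	return ok
}

func main() {
	s := newMemScheduler()
	_ = s.Schedule(Task{ID: "mock", ScheduleExpression: "* * * * *"})
	_ = s.Schedule(Task{ID: "mock", ScheduleExpression: "0 * * * *"})

	fmt.Println(len(s.entries), s.entries["mock"].ScheduleExpression) // 1 0 * * * *
	fmt.Println(s.Unschedule("mock"), s.Unschedule("mock"))           // true false
}
```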
type Service ¶
type Service interface {
	// Creates (persists) a NewTask, returning a Task
	//
	// An error is returned if there is already an existing Task with
	// the given id
	Create(ctx context.Context, task *NewTask) (*Task, error)

	// Retrieves a single Task, optionally returning soft-deleted tasks.
	//
	// Errors if no such Task is found that is (optionally) non-soft-deleted
	Get(ctx context.Context, id task.RecurringTaskId, includeSoftDeleted bool) (*Task, error)

	// Deletes a single Task
	//
	// Errors if no such Task is found that is non-deleted
	Delete(ctx context.Context, id task.RecurringTaskId) (*Task, error)

	// All returns all persisted RecurringTasks that have not been deleted
	//
	// Sorted by id and always reflects the realtime state of the data.
	All(ctx context.Context) ([]Task, error)

	// NotLoaded returns RecurringTasks that have not been loaded (seen by the recurring tasks manager).
	//
	// Note that the data returned may not be realtime.
	//
	// This is used to find tasks that have been modified but not loaded.
	NotLoaded(ctx context.Context) ([]Task, error)

	// Updates a single Task and returns the updated Task
	//
	// Also nils-out the LoadedAt field to make sure the persisted data works within
	// the expectations of how things are stored.
	//
	// An error is returned if there is no such Task, or if there was a version conflict
	Update(ctx context.Context, update *Task) (*Task, error)

	// MarkLoaded sets the LoadedAt field of multiple RecurringTasks to now
	// at once and returns a MultiUpdateResult.
	//
	// An error is returned if the update _completely_ failed.
	MarkLoaded(ctx context.Context, toMarks []Task) (*MultiUpdateResult, error)
}
type ServiceErr ¶
type ServiceErr interface {
	error
	Id() task.RecurringTaskId
}
ServiceErr is an error interface for Service
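Since the error types above expose Id(), a caller can recover the affected Task id from any wrapped error via the standard errors.As. The sketch below uses local stand-ins: RecurringTaskId stands in for task.RecurringTaskId, the error message text is an assumption, and failedID is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// RecurringTaskId is a stand-in for task.RecurringTaskId.
type RecurringTaskId string

// ServiceErr mirrors the documented interface.
type ServiceErr interface {
	error
	Id() RecurringTaskId
}

// AlreadyExists mirrors the documented error type.
type AlreadyExists struct{ ID RecurringTaskId }

// Error returns a message; the exact wording is an ASSUMPTION.
func (e AlreadyExists) Error() string       { return "recurring task already exists: " + string(e.ID) }
func (e AlreadyExists) Id() RecurringTaskId { return e.ID }

// failedID is a hypothetical helper that extracts the Task id from any
// error in the chain that implements ServiceErr.
func failedID(err error) (RecurringTaskId, bool) {
	var se ServiceErr
	if errors.As(err, &se) {
		return se.Id(), true
	}
	return "", false
}

func main() {
	err := fmt.Errorf("create failed: %w", AlreadyExists{ID: "mock"})
	id, ok := failedID(err)
	fmt.Println(id, ok) // mock true
}
```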
type Task ¶
type Task struct {
	ID                          task.RecurringTaskId
	ScheduleExpression          ScheduleExpression
	TaskDefinition              TaskDefinition
	IsDeleted                   IsDeleted
	LoadedAt                    *LoadedAt
	SkipIfOutstandingTasksExist bool
	Metadata                    metadata.Metadata
}
The way this is structured is somewhat odd; why soft deletes? Why "loadedAt"?
There is some form of leaky abstraction going on here: saving it this way allows us to easily atomically store data and query it.
Given that good ES integration is an explicit goal, I *think* at this point that it's OK to leak a bit of that into the domain.
func (*Task) IntoDeleted ¶
func (r *Task) IntoDeleted()
func (*Task) UpdateSchedule ¶
func (r *Task) UpdateSchedule(expression ScheduleExpression)
func (*Task) UpdateTaskDefinition ¶
func (r *Task) UpdateTaskDefinition(definition TaskDefinition)
type TaskDefinition ¶
type TaskDefinition struct {
	Queue             queue.Name
	RetryTimes        task.RetryTimes
	Kind              task.Kind
	Priority          task.Priority
	ProcessingTimeout task.ProcessingTimeout
	Args              *task.Args
	Context           *task.Context
}
The actual recurring task that gets inserted