Documentation ¶
Overview ¶
Package queues is a generated GoMock package.
Package queues is a generated GoMock package.
Package queues is a generated GoMock package.
Package queues is a generated GoMock package.
Index ¶
- Constants
- Variables
- func EstimateTaskMetricTag(e Executable, namespaceRegistry namespace.Registry, currentClusterName string) []metrics.Tag
- func FromPersistenceAndPredicate(attributes *persistencespb.AndPredicateAttributes) tasks.Predicate
- func FromPersistenceDestinationPredicate(attributes *persistencespb.DestinationPredicateAttributes) tasks.Predicate
- func FromPersistenceEmptyPredicate(_ *persistencespb.EmptyPredicateAttributes) tasks.Predicate
- func FromPersistenceNamespaceIDPredicate(attributes *persistencespb.NamespaceIdPredicateAttributes) tasks.Predicate
- func FromPersistenceNotPredicate(attributes *persistencespb.NotPredicateAttributes) tasks.Predicate
- func FromPersistenceOrPredicate(attributes *persistencespb.OrPredicateAttributes) tasks.Predicate
- func FromPersistencePredicate(predicate *persistencespb.Predicate) tasks.Predicate
- func FromPersistenceQueueState(state *persistencespb.QueueState) *queueState
- func FromPersistenceTaskKey(key *persistencespb.TaskKey) tasks.Key
- func FromPersistenceTaskTypePredicate(attributes *persistencespb.TaskTypePredicateAttributes) tasks.Predicate
- func FromPersistenceUniversalPredicate(_ *persistencespb.UniversalPredicateAttributes) tasks.Predicate
- func GetActiveTimerTaskTypeTagValue(executable Executable) string
- func GetActiveTransferTaskTypeTagValue(task tasks.Task) string
- func GetArchivalTaskTypeTagValue(task tasks.Task) string
- func GetStandbyTimerTaskTypeTagValue(task tasks.Task) string
- func GetStandbyTransferTaskTypeTagValue(task tasks.Task) string
- func GetVisibilityTaskTypeTagValue(task tasks.Task) string
- func IsTaskAcked(task tasks.Task, persistenceQueueState *persistencespb.QueueState) bool
- func IsTimeExpired(referenceTime time.Time, testingTime time.Time) bool
- func NewExecutableFactory(executor Executor, scheduler Scheduler, rescheduler Rescheduler, ...) *executableFactoryImpl
- func NewImmediateQueue(shard hshard.Context, category tasks.Category, scheduler Scheduler, ...) *immediateQueue
- func NewRandomKey() tasks.Key
- func NewRandomKeyInRange(r Range) tasks.Key
- func NewReaderPriorityRateLimiter(rateFn quotas.RateFn, maxReaders int64) quotas.RequestRateLimiter
- func NewRescheduler(scheduler Scheduler, timeSource clock.TimeSource, logger log.Logger, ...) *reschedulerImpl
- func NewScheduledQueue(shard hshard.Context, category tasks.Category, scheduler Scheduler, ...) *scheduledQueue
- func ToPersistenceAndPredicate(andPredicate *predicates.AndImpl[tasks.Task]) *persistencespb.Predicate
- func ToPersistenceDestinationPredicate(taskDestinationPredicate *tasks.DestinationPredicate) *persistencespb.Predicate
- func ToPersistenceEmptyPredicate(_ *predicates.EmptyImpl[tasks.Task]) *persistencespb.Predicate
- func ToPersistenceNamespaceIDPredicate(namespaceIDPredicate *tasks.NamespacePredicate) *persistencespb.Predicate
- func ToPersistenceNotPredicate(notPredicate *predicates.NotImpl[tasks.Task]) *persistencespb.Predicate
- func ToPersistenceOrPredicate(orPredicate *predicates.OrImpl[tasks.Task]) *persistencespb.Predicate
- func ToPersistencePredicate(predicate tasks.Predicate) *persistencespb.Predicate
- func ToPersistenceQueueState(queueState *queueState) *persistencespb.QueueState
- func ToPersistenceRange(r Range) *persistencespb.QueueSliceRange
- func ToPersistenceScope(scope Scope) *persistencespb.QueueSliceScope
- func ToPersistenceTaskKey(key tasks.Key) *persistencespb.TaskKey
- func ToPersistenceTaskTypePredicate(taskTypePredicate *tasks.TypePredicate) *persistencespb.Predicate
- func ToPersistenceUniversalPredicate(_ *predicates.UniversalImpl[tasks.Task]) *persistencespb.Predicate
- type Action
- type Alert
- type AlertAttributesQueuePendingTaskCount
- type AlertAttributesReaderStuck
- type AlertAttributesSlicesCount
- type AlertType
- type ChannelWeightFn
- type CommonSchedulerWrapper
- type DLQWriter
- type Executable
- type ExecutableFactory
- type ExecutableFactoryFn
- type ExecutableOption
- type ExecutableParams
- type ExecuteResponse
- type Executor
- type ExecutorWrapper
- type Grouper
- type GrouperNamespaceID
- type GrouperStateMachineNamespaceIDAndDestination
- func (g GrouperStateMachineNamespaceIDAndDestination) Key(task tasks.Task) (key any)
- func (GrouperStateMachineNamespaceIDAndDestination) KeyTyped(task tasks.Task) (key StateMachineTaskTypeNamespaceIDAndDestination)
- func (GrouperStateMachineNamespaceIDAndDestination) Predicate(keys []any) tasks.Predicate
- type Iterator
- type IteratorImpl
- func (i *IteratorImpl) CanMerge(iter Iterator) bool
- func (i *IteratorImpl) CanSplit(key tasks.Key) bool
- func (i *IteratorImpl) HasNext() bool
- func (i *IteratorImpl) Merge(iter Iterator) Iterator
- func (i *IteratorImpl) Next() (tasks.Task, error)
- func (i *IteratorImpl) Range() Range
- func (i *IteratorImpl) Remaining() Iterator
- func (i *IteratorImpl) Split(key tasks.Key) (left Iterator, right Iterator)
- type MaybeTerminalTaskError
- type Mitigator
- type MockExecutable
- func (m *MockExecutable) Abort()
- func (m *MockExecutable) Ack()
- func (m *MockExecutable) Attempt() int
- func (m *MockExecutable) Cancel()
- func (m *MockExecutable) EXPECT() *MockExecutableMockRecorder
- func (m *MockExecutable) Execute() error
- func (m *MockExecutable) GetCategory() tasks0.Category
- func (m *MockExecutable) GetKey() tasks0.Key
- func (m *MockExecutable) GetNamespaceID() string
- func (m *MockExecutable) GetPriority() tasks.Priority
- func (m *MockExecutable) GetRunID() string
- func (m *MockExecutable) GetScheduledTime() time.Time
- func (m *MockExecutable) GetTask() tasks0.Task
- func (m *MockExecutable) GetTaskID() int64
- func (m *MockExecutable) GetType() v1.TaskType
- func (m *MockExecutable) GetVersion() int64
- func (m *MockExecutable) GetVisibilityTime() time.Time
- func (m *MockExecutable) GetWorkflowID() string
- func (m *MockExecutable) HandleErr(err error) error
- func (m *MockExecutable) IsRetryableError(err error) bool
- func (m *MockExecutable) Nack(err error)
- func (m *MockExecutable) Reschedule()
- func (m *MockExecutable) RetryPolicy() backoff.RetryPolicy
- func (m *MockExecutable) SetScheduledTime(arg0 time.Time)
- func (m *MockExecutable) SetTaskID(id int64)
- func (m *MockExecutable) SetVisibilityTime(timestamp time.Time)
- func (m *MockExecutable) State() tasks.State
- type MockExecutableMockRecorder
- func (mr *MockExecutableMockRecorder) Abort() *gomock.Call
- func (mr *MockExecutableMockRecorder) Ack() *gomock.Call
- func (mr *MockExecutableMockRecorder) Attempt() *gomock.Call
- func (mr *MockExecutableMockRecorder) Cancel() *gomock.Call
- func (mr *MockExecutableMockRecorder) Execute() *gomock.Call
- func (mr *MockExecutableMockRecorder) GetCategory() *gomock.Call
- func (mr *MockExecutableMockRecorder) GetKey() *gomock.Call
- func (mr *MockExecutableMockRecorder) GetNamespaceID() *gomock.Call
- func (mr *MockExecutableMockRecorder) GetPriority() *gomock.Call
- func (mr *MockExecutableMockRecorder) GetRunID() *gomock.Call
- func (mr *MockExecutableMockRecorder) GetScheduledTime() *gomock.Call
- func (mr *MockExecutableMockRecorder) GetTask() *gomock.Call
- func (mr *MockExecutableMockRecorder) GetTaskID() *gomock.Call
- func (mr *MockExecutableMockRecorder) GetType() *gomock.Call
- func (mr *MockExecutableMockRecorder) GetVersion() *gomock.Call
- func (mr *MockExecutableMockRecorder) GetVisibilityTime() *gomock.Call
- func (mr *MockExecutableMockRecorder) GetWorkflowID() *gomock.Call
- func (mr *MockExecutableMockRecorder) HandleErr(err interface{}) *gomock.Call
- func (mr *MockExecutableMockRecorder) IsRetryableError(err interface{}) *gomock.Call
- func (mr *MockExecutableMockRecorder) Nack(err interface{}) *gomock.Call
- func (mr *MockExecutableMockRecorder) Reschedule() *gomock.Call
- func (mr *MockExecutableMockRecorder) RetryPolicy() *gomock.Call
- func (mr *MockExecutableMockRecorder) SetScheduledTime(arg0 interface{}) *gomock.Call
- func (mr *MockExecutableMockRecorder) SetTaskID(id interface{}) *gomock.Call
- func (mr *MockExecutableMockRecorder) SetVisibilityTime(timestamp interface{}) *gomock.Call
- func (mr *MockExecutableMockRecorder) State() *gomock.Call
- type MockExecutor
- type MockExecutorMockRecorder
- type MockExecutorWrapper
- type MockExecutorWrapperMockRecorder
- type MockMaybeTerminalTaskError
- type MockMaybeTerminalTaskErrorMockRecorder
- type MockQueue
- type MockQueueMockRecorder
- func (mr *MockQueueMockRecorder) Category() *gomock.Call
- func (mr *MockQueueMockRecorder) FailoverNamespace(namespaceID interface{}) *gomock.Call
- func (mr *MockQueueMockRecorder) NotifyNewTasks(tasks interface{}) *gomock.Call
- func (mr *MockQueueMockRecorder) Start() *gomock.Call
- func (mr *MockQueueMockRecorder) Stop() *gomock.Call
- type MockRescheduler
- type MockReschedulerMockRecorder
- func (mr *MockReschedulerMockRecorder) Add(task, rescheduleTime interface{}) *gomock.Call
- func (mr *MockReschedulerMockRecorder) Len() *gomock.Call
- func (mr *MockReschedulerMockRecorder) Reschedule(namespaceID interface{}) *gomock.Call
- func (mr *MockReschedulerMockRecorder) Start() *gomock.Call
- func (mr *MockReschedulerMockRecorder) Stop() *gomock.Call
- type MockScheduler
- type MockSchedulerMockRecorder
- func (mr *MockSchedulerMockRecorder) Start() *gomock.Call
- func (mr *MockSchedulerMockRecorder) Stop() *gomock.Call
- func (mr *MockSchedulerMockRecorder) Submit(arg0 interface{}) *gomock.Call
- func (mr *MockSchedulerMockRecorder) TaskChannelKeyFn() *gomock.Call
- func (mr *MockSchedulerMockRecorder) TrySubmit(arg0 interface{}) *gomock.Call
- type Monitor
- type MonitorOptions
- type NamespaceIDAndDestination
- type Options
- type PaginationFnProvider
- type PriorityAssigner
- type Queue
- type QueueWriter
- type Range
- func (r *Range) CanMerge(input Range) bool
- func (r *Range) CanSplit(key tasks.Key) bool
- func (r *Range) ContainsKey(key tasks.Key) bool
- func (r *Range) ContainsRange(input Range) bool
- func (r *Range) Equals(input Range) bool
- func (r *Range) IsEmpty() bool
- func (r *Range) Merge(input Range) Range
- func (r *Range) Split(key tasks.Key) (left Range, right Range)
- type RateLimitedSchedulerOptions
- type Reader
- type ReaderCompletionFn
- type ReaderGroup
- func (g *ReaderGroup) ForEach(f func(int64, Reader))
- func (g *ReaderGroup) GetOrCreateReader(readerID int64) Reader
- func (g *ReaderGroup) NewReader(readerID int64, slices ...Slice) Reader
- func (g *ReaderGroup) ReaderByID(readerID int64) (Reader, bool)
- func (g *ReaderGroup) Readers() map[int64]Reader
- func (g *ReaderGroup) RemoveReader(readerID int64)
- func (g *ReaderGroup) Start()
- func (g *ReaderGroup) Stop()
- type ReaderImpl
- func (r *ReaderImpl) AppendSlices(incomingSlices ...Slice)
- func (r *ReaderImpl) ClearSlices(predicate SlicePredicate)
- func (r *ReaderImpl) CompactSlices(predicate SlicePredicate)
- func (r *ReaderImpl) MergeSlices(incomingSlices ...Slice)
- func (r *ReaderImpl) Notify()
- func (r *ReaderImpl) Pause(duration time.Duration)
- func (r *ReaderImpl) Scopes() []Scope
- func (r *ReaderImpl) ShrinkSlices() int
- func (r *ReaderImpl) SplitSlices(splitter SliceSplitter)
- func (r *ReaderImpl) Start()
- func (r *ReaderImpl) Stop()
- func (r *ReaderImpl) WalkSlices(iterator SliceIterator)
- type ReaderInitializer
- type ReaderOptions
- type Rescheduler
- type Scheduler
- type SchedulerOptions
- type SchedulerRateLimiter
- type Scope
- func (s *Scope) CanMergeByPredicate(incomingScope Scope) bool
- func (s *Scope) CanMergeByRange(incomingScope Scope) bool
- func (s *Scope) CanSplitByRange(key tasks.Key) bool
- func (s *Scope) Contains(task tasks.Task) bool
- func (s *Scope) Equals(scope Scope) bool
- func (s *Scope) IsEmpty() bool
- func (s *Scope) MergeByPredicate(incomingScope Scope) Scope
- func (s *Scope) MergeByRange(incomingScope Scope) Scope
- func (s *Scope) SplitByPredicate(predicate tasks.Predicate) (pass Scope, fail Scope)
- func (s *Scope) SplitByRange(key tasks.Key) (left Scope, right Scope)
- type Slice
- type SliceImpl
- func (s *SliceImpl) CanMergeWithSlice(slice Slice) bool
- func (s *SliceImpl) CanSplitByRange(key tasks.Key) bool
- func (s *SliceImpl) Clear()
- func (s *SliceImpl) CompactWithSlice(slice Slice) Slice
- func (s *SliceImpl) MergeWithSlice(slice Slice) []Slice
- func (s *SliceImpl) MoreTasks() bool
- func (s *SliceImpl) Scope() Scope
- func (s *SliceImpl) SelectTasks(readerID int64, batchSize int) ([]Executable, error)
- func (s *SliceImpl) ShrinkScope() int
- func (s *SliceImpl) SplitByPredicate(predicate tasks.Predicate) (pass Slice, fail Slice)
- func (s *SliceImpl) SplitByRange(key tasks.Key) (left Slice, right Slice)
- func (s *SliceImpl) TaskStats() TaskStats
- type SliceIterator
- type SlicePredicate
- type SliceSplitter
- type SpeculativeWorkflowTaskTimeoutQueue
- func (q SpeculativeWorkflowTaskTimeoutQueue) Category() tasks.Category
- func (q SpeculativeWorkflowTaskTimeoutQueue) FailoverNamespace(_ string)
- func (q SpeculativeWorkflowTaskTimeoutQueue) NotifyNewTasks(ts []tasks.Task)
- func (q SpeculativeWorkflowTaskTimeoutQueue) Start()
- func (q SpeculativeWorkflowTaskTimeoutQueue) Stop()
- type StaleStateError
- type StateMachineTaskTypeNamespaceIDAndDestination
- type TaskChannelKey
- type TaskChannelKeyFn
- type TaskStats
- type UnprocessableTaskError
Constants ¶
const (
DefaultReaderId = common.DefaultQueueReaderID
)
Variables ¶
var ( ErrSendTaskToDLQ = errors.New("failed to send task to DLQ") ErrCreateDLQ = errors.New("failed to create DLQ") ErrGetClusterMetadata = errors.New("failed to get cluster metadata") )
var ErrStaleTask = errors.New("task stale")
ErrStaleTask is an indicator that a task cannot be executed because it is stale. This is expected in certain situations and it is safe to drop the task. An example of a stale task is when the task is pointing to a state machine in mutable state that has a newer version than the version that task was created from, that is the state machine has already transitioned and the task is no longer needed.
var (
ErrTerminalTaskFailure = errors.New("original task failed and this task is now to send the original to the DLQ")
)
var (
NoopReaderCompletionFn = func(_ int64) {}
)
Functions ¶
func EstimateTaskMetricTag ¶ added in v1.23.0
func FromPersistenceAndPredicate ¶ added in v1.17.3
func FromPersistenceAndPredicate( attributes *persistencespb.AndPredicateAttributes, ) tasks.Predicate
func FromPersistenceDestinationPredicate ¶ added in v1.24.0
func FromPersistenceDestinationPredicate( attributes *persistencespb.DestinationPredicateAttributes, ) tasks.Predicate
func FromPersistenceEmptyPredicate ¶ added in v1.17.3
func FromPersistenceEmptyPredicate( _ *persistencespb.EmptyPredicateAttributes, ) tasks.Predicate
func FromPersistenceNamespaceIDPredicate ¶ added in v1.17.3
func FromPersistenceNamespaceIDPredicate( attributes *persistencespb.NamespaceIdPredicateAttributes, ) tasks.Predicate
func FromPersistenceNotPredicate ¶ added in v1.17.3
func FromPersistenceNotPredicate( attributes *persistencespb.NotPredicateAttributes, ) tasks.Predicate
func FromPersistenceOrPredicate ¶ added in v1.17.3
func FromPersistenceOrPredicate( attributes *persistencespb.OrPredicateAttributes, ) tasks.Predicate
func FromPersistencePredicate ¶ added in v1.17.3
func FromPersistencePredicate( predicate *persistencespb.Predicate, ) tasks.Predicate
func FromPersistenceQueueState ¶ added in v1.17.3
func FromPersistenceQueueState( state *persistencespb.QueueState, ) *queueState
func FromPersistenceTaskKey ¶ added in v1.17.3
func FromPersistenceTaskKey( key *persistencespb.TaskKey, ) tasks.Key
func FromPersistenceTaskTypePredicate ¶ added in v1.17.3
func FromPersistenceTaskTypePredicate( attributes *persistencespb.TaskTypePredicateAttributes, ) tasks.Predicate
func FromPersistenceUniversalPredicate ¶ added in v1.17.3
func FromPersistenceUniversalPredicate( _ *persistencespb.UniversalPredicateAttributes, ) tasks.Predicate
func GetActiveTimerTaskTypeTagValue ¶ added in v1.17.0
func GetActiveTimerTaskTypeTagValue( executable Executable, ) string
func GetActiveTransferTaskTypeTagValue ¶ added in v1.17.0
func GetArchivalTaskTypeTagValue ¶ added in v1.19.0
func GetStandbyTimerTaskTypeTagValue ¶ added in v1.17.0
func GetStandbyTransferTaskTypeTagValue ¶ added in v1.17.0
func GetVisibilityTaskTypeTagValue ¶ added in v1.17.0
func IsTaskAcked ¶ added in v1.18.0
func IsTaskAcked( task tasks.Task, persistenceQueueState *persistencespb.QueueState, ) bool
func IsTimeExpired ¶ added in v1.18.0
IsTimeExpired checks if the testing time is equal or before the reference time. The precision of the comparison is millisecond.
func NewExecutableFactory ¶ added in v1.23.0
func NewExecutableFactory( executor Executor, scheduler Scheduler, rescheduler Rescheduler, priorityAssigner PriorityAssigner, timeSource clock.TimeSource, namespaceRegistry namespace.Registry, clusterMetadata cluster.Metadata, logger log.Logger, metricsHandler metrics.Handler, dlqWriter *DLQWriter, dlqEnabled dynamicconfig.BoolPropertyFn, attemptsBeforeSendingToDlq dynamicconfig.IntPropertyFn, dlqInternalErrors dynamicconfig.BoolPropertyFn, dlqErrorPattern dynamicconfig.StringPropertyFn, ) *executableFactoryImpl
func NewImmediateQueue ¶ added in v1.17.3
func NewImmediateQueue( shard hshard.Context, category tasks.Category, scheduler Scheduler, rescheduler Rescheduler, options *Options, hostRateLimiter quotas.RequestRateLimiter, grouper Grouper, logger log.Logger, metricsHandler metrics.Handler, factory ExecutableFactory, ) *immediateQueue
func NewRandomKey ¶ added in v1.17.0
func NewRandomKeyInRange ¶ added in v1.17.0
func NewReaderPriorityRateLimiter ¶ added in v1.18.0
func NewReaderPriorityRateLimiter( rateFn quotas.RateFn, maxReaders int64, ) quotas.RequestRateLimiter
func NewRescheduler ¶ added in v1.17.0
func NewScheduledQueue ¶ added in v1.17.3
func NewScheduledQueue( shard hshard.Context, category tasks.Category, scheduler Scheduler, rescheduler Rescheduler, executableFactory ExecutableFactory, options *Options, hostRateLimiter quotas.RequestRateLimiter, logger log.Logger, metricsHandler metrics.Handler, ) *scheduledQueue
func ToPersistenceAndPredicate ¶ added in v1.17.3
func ToPersistenceAndPredicate( andPredicate *predicates.AndImpl[tasks.Task], ) *persistencespb.Predicate
func ToPersistenceDestinationPredicate ¶ added in v1.24.0
func ToPersistenceDestinationPredicate( taskDestinationPredicate *tasks.DestinationPredicate, ) *persistencespb.Predicate
func ToPersistenceEmptyPredicate ¶ added in v1.17.3
func ToPersistenceEmptyPredicate( _ *predicates.EmptyImpl[tasks.Task], ) *persistencespb.Predicate
func ToPersistenceNamespaceIDPredicate ¶ added in v1.17.3
func ToPersistenceNamespaceIDPredicate( namespaceIDPredicate *tasks.NamespacePredicate, ) *persistencespb.Predicate
func ToPersistenceNotPredicate ¶ added in v1.17.3
func ToPersistenceNotPredicate( notPredicate *predicates.NotImpl[tasks.Task], ) *persistencespb.Predicate
func ToPersistenceOrPredicate ¶ added in v1.17.3
func ToPersistenceOrPredicate( orPredicate *predicates.OrImpl[tasks.Task], ) *persistencespb.Predicate
func ToPersistencePredicate ¶ added in v1.17.3
func ToPersistencePredicate( predicate tasks.Predicate, ) *persistencespb.Predicate
func ToPersistenceQueueState ¶ added in v1.17.3
func ToPersistenceQueueState( queueState *queueState, ) *persistencespb.QueueState
func ToPersistenceRange ¶ added in v1.17.3
func ToPersistenceRange( r Range, ) *persistencespb.QueueSliceRange
func ToPersistenceScope ¶ added in v1.17.3
func ToPersistenceScope( scope Scope, ) *persistencespb.QueueSliceScope
func ToPersistenceTaskKey ¶ added in v1.17.3
func ToPersistenceTaskKey( key tasks.Key, ) *persistencespb.TaskKey
func ToPersistenceTaskTypePredicate ¶ added in v1.17.3
func ToPersistenceTaskTypePredicate( taskTypePredicate *tasks.TypePredicate, ) *persistencespb.Predicate
func ToPersistenceUniversalPredicate ¶ added in v1.17.3
func ToPersistenceUniversalPredicate( _ *predicates.UniversalImpl[tasks.Task], ) *persistencespb.Predicate
Types ¶
type Action ¶ added in v1.18.0
type Action interface { Name() string Run(*ReaderGroup) }
Action is a set of operations that can be run on a ReaderGroup. It is created and run by Mitigator upon receiving an Alert.
type Alert ¶ added in v1.18.0
type Alert struct { AlertType AlertType AlertAttributesQueuePendingTaskCount *AlertAttributesQueuePendingTaskCount AlertAttributesReaderStuck *AlertAttributesReaderStuck AlertAttributesSliceCount *AlertAttributesSlicesCount }
Alert is created by a Monitor when some statistics of the Queue is abnormal
type AlertAttributesQueuePendingTaskCount ¶ added in v1.18.0
type AlertAttributesReaderStuck ¶ added in v1.18.0
type AlertAttributesSlicesCount ¶ added in v1.18.0
type ChannelWeightFn ¶ added in v1.18.0
type ChannelWeightFn = tasks.ChannelWeightFn[TaskChannelKey]
type CommonSchedulerWrapper ¶ added in v1.24.0
type CommonSchedulerWrapper struct { tasks.Scheduler[Executable] TaskKeyFn func(e Executable) TaskChannelKey }
CommonSchedulerWrapper is an adapter that converts a common [task.Scheduler] to a Scheduler with an injectable TaskChannelKeyFn.
func (*CommonSchedulerWrapper) TaskChannelKeyFn ¶ added in v1.24.0
func (s *CommonSchedulerWrapper) TaskChannelKeyFn() TaskChannelKeyFn
type DLQWriter ¶ added in v1.23.0
type DLQWriter struct {
// contains filtered or unexported fields
}
DLQWriter can be used to write tasks to the DLQ.
func NewDLQWriter ¶ added in v1.23.0
func NewDLQWriter(w QueueWriter, m cluster.Metadata, h metrics.Handler, l log.SnTaggedLogger, r namespace.Registry) *DLQWriter
NewDLQWriter returns a DLQ which will write to the given QueueWriter.
type Executable ¶ added in v1.17.0
type Executable interface { ctasks.Task tasks.Task Attempt() int GetTask() tasks.Task GetPriority() ctasks.Priority GetScheduledTime() time.Time SetScheduledTime(time.Time) }
func NewExecutable ¶ added in v1.17.0
func NewExecutable( readerID int64, task tasks.Task, executor Executor, scheduler Scheduler, rescheduler Rescheduler, priorityAssigner PriorityAssigner, timeSource clock.TimeSource, namespaceRegistry namespace.Registry, clusterMetadata cluster.Metadata, logger log.Logger, metricsHandler metrics.Handler, opts ...ExecutableOption, ) Executable
type ExecutableFactory ¶ added in v1.23.0
type ExecutableFactory interface {
NewExecutable(task tasks.Task, readerID int64) Executable
}
ExecutableFactory and the related interfaces here are needed to make it possible to extend the functionality of task processing without changing its core logic. This is similar to ExecutorWrapper.
type ExecutableFactoryFn ¶ added in v1.23.0
type ExecutableFactoryFn func(readerID int64, t tasks.Task) Executable
ExecutableFactoryFn is a convenience type to avoid having to create a struct that implements ExecutableFactory.
func (ExecutableFactoryFn) NewExecutable ¶ added in v1.23.0
func (f ExecutableFactoryFn) NewExecutable(task tasks.Task, readerID int64) Executable
type ExecutableOption ¶ added in v1.23.0
type ExecutableOption func(*ExecutableParams)
type ExecutableParams ¶ added in v1.23.0
type ExecutableParams struct { DLQEnabled dynamicconfig.BoolPropertyFn DLQWriter *DLQWriter MaxUnexpectedErrorAttempts dynamicconfig.IntPropertyFn DLQInternalErrors dynamicconfig.BoolPropertyFn DLQErrorPattern dynamicconfig.StringPropertyFn }
type ExecuteResponse ¶ added in v1.23.0
type Executor ¶ added in v1.17.0
type Executor interface {
Execute(context.Context, Executable) ExecuteResponse
}
type ExecutorWrapper ¶ added in v1.22.0
type Grouper ¶ added in v1.24.0
type Grouper interface { // Key returns the group key for a given task. Key(task tasks.Task) (key any) // Predicate constructs a prdicate from a slice of keys. Predicate(keys []any) tasks.Predicate }
Grouper groups tasks and constructs predicates for those groups.
type GrouperNamespaceID ¶ added in v1.24.0
type GrouperNamespaceID struct { }
type GrouperStateMachineNamespaceIDAndDestination ¶ added in v1.24.0
type GrouperStateMachineNamespaceIDAndDestination struct { }
func (GrouperStateMachineNamespaceIDAndDestination) Key ¶ added in v1.24.0
func (g GrouperStateMachineNamespaceIDAndDestination) Key(task tasks.Task) (key any)
func (GrouperStateMachineNamespaceIDAndDestination) KeyTyped ¶ added in v1.24.0
func (GrouperStateMachineNamespaceIDAndDestination) KeyTyped(task tasks.Task) (key StateMachineTaskTypeNamespaceIDAndDestination)
type IteratorImpl ¶ added in v1.17.0
type IteratorImpl struct {
// contains filtered or unexported fields
}
func NewIterator ¶ added in v1.17.0
func NewIterator( paginationFnProvider PaginationFnProvider, r Range, ) *IteratorImpl
func (*IteratorImpl) CanMerge ¶ added in v1.17.0
func (i *IteratorImpl) CanMerge(iter Iterator) bool
func (*IteratorImpl) CanSplit ¶ added in v1.17.0
func (i *IteratorImpl) CanSplit(key tasks.Key) bool
func (*IteratorImpl) HasNext ¶ added in v1.17.0
func (i *IteratorImpl) HasNext() bool
func (*IteratorImpl) Merge ¶ added in v1.17.0
func (i *IteratorImpl) Merge(iter Iterator) Iterator
func (*IteratorImpl) Range ¶ added in v1.17.0
func (i *IteratorImpl) Range() Range
func (*IteratorImpl) Remaining ¶ added in v1.17.0
func (i *IteratorImpl) Remaining() Iterator
type MaybeTerminalTaskError ¶ added in v1.24.0
type MaybeTerminalTaskError interface {
IsTerminalTaskError() bool
}
MaybeTerminalTaskError are errors which (if IsTerminalTaskError returns true) cannot be retried and should not be rescheduled. Tasks should be enqueued to the DLQ immediately if an error is marked as terminal.
type Mitigator ¶ added in v1.18.0
type Mitigator interface {
Mitigate(Alert)
}
Mitigator generates and runs an Action for resolving the given Alert
type MockExecutable ¶ added in v1.17.0
type MockExecutable struct {
// contains filtered or unexported fields
}
MockExecutable is a mock of Executable interface.
func NewMockExecutable ¶ added in v1.17.0
func NewMockExecutable(ctrl *gomock.Controller) *MockExecutable
NewMockExecutable creates a new mock instance.
func (*MockExecutable) Abort ¶ added in v1.21.0
func (m *MockExecutable) Abort()
Abort mocks base method.
func (*MockExecutable) Attempt ¶ added in v1.17.0
func (m *MockExecutable) Attempt() int
Attempt mocks base method.
func (*MockExecutable) Cancel ¶ added in v1.17.3
func (m *MockExecutable) Cancel()
Cancel mocks base method.
func (*MockExecutable) EXPECT ¶ added in v1.17.0
func (m *MockExecutable) EXPECT() *MockExecutableMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockExecutable) Execute ¶ added in v1.17.0
func (m *MockExecutable) Execute() error
Execute mocks base method.
func (*MockExecutable) GetCategory ¶ added in v1.17.0
func (m *MockExecutable) GetCategory() tasks0.Category
GetCategory mocks base method.
func (*MockExecutable) GetKey ¶ added in v1.17.0
func (m *MockExecutable) GetKey() tasks0.Key
GetKey mocks base method.
func (*MockExecutable) GetNamespaceID ¶ added in v1.17.0
func (m *MockExecutable) GetNamespaceID() string
GetNamespaceID mocks base method.
func (*MockExecutable) GetPriority ¶ added in v1.17.0
func (m *MockExecutable) GetPriority() tasks.Priority
GetPriority mocks base method.
func (*MockExecutable) GetRunID ¶ added in v1.17.0
func (m *MockExecutable) GetRunID() string
GetRunID mocks base method.
func (*MockExecutable) GetScheduledTime ¶ added in v1.18.0
func (m *MockExecutable) GetScheduledTime() time.Time
GetScheduledTime mocks base method.
func (*MockExecutable) GetTask ¶ added in v1.17.0
func (m *MockExecutable) GetTask() tasks0.Task
GetTask mocks base method.
func (*MockExecutable) GetTaskID ¶ added in v1.17.0
func (m *MockExecutable) GetTaskID() int64
GetTaskID mocks base method.
func (*MockExecutable) GetType ¶ added in v1.17.0
func (m *MockExecutable) GetType() v1.TaskType
GetType mocks base method.
func (*MockExecutable) GetVersion ¶ added in v1.17.0
func (m *MockExecutable) GetVersion() int64
GetVersion mocks base method.
func (*MockExecutable) GetVisibilityTime ¶ added in v1.17.0
func (m *MockExecutable) GetVisibilityTime() time.Time
GetVisibilityTime mocks base method.
func (*MockExecutable) GetWorkflowID ¶ added in v1.17.0
func (m *MockExecutable) GetWorkflowID() string
GetWorkflowID mocks base method.
func (*MockExecutable) HandleErr ¶ added in v1.17.0
func (m *MockExecutable) HandleErr(err error) error
HandleErr mocks base method.
func (*MockExecutable) IsRetryableError ¶ added in v1.17.0
func (m *MockExecutable) IsRetryableError(err error) bool
IsRetryableError mocks base method.
func (*MockExecutable) Nack ¶ added in v1.17.0
func (m *MockExecutable) Nack(err error)
Nack mocks base method.
func (*MockExecutable) Reschedule ¶ added in v1.17.0
func (m *MockExecutable) Reschedule()
Reschedule mocks base method.
func (*MockExecutable) RetryPolicy ¶ added in v1.17.0
func (m *MockExecutable) RetryPolicy() backoff.RetryPolicy
RetryPolicy mocks base method.
func (*MockExecutable) SetScheduledTime ¶ added in v1.18.0
func (m *MockExecutable) SetScheduledTime(arg0 time.Time)
SetScheduledTime mocks base method.
func (*MockExecutable) SetTaskID ¶ added in v1.17.0
func (m *MockExecutable) SetTaskID(id int64)
SetTaskID mocks base method.
func (*MockExecutable) SetVisibilityTime ¶ added in v1.17.0
func (m *MockExecutable) SetVisibilityTime(timestamp time.Time)
SetVisibilityTime mocks base method.
func (*MockExecutable) State ¶ added in v1.17.0
func (m *MockExecutable) State() tasks.State
State mocks base method.
type MockExecutableMockRecorder ¶ added in v1.17.0
type MockExecutableMockRecorder struct {
// contains filtered or unexported fields
}
MockExecutableMockRecorder is the mock recorder for MockExecutable.
func (*MockExecutableMockRecorder) Abort ¶ added in v1.21.0
func (mr *MockExecutableMockRecorder) Abort() *gomock.Call
Abort indicates an expected call of Abort.
func (*MockExecutableMockRecorder) Ack ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) Ack() *gomock.Call
Ack indicates an expected call of Ack.
func (*MockExecutableMockRecorder) Attempt ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) Attempt() *gomock.Call
Attempt indicates an expected call of Attempt.
func (*MockExecutableMockRecorder) Cancel ¶ added in v1.17.3
func (mr *MockExecutableMockRecorder) Cancel() *gomock.Call
Cancel indicates an expected call of Cancel.
func (*MockExecutableMockRecorder) Execute ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) Execute() *gomock.Call
Execute indicates an expected call of Execute.
func (*MockExecutableMockRecorder) GetCategory ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) GetCategory() *gomock.Call
GetCategory indicates an expected call of GetCategory.
func (*MockExecutableMockRecorder) GetKey ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) GetKey() *gomock.Call
GetKey indicates an expected call of GetKey.
func (*MockExecutableMockRecorder) GetNamespaceID ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) GetNamespaceID() *gomock.Call
GetNamespaceID indicates an expected call of GetNamespaceID.
func (*MockExecutableMockRecorder) GetPriority ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) GetPriority() *gomock.Call
GetPriority indicates an expected call of GetPriority.
func (*MockExecutableMockRecorder) GetRunID ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) GetRunID() *gomock.Call
GetRunID indicates an expected call of GetRunID.
func (*MockExecutableMockRecorder) GetScheduledTime ¶ added in v1.18.0
func (mr *MockExecutableMockRecorder) GetScheduledTime() *gomock.Call
GetScheduledTime indicates an expected call of GetScheduledTime.
func (*MockExecutableMockRecorder) GetTask ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) GetTask() *gomock.Call
GetTask indicates an expected call of GetTask.
func (*MockExecutableMockRecorder) GetTaskID ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) GetTaskID() *gomock.Call
GetTaskID indicates an expected call of GetTaskID.
func (*MockExecutableMockRecorder) GetType ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) GetType() *gomock.Call
GetType indicates an expected call of GetType.
func (*MockExecutableMockRecorder) GetVersion ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) GetVersion() *gomock.Call
GetVersion indicates an expected call of GetVersion.
func (*MockExecutableMockRecorder) GetVisibilityTime ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) GetVisibilityTime() *gomock.Call
GetVisibilityTime indicates an expected call of GetVisibilityTime.
func (*MockExecutableMockRecorder) GetWorkflowID ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) GetWorkflowID() *gomock.Call
GetWorkflowID indicates an expected call of GetWorkflowID.
func (*MockExecutableMockRecorder) HandleErr ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) HandleErr(err interface{}) *gomock.Call
HandleErr indicates an expected call of HandleErr.
func (*MockExecutableMockRecorder) IsRetryableError ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) IsRetryableError(err interface{}) *gomock.Call
IsRetryableError indicates an expected call of IsRetryableError.
func (*MockExecutableMockRecorder) Nack ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) Nack(err interface{}) *gomock.Call
Nack indicates an expected call of Nack.
func (*MockExecutableMockRecorder) Reschedule ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) Reschedule() *gomock.Call
Reschedule indicates an expected call of Reschedule.
func (*MockExecutableMockRecorder) RetryPolicy ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) RetryPolicy() *gomock.Call
RetryPolicy indicates an expected call of RetryPolicy.
func (*MockExecutableMockRecorder) SetScheduledTime ¶ added in v1.18.0
func (mr *MockExecutableMockRecorder) SetScheduledTime(arg0 interface{}) *gomock.Call
SetScheduledTime indicates an expected call of SetScheduledTime.
func (*MockExecutableMockRecorder) SetTaskID ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) SetTaskID(id interface{}) *gomock.Call
SetTaskID indicates an expected call of SetTaskID.
func (*MockExecutableMockRecorder) SetVisibilityTime ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) SetVisibilityTime(timestamp interface{}) *gomock.Call
SetVisibilityTime indicates an expected call of SetVisibilityTime.
func (*MockExecutableMockRecorder) State ¶ added in v1.17.0
func (mr *MockExecutableMockRecorder) State() *gomock.Call
State indicates an expected call of State.
type MockExecutor ¶ added in v1.17.0
type MockExecutor struct {
// contains filtered or unexported fields
}
MockExecutor is a mock of Executor interface.
func NewMockExecutor ¶ added in v1.17.0
func NewMockExecutor(ctrl *gomock.Controller) *MockExecutor
NewMockExecutor creates a new mock instance.
func (*MockExecutor) EXPECT ¶ added in v1.17.0
func (m *MockExecutor) EXPECT() *MockExecutorMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockExecutor) Execute ¶ added in v1.17.0
func (m *MockExecutor) Execute(arg0 context.Context, arg1 Executable) ExecuteResponse
Execute mocks base method.
type MockExecutorMockRecorder ¶ added in v1.17.0
type MockExecutorMockRecorder struct {
// contains filtered or unexported fields
}
MockExecutorMockRecorder is the mock recorder for MockExecutor.
func (*MockExecutorMockRecorder) Execute ¶ added in v1.17.0
func (mr *MockExecutorMockRecorder) Execute(arg0, arg1 interface{}) *gomock.Call
Execute indicates an expected call of Execute.
type MockExecutorWrapper ¶ added in v1.22.0
type MockExecutorWrapper struct {
// contains filtered or unexported fields
}
MockExecutorWrapper is a mock of ExecutorWrapper interface.
func NewMockExecutorWrapper ¶ added in v1.22.0
func NewMockExecutorWrapper(ctrl *gomock.Controller) *MockExecutorWrapper
NewMockExecutorWrapper creates a new mock instance.
func (*MockExecutorWrapper) EXPECT ¶ added in v1.22.0
func (m *MockExecutorWrapper) EXPECT() *MockExecutorWrapperMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockExecutorWrapper) Wrap ¶ added in v1.22.0
func (m *MockExecutorWrapper) Wrap(delegate Executor) Executor
Wrap mocks base method.
type MockExecutorWrapperMockRecorder ¶ added in v1.22.0
type MockExecutorWrapperMockRecorder struct {
// contains filtered or unexported fields
}
MockExecutorWrapperMockRecorder is the mock recorder for MockExecutorWrapper.
func (*MockExecutorWrapperMockRecorder) Wrap ¶ added in v1.22.0
func (mr *MockExecutorWrapperMockRecorder) Wrap(delegate interface{}) *gomock.Call
Wrap indicates an expected call of Wrap.
type MockMaybeTerminalTaskError ¶ added in v1.24.0
type MockMaybeTerminalTaskError struct {
// contains filtered or unexported fields
}
MockMaybeTerminalTaskError is a mock of MaybeTerminalTaskError interface.
func NewMockMaybeTerminalTaskError ¶ added in v1.24.0
func NewMockMaybeTerminalTaskError(ctrl *gomock.Controller) *MockMaybeTerminalTaskError
NewMockMaybeTerminalTaskError creates a new mock instance.
func (*MockMaybeTerminalTaskError) EXPECT ¶ added in v1.24.0
func (m *MockMaybeTerminalTaskError) EXPECT() *MockMaybeTerminalTaskErrorMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockMaybeTerminalTaskError) IsTerminalTaskError ¶ added in v1.24.0
func (m *MockMaybeTerminalTaskError) IsTerminalTaskError() bool
IsTerminalTaskError mocks base method.
type MockMaybeTerminalTaskErrorMockRecorder ¶ added in v1.24.0
type MockMaybeTerminalTaskErrorMockRecorder struct {
// contains filtered or unexported fields
}
MockMaybeTerminalTaskErrorMockRecorder is the mock recorder for MockMaybeTerminalTaskError.
func (*MockMaybeTerminalTaskErrorMockRecorder) IsTerminalTaskError ¶ added in v1.24.0
func (mr *MockMaybeTerminalTaskErrorMockRecorder) IsTerminalTaskError() *gomock.Call
IsTerminalTaskError indicates an expected call of IsTerminalTaskError.
type MockQueue ¶ added in v1.17.3
type MockQueue struct {
// contains filtered or unexported fields
}
MockQueue is a mock of Queue interface.
func NewMockQueue ¶ added in v1.17.3
func NewMockQueue(ctrl *gomock.Controller) *MockQueue
NewMockQueue creates a new mock instance.
func (*MockQueue) EXPECT ¶ added in v1.17.3
func (m *MockQueue) EXPECT() *MockQueueMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockQueue) FailoverNamespace ¶ added in v1.17.3
FailoverNamespace mocks base method.
func (*MockQueue) NotifyNewTasks ¶ added in v1.17.3
NotifyNewTasks mocks base method.
type MockQueueMockRecorder ¶ added in v1.17.3
type MockQueueMockRecorder struct {
// contains filtered or unexported fields
}
MockQueueMockRecorder is the mock recorder for MockQueue.
func (*MockQueueMockRecorder) Category ¶ added in v1.17.3
func (mr *MockQueueMockRecorder) Category() *gomock.Call
Category indicates an expected call of Category.
func (*MockQueueMockRecorder) FailoverNamespace ¶ added in v1.17.3
func (mr *MockQueueMockRecorder) FailoverNamespace(namespaceID interface{}) *gomock.Call
FailoverNamespace indicates an expected call of FailoverNamespace.
func (*MockQueueMockRecorder) NotifyNewTasks ¶ added in v1.17.3
func (mr *MockQueueMockRecorder) NotifyNewTasks(tasks interface{}) *gomock.Call
NotifyNewTasks indicates an expected call of NotifyNewTasks.
func (*MockQueueMockRecorder) Start ¶ added in v1.17.3
func (mr *MockQueueMockRecorder) Start() *gomock.Call
Start indicates an expected call of Start.
func (*MockQueueMockRecorder) Stop ¶ added in v1.17.3
func (mr *MockQueueMockRecorder) Stop() *gomock.Call
Stop indicates an expected call of Stop.
type MockRescheduler ¶ added in v1.17.0
type MockRescheduler struct {
// contains filtered or unexported fields
}
MockRescheduler is a mock of Rescheduler interface.
func NewMockRescheduler ¶ added in v1.17.0
func NewMockRescheduler(ctrl *gomock.Controller) *MockRescheduler
NewMockRescheduler creates a new mock instance.
func (*MockRescheduler) Add ¶ added in v1.17.0
func (m *MockRescheduler) Add(task Executable, rescheduleTime time.Time)
Add mocks base method.
func (*MockRescheduler) EXPECT ¶ added in v1.17.0
func (m *MockRescheduler) EXPECT() *MockReschedulerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockRescheduler) Len ¶ added in v1.17.0
func (m *MockRescheduler) Len() int
Len mocks base method.
func (*MockRescheduler) Reschedule ¶ added in v1.17.0
func (m *MockRescheduler) Reschedule(namespaceID string)
Reschedule mocks base method.
func (*MockRescheduler) Start ¶ added in v1.17.3
func (m *MockRescheduler) Start()
Start mocks base method.
func (*MockRescheduler) Stop ¶ added in v1.17.3
func (m *MockRescheduler) Stop()
Stop mocks base method.
type MockReschedulerMockRecorder ¶ added in v1.17.0
type MockReschedulerMockRecorder struct {
// contains filtered or unexported fields
}
MockReschedulerMockRecorder is the mock recorder for MockRescheduler.
func (*MockReschedulerMockRecorder) Add ¶ added in v1.17.0
func (mr *MockReschedulerMockRecorder) Add(task, rescheduleTime interface{}) *gomock.Call
Add indicates an expected call of Add.
func (*MockReschedulerMockRecorder) Len ¶ added in v1.17.0
func (mr *MockReschedulerMockRecorder) Len() *gomock.Call
Len indicates an expected call of Len.
func (*MockReschedulerMockRecorder) Reschedule ¶ added in v1.17.0
func (mr *MockReschedulerMockRecorder) Reschedule(namespaceID interface{}) *gomock.Call
Reschedule indicates an expected call of Reschedule.
func (*MockReschedulerMockRecorder) Start ¶ added in v1.17.3
func (mr *MockReschedulerMockRecorder) Start() *gomock.Call
Start indicates an expected call of Start.
func (*MockReschedulerMockRecorder) Stop ¶ added in v1.17.3
func (mr *MockReschedulerMockRecorder) Stop() *gomock.Call
Stop indicates an expected call of Stop.
type MockScheduler ¶ added in v1.17.0
type MockScheduler struct {
// contains filtered or unexported fields
}
MockScheduler is a mock of Scheduler interface.
func NewMockScheduler ¶ added in v1.17.0
func NewMockScheduler(ctrl *gomock.Controller) *MockScheduler
NewMockScheduler creates a new mock instance.
func (*MockScheduler) EXPECT ¶ added in v1.17.0
func (m *MockScheduler) EXPECT() *MockSchedulerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockScheduler) Start ¶ added in v1.17.0
func (m *MockScheduler) Start()
Start mocks base method.
func (*MockScheduler) Stop ¶ added in v1.17.0
func (m *MockScheduler) Stop()
Stop mocks base method.
func (*MockScheduler) Submit ¶ added in v1.17.0
func (m *MockScheduler) Submit(arg0 Executable)
Submit mocks base method.
func (*MockScheduler) TaskChannelKeyFn ¶ added in v1.18.0
func (m *MockScheduler) TaskChannelKeyFn() TaskChannelKeyFn
TaskChannelKeyFn mocks base method.
func (*MockScheduler) TrySubmit ¶ added in v1.17.0
func (m *MockScheduler) TrySubmit(arg0 Executable) bool
TrySubmit mocks base method.
type MockSchedulerMockRecorder ¶ added in v1.17.0
type MockSchedulerMockRecorder struct {
// contains filtered or unexported fields
}
MockSchedulerMockRecorder is the mock recorder for MockScheduler.
func (*MockSchedulerMockRecorder) Start ¶ added in v1.17.0
func (mr *MockSchedulerMockRecorder) Start() *gomock.Call
Start indicates an expected call of Start.
func (*MockSchedulerMockRecorder) Stop ¶ added in v1.17.0
func (mr *MockSchedulerMockRecorder) Stop() *gomock.Call
Stop indicates an expected call of Stop.
func (*MockSchedulerMockRecorder) Submit ¶ added in v1.17.0
func (mr *MockSchedulerMockRecorder) Submit(arg0 interface{}) *gomock.Call
Submit indicates an expected call of Submit.
func (*MockSchedulerMockRecorder) TaskChannelKeyFn ¶ added in v1.18.0
func (mr *MockSchedulerMockRecorder) TaskChannelKeyFn() *gomock.Call
TaskChannelKeyFn indicates an expected call of TaskChannelKeyFn.
func (*MockSchedulerMockRecorder) TrySubmit ¶ added in v1.17.0
func (mr *MockSchedulerMockRecorder) TrySubmit(arg0 interface{}) *gomock.Call
TrySubmit indicates an expected call of TrySubmit.
type Monitor ¶ added in v1.18.0
type Monitor interface { GetTotalPendingTaskCount() int GetSlicePendingTaskCount(slice Slice) int SetSlicePendingTaskCount(slice Slice, count int) GetReaderWatermark(readerID int64) (tasks.Key, bool) SetReaderWatermark(readerID int64, watermark tasks.Key) GetTotalSliceCount() int GetSliceCount(readerID int64) int SetSliceCount(readerID int64, count int) RemoveSlice(slice Slice) RemoveReader(readerID int64) ResolveAlert(AlertType) SilenceAlert(AlertType) AlertCh() <-chan *Alert Close() }
Monitor tracks Queue statistics and sends an Alert to the AlertCh if any statistics becomes abnormal
type MonitorOptions ¶ added in v1.18.0
type MonitorOptions struct { PendingTasksCriticalCount dynamicconfig.IntPropertyFn ReaderStuckCriticalAttempts dynamicconfig.IntPropertyFn SliceCountCriticalThreshold dynamicconfig.IntPropertyFn }
type NamespaceIDAndDestination ¶ added in v1.24.0
NamespaceIDAndDestination is the key for grouping tasks by namespace ID and destination.
type Options ¶ added in v1.17.3
type Options struct { ReaderOptions MonitorOptions MaxPollRPS dynamicconfig.IntPropertyFn MaxPollInterval dynamicconfig.DurationPropertyFn MaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn CheckpointInterval dynamicconfig.DurationPropertyFn CheckpointIntervalJitterCoefficient dynamicconfig.FloatPropertyFn MaxReaderCount dynamicconfig.IntPropertyFn }
type PaginationFnProvider ¶ added in v1.17.3
type PaginationFnProvider func(Range) collection.PaginationFn[tasks.Task]
type PriorityAssigner ¶ added in v1.17.0
type PriorityAssigner interface {
Assign(Executable) tasks.Priority
}
PriorityAssigner assigns priority to task executables
func NewNoopPriorityAssigner ¶ added in v1.17.0
func NewNoopPriorityAssigner() PriorityAssigner
func NewPriorityAssigner ¶ added in v1.17.0
func NewPriorityAssigner() PriorityAssigner
func NewStaticPriorityAssigner ¶ added in v1.23.0
func NewStaticPriorityAssigner(priority tasks.Priority) PriorityAssigner
type QueueWriter ¶ added in v1.23.0
type QueueWriter interface { CreateQueue( ctx context.Context, request *persistence.CreateQueueRequest, ) (*persistence.CreateQueueResponse, error) EnqueueTask( ctx context.Context, request *persistence.EnqueueTaskRequest, ) (*persistence.EnqueueTaskResponse, error) }
QueueWriter is a subset of persistence.HistoryTaskQueueManager.
type Range ¶ added in v1.17.0
func FromPersistenceRange ¶ added in v1.17.3
func FromPersistenceRange( r *persistencespb.QueueSliceRange, ) Range
func NewRandomOrderedRangesInRange ¶ added in v1.17.3
func NewRandomRange ¶ added in v1.17.0
func NewRandomRange() Range
func (*Range) ContainsRange ¶ added in v1.17.0
type RateLimitedSchedulerOptions ¶ added in v1.23.0
type RateLimitedSchedulerOptions struct { EnableShadowMode dynamicconfig.BoolPropertyFn StartupDelay dynamicconfig.DurationPropertyFn }
type Reader ¶ added in v1.17.3
type Reader interface { Scopes() []Scope WalkSlices(SliceIterator) SplitSlices(SliceSplitter) MergeSlices(...Slice) AppendSlices(...Slice) ClearSlices(SlicePredicate) CompactSlices(SlicePredicate) ShrinkSlices() int Notify() Pause(time.Duration) Start() Stop() }
type ReaderCompletionFn ¶ added in v1.21.0
type ReaderCompletionFn func(readerID int64)
type ReaderGroup ¶ added in v1.17.3
func NewReaderGroup ¶ added in v1.17.3
func NewReaderGroup( initializer ReaderInitializer, ) *ReaderGroup
func (*ReaderGroup) ForEach ¶ added in v1.21.0
func (g *ReaderGroup) ForEach(f func(int64, Reader))
func (*ReaderGroup) GetOrCreateReader ¶ added in v1.21.0
func (g *ReaderGroup) GetOrCreateReader(readerID int64) Reader
func (*ReaderGroup) NewReader ¶ added in v1.17.3
func (g *ReaderGroup) NewReader(readerID int64, slices ...Slice) Reader
func (*ReaderGroup) ReaderByID ¶ added in v1.17.3
func (g *ReaderGroup) ReaderByID(readerID int64) (Reader, bool)
func (*ReaderGroup) Readers ¶ added in v1.17.3
func (g *ReaderGroup) Readers() map[int64]Reader
func (*ReaderGroup) RemoveReader ¶ added in v1.18.0
func (g *ReaderGroup) RemoveReader(readerID int64)
func (*ReaderGroup) Start ¶ added in v1.17.3
func (g *ReaderGroup) Start()
func (*ReaderGroup) Stop ¶ added in v1.17.3
func (g *ReaderGroup) Stop()
type ReaderImpl ¶ added in v1.17.3
func NewReader ¶ added in v1.17.3
func NewReader( readerID int64, slices []Slice, options *ReaderOptions, scheduler Scheduler, rescheduler Rescheduler, timeSource clock.TimeSource, ratelimiter quotas.RequestRateLimiter, monitor Monitor, completionFn ReaderCompletionFn, logger log.Logger, metricsHandler metrics.Handler, ) *ReaderImpl
func (*ReaderImpl) AppendSlices ¶ added in v1.18.0
func (r *ReaderImpl) AppendSlices(incomingSlices ...Slice)
func (*ReaderImpl) ClearSlices ¶ added in v1.17.3
func (r *ReaderImpl) ClearSlices(predicate SlicePredicate)
func (*ReaderImpl) CompactSlices ¶ added in v1.18.0
func (r *ReaderImpl) CompactSlices(predicate SlicePredicate)
func (*ReaderImpl) MergeSlices ¶ added in v1.17.3
func (r *ReaderImpl) MergeSlices(incomingSlices ...Slice)
func (*ReaderImpl) Notify ¶ added in v1.21.0
func (r *ReaderImpl) Notify()
func (*ReaderImpl) Pause ¶ added in v1.17.3
func (r *ReaderImpl) Pause(duration time.Duration)
func (*ReaderImpl) Scopes ¶ added in v1.17.3
func (r *ReaderImpl) Scopes() []Scope
func (*ReaderImpl) ShrinkSlices ¶ added in v1.17.3
func (r *ReaderImpl) ShrinkSlices() int
Shrink all queue slices, returning the number of tasks removed (completed)
func (*ReaderImpl) SplitSlices ¶ added in v1.17.3
func (r *ReaderImpl) SplitSlices(splitter SliceSplitter)
func (*ReaderImpl) Start ¶ added in v1.17.3
func (r *ReaderImpl) Start()
func (*ReaderImpl) Stop ¶ added in v1.17.3
func (r *ReaderImpl) Stop()
func (*ReaderImpl) WalkSlices ¶ added in v1.17.3
func (r *ReaderImpl) WalkSlices(iterator SliceIterator)
type ReaderInitializer ¶ added in v1.17.3
type ReaderOptions ¶ added in v1.17.3
type ReaderOptions struct { BatchSize dynamicconfig.IntPropertyFn MaxPendingTasksCount dynamicconfig.IntPropertyFn PollBackoffInterval dynamicconfig.DurationPropertyFn }
type Rescheduler ¶ added in v1.17.0
type Rescheduler interface { // Add task executable to the rescheduler. Add(task Executable, rescheduleTime time.Time) // Reschedule triggers an immediate reschedule for provided namespace // ignoring executable's reschedule time. // Used by namespace failover logic Reschedule(namespaceID string) // Len returns the total number of task executables waiting to be rescheduled. Len() int Start() Stop() }
Rescheduler buffers task executables that are failed to process and resubmit them to the task scheduler when the Reschedule method is called.
type Scheduler ¶ added in v1.17.0
type Scheduler interface { Start() Stop() Submit(Executable) TrySubmit(Executable) bool TaskChannelKeyFn() TaskChannelKeyFn }
Scheduler is the component for scheduling and processing task executables, it's based on the common/tasks.Scheduler interface and provide the additional information of how tasks are grouped during scheduling.
func NewRateLimitedScheduler ¶ added in v1.23.0
func NewRateLimitedScheduler( baseScheduler Scheduler, options RateLimitedSchedulerOptions, currentClusterName string, namespaceRegistry namespace.Registry, rateLimiter SchedulerRateLimiter, timeSource clock.TimeSource, logger log.Logger, metricsHandler metrics.Handler, ) Scheduler
func NewScheduler ¶ added in v1.17.0
type SchedulerOptions ¶ added in v1.17.0
type SchedulerOptions struct { WorkerCount dynamicconfig.IntPropertyFn ActiveNamespaceWeights dynamicconfig.MapPropertyFnWithNamespaceFilter StandbyNamespaceWeights dynamicconfig.MapPropertyFnWithNamespaceFilter }
type SchedulerRateLimiter ¶ added in v1.19.0
type SchedulerRateLimiter quotas.RequestRateLimiter
func NewPrioritySchedulerRateLimiter ¶ added in v1.23.0
func NewPrioritySchedulerRateLimiter( namespaceRateFn quotas.NamespaceRateFn, hostRateFn quotas.RateFn, persistenceNamespaceRateFn quotas.NamespaceRateFn, persistenceHostRateFn quotas.RateFn, ) (SchedulerRateLimiter, error)
type Scope ¶ added in v1.17.0
func FromPersistenceScope ¶ added in v1.17.3
func FromPersistenceScope( scope *persistencespb.QueueSliceScope, ) Scope
func NewRandomScopes ¶ added in v1.17.3
func (*Scope) CanMergeByPredicate ¶ added in v1.17.0
func (*Scope) CanMergeByRange ¶ added in v1.17.0
func (*Scope) CanSplitByRange ¶ added in v1.17.0
func (*Scope) MergeByPredicate ¶ added in v1.17.0
func (*Scope) MergeByRange ¶ added in v1.17.0
func (*Scope) SplitByPredicate ¶ added in v1.17.0
type Slice ¶ added in v1.17.3
type Slice interface { Scope() Scope CanSplitByRange(tasks.Key) bool SplitByRange(tasks.Key) (left Slice, right Slice) SplitByPredicate(tasks.Predicate) (pass Slice, fail Slice) CanMergeWithSlice(Slice) bool MergeWithSlice(Slice) []Slice CompactWithSlice(Slice) Slice ShrinkScope() int SelectTasks(readerID int64, batchSize int) ([]Executable, error) MoreTasks() bool TaskStats() TaskStats Clear() }
Slice manages the loading and status tracking of all tasks within its Scope. It also provides methods for splitting or merging with another slice either by range or by predicate.
type SliceImpl ¶ added in v1.17.3
type SliceImpl struct {
// contains filtered or unexported fields
}
func NewSlice ¶ added in v1.17.3
func NewSlice( paginationFnProvider PaginationFnProvider, executableFactory ExecutableFactory, monitor Monitor, scope Scope, grouper Grouper, ) *SliceImpl
func (*SliceImpl) CanMergeWithSlice ¶ added in v1.17.3
func (*SliceImpl) CanSplitByRange ¶ added in v1.17.3
func (*SliceImpl) CompactWithSlice ¶ added in v1.18.0
func (*SliceImpl) MergeWithSlice ¶ added in v1.17.3
func (*SliceImpl) SelectTasks ¶ added in v1.17.3
func (s *SliceImpl) SelectTasks(readerID int64, batchSize int) ([]Executable, error)
func (*SliceImpl) ShrinkScope ¶ added in v1.18.0
func (*SliceImpl) SplitByPredicate ¶ added in v1.17.3
func (*SliceImpl) SplitByRange ¶ added in v1.17.3
type SliceIterator ¶ added in v1.17.3
type SliceIterator func(s Slice)
type SlicePredicate ¶ added in v1.17.3
type SliceSplitter ¶ added in v1.17.3
type SpeculativeWorkflowTaskTimeoutQueue ¶ added in v1.21.0
type SpeculativeWorkflowTaskTimeoutQueue struct {
// contains filtered or unexported fields
}
func NewSpeculativeWorkflowTaskTimeoutQueue ¶ added in v1.21.0
func NewSpeculativeWorkflowTaskTimeoutQueue( scheduler ctasks.Scheduler[ctasks.Task], priorityAssigner PriorityAssigner, executor Executor, namespaceRegistry namespace.Registry, clusterMetadata cluster.Metadata, timeSource clock.TimeSource, metricsHandler metrics.Handler, logger log.SnTaggedLogger, ) *SpeculativeWorkflowTaskTimeoutQueue
func (SpeculativeWorkflowTaskTimeoutQueue) Category ¶ added in v1.21.0
func (q SpeculativeWorkflowTaskTimeoutQueue) Category() tasks.Category
func (SpeculativeWorkflowTaskTimeoutQueue) FailoverNamespace ¶ added in v1.21.0
func (q SpeculativeWorkflowTaskTimeoutQueue) FailoverNamespace(_ string)
func (SpeculativeWorkflowTaskTimeoutQueue) NotifyNewTasks ¶ added in v1.21.0
func (q SpeculativeWorkflowTaskTimeoutQueue) NotifyNewTasks(ts []tasks.Task)
func (SpeculativeWorkflowTaskTimeoutQueue) Start ¶ added in v1.21.0
func (q SpeculativeWorkflowTaskTimeoutQueue) Start()
func (SpeculativeWorkflowTaskTimeoutQueue) Stop ¶ added in v1.21.0
func (q SpeculativeWorkflowTaskTimeoutQueue) Stop()
type StaleStateError ¶
type StaleStateError struct {
Message string
}
StaleStateError is an indicator that after loading the state for a task it was detected as stale. It's possible that state reload solves this issue but otherwise it is unexpected and considered terminal.
func NewStateStaleError ¶
func NewStateStaleError(message string) StaleStateError
NewStateStaleError returns a new StateStaleError from given message.
func (StaleStateError) Error ¶
func (e StaleStateError) Error() string
func (StaleStateError) IsTerminalTaskError ¶
func (StaleStateError) IsTerminalTaskError() bool
IsTerminalTaskError marks this error as terminal to be handled appropriately.
type StateMachineTaskTypeNamespaceIDAndDestination ¶ added in v1.24.0
type StateMachineTaskTypeNamespaceIDAndDestination struct { StateMachineTaskType int32 NamespaceID string Destination string }
StateMachineTaskTypeNamespaceIDAndDestination is the key for grouping tasks by task type namespace ID and destination.
type TaskChannelKey ¶ added in v1.18.0
type TaskChannelKeyFn ¶ added in v1.18.0
type TaskChannelKeyFn = tasks.TaskChannelKeyFn[Executable, TaskChannelKey]
type UnprocessableTaskError ¶ added in v1.24.0
type UnprocessableTaskError struct {
Message string
}
UnprocessableTaskError is an indicator that an executor does not know how to handle a task. Considered terminal.
func NewUnprocessableTaskError ¶ added in v1.24.0
func NewUnprocessableTaskError(message string) UnprocessableTaskError
NewUnprocessableTaskError returns a new UnprocessableTaskError from given message.
func (UnprocessableTaskError) Error ¶ added in v1.24.0
func (e UnprocessableTaskError) Error() string
func (UnprocessableTaskError) IsTerminalTaskError ¶ added in v1.24.0
func (UnprocessableTaskError) IsTerminalTaskError() bool
IsTerminalTaskError marks this error as terminal to be handled appropriately.
Source Files ¶
- action.go
- action_pending_task_count.go
- action_reader_stuck.go
- action_slice_count.go
- action_slice_predicate.go
- active_standby_executor.go
- alerts.go
- convert.go
- dlq_writer.go
- executable.go
- executable_factory.go
- executable_mock.go
- grouper.go
- iterator.go
- memory_scheduled_queue.go
- metrics.go
- mitigator.go
- monitor.go
- priority_assigner.go
- queue.go
- queue_base.go
- queue_immediate.go
- queue_mock.go
- queue_scheduled.go
- range.go
- reader.go
- reader_group.go
- reader_quotas.go
- rescheduler.go
- rescheduler_mock.go
- scheduler.go
- scheduler_mock.go
- scheduler_monitor.go
- scheduler_quotas.go
- scope.go
- slice.go
- speculative_workflow_task_timeout_executable.go
- speculative_workflow_task_timeout_queue.go
- test_util.go
- tracker.go
- util.go