Documentation
Overview
Package dlq contains the workflow for deleting and re-enqueueing DLQ tasks. Both of these operations are performed by the same workflow to avoid concurrent deletion and re-enqueueing of the same task.
Constants
const (
	// WorkflowName is the name of the DLQ workflow.
	WorkflowName = "temporal-sys-dlq-workflow"
	// WorkflowTypeDelete is what the value of WorkflowParams.WorkflowType should be to delete DLQ tasks. When this is
	// specified, the workflow will simply delete all tasks up to the specified max message ID.
	WorkflowTypeDelete = "delete"
	// WorkflowTypeMerge is for re-enqueuing DLQ tasks. When this is specified, the workflow will operate in batches.
	// For each batch, it will read up to MergeParams.BatchSize tasks from the DLQ, re-enqueue them, and then delete
	// them from the DLQ. It will repeat this process until it reaches the specified max message ID.
	WorkflowTypeMerge = "merge"
	// MaxMergeBatchSize is the maximum value for MergeParams.BatchSize.
	MaxMergeBatchSize = 1000
	// DefaultMergeBatchSize is the default value for MergeParams.BatchSize.
	DefaultMergeBatchSize = 100
	// QueryTypeProgress is the query to get the progress of the DLQ workflow.
	QueryTypeProgress = "dlq-job-progress-query"
)
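The batching behavior described in the WorkflowTypeMerge comment (read a batch, re-enqueue it, delete it, repeat until the max message ID is reached) can be sketched roughly as follows. This is not the package's workflow code: task, memoryDLQ, and merge are hypothetical names, the queue is an in-memory slice, and the enqueue callback stands in for whatever re-enqueues tasks in a real deployment.

```go
package main

import "fmt"

// task is a hypothetical stand-in for a DLQ task; only the message ID matters here.
type task struct {
	MessageID int64
}

// memoryDLQ is a hypothetical in-memory queue ordered by ascending message ID.
type memoryDLQ struct {
	tasks []task
}

// read returns up to batchSize tasks from the front of the queue whose
// message IDs do not exceed maxMessageID (inclusive bound).
func (q *memoryDLQ) read(batchSize int, maxMessageID int64) []task {
	var batch []task
	for _, t := range q.tasks {
		if len(batch) == batchSize || t.MessageID > maxMessageID {
			break
		}
		batch = append(batch, t)
	}
	return batch
}

// deleteUpTo removes every task whose message ID is at most maxMessageID.
func (q *memoryDLQ) deleteUpTo(maxMessageID int64) {
	kept := q.tasks[:0]
	for _, t := range q.tasks {
		if t.MessageID > maxMessageID {
			kept = append(kept, t)
		}
	}
	q.tasks = kept
}

// merge processes the queue in batches: read, re-enqueue, then delete.
// It returns the number of tasks processed.
func merge(q *memoryDLQ, batchSize int, maxMessageID int64, enqueue func([]task) error) (int, error) {
	processed := 0
	for {
		batch := q.read(batchSize, maxMessageID)
		if len(batch) == 0 {
			return processed, nil
		}
		if err := enqueue(batch); err != nil {
			// Nothing from this batch is deleted, so it can be retried later.
			return processed, err
		}
		q.deleteUpTo(batch[len(batch)-1].MessageID)
		processed += len(batch)
	}
}

func main() {
	q := &memoryDLQ{}
	for id := int64(1); id <= 7; id++ {
		q.tasks = append(q.tasks, task{MessageID: id})
	}
	batches := 0
	n, err := merge(q, 2, 5, func(batch []task) error {
		batches++
		return nil
	})
	fmt.Println(n, batches, len(q.tasks), err) // prints "5 3 2 <nil>"
}
```

Deleting only after a successful re-enqueue means a failed batch stays in the queue, which is one plausible reason to order the steps this way.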
Variables
var (
	// Module provides a [workercommon.WorkerComponent] annotated with [workercommon.WorkerComponentTag] to the graph,
	// given a [HistoryClient], a [TaskClientDialer], and a value for [CurrentClusterName].
	Module = workercommon.AnnotateWorkerComponentProvider(newComponent)

	ErrNegativeBatchSize      = errors.New("BatchSize must be positive or 0 to use the default")
	ErrMergeBatchSizeTooLarge = errors.New("BatchSize too large")
)
Functions
This section is empty.
Types
type AddTasksFn
type AddTasksFn func(
	ctx context.Context,
	req *adminservice.AddTasksRequest,
) (*adminservice.AddTasksResponse, error)
AddTasksFn provides a convenient method for implementing the TaskClient interface.
func (AddTasksFn) AddTasks
func (f AddTasksFn) AddTasks(
	ctx context.Context,
	in *adminservice.AddTasksRequest,
) (*adminservice.AddTasksResponse, error)
AddTasks implements TaskClient by calling the AddTasksFn with the request.
type CurrentClusterName
type CurrentClusterName string
CurrentClusterName is a distinct type to make fx injection easier. It is similar to the type of the same name in the persistence package, but that type is not reused here because this package has nothing to do with persistence.
type DeleteParams
DeleteParams contains the target DLQ and the max message ID to delete up to.
type HistoryClient
type HistoryClient interface {
	DeleteDLQTasks(
		ctx context.Context,
		in *historyservice.DeleteDLQTasksRequest,
		opts ...grpc.CallOption,
	) (*historyservice.DeleteDLQTasksResponse, error)
	GetDLQTasks(
		ctx context.Context,
		in *historyservice.GetDLQTasksRequest,
		opts ...grpc.CallOption,
	) (*historyservice.GetDLQTasksResponse, error)
}
HistoryClient contains the subset of methods from historyservice.HistoryServiceClient that we need, to make it easier to implement in tests.
type Key
type Key struct {
	// TaskCategoryID is the id used by [go.temporal.io/server/service/history/tasks.TaskCategoryRegistry].
	TaskCategoryID int
	// SourceCluster is the cluster that the replication tasks are coming from if the task category is replication.
	// Otherwise, it is equal to TargetCluster, which is both the cluster that the DLQ workflow is running in and
	// the cluster that contains the DLQ itself.
	SourceCluster string
	// TargetCluster is always the cluster that the DLQ workflow is running in currently. However, that may change
	// if we add cross-cluster tasks in the future.
	TargetCluster string
}
Key uniquely identifies a DLQ.
type MergeParams
type MergeParams struct {
	Key
	// MaxMessageID is inclusive.
	MaxMessageID int64
	// BatchSize controls the number of tasks to both read and re-enqueue at a time.
	// The maximum is MaxMergeBatchSize. The default is DefaultMergeBatchSize.
	BatchSize int
}
MergeParams contains the target DLQ and the max message ID to merge up to.
type ProgressQueryResponse
type ProgressQueryResponse struct {
	MaxMessageIDToProcess     int64
	LastProcessedMessageID    int64
	NumberOfMessagesProcessed int64
	WorkflowType              string
	DlqKey                    Key
}
ProgressQueryResponse is the response to the progress query (see QueryTypeProgress).
type TaskClient
type TaskClient interface {
	AddTasks(
		ctx context.Context,
		in *adminservice.AddTasksRequest,
	) (*adminservice.AddTasksResponse, error)
}
TaskClient contains the subset of methods from adminservice.AdminServiceClient that we need, to make it easier to implement in tests.
type TaskClientDialer
type TaskClientDialer interface {
	// Dial returns a [TaskClient] given a cluster name. You don't need to close this client. Note that some
	// implementations will cache clients.
	Dial(ctx context.Context, cluster string) (TaskClient, error)
}
TaskClientDialer is an interface whose Dial method returns a TaskClient given a cluster name.
type TaskClientDialerFn
type TaskClientDialerFn func(ctx context.Context, address string) (TaskClient, error)
TaskClientDialerFn is a function that returns a TaskClient given an address.
func (TaskClientDialerFn) Dial
func (f TaskClientDialerFn) Dial(ctx context.Context, cluster string) (TaskClient, error)
Dial implements TaskClientDialer by calling the TaskClientDialerFn with the cluster name.
type WorkflowParams
type WorkflowParams struct {
	// WorkflowType options are available via the WorkflowType* constants.
	WorkflowType string
	// DeleteParams is only used for WorkflowTypeDelete.
	DeleteParams DeleteParams
	// MergeParams is only used for WorkflowTypeMerge.
	MergeParams MergeParams
}
WorkflowParams is the single argument to the DLQ workflow.