Version: v1.0.0 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2023 License: MIT Imports: 22 Imported by: 2




View Source
const (

	// BatcherTaskListName is the tasklist name
	BatcherTaskListName = "cadence-sys-batcher-tasklist"
	// BatchWFTypeName is the workflow type
	BatchWFTypeName = "cadence-sys-batch-workflow"

	// InfiniteDuration is a long duration(20 yrs) we used for infinite workflow running
	InfiniteDuration = 20 * 365 * 24 * time.Hour

	// DefaultRPS is the default RPS
	DefaultRPS = 50
	// DefaultConcurrency is the default concurrency
	DefaultConcurrency = 5
	// DefaultPageSize is the default page size
	DefaultPageSize = 1000
	// DefaultAttemptsOnRetryableError is the default value for AttemptsOnRetryableError
	DefaultAttemptsOnRetryableError = 50
	// DefaultActivityHeartBeatTimeout is the default value for ActivityHeartBeatTimeout
	DefaultActivityHeartBeatTimeout = time.Second * 10
View Source
const (
	// BatchTypeTerminate is batch type for terminating workflows
	BatchTypeTerminate = "terminate"
	// BatchTypeCancel is the batch type for canceling workflows
	BatchTypeCancel = "cancel"
	// BatchTypeSignal is batch type for signaling workflows
	BatchTypeSignal = "signal"
	// BatchTypeReplicate is batch type for replicating workflows
	BatchTypeReplicate = "replicate"


AllBatchTypes is the batch types we supported


This section is empty.


type BatchParams

type BatchParams struct {
	// Target domain to execute batch operation
	DomainName string
	// To get the target workflows for processing
	Query string
	// Reason for the operation
	Reason string
	// Supporting: reset,terminate
	BatchType string

	// Below are all optional
	// TerminateParams is params only for BatchTypeTerminate
	TerminateParams TerminateParams
	// CancelParams is params only for BatchTypeCancel
	CancelParams CancelParams
	// SignalParams is params only for BatchTypeSignal
	SignalParams SignalParams
	// ReplicateParams is params only for BatchTypeReplicate
	ReplicateParams ReplicateParams
	// RPS of processing. Default to DefaultRPS
	// TODO we will implement smarter way than this static rate limiter:
	RPS int
	// Number of goroutines running in parallel to process
	Concurrency int
	// Number of workflows processed in a batch
	PageSize int
	// Number of attempts for each workflow to process in case of retryable error before giving up
	AttemptsOnRetryableError int
	// timeout for activity heartbeat
	ActivityHeartBeatTimeout time.Duration
	// errors that will not retry which consumes AttemptsOnRetryableError. Default to empty
	NonRetryableErrors []string
	// contains filtered or unexported fields

BatchParams is the parameters for batch operation workflow

type Batcher

type Batcher struct {
	// contains filtered or unexported fields

Batcher is the background sub-system that execute workflow for batch operations It is also the context object that get's passed around within the scanner workflows / activities

func New

func New(params *BootstrapParams) *Batcher

New returns a new instance of batcher daemon Batcher

func (*Batcher) Start

func (s *Batcher) Start() error

Start starts the scanner

type BootstrapParams

type BootstrapParams struct {
	// Config contains the configuration for scanner
	Config Config
	// ServiceClient is an instance of cadence service client
	ServiceClient workflowserviceclient.Interface
	// MetricsClient is an instance of metrics object for emitting stats
	MetricsClient metrics.Client
	Logger        log.Logger
	// TallyScope is an instance of tally metrics scope
	TallyScope tally.Scope
	// ClientBean is an instance of client.Bean for a collection of clients
	ClientBean client.Bean

BootstrapParams contains the set of params needed to bootstrap the batcher sub-system

type CancelParams

type CancelParams struct {
	// this indicates whether to cancel children workflow. Default to true.
	// TODO
	// Ideally default should be childPolicy of the workflow. But it's currently totally broken.
	CancelChildren *bool

CancelParams is the parameters for canceling workflow

type Config

type Config struct {
	AdminOperationToken dynamicconfig.StringPropertyFn
	// ClusterMetadata contains the metadata for this cluster
	ClusterMetadata cluster.Metadata

Config defines the configuration for batcher

type HeartBeatDetails

type HeartBeatDetails struct {
	PageToken   []byte
	CurrentPage int
	// This is just an estimation for visibility
	TotalEstimate int64
	// Number of workflows processed successfully
	SuccessCount int
	// Number of workflows that give up due to errors.
	ErrorCount int

HeartBeatDetails is the struct for heartbeat details

func BatchActivity

func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetails, error)

BatchActivity is activity for processing batch operation

func BatchWorkflow

func BatchWorkflow(ctx workflow.Context, batchParams BatchParams) (HeartBeatDetails, error)

BatchWorkflow is the workflow that runs a batch job of resetting workflows

type ReplicateParams added in v0.24.0

type ReplicateParams struct {
	SourceCluster string
	TargetCluster string

ReplicateParams is the parameters for replicating workflow

type SignalParams

type SignalParams struct {
	SignalName string
	Input      string

SignalParams is the parameters for signaling workflow

type TerminateParams

type TerminateParams struct {
	// this indicates whether to terminate children workflow. Default to true.
	// TODO
	// Ideally default should be childPolicy of the workflow. But it's currently totally broken.
	TerminateChildren *bool

TerminateParams is the parameters for terminating workflow

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL