cadence

Version: v0.3.2
Published: Oct 3, 2017 License: MIT Imports: 37 Imported by: 0

README

Go framework for Cadence

Cadence is a distributed, scalable, durable, and highly available orchestration engine we developed at Uber Engineering to execute asynchronous long-running business logic in a scalable and resilient way.

cadence-client is the framework for authoring workflows and activities.

How to use

Make sure you clone this repo into the correct location.

git clone git@github.com:uber/cadence.git $GOPATH/src/go.uber.org/cadence

or

go get go.uber.org/cadence

See samples to get started

Activity

Activity is the implementation of a particular task in the business logic.

Activities are implemented as functions. Data can be passed directly to an activity via function parameters. The parameters can be either basic types or structs, with the only requirement being that the parameters need to be serializable. Even though it is not required, we recommend that the first parameter of an activity function is of type context.Context, in order to allow the activity to interact with other framework methods. The function must return an error value, and can optionally return a result value. The result value can be either a basic type or a struct, with the only requirement being that it is serializable.

The values passed to activities through invocation parameters or returned through the result value are recorded in the execution history. The entire execution history is transferred from the Cadence service to workflow workers with every event that the workflow logic needs to process. A large execution history can thus adversely impact the performance of your workflow. Therefore, be mindful of the amount of data you transfer via activity invocation parameters or return values. Other than that, no additional limitations exist on activity implementations.
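One way to keep the execution history small is to pass a reference to large data instead of the data itself. The sketch below only illustrates the size difference; it does not call Cadence. The types InlineInput and ReferenceInput, the helper encodedSize, and the use of JSON as the encoding are all assumptions made for illustration, since the encoding the framework actually uses is not stated here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// InlineInput is a hypothetical activity argument that carries the full payload.
type InlineInput struct {
	Document string
}

// ReferenceInput is a hypothetical activity argument that carries only a key;
// the activity would load the payload from external storage itself.
type ReferenceInput struct {
	DocumentKey string
}

// encodedSize reports how many bytes v occupies once JSON-encoded.
// JSON is an assumption used only to make the comparison concrete.
func encodedSize(v interface{}) (int, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return 0, err
	}
	return len(b), nil
}

func main() {
	inline := InlineInput{Document: strings.Repeat("x", 1<<20)}
	ref := ReferenceInput{DocumentKey: "docs/report-42"}

	inlineSize, err := encodedSize(inline)
	if err != nil {
		panic(err)
	}
	refSize, err := encodedSize(ref)
	if err != nil {
		panic(err)
	}
	fmt.Printf("inline argument: %d bytes\n", inlineSize)
	fmt.Printf("reference argument: %d bytes\n", refSize)
}
```

Whatever is passed as an argument or returned as a result ends up in the history, so the reference variant adds a few dozen bytes where the inline variant adds the whole document.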

In order to make the activity visible to the worker process hosting it, the activity needs to be registered via a call to cadence.RegisterActivity.

package simple

import (
	"context"

	"go.uber.org/cadence"
	"go.uber.org/zap"
)

func init() {
	cadence.RegisterActivity(SimpleActivity)
}

// SimpleActivity is a sample Cadence activity function that takes one parameter and
// returns a string containing the parameter value.
func SimpleActivity(ctx context.Context, value string) (string, error) {
	cadence.GetActivityLogger(ctx).Info("SimpleActivity called.", zap.String("Value", value))
	return "Processed: " + value, nil
}

Workflow

Workflow is the implementation of coordination logic. Its sole purpose is to orchestrate activity executions.

Workflows are implemented as functions. Startup data can be passed to a workflow via function parameters. The parameters can be either basic types or structs, with the only requirement being that the parameters need to be serializable. The first parameter of a workflow function is of type cadence.Context. The function must return an error value, and can optionally return a result value. The result value can be either a basic type or a struct, with the only requirement being that it is serializable.

Workflow functions need to execute deterministically. Therefore, here is a list of rules that workflow code should obey to be a good Cadence citizen:

  • Use cadence.Context everywhere.
  • Don’t use range over map.
  • Use cadence.SideEffect to call rand and similar nondeterministic functions like UUID generator.
  • Use cadence.Now to get current time. Use cadence.NewTimer or cadence.Sleep instead of standard Go functions.
  • Don’t use native channel and select. Use cadence.Channel and cadence.Selector.
  • Don’t use go func(...). Use cadence.Go(func(...)).
  • Don’t use non-constant global variables, as multiple instances of a workflow function can be executing in parallel.
  • Don’t use any blocking functions besides those belonging to Channel, Selector or Future.
  • Don’t use any synchronization primitives, as they can cause blockage and there is no possibility of races when running under the dispatcher.
  • Don’t change workflow code when there are open workflows using it. Cadence is going to provide a versioning mechanism to deal with deploying code changes without breaking existing workflows.
  • Don’t perform any IO or service calls as they are not usually deterministic. Use activities for that.
  • Don’t access configuration APIs directly from a workflow as changes in configuration will affect the workflow execution path. Either return configuration from an activity or use cadence.SideEffect to load it.
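To make the rule about ranging over maps concrete: Go randomizes map iteration order, so a workflow that schedules work in map order may take a different path on replay. A common workaround, sketched here with the standard library only, is to collect the keys, sort them, and iterate over the sorted slice. The helper sortedKeys and the quotas map are hypothetical names. The helper does range over the map internally, but the sort erases that order before any caller sees it.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedKeys returns the keys of m in ascending order, so that callers can
// visit the map entries in the same order on every execution and replay.
func sortedKeys(m map[string]int) []string {
	keys := make([]string, 0, len(m))
	for k := range m { // order here is random, but it is discarded by the sort below
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	quotas := map[string]int{"eu": 3, "us": 5, "ap": 2}
	for _, region := range sortedKeys(quotas) {
		fmt.Println(region, quotas[region])
	}
}
```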

In order to make the workflow visible to the worker process hosting it, the workflow needs to be registered via a call to cadence.RegisterWorkflow.

package simple

import (
	"time"

	"go.uber.org/cadence"
	"go.uber.org/zap"
)

func init() {
	cadence.RegisterWorkflow(SimpleWorkflow)
}

// SimpleWorkflow is a sample Cadence workflow that accepts one parameter and
// executes an activity to which it passes the aforementioned parameter.
func SimpleWorkflow(ctx cadence.Context, value string) error {
	options := cadence.ActivityOptions{
		ScheduleToStartTimeout: time.Second * 60,
		StartToCloseTimeout:    time.Second * 60,
	}
	ctx = cadence.WithActivityOptions(ctx, options)

	var result string
	// SimpleActivity lives in this same package, so it is referenced directly.
	err := cadence.ExecuteActivity(ctx, SimpleActivity, value).Get(ctx, &result)
	if err != nil {
		return err
	}
	cadence.GetLogger(ctx).Info(
		"SimpleActivity returned successfully!", zap.String("Result", result))

	cadence.GetLogger(ctx).Info("SimpleWorkflow completed!")
	return nil
}

Worker

A worker or “worker service” is a service hosting the workflow and activity implementations. The worker polls the “Cadence service” for tasks, performs those tasks, and communicates task execution results back to the “Cadence service”. Worker services are developed, deployed and operated by Cadence customers.

You can run a Cadence worker in a new or an existing service. Use the framework APIs to start the Cadence worker and link in all activity and workflow implementations that you require this service to execute.

package main

import (
	"github.com/uber/tchannel-go"
	"github.com/uber/tchannel-go/thrift"

	"github.com/uber-go/tally"

	"go.uber.org/cadence"
	t "go.uber.org/cadence/.gen/go/cadence"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

var HostPort = "127.0.0.1:7933"
var Domain = "SimpleDomain"
var TaskListName = "SimpleWorker"
var ClientName = "SimpleWorker"
var CadenceService = "CadenceServiceFrontend"

func main() {
	startWorker(
		buildLogger(),
		buildCadenceClient())
}

func buildLogger() *zap.Logger {
	config := zap.NewDevelopmentConfig()
	config.Level.SetLevel(zapcore.InfoLevel)

	logger, err := config.Build()
	if err != nil {
		panic("Failed to setup logger")
	}

	return logger
}

func buildCadenceClient() t.TChanWorkflowService {
	tchan, err := tchannel.NewChannel(ClientName, nil)
	if err != nil {
		panic("Failed to setup channel")
	}

	opts := &thrift.ClientOptions{
		HostPort: HostPort,
	}
	return t.NewTChanWorkflowServiceClient(
		thrift.NewClient(tchan, CadenceService, opts))
}

func startWorker(logger *zap.Logger, client t.TChanWorkflowService) {
	// TaskListName - identifies set of client workflows, activities and workers.
	// it could be your group or client or application name.
	workerOptions := cadence.WorkerOptions{
		Logger:       logger,
		MetricsScope: tally.NewTestScope(TaskListName, map[string]string{}),
	}

	worker := cadence.NewWorker(
		client,
		Domain,
		TaskListName,
		workerOptions)
	err := worker.Start()
	if err != nil {
		panic("Failed to start worker")
	}

	logger.Info("Started Worker.", zap.String("worker", TaskListName))
}

Contributing

We'd love your help in making Cadence-client great. Please review our instructions.

License

MIT License, please see LICENSE for details.

Documentation

Index

Constants

const LibraryVersion = "v0.3.1"

LibraryVersion is a semver string that represents the version of this cadence client library. It will be embedded as a "version" header in every rpc call made by this client to the cadence server. In addition, the version string will be used by the server to enforce compatibility checks. Updates to this version number are typically done by the cadence team as part of a major feature or behavior change.

const QueryTypeStackTrace string = "__stack_trace"

QueryTypeStackTrace is the built-in query type for the Client.QueryWorkflow() call. Use this query type to get the call stack of the workflow. The result will be a string encoded in the EncodedValue.

Variables

var ErrActivityResultPending = errors.New("not error: do not autocomplete, using Client.CompleteActivity() to complete")

ErrActivityResultPending is returned from an activity's implementation to indicate that the activity is not completed when the activity method returns. The activity then needs to be completed separately by Client.CompleteActivity(). For example, if an activity requires human interaction (like approving an expense report), the activity could return ErrActivityResultPending, which indicates that the activity is not done yet. Then, when the awaited human action happens, it needs to trigger something that reports the activity completed event to the cadence server via the Client.CompleteActivity() API.

var ErrCanceled = NewCanceledError()

ErrCanceled is the error returned by Context.Err when the context is canceled.

ErrDeadlineExceeded is the error returned by Context.Err when the context's deadline passes.

Functions

func AddActivityRegistrationInterceptor

func AddActivityRegistrationInterceptor(
	i func(name string, activity interface{}) (string, interface{}))

AddActivityRegistrationInterceptor adds an interceptor that is called for each RegisterActivity call. This function guarantees that the interceptor function is called for each registration, even if it itself is called from init().

func AddWorkflowRegistrationInterceptor

func AddWorkflowRegistrationInterceptor(
	i func(name string, workflow interface{}) (string, interface{}),
)

AddWorkflowRegistrationInterceptor adds an interceptor that is called for each RegisterWorkflow call. This function guarantees that the interceptor function is called for each registration, even if it itself is called from init().

func DeserializeFnResults

func DeserializeFnResults(result []byte, to interface{}) error

DeserializeFnResults deserializes the results of a function. The cadence server holds both a result and an error; the input result here doesn't include the error, so this function deserializes only the result.

func EnableVerboseLogging

func EnableVerboseLogging(enable bool)

EnableVerboseLogging enables or disables verbose logging. This is for internal use only.

func GetActivityLogger

func GetActivityLogger(ctx context.Context) *zap.Logger

GetActivityLogger returns a logger that can be used in activity

func GetActivityMetricsScope

func GetActivityMetricsScope(ctx context.Context) tally.Scope

GetActivityMetricsScope returns a metrics scope that can be used in activity

func GetLogger

func GetLogger(ctx Context) *zap.Logger

GetLogger returns a logger to be used in workflow's context

func GetMetricsScope

func GetMetricsScope(ctx Context) tally.Scope

GetMetricsScope returns a metrics scope to be used in workflow's context

func GetWorkflowStackTrace

func GetWorkflowStackTrace(h *s.History) (string, error)

GetWorkflowStackTrace returns a stack trace of all goroutines of a workflow given its current history. It requires workflow function that was used to create the history to be registered through RegisterWorkflow. Use Client.GetWorkflowStackTrace to get a stack trace given workflowID and runID.

func Go

func Go(ctx Context, f func(ctx Context))

Go creates a new coroutine. Its semantics are similar to those of a goroutine, in the context of a workflow.

func GoNamed

func GoNamed(ctx Context, name string, f func(ctx Context))

GoNamed creates a new coroutine with a given human-readable name. Its semantics are similar to those of a goroutine, in the context of a workflow. The name appears in stack traces.

func NewFuture

func NewFuture(ctx Context) (Future, Settable)

NewFuture creates a new future as well as associated Settable that is used to set its value.

func Now

func Now(ctx Context) time.Time

Now returns the current time when the decision is started or replayed. The workflow needs to use this Now() to get the wall clock time instead of the one from the Go standard library.

func RecordActivityHeartbeat

func RecordActivityHeartbeat(ctx context.Context, details ...interface{})

RecordActivityHeartbeat sends a heartbeat for the currently executing activity. If the activity is cancelled, or the workflow or activity doesn't exist, then the context is cancelled with the error context.Canceled.

TODO: we don't have a way to distinguish between the two cases when context is cancelled because
context doesn't support overriding value of ctx.Error.
TODO: Implement automatic heartbeating with cancellation through ctx.

details - the details that you provide here can be seen in the workflow when it receives a TimeoutError; you can check the error's TimeOutType()/Details().

func RegisterActivity

func RegisterActivity(activityFunc interface{})

RegisterActivity - registers an activity function with the framework. An activity takes a context and input and returns a (result, error) or just error. Examples:

func sampleActivity(ctx context.Context, input []byte) (result []byte, err error)
func sampleActivity(ctx context.Context, arg1 int, arg2 string) (result *customerStruct, err error)
func sampleActivity(ctx context.Context) (err error)
func sampleActivity() (result string, err error)
func sampleActivity(arg1 bool) (result int, err error)
func sampleActivity(arg1 bool) (err error)

Serialization of all primitive types, structures is supported ... except channels, functions, variadic, unsafe pointer. This method calls panic if activityFunc doesn't comply with the expected format.
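The kind of format check described above can be sketched with the reflect package. This is not the library's implementation, only a minimal illustration of the stated rules under the assumption that they are checked by reflection: the argument must be a function, it must return either error or (result, error), and its parameters must not be channels, functions, unsafe pointers, or variadic. The name validateFnFormat is hypothetical, and the sketch returns an error where the documented function panics.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// errorType is the reflect.Type of the built-in error interface.
var errorType = reflect.TypeOf((*error)(nil)).Elem()

// validateFnFormat is a hypothetical helper that checks whether fn has the
// shape described for activity functions.
func validateFnFormat(fn interface{}) error {
	t := reflect.TypeOf(fn)
	if t == nil || t.Kind() != reflect.Func {
		return errors.New("expected a function")
	}
	if t.NumOut() < 1 || t.NumOut() > 2 {
		return fmt.Errorf("expected 1 or 2 return values, got %d", t.NumOut())
	}
	if t.Out(t.NumOut()-1) != errorType {
		return errors.New("last return value must be of type error")
	}
	if t.IsVariadic() {
		return errors.New("variadic parameters are not supported")
	}
	for i := 0; i < t.NumIn(); i++ {
		switch t.In(i).Kind() {
		case reflect.Chan, reflect.Func, reflect.UnsafePointer:
			return fmt.Errorf("parameter %d has unsupported kind %s", i, t.In(i).Kind())
		}
	}
	return nil
}

func main() {
	ok := func(arg1 bool) (int, error) { return 0, nil }
	noError := func(arg1 bool) int { return 0 }
	withChan := func(ch chan int) error { return nil }

	fmt.Println(validateFnFormat(ok))
	fmt.Println(validateFnFormat(noError))
	fmt.Println(validateFnFormat(withChan))
}
```

Returning an error keeps the sketch easy to test; a registration function that runs from init() can reasonably panic instead, since a malformed activity is a programming mistake.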

func RegisterActivityWithOptions

func RegisterActivityWithOptions(activityFunc interface{}, opts RegisterActivityOptions)

RegisterActivityWithOptions registers the activity function with options. The user can use options to provide an external name for the activity or leave it empty if no external name is required. This can be used as

client.RegisterActivity(barActivity, RegisterActivityOptions{})
client.RegisterActivity(barActivity, RegisterActivityOptions{Name: "barExternal"})

An activity takes a context and input and returns a (result, error) or just error. Examples:

func sampleActivity(ctx context.Context, input []byte) (result []byte, err error)
func sampleActivity(ctx context.Context, arg1 int, arg2 string) (result *customerStruct, err error)
func sampleActivity(ctx context.Context) (err error)
func sampleActivity() (result string, err error)
func sampleActivity(arg1 bool) (result int, err error)
func sampleActivity(arg1 bool) (err error)

Serialization of all primitive types, structures is supported ... except channels, functions, variadic, unsafe pointer. This method calls panic if activityFunc doesn't comply with the expected format.

func RegisterWorkflow

func RegisterWorkflow(workflowFunc interface{})

RegisterWorkflow - registers a workflow function with the framework. A workflow takes a cadence context and input and returns a (result, error) or just error. Examples:

func sampleWorkflow(ctx cadence.Context, input []byte) (result []byte, err error)
func sampleWorkflow(ctx cadence.Context, arg1 int, arg2 string) (result []byte, err error)
func sampleWorkflow(ctx cadence.Context) (result []byte, err error)
func sampleWorkflow(ctx cadence.Context, arg1 int) (result string, err error)

Serialization of all primitive types, structures is supported ... except channels, functions, variadic, unsafe pointer. This method calls panic if workflowFunc doesn't comply with the expected format.

func RegisterWorkflowWithOptions

func RegisterWorkflowWithOptions(workflowFunc interface{}, opts RegisterWorkflowOptions)

RegisterWorkflowWithOptions registers the workflow function with options. The user can use options to provide an external name for the workflow or leave it empty if no external name is required. This can be used as

client.RegisterWorkflow(sampleWorkflow, RegisterWorkflowOptions{})
client.RegisterWorkflow(sampleWorkflow, RegisterWorkflowOptions{Name: "foo"})

A workflow takes a cadence context and input and returns a (result, error) or just error. Examples:

func sampleWorkflow(ctx cadence.Context, input []byte) (result []byte, err error)
func sampleWorkflow(ctx cadence.Context, arg1 int, arg2 string) (result []byte, err error)
func sampleWorkflow(ctx cadence.Context) (result []byte, err error)
func sampleWorkflow(ctx cadence.Context, arg1 int) (result string, err error)

Serialization of all primitive types, structures is supported ... except channels, functions, variadic, unsafe pointer. This method calls panic if workflowFunc doesn't comply with the expected format.

func RequestCancelWorkflow

func RequestCancelWorkflow(ctx Context, workflowID, runID string) error

RequestCancelWorkflow can be used to request cancellation of an external workflow.

  • workflowID - the ID of the workflow.
  • runID - Optional - indicates the instance of a workflow.

You can specify the domain of the workflow using the context, like:

ctx := WithWorkflowDomain(ctx, "domain-name")

func SerializeFnArgs

func SerializeFnArgs(args ...interface{}) ([]byte, error)

SerializeFnArgs serializes the arguments of an activity function.

func SetQueryHandler added in v0.3.2

func SetQueryHandler(ctx Context, queryType string, handler interface{}) error

SetQueryHandler sets the query handler to handle workflow queries. The queryType specifies which query type this handler should handle. The handler must be a function that returns 2 values: the first must be a serializable result, and the second must be an error. The handler function can receive any number of input parameters, all of which must be serializable. You should call cadence.SetQueryHandler() at the beginning of the workflow code.

When a client calls Client.QueryWorkflow() on the cadence server, a task is generated on the server and dispatched to a workflow worker, which replays the history events and then executes a query handler based on the query type. The query handler is invoked outside the context of the workflow, meaning that the handler code must not use the cadence context to do things like cadence.NewChannel() or cadence.Go(), or to call any workflow blocking functions like Channel.Get() or Future.Get(). Trying to do so in query handler code will fail the query, and the client will receive a QueryFailedError.

Example of workflow code that supports the query type "current_state":

 func MyWorkflow(ctx cadence.Context, input string) error {
   currentState := "started" // this could be any serializable struct
   err := cadence.SetQueryHandler(ctx, "current_state", func() (string, error) {
     return currentState, nil
   })
   if err != nil {
     currentState = "failed to register query handler"
     return err
   }
   // your normal workflow code begins here, and you update the currentState as the code makes progress.
   currentState = "waiting timer"
   err = cadence.NewTimer(ctx, time.Hour).Get(ctx, nil)
   if err != nil {
     currentState = "timer failed"
     return err
   }

   currentState = "waiting activity"
   ctx = cadence.WithActivityOptions(ctx, myActivityOptions)
   err = cadence.ExecuteActivity(ctx, MyActivity, "my_input").Get(ctx, nil)
   if err != nil {
     currentState = "activity failed"
     return err
   }
   currentState = "done"
   return nil
}

func Sleep

func Sleep(ctx Context, d time.Duration) (err error)

Sleep pauses the current goroutine for at least the duration d. A negative or zero duration causes Sleep to return immediately.

  • The current timer resolution implementation is in seconds but is subject to change.
  • The workflow needs to use this Sleep() to sleep instead of the Go standard library one (time.Sleep()).
  • You can also cancel the pending sleep using the context (WithCancel(ctx)), and that will cancel the sleep with error TimerCanceledError.

func WithActivityTask

func WithActivityTask(
	ctx context.Context,
	task *shared.PollForActivityTaskResponse,
	invoker ServiceInvoker,
	logger *zap.Logger,
	scope tally.Scope,
) context.Context

WithActivityTask adds activity-specific information into the context. Use this method to unit test activity implementations that use context extractor methods.

func WithCancel

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

WithCancel returns a copy of parent with a new Done channel. The returned context's Done channel is closed when the returned cancel function is called or when the parent context's Done channel is closed, whichever happens first.

Canceling this context releases resources associated with it, so code should call cancel as soon as the operations running in this Context complete.
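The propagation rule reads the same as the one for context.WithCancel in the Go standard library, so it can be illustrated with the standard context package. This is only an analogy: the workflow Context is a separate type, and workflow code must not use a native select as this sketch does. The helpers isDone and childCanceledByParent are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// isDone reports whether ctx's Done channel is already closed.
func isDone(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

// childCanceledByParent derives a child context from a cancelable parent and
// reports whether the child was done before and after the parent's cancel.
func childCanceledByParent() (before, after bool) {
	parent, cancelParent := context.WithCancel(context.Background())
	child, cancelChild := context.WithCancel(parent)
	defer cancelChild() // always release the child's resources

	before = isDone(child)
	cancelParent() // closing the parent's Done channel also closes the child's
	after = isDone(child)
	cancelParent() // calling a cancel function again does nothing
	return before, after
}

func main() {
	before, after := childCanceledByParent()
	fmt.Println(before, after) // prints "false true"
}
```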

func WithTestTags

func WithTestTags(ctx context.Context, testTags map[string]map[string]string) context.Context

WithTestTags - is used for internal cadence use to pass any test tags. TODO: Build the tags on top of the context and pass it around instead of map of maps.

Types

type ActivityInfo

type ActivityInfo struct {
	TaskToken         []byte
	WorkflowExecution WorkflowExecution
	ActivityID        string
	ActivityType      ActivityType
}

ActivityInfo contains information about currently executing activity.

func GetActivityInfo

func GetActivityInfo(ctx context.Context) ActivityInfo

GetActivityInfo returns information about currently executing activity.

type ActivityOptions

type ActivityOptions struct {
	// TaskList that the activity needs to be scheduled on.
	// optional: The default task list with the same name as the workflow task list.
	TaskList string

	// ScheduleToCloseTimeout - The end to end time out for the activity needed.
	// The zero value of this uses default value.
	// Optional: The default value is the sum of ScheduleToStartTimeout and StartToCloseTimeout
	ScheduleToCloseTimeout time.Duration

	// ScheduleToStartTimeout - The queue time out before the activity starts executed.
	// Mandatory: No default.
	ScheduleToStartTimeout time.Duration

	// StartToCloseTimeout - The time out from the start of execution to end of it.
	// Mandatory: No default.
	StartToCloseTimeout time.Duration

	// HeartbeatTimeout - The periodic timeout while the activity is in execution. This is
	// the max interval the server needs to hear at-least one ping from the activity.
	// Optional: Default zero, means no heart beating is needed.
	HeartbeatTimeout time.Duration

	// WaitForCancellation - Whether to wait for cancelled activity to be completed(
	// activity can be failed, completed, cancel accepted)
	// Optional: default false
	WaitForCancellation bool

	// ActivityID - Business level activity ID, this is not needed for most of the cases if you have
	// to specify this then talk to cadence team. This is something will be done in future.
	// Optional: default empty string
	ActivityID string
}

ActivityOptions stores all activity-specific parameters that will be stored inside of a context.
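The defaulting rule stated in the field comments (ScheduleToCloseTimeout falls back to the sum of the two mandatory timeouts when left at zero) can be sketched as follows. The timeouts struct and effectiveScheduleToClose are hypothetical stand-ins written for illustration; they mirror the comments above, not the library's code.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// timeouts is a hypothetical local mirror of three ActivityOptions fields.
type timeouts struct {
	ScheduleToClose time.Duration // optional end-to-end timeout
	ScheduleToStart time.Duration // mandatory queue timeout
	StartToClose    time.Duration // mandatory execution timeout
}

// effectiveScheduleToClose returns the explicit end-to-end timeout when set,
// and otherwise the sum of the two mandatory timeouts.
func effectiveScheduleToClose(t timeouts) (time.Duration, error) {
	if t.ScheduleToStart <= 0 || t.StartToClose <= 0 {
		return 0, errors.New("ScheduleToStart and StartToClose are mandatory")
	}
	if t.ScheduleToClose > 0 {
		return t.ScheduleToClose, nil
	}
	return t.ScheduleToStart + t.StartToClose, nil
}

func main() {
	d, err := effectiveScheduleToClose(timeouts{
		ScheduleToStart: 60 * time.Second,
		StartToClose:    60 * time.Second,
	})
	fmt.Println(d, err) // prints "2m0s <nil>"
}
```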

type ActivityTaskHandler

type ActivityTaskHandler interface {
	// Execute the activity task
	// The return interface{} can have three requests, use switch to find the type of it.
	// - RespondActivityTaskCompletedRequest
	// - RespondActivityTaskFailedRequest
	// - RespondActivityTaskCancelRequest
	Execute(task *s.PollForActivityTaskResponse) (interface{}, error)
}

ActivityTaskHandler represents activity task handlers.

func NewActivityTaskHandler

func NewActivityTaskHandler(service m.TChanWorkflowService, identity string, logger *zap.Logger) ActivityTaskHandler

NewActivityTaskHandler creates an instance of an ActivityTaskHandler using activity functions registered through RegisterActivity. The service parameter is used for heartbeating from the activity implementation. It is intended to be used to invoke registered functions for debugging purposes.

type ActivityType

type ActivityType struct {
	Name string
}

ActivityType identifies an activity type.

type CancelFunc

type CancelFunc func()

A CancelFunc tells an operation to abandon its work. A CancelFunc does not wait for the work to stop. After the first call, subsequent calls to a CancelFunc do nothing.

type CanceledError

type CanceledError struct {
	// contains filtered or unexported fields
}

CanceledError is returned when an operation was canceled.

func NewCanceledError

func NewCanceledError(details ...interface{}) *CanceledError

NewCanceledError creates a CanceledError instance.

func (*CanceledError) Details

func (e *CanceledError) Details(d ...interface{})

Details extracts the strongly typed detail data of this error.

func (*CanceledError) Error

func (e *CanceledError) Error() string

Error from error interface

type Channel

type Channel interface {
	// Blocks until it gets a value. when it gets a value assigns to the provided pointer.
	// Example:
	//   var v string
	//   c.Receive(ctx, &v)
	Receive(ctx Context, valuePtr interface{}) (more bool)              // more is false when channel is closed
	ReceiveAsync(valuePtr interface{}) (ok bool)                        // ok is true when value was returned
	ReceiveAsyncWithMoreFlag(valuePtr interface{}) (ok bool, more bool) // ok is true when value was returned, more is false when channel is closed
	Send(ctx Context, v interface{})
	SendAsync(v interface{}) (ok bool) // ok when value was sent
	Close()                            // prohibit sends
}

Channel must be used instead of a native Go channel by workflow code. Use the NewChannel function to create an instance.

func GetSignalChannel

func GetSignalChannel(ctx Context, signalName string) Channel

GetSignalChannel returns the channel corresponding to the signal name.

func NewBufferedChannel

func NewBufferedChannel(ctx Context, size int) Channel

NewBufferedChannel creates a new buffered Channel instance.

func NewChannel

func NewChannel(ctx Context) Channel

NewChannel creates a new Channel instance.

func NewNamedBufferedChannel

func NewNamedBufferedChannel(ctx Context, name string, size int) Channel

NewNamedBufferedChannel creates a new buffered Channel instance with a given human-readable name. The name appears in stack traces that are blocked on this Channel.

func NewNamedChannel

func NewNamedChannel(ctx Context, name string) Channel

NewNamedChannel creates a new Channel instance with a given human-readable name. The name appears in stack traces that are blocked on this Channel.

type ChildWorkflowFuture

type ChildWorkflowFuture interface {
	Future
	// GetChildWorkflowExecution returns a future that will be ready when child workflow execution started. You can
	// get the WorkflowExecution of the child workflow from the future. Then you can use Workflow ID and RunID of
	// child workflow to cancel or send signal to child workflow.
	GetChildWorkflowExecution() Future
}

ChildWorkflowFuture represents the result of a child workflow execution

func ExecuteChildWorkflow

func ExecuteChildWorkflow(ctx Context, f interface{}, args ...interface{}) ChildWorkflowFuture

ExecuteChildWorkflow requests child workflow execution in the context of a workflow.

  • Context can be used to pass the settings for the child workflow, for example the task list that this child workflow should be routed to and the timeouts that need to be configured. Use ChildWorkflowOptions to pass down the options:

	cwo := ChildWorkflowOptions{
		ExecutionStartToCloseTimeout: 10 * time.Minute,
		TaskStartToCloseTimeout:      time.Minute,
	}
	ctx1 := WithChildWorkflowOptions(ctx, cwo)

  • f - Either a workflow name or a workflow function that is getting scheduled.
  • args - The arguments that need to be passed to the child workflow function represented by 'f'.
  • If the child workflow failed to complete, then the error from the future's Get indicates the failure, and it can be one of CustomError, TimeoutError, CanceledError, GenericError.
  • You can also cancel the pending child workflow using the context (WithCancel(ctx)), and that will fail the workflow with error CanceledError.
  • Returns ChildWorkflowFuture.

type ChildWorkflowOptions

type ChildWorkflowOptions struct {
	// Domain of the child workflow.
	// Optional: the current workflow (parent)'s domain will be used if this is not provided.
	Domain string

	// WorkflowID of the child workflow to be scheduled.
	// Optional: an auto generated workflowID will be used if this is not provided.
	WorkflowID string

	// TaskList that the child workflow needs to be scheduled on.
	// Optional: the parent workflow task list will be used if this is not provided.
	TaskList string

	// ExecutionStartToCloseTimeout - The end to end timeout for the child workflow execution.
	// Mandatory: no default
	ExecutionStartToCloseTimeout time.Duration

	// TaskStartToCloseTimeout - The decision task timeout for the child workflow.
	// Optional: default is 10s if this is not provided (or if 0 is provided).
	TaskStartToCloseTimeout time.Duration

	// ChildPolicy defines the behavior of child workflow when parent workflow is terminated.
	// Optional: default to use ChildWorkflowPolicyTerminate if this is not provided
	ChildPolicy ChildWorkflowPolicy

	// WaitForCancellation - Whether to wait for cancelled child workflow to be ended (child workflow can be ended
	// as: completed/failed/timedout/terminated/canceled)
	// Optional: default false
	WaitForCancellation bool
}

ChildWorkflowOptions stores all child workflow specific parameters that will be stored inside of a Context.

type ChildWorkflowPolicy

type ChildWorkflowPolicy int32

ChildWorkflowPolicy defines child workflow behavior when parent workflow is terminated.

const (
	// ChildWorkflowPolicyTerminate is policy that will terminate all child workflows when parent workflow is terminated.
	ChildWorkflowPolicyTerminate ChildWorkflowPolicy = 0
	// ChildWorkflowPolicyRequestCancel is policy that will send cancel request to all open child workflows when parent
	// workflow is terminated.
	ChildWorkflowPolicyRequestCancel ChildWorkflowPolicy = 1
	// ChildWorkflowPolicyAbandon is policy that will have no impact to child workflow execution when parent workflow is
	// terminated.
	ChildWorkflowPolicyAbandon ChildWorkflowPolicy = 2
)

type Client

type Client interface {
	// StartWorkflow starts a workflow execution
	// The user can use this to start using a function or workflow type name.
	// Either by
	//     StartWorkflow(ctx, options, "workflowTypeName", input)
	//     or
	//     StartWorkflow(ctx, options, workflowExecuteFn, arg1, arg2, arg3)
	// The errors it can return:
	//	- EntityNotExistsError
	//	- BadRequestError
	//	- WorkflowExecutionAlreadyStartedError
	StartWorkflow(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (*WorkflowExecution, error)

	// SignalWorkflow sends a signal to a workflow in execution
	// - workflow ID of the workflow.
	// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
	// - signalName name to identify the signal.
	// The errors it can return:
	//	- EntityNotExistsError
	//	- InternalServiceError
	SignalWorkflow(ctx context.Context, workflowID string, runID string, signalName string, arg interface{}) error

	// CancelWorkflow cancels a workflow in execution
	// - workflow ID of the workflow.
	// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
	// The errors it can return:
	//	- EntityNotExistsError
	//	- BadRequestError
	//	- InternalServiceError
	CancelWorkflow(ctx context.Context, workflowID string, runID string) error

	// TerminateWorkflow terminates a workflow execution.
	// workflowID is required, other parameters are optional.
	// - workflow ID of the workflow.
	// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
	// The errors it can return:
	//	- EntityNotExistsError
	//	- BadRequestError
	//	- InternalServiceError
	TerminateWorkflow(ctx context.Context, workflowID string, runID string, reason string, details []byte) error

	// GetWorkflowHistory gets history of a particular workflow.
	// - workflow ID of the workflow.
	// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
	// The errors it can return:
	//	- EntityNotExistsError
	//	- BadRequestError
	//	- InternalServiceError
	GetWorkflowHistory(ctx context.Context, workflowID string, runID string) (*s.History, error)

	// GetWorkflowStackTrace gets a stack trace of all goroutines of a particular workflow.
	// atDecisionTaskCompletedEventID is the eventID of the CompleteDecisionTask event at which stack trace should be taken.
	// This makes it possible to inspect past states of a workflow.
	// 0 value indicates that the whole existing history should be used.
	// The errors it can return:
	//	- EntityNotExistsError
	//	- BadRequestError
	//	- InternalServiceError
	GetWorkflowStackTrace(ctx context.Context, workflowID string, runID string, atDecisionTaskCompletedEventID int64) (string, error)

	// CompleteActivity reports activity completed.
	// An activity's Execute method can return cadence.ErrActivityResultPending to
	// indicate that the activity is not completed when its Execute method returns. In that case, this CompleteActivity() method
	// should be called when that activity is completed with the actual result and error. If err is nil, activity task
	// completed event will be reported; if err is CanceledError, activity task cancelled event will be reported; otherwise,
	// activity task failed event will be reported.
	// An activity implementation should use GetActivityInfo(ctx).TaskToken to get the task token to use for completion.
	// Example:
	//	To complete with a result.
	//  	CompleteActivity(ctx, token, "Done", nil)
	//	To fail the activity with an error.
	//      CompleteActivity(ctx, token, nil, NewCustomError("reason", details))
	// The activity can fail with the following errors: CustomError, TimeoutError, CanceledError.
	CompleteActivity(ctx context.Context, taskToken []byte, result interface{}, err error) error

	// RecordActivityHeartbeat records heartbeat for an activity.
	// details - is the progress you want to record along with heart beat for this activity.
	// The errors it can return:
	//	- EntityNotExistsError
	//	- InternalServiceError
	RecordActivityHeartbeat(ctx context.Context, taskToken []byte, details ...interface{}) error

	// ListClosedWorkflow gets closed workflow executions based on request filters
	// The errors it can return:
	//  - BadRequestError
	//  - InternalServiceError
	//  - EntityNotExistsError
	ListClosedWorkflow(ctx context.Context, request *s.ListClosedWorkflowExecutionsRequest) (*s.ListClosedWorkflowExecutionsResponse, error)

	// ListOpenWorkflow gets open workflow executions based on request filters
	// The errors it can return:
	//  - BadRequestError
	//  - InternalServiceError
	//  - EntityNotExistsError
	ListOpenWorkflow(ctx context.Context, request *s.ListOpenWorkflowExecutionsRequest) (*s.ListOpenWorkflowExecutionsResponse, error)

	// QueryWorkflow queries a given workflow execution and returns the query result synchronously. Parameter workflowID
	// and queryType are required, other parameters are optional. The workflowID and runID (optional) identify the
	// target workflow execution that this query will be sent to. If runID is not specified (empty string), the server will
	// use the currently running execution of that workflowID. The queryType specifies the type of query you want to
	// run. By default, cadence supports "__stack_trace" as a standard query type, which returns a string value
	// representing the call stack of the target workflow. The target workflow can also set up different query handlers
	// to handle custom query types.
	// See comments at cadence.SetQueryHandler(ctx Context, queryType string, handler interface{}) for more details
	// on how to set up a query handler within the target workflow.
	// - workflowID is required.
	// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
	// - queryType is the type of the query.
	// - args... are the optional query parameters.
	// The errors it can return:
	//  - BadRequestError
	//  - InternalServiceError
	//  - EntityNotExistsError
	//  - QueryFailError
	QueryWorkflow(ctx context.Context, workflowID string, runID string, queryType string, args ...interface{}) (EncodedValue, error)
}

Client is the client for starting workflow executions and getting information about them, as well as completing activities asynchronously.
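The CompleteActivity comment above maps the err argument to the kind of event that gets reported. A minimal stand-alone sketch of that mapping follows. It does not call the Cadence service: canceledError is a hypothetical stand-in for the package's CanceledError, the returned strings stand in for real history events, and errors.As is used only for brevity, not because the library is known to work that way.

```go
package main

import (
	"errors"
	"fmt"
)

// canceledError is a hypothetical stand-in for the package's CanceledError.
type canceledError struct{}

func (canceledError) Error() string { return "canceled" }

// errBoom is an arbitrary non-cancellation error used for illustration.
var errBoom = errors.New("boom")

// reportedEvent returns the kind of event that the CompleteActivity comment
// says is reported for a given err value.
func reportedEvent(err error) string {
	var ce canceledError
	switch {
	case err == nil:
		return "completed"
	case errors.As(err, &ce):
		return "canceled"
	default:
		return "failed"
	}
}

func main() {
	fmt.Println(reportedEvent(nil))
	fmt.Println(reportedEvent(canceledError{}))
	fmt.Println(reportedEvent(errBoom))
}
```

Run as written, this prints completed, canceled, and failed on separate lines.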

func NewClient

func NewClient(service m.TChanWorkflowService, domain string, options *ClientOptions) Client

NewClient creates an instance of a workflow client

type ClientOptions

type ClientOptions struct {
	MetricsScope tally.Scope
	Identity     string
}

ClientOptions are optional parameters for Client creation.

type Context

type Context interface {
	// Deadline returns the time when work done on behalf of this context
	// should be canceled.  Deadline returns ok==false when no deadline is
	// set.  Successive calls to Deadline return the same results.
	Deadline() (deadline time.Time, ok bool)

	// Done returns a channel that's closed when work done on behalf of this
	// context should be canceled.  Done may return nil if this context can
	// never be canceled.  Successive calls to Done return the same value.
	//
	// WithCancel arranges for Done to be closed when cancel is called;
	// WithDeadline arranges for Done to be closed when the deadline
	// expires; WithTimeout arranges for Done to be closed when the timeout
	// elapses.
	//
	// Done is provided for use in select statements:
	//
	//  // Stream generates values with DoSomething and sends them to out
	//  // until DoSomething returns an error or ctx.Done is closed.
	//  func Stream(ctx Context, out Channel) (err error) {
	//	for {
	//		v, err := DoSomething(ctx)
	//		if err != nil {
	//			return err
	//		}
	//		s := NewSelector(ctx)
	//		s.AddReceive(ctx.Done(),  func(v interface{}) { err = ctx.Err() })
	//		s.AddReceive(v, func(v interface{}, more bool) { out.Send(ctx, v) })
	//		s.Select(ctx)
	//		if err != nil {
	//			return err
	//		}
	//	}
	//  }
	//
	// See http://blog.golang.org/pipelines for more examples of how to use
	// a Done channel for cancelation.
	Done() Channel

	// Err returns a non-nil error value after Done is closed.  Err returns
	// Canceled if the context was canceled or DeadlineExceeded if the
	// context's deadline passed.  No other values for Err are defined.
	// After Done is closed, successive calls to Err return the same value.
	Err() error

	// Value returns the value associated with this context for key, or nil
	// if no value is associated with key.  Successive calls to Value with
	// the same key returns the same result.
	//
	// Use context values only for request-scoped data that transits
	// processes and API boundaries, not for passing optional parameters to
	// functions.
	//
	// A key identifies a specific value in a Context.  Functions that wish
	// to store values in Context typically allocate a key in a global
	// variable then use that key as the argument to context.WithValue and
	// Context.Value.  A key can be any type that supports equality;
	// packages should define keys as an unexported type to avoid
	// collisions.
	//
	// Packages that define a Context key should provide type-safe accessors
	// for the values stored using that key:
	//
	// 	// Package user defines a User type that's stored in Contexts.
	// 	package user
	//
	// 	import "golang.org/x/net/context"
	//
	// 	// User is the type of value stored in the Contexts.
	// 	type User struct {...}
	//
	// 	// key is an unexported type for keys defined in this package.
	// 	// This prevents collisions with keys defined in other packages.
	// 	type key int
	//
	// 	// userKey is the key for user.User values in Contexts.  It is
	// 	// unexported; clients use user.NewContext and user.FromContext
	// 	// instead of using this key directly.
	// 	var userKey key = 0
	//
	// 	// NewContext returns a new Context that carries value u.
	// 	func NewContext(ctx context.Context, u *User) context.Context {
	// 		return context.WithValue(ctx, userKey, u)
	// 	}
	//
	// 	// FromContext returns the User value stored in ctx, if any.
	// 	func FromContext(ctx context.Context) (*User, bool) {
	// 		u, ok := ctx.Value(userKey).(*User)
	// 		return u, ok
	// 	}
	Value(key interface{}) interface{}
}

Context is a clone of context.Context with Done() returning Channel instead of native channel. A Context carries a deadline, a cancelation signal, and other values across API boundaries.

Context's methods may be called by multiple goroutines simultaneously.

func WithActivityOptions

func WithActivityOptions(ctx Context, options ActivityOptions) Context

WithActivityOptions adds all options to the context.

func WithChildPolicy

func WithChildPolicy(ctx Context, childPolicy ChildWorkflowPolicy) Context

WithChildPolicy adds a ChildWorkflowPolicy to the context.

func WithChildWorkflowOptions

func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context

WithChildWorkflowOptions adds all workflow options to the context.

func WithExecutionStartToCloseTimeout

func WithExecutionStartToCloseTimeout(ctx Context, d time.Duration) Context

WithExecutionStartToCloseTimeout adds a workflow execution timeout to the context.

func WithHeartbeatTimeout

func WithHeartbeatTimeout(ctx Context, d time.Duration) Context

WithHeartbeatTimeout adds a timeout to the context.

func WithScheduleToCloseTimeout

func WithScheduleToCloseTimeout(ctx Context, d time.Duration) Context

WithScheduleToCloseTimeout adds a timeout to the context.

func WithScheduleToStartTimeout

func WithScheduleToStartTimeout(ctx Context, d time.Duration) Context

WithScheduleToStartTimeout adds a timeout to the context.

func WithStartToCloseTimeout

func WithStartToCloseTimeout(ctx Context, d time.Duration) Context

WithStartToCloseTimeout adds a timeout to the context.

func WithTaskList

func WithTaskList(ctx Context, name string) Context

WithTaskList adds a task list to the context.

func WithValue

func WithValue(parent Context, key interface{}, val interface{}) Context

WithValue returns a copy of parent in which the value associated with key is val.

Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions.

func WithWaitForCancellation

func WithWaitForCancellation(ctx Context, wait bool) Context

WithWaitForCancellation adds wait for the cancellation to the context.

func WithWorkflowDomain

func WithWorkflowDomain(ctx Context, name string) Context

WithWorkflowDomain adds a domain to the context.

func WithWorkflowID

func WithWorkflowID(ctx Context, workflowID string) Context

WithWorkflowID adds a workflowID to the context.

func WithWorkflowTaskList

func WithWorkflowTaskList(ctx Context, name string) Context

WithWorkflowTaskList adds a task list to the context.

func WithWorkflowTaskStartToCloseTimeout

func WithWorkflowTaskStartToCloseTimeout(ctx Context, d time.Duration) Context

WithWorkflowTaskStartToCloseTimeout adds a decision timeout to the context.

type ContinueAsNewError

type ContinueAsNewError struct {
	// contains filtered or unexported fields
}

ContinueAsNewError contains information about how to continue the workflow as new.

func NewContinueAsNewError

func NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) *ContinueAsNewError

NewContinueAsNewError creates a ContinueAsNewError instance. If the workflow main function returns this error, the current execution ends and a new execution with the same workflow ID is started automatically with the options provided to this function.

 ctx - use the context to override options for the new workflow, such as the execution timeout, decision task timeout, or task list.
       Options that are not overridden default to those of the current workflow.
       ctx = WithExecutionStartToCloseTimeout(ctx, 30 * time.Minute)
       ctx = WithWorkflowTaskStartToCloseTimeout(ctx, time.Minute)
       ctx = WithWorkflowTaskList(ctx, "example-group")
 wfn - workflow function. For the new execution it can differ from the one currently running.
 args - arguments for the new workflow.
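The restart behaviour described above can be modelled without the library. The sketch below is a toy, not Cadence code: continueAsNew is a hypothetical stand-in for ContinueAsNewError that carries a single int argument, and drive plays the role of the service by starting a fresh run each time the workflow function returns that error.

```go
package main

import (
	"errors"
	"fmt"
)

// continueAsNew is a hypothetical stand-in for ContinueAsNewError.
// It carries the argument for the next run.
type continueAsNew struct{ nextArg int }

func (e *continueAsNew) Error() string { return "continue as new" }

// countdown handles one unit of work per run and asks to be restarted
// until nothing remains.
func countdown(remaining int) (string, error) {
	if remaining > 0 {
		return "", &continueAsNew{nextArg: remaining - 1}
	}
	return "done", nil
}

// drive starts a new run whenever the workflow function returns
// continueAsNew, and stops on any other outcome.
func drive(wf func(int) (string, error), arg int) (result string, runs int, err error) {
	for {
		runs++
		result, err = wf(arg)
		var c *continueAsNew
		if !errors.As(err, &c) {
			return result, runs, err
		}
		arg = c.nextArg
	}
}

func main() {
	result, runs, err := drive(countdown, 3)
	fmt.Println(result, runs, err) // done 4 <nil>
}
```

Each run in this model starts from a clean state, which is the point of continuing as new: no single run accumulates the history of all the earlier ones.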

func (*ContinueAsNewError) Error

func (e *ContinueAsNewError) Error() string

Error from error interface

type CustomError

type CustomError struct {
	// contains filtered or unexported fields
}

CustomError returned from workflow and activity implementations with reason and optional details.

func NewCustomError

func NewCustomError(reason string, details ...interface{}) *CustomError

NewCustomError creates a new instance of *CustomError with reason and optional details.

func (*CustomError) Details

func (e *CustomError) Details(d ...interface{})

Details extracts the strongly typed detail data of this custom error.

func (*CustomError) Error

func (e *CustomError) Error() string

Error from error interface

func (*CustomError) Reason

func (e *CustomError) Reason() string

Reason gets the reason of this custom error

type DomainClient

type DomainClient interface {
	// Register a domain with cadence server
	// The errors it can throw:
	//	- DomainAlreadyExistsError
	//	- BadRequestError
	//	- InternalServiceError
	Register(ctx context.Context, request *s.RegisterDomainRequest) error

	// Describe a domain. The domain has two parts of information.
	// DomainInfo - Which has Name, Status, Description, Owner Email.
	// DomainConfiguration - Configuration like Workflow Execution Retention Period In Days, Whether to emit metrics.
	// The errors it can throw:
	//	- EntityNotExistsError
	//	- BadRequestError
	//	- InternalServiceError
	Describe(ctx context.Context, name string) (*s.DomainInfo, *s.DomainConfiguration, error)

	// Update a domain. The domain has two parts of information.
	// UpdateDomainInfo - To update domain Description and Owner Email.
	// DomainConfiguration - Configuration like Workflow Execution Retention Period In Days, Whether to emit metrics.
	// The errors it can throw:
	//	- EntityNotExistsError
	//	- BadRequestError
	//	- InternalServiceError
	Update(ctx context.Context, name string, domainInfo *s.UpdateDomainInfo, domainConfig *s.DomainConfiguration) error
}

DomainClient is the client for managing operations on the domain. CLI, tools, ... can use this layer to manage operations on a domain.

func NewDomainClient

func NewDomainClient(service m.TChanWorkflowService, options *ClientOptions) DomainClient

NewDomainClient creates an instance of a domain client, used to manage the lifecycle of domains.

type EncodedValue

type EncodedValue []byte

EncodedValue is a type alias used to encapsulate/extract encoded result from workflow/activity.

func SideEffect

func SideEffect(ctx Context, f func(ctx Context) interface{}) EncodedValue

SideEffect executes provided function once, records its result into the workflow history and doesn't reexecute it on replay returning recorded result instead. It can be seen as an "inline" activity. Use it only for short nondeterministic code snippets like getting random value or generating UUID. The only way to fail SideEffect is to panic which causes decision task failure. The decision task after timeout is rescheduled and reexecuted giving SideEffect another chance to succeed. Be careful to not return any data from SideEffect function any other way than through its recorded return value. For example this code is BROKEN:

var executed bool

cadence.SideEffect(ctx, func(ctx cadence.Context) interface{} {
       executed = true
       return nil
})
if executed {
       ....
} else {
       ....
}

On replay the function is not executed, the executed flag is not set to true and the workflow takes a different path breaking the determinism.

Here is the correct way to use SideEffect:

encodedRandom := cadence.SideEffect(ctx, func(ctx cadence.Context) interface{} {
      return rand.Intn(100)
})

var random int
encodedRandom.Get(&random)

if random < 50 {
       ....
} else {
       ....
}
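The difference between the broken and the correct pattern can be reproduced with a toy record-and-replay model. This is only a sketch under stated assumptions: the history type and its sideEffect method are hypothetical and far simpler than the real workflow history, and a fixed value of 42 replaces rand.Intn so that the outcome is repeatable.

```go
package main

import "fmt"

// history is a toy stand-in for the recorded workflow history.
type history struct {
	recorded []int
	next     int
	replay   bool
}

// sideEffect runs f and records its result on first execution.
// On replay it returns the recorded value without calling f.
func (h *history) sideEffect(f func() int) int {
	if h.replay {
		v := h.recorded[h.next]
		h.next++
		return v
	}
	v := f()
	h.recorded = append(h.recorded, v)
	return v
}

// workflow reports which branch each pattern takes.
func workflow(h *history) (broken, correct string) {
	// Broken: leaks data out of the closure through a captured variable.
	executed := false
	h.sideEffect(func() int { executed = true; return 0 })
	broken = "else"
	if executed {
		broken = "if"
	}

	// Correct: uses only the recorded return value.
	v := h.sideEffect(func() int { return 42 })
	correct = "else"
	if v < 50 {
		correct = "if"
	}
	return broken, correct
}

func main() {
	h := &history{}
	b1, c1 := workflow(h) // first execution
	h.replay, h.next = true, 0
	b2, c2 := workflow(h) // replay
	fmt.Println(b1, b2, c1, c2) // if else if if
}
```

The broken pattern takes a different branch on replay, while the correct pattern takes the same branch both times.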

func (EncodedValue) Get

func (b EncodedValue) Get(valuePtr interface{}) error

Get extracts data from the encoded data into the desired value type. valuePtr is a pointer to the actual value type.
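The calling convention of Get (pass a pointer, check the returned error) can be tried in isolation. In the sketch below, encodedValue is a hypothetical stand-in for EncodedValue, and JSON is assumed as the encoding purely for illustration; this page does not state which encoding the library uses.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodedValue is a hypothetical stand-in for EncodedValue.
// Assumption: the payload is JSON; the real encoding may differ.
type encodedValue []byte

// Get decodes the payload into the value that valuePtr points to.
func (b encodedValue) Get(valuePtr interface{}) error {
	return json.Unmarshal(b, valuePtr)
}

func main() {
	payload, _ := json.Marshal(42)

	var n int
	err := encodedValue(payload).Get(&n)
	fmt.Println(n, err) // 42 <nil>

	// Passing a non-pointer, or a pointer to the wrong type, yields an error.
	var s string
	fmt.Println(encodedValue(payload).Get(s) != nil, encodedValue(payload).Get(&s) != nil) // true true
}
```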

type EncodedValues

type EncodedValues []byte

EncodedValues is a type alias used to encapsulate/extract encoded arguments from workflow/activity.

func (EncodedValues) Get

func (b EncodedValues) Get(valuePtr ...interface{}) error

Get extracts data from the encoded data into the desired value types. Each valuePtr is a pointer to the actual value type.

type Future

type Future interface {
	// Get blocks until the future is ready. When ready it either returns non nil error
	// or assigns result value to the provided pointer.
	// Example:
	// var v string
	// if err := f.Get(ctx, &v); err != nil {
	//     return err
	// }
	// fmt.Printf("Value=%v", v)
	Get(ctx Context, valuePtr interface{}) error
	// When true Get is guaranteed to not block
	IsReady() bool
}

Future represents the result of an asynchronous computation.

func ExecuteActivity

func ExecuteActivity(ctx Context, f interface{}, args ...interface{}) Future

ExecuteActivity requests activity execution in the context of a workflow.

  • ctx - Context can be used to pass the settings for this activity, for example the task list it should be routed to and the timeouts that need to be configured. Use ActivityOptions to pass down the options.

    ao := ActivityOptions{
        TaskList:               "exampleTaskList",
        ScheduleToStartTimeout: 10 * time.Second,
        StartToCloseTimeout:    5 * time.Second,
        ScheduleToCloseTimeout: 10 * time.Second,
        HeartbeatTimeout:       0,
    }
    ctx1 := WithActivityOptions(ctx, ao)

    or to override a single option

    ctx1 := WithTaskList(ctx, "exampleTaskList")

  • f - Either an activity name or a function that is getting scheduled.

  • args - The arguments that need to be passed to the function represented by 'f'.

  • If the activity failed to complete, the error returned by the future's Get indicates the failure. It can be one of CustomError, TimeoutError, CanceledError, PanicError, GenericError.

  • You can also cancel the pending activity using context (WithCancel(ctx)), which fails the activity with error CanceledError.

  • Returns a Future with the activity result or failure.

func NewTimer

func NewTimer(ctx Context, d time.Duration) Future

NewTimer returns immediately and the future becomes ready after the specified timeout.

  • The current timer resolution implementation is in seconds but is subject to change.
  • The workflow needs to use this NewTimer() to get the timer instead of the Go standard library one (time.NewTimer()).
  • You can also cancel the pending timer using context (WithCancel(ctx)), which cancels the timer with error TimerCanceledError.

type GenericError

type GenericError struct {
	// contains filtered or unexported fields
}

GenericError is returned from workflow/activity when the implementations return errors other than those created by the NewCustomError() API.

func (*GenericError) Error

func (e *GenericError) Error() string

Error from error interface

type GetHistoryPage

type GetHistoryPage func(nextToken []byte) (*s.History, []byte, error)

GetHistoryPage - Fetches a history page given a continuation token.
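A function of this shape is typically called in a loop that feeds each returned token into the next call. The sketch below shows that loop with stand-in types. The history struct, the fakePages helper, and the rule that an empty token means there are no further pages are all assumptions made for illustration; they are not taken from the library.

```go
package main

import "fmt"

// history is a hypothetical stand-in for *s.History; only event IDs matter here.
type history struct{ eventIDs []int64 }

// getHistoryPage mirrors the shape of GetHistoryPage with the stand-in type.
type getHistoryPage func(nextToken []byte) (*history, []byte, error)

// allEvents calls fetch repeatedly, passing each returned token to the next call.
// Assumption: an empty token means there are no further pages.
func allEvents(fetch getHistoryPage) ([]int64, error) {
	var all []int64
	var token []byte
	for {
		page, next, err := fetch(token)
		if err != nil {
			return nil, err
		}
		all = append(all, page.eventIDs...)
		if len(next) == 0 {
			return all, nil
		}
		token = next
	}
}

// fakePages serves pre-built pages; the token holds the index of the next page.
func fakePages(pages [][]int64) getHistoryPage {
	return func(nextToken []byte) (*history, []byte, error) {
		i := 0
		if len(nextToken) > 0 {
			i = int(nextToken[0])
		}
		if i >= len(pages) {
			return nil, nil, fmt.Errorf("no page %d", i)
		}
		var next []byte
		if i+1 < len(pages) {
			next = []byte{byte(i + 1)}
		}
		return &history{eventIDs: pages[i]}, next, nil
	}
}

func main() {
	ids, err := allEvents(fakePages([][]int64{{1, 2}, {3}, {4, 5}}))
	fmt.Println(ids, err) // [1 2 3 4 5] <nil>
}
```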

type MockCallWrapper

type MockCallWrapper struct {
	// contains filtered or unexported fields
}

MockCallWrapper is a wrapper to mock.Call. It offers the ability to wait on workflow's clock instead of wall clock.

func (*MockCallWrapper) After

After sets how long to wait on workflow's clock before the mock call returns.

func (*MockCallWrapper) Once

func (c *MockCallWrapper) Once() *MockCallWrapper

Once indicates that the mock should only return the value once.

func (*MockCallWrapper) Return

func (c *MockCallWrapper) Return(returnArguments ...interface{}) *MockCallWrapper

Return specifies the return arguments for the expectation.

func (*MockCallWrapper) Run

func (c *MockCallWrapper) Run(fn func(args mock.Arguments)) *MockCallWrapper

Run sets a handler to be called before returning. It can be used when mocking a method, such as an unmarshaler, that takes a pointer to a struct and sets properties in that struct.

func (*MockCallWrapper) Times

func (c *MockCallWrapper) Times(i int) *MockCallWrapper

Times indicates that the mock should only return the indicated number of times.

func (*MockCallWrapper) Twice

func (c *MockCallWrapper) Twice() *MockCallWrapper

Twice indicates that the mock should only return the value twice.

type PanicError

type PanicError struct {
	// contains filtered or unexported fields
}

PanicError contains information about panicked workflow/activity.

func (*PanicError) Error

func (e *PanicError) Error() string

Error from error interface

func (*PanicError) StackTrace

func (e *PanicError) StackTrace() string

StackTrace returns the stack trace of the panic.

type RegisterActivityOptions

type RegisterActivityOptions struct {
	Name string
}

RegisterActivityOptions consists of options for registering an activity

type RegisterWorkflowOptions

type RegisterWorkflowOptions struct {
	Name string
}

RegisterWorkflowOptions consists of options for registering a workflow

type Selector

type Selector interface {
	AddReceive(c Channel, f func(c Channel, more bool)) Selector
	AddSend(c Channel, v interface{}, f func()) Selector
	AddFuture(future Future, f func(f Future)) Selector
	AddDefault(f func())
	Select(ctx Context)
}

Selector must be used by workflow code instead of the native Go select statement. Use the NewSelector function to create an instance.

func NewNamedSelector

func NewNamedSelector(ctx Context, name string) Selector

NewNamedSelector creates a new Selector instance with a given human readable name. Name appears in stack traces that are blocked on this Selector.

func NewSelector

func NewSelector(ctx Context) Selector

NewSelector creates a new Selector instance.

type ServiceInvoker

type ServiceInvoker interface {
	// Returns ActivityTaskCanceledError if activity is cancelled
	Heartbeat(details []byte) error
	Close()
}

ServiceInvoker abstracts calls to the Cadence service from an activity implementation. Implement to unit test activities.

type Settable

type Settable interface {
	Set(value interface{}, err error)
	SetValue(value interface{})
	SetError(err error)
	Chain(future Future) // Value (or error) of the future become the same of the chained one.
}

Settable is used to set value or error on a future. See NewFuture function.
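The relationship between a future, its settable side, and Chain can be modelled with a channel. The sketch below is a toy and not the library's implementation: toyFuture is hypothetical, its Get returns the value directly instead of taking a Context and a value pointer, and a second Set is silently ignored, which may differ from the real behaviour.

```go
package main

import (
	"fmt"
	"sync"
)

// toyFuture is a hypothetical, channel-based model of the Future/Settable pair.
type toyFuture struct {
	once  sync.Once
	done  chan struct{}
	value interface{}
	err   error
}

func newToyFuture() *toyFuture { return &toyFuture{done: make(chan struct{})} }

// Set stores the outcome once and unblocks all waiters.
func (f *toyFuture) Set(value interface{}, err error) {
	f.once.Do(func() {
		f.value, f.err = value, err
		close(f.done)
	})
}

// Get blocks until the future is ready, then returns its value and error.
func (f *toyFuture) Get() (interface{}, error) {
	<-f.done
	return f.value, f.err
}

// IsReady reports whether Get would return without blocking.
func (f *toyFuture) IsReady() bool {
	select {
	case <-f.done:
		return true
	default:
		return false
	}
}

// Chain makes f take on the outcome of other once other is ready.
func (f *toyFuture) Chain(other *toyFuture) {
	go func() { f.Set(other.Get()) }()
}

func main() {
	a, b := newToyFuture(), newToyFuture()
	b.Chain(a)
	fmt.Println(b.IsReady()) // false: a has not been set yet

	a.Set("done", nil)
	v, err := b.Get()
	fmt.Println(v, err) // done <nil>
}
```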

type StartWorkflowOptions

type StartWorkflowOptions struct {
	// ID - The business identifier of the workflow execution.
	// Optional: defaulted to a uuid.
	ID string

	// TaskList - The decisions of the workflow are scheduled on this queue.
	// This is also the default task list on which activities are scheduled. The workflow author can choose
	// to override this using activity options.
	// Mandatory: No default.
	TaskList string

	// ExecutionStartToCloseTimeout - The time out for duration of workflow execution.
	// The resolution is seconds.
	// Mandatory: No default.
	ExecutionStartToCloseTimeout time.Duration

	// DecisionTaskStartToCloseTimeout - The time out for processing decision task from the time the worker
	// pulled this task.
	// The resolution is seconds.
	// Optional: defaulted to 20 secs.
	DecisionTaskStartToCloseTimeout time.Duration
}

StartWorkflowOptions configuration parameters for starting a workflow execution.
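The field comments above mark TaskList and ExecutionStartToCloseTimeout as mandatory and give DecisionTaskStartToCloseTimeout a 20-second default. A caller-side pre-flight check along those lines might look like the sketch below. The startOptions type and the normalize function are hypothetical; how the library itself validates or defaults these fields is not shown on this page, and the uuid default for ID is left to the library.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// startOptions is a hypothetical mirror of StartWorkflowOptions.
type startOptions struct {
	ID                              string
	TaskList                        string
	ExecutionStartToCloseTimeout    time.Duration
	DecisionTaskStartToCloseTimeout time.Duration
}

// normalize rejects options that lack a mandatory field and fills in the
// documented 20-second decision task timeout when none is given.
func normalize(o startOptions) (startOptions, error) {
	if o.TaskList == "" {
		return o, errors.New("TaskList is mandatory")
	}
	if o.ExecutionStartToCloseTimeout <= 0 {
		return o, errors.New("ExecutionStartToCloseTimeout is mandatory")
	}
	if o.DecisionTaskStartToCloseTimeout == 0 {
		o.DecisionTaskStartToCloseTimeout = 20 * time.Second
	}
	return o, nil
}

func main() {
	o, err := normalize(startOptions{
		TaskList:                     "exampleTaskList",
		ExecutionStartToCloseTimeout: time.Minute,
	})
	fmt.Println(o.DecisionTaskStartToCloseTimeout, err) // 20s <nil>

	_, err = normalize(startOptions{ExecutionStartToCloseTimeout: time.Minute})
	fmt.Println(err) // TaskList is mandatory
}
```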

type TestActivityEnvironment

type TestActivityEnvironment struct {
	// contains filtered or unexported fields
}

TestActivityEnvironment is the environment that you use to test activity

func (*TestActivityEnvironment) ExecuteActivity

func (t *TestActivityEnvironment) ExecuteActivity(activityFn interface{}, args ...interface{}) (EncodedValue, error)

ExecuteActivity executes an activity. The tested activity is executed synchronously in the calling goroutine. The caller should use EncodedValue.Get() to extract the strongly typed result value.

func (*TestActivityEnvironment) SetWorkerOptions added in v0.3.2

func (t *TestActivityEnvironment) SetWorkerOptions(options WorkerOptions) *TestActivityEnvironment

SetWorkerOptions sets the WorkerOptions that will be used by TestActivityEnvironment. TestActivityEnvironment will use the Identity, MetricsScope and BackgroundActivityContext options on the WorkerOptions. Other options are ignored.

type TestWorkflowEnvironment

type TestWorkflowEnvironment struct {
	mock.Mock
	// contains filtered or unexported fields
}

TestWorkflowEnvironment is the environment that you use to test workflow

func (*TestWorkflowEnvironment) CancelWorkflow

func (t *TestWorkflowEnvironment) CancelWorkflow()

CancelWorkflow requests cancellation (through workflow Context) to the currently running test workflow.

func (*TestWorkflowEnvironment) CompleteActivity

func (t *TestWorkflowEnvironment) CompleteActivity(taskToken []byte, result interface{}, err error) error

CompleteActivity completes an activity that had returned the ErrActivityResultPending error.

func (*TestWorkflowEnvironment) ExecuteWorkflow

func (t *TestWorkflowEnvironment) ExecuteWorkflow(workflowFn interface{}, args ...interface{})

ExecuteWorkflow executes a workflow and waits until the workflow completes. It fails the test if the workflow is blocked and cannot complete within TestTimeout (set by SetTestTimeout()).

func (*TestWorkflowEnvironment) GetWorkflowError

func (t *TestWorkflowEnvironment) GetWorkflowError() error

GetWorkflowError returns the error from the test workflow.

func (*TestWorkflowEnvironment) GetWorkflowResult

func (t *TestWorkflowEnvironment) GetWorkflowResult(valuePtr interface{}) error

GetWorkflowResult extracts the encoded result from test workflow, it returns error if the extraction failed.

func (*TestWorkflowEnvironment) IsWorkflowCompleted

func (t *TestWorkflowEnvironment) IsWorkflowCompleted() bool

IsWorkflowCompleted checks whether the test workflow is completed.

func (*TestWorkflowEnvironment) Now

Now returns the current workflow time (a.k.a cadence.Now() time) of this TestWorkflowEnvironment.

func (*TestWorkflowEnvironment) OnActivity

func (t *TestWorkflowEnvironment) OnActivity(activity interface{}, args ...interface{}) *MockCallWrapper

OnActivity sets up a mock call for an activity. The activity parameter must be an activity function (func) or an activity name (string). You must call Return() with appropriate parameters on the returned *MockCallWrapper instance. The parameters supplied to Return() should either be a function that has exactly the same signature as the mocked activity, or mock values with the same types as the mocked activity function returns. Example: assume the activity you want to mock has the following function signature:

func MyActivity(ctx context.Context, msg string) (string, error)

You can mock it by returning a function with exactly the same signature:

t.OnActivity(MyActivity, mock.Anything, mock.Anything).Return(func(ctx context.Context, msg string) (string, error) {
   // your mock function implementation
   return "", nil
})

OR return mock values with same types as activity function's return types:

t.OnActivity(MyActivity, mock.Anything, mock.Anything).Return("mock_result", nil)

func (*TestWorkflowEnvironment) OnWorkflow

func (t *TestWorkflowEnvironment) OnWorkflow(workflow interface{}, args ...interface{}) *MockCallWrapper

OnWorkflow sets up a mock call for a workflow. The workflow parameter must be a workflow function (func) or a workflow name (string). You must call Return() with appropriate parameters on the returned *MockCallWrapper instance. The parameters supplied to Return() should either be a function that has exactly the same signature as the mocked workflow, or mock values with the same types as the mocked workflow function returns. Example: assume the workflow you want to mock has the following function signature:

func MyChildWorkflow(ctx cadence.Context, msg string) (string, error)

You can mock it by returning a function with exactly the same signature:

t.OnWorkflow(MyChildWorkflow, mock.Anything, mock.Anything).Return(func(ctx cadence.Context, msg string) (string, error) {
   // your mock function implementation
   return "", nil
})

OR return mock values with same types as workflow function's return types:

t.OnWorkflow(MyChildWorkflow, mock.Anything, mock.Anything).Return("mock_result", nil)

func (*TestWorkflowEnvironment) QueryWorkflow added in v0.3.2

func (t *TestWorkflowEnvironment) QueryWorkflow(queryType string, args ...interface{}) (EncodedValue, error)

QueryWorkflow queries the currently running test workflow and returns the result synchronously.

func (*TestWorkflowEnvironment) RegisterDelayedCallback

func (t *TestWorkflowEnvironment) RegisterDelayedCallback(callback func(), delayDuration time.Duration)

RegisterDelayedCallback creates a new timer with the specified delayDuration using the workflow clock (not the wall clock). When the timer fires, the callback is called. By default, this test suite uses a mock clock, which automatically moves forward to fire the next timer when the workflow is blocked. You can use this API to trigger an event (such as activity completion, a signal, or workflow cancellation) at a desired time.
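The idea of a mock clock that jumps straight to the next pending timer can be shown with a small stand-alone model. This is a sketch under assumptions, not the test environment's implementation: mockClock, after, and fireNext are hypothetical names, and the model has no notion of a blocked workflow; it simply fires timers in order of their due time.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// timer is a callback scheduled for a point on the mock clock.
type timer struct {
	at time.Duration
	fn func()
}

// mockClock is a toy workflow clock: time moves only when a timer fires.
type mockClock struct {
	now    time.Duration
	timers []timer
}

// after registers fn to run once the clock has advanced by d.
func (c *mockClock) after(d time.Duration, fn func()) {
	c.timers = append(c.timers, timer{at: c.now + d, fn: fn})
	sort.SliceStable(c.timers, func(i, j int) bool { return c.timers[i].at < c.timers[j].at })
}

// fireNext jumps to the earliest pending timer and runs its callback.
// It reports false when nothing is pending.
func (c *mockClock) fireNext() bool {
	if len(c.timers) == 0 {
		return false
	}
	t := c.timers[0]
	c.timers = c.timers[1:]
	c.now = t.at
	t.fn()
	return true
}

func main() {
	c := &mockClock{}
	var log []string
	c.after(time.Hour, func() { log = append(log, fmt.Sprintf("timeout at %v", c.now)) })
	c.after(time.Minute, func() { log = append(log, fmt.Sprintf("signal at %v", c.now)) })
	for c.fireNext() {
	}
	fmt.Println(log) // [signal at 1m0s timeout at 1h0m0s]
}
```

No wall clock time passes while the loop runs, which is why a test can cover an hour of workflow time almost instantly.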

func (*TestWorkflowEnvironment) SetActivityTaskList

func (t *TestWorkflowEnvironment) SetActivityTaskList(tasklist string, activityFn ...interface{})

SetActivityTaskList sets the affinity between activities and a tasklist. By default, an activity can be invoked by any tasklist in this test environment. You can use SetActivityTaskList() to set the affinity between an activity and a tasklist. Once an activity is set to a particular tasklist, that activity is only available to that tasklist.

func (*TestWorkflowEnvironment) SetOnActivityCanceledListener

func (t *TestWorkflowEnvironment) SetOnActivityCanceledListener(
	listener func(activityInfo *ActivityInfo)) *TestWorkflowEnvironment

SetOnActivityCanceledListener sets a listener that will be called after an activity is canceled.

func (*TestWorkflowEnvironment) SetOnActivityCompletedListener

func (t *TestWorkflowEnvironment) SetOnActivityCompletedListener(
	listener func(activityInfo *ActivityInfo, result EncodedValue, err error)) *TestWorkflowEnvironment

SetOnActivityCompletedListener sets a listener that will be called after an activity is completed.

func (*TestWorkflowEnvironment) SetOnActivityHeartbeatListener

func (t *TestWorkflowEnvironment) SetOnActivityHeartbeatListener(
	listener func(activityInfo *ActivityInfo, details EncodedValues)) *TestWorkflowEnvironment

SetOnActivityHeartbeatListener sets a listener that will be called when an activity heartbeats.

func (*TestWorkflowEnvironment) SetOnActivityStartedListener

func (t *TestWorkflowEnvironment) SetOnActivityStartedListener(
	listener func(activityInfo *ActivityInfo, ctx context.Context, args EncodedValues)) *TestWorkflowEnvironment

SetOnActivityStartedListener sets a listener that will be called before activity starts execution.

func (*TestWorkflowEnvironment) SetOnChildWorkflowCanceledListener

func (t *TestWorkflowEnvironment) SetOnChildWorkflowCanceledListener(
	listener func(workflowInfo *WorkflowInfo)) *TestWorkflowEnvironment

SetOnChildWorkflowCanceledListener sets a listener that will be called when a child workflow is canceled.

func (*TestWorkflowEnvironment) SetOnChildWorkflowCompletedListener

func (t *TestWorkflowEnvironment) SetOnChildWorkflowCompletedListener(
	listener func(workflowInfo *WorkflowInfo, result EncodedValue, err error)) *TestWorkflowEnvironment

SetOnChildWorkflowCompletedListener sets a listener that will be called after a child workflow is completed.

func (*TestWorkflowEnvironment) SetOnChildWorkflowStartedListener

func (t *TestWorkflowEnvironment) SetOnChildWorkflowStartedListener(
	listener func(workflowInfo *WorkflowInfo, ctx Context, args EncodedValues)) *TestWorkflowEnvironment

SetOnChildWorkflowStartedListener sets a listener that will be called before a child workflow starts execution.

func (*TestWorkflowEnvironment) SetOnTimerCancelledListener

func (t *TestWorkflowEnvironment) SetOnTimerCancelledListener(listener func(timerID string)) *TestWorkflowEnvironment

SetOnTimerCancelledListener sets a listener that will be called after a timer is cancelled.

func (*TestWorkflowEnvironment) SetOnTimerFiredListener

func (t *TestWorkflowEnvironment) SetOnTimerFiredListener(listener func(timerID string)) *TestWorkflowEnvironment

SetOnTimerFiredListener sets a listener that will be called after a timer is fired.

func (*TestWorkflowEnvironment) SetOnTimerScheduledListener

func (t *TestWorkflowEnvironment) SetOnTimerScheduledListener(
	listener func(timerID string, duration time.Duration)) *TestWorkflowEnvironment

SetOnTimerScheduledListener sets a listener that will be called before a timer is scheduled.
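Because every listener setter returns the environment it was called on, the setters can be chained. The sketch below illustrates that pattern only. fakeEnv and runTimer are hypothetical stand-ins written for this example, not the library's TestWorkflowEnvironment, and the order in which the stand-in invokes the listeners is an assumption based on the descriptions above.

```go
package main

import (
	"fmt"
	"time"
)

// fakeEnv is a hypothetical stand-in for TestWorkflowEnvironment. It only
// mirrors the shape of two timer listener setters documented above.
type fakeEnv struct {
	onScheduled func(timerID string, duration time.Duration)
	onFired     func(timerID string)
}

// SetOnTimerScheduledListener stores the listener and returns the receiver
// so that calls can be chained.
func (e *fakeEnv) SetOnTimerScheduledListener(l func(timerID string, duration time.Duration)) *fakeEnv {
	e.onScheduled = l
	return e
}

// SetOnTimerFiredListener stores the listener and returns the receiver.
func (e *fakeEnv) SetOnTimerFiredListener(l func(timerID string)) *fakeEnv {
	e.onFired = l
	return e
}

// runTimer simulates a single timer (hypothetical helper): the scheduled
// listener is called first, then the fired listener.
func (e *fakeEnv) runTimer(id string, d time.Duration) {
	if e.onScheduled != nil {
		e.onScheduled(id, d)
	}
	if e.onFired != nil {
		e.onFired(id)
	}
}

// recordTimerEvents wires both listeners in one chained expression and
// returns the events in the order the listeners observed them.
func recordTimerEvents() []string {
	var events []string
	env := (&fakeEnv{}).
		SetOnTimerScheduledListener(func(id string, d time.Duration) {
			events = append(events, fmt.Sprintf("scheduled %s for %s", id, d))
		}).
		SetOnTimerFiredListener(func(id string) {
			events = append(events, "fired "+id)
		})
	env.runTimer("timer-1", 5*time.Second)
	return events
}

func main() {
	for _, e := range recordTimerEvents() {
		fmt.Println(e)
	}
}
```

In a real test the listeners would typically record events into a slice like this and the test would assert on the recorded order after the workflow finishes.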

func (*TestWorkflowEnvironment) SetTestTimeout

func (t *TestWorkflowEnvironment) SetTestTimeout(idleTimeout time.Duration) *TestWorkflowEnvironment

SetTestTimeout sets the wall clock timeout for this workflow test run. A test timeout means the workflow is blocked and cannot make progress. This can happen if the workflow waits too long for an activity result. The timeout is measured in real wall clock time, not workflow time (a.k.a. cadence.Now() time).

func (*TestWorkflowEnvironment) SetWorkerOptions added in v0.3.2

func (t *TestWorkflowEnvironment) SetWorkerOptions(options WorkerOptions) *TestWorkflowEnvironment

SetWorkerOptions sets the WorkerOptions for TestWorkflowEnvironment. TestWorkflowEnvironment uses only the Identity, MetricsScope and BackgroundActivityContext options from the WorkerOptions. Other options are ignored.

func (*TestWorkflowEnvironment) SignalWorkflow

func (t *TestWorkflowEnvironment) SignalWorkflow(name string, input interface{})

SignalWorkflow sends a signal to the currently running test workflow.

type TimeoutError

type TimeoutError struct {
	// contains filtered or unexported fields
}

TimeoutError is returned when an activity or child workflow times out.

func NewHeartbeatTimeoutError

func NewHeartbeatTimeoutError(details ...interface{}) *TimeoutError

NewHeartbeatTimeoutError creates a TimeoutError instance for a heartbeat timeout. WARNING: This function is public only to support unit testing of workflows. It shouldn't be used by application-level code.

func NewTimeoutError

func NewTimeoutError(timeoutType shared.TimeoutType) *TimeoutError

NewTimeoutError creates a TimeoutError instance. Use NewHeartbeatTimeoutError to create a heartbeat TimeoutError. WARNING: This function is public only to support unit testing of workflows. It shouldn't be used by application-level code.

func (*TimeoutError) Details

func (e *TimeoutError) Details(d ...interface{})

Details extracts the strongly typed detail data of this error.

func (*TimeoutError) Error

func (e *TimeoutError) Error() string

Error implements the error interface.

func (*TimeoutError) TimeoutType

func (e *TimeoutError) TimeoutType() shared.TimeoutType

TimeoutType returns the timeout type of this error.
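Callers usually tell a timeout apart from other failures by inspecting the concrete error type and then its timeout type. The following is a minimal sketch of that pattern using only local stand-ins: timeoutType, its two constants, timeoutError and describe are hypothetical names for this example and do not reproduce shared.TimeoutType or the library's TimeoutError.

```go
package main

import (
	"errors"
	"fmt"
)

// timeoutType is a hypothetical stand-in for shared.TimeoutType.
type timeoutType int

const (
	timeoutStartToClose timeoutType = iota
	timeoutHeartbeat
)

// timeoutError is a hypothetical stand-in that mirrors the Error and
// TimeoutType methods documented above.
type timeoutError struct {
	kind timeoutType
}

func (e *timeoutError) Error() string { return fmt.Sprintf("timeout (type %d)", e.kind) }

func (e *timeoutError) TimeoutType() timeoutType { return e.kind }

// describe uses a type switch to separate timeouts from other errors and
// then branches on the timeout type.
func describe(err error) string {
	switch e := err.(type) {
	case nil:
		return "ok"
	case *timeoutError:
		if e.TimeoutType() == timeoutHeartbeat {
			return "heartbeat timeout"
		}
		return "other timeout"
	default:
		return "non-timeout error: " + e.Error()
	}
}

func main() {
	fmt.Println(describe(&timeoutError{kind: timeoutHeartbeat}))
	fmt.Println(describe(&timeoutError{kind: timeoutStartToClose}))
	fmt.Println(describe(errors.New("boom")))
}
```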

type Version

type Version int

Version represents a change version. See GetVersion call.

const DefaultVersion Version = -1

DefaultVersion is the version returned by GetVersion for code that wasn't versioned before.

func GetVersion

func GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version

GetVersion is used to safely perform backwards-incompatible changes to workflow definitions. Workflow code must not be updated while workflows are still running, because doing so breaks determinism. The solution is to keep both the old code, which is used to replay existing workflows, and the new code, which is used when the workflow is executed for the first time. GetVersion returns the maxSupported version when it is executed for the first time. This version is recorded in the workflow history as a marker event. Even if maxSupported is changed later, the version that was recorded is returned on replay. The DefaultVersion constant represents the version of code that wasn't versioned before.

For example, initially the workflow has the following code:

err = cadence.ExecuteActivity(ctx, foo).Get(ctx, nil)

It should be updated to:

err = cadence.ExecuteActivity(ctx, bar).Get(ctx, nil)

The backwards-compatible way to make the update is:

v := GetVersion(ctx, "fooChange", DefaultVersion, 1)

if v == DefaultVersion {
    err = cadence.ExecuteActivity(ctx, foo).Get(ctx, nil)
} else {
    err = cadence.ExecuteActivity(ctx, bar).Get(ctx, nil)
}

Then bar has to be changed to baz:

v := GetVersion(ctx, "fooChange", DefaultVersion, 2)

if v == DefaultVersion {
    err = cadence.ExecuteActivity(ctx, foo).Get(ctx, nil)
} else if v == 1 {
    err = cadence.ExecuteActivity(ctx, bar).Get(ctx, nil)
} else {
    err = cadence.ExecuteActivity(ctx, baz).Get(ctx, nil)
}

Later, when no workflows are still running on DefaultVersion, the corresponding branch can be removed:

v := GetVersion(ctx, "fooChange", 1, 2)

if v == 1 {
    err = cadence.ExecuteActivity(ctx, bar).Get(ctx, nil)
} else {
    err = cadence.ExecuteActivity(ctx, baz).Get(ctx, nil)
}

Currently there is no supported way to completely remove a GetVersion call after it has been introduced. Keep it even if only a single branch is left:

GetVersion(ctx, "fooChange", 2, 2)

err = cadence.ExecuteActivity(ctx, baz).Get(ctx, nil)

This is necessary because GetVersion validates the version against the workflow history and fails decisions if the workflow code is not compatible with it.
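The recording and replay behavior described above can be modeled in a few lines. This is a simplified sketch under stated assumptions, not the library's implementation: history, the replaying flag, getVersion and activityFor are hypothetical names, the workflow history is reduced to a map from change ID to recorded version, and a failed decision is reduced to a returned error.

```go
package main

import "fmt"

type version int

// defaultVersion plays the role of DefaultVersion in this model.
const defaultVersion version = -1

// history is a hypothetical stand-in for the workflow history: it maps a
// change ID to the version recorded by its marker event.
type history map[string]version

// getVersion models the documented behavior:
//   - a recorded marker always wins, even if maxSupported changed since;
//   - on first execution (not replaying) maxSupported is recorded and returned;
//   - on replay without a marker the code predates versioning, so
//     defaultVersion is returned;
//   - a version outside [minSupported, maxSupported] is reported as an error.
func getVersion(h history, replaying bool, changeID string, minSupported, maxSupported version) (version, error) {
	v, recorded := h[changeID]
	if !recorded {
		if replaying {
			v = defaultVersion
		} else {
			v = maxSupported
			h[changeID] = v
		}
	}
	if v < minSupported || v > maxSupported {
		return v, fmt.Errorf("version %d for %q is outside supported range [%d, %d]",
			v, changeID, minSupported, maxSupported)
	}
	return v, nil
}

// activityFor maps a version to the activity name used in the example above.
func activityFor(v version) string {
	switch v {
	case defaultVersion:
		return "foo"
	case 1:
		return "bar"
	default:
		return "baz"
	}
}

func main() {
	h := history{}
	// First execution with maxSupported = 1 records version 1.
	v, _ := getVersion(h, false, "fooChange", defaultVersion, 1)
	fmt.Println(v, activityFor(v))
	// Replaying after the code moved to maxSupported = 2 still yields 1.
	v, _ = getVersion(h, true, "fooChange", defaultVersion, 2)
	fmt.Println(v, activityFor(v))
}
```

The model also shows why the DefaultVersion branch can only be dropped once no old workflows remain: replaying a history without a marker against a minSupported of 1 produces an out-of-range version.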

type Worker

type Worker interface {
	// Start starts the worker in a non-blocking fashion
	Start() error
	// Run is a blocking start and cleans up resources when killed
	// returns error only if it fails to start the worker
	Run() error
	// Stop cleans up any resources opened by worker
	Stop()
}

Worker represents objects that can be started and stopped.
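To make the contract of the three methods concrete, here is a minimal sketch of a type that satisfies an interface of the same shape. loopWorker and newLoopWorker are hypothetical and exist only for this illustration. In particular, this Run blocks until Stop is called, which is an assumption of the sketch and not a statement about how the library's workers wait for termination.

```go
package main

import (
	"fmt"
	"sync"
)

// Worker mirrors the shape of the interface shown above.
type Worker interface {
	Start() error
	Run() error
	Stop()
}

// loopWorker is a hypothetical implementation used only for illustration.
type loopWorker struct {
	startOnce sync.Once
	stopOnce  sync.Once
	stopCh    chan struct{} // closed by Stop
	done      chan struct{} // closed when the background loop exits
}

func newLoopWorker() *loopWorker {
	return &loopWorker{stopCh: make(chan struct{}), done: make(chan struct{})}
}

// Start launches the background loop once and returns immediately.
func (w *loopWorker) Start() error {
	w.startOnce.Do(func() {
		go func() {
			defer close(w.done)
			<-w.stopCh // a real worker would poll for tasks until stopped
		}()
	})
	return nil
}

// Run starts the worker and blocks until the background loop has exited.
func (w *loopWorker) Run() error {
	if err := w.Start(); err != nil {
		return err
	}
	<-w.done
	return nil
}

// Stop signals the loop to exit; calling it more than once is safe.
func (w *loopWorker) Stop() {
	w.stopOnce.Do(func() { close(w.stopCh) })
}

func main() {
	var w Worker = newLoopWorker()
	if err := w.Start(); err != nil {
		fmt.Println("start failed:", err)
		return
	}
	w.Stop()
	fmt.Println("worker stopped")
}
```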

func NewActivityTaskWorker

func NewActivityTaskWorker(
	taskHandler ActivityTaskHandler,
	service m.TChanWorkflowService,
	domain string,
	taskList string,
	options WorkerOptions,
) Worker

NewActivityTaskWorker returns an instance of an activity task handler worker. To be used by framework-level code that requires access to the original workflow task.

func NewWorker

func NewWorker(
	service m.TChanWorkflowService,
	domain string,
	taskList string,
	options WorkerOptions,
) Worker

NewWorker creates an instance of a worker for managing workflow and activity executions.

service - thrift connection to the cadence server.

domain - the name of the cadence domain.

taskList - the task list name you use to identify your client worker; it also identifies the group of workflow and activity implementations that are hosted by a single worker process.

options - configure any worker-specific options like logger, metrics, identity.

func NewWorkflowTaskWorker

func NewWorkflowTaskWorker(
	taskHandler WorkflowTaskHandler,
	service m.TChanWorkflowService,
	domain string,
	taskList string,
	options WorkerOptions,
) (worker Worker)

NewWorkflowTaskWorker returns an instance of a workflow task handler worker. To be used by framework level code that requires access to the original workflow task.

type WorkerOptions

type WorkerOptions struct {
	// Optional: To set the maximum concurrent activity executions this host can have.
	// The zero value of this uses the default value.
	// default: defaultMaxConcurrentActivityExecutionSize(1k)
	MaxConcurrentActivityExecutionSize int

	// Optional: Sets the rate limit on the number of activities that can be executed per second. Notice that the
	// number is a float, so you can set it to less than 1 if needed. For example, setting the number
	// to 0.1 means you want your activity to be executed once every 10 seconds. This can be used to protect
	// downstream services from flooding.
	// The zero value of this uses the default value.
	// default: defaultMaxActivityExecutionRate(100k)
	// Warning: activity's StartToCloseTimeout starts ticking even if a task is blocked due to rate limiting.
	MaxActivityExecutionPerSecond float64

	// Optional: Whether the framework should heartbeat automatically
	// on behalf of the activities.
	// default: false (no automatic heartbeat).
	AutoHeartBeat bool

	// Optional: Sets an identity that can be used to track this host for debugging.
	// default: default identity that include hostname, groupName and process ID.
	Identity string

	// Optional: Metrics to be reported.
	// default: no metrics.
	MetricsScope tally.Scope

	// Optional: Logger framework can use to log.
	// default: default logger provided.
	Logger *zap.Logger

	// Optional: Enable logging in replay.
	// In the decider you can use cadence.GetLogger(ctx) to access a logger that is replay aware.
	// Enabling this option makes workflow decider code log during replay mode as well,
	// which is verbose and often repeats earlier log entries.
	// default: false
	EnableLoggingInReplay bool

	// Optional: Disable running workflow workers.
	// default: false
	DisableWorkflowWorker bool

	// Optional: Disable running activity workers.
	// default: false
	DisableActivityWorker bool

	// Optional: sets context for activity. The context can be used to pass any configuration to activity
	// like common logger for all activities.
	BackgroundActivityContext context.Context
}

WorkerOptions configures a worker instance, for example:

(1) The logger or any specific metrics.

(2) Whether to heartbeat for activities automatically.

Use NewWorkerOptions function to create an instance.
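The comment on MaxActivityExecutionPerSecond states that a rate of 0.1 means one execution every 10 seconds. The arithmetic behind that statement is sketched below. minInterval is a hypothetical helper for this illustration only. It shows the conversion from a per-second rate to an average spacing and says nothing about how the library's rate limiter is implemented.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// minInterval converts a per-second rate into the average spacing between
// executions. A non-positive rate yields 0 here; in WorkerOptions the zero
// value instead selects the documented default.
func minInterval(perSecond float64) time.Duration {
	if perSecond <= 0 {
		return 0
	}
	// Round to the nearest nanosecond to avoid float truncation artifacts.
	return time.Duration(math.Round(float64(time.Second) / perSecond))
}

func main() {
	fmt.Println(minInterval(0.1)) // one execution every 10 seconds
	fmt.Println(minInterval(2))   // two executions per second
}
```

Because the StartToCloseTimeout keeps ticking while a task waits on the rate limit, a very low rate should be paired with a timeout that comfortably exceeds this spacing.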

type WorkflowExecution

type WorkflowExecution struct {
	ID    string
	RunID string
}

WorkflowExecution identifies a workflow execution by its ID and RunID.

type WorkflowInfo

type WorkflowInfo struct {
	WorkflowExecution                   WorkflowExecution
	WorkflowType                        WorkflowType
	TaskListName                        string
	ExecutionStartToCloseTimeoutSeconds int32
	TaskStartToCloseTimeoutSeconds      int32
	Domain                              string
}

WorkflowInfo contains information about the currently executing workflow.

func GetWorkflowInfo

func GetWorkflowInfo(ctx Context) *WorkflowInfo

GetWorkflowInfo extracts info of a current workflow from a context.

type WorkflowTaskHandler

type WorkflowTaskHandler interface {
	// Process the workflow task
	ProcessWorkflowTask(
		task *s.PollForDecisionTaskResponse,
		getHistoryPage GetHistoryPage,
		emitStack bool) (response interface{}, stackTrace string, err error)
}

WorkflowTaskHandler represents decision task handlers.

func NewWorkflowTaskHandler

func NewWorkflowTaskHandler(domain string, identity string, logger *zap.Logger) WorkflowTaskHandler

NewWorkflowTaskHandler creates an instance of a WorkflowTaskHandler from a decision poll response using workflow functions registered through RegisterWorkflow. To be used to replay a workflow in a debugger.

type WorkflowTestSuite

type WorkflowTestSuite struct {
	// contains filtered or unexported fields
}

WorkflowTestSuite is the test suite to run unit tests for workflow/activity.

func (*WorkflowTestSuite) GetLogger

func (s *WorkflowTestSuite) GetLogger() *zap.Logger

GetLogger gets the logger for this WorkflowTestSuite.

func (*WorkflowTestSuite) NewTestActivityEnvironment

func (s *WorkflowTestSuite) NewTestActivityEnvironment() *TestActivityEnvironment

NewTestActivityEnvironment creates a new instance of TestActivityEnvironment. You can use the returned TestActivityEnvironment to run your activity in the test environment.

func (*WorkflowTestSuite) NewTestWorkflowEnvironment

func (s *WorkflowTestSuite) NewTestWorkflowEnvironment() *TestWorkflowEnvironment

NewTestWorkflowEnvironment creates a new instance of TestWorkflowEnvironment. You can use the returned TestWorkflowEnvironment to run your workflow in the test environment.

func (*WorkflowTestSuite) SetLogger

func (s *WorkflowTestSuite) SetLogger(logger *zap.Logger)

SetLogger sets the logger for this WorkflowTestSuite. If you don't set logger, test suite will create a default logger with Debug level logging enabled.

func (*WorkflowTestSuite) SetMetricsScope

func (s *WorkflowTestSuite) SetMetricsScope(scope tally.Scope)

SetMetricsScope sets the metrics scope for this WorkflowTestSuite. If you don't set a scope, the test suite will use tally.NoopScope.

type WorkflowType

type WorkflowType struct {
	Name string
}

WorkflowType identifies a workflow type.

Directories

Path Synopsis
cmd
tools/copyright command
