cadence

package module
v0.7.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 4, 2018 License: MIT Imports: 2 Imported by: 0

README

Go framework for Cadence Build Status Coverage Status

Cadence is a distributed, scalable, durable, and highly available orchestration engine we developed at Uber Engineering to execute asynchronous long-running business logic in a scalable and resilient way.

cadence-client is the framework for authoring workflows and activities.

How to use

Make sure you clone this repo into the correct location.

git clone git@github.com:uber-go/cadence-client.git $GOPATH/src/go.uber.org/cadence

or

go get go.uber.org/cadence

See samples to get started

Activity

Activity is the implementation of a particular task in the business logic.

Activities are implemented as functions. Data can be passed directly to an activity via function parameters. The parameters can be either basic types or structs, with the only requirement being that the parameters need to be serializable. Even though it is not required, we recommand that the first parameter of an activity function is of type context.Context, in order to allow the activity to interact with other framework methods. The function must return an error value, and can optionally return a result value. The result value can be either a basic type or a struct with the only requirement being that it is serializable.

The values passed to activities through invocation parameters or returned through the result value is recorded in the execution history. The entire execution history is transfered from the Cadence service to workflow workers with every event that the workflow logic needs to process. A large execution history can thus adversily impact the performance of your workflow. Therefore be mindful of the amount of data you transfer via activity invocation parameters or return values. Other than that no additional limitations exist on activity implementations.

In order to make the activity visible to the worker process hosting it, the activity needs to be registered via a call to activity.Register.

package simple

import (
	"context"

    "go.uber.org/cadence/activity"
    "go.uber.org/zap"
)

func init() {
	activity.Register(SimpleActivity)
}

// SimpleActivity is a sample Cadence activity function that takes one parameter and
// returns a string containing the parameter value.
func SimpleActivity(ctx context.Context, value string) (string, error) {
	activity.GetLogger(ctx).Info("SimpleActivity called.", zap.String("Value", value))
	return "Processed: " + value, nil
}
Workflow

Workflow is the implementation of coordination logic. Its sole purpose is to orchestrate activity executions.

Workflows are implemented as functions. Startup data can be passed to a workflow via function parameters. The parameters can be either basic types or structs, with the only requirement being that the parameters need to be serializable. The first parameter of a workflow function is of type workflow.Context. The function must return an error value, and can optional return a result value. The result value can be either a basic type or a struct with the only requirement being that the it is serializable.

Workflow functions need to execute deterministically. Therefore, here is a list of rules that workflow code should obey to be a good Cadence citizen:

  • Use workflow.Context everywhere.
  • Don’t use range over map.
  • Use workflow.SideEffect to call rand and similar nondeterministic functions like UUID generator.
  • Use workflow.Now to get current time. Use workflow.NewTimer or workflow.Sleep instead of standard Go functions.
  • Don’t use native channel and select. Use workflow.Channel and workflow.Selector.
  • Don’t use go func(...). Use workflow.Go(func(...)).
  • Don’t use non constant global variables as multiple instances of a workflow function can be executing in parallel.
  • Don’t use any blocking functions besides belonging to Channel, Selector or Future
  • Don’t use any synchronization primitives as they can cause blockage and there is no possibility of races when running under dispatcher.
  • Don't just change workflow code when there are open workflows. Always update code using workflow.GetVersion.
  • Don’t perform any IO or service calls as they are not usually deterministic. Use activities for that.
  • Don’t access configuration APIs directly from a workflow as changes in configuration will affect the workflow execution path. Either return configuration from an activity or use workflow.SideEffect to load it.

In order to make the workflow visible to the worker process hosting it, the workflow needs to be registered via a call to workflow.Register.

package simple

import (
	"time"


	"go.uber.org/cadence/workflow"
	"go.uber.org/zap"
)

func init() {
	workflow.Register(SimpleWorkflow)
}

// SimpleWorkflow is a sample Cadence workflow that accepts one parameter and
// executes an activity to which it passes the aforementioned parameter.
func SimpleWorkflow(ctx workflow.Context, value string) error {
	options := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Second * 60,
		StartToCloseTimeout:    time.Second * 60,
	}
	ctx = workflow.WithActivityOptions(ctx, options)

	var result string
	err := workflow.ExecuteActivity(ctx, activity.SimpleActivity, value).Get(ctx, &result)
	if err != nil {
		return err
	}
	workflow.GetLogger(ctx).Info(
		"SimpleActivity returned successfully!", zap.String("Result", result))

	workflow.GetLogger(ctx).Info("SimpleWorkflow completed!")
	return nil
}
Worker

A worker or “worker service” is a services hosting the workflow and activity implementations. The worker polls the “Cadence service” for tasks, performs those tasks and communicates task execution results back to the “Cadence service”. Worker services are developed, deployed and operated by Cadence customers.

You can run a Cadence worker in a new or an exiting service. Use the framework APIs to start the Cadence worker and link in all activity and workflow implementations that you require this service to execute.

package main

import (

	t "go.uber.org/cadence/.gen/go/cadence"
	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
	"go.uber.org/cadence/worker"

	"github.com/uber-go/tally"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"go.uber.org/yarpc"
	"go.uber.org/yarpc/api/transport"
	"go.uber.org/yarpc/transport/tchannel"
)

var HostPort = "127.0.0.1:7933"
var Domain = "SimpleDomain"
var TaskListName = "SimpleWorker"
var ClientName = "SimpleWorker"
var CadenceService = "CadenceServiceFrontend"

func main() {
	startWorker(buildLogger(), buildCadenceClient())
}

func buildLogger() *zap.Logger {
	config := zap.NewDevelopmentConfig()
	config.Level.SetLevel(zapcore.InfoLevel)

	var err error
	logger, err := config.Build()
	if err != nil {
		panic("Failed to setup logger")
	}

	return logger
}

func buildCadenceClient() workflowserviceclient.Interface {
	ch, err := tchannel.NewChannelTransport(tchannel.ServiceName(ClientName), tchannel.ListenAddr("127.0.0.1:0"))
	if err != nil {
		panic("Failed to setup tchannel")
	}
	dispatcher := yarpc.NewDispatcher(yarpc.Config{
			Name: ClientName,
			Outbounds: yarpc.Outbounds{
				CadenceService: {Unary: ch.NewSingleOutbound(HostPort)},
			},
		})
	if err := dispatcher.Start(); err != nil {
		panic("Failed to start dispatcher")
	}

	return workflowserviceclient.New(dispatcher.ClientConfig(CadenceService))
}

func startWorker(logger *zap.Logger, service workflowserviceclient.Interface) {
	// TaskListName - identifies set of client workflows, activities and workers.
	// it could be your group or client or application name.
	workerOptions := worker.Options{
		Logger:       logger,
		MetricsScope: tally.NewTestScope(TaskListName, map[string]string{}),
	}

	worker := worker.New(
		service,
		Domain,
		TaskListName,
		workerOptions)
	err := worker.Start()
	if err != nil {
		panic("Failed to start worker")
	}

	logger.Info("Started Worker.", zap.String("worker", TaskListName))
}

Contributing

We'd love your help in making Cadence-client great. Please review our instructions.

License

MIT License, please see LICENSE for details.

Documentation

Overview

Package cadence and its subdirectories contain the Cadence client side framework.

The Cadence service is a task orchestrator for your application’s tasks. Applications using Cadence can execute a logical flow of tasks, especially long-running business logic, asynchronously or synchronously. They can also scale at runtime on distributed systems.

A quick example illustrates its use case. Consider Uber Eats where Cadence manages the entire business flow from placing an order, accepting it, handling shopping cart processes (adding, updating, and calculating cart items), entering the order in a pipeline (for preparing food and coordinating delivery), to scheduling delivery as well as handling payments.

Cadence consists of a programming framework (or client library) and a managed service (or backend). The framework enables developers to author and coordinate tasks in Go code.

The root cadence package contains common data structures. The subpackages are:

  • workflow - functions used to implement workflows
  • activity - functions used to implement activities
  • client - functions used to create Cadence service client used to start and monitor workflow executions.
  • worker - functions used to create worker instance used to host workflow and activity code.
  • testsuite - unit testing framework for activity and workflow testing

How Cadence works

The Cadence hosted service brokers and persists events generated during workflow execution. Worker nodes owned and operated by customers execute the coordination and task logic. To facilitate the implementation of worker nodes Cadence provides a client-side library for the Go language.

In Cadence, you can code the logical flow of events separately as a workflow and code business logic as activities. The workflow identifies the activities and sequences them, while an activity executes the logic.

Key Features

Dynamic workflow execution graphs - Determine the workflow execution graphs at runtime based on the data you are processing. Cadence does not pre-compute the execution graphs at compile time or at workflow start time. Therefore, you have the ability to write workflows that can dynamically adjust to the amount of data they are processing. If you need to trigger 10 instances of an activity to efficiently process all the data in one run, but only 3 for a subsequent run, you can do that.

Child Workflows - Orchestrate the execution of a workflow from within another workflow. Cadence will return the results of the child workflow execution to the parent workflow upon completion of the child workflow. No polling is required in the parent workflow to monitor status of the child workflow, making the process efficient and fault tolerant.

Durable Timers - Implement delayed execution of tasks in your workflows that are robust to worker failures. Cadence provides two easy to use APIs, **workflow.Sleep** and **workflow.Timer**, for implementing time based events in your workflows. Cadence ensures that the timer settings are persisted and the events are generated even if workers executing the workflow crash.

Signals - Modify/influence the execution path of a running workflow by pushing additional data directly to the workflow using a signal. Via the Signal facility, Cadence provides a mechanism to consume external events directly in workflow code.

Task routing - Efficiently process large amounts of data using a Cadence workflow, by caching the data locally on a worker and executing all activities meant to process that data on that same worker. Cadence enables you to choose the worker you want to execute a certain activity by scheduling that activity execution in the worker's specific task-list.

Unique workflow ID enforcement - Use business entity IDs for your workflows and let Cadence ensure that only one workflow is running for a particular entity at a time. Cadence implements an atomic "uniqueness check" and ensures that no race conditions are possible that would result in multiple workflow executions for the same workflow ID. Therefore, you can implement your code to attempt to start a workflow without checking if the ID is already in use, even in the cases where only one active execution per workflow ID is desired.

Perpetual/ContinueAsNew workflows - Run periodic tasks as a single perpetually running workflow. With the "ContinueAsNew" facility, Cadence allows you to leverage the "unique workflow ID enforcement" feature for periodic workflows. Cadence will complete the current execution and start the new execution atomically, ensuring you get to keep your workflow ID. By starting a new execution Cadence also ensures that workflow execution history does not grow indefinitely for perpetual workflows.

At-most once activity execution - Execute non-idempotent activities as part of your workflows. Cadence will not automatically retry activities on failure. For every activity execution Cadence will return a success result, a failure result, or a timeout to the workflow code and let the workflow code determine how each one of those result types should be handled.

Asynch Activity Completion - Incorporate human input or thrid-party service asynchronous callbacks into your workflows. Cadence allows a workflow to pause execution on an activity and wait for an external actor to resume it with a callback. During this pause the activity does not have any actively executing code, such as a polling loop, and is merely an entry in the Cadence datastore. Therefore, the workflow is unaffected by any worker failures happening over the duration of the pause.

Activity Heartbeating - Detect unexpected failures/crashes and track progress in long running activities early. By configuring your activity to report progress periodically to the Cadence server, you can detect a crash that occurs 10 minutes into an hour-long activity execution much sooner, instead of waiting for the 60-minute execution timeout. The recorded progress before the crash gives you sufficient information to determine whether to restart the activity from the beginning or resume it from the point of failure.

Timeouts for activities and workflow executions - Protect against stuck and unresponsive activities and workflows with appropriate timeout values. Cadence requires that timeout values are provided for every activity or workflow invocation. There is no upper bound on the timeout values, so you can set timeouts that span days, weeks, or even months.

Visibility - Get a list of all your active and/or completed workflow. Explore the execution history of a particular workflow execution. Cadence provides a set of visibility APIs that allow you, the workflow owner, to monitor past and current workflow executions.

Debuggability - Replay any workflow execution history locally under a debugger. The Cadence client library provides an API to allow you to capture a stack trace from any failed workflow execution history.

Index

Constants

View Source
const LibraryVersion = internal.LibraryVersion

LibraryVersion is a semver string that represents the version of this cadence client library it will be embedded as a "version" header in every rpc call made by this client to cadence server. In addition, the version string will be used by the server to enforce compatibility checks Update to this version number is typically done by the cadence team as part of a major feature or behavior change

Variables

View Source
var ErrNoData = internal.ErrNoData

ErrNoData is returned when trying to extract strong typed data while there is no data available.

Functions

func IsCanceledError added in v0.5.1

func IsCanceledError(err error) bool

IsCanceledError return if the err is a CanceledError

func IsCustomError added in v0.5.1

func IsCustomError(err error) bool

IsCustomError return if the err is a CustomError

func IsGenericError added in v0.5.1

func IsGenericError(err error) bool

IsGenericError return if the err is a GenericError

func IsPanicError added in v0.5.1

func IsPanicError(err error) bool

IsPanicError return if the err is a PanicError

func IsTerminatedError added in v0.5.1

func IsTerminatedError(err error) bool

IsTerminatedError return if the err is a TerminatedError

func IsTimeoutError added in v0.5.1

func IsTimeoutError(err error) bool

IsTimeoutError return if the err is a TimeoutError

Types

type CanceledError

type CanceledError = internal.CanceledError

CanceledError returned when operation was canceled.

func NewCanceledError

func NewCanceledError(details ...interface{}) *CanceledError

NewCanceledError creates CanceledError instance. Return this error from activity or child workflow to indicate that it was successfully cancelled.

type CustomError

type CustomError = internal.CustomError

CustomError returned from workflow and activity implementations with reason and optional details.

func NewCustomError

func NewCustomError(reason string, details ...interface{}) *CustomError

NewCustomError create new instance of *CustomError with reason and optional details. Use CustomError for any use case specific errors that cross activity and child workflow boundaries.

type RetryPolicy added in v0.7.5

type RetryPolicy = internal.RetryPolicy

RetryPolicy defines the retry policy for activity/workflow.

Directories

Path Synopsis
Package activity contains functions and types used to implement Cadence activities.
Package activity contains functions and types used to implement Cadence activities.
Package client contains functions to create Cadence clients used to communicate to Cadence service.
Package client contains functions to create Cadence clients used to communicate to Cadence service.
Package encoded contains wrappers that are used for binary payloads deserialization.
Package encoded contains wrappers that are used for binary payloads deserialization.
cmd/dummy
This file exists to force compilation of all code that doesn't have unit tests.
This file exists to force compilation of all code that doesn't have unit tests.
Code generated by mockery v1.0.0 Code generated by mockery v1.0.0
Code generated by mockery v1.0.0 Code generated by mockery v1.0.0
Package testsuite contains unit testing framework for Cadence workflows and activities.
Package testsuite contains unit testing framework for Cadence workflows and activities.
Package worker contains functions to manage lifecycle of a Cadence client side worker.
Package worker contains functions to manage lifecycle of a Cadence client side worker.
Package workflow contains functions and types used to implement Cadence workflows.
Package workflow contains functions and types used to implement Cadence workflows.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL