queues

package
v0.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 17, 2023 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exposed by the package. Compare against them with errors.Is.
var (
	// ErrClientNil reports that the temporal client is nil.
	ErrClientNil = errors.New("client is nil")

	// ErrChildWorkflowExecutionAttempt reports an attempt to execute a child
	// workflow without the parent.
	ErrChildWorkflowExecutionAttempt = errors.New("attempting to execute child workflow directly. use ExecuteWorkflow instead")

	// ErrExternalWorkflowSignalAttempt reports an attempt to signal an external
	// workflow from within a workflow.
	ErrExternalWorkflowSignalAttempt = errors.New("attempting to signal external workflow directly. use SignalExternalWorkflow instead")
)

Functions

func DefaultPrefix

func DefaultPrefix() string

DefaultPrefix gets the default prefix.

func SetDefaultPrefix

func SetDefaultPrefix(val string)

SetDefaultPrefix sets the default prefix.

Types

type ChildWorkflowFuture

type ChildWorkflowFuture workflow.ChildWorkflowFuture

type Name

type Name string

Name is the name of the queue.

func (Name) String

func (q Name) String() string

type Queue

type Queue interface {
	// Name gets the name of the queue.
	Name() Name

	// Prefix gets the prefix of the queue as string.
	Prefix() string

	// WorkflowID sanitizes the workflow ID given the workflows.Options.
	WorkflowID(options workflows.Options) string

	// ExecuteWorkflow executes a workflow given the context, workflows.Options, workflow function or function name, and
	// optional payload.
	// Let's say we have a queue called "default"; we can pass in either the workflow function or the function name.
	//
	//  q := queues.New(queues.WithName("default"), queues.WithClient(client))
	//  q.ExecuteWorkflow(
	//    ctx,
	//    workflows.NewOptions(
	//      workflows.WithBlock("healthz"),
	//      workflows.WithBlockID(uuid.New().String()),
	//    ),
	//    WorkflowFn, // or "WorkflowFunctionName"
	//    payload...,    // optional.
	//  )
	ExecuteWorkflow(ctx context.Context, options workflows.Options, fn any, payload ...any) (client.WorkflowRun, error)

	// ExecuteChildWorkflow executes a child workflow given the parent workflow context, workflows.Options,
	// workflow function or function name and optional payload. It must be executed from within a workflow.
	//
	//  future, err := q.ExecuteChildWorkflow(
	//    ctx,
	//    workflows.NewOptions(
	//      workflows.WithParent(ctx), // This is important. It tells the queue that this is a child workflow.
	//      workflows.WithBlock("healthz"),
	//      workflows.WithBlockID(uuid.New().String()),
	//    ),
	//    WorkflowFn,    // or "WorkflowFunctionName"
	//    payload...,    // optional.
	//  )
	ExecuteChildWorkflow(ctx workflow.Context, options workflows.Options, fn any, payload ...any) (ChildWorkflowFuture, error)

	// SignalWorkflow signals a workflow given the context, workflows.Options, signal name and payload.
	// The payload may be nil.
	//
	// NOTE(review): the WithParent(ctx) line in the example below looks copied from the ExecuteChildWorkflow
	// example; ctx is a context.Context here, so confirm whether WithParent belongs in this call.
	//
	//  if err := q.SignalWorkflow(
	//    ctx,
	//    workflows.NewOptions(
	//      workflows.WithParent(ctx), // This is important. It tells the queue that this is a child workflow.
	//      workflows.WithBlock("healthz"),
	//      workflows.WithBlockID(uuid.New().String()),
	//    ),
	//    "signal-name",
	//    payload,    // or nil
	//  ); err != nil {
	//    // handle error
	//  }
	SignalWorkflow(ctx context.Context, options workflows.Options, signalName string, payload any) error

	// SignalExternalWorkflow signals a workflow given the workflow context, workflows.Options, signal name and
	// payload. It takes a workflow.Context, so it is meant to be called from within a workflow. The payload may be nil.
	//
	//  future, err := q.SignalExternalWorkflow(
	//    ctx,
	//    workflows.NewOptions(
	//      workflows.WithParent(ctx), // This is important. It tells the queue that this is a child workflow.
	//      workflows.WithBlock("healthz"),
	//      workflows.WithBlockID(uuid.New().String()),
	//    ),
	//    "signal-name",
	//    payload,    // or nil
	//  )
	SignalExternalWorkflow(ctx workflow.Context, options workflows.Options, signal string, payload any) (workflow.Future, error)

	// SignalWithStartWorkflow takes the context, workflows.Options, signal name, signal argument, workflow function
	// or function name and optional payload, and returns the resulting client.WorkflowRun.
	// Presumably it signals the workflow and starts it first when it is not already running, as the name
	// suggests — TODO confirm against the implementation.
	//
	// NOTE(review): the WithParent(ctx) line in the example below looks copied from the ExecuteChildWorkflow
	// example; ctx is a context.Context here, so confirm whether WithParent belongs in this call.
	//
	//  run, err := q.SignalWithStartWorkflow(
	//    ctx,
	//    workflows.NewOptions(
	//      workflows.WithParent(ctx), // This is important. It tells the queue that this is a child workflow.
	//      workflows.WithBlock("healthz"),
	//      workflows.WithBlockID(uuid.New().String()),
	//    ),
	//    "signal-name",
	//    arg,    // or nil
	//    WorkflowFn, // or "WorkflowFunctionName"
	//    payload..., // optional.
	//  )
	SignalWithStartWorkflow(
		ctx context.Context, options workflows.Options, signal string, arg any, fn any, payload ...any,
	) (client.WorkflowRun, error)

	// CreateWorker creates a worker against the queue.
	CreateWorker() worker.Worker
}

Queue defines the queue interface.

func New

func New(opts ...QueueOption) Queue

New creates a new queue with the given options. For a queue named "default", we define it as follows:

var DefaultQueue = queues.New(
  queues.WithName("default"),
  queues.WithClient(client),
  queues.WithWorkflowMaxAttempts(1),
)

type QueueOption

type QueueOption func(Queue)

QueueOption is the option for a queue.

func WithClient

func WithClient(c client.Client) QueueOption

WithClient sets the client for the queue.

func WithName

func WithName(name string) QueueOption

WithName sets the queue name and the prefix for the workflow ID.

func WithWorkflowMaxAttempts

func WithWorkflowMaxAttempts(attempts int32) QueueOption

WithWorkflowMaxAttempts sets the maximum number of attempts for all the workflows in the queue. The default value is 0, which means RetryForever.

type Queues

type Queues map[Name]Queue

Queues is a map of queues.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL