temporal

package module
v0.17.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2024 License: MIT Imports: 25 Imported by: 0

README

helix.go - Temporal integration

Website Go API reference Go Report Card GitHub Release

The Temporal integration provides an opinionated way for helix services to interact with Temporal for durable executions.

Documentation

Overview

Package temporal exposes an opinionated way to interact with Temporal.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Connect

func Connect(cfg Config) (Client, Worker, error)

Connect tries to connect to the Temporal server given the Config. Returns an error if Config is not valid or if the connection failed.

func EventFromActivity

func EventFromActivity(ctx context.Context) (event.Event, bool)

EventFromActivity tries to retrieve an Event from the activity's context. Returns true if an Event has been found, false otherwise.

If an Event was found, it is added to the span attributes.

func EventFromWorkflow

func EventFromWorkflow(ctx workflow.Context) (event.Event, bool)

EventFromWorkflow tries to retrieve an Event from the workflow's context. Returns true if an Event has been found, false otherwise.

If an Event was found, it is added to the span attributes.

Types

type Client

// Client exposes an opinionated way to interact with Temporal's client
// capabilities.
//
// Most methods take a context.Context as first argument. Workflows are
// referenced by string identifiers (workflowID, runID) and workflow types are
// passed as strings.
// NOTE(review): method names closely follow the Temporal Go SDK client;
// presumably each one wraps the SDK method of the same name — confirm against
// the implementation.
type Client interface {
	// Workflow execution and lifecycle. ExecuteWorkflow accepts a variadic
	// payload, whereas SignalWithStartWorkflow accepts a single payload value.
	ExecuteWorkflow(ctx context.Context, opts client.StartWorkflowOptions, workflowType string, payload ...any) (client.WorkflowRun, error)
	GetWorkflow(ctx context.Context, workflowID string, runID string) client.WorkflowRun
	SignalWorkflow(ctx context.Context, workflowID string, runID string, signalName string, arg any) error
	SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg any, opts client.StartWorkflowOptions, workflowType string, payload any) (client.WorkflowRun, error)
	CancelWorkflow(ctx context.Context, workflowID string, runID string) error
	TerminateWorkflow(ctx context.Context, workflowID string, runID string, reason string) error
	GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType enums.HistoryEventFilterType) client.HistoryEventIterator
	// Activity operations, addressed by namespace, workflow ID, run ID and
	// activity ID.
	CompleteActivity(ctx context.Context, namespace string, workflowID string, runID string, activityID string, result any, err error) error
	RecordActivityHeartbeat(ctx context.Context, namespace string, workflowID string, runID string, activityID string) error

	// Listing, querying and describing. Unless noted otherwise, each method
	// takes a request struct and returns the matching response struct along
	// with an error.
	ListClosedWorkflow(ctx context.Context, request *workflowservice.ListClosedWorkflowExecutionsRequest) (*workflowservice.ListClosedWorkflowExecutionsResponse, error)
	ListOpenWorkflow(ctx context.Context, request *workflowservice.ListOpenWorkflowExecutionsRequest) (*workflowservice.ListOpenWorkflowExecutionsResponse, error)
	ListWorkflow(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (*workflowservice.ListWorkflowExecutionsResponse, error)
	ListArchivedWorkflow(ctx context.Context, request *workflowservice.ListArchivedWorkflowExecutionsRequest) (*workflowservice.ListArchivedWorkflowExecutionsResponse, error)
	ScanWorkflow(ctx context.Context, request *workflowservice.ScanWorkflowExecutionsRequest) (*workflowservice.ScanWorkflowExecutionsResponse, error)
	CountWorkflow(ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest) (*workflowservice.CountWorkflowExecutionsResponse, error)
	GetSearchAttributes(ctx context.Context) (*workflowservice.GetSearchAttributesResponse, error)
	// QueryWorkflow takes the "with options" request type and returns the
	// "with options" response type from the client package.
	QueryWorkflow(ctx context.Context, request *client.QueryWorkflowWithOptionsRequest) (*client.QueryWorkflowWithOptionsResponse, error)
	DescribeWorkflowExecution(ctx context.Context, workflowID string, runID string) (*workflowservice.DescribeWorkflowExecutionResponse, error)
	DescribeTaskQueue(ctx context.Context, taskqueue string, taskqueueType enums.TaskQueueType) (*workflowservice.DescribeTaskQueueResponse, error)
	ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error)

	// ScheduleClient returns the ScheduleClient used to create, list and
	// retrieve schedule handles. It takes no context and returns no error.
	ScheduleClient() ScheduleClient
}

Client exposes an opinionated way to interact with Temporal's client capabilities.

type Config

// Config is used to configure the Temporal integration.
type Config struct {

	// Address is the Temporal server address to connect to.
	//
	// Default:
	//
	//   "127.0.0.1:7233"
	Address string `json:"address"`

	// Namespace sets the namespace to connect to.
	//
	// Default:
	//
	//   "default"
	Namespace string `json:"namespace"`

	// DataConverter customizes serialization/deserialization of arguments in
	// Temporal. The `json:"-"` tag excludes this field from JSON encoding and
	// decoding of the Config.
	DataConverter converter.DataConverter `json:"-"`

	// Worker configures a Temporal worker if the helix service should run as a
	// worker for Temporal.
	Worker ConfigWorker `json:"worker"`

	// TLS configures TLS to communicate with the Temporal server.
	TLS integration.ConfigTLS `json:"tls"`
}

Config is used to configure the Temporal integration.

type ConfigWorker

// ConfigWorker configures a Temporal worker for the running helix service.
// When enabled, this starts a Temporal worker for the given task queue and
// namespace (set in Config).
type ConfigWorker struct {

	// Enabled creates a Temporal worker, to run workflows and activities.
	Enabled bool `json:"enabled"`

	// TaskQueue is the task queue name used to identify your client worker. It
	// also identifies the group of workflow and activity implementations that
	// are hosted by a single worker process.
	//
	// Required when enabled.
	TaskQueue string `json:"taskqueue,omitempty"`

	// WorkerActivitiesPerSecond sets the rate limit on the number of activities
	// that can be executed per second per worker. This can be used to limit
	// resources used by the worker.
	//
	// Notice that the number is represented as a float, so that you can set it
	// to less than 1 if needed. For example, setting the number to 0.1 means you
	// want your activity to be executed once every 10 seconds. This can be used
	// to protect downstream services from flooding.
	//
	// Default:
	//
	//   100 000
	WorkerActivitiesPerSecond float64 `json:"worker_activities_per_second,omitempty"`

	// TaskQueueActivitiesPerSecond sets the rate limit on the number of
	// activities that can be executed per second. This is managed by the server
	// and controls activities per second for your entire task queue.
	//
	// Notice that the number is represented as a float, so that you can set it
	// to less than 1 if needed. For example, setting the number to 0.1 means you
	// want your activity to be executed once every 10 seconds. This can be used
	// to protect downstream services from flooding.
	//
	// Default:
	//
	//   100 000
	TaskQueueActivitiesPerSecond float64 `json:"taskqueue_activities_per_second,omitempty"`
}

ConfigWorker configures a Temporal worker for the running helix service. When enabled, this starts a Temporal worker for the given task queue and namespace (set in Config).

type ScheduleClient

// ScheduleClient exposes an opinionated way to interact with Temporal
// scheduling capabilities.
type ScheduleClient interface {
	// Create takes schedule options and returns a ScheduleHandle, or an error.
	Create(ctx context.Context, options client.ScheduleOptions) (ScheduleHandle, error)
	// List takes list options and returns an iterator over schedules, or an
	// error.
	List(ctx context.Context, options client.ScheduleListOptions) (client.ScheduleListIterator, error)
	// Handle returns a ScheduleHandle for the given schedule ID. Unlike Create,
	// it does not return an error.
	// NOTE(review): presumably no existence check happens here — confirm.
	Handle(ctx context.Context, scheduleID string) ScheduleHandle
}

ScheduleClient exposes an opinionated way to interact with Temporal scheduling capabilities.

type ScheduleHandle

// ScheduleHandle exposes an opinionated way to interact with Temporal
// scheduling capabilities.
//
// Every method takes a context.Context. All methods except GetID return an
// error.
// NOTE(review): presumably each method acts on the single schedule this handle
// was created or retrieved for — confirm against the implementation.
type ScheduleHandle interface {
	// GetID returns an identifier as a string; it cannot fail.
	GetID(ctx context.Context) string
	Delete(ctx context.Context) error
	Backfill(ctx context.Context, options client.ScheduleBackfillOptions) error
	Update(ctx context.Context, options client.ScheduleUpdateOptions) error
	// Describe returns a pointer to a client.ScheduleDescription, or an error.
	Describe(ctx context.Context) (*client.ScheduleDescription, error)
	Trigger(ctx context.Context, options client.ScheduleTriggerOptions) error
	Pause(ctx context.Context, options client.SchedulePauseOptions) error
	Unpause(ctx context.Context, options client.ScheduleUnpauseOptions) error
}

ScheduleHandle exposes an opinionated way to interact with a single Temporal schedule, as returned by ScheduleClient.

type Worker

// Worker exposes an opinionated way to interact with Temporal's worker
// capabilities.
//
// Both methods accept the workflow or activity as an untyped value (any)
// together with registration options, and neither returns an error.
// NOTE(review): how invalid registrations are reported is not visible from the
// interface — confirm against the implementation.
type Worker interface {
	RegisterWorkflow(w any, opts workflow.RegisterOptions)
	RegisterActivity(a any, opts activity.RegisterOptions)
}

Worker exposes an opinionated way to interact with Temporal's worker capabilities.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL