tempts

package module
v0.0.0-...-c5dbd03 Latest Latest
Published: Apr 21, 2024 License: MIT Imports: 15 Imported by: 0

README

tempts

tempts stands for "Temporal Type-Safe", a Go SDK wrapper for interacting with temporal more safely.

Example Usage

Add this dependency with

go get github.com/vikstrous/tempts@latest

Below is a simple example demonstrating how to define a workflow and an activity, register them, and execute the workflow using tempts.

package main

import (
    "context"
    "fmt"

    "github.com/vikstrous/tempts"
    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
    "go.temporal.io/sdk/workflow"
)

// Define a new namespace and task queue.
var nsDefault = tempts.NewNamespace(client.DefaultNamespace)
var queueMain = tempts.NewQueue(nsDefault, "main")

// Define a workflow with no parameters and no return.
var workflowTypeHello = tempts.NewWorkflow[struct{}, struct{}](queueMain, "HelloWorkflow")

// Define an activity with no parameters and no return.
var activityTypeHello = tempts.NewActivity[struct{}, struct{}](queueMain, "HelloActivity")

func main() {
    // Create a new client connected to the Temporal server.
    c, err := tempts.Dial(client.Options{})
    if err != nil {
        panic(err)
    }
    defer c.Close()

    // Register the workflow and activity in a new worker.
    wrk, err := tempts.NewWorker(queueMain, []tempts.Registerable{
        workflowTypeHello.WithImplementation(helloWorkflow),
        activityTypeHello.WithImplementation(helloActivity),
    })
    if err != nil {
        panic(err)
    }
    ctx := context.Background()
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    go func() {
        // Use a goroutine-local err so this does not race with main's err.
        if err := wrk.Run(ctx, c, worker.Options{}); err != nil {
            panic(err)
        }
    }()

    // Execute the workflow and wait for it to complete.
    _, err = workflowTypeHello.Run(ctx, c, client.StartWorkflowOptions{}, struct{}{})
    if err != nil {
        panic(err)
    }

    fmt.Println("Workflow completed.")
}

// helloWorkflow is a workflow function that calls the HelloActivity.
func helloWorkflow(ctx workflow.Context, _ struct{}) (struct{}, error) {
    return activityTypeHello.Run(ctx, struct{}{})
}

// helloActivity is an activity function that prints "Hello, Temporal!".
func helloActivity(ctx context.Context, _ struct{}) (struct{}, error) {
    fmt.Println("Hello, Temporal!")
    return struct{}{}, nil
}

This example sets up a workflow and an activity that simply prints a greeting. It demonstrates the basic setup and execution flow using tempts. To see a more complex example, look in the example directory.

WARNING: This library can change without notice while I respond to feedback and improve the API. I'll remove this warning when I'm happy with the API and can promise it won't change.

Guarantees

List of guarantees provided by this wrapper:

Workers:

  • Have all the right activities and workflows registered before starting

Activities:

  • Are called on the right namespace and queue
  • Are called with the right parameter types
  • Return the right response types
  • Registered functions match the right type signature

Workflows:

  • Are called on the right namespace and queue
  • Are called with the right parameter types
  • Return the right response types
  • Registered functions match the right type signature

Schedules:

  • Set arguments with the right types
  • Can be set upon application startup, automatically applying the intended effect to the schedule's state on the cluster

Queries and updates:

  • Are called with the right types
  • Return the right types
  • Registered functions match the right type signature

Tools

There are two functions in this library that make it easy to write fixture-based replayability tests for your tempts workflows and activities. See example/main_test.go for an example of how to use them.

func GetWorkflowHistoriesBundle(ctx context.Context, client *tempts.Client, w tempts.WorkflowDeclaration) ([]byte, error)
func ReplayWorkflow(historiesBytes []byte, fn any) error

User guide by example

These examples assume that you are already familiar with the Go SDK and just need to know how to do the equivalent thing using this library.

Connect to temporal

Instead of:

c, err := client.Dial(client.Options{})

Do:

c, err := tempts.Dial(client.Options{})

This creates a wrapper that keeps track of which namespace the client is connected to, so that actions attempted against the wrong namespace, such as starting the wrong workflow, can be caught early.
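
To make the idea concrete, here is a minimal, standard-library-only sketch of how a wrapper can remember its namespace and reject mismatched calls before anything is sent to the server. The names trackedClient, declaredWorkflow and checkNamespace are hypothetical and are not part of tempts; the library's actual implementation may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// trackedClient is a hypothetical stand-in for a client wrapper that
// remembers which namespace it was dialed against.
type trackedClient struct {
	namespace string
}

// declaredWorkflow is a hypothetical stand-in for a workflow declaration
// that records the namespace it belongs to.
type declaredWorkflow struct {
	name      string
	namespace string
}

var errWrongNamespace = errors.New("wrong namespace")

// checkNamespace fails fast, before any request would be sent, when the
// workflow was declared for a different namespace than the client uses.
func checkNamespace(c trackedClient, w declaredWorkflow) error {
	if c.namespace != w.namespace {
		return fmt.Errorf("%w: workflow %q is declared in %q but the client is connected to %q",
			errWrongNamespace, w.name, w.namespace, c.namespace)
	}
	return nil
}

func main() {
	c := trackedClient{namespace: "default"}
	fmt.Println(checkNamespace(c, declaredWorkflow{name: "HelloWorkflow", namespace: "default"}))
	fmt.Println(checkNamespace(c, declaredWorkflow{name: "BillingWorkflow", namespace: "billing"}))
}
```

The first call passes and the second returns an error that wraps errWrongNamespace, which is the kind of early, local failure the wrapper is meant to provide.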

Run a workflow to completion

Instead of:

var ret ReturnType
c.ExecuteWorkflow(ctx, opts, name, param).Get(ctx, &ret)

Do:

// Globally define the namespace, queue and workflow type (one time, in a centralized package for the queue)
var nsDefault = tempts.NewNamespace(client.DefaultNamespace)
var queueMain = tempts.NewQueue(nsDefault, "main")
type exampleWorkflowParamType struct{
    Param string
}
type exampleWorkflowReturnType struct{
    Return string
}
var exampleWorkflowType = tempts.NewWorkflow[exampleWorkflowParamType, exampleWorkflowReturnType](queueMain, "ExampleWorkflow")

// Run and get the result of the workflow
ret, err := exampleWorkflowType.Run(ctx, c, opts, param)
// Use Execute instead of Run to not wait for completion

This ensures that the workflow is run in the right namespace, on the right queue, with the right name, with the right parameter types and it returns the right type.
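
The compile-time part of that guarantee comes from Go generics. The following is a rough, standard-library-only model of the pattern, not the tempts implementation: untypedExecute and typedWorkflow are hypothetical names, and untypedExecute merely imitates an API that takes and returns values of any type.

```go
package main

import "fmt"

// untypedExecute is a hypothetical stand-in for an API that accepts and
// returns values of any type, so mistakes only surface at run time.
func untypedExecute(name string, arg any) (any, error) {
	switch name {
	case "Greet":
		s, ok := arg.(string)
		if !ok {
			return nil, fmt.Errorf("Greet: want string, got %T", arg)
		}
		return "Hello, " + s + "!", nil
	default:
		return nil, fmt.Errorf("unknown workflow %q", name)
	}
}

// typedWorkflow binds a name to its parameter and return types once.
type typedWorkflow[Param, Return any] struct {
	name string
}

// Run accepts only Param and yields Return, so a call with the wrong
// argument type fails to compile instead of failing at run time.
func (w typedWorkflow[Param, Return]) Run(p Param) (Return, error) {
	var zero Return
	out, err := untypedExecute(w.name, p)
	if err != nil {
		return zero, err
	}
	ret, ok := out.(Return)
	if !ok {
		return zero, fmt.Errorf("%s: unexpected result type %T", w.name, out)
	}
	return ret, nil
}

// greet is declared once and reused wherever the workflow is called.
var greet = typedWorkflow[string, string]{name: "Greet"}

func main() {
	msg, err := greet.Run("Temporal")
	fmt.Println(msg, err)
	// greet.Run(42) would be rejected by the compiler.
}
```

Because the name and both types live in one global declaration, every call site shares the same contract and cannot drift from it.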

Run an activity to completion

Instead of:

var ret ReturnType
workflow.ExecuteActivity(ctx, name, param).Get(ctx, &ret)

Do:

// Globally define the namespace, queue and activity type (one time, in a centralized package for the queue)
var nsDefault = tempts.NewNamespace(client.DefaultNamespace)
var queueMain = tempts.NewQueue(nsDefault, "main")
type exampleActivityParamType struct{
    Param string
}
type exampleActivityReturnType struct{
    Return string
}
var exampleActivityType = tempts.NewActivity[exampleActivityParamType, exampleActivityReturnType](queueMain, "ExampleActivity")

// Run and get the result of the activity from a workflow
ret, err := exampleActivityType.Run(ctx, param)
// Use Execute instead of Run to not wait for completion

This ensures that the activity is run in the right namespace, on the right queue, with the right name, with the right parameter types and it returns the right type.

Create a worker

Instead of:

wrk := worker.New(c, queueName, options)
err = wrk.Run(worker.InterruptCh())

Do:

// Globally define the namespace and queue (one time, in a centralized package for the queue)
var nsDefault = tempts.NewNamespace(client.DefaultNamespace)
var queueMain = tempts.NewQueue(nsDefault, "main")

// On start up of your service
wrk, err := tempts.NewWorker(queueMain, []tempts.Registerable{
    exampleWorkflowType.WithImplementation(exampleWorkflow),
    exampleActivityType.WithImplementation(exampleActivity),
})

err = wrk.Run(ctx, c, worker.Options{})

This ensures that all the right workflows and activities are registered for this worker to satisfy the expectations for this queue. No more and no less.
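
The "no more and no less" check can be pictured as a set comparison between what is declared on the queue and what was given an implementation. The sketch below is only an illustration under that assumption; validateRegistrations is a hypothetical helper and does not claim to mirror how tempts.NewWorker works internally.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// validateRegistrations is a hypothetical helper: declared lists the names
// declared on a queue, registered lists the names given an implementation.
// It returns an error unless both sets are identical.
func validateRegistrations(declared, registered []string) error {
	want := make(map[string]bool, len(declared))
	for _, name := range declared {
		want[name] = true
	}
	var problems []string
	seen := make(map[string]bool, len(registered))
	for _, name := range registered {
		switch {
		case !want[name]:
			problems = append(problems, "not declared on this queue: "+name)
		case seen[name]:
			problems = append(problems, "registered twice: "+name)
		}
		seen[name] = true
	}
	for _, name := range declared {
		if !seen[name] {
			problems = append(problems, "missing implementation: "+name)
		}
	}
	if len(problems) == 0 {
		return nil
	}
	sort.Strings(problems)
	return fmt.Errorf("invalid worker: %s", strings.Join(problems, "; "))
}

func main() {
	declared := []string{"ExampleWorkflow", "ExampleActivity"}
	fmt.Println(validateRegistrations(declared, []string{"ExampleWorkflow", "ExampleActivity"}))
	fmt.Println(validateRegistrations(declared, []string{"ExampleWorkflow"}))
	fmt.Println(validateRegistrations(declared, []string{"ExampleWorkflow", "ExampleActivity", "Other"}))
}
```

Only the first call succeeds; the other two report a missing implementation and an undeclared registration respectively, before any worker would start.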

Query a workflow

Instead of:

// In the workflow
workflow.SetQueryHandler(ctx, queryName, func(queryParamType) (queryReturnType, error) {
    return queryReturnType{}, nil
})

// Querying it from your application
response, err := c.QueryWorkflow(ctx, workflowID, runID, queryName, param)

var value queryReturnType
err = response.Get(&value)

Do:

// Globally define the query type
var exampleQueryType = tempts.NewQueryHandler[queryParamType, queryReturnType]("exampleQuery")

// In the workflow
exampleQueryType.SetHandler(ctx, func(queryParamType) (queryReturnType, error) {
    return queryReturnType{}, nil
})

// Query it from your application
ret, err := exampleQueryType.Query(ctx, c, workflowID, runID, param)

This ensures that the types match in the application calling the workflow and in the workflow handler function. Unfortunately, we don't guarantee that the workflow is the expected type.

Create a schedule

Instead of:

_, err = c.ScheduleClient().Create(ctx, client.ScheduleOptions{
    ID:   scheduleID,
    Spec: client.ScheduleSpec{
        Intervals: []client.ScheduleIntervalSpec{
            {
                Every: time.Second * 5,
            },
        },
    },
    Action: &client.ScheduleWorkflowAction{
        ID:        workflowID,
        Workflow:  workflowName,
        TaskQueue: queueName,
        Args:      []any{param},
    },
})
// Ignore the error if it exists already

Do:

// This assumes the workflow is already globally registered

// Call this to make sure the schedule matches what your service expects
err = workflowTypeFormatAndGreet.SetSchedule(ctx, c, client.ScheduleOptions{
    ID: scheduleID,
    Spec: client.ScheduleSpec{
        Intervals: []client.ScheduleIntervalSpec{
            {
                Every: time.Second * 5,
            },
        },
    },
}, param)

This ensures that the namespace and queue are correct for the workflow. It also ensures that the parameter is the right type and that the schedule is updated to match the one defined in your code. It doesn't handle every possible difference yet because temporal doesn't support arbitrary changes to schedules. This feature is a bit less polished than the rest of the package, so let me know how to improve it!
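
Conceptually, setting a schedule on startup is a create-or-update reconciliation. The toy model below uses an in-memory map instead of a Temporal cluster; scheduleSpec and applySchedule are hypothetical names, and real schedules have many more fields, some of which cannot be updated in place.

```go
package main

import "fmt"

// scheduleSpec is a hypothetical, simplified schedule definition.
type scheduleSpec struct {
	EverySeconds int
	Workflow     string
}

// applySchedule makes the stored schedule match want and reports what it
// had to do. Calling it on every startup is safe because it is idempotent.
func applySchedule(store map[string]scheduleSpec, id string, want scheduleSpec) string {
	have, exists := store[id]
	switch {
	case !exists:
		store[id] = want
		return "created"
	case have != want:
		store[id] = want
		return "updated"
	default:
		return "unchanged"
	}
}

func main() {
	store := map[string]scheduleSpec{}
	fmt.Println(applySchedule(store, "greet", scheduleSpec{EverySeconds: 5, Workflow: "FormatAndGreet"}))
	fmt.Println(applySchedule(store, "greet", scheduleSpec{EverySeconds: 5, Workflow: "FormatAndGreet"}))
	fmt.Println(applySchedule(store, "greet", scheduleSpec{EverySeconds: 10, Workflow: "FormatAndGreet"}))
}
```

The three calls report created, unchanged and updated in that order, which is why the same call can be made unconditionally each time the service starts.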

Fixture based tests

Instead of coming up with your own strategy for building fixture-based tests, do:

c, err := tempts.Dial(client.Options{})
if err != nil {
    t.Fatal(err)
}
historiesData, err := tempts.GetWorkflowHistoriesBundle(ctx, c, exampleWorkflowType)
if err != nil {
    t.Fatal(err)
}
// Now store historiesData somewhere! (Or don't, and make sure your test is always connected to a temporal instance with example workflow runs)
err = tempts.ReplayWorkflow(historiesData, exampleWorkflow)
if err != nil {
    t.Fatal(err)
}

This is really just the cherry on top once you have your type safety in place. By running fixture-based tests, you can make sure you don't introduce backwards-incompatible changes without versioning them correctly. Even if you don't end up using this package, feel free to adapt this pattern for your own needs. It's not a lot of code for how much extra safety it provides.
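
For readers adapting the pattern, here is a deliberately simplified, standard-library-only model of what a replay fixture checks: that a newer implementation still takes the same steps, in the same order, as a recorded run. The recorder, capture and replay names are hypothetical, and this is not how Temporal's replayer works internally.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// recorder collects the steps a workflow function asks for, in order.
type recorder struct{ steps []string }

func (r *recorder) Do(step string) { r.steps = append(r.steps, step) }

// capture runs fn and serializes the steps it took; the bytes are the fixture.
func capture(fn func(*recorder)) ([]byte, error) {
	r := &recorder{}
	fn(r)
	return json.Marshal(r.steps)
}

// replay runs fn again and reports an error if it no longer takes the
// same steps as the recorded fixture.
func replay(fixture []byte, fn func(*recorder)) error {
	var want []string
	if err := json.Unmarshal(fixture, &want); err != nil {
		return err
	}
	r := &recorder{}
	fn(r)
	if len(r.steps) != len(want) {
		return fmt.Errorf("replay mismatch: recorded %d steps, got %d", len(want), len(r.steps))
	}
	for i := range want {
		if r.steps[i] != want[i] {
			return fmt.Errorf("replay mismatch at step %d: recorded %q, got %q", i, want[i], r.steps[i])
		}
	}
	return nil
}

// v1 is the implementation that produced the fixture.
func v1(r *recorder) { r.Do("FormatName"); r.Do("Greet") }

// v2 reorders the steps, which makes it incompatible with old runs.
func v2(r *recorder) { r.Do("Greet"); r.Do("FormatName") }

func main() {
	fixture, err := capture(v1)
	if err != nil {
		panic(err)
	}
	fmt.Println(replay(fixture, v1))
	fmt.Println(replay(fixture, v2))
}
```

Storing the fixture bytes next to the test lets the check run without a live server, at the cost of refreshing the fixture when new kinds of runs appear.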

Migration for Go SDK users

Since this library is opinionated, it doesn't support all temporal features. To use this library effectively, the temporal queue you are migrating must meet these prerequisites:

  • All workflows must take at most one parameter and return at most one value.
  • All activities must take at most one parameter and return at most one value.
  • Namespace names and queue names must be static (assuming that your types are defined as global variables so they can be used anywhere).
  • All workflows and activities for a given queue must be migrated at once.
  • All types must be defined as Go structs that follow the Temporal Go SDK's marshaling and unmarshaling logic.

There may be more restrictions that I'm not aware of yet. Open an issue if any are missed.
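
As an illustration of the first two prerequisites, a function that takes several arguments or returns several values can be reshaped by bundling them into structs. The example is a plain Go sketch with hypothetical names (transferLegacy, transferParams, transferResult, transfer); fields are exported so that JSON-based encoding can see them.

```go
package main

import "fmt"

// transferLegacy has three parameters and two results besides the error,
// so it does not fit the one-parameter, one-result shape.
func transferLegacy(from, to string, cents int) (string, int, error) {
	if cents <= 0 {
		return "", 0, fmt.Errorf("amount must be positive, got %d", cents)
	}
	return from + "->" + to, cents, nil
}

// transferParams bundles the former arguments into one struct.
type transferParams struct {
	From  string
	To    string
	Cents int
}

// transferResult bundles the former results into one struct.
type transferResult struct {
	Route string
	Cents int
}

// transfer has the single-struct-in, single-struct-out shape and simply
// delegates to the legacy function.
func transfer(p transferParams) (transferResult, error) {
	route, cents, err := transferLegacy(p.From, p.To, p.Cents)
	if err != nil {
		return transferResult{}, err
	}
	return transferResult{Route: route, Cents: cents}, nil
}

func main() {
	res, err := transfer(transferParams{From: "alice", To: "bob", Cents: 250})
	fmt.Println(res.Route, res.Cents, err)
}
```

A side benefit of the struct form is that new optional fields can be added later without changing the function signature.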

Potential future improvements

  • Wrap the APIs for channels and signals
  • Return typed futures instead of generic ones
  • Ensure that all queries are defined on the right workflows

Documentation

Index

Constants

This section is empty.

Variables

DefaultNamespace is a convenient way to refer to temporal's default namespace.

Functions

func GetWorkflowHistoriesBundle

func GetWorkflowHistoriesBundle(ctx context.Context, client *Client, w WorkflowDeclaration) ([]byte, error)

GetWorkflowHistoriesBundle connects to the temporal server and fetches the most recent 10 open and 10 closed executions. It returns serialized bytes that can be used immediately or in the future to call ReplayWorkflow.

func ReplayWorkflow

func ReplayWorkflow(historiesBytes []byte, fn any) error

ReplayWorkflow is meant to be used in tests with the output of GetWorkflowHistoriesBundle to check if the given workflow implementation (fn) is compatible with previous executions captured at the time when GetWorkflowHistoriesBundle was run.

Types

type Activity

type Activity[Param, Return any] struct {
	Name string
	// contains filtered or unexported fields
}

Activity is used for interacting with activities in a safe way that takes into account the input and output types, queue name and other properties.

func NewActivity

func NewActivity[Param, Return any](q *Queue, name string) Activity[Param, Return]

NewActivity declares the existence of an activity on a given queue with a given name.

func (Activity[Param, Return]) Execute

func (a Activity[Param, Return]) Execute(ctx workflow.Context, param Param) workflow.Future

Execute asynchronously executes the activity and returns a promise.

func (Activity[Param, Return]) Run

func (a Activity[Param, Return]) Run(ctx workflow.Context, param Param) (Return, error)

Run executes the activity and synchronously returns the output.

func (Activity[Param, Return]) WithImplementation

func (a Activity[Param, Return]) WithImplementation(fn func(context.Context, Param) (Return, error)) *ActivityWithImpl

WithImplementation should be called to create the parameters for NewWorker(). It declares which function implements the activity.

type ActivityWithImpl

type ActivityWithImpl struct {
	// contains filtered or unexported fields
}

ActivityWithImpl is a temporary struct that implements Registerable. It's meant to be passed into `tempts.NewWorker`.

type Client

type Client struct {
	Client client.Client
	// contains filtered or unexported fields
}

Client is a wrapper for the temporal SDK client that keeps track of which namespace the client is connected to, so that it can return more useful errors if the wrong namespace is used.

func Dial

func Dial(opts client.Options) (*Client, error)

Dial connects to the temporal server and returns a client.

func NewFromSDK

func NewFromSDK(c client.Client, namespace string) (*Client, error)

NewFromSDK allows the caller to pass in an existing temporal SDK client and manually specify which namespace that client is connected to.

func NewLazyClient

func NewLazyClient(opts client.Options) (*Client, error)

NewLazyClient is equivalent to Dial, but doesn't connect to the server until necessary.

func (*Client) Close

func (c *Client) Close()

Close terminates the connection to the temporal server.

type Namespace

type Namespace struct {
	// contains filtered or unexported fields
}

Namespace declares the name of a namespace for use later. This is a pre-requisite for defining any other resource such as workflows or activities.

func NewNamespace

func NewNamespace(name string) *Namespace

NewNamespace is used for declaring namespaces. If your application only uses the default namespace, you can use `tempts.DefaultNamespace` instead of defining one.

type QueryHandler

type QueryHandler[Param, Return any] struct {
	// contains filtered or unexported fields
}

QueryHandler is used for interacting with queries on workflows. Currently there's no way to enforce the connection between the query and the workflow it should be valid on.

func NewQueryHandler

func NewQueryHandler[Param, Return any](queryName string) *QueryHandler[Param, Return]

NewQueryHandler declares the name and types for a query to a workflow.

func (*QueryHandler[Param, Return]) Query

func (q *QueryHandler[Param, Return]) Query(ctx context.Context, temporalClient *Client, workflowID, runID string, p Param) (Return, error)

Query executes the query and returns the response.

func (*QueryHandler[Param, Return]) SetHandler

func (q *QueryHandler[Param, Return]) SetHandler(ctx workflow.Context, fn func(Param) (Return, error))

SetHandler should be called by a workflow to define how the query should be handled when sent to this workflow to execute.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is the declaration of a temporal queue, which is used for routing workflows and activities to workers.

func NewQueue

func NewQueue(namespace *Namespace, name string) *Queue

NewQueue declares the existence of a queue in a given namespace. A nil namespace is equivalent to using `tempts.DefaultNamespace`.

type Registerable

type Registerable interface {
	// contains filtered or unexported methods
}

Registerable can be created by calling WithImplementation() on activity or workflow definitions. It's a parameter to `tempts.NewWorker()`.

type UpdateHandler

type UpdateHandler[Param, Return any] struct {
	// contains filtered or unexported fields
}

UpdateHandler is used for interacting with updates on workflows. Currently there's no way to enforce the connection between the update and the workflow it should be valid on.

func NewUpdateHandler

func NewUpdateHandler[Param, Return any](updateName string) *UpdateHandler[Param, Return]

NewUpdateHandler declares the name and types for an update to a workflow.

func (*UpdateHandler[Param, Return]) SetHandler

func (q *UpdateHandler[Param, Return]) SetHandler(ctx workflow.Context, fn func(workflow.Context, Param) (Return, error))

SetHandler should be called by a workflow to define how the update should be handled when sent to this workflow to execute.

func (*UpdateHandler[Param, Return]) SetHandlerWithValidator

func (q *UpdateHandler[Param, Return]) SetHandlerWithValidator(ctx workflow.Context, fn func(workflow.Context, Param) (Return, error), validator func(workflow.Context, Param) error)

SetHandlerWithValidator should be called by a workflow to define how the update should be handled when sent to this workflow to execute. This is the same as SetHandler, except that it gives access to the experimental Validator function, which can be used to validate the request before it's executed.

func (*UpdateHandler[Param, Return]) Update

func (q *UpdateHandler[Param, Return]) Update(ctx context.Context, temporalClient *Client, workflowID, runID string, p Param) (Return, error)

Update executes the update and returns the response.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents a temporal worker that connects to the temporal server to execute activities and workflows.

func NewWorker

func NewWorker(queue *Queue, registerables []Registerable) (*Worker, error)

NewWorker defines a worker along with all of the workflows and activities. Example usage:

wrk, err := tempts.NewWorker(queueMain, []tempts.Registerable{
	activityTypeFormatName.WithImplementation(activityFormatName),
	activityTypeGreet.WithImplementation(activityGreet),
	workflowTypeFormatAndGreet.WithImplementation(workflowFormatAndGreet),
	workflowTypeJustGreet.WithImplementation(workflowJustGreet),
})

func (*Worker) Register

func (w *Worker) Register(wrk worker.Registry)

Register is useful in unit tests to define all of the worker's workflows and activities in the test environment.

func (*Worker) Run

func (w *Worker) Run(ctx context.Context, client *Client, options worker.Options) error

Run starts the worker. To stop it, cancel the context. This function returns when the worker completes. Make sure to always cancel the context eventually, or a goroutine will be leaked.
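
One way to honor that contract is to pair the goroutine with a stop function that cancels the context and then waits for the blocking call to return. The sketch below uses only the standard library; runUntilCancelled is a hypothetical stand-in for a blocking Run method, and runInBackground and the five-second timeout are illustrative choices, not part of tempts.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runUntilCancelled is a hypothetical stand-in for a blocking Run method:
// it returns only once ctx is cancelled.
func runUntilCancelled(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

// runInBackground starts run in a goroutine and returns a stop function
// that cancels the context and waits for run to return, so the goroutine
// is not leaked and its error is not lost.
func runInBackground(run func(context.Context) error) (stop func() error) {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1) // buffered so the goroutine can always exit
	go func() { done <- run(ctx) }()
	return func() error {
		cancel()
		select {
		case err := <-done:
			return err
		case <-time.After(5 * time.Second):
			return errors.New("worker did not stop in time")
		}
	}
}

func main() {
	stop := runInBackground(runUntilCancelled)
	// ... start workflows here ...
	fmt.Println("stop error:", stop())
}
```

Returning the error through a channel, rather than panicking inside the goroutine, lets the caller decide how to react when the worker fails.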

type Workflow

type Workflow[Param any, Return any] struct {
	// contains filtered or unexported fields
}

Workflow is used for interacting with workflows in a safe way that takes into account the input and output types, queue name and other properties. Workflows are resumable functions registered on workers that execute activities.

func NewWorkflow

func NewWorkflow[
	Param any,
	Return any,
](queue *Queue, name string,
) Workflow[Param, Return]

NewWorkflow declares the existence of a workflow on a given queue with a given name.

func (Workflow[Param, Return]) Execute

func (w Workflow[Param, Return]) Execute(ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, param Param) (client.WorkflowRun, error)

Execute asynchronously executes the workflow and returns a promise.

func (Workflow[Param, Return]) ExecuteChild

func (w Workflow[Param, Return]) ExecuteChild(ctx workflow.Context, opts workflow.ChildWorkflowOptions, param Param) workflow.ChildWorkflowFuture

ExecuteChild asynchronously executes the workflow from another parent workflow and returns a promise.

func (Workflow[Param, Return]) Name

func (w Workflow[Param, Return]) Name() string

Name returns the name of the workflow.

func (Workflow[Param, Return]) Run

func (w Workflow[Param, Return]) Run(ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, param Param) (Return, error)

Run executes the workflow and synchronously returns the output.

func (Workflow[Param, Return]) RunChild

func (w Workflow[Param, Return]) RunChild(ctx workflow.Context, opts workflow.ChildWorkflowOptions, param Param) (Return, error)

RunChild executes the workflow from another parent workflow and synchronously returns the output.

func (Workflow[Param, Return]) SetSchedule

func (w Workflow[Param, Return]) SetSchedule(ctx context.Context, temporalClient *Client, opts client.ScheduleOptions, param Param) error

SetSchedule creates or updates the schedule to match the given definition. WARNING: This feature is not as seamless as it could be because of the complex API exposed by temporal. In some cases, when the schedule has been modified in some non-updateable way, this method can't update the schedule and it returns an error.

func (Workflow[Param, Return]) WithImplementation

func (w Workflow[Param, Return]) WithImplementation(fn func(workflow.Context, Param) (Return, error)) *WorkflowWithImpl[Param, Return]

WithImplementation should be called to create the parameters for NewWorker(). It declares which function implements the workflow.

type WorkflowDeclaration

type WorkflowDeclaration interface {
	Name() string
	// contains filtered or unexported methods
}

WorkflowDeclaration always contains Workflow but doesn't have type parameters, so it can be passed into non-generic functions.
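
The underlying Go pattern is that a generic type can satisfy a non-generic interface as long as the interface's methods do not mention the type parameters. A small self-contained sketch, with hypothetical names declaration, typedDecl and names, might look like this:

```go
package main

import "fmt"

// declaration is a non-generic interface that gives a type-parameter-free
// view of a generic value.
type declaration interface {
	Name() string
}

// typedDecl is generic, so a slice of one instantiation cannot hold
// values of another instantiation.
type typedDecl[Param, Return any] struct {
	name string
}

// Name does not mention Param or Return, so every instantiation of
// typedDecl satisfies declaration.
func (d typedDecl[Param, Return]) Name() string { return d.name }

// names is an ordinary, non-generic function that accepts any mix of
// instantiations through the interface.
func names(decls ...declaration) []string {
	out := make([]string, 0, len(decls))
	for _, d := range decls {
		out = append(out, d.Name())
	}
	return out
}

func main() {
	a := typedDecl[string, int]{name: "CountWords"}
	b := typedDecl[struct{}, struct{}]{name: "HelloWorkflow"}
	fmt.Println(names(a, b))
}
```

This is what allows a helper that only needs the workflow's name to accept workflows of any parameter and return type without itself being generic.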

type WorkflowWithImpl

type WorkflowWithImpl[Param any, Return any] struct {
	// contains filtered or unexported fields
}

WorkflowWithImpl is a temporary struct that implements Registerable. It's meant to be passed into `tempts.NewWorker`.

func (WorkflowWithImpl[Param, Return]) ExecuteInTest

func (w WorkflowWithImpl[Param, Return]) ExecuteInTest(e testEnvironment, p Param) (Return, error)

ExecuteInTest executes the given workflow implementation in a unit test and returns the output of the workflow.

func (WorkflowWithImpl[Param, Return]) Name

func (w WorkflowWithImpl[Param, Return]) Name() string

Name returns the name of the workflow being implemented.
