Documentation ¶
Overview ¶
Package task is the Asynchronous Task Execution Module.
The async task module provides a distributed task execution framework that lets services execute functions asynchronously and durably.
Backend ¶
Backends are messaging transports used by the framework to guarantee durability.
Usage ¶
To use the module, initialize it at service startup and register every function that will be invoked asynchronously. Calling task.Enqueue with a registered function sends the call to the backend implementation. Workers run in parallel and listen to the backend; when a worker receives a message from the backend, it executes the corresponding function.
package main

import (
	"context"
	"log"

	"go.uber.org/fx/modules/task"
	"go.uber.org/fx/modules/task/cherami"
	"go.uber.org/fx/service"
	"go.uber.org/fx/ulog"
)

func main() {
	// Initialize the task module with the Cherami backend.
	svc, err := service.WithModule(task.New(cherami.NewBackend)).Build()
	if err != nil {
		log.Fatalf("Failed to initialize module: %v", err)
	}
	// Register every function that will be enqueued.
	if err := task.Register(updateCache); err != nil {
		ulog.Logger().Fatal("could not register task", "error", err)
	}
	svc.Start()
}

// newBackend outlines a user-defined backend constructor.
func newBackend(host service.Host) (task.Backend, error) {
	var b task.Backend // create backend here
	return b, nil
}

// runActivity does its work synchronously, then enqueues the cache update.
func runActivity(ctx context.Context, input string) error {
	// do things and calculate results
	results := "results"
	return task.Enqueue(updateCache, ctx, input, results)
}

// updateCache is the function executed asynchronously by a worker.
func updateCache(ctx context.Context, input string, results string) error {
	// update cache with the name
	return nil
}
The async task module is a singleton: a service can currently initialize only one instance. Users are free to define their own backends and encodings for message passing.
Tasks can be enqueued and executed on different clusters to separate workloads. The default configuration spins up workers that execute tasks. To enable task enqueueing without execution, initialize the module as follows:
svc, err := service.WithModule(
	task.New(cherami.NewBackend, task.DisableExecution()),
).Build()
if err != nil {
	log.Fatalf("Failed to initialize module: %v", err)
}
Async function requirements ¶
For the function to be invoked asynchronously, the following criteria must be met:
• The first input argument must be of type context.Context (https://golang.org/pkg/context/#Context).
• The function must return exactly one value, which must be an error.
• The caller does not receive a return value from the called function.
• The function must not take variadic arguments as input (support coming soon).
• If a function takes an interface as input, the implementation must be registered on startup.
Index ¶
- func Enqueue(fn interface{}, args ...interface{}) error
- func MustRegister(fn interface{})
- func New(createFunc BackendCreateFunc, options ...ModuleOption) service.ModuleProvider
- func Register(fn interface{}) error
- func Run(ctx context.Context, message []byte) error
- type Backend
- type BackendCreateFunc
- type Config
- type Encoding
- type GobEncoding
- type ModuleOption
- type NopBackend
- type NopEncoding
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Enqueue ¶
func Enqueue(fn interface{}, args ...interface{}) error
Enqueue sends a function, together with its arguments, to the task queue.
func MustRegister ¶
func MustRegister(fn interface{})
MustRegister registers a function for an async task or panics. Since task registration is performed in main(), we want to fail fast and reduce the amount of error checking.
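The fail-fast behavior follows the common Go "Must" convention: wrap the error-returning variant and panic on failure. A minimal sketch, assuming hypothetical lowercase stand-ins register and mustRegister rather than the package's own functions:

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// register is a stand-in for an error-returning registration function.
func register(fn interface{}) error {
	if fn == nil || reflect.TypeOf(fn).Kind() != reflect.Func {
		return errors.New("register: fn must be a function")
	}
	return nil
}

// mustRegister panics instead of returning the error, so a bad
// registration stops the process at startup.
func mustRegister(fn interface{}) {
	if err := register(fn); err != nil {
		panic(err)
	}
}

func main() {
	mustRegister(func() {})
	fmt.Println("registered")
}
```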
func New ¶
func New(createFunc BackendCreateFunc, options ...ModuleOption) service.ModuleProvider
New creates an async task queue ModuleProvider.
Types ¶
type Backend ¶
type Backend interface {
service.Module
// Encoder returns the encoding used by the backend
Encoder() Encoding
// Enqueue will add a message to the backend
Enqueue(ctx context.Context, message []byte) error
// ExecuteAsync kicks off workers that consume incoming messages and execute them as tasks
ExecuteAsync() error
}
Backend represents a task backend
func GlobalBackend ¶
func GlobalBackend() Backend
GlobalBackend returns the global instance of the backend. TODO (madhu): Make work with multiple backends.
func NewInMemBackend ¶
NewInMemBackend creates a new in memory backend, designed for use in tests
type BackendCreateFunc ¶
BackendCreateFunc creates a backend implementation
type Config ¶
type Config struct {
DisableExecution bool
}
Config represents the options for the task module
type Encoding ¶
type Encoding interface {
Register(interface{}) error
Marshal(interface{}) ([]byte, error)
Unmarshal([]byte, interface{}) error
}
Encoding is capable of encoding and decoding objects
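A user-defined encoding only needs these three methods. The sketch below implements them with encoding/gob from the standard library; gobSketch, user, and payload are hypothetical names, and this is not the package's GobEncoding. It also shows why concrete types carried inside interface values have to be registered before they can be encoded or decoded.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Encoding mirrors the interface documented above.
type Encoding interface {
	Register(interface{}) error
	Marshal(interface{}) ([]byte, error)
	Unmarshal([]byte, interface{}) error
}

// gobSketch is a hypothetical gob-backed Encoding.
type gobSketch struct{}

// Register makes a concrete type known to gob. gob.Register panics on
// invalid input, so the panic is converted into an error.
func (gobSketch) Register(obj interface{}) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("gob register: %v", r)
		}
	}()
	gob.Register(obj)
	return nil
}

// Marshal encodes obj into bytes.
func (gobSketch) Marshal(obj interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(obj); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Unmarshal decodes data into obj, which must be a pointer.
func (gobSketch) Unmarshal(data []byte, obj interface{}) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(obj)
}

// user is an example argument type carried inside an interface value.
type user struct{ Name string }

// payload holds arguments of arbitrary registered types.
type payload struct{ Args []interface{} }

func main() {
	var enc Encoding = gobSketch{}
	if err := enc.Register(user{}); err != nil {
		panic(err)
	}
	data, err := enc.Marshal(payload{Args: []interface{}{"refresh", user{Name: "ada"}}})
	if err != nil {
		panic(err)
	}
	var out payload
	if err := enc.Unmarshal(data, &out); err != nil {
		panic(err)
	}
	fmt.Println(out.Args[0], out.Args[1].(user).Name)
}
```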
type GobEncoding ¶
type GobEncoding struct {
}
GobEncoding encapsulates gob encoding and decoding
func (GobEncoding) Marshal ¶
func (g GobEncoding) Marshal(obj interface{}) ([]byte, error)
Marshal encodes an object into bytes
func (GobEncoding) Register ¶
func (g GobEncoding) Register(obj interface{}) error
Register implements the Encoding interface
func (GobEncoding) Unmarshal ¶
func (g GobEncoding) Unmarshal(data []byte, obj interface{}) error
Unmarshal decodes a byte slice into the passed-in object
type ModuleOption ¶
ModuleOption is a function that configures module creation.
func DisableExecution ¶
func DisableExecution() ModuleOption
DisableExecution disables task execution and only allows enqueuing
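Options of this kind are usually implemented with the functional options pattern. The sketch below assumes that an option is a function that mutates a Config; the actual definition of ModuleOption is not shown in this documentation, and buildConfig is a hypothetical helper standing in for what a constructor such as New might do with its options.

```go
package main

import "fmt"

// Config mirrors the documented struct.
type Config struct {
	DisableExecution bool
}

// ModuleOption is assumed here to be a function that mutates a Config.
type ModuleOption func(*Config)

// DisableExecution returns an option that turns off task execution,
// leaving only enqueueing enabled.
func DisableExecution() ModuleOption {
	return func(c *Config) {
		c.DisableExecution = true
	}
}

// buildConfig applies the options in order on top of the defaults.
func buildConfig(options ...ModuleOption) Config {
	var cfg Config // zero value: execution enabled
	for _, opt := range options {
		opt(&cfg)
	}
	return cfg
}

func main() {
	fmt.Println(buildConfig().DisableExecution)
	fmt.Println(buildConfig(DisableExecution()).DisableExecution)
}
```

Because the zero value of Config leaves execution enabled, callers that pass no options get the default behavior of running workers.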
type NopBackend ¶
type NopBackend struct{}
NopBackend is a noop implementation of the Backend interface
func (NopBackend) Encoder ¶
func (b NopBackend) Encoder() Encoding
Encoder implements the Backend interface
func (NopBackend) Enqueue ¶
func (b NopBackend) Enqueue(ctx context.Context, message []byte) error
Enqueue implements the Backend interface
func (NopBackend) ExecuteAsync ¶
func (b NopBackend) ExecuteAsync() error
ExecuteAsync implements the Backend interface
type NopEncoding ¶
type NopEncoding struct {
}
NopEncoding is a noop encoder
func (NopEncoding) Marshal ¶
func (g NopEncoding) Marshal(obj interface{}) ([]byte, error)
Marshal implements the Encoding interface
func (NopEncoding) Register ¶
func (g NopEncoding) Register(obj interface{}) error
Register implements the Encoding interface
func (NopEncoding) Unmarshal ¶
func (g NopEncoding) Unmarshal(data []byte, obj interface{}) error
Unmarshal implements the Encoding interface