Documentation ¶
Overview ¶
Package task is the Asynchronous Task Execution Module.
The async task module presents a distributed task execution framework for services to execute a function asynchronously and durably.
Backend ¶
Backends are messaging transports used by the framework to guarantee durability.
Usage ¶
To use the module, initialize it at service startup and register any functions that will be invoked asynchronously. Call task.Enqueue on a function and the execution framework will send it to the backend implementation. Workers are running in parallel and listening to the backend. Once they receive a message from the backend, they will execute the function.
package main import ( "context" "go.uber.org/fx/modules/task" "go.uber.org/fx/service" "go.uber.org/fx/ulog" ) func main() { svc, err := service.WithModule(task.New(newBackend)).Build() if err := task.Register(updateCache); err != nil { ulog.Logger().Fatal("could not register task", "error", err) } svc.Start() } func newBackend(host service.Host) (task.Backend, error) { b := // create backend here return b, nil } func runActivity(ctx context.Context, input string) error { // do things and calculate results results := "results" return task.Enqueue(updateCache, ctx, input, results) } func updateCache(ctx context.Context, input string, results string) error { // update cache with the name return nil }
The async task module is a singleton and a service can initialize only one at this time. Users are free to define their own backends and encodings for message passing.
Async function requirements ¶
For the function to be invoked asynchronously, the following criteria must be met:
• The first input argument should be of type context.Context (https://golang.org/pkg/context/#Context)
• The function should return only one value, which should be an error.
• The caller does not receive a return value from the called function.
• The function should not take variadic arguments as input (support coming soon).
• If functions take in an interface, the implementation must be registered on startup.
Index ¶
- func Enqueue(fn interface{}, args ...interface{}) error
- func New(createFunc BackendCreateFunc) service.ModuleProvider
- func Register(fn interface{}) error
- func Run(ctx context.Context, message []byte) error
- type Backend
- type BackendCreateFunc
- type Encoding
- type GobEncoding
- type NopBackend
- type NopEncoding
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Enqueue ¶
func Enqueue(fn interface{}, args ...interface{}) error
Enqueue sends a func before sending to the task queue
func New ¶
func New(createFunc BackendCreateFunc) service.ModuleProvider
New creates an async task queue ModuleProvider.
Types ¶
type Backend ¶
type Backend interface { service.Module Encoder() Encoding Publish(ctx context.Context, message []byte) error }
Backend represents a task backend
func GlobalBackend ¶
func GlobalBackend() Backend
GlobalBackend returns global instance of the backend TODO (madhu): Make work with multiple backends
func NewInMemBackend ¶
NewInMemBackend creates a new in memory backend, designed for use in tests
type BackendCreateFunc ¶
BackendCreateFunc creates a backend implementation
type Encoding ¶
type Encoding interface { Register(interface{}) error Marshal(interface{}) ([]byte, error) Unmarshal([]byte, interface{}) error }
Encoding is capable of encoding and decoding objects
type GobEncoding ¶
type GobEncoding struct { }
GobEncoding encapsulates gob encoding and decoding
func (GobEncoding) Marshal ¶
func (g GobEncoding) Marshal(obj interface{}) ([]byte, error)
Marshal encodes an object into bytes
func (GobEncoding) Register ¶
func (g GobEncoding) Register(obj interface{}) error
Register implements the Encoding interface
func (GobEncoding) Unmarshal ¶
func (g GobEncoding) Unmarshal(data []byte, obj interface{}) error
Unmarshal decodes a byte array into the passed in object
type NopBackend ¶
type NopBackend struct{}
NopBackend is a noop implementation of the Backend interface
func (NopBackend) Encoder ¶
func (b NopBackend) Encoder() Encoding
Encoder implements the Backend interface
type NopEncoding ¶
type NopEncoding struct { }
NopEncoding is a noop encoder
func (NopEncoding) Marshal ¶
func (g NopEncoding) Marshal(obj interface{}) ([]byte, error)
Marshal implements the Encoding interface
func (NopEncoding) Register ¶
func (g NopEncoding) Register(obj interface{}) error
Register implements the Encoding interface
func (NopEncoding) Unmarshal ¶
func (g NopEncoding) Unmarshal(data []byte, obj interface{}) error
Unmarshal implements the Encoding interface