task

package
v1.0.0-beta2
Warning: This package is not in the latest version of its module.
Published: Mar 10, 2017 License: MIT Imports: 10 Imported by: 0

README

Asynchronous Task Execution Module

The async task module provides a distributed task execution framework that lets services execute functions asynchronously and durably.

Backend

Backends are messaging transports used by the framework to guarantee durability.

Usage

To use the module, initialize it at service startup and register every function that will be invoked asynchronously. Call task.Enqueue with a registered function and its arguments, and the execution framework sends the call to the backend implementation. Workers run in parallel and listen to the backend; when a worker receives a message from the backend, it executes the function.

package main

import (
  "context"

  "go.uber.org/fx/modules/task"
  "go.uber.org/fx/service"
  "go.uber.org/fx/ulog"
)

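func main() {
  svc, err := service.WithModule(task.New(newBackend)).Build()
  if err != nil {
    ulog.Logger().Fatal("could not build service", "error", err)
  }
  // Register every function that will be enqueued before starting the service.
  if err := task.Register(updateCache); err != nil {
    ulog.Logger().Fatal("could not register task", "error", err)
  }
  svc.Start()
}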

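func newBackend(host service.Host) (task.Backend, error) {
  // Create the backend here. The in-memory backend is designed for use in
  // tests; substitute a durable transport for production use.
  b := task.NewInMemBackend(host)
  return b, nil
}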

func runActivity(ctx context.Context, input string) error {
  // do things and calculate results
  results := "results"
  return task.Enqueue(updateCache, ctx, input, results)
}

func updateCache(ctx context.Context, input string, results string) error {
  // update the cache with the results
  return nil
}

The async task module is a singleton: a service can currently initialize only one instance. Users are free to define their own backends and encodings for message passing.

Async function requirements

For a function to be invoked asynchronously, it must meet the following criteria:

  • The first input argument must be of type context.Context (https://golang.org/pkg/context/#Context).
  • The function must return exactly one value, which must be an error.
  • The caller does not receive the return value of the called function.
  • The function must not take variadic arguments (support coming soon).
  • If the function takes an interface as an argument, the implementation must be registered on startup.
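
The signature criteria above can be checked with reflection. The following is a minimal sketch using only the standard library; validateAsyncFn and the sample functions are hypothetical names for illustration, and this is not necessarily how the task package validates functions internally.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
)

var (
	ctxType = reflect.TypeOf((*context.Context)(nil)).Elem()
	errType = reflect.TypeOf((*error)(nil)).Elem()
)

// validateAsyncFn is a hypothetical helper (not part of the task package)
// that checks the signature criteria for asynchronous functions.
func validateAsyncFn(fn interface{}) error {
	t := reflect.TypeOf(fn)
	if t == nil || t.Kind() != reflect.Func {
		return errors.New("not a function")
	}
	if t.NumIn() < 1 || t.In(0) != ctxType {
		return errors.New("first argument must be context.Context")
	}
	if t.IsVariadic() {
		return errors.New("variadic arguments are not supported")
	}
	if t.NumOut() != 1 || t.Out(0) != errType {
		return errors.New("must return exactly one value of type error")
	}
	return nil
}

// Sample signatures: only updateCache meets every criterion.
func updateCache(ctx context.Context, input string, results string) error { return nil }
func noContext(input string) error                                         { return nil }
func variadic(ctx context.Context, inputs ...string) error                 { return nil }
func twoResults(ctx context.Context) (string, error)                       { return "", nil }

func main() {
	fmt.Println("updateCache:", validateAsyncFn(updateCache))
	fmt.Println("noContext:  ", validateAsyncFn(noContext))
	fmt.Println("variadic:   ", validateAsyncFn(variadic))
	fmt.Println("twoResults: ", validateAsyncFn(twoResults))
}
```

Running such a check at registration time, rather than when a worker executes the task, surfaces a bad signature at service startup.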

Documentation

Overview

Package task is the Asynchronous Task Execution Module.

Functions

func Enqueue

func Enqueue(fn interface{}, args ...interface{}) error

Enqueue encodes a function call and its arguments and sends it to the task queue

func New

New creates an async task queue ModuleProvider.

func Register

func Register(fn interface{}) error

Register registers a function for async tasks

func Run

func Run(ctx context.Context, message []byte) error

Run decodes the message and executes it as a task

Types

type Backend

type Backend interface {
	service.Module
	Encoder() Encoding
	Publish(ctx context.Context, message []byte) error
}

Backend represents a task backend

func GlobalBackend

func GlobalBackend() Backend

GlobalBackend returns the global instance of the backend.

TODO (madhu): Make work with multiple backends

func NewInMemBackend

func NewInMemBackend(host service.Host) Backend

NewInMemBackend creates a new in-memory backend, designed for use in tests

type BackendCreateFunc

type BackendCreateFunc func(host service.Host) (Backend, error)

BackendCreateFunc creates a backend implementation

type Encoding

type Encoding interface {
	Register(interface{}) error
	Marshal(interface{}) ([]byte, error)
	Unmarshal([]byte, interface{}) error
}

Encoding is capable of encoding and decoding objects
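
Since users may define their own encodings, a custom implementation only needs the three methods above. The sketch below is a hypothetical JSON-based encoding; the Encoding interface is redeclared locally so the example compiles on its own, and JSONEncoding and payload are illustrative names. Whether JSON suits the messages the framework actually produces is an assumption: encoding/json decodes values held in interface{} fields into generic types such as map[string]interface{} and float64.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Encoding mirrors the interface documented above so the sketch is standalone.
type Encoding interface {
	Register(interface{}) error
	Marshal(interface{}) ([]byte, error)
	Unmarshal([]byte, interface{}) error
}

// JSONEncoding is a hypothetical custom encoding, not part of the task package.
type JSONEncoding struct{}

// Register is a no-op because encoding/json needs no type registration.
func (JSONEncoding) Register(interface{}) error { return nil }

// Marshal encodes an object as JSON.
func (JSONEncoding) Marshal(obj interface{}) ([]byte, error) {
	return json.Marshal(obj)
}

// Unmarshal decodes JSON into obj, which must be a pointer.
func (JSONEncoding) Unmarshal(data []byte, obj interface{}) error {
	return json.Unmarshal(data, obj)
}

// Compile-time check that JSONEncoding satisfies Encoding.
var _ Encoding = JSONEncoding{}

// payload is an illustrative message body.
type payload struct {
	Input   string `json:"input"`
	Results string `json:"results"`
}

func main() {
	var enc Encoding = JSONEncoding{}
	data, err := enc.Marshal(payload{Input: "in", Results: "out"})
	if err != nil {
		panic(err)
	}
	var p payload
	if err := enc.Unmarshal(data, &p); err != nil {
		panic(err)
	}
	fmt.Println(string(data), p.Input, p.Results)
}
```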

type GobEncoding

type GobEncoding struct {
}

GobEncoding encapsulates gob encoding and decoding

func (GobEncoding) Marshal

func (g GobEncoding) Marshal(obj interface{}) ([]byte, error)

Marshal encodes an object into bytes

func (GobEncoding) Register

func (g GobEncoding) Register(obj interface{}) error

Register implements the Encoding interface

func (GobEncoding) Unmarshal

func (g GobEncoding) Unmarshal(data []byte, obj interface{}) error

Unmarshal decodes a byte array into the passed in object

type NopBackend

type NopBackend struct{}

NopBackend is a noop implementation of the Backend interface

func (NopBackend) Encoder

func (b NopBackend) Encoder() Encoding

Encoder implements the Backend interface

func (NopBackend) Publish

func (b NopBackend) Publish(ctx context.Context, message []byte) error

Publish implements the Backend interface

func (NopBackend) Start

func (b NopBackend) Start() error

Start implements the Module interface

func (NopBackend) Stop

func (b NopBackend) Stop() error

Stop implements the Module interface

type NopEncoding

type NopEncoding struct {
}

NopEncoding is a noop encoder

func (NopEncoding) Marshal

func (g NopEncoding) Marshal(obj interface{}) ([]byte, error)

Marshal implements the Encoding interface

func (NopEncoding) Register

func (g NopEncoding) Register(obj interface{}) error

Register implements the Encoding interface

func (NopEncoding) Unmarshal

func (g NopEncoding) Unmarshal(data []byte, obj interface{}) error

Unmarshal implements the Encoding interface
