task

package
v1.0.0-beta3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 5, 2017 License: MIT Imports: 12 Imported by: 0

README

Asynchronous Task Execution Module

The async task module presents a distributed task execution framework for services to execute a function asynchronously and durably.

Backend

Backends are messaging transports used by the framework to guarantee durability.

Usage

To use the module, initialize it at service startup and register any functions that will be invoked asynchronously. Call task.Enqueue on a function and the execution framework will send it to the backend implementation. Workers are running in parallel and listening to the backend. Once they receive a message from the backend, they will execute the function.

package main

import (
  "context"

  "go.uber.org/fx/modules/task"
  "go.uber.org/fx/modules/task/cherami"
  "go.uber.org/fx/service"
  "go.uber.org/fx/ulog"
)

func main() {
  svc, err := service.WithModule(task.New(cherami.NewBackend)).Build()
  if err != nil {
    log.Fatal("Failed to initialize module", err)
  }
  if err := task.Register(updateCache); err != nil {
    ulog.Logger().Fatal("could not register task", "error", err)
  }
  svc.Start()
}

func newBackend(host service.Host) (task.Backend, error) {
  b := // create backend here
  return b, nil
}

func runActivity(ctx context.Context, input string) error {
  // do things and calculate results
  results := "results"
  return task.Enqueue(updateCache, ctx, input, results)
}

func updateCache(ctx context.Context, input string, results string) error {
  // update cache with the name
  return nil
}

The async task module is a singleton and a service can initialize only one at this time. Users are free to define their own backends and encodings for message passing.

It is possible to enqueue and execute tasks on different clusters if you want to separate workloads. The default configuration spins up workers that will execute the task. In order to spin up just task enqueueing without execution, initialize the module as follows:

  svc, err := service.WithModule(
    task.New(cherami.NewBackend, task.DisableExecution()),
  ).Build()
  if err != nil {
    log.Fatal("Failed to initialize module", err)
  }

Async function requirements

For the function to be invoked asynchronously, the following criteria must be met:

  • The first input argument should be of type context.Context
  • The function should return only one value, which should be an error.
  • The caller does not receive a return value from the called function.
  • The function should not take variadic arguments as input (support coming soon).
  • If functions take in an interface, the implementation must be registered on startup.

Documentation

Overview

Package task is the Asynchronous Task Execution Module.

The async task module presents a distributed task execution framework for services to execute a function asynchronously and durably.

Backend

Backends are messaging transports used by the framework to guarantee durability.

Usage

To use the module, initialize it at service startup and register any functions that will be invoked asynchronously. Call task.Enqueue on a function and the execution framework will send it to the backend implementation. Workers are running in parallel and listening to the backend. Once they receive a message from the backend, they will execute the function.

package main

import (
  "context"

  "go.uber.org/fx/modules/task"
  "go.uber.org/fx/modules/task/cherami"
  "go.uber.org/fx/service"
  "go.uber.org/fx/ulog"
)

func main() {
  svc, err := service.WithModule(task.New(cherami.NewBackend)).Build()
  if err != nil {
    log.Fatal("Failed to initialize module", err)
  }
  if err := task.Register(updateCache); err != nil {
    ulog.Logger().Fatal("could not register task", "error", err)
  }
  svc.Start()
}

func newBackend(host service.Host) (task.Backend, error) {
  b := // create backend here
  return b, nil
}

func runActivity(ctx context.Context, input string) error {
  // do things and calculate results
  results := "results"
  return task.Enqueue(updateCache, ctx, input, results)
}

func updateCache(ctx context.Context, input string, results string) error {
  // update cache with the name
  return nil
}

The async task module is a singleton and a service can initialize only one at this time. Users are free to define their own backends and encodings for message passing.

It is possible to enqueue and execute tasks on different clusters if you want to separate workloads. The default configuration spins up workers that will execute the task. In order to spin up just task enqueueing without execution, initialize the module as follows:

svc, err := service.WithModule(
  task.New(cherami.NewBackend, task.DisableExecution()),
).Build()
if err != nil {
  log.Fatal("Failed to initialize module", err)
}

Async function requirements

For the function to be invoked asynchronously, the following criteria must be met:

• The first input argument should be of type context.Context (https://golang.org/pkg/context/#Context)

• The function should return only one value, which should be an error.

• The caller does not receive a return value from the called function.

• The function should not take variadic arguments as input (support coming soon).

• If functions take in an interface, the implementation must be registered on startup.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Enqueue

func Enqueue(fn interface{}, args ...interface{}) error

Enqueue sends a func before sending to the task queue

func MustRegister

func MustRegister(fn interface{})

MustRegister registers a function for an async task or panics Since Task registration is performed in main() we want to fail-fast and reduce the amount of error-checking

func New

func New(createFunc BackendCreateFunc, options ...ModuleOption) service.ModuleProvider

New creates an async task queue ModuleProvider.

func Register

func Register(fn interface{}) error

Register registers a function for async tasks

func Run

func Run(ctx context.Context, message []byte) error

Run decodes the message and executes as a task

Types

type Backend

type Backend interface {
	service.Module
	// Encoder returns the encoding used by the backend
	Encoder() Encoding
	// Enqueue will add a message to the backend
	Enqueue(ctx context.Context, message []byte) error
	// ExecuteAsync kicks off workers that consume incoming messages and execute them as tasks
	ExecuteAsync() error
}

Backend represents a task backend

func GlobalBackend

func GlobalBackend() Backend

GlobalBackend returns global instance of the backend TODO (madhu): Make work with multiple backends

func NewInMemBackend

func NewInMemBackend(host service.Host) Backend

NewInMemBackend creates a new in memory backend, designed for use in tests

func NewManagedInMemBackend

func NewManagedInMemBackend(host service.Host, cfg Config) Backend

NewManagedInMemBackend creates a new in memory backend, designed for use in tests

type BackendCreateFunc

type BackendCreateFunc func(service.Host) (Backend, error)

BackendCreateFunc creates a backend implementation

type Config

type Config struct {
	DisableExecution bool
}

Config represents the options for the task module

type Encoding

type Encoding interface {
	Register(interface{}) error
	Marshal(interface{}) ([]byte, error)
	Unmarshal([]byte, interface{}) error
}

Encoding is capable of encoding and decoding objects

type GobEncoding

type GobEncoding struct {
}

GobEncoding encapsulates gob encoding and decoding

func (GobEncoding) Marshal

func (g GobEncoding) Marshal(obj interface{}) ([]byte, error)

Marshal encodes an object into bytes

func (GobEncoding) Register

func (g GobEncoding) Register(obj interface{}) error

Register implements the Encoding interface

func (GobEncoding) Unmarshal

func (g GobEncoding) Unmarshal(data []byte, obj interface{}) error

Unmarshal decodes a byte array into the passed in object

type ModuleOption

type ModuleOption func(*Config) error

ModuleOption is a function that configures module creation.

func DisableExecution

func DisableExecution() ModuleOption

DisableExecution disables task execution and only allows enqueuing

type NopBackend

type NopBackend struct{}

NopBackend is a noop implementation of the Backend interface

func (NopBackend) Encoder

func (b NopBackend) Encoder() Encoding

Encoder implements the Backend interface

func (NopBackend) Enqueue

func (b NopBackend) Enqueue(ctx context.Context, message []byte) error

Enqueue implements the Backend interface

func (NopBackend) ExecuteAsync

func (b NopBackend) ExecuteAsync() error

ExecuteAsync implements the Backend interface

func (NopBackend) Start

func (b NopBackend) Start() error

Start implements the Module interface

func (NopBackend) Stop

func (b NopBackend) Stop() error

Stop implements the Module interface

type NopEncoding

type NopEncoding struct {
}

NopEncoding is a noop encoder

func (NopEncoding) Marshal

func (g NopEncoding) Marshal(obj interface{}) ([]byte, error)

Marshal implements the Encoding interface

func (NopEncoding) Register

func (g NopEncoding) Register(obj interface{}) error

Register implements the Encoding interface

func (NopEncoding) Unmarshal

func (g NopEncoding) Unmarshal(data []byte, obj interface{}) error

Unmarshal implements the Encoding interface

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL