taskhawk

package module
v1.0.1
Warning

This package is not in the latest version of its module.

Published: Jul 17, 2019 License: Apache-2.0 Imports: 22 Imported by: 0

README

TaskHawk Go


TaskHawk is a replacement for Celery that works on AWS SQS/SNS, while keeping things simple and straightforward. Any unbound function can be converted into a TaskHawk task.

Only Go 1.10+ is supported currently.

This project uses semantic versioning.

Quick Start

First, install the library:

go get github.com/Automatic/taskhawk-go

Convert your function into a "Task" as shown here:

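type SendEmailTaskInput struct {
    // ... your input fields ...
}

type SendEmailTask struct {
    taskhawk.Task
}

func (t *SendEmailTask) Run(ctx context.Context, rawInput interface{}) error {
    input, ok := rawInput.(*SendEmailTaskInput)
    if !ok {
        return fmt.Errorf("unexpected input type %T", rawInput)
    }
    // send email using the fields of input
    _ = input
    return nil
}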

Tasks may accept input of arbitrary type as long as it's serializable to JSON.

Then, define a few required settings:

sessionCache := taskhawk.NewAWSSessionsCache()

settings := &taskhawk.Settings{
    AWSAccessKey: "<YOUR AWS ACCESS KEY>",
    AWSAccountID: "<YOUR AWS ACCOUNT ID>",
    AWSRegion:    "<YOUR AWS REGION>",
    AWSSecretKey: "<YOUR AWS SECRET KEY>",

    Queue: "<YOUR TASKHAWK QUEUE>",
}
taskhawk.InitSettings(settings)

Before the task can be dispatched, it must be registered, as shown below. It is recommended that task names be managed centrally by the application.

taskRegistry, err := taskhawk.NewTaskRegistry(taskhawk.NewPublisher(sessionCache, settings))
if err != nil {
    // handle error
}
sendEmailTask := &SendEmailTask{
    Task: taskhawk.Task{
        Inputer: func() interface{} {
            return &SendEmailTaskInput{}
        },
        TaskName: "SendEmailTask",
    },
}
if err := taskRegistry.RegisterTask(sendEmailTask); err != nil {
    // handle error
}

And finally, dispatch your task asynchronously:

if err := taskRegistry.Dispatch("SendEmailTask", &SendEmailTaskInput{...}); err != nil {
    // handle error
}

Development

Prerequisites

Install go1.11.x

Getting Started

Assuming that you have Go installed, set up your environment like so:


$ cd ${GOPATH}/src/github.com/Automatic/taskhawk-go
$ go build

Running tests

$ make test  
# OR
$ go test -tags test ./...

Getting Help

We use GitHub issues for tracking bugs and feature requests.

  • If you think you have found a bug, please open an issue.

Release notes

Current version: v1.0.2-dev

v1.0.0
  • Initial version

Documentation

Overview

Package taskhawk is a replacement for celery that works on AWS SQS/SNS, while keeping things pretty simple and straightforward. Any unbound function can be converted into a TaskHawk task.

For inter-service messaging, see Hedwig: https://godoc.org/github.com/Automatic/hedwig-go/hedwig.

Provisioning

Taskhawk works on SQS and SNS as backing queues. Before you can publish tasks, you need to provision the required infrastructure. This may be done manually or, preferably, using Terraform. Taskhawk provides tools to make infrastructure configuration easier: see Taskhawk Terraform Generator (https://github.com/Automatic/taskhawk-terraform-generator) for further details.

Using Taskhawk

To use taskhawk, convert your function into a "Task" as shown here:

type SendEmailTask struct {
	taskhawk.Task
}

func (t *SendEmailTask) Run(context.Context, interface{}) error {...}

Tasks may accept input of arbitrary type as long as it's serializable to JSON.

Then, define a few required settings:

sessionCache := taskhawk.NewAWSSessionsCache()

settings := &taskhawk.Settings{
	AWSAccessKey: "<YOUR AWS ACCESS KEY>",
	AWSAccountID: "<YOUR AWS ACCOUNT ID>",
	AWSRegion:    "<YOUR AWS REGION>",
	AWSSecretKey: "<YOUR AWS SECRET KEY>",

	Queue: "<YOUR TASKHAWK QUEUE>",
}
taskhawk.InitSettings(settings)

Before the task can be dispatched, it must be registered with a task registry, which is created from a publisher:

func NewSendEmailTask() *SendEmailTask {
	return &SendEmailTask{
		Task: taskhawk.Task{
			TaskName: "tasks.SendEmailTask",
			Inputer: func() interface{} {
				return &SendEmailTaskInput{}
			},
		},
	}
}

taskRegistry, err := taskhawk.NewTaskRegistry(taskhawk.NewPublisher(sessionCache, settings))
if err != nil {
	// handle error
}
if err := taskRegistry.RegisterTask(NewSendEmailTask()); err != nil {
	// handle error
}

And finally, dispatch your task asynchronously by name:

err = taskRegistry.Dispatch("tasks.SendEmailTask", &SendEmailTaskInput{...})

To pass your context, use:

err = taskRegistry.DispatchWithContext(ctx, "tasks.SendEmailTask", &SendEmailTaskInput{...})

If you want to include custom headers with the message (for example, a request_id field for cross-application tracing), set them on the input object, which must implement the ITaskHeaders interface.

If you want to customize priority, you can do it like so:

err = taskRegistry.DispatchWithPriority(ctx, "tasks.SendEmailTask", taskhawk.PriorityHigh, &SendEmailTaskInput{...})

Tasks are held in the SQS queue until they're successfully executed, or until they fail a configurable number of times. Failed tasks are then moved to a dead-letter queue, where they're held for 14 days and may be examined for further debugging.

Priority

Taskhawk provides four priority queues. The priority may be set per task (as its default) or per message (at dispatch). For more details, see https://godoc.org/github.com/Automatic/taskhawk-go/taskhawk#Priority.

Metadata and Headers

If your input struct satisfies the `taskhawk.ITaskMetadata` interface, it will be filled in with the following attributes:

id: task identifier. This represents a run of a task.

priority: the priority this task message was dispatched with.

receipt: SQS receipt for the task. This may be used to extend message visibility if the task is running longer than expected.

timestamp: task dispatch epoch timestamp (milliseconds)

version: message format version.

If your input struct satisfies the ITaskHeaders interface, it will be filled in with the custom headers that the task was dispatched with.

Helper structs that satisfy these interfaces are available in the taskhawk library; you may embed them in your input struct like so:

type SendEmailTaskInput struct {
	taskhawk.TaskMetadata
	taskhawk.TaskHeaders
	...
}

For a compile-time check that your input struct satisfies these interfaces, you may add (at package scope):

var _ taskhawk.ITaskMetadata = &SendEmailTaskInput{}
var _ taskhawk.ITaskHeaders = &SendEmailTaskInput{}

This snippet won't consume memory or do anything at runtime.
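The following stand-alone sketch shows why the assertions use `&SendEmailTaskInput{}`. `ITaskHeaders` and `TaskHeaders` are redeclared locally so the example builds without taskhawk (the metadata helpers work the same way), and `fillHeaders` is a hypothetical helper imitating how a consumer might populate headers through the interface. Because `TaskHeaders` has pointer-receiver methods, they are promoted only to `*SendEmailTaskInput`, so a plain `SendEmailTaskInput` value does not satisfy the interface.

```go
package main

import "fmt"

// Local stand-ins for taskhawk.ITaskHeaders and taskhawk.TaskHeaders.
type ITaskHeaders interface {
	SetHeaders(map[string]string)
	GetHeaders() map[string]string
}

type TaskHeaders struct {
	Headers map[string]string
}

func (h *TaskHeaders) SetHeaders(headers map[string]string) { h.Headers = headers }
func (h *TaskHeaders) GetHeaders() map[string]string         { return h.Headers }

type SendEmailTaskInput struct {
	TaskHeaders
	To string `json:"to"`
}

// Compile-time check: the pointer type gets SetHeaders/GetHeaders via embedding.
var _ ITaskHeaders = &SendEmailTaskInput{}

// fillHeaders sets headers if input implements ITaskHeaders (hypothetical helper).
func fillHeaders(input interface{}, headers map[string]string) bool {
	h, ok := input.(ITaskHeaders)
	if ok {
		h.SetHeaders(headers)
	}
	return ok
}

func main() {
	in := &SendEmailTaskInput{To: "a@example.com"}
	fillHeaders(in, map[string]string{"request_id": "abc-123"})
	fmt.Println(in.GetHeaders()["request_id"])
}
```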

Consumer

A consumer for SQS-based workers can be started as follows:

consumer := taskRegistry.NewQueueConsumer(sessionCache, settings)
err := consumer.ListenForMessages(ctx, &taskhawk.ListenRequest{...})

This is a blocking function, so if you want to listen to multiple priority queues, you'll need to run these on separate goroutines.

A consumer for Lambda-based workers can be started as follows:

consumer := taskRegistry.NewLambdaConsumer(sessionCache, settings)
err := consumer.HandleLambdaEvent(ctx, &snsEvent)

where snsEvent is the event provided by AWS to your Lambda function as described in AWS documentation: https://docs.aws.amazon.com/lambda/latest/dg/eventsources.html#eventsources-sns.

Index

Constants

const (
	ErrStringTaskNotFound = "Task not found"
)

Error strings used within Taskhawk. Error strings are used instead of error values because the error stack trace can only be reliably constructed from within the appropriate execution context.

Variables

var ErrRetry = errors.New("Retry exception")

ErrRetry indicates that the task failed for a known reason and should be retried without logging a failure.

Functions

func InitSettings

func InitSettings(settings *Settings)

InitSettings initializes a settings object after validating all values and filling in defaults. This function must be called before Settings object is used.

func NewLambdaHandler

func NewLambdaHandler(consumer ILambdaConsumer) lambda.Handler

NewLambdaHandler returns a new lambda Handler that can be started like so:

func main() {
    lambda.StartHandler(NewLambdaHandler(consumer))
}

If you want additional error handling (for example, recovering from panics), you can always use your own Handler and call LambdaHandler.Invoke from it.

Types

type AWSSessionsCache

type AWSSessionsCache struct {
	// contains filtered or unexported fields
}

AWSSessionsCache is a cache that holds AWS sessions

func NewAWSSessionsCache

func NewAWSSessionsCache() *AWSSessionsCache

NewAWSSessionsCache creates a new session cache

func (*AWSSessionsCache) GetSession

func (c *AWSSessionsCache) GetSession(ctx context.Context) *session.Session

GetSession retrieves a session if it is cached, otherwise creates one

type DefaultHeaders

type DefaultHeaders func(ctx context.Context, task ITask) map[string]string

DefaultHeaders is the type of the function for injecting custom headers for every task

type ILambdaConsumer

type ILambdaConsumer interface {
	// HandleLambdaEvent processes taskhawk messages for Lambda apps and calls the task like so:
	//
	// task_fn(input).
	//
	// If `input` implements `ITaskMetadata`, metadata will be filled in with the following things: id, version,
	// header, receipt.
	//
	// The error returned by the task is returned to the caller, which should be returned from the Lambda handler.
	HandleLambdaEvent(ctx context.Context, snsEvent *events.SNSEvent) error
}

ILambdaConsumer represents a taskhawk consumer for lambda apps

type IPublisher

type IPublisher interface {
	// Publish publishes a message on Taskhawk broker
	Publish(ctx context.Context, message *message) error

	// Settings returns publisher's settings
	Settings() *Settings
}

IPublisher interface represents all publish related functions

func NewPublisher

func NewPublisher(sessionCache *AWSSessionsCache, settings *Settings) IPublisher

NewPublisher creates a new publisher

type IQueueConsumer

type IQueueConsumer interface {
	// ListenForMessages starts a taskhawk listener for message types provided and calls the task like so:
	//
	// task_fn(input).
	//
	// If `input` implements `ITaskMetadata`, metadata will be filled in with the following things: id, version,
	// header, receipt.
	//
	// The message is explicitly deleted only if the task function ran successfully. If the task returns an error, the
	// message is kept on the queue and processed again. If the task keeps failing, the SQS dead-letter queue mechanism kicks in and
	// the message is moved to the dead-letter queue.
	//
	// This function never returns by default. Possible shutdown methods:
	// 1. Cancel the context - returns immediately.
	// 2. Set a deadline on the context of less than 10 seconds - returns after processing current messages.
	// 3. Run for limited number of loops by setting LoopCount on the request - returns after running loop a finite
	// number of times
	ListenForMessages(ctx context.Context, request *ListenRequest) error
}

IQueueConsumer represents a taskhawk consumer for SQS apps

type ITask

type ITask interface {
	// Name of the task. This is used to serialize/deserialize tasks, and so should be changed carefully
	Name() string

	// Priority of the task by default. A publisher _may_ choose to override it.
	Priority() Priority

	// NewInput returns an empty input struct as expected by the Task's Run method. May be `nil`
	// If your task needs to get custom headers set during dispatch, implement interface ITaskHeaders,
	// or embed TaskHeaders
	// If your task needs to get metadata (message id etc), implement interface ITaskMetadata, or embed TaskMetadata
	NewInput() interface{}

	// Run is the main method for a task. The concrete type of the input parameter will be the same as whatever is
	// returned by the NewInput() method.
	Run(context context.Context, input interface{}) error
}

ITask is an interface all TaskHawk tasks are expected to implement

type ITaskHeaders

type ITaskHeaders interface {
	// SetHeaders sets the headers on a task input
	SetHeaders(map[string]string)

	// GetHeaders returns the headers set on a task input
	GetHeaders() map[string]string
}

ITaskHeaders interface needs to be implemented by the input struct if your task needs to get custom headers set during dispatch

type ITaskMetadata

type ITaskMetadata interface {
	// SetID sets the message id
	SetID(string)

	// SetPriority sets the priority a message was dispatched with
	SetPriority(Priority)

	// SetReceipt sets the message receipt from SQS.
	// This may be used to extend visibility timeout for long running tasks
	SetReceipt(string)

	// SetTimestamp sets the message dispatch timestamp
	SetTimestamp(JSONTime)

	// SetVersion sets the message schema version
	SetVersion(Version)
}

ITaskMetadata interface needs to be implemented by the input struct if your task needs to get metadata (message ID, etc.).

type ITaskRegistry

type ITaskRegistry interface {
	// DispatchWithPriority dispatches a task asynchronously with custom priority.
	// The concrete type of input is expected to be same as the concrete type of NewInput()'s return value.
	DispatchWithPriority(ctx context.Context, taskName string, priority Priority, input interface{}) error

	// DispatchWithContext dispatches a task asynchronously with context.
	// The concrete type of input is expected to be same as the concrete type of NewInput()'s return value.
	DispatchWithContext(ctx context.Context, taskName string, input interface{}) error

	// Dispatch a task asynchronously. The concrete type of input is expected to be same as the concrete type of
	// NewInput()'s return value.
	Dispatch(taskName string, input interface{}) error

	// GetTask fetches a task from the task registry
	GetTask(name string) (ITask, error)

	// NewLambdaConsumer creates a new taskhawk consumer for lambda apps
	//
	// Cancelable context may be used to cancel processing of messages
	NewLambdaConsumer(sessionCache *AWSSessionsCache, settings *Settings) ILambdaConsumer

	// NewQueueConsumer creates a new taskhawk consumer for queue apps
	//
	// Cancelable context may be used to cancel processing of messages
	NewQueueConsumer(sessionCache *AWSSessionsCache, settings *Settings) IQueueConsumer

	// RegisterTask registers the task to the task registry
	RegisterTask(task ITask) error
}

ITaskRegistry is an interface for the task registry to manage tasks

type JSONTime

type JSONTime time.Time

JSONTime is just a wrapper around time that serializes time to epoch in milliseconds

func (JSONTime) MarshalJSON

func (t JSONTime) MarshalJSON() ([]byte, error)

MarshalJSON changes time to epoch in milliseconds

func (*JSONTime) UnmarshalJSON

func (t *JSONTime) UnmarshalJSON(b []byte) error

UnmarshalJSON changes time from epoch in milliseconds or string (RFC3339) to time.Time.

type LambdaHandler

type LambdaHandler struct {
	// contains filtered or unexported fields
}

LambdaHandler implements the lambda.Handler interface

func (*LambdaHandler) Invoke

func (handler *LambdaHandler) Invoke(ctx context.Context, payload []byte) ([]byte, error)

Invoke is the function that is invoked when a Lambda executes

type LambdaRequest

type LambdaRequest struct {
	Ctx          context.Context
	Priority     Priority
	Record       *events.SNSEventRecord
	TaskRegistry ITaskRegistry
}

LambdaRequest represents a request for lambda apps

type ListenRequest

type ListenRequest struct {
	// Priority represents the priority queue for fetching messages from. Defaults to PriorityDefault.
	Priority Priority

	// NumMessages represents the number of SQS messages to fetch in one API call. Defaults to 1.
	NumMessages uint

	// VisibilityTimeoutS represents the default visibility timeout in seconds for a message.
	// This is the amount of time you have to process your tasks.
	// It defaults to whatever is set in the queue configuration.
	VisibilityTimeoutS uint

	// LoopCount is the number of loops to run for fetching messages.
	// This may be used to limit to only certain number of messages.
	// Defaults to running as an infinite loop until the context is canceled.
	LoopCount uint
}

ListenRequest represents a request to listen for messages

type PreProcessHookLambdaApp

type PreProcessHookLambdaApp func(request *LambdaRequest) error

PreProcessHookLambdaApp is the type of the function for pre-process hook for lambda apps

type PreProcessHookQueueApp

type PreProcessHookQueueApp func(request *QueueRequest) error

PreProcessHookQueueApp is the type of the function for pre-process hook for SQS apps

type Priority

type Priority int

Priority of a task. This may be used to differentiate batch jobs from other tasks for example.

High and low priority queues provide independent scaling knobs for your use-case.

const (
	// PriorityDefault is the default priority of a task if nothing is specified. In most cases,
	// using just the default queue should work fine.
	PriorityDefault Priority = iota // Keep default first so empty values automatically default
	PriorityLow
	PriorityHigh
	// PriorityBulk queue will typically have different monitoring, and may be used for bulk jobs,
	// such as sending push notifications to all users. This allows you to effectively
	// throttle the tasks.
	PriorityBulk
)

Priority for a task

func (Priority) MarshalJSON

func (p Priority) MarshalJSON() ([]byte, error)

MarshalJSON changes Priority to a JSON string

func (*Priority) UnmarshalJSON

func (p *Priority) UnmarshalJSON(b []byte) error

UnmarshalJSON changes priority from a JSON string to Priority

type QueueRequest

type QueueRequest struct {
	Ctx          context.Context
	Priority     Priority
	QueueMessage *sqs.Message
	QueueName    string
	QueueURL     string
	TaskRegistry ITaskRegistry
}

QueueRequest represents a request for queue apps

type Settings

type Settings struct {
	AWSRegion    string
	AWSAccountID string
	AWSAccessKey string
	AWSSecretKey string

	// AWS read timeout for publisher
	AWSReadTimeout time.Duration

	// AWS debug request error logs toggle
	AWSDebugRequestLogEnabled bool

	// AWSSessionToken represents temporary credentials (for example, for Lambda apps)
	AWSSessionToken string // optional;

	// DefaultHeaders is a function that may be used to inject custom headers into every message,
	// for example, request id. This hook is called right before dispatch, and
	// any headers that are explicitly specified when dispatching may override
	// these headers.
	DefaultHeaders DefaultHeaders // optional;

	// IsLambdaApp indicates if this is a lambda app (which uses SNS instead of SQS).
	IsLambdaApp bool

	// PreProcessHookQueueApp is a function which can be used to plug into the message processing pipeline
	// BEFORE any processing happens. This hook may be used to perform
	// initializations such as set up a global request id based on message
	// headers.
	PreProcessHookQueueApp PreProcessHookQueueApp // optional;

	// PreProcessHookLambdaApp is a function which can be used to plug into the message processing pipeline
	// BEFORE any processing happens. This hook may be used to perform
	// initializations such as set up a global request id based on message
	// headers.
	PreProcessHookLambdaApp PreProcessHookLambdaApp // optional;

	// Queue is the name of the taskhawk queue for this project (exclude the TASKHAWK- prefix)
	Queue string

	// ShutdownTimeout is the time the app has to shut down before being brutally killed
	ShutdownTimeout time.Duration // optional; defaults to 10s

	// Sync changes taskhawk dispatch to synchronous mode. This is similar
	// to Celery's Eager mode and is helpful for integration testing
	Sync bool
}

Settings is used to create Taskhawk settings

type Task

type Task struct {
	// TaskName represents the name of the task
	TaskName string

	// Inputer is a function that returns an empty input object as the task expects.
	// This is optional and if not specified, it implies task doesn't require input
	Inputer inputer

	// DefaultPriority is the default priority of a task. This may be overridden at a specific message level.
	DefaultPriority Priority
}

Task is a base struct that should be embedded in all TaskHawk tasks. It provides a partial implementation of the ITask interface (the Name, NewInput and Priority methods); the embedding type must still implement Run.

func (*Task) Name

func (t *Task) Name() string

Name returns the task name

func (*Task) NewInput

func (t *Task) NewInput() interface{}

NewInput returns an empty input struct of concrete type same as the concrete type expected by the Task's Run method.

func (*Task) Priority

func (t *Task) Priority() Priority

Priority returns the default priority of a task

type TaskHeaders

type TaskHeaders struct {
	Headers map[string]string
}

TaskHeaders provides a default implementation for ITaskHeaders and may be embedded in your input struct

func (*TaskHeaders) GetHeaders

func (h *TaskHeaders) GetHeaders() map[string]string

GetHeaders returns the custom headers passed when the task was dispatched

func (*TaskHeaders) SetHeaders

func (h *TaskHeaders) SetHeaders(headers map[string]string)

SetHeaders sets the custom headers passed when the task was dispatched

type TaskMetadata

type TaskMetadata struct {
	ID        string
	Priority  Priority
	Receipt   string
	Timestamp JSONTime
	Version   Version
}

TaskMetadata provides a default implementation for ITaskMetadata and may be embedded in your input struct

func (*TaskMetadata) SetID

func (m *TaskMetadata) SetID(id string)

SetID sets the message id

func (*TaskMetadata) SetPriority

func (m *TaskMetadata) SetPriority(priority Priority)

SetPriority sets the priority a message was dispatched with

func (*TaskMetadata) SetReceipt

func (m *TaskMetadata) SetReceipt(receipt string)

SetReceipt sets the message receipt from SQS. This may be used to extend visibility timeout for long running tasks

func (*TaskMetadata) SetTimestamp

func (m *TaskMetadata) SetTimestamp(time JSONTime)

SetTimestamp sets the message dispatch timestamp

func (*TaskMetadata) SetVersion

func (m *TaskMetadata) SetVersion(version Version)

SetVersion sets the message schema version

type TaskRegistry

type TaskRegistry struct {
	// contains filtered or unexported fields
}

TaskRegistry manages and dispatches tasks registered to this registry

func NewTaskRegistry

func NewTaskRegistry(publisher IPublisher) (*TaskRegistry, error)

NewTaskRegistry creates a task registry

func (*TaskRegistry) Dispatch

func (tr *TaskRegistry) Dispatch(taskName string, input interface{}) error

Dispatch a task asynchronously. The concrete type of input is expected to be same as the concrete type of NewInput()'s return value.

func (*TaskRegistry) DispatchWithContext

func (tr *TaskRegistry) DispatchWithContext(ctx context.Context, taskName string, input interface{}) error

DispatchWithContext dispatches a task asynchronously with context. The concrete type of input is expected to be same as the concrete type of NewInput()'s return value.

func (*TaskRegistry) DispatchWithPriority

func (tr *TaskRegistry) DispatchWithPriority(ctx context.Context, taskName string, priority Priority, input interface{}) error

DispatchWithPriority dispatches a task asynchronously with custom priority. The concrete type of input is expected to be same as the concrete type of NewInput()'s return value.

func (*TaskRegistry) GetTask

func (tr *TaskRegistry) GetTask(name string) (ITask, error)

GetTask fetches a task from the task registry

func (*TaskRegistry) NewLambdaConsumer

func (tr *TaskRegistry) NewLambdaConsumer(sessionCache *AWSSessionsCache, settings *Settings) ILambdaConsumer

NewLambdaConsumer creates a new taskhawk consumer for lambda apps

Cancelable context may be used to cancel processing of messages

func (*TaskRegistry) NewQueueConsumer

func (tr *TaskRegistry) NewQueueConsumer(sessionCache *AWSSessionsCache, settings *Settings) IQueueConsumer

NewQueueConsumer creates a new taskhawk consumer for queue apps

Cancelable context may be used to cancel processing of messages

func (*TaskRegistry) RegisterTask

func (tr *TaskRegistry) RegisterTask(task ITask) error

RegisterTask registers the task to the task registry

type Version

type Version string

Version represents the message format version

const (
	// Version1_0 represents the first version of the message format schema
	Version1_0 Version = "1.0"

	// CurrentVersion represents the current version of the taskhawk message schema
	CurrentVersion = Version1_0
)
