finn

package module
v0.0.0-...-cb16837 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2015 License: MIT Imports: 11 Imported by: 0

README

finn

Finn is a Go module for creating workers that process jobs from any message queue (e.g. RabbitMQ).

Sample Worker
package workers

import "github.com/Wattpad/finn"

// MyWorker is a sample worker. Embedding finn.BaseWorker supplies the
// attempt-counting and scheduling methods of finn.GenericWorker, so only
// NewInstance, Name, TopicName and Run need to be written here.
type MyWorker struct {
	finn.BaseWorker
}

// NewInstance returns a new, empty MyWorker.
func (self *MyWorker) NewInstance() finn.GenericWorker {
	return &MyWorker{}
}

// Name returns the name of the worker.
func (self *MyWorker) Name() string {
	return "MyWorker"
}

// TopicName returns the queue/topic that the worker listens on.
func (self *MyWorker) TopicName() string {
	return "my-topic"
}

// Run performs the actual work. It returns an error and whether the job
// should be retried, which is the signature finn.GenericWorker requires.
func (self *MyWorker) Run() (error, bool) {
	// do something
	return nil, false
}
Sample main()
package main

import (
	"os"

	"github.com/Wattpad/finn"

	// Placeholder: replace with the import path of the package that
	// declares MyWorker in your own project.
	"example.com/myapp/workers"
)

// main registers the sample worker, configures the RabbitMQ queue and then
// starts listening for jobs. Registration and configuration errors are
// logged and abort start-up instead of being silently dropped.
func main() {
	// MyWorker lives in the application's workers package, not in finn.
	if err := finn.AddWorker(&workers.MyWorker{}); err != nil {
		finn.LogError(err)
		os.Exit(1)
	}
	if err := finn.SetQueue(&finn.RabbitQueue{}, finn.QueueConfig{"host": "localhost"}); err != nil {
		finn.LogError(err)
		os.Exit(1)
	}
	finn.Listen()
}
Dependencies:
  • github.com/streadway/amqp (for RabbitMQ)
  • github.com/ugorji/go (for MessagePack)

Documentation

Overview

Package finn is a framework for easily creating workers that listen for jobs on a message queue (e.g. RabbitMQ, Kafka).

The following is an example of a worker that we can write.

package workers

import "github.com/Wattpad/finn"

// MyWorker is a sample worker. Embedding finn.BaseWorker supplies the
// attempt-counting and scheduling methods of finn.GenericWorker, so only
// NewInstance, Name, TopicName and Run need to be written here.
type MyWorker struct {
	finn.BaseWorker
}

// NewInstance returns a new, empty MyWorker.
func (self *MyWorker) NewInstance() finn.GenericWorker {
	return &MyWorker{}
}

// Name returns the name of the worker.
func (self *MyWorker) Name() string {
	return "MyWorker"
}

// TopicName returns the queue/topic that the worker listens on.
func (self *MyWorker) TopicName() string {
	return "my-topic"
}

// Run performs the actual work. It returns an error and whether the job
// should be retried, which is the signature finn.GenericWorker requires.
func (self *MyWorker) Run() (error, bool) {
	// do something
	return nil, false
}

Here we register our worker with finn, then set the queue and queue configuration. After that we start finn up.

package main

import (
	"os"

	"github.com/Wattpad/finn"

	// Placeholder: replace with the import path of the package that
	// declares MyWorker in your own project.
	"example.com/myapp/workers"
)

// main registers the sample worker, configures the RabbitMQ queue and then
// starts listening for jobs. Registration and configuration errors are
// logged and abort start-up instead of being silently dropped.
func main() {
	// MyWorker lives in the application's workers package, not in finn.
	if err := finn.AddWorker(&workers.MyWorker{}); err != nil {
		finn.LogError(err)
		os.Exit(1)
	}
	if err := finn.SetQueue(&finn.RabbitQueue{}, finn.QueueConfig{"host": "localhost"}); err != nil {
		finn.LogError(err)
		os.Exit(1)
	}
	finn.Listen()
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddWorker

func AddWorker(worker GenericWorker) error

AddWorker registers a worker with Finn

func Listen

func Listen()

Listen boots Finn up and begins listening for work on the queue

func LogError

func LogError(err error)

LogError logs error messages to standard error

func LogInfo

func LogInfo(info string)

LogInfo logs informational messages to standard out

func LogInfoColour

func LogInfoColour(info string)

LogInfoColour is LogInfo except it colours the info green

func Pack

func Pack(worker GenericWorker) []byte

Pack encodes a worker object into a job to be put into a queue

func SetQueue

func SetQueue(userQueue GenericQueue, userConfig QueueConfig) error

SetQueue sets the queue and queue configuration that Finn will use

Types

type BaseQueue

type BaseQueue struct {
	// contains filtered or unexported fields
}

BaseQueue contains common methods used by the different queue implementations

func (*BaseQueue) SetConfig

func (self *BaseQueue) SetConfig(config QueueConfig, defaults QueueConfig)

SetConfig sets up configuration values. Defaults fill in for empty values in the passed-in QueueConfig.

type BaseWorker

type BaseWorker struct {
	RunAttempts int
	StartStamp  int64
}

BaseWorker contains common methods used by the different worker implementations

func (*BaseWorker) Attempts

func (self *BaseWorker) Attempts() int

func (*BaseWorker) CanRun

func (self *BaseWorker) CanRun() bool

func (*BaseWorker) IncreaseAttempts

func (self *BaseWorker) IncreaseAttempts()

func (*BaseWorker) MaxAttempts

func (self *BaseWorker) MaxAttempts() int

func (*BaseWorker) NextStartStamp

func (self *BaseWorker) NextStartStamp(delay int) int64

func (*BaseWorker) RetryDelay

func (self *BaseWorker) RetryDelay(delay int) time.Duration

func (*BaseWorker) RetryDelaySeconds

func (self *BaseWorker) RetryDelaySeconds() int

func (*BaseWorker) RunDelay

func (self *BaseWorker) RunDelay() time.Duration

func (*BaseWorker) SetStartStamp

func (self *BaseWorker) SetStartStamp(stamp int64)

func (*BaseWorker) StartTime

func (self *BaseWorker) StartTime() time.Time

type GenericQueue

type GenericQueue interface {
	// Initialize sets up the initial connection to the queue service
	Initialize(QueueConfig) error

	// NewTopic creates a new topic to listen on
	NewTopic(string) (GenericTopic, error)

	// Close shuts down the queue
	Close() error
}

GenericQueue is the interface that all queues used with Finn must implement (i.e. one implementation per queue service, such as RabbitMQ or Kafka)

type GenericTopic

type GenericTopic interface {
	// Stream returns a channel with a stream of messages from the topic
	Stream() (<-chan []byte, error)

	// Put sends a message on the topic
	Put([]byte) error

	// Close shuts down the connection to the topic
	Close() error
}

GenericTopic represents a stream of messages from a topic in a queue

type GenericWorker

type GenericWorker interface {
	// Name returns the name of the worker
	Name() string

	// Run performs the actual work.
	// Returns an error and whether to retry or not (bool)
	Run() (error, bool)

	// NewInstance returns a new instance of the worker
	NewInstance() GenericWorker

	// RunDelay returns the duration to sleep before running
	RunDelay() time.Duration

	// RetryDelay returns the duration to wait until the retry should start
	RetryDelay(int) time.Duration

	// RetryDelaySeconds returns the base number of seconds to wait before retrying
	RetryDelaySeconds() int

	// StartTime returns the time at which the worker should start running
	StartTime() time.Time

	// SetStartStamp sets the time at which the worker should start running
	SetStartStamp(int64)

	// NextStartStamp returns the next time the worker should start, based on RetryDelay()
	NextStartStamp(int) int64

	// Attempts returns the run attempt number
	Attempts() int

	// MaxAttempts returns the maximum number of attempts
	MaxAttempts() int

	// IncreaseAttempts increments the attempts counter by one
	IncreaseAttempts()

	// TopicName returns the queue/topic that the worker should listen on
	TopicName() string
}

GenericWorker is the interface that all workers using Finn must match

func Unpack

func Unpack(value []byte, template GenericWorker) (GenericWorker, error)

Unpack decodes a job into a worker object

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job represents the packed job + the topic it came in on

type MockQueue

type MockQueue struct {
	BaseQueue
	Topics map[string]*MockTopic
	// contains filtered or unexported fields
}

MockQueue is a fake queue that is meant for testing workers

func (*MockQueue) Close

func (self *MockQueue) Close() error

func (*MockQueue) GetTopic

func (self *MockQueue) GetTopic(topicName string) *MockTopic

GetTopic returns a topic for testing

func (*MockQueue) Initialize

func (self *MockQueue) Initialize(config QueueConfig) error

func (*MockQueue) NewTopic

func (self *MockQueue) NewTopic(topic string) (GenericTopic, error)

type MockTopic

type MockTopic struct {
	// contains filtered or unexported fields
}

func (*MockTopic) Close

func (self *MockTopic) Close() error

func (*MockTopic) Put

func (self *MockTopic) Put(message []byte) error

Put is a non-blocking testing function that adds a message to the stream

func (*MockTopic) PutWorker

func (self *MockTopic) PutWorker(worker GenericWorker)

PutWorker is a non-blocking testing function that adds a worker to the stream

func (*MockTopic) Stream

func (self *MockTopic) Stream() (<-chan []byte, error)

type QueueConfig

type QueueConfig map[string]string

QueueConfig is used to do queue-specific configuration (e.g. will be different for RabbitMQ vs Kafka)

type RabbitQueue

type RabbitQueue struct {
	BaseQueue
	// contains filtered or unexported fields
}

RabbitQueue represents a connection to RabbitMQ

func (*RabbitQueue) Close

func (self *RabbitQueue) Close() error

Close shuts down the connection to RabbitMQ

func (*RabbitQueue) Initialize

func (self *RabbitQueue) Initialize(config QueueConfig) error

Initialize sets up the initial connection to RabbitMQ, and initializes variables. Should only be called once.

func (*RabbitQueue) NewTopic

func (self *RabbitQueue) NewTopic(topic string) (GenericTopic, error)

NewTopic creates a new channel, and then listens to a topic on that channel

type RabbitTopic

type RabbitTopic struct {
	// contains filtered or unexported fields
}

RabbitTopic is an abstraction on top of RabbitMQ queues

func (*RabbitTopic) Close

func (self *RabbitTopic) Close() error

Close shuts down the topic listener, as well as closes the output stream

func (*RabbitTopic) Put

func (self *RabbitTopic) Put(message []byte) error

Put sends a message out on the topic

func (*RabbitTopic) Stream

func (self *RabbitTopic) Stream() (<-chan []byte, error)

Stream returns a blocking channel with deliveries from the topic. It also translates RabbitMQ messages into general-purpose ones.

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner handles running/retrying workers

func (*Runner) Close

func (self *Runner) Close()

Close shuts down the Runner and underlying queue

func (*Runner) Initialize

func (self *Runner) Initialize() error

Initialize sets up the worker runner

func (*Runner) Retry

func (self *Runner) Retry(worker GenericWorker, waitGroup *sync.WaitGroup)

Retry handles the logic for retrying a job

func (*Runner) Run

func (self *Runner) Run(worker GenericWorker, waitGroup *sync.WaitGroup)

Run handles the run and retry logic for a single job

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL