Documentation ¶
Overview ¶
Package finn is a framework for easily creating workers that listen for jobs on a message queue (e.g. RabbitMQ, Kafka).
The following is an example of a worker that we can write.
package workers import "github.com/Wattpad/finn" type MyWorker struct { finn.BaseWorker } func (self *MyWorker) NewInstance() finn.GenericWorker { return &MyWorker{} } func (self *MyWorker) Name() string { return "MyWorker" } func (self *MyWorker) TopicName() string { return "my-topic" } func (self *MyWorker) Run() { // do something }
Here we register our worker with finn, then set the queue and queue configuration. After that we start finn up.
package main import "github.com/Wattpad/finn" func main() { finn.AddWorker(&finn.MyWorker{}) finn.SetQueue(&finn.RabbitQueue{}, finn.QueueConfig{"host": "localhost"}) finn.Listen() }
Index ¶
- func AddWorker(worker GenericWorker) error
- func Listen()
- func LogError(err error)
- func LogInfo(info string)
- func LogInfoColour(info string)
- func Pack(worker GenericWorker) []byte
- func SetQueue(userQueue GenericQueue, userConfig QueueConfig) error
- type BaseQueue
- type BaseWorker
- func (self *BaseWorker) Attempts() int
- func (self *BaseWorker) CanRun() bool
- func (self *BaseWorker) IncreaseAttempts()
- func (self *BaseWorker) MaxAttempts() int
- func (self *BaseWorker) NextStartStamp(delay int) int64
- func (self *BaseWorker) RetryDelay(delay int) time.Duration
- func (self *BaseWorker) RetryDelaySeconds() int
- func (self *BaseWorker) RunDelay() time.Duration
- func (self *BaseWorker) SetStartStamp(stamp int64)
- func (self *BaseWorker) StartTime() time.Time
- type GenericQueue
- type GenericTopic
- type GenericWorker
- type Job
- type MockQueue
- type MockTopic
- type QueueConfig
- type RabbitQueue
- type RabbitTopic
- type Runner
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func LogInfoColour ¶
func LogInfoColour(info string)
LogInfoColour is LogInfo except it colours the info green
func Pack ¶
func Pack(worker GenericWorker) []byte
Pack encodes a worker object into a job to be put into a queue
func SetQueue ¶
func SetQueue(userQueue GenericQueue, userConfig QueueConfig) error
SetQueue sets the queue and queue configuration that Finn will use
Types ¶
type BaseQueue ¶
type BaseQueue struct {
// contains filtered or unexported fields
}
BaseQueue contains common methods used by the different queue implementations
func (*BaseQueue) SetConfig ¶
func (self *BaseQueue) SetConfig(config QueueConfig, defaults QueueConfig)
SetConfig sets up configuration values. Defaults fill in for empty values in the passed-in QueueConfig.
type BaseWorker ¶
BaseWorker contains common methods used by the different worker implementations
func (*BaseWorker) Attempts ¶
func (self *BaseWorker) Attempts() int
func (*BaseWorker) CanRun ¶
func (self *BaseWorker) CanRun() bool
func (*BaseWorker) IncreaseAttempts ¶
func (self *BaseWorker) IncreaseAttempts()
func (*BaseWorker) MaxAttempts ¶
func (self *BaseWorker) MaxAttempts() int
func (*BaseWorker) NextStartStamp ¶
func (self *BaseWorker) NextStartStamp(delay int) int64
func (*BaseWorker) RetryDelay ¶
func (self *BaseWorker) RetryDelay(delay int) time.Duration
func (*BaseWorker) RetryDelaySeconds ¶
func (self *BaseWorker) RetryDelaySeconds() int
func (*BaseWorker) RunDelay ¶
func (self *BaseWorker) RunDelay() time.Duration
func (*BaseWorker) SetStartStamp ¶
func (self *BaseWorker) SetStartStamp(stamp int64)
func (*BaseWorker) StartTime ¶
func (self *BaseWorker) StartTime() time.Time
type GenericQueue ¶
type GenericQueue interface { // Initialize sets up the initial connection to the queue service Initialize(QueueConfig) error // NewTopic creates a new topic to listen on NewTopic(string) (GenericTopic, error) // Close shuts down the queue Close() error }
GenericQueue is the interface that all queues using Finn must match (i.e. it maps 1:1 with a queue service such as RabbitMQ or Kafka)
type GenericTopic ¶
type GenericTopic interface { // Stream returns a channel with a stream of messages from the topic Stream() (<-chan []byte, error) // Put sends a message on the topic Put([]byte) error // Close shuts down the connection to the topic Close() error }
GenericTopic represents a stream of messages from a topic in a queue
type GenericWorker ¶
type GenericWorker interface { // Name returns the name of the worker Name() string // Run performs the actual work. // Returns an error and whether to retry or not (bool) Run() (error, bool) // NewInstance returns a new instance of the worker NewInstance() GenericWorker // RunDelay returns the duration to sleep before running RunDelay() time.Duration // RetryDelay returns the duration to wait until the retry should start RetryDelay(int) time.Duration // RetryDelaySeconds returns the base number of seconds to wait before retrying RetryDelaySeconds() int // StartTime returns the time at which the worker should start running StartTime() time.Time // SetStartStamp sets the time at which the worker should start running SetStartStamp(int64) // NextStartStamp returns the next time the worker should start, based on RetryDelay() NextStartStamp(int) int64 // Attempts returns the run attempt number Attempts() int // MaxAttempts returns the maximum number of attempts MaxAttempts() int // IncreaseAttempts increments the attempts counter by one IncreaseAttempts() // TopicName returns the queue/topic that the worker should listen on TopicName() string }
GenericWorker is the interface that all workers using Finn must match
func Unpack ¶
func Unpack(value []byte, template GenericWorker) (GenericWorker, error)
Unpack decodes a job into a worker object
type Job ¶
type Job struct {
// contains filtered or unexported fields
}
Job represents the packed job + the topic it came in on
type MockQueue ¶
type MockQueue struct { BaseQueue Topics map[string]*MockTopic // contains filtered or unexported fields }
MockQueue is a fake queue that is meant for testing workers
func (*MockQueue) Initialize ¶
func (self *MockQueue) Initialize(config QueueConfig) error
type MockTopic ¶
type MockTopic struct {
// contains filtered or unexported fields
}
func (*MockTopic) PutWorker ¶
func (self *MockTopic) PutWorker(worker GenericWorker)
PutWorker is a non-blocking testing function that adds a worker to the stream
type QueueConfig ¶
QueueConfig is used to do queue-specific configuration (e.g. will be different for RabbitMQ vs Kafka)
type RabbitQueue ¶
type RabbitQueue struct { BaseQueue // contains filtered or unexported fields }
RabbitQueue represents a connection to RabbitMQ
func (*RabbitQueue) Close ¶
func (self *RabbitQueue) Close() error
Close shuts down the connection to RabbitMQ
func (*RabbitQueue) Initialize ¶
func (self *RabbitQueue) Initialize(config QueueConfig) error
Initialize sets up the initial connection to RabbitMQ, and initializes variables. Should only be called once.
func (*RabbitQueue) NewTopic ¶
func (self *RabbitQueue) NewTopic(topic string) (GenericTopic, error)
NewTopic creates a new channel, and then listens to a topic on that channel
type RabbitTopic ¶
type RabbitTopic struct {
// contains filtered or unexported fields
}
RabbitTopic is an abstraction on top of RabbitMQ queues
func (*RabbitTopic) Close ¶
func (self *RabbitTopic) Close() error
Close shuts down the topic listener, as well as closes the output stream
func (*RabbitTopic) Put ¶
func (self *RabbitTopic) Put(message []byte) error
Put sends a message out on the topic
func (*RabbitTopic) Stream ¶
func (self *RabbitTopic) Stream() (<-chan []byte, error)
Stream returns a blocking channel with deliveries from the topic. It also translates RabbitMQ messages into general-purpose ones.
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
Runner handles running/retrying workers
func (*Runner) Initialize ¶
Initialize sets up the worker runner