harbinger

package module
v0.0.0-...-c28ad0a
Published: Jan 10, 2018 License: MIT Imports: 3 Imported by: 0

README

harbinger - SQS-based application

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

func NewHub

func NewHub() *Hub

func (*Hub) Broadcast

func (hub *Hub) Broadcast(i interface{})

func (*Hub) Signal

func (hub *Hub) Signal(i interface{})

func (*Hub) Subscribe

func (hub *Hub) Subscribe() *subscription

func (*Hub) Unsubscribe

func (hub *Hub) Unsubscribe(sub *subscription)
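
Because Subscribe returns an unexported *subscription, the exported API above does not show how values are received from a subscription, nor how Broadcast and Signal differ. The sketch below only strings the exported calls together; the import path and payloads are placeholders, not part of the package.

package main

import "harbinger" // placeholder import path; substitute the module's real import path

func main() {
	hub := harbinger.NewHub()
	sub := hub.Subscribe()     // obtain an unexported *subscription handle
	defer hub.Unsubscribe(sub) // drop the subscription when finished

	hub.Broadcast("hello, everyone") // assumed to reach every subscriber
	hub.Signal("ping")               // assumed to reach a single subscriber
	// Receiving values from sub is not part of the exported API shown above.
}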

type Operation

type Operation interface {
	IncrementTry()

	// Blocks the flow of execution until the operation is done.
	// It must be safe to call Wait() from various points in the code.
	Wait()

	// Done() signals that the Operation is done. Anyone waiting
	// with Wait() will resume.
	//
	// Unfortunately, this has to be idempotent.
	// At a minimum, it must not crash when called consecutively.
	Done()
}

Operation - an interface representing a unit of work to be done by a worker. For example, if you want to define a pool of HTTP requests or database readers, then your operation can be a request object or an object that represents a database read. To capture the output, set an output field (exported or unexported) on the operation.

The interface assumes that Wait() blocks until Done() is called; otherwise the operation's behaviour is unpredictable.
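
As a minimal sketch of an implementation (all names below are hypothetical, not part of the package): an operation wrapping an HTTP fetch can satisfy the interface with a done channel that is closed exactly once, which makes Done() idempotent and lets any number of callers block on Wait().

package ops

import (
	"sync"

	"harbinger" // placeholder import path; substitute the module's real import path
)

// fetchOp is a hypothetical Operation describing an HTTP fetch.
type fetchOp struct {
	URL   string
	Body  []byte // output, set by whichever worker processes the op
	tries int
	done  chan struct{}
	once  sync.Once
}

func newFetchOp(url string) *fetchOp {
	return &fetchOp{URL: url, done: make(chan struct{})}
}

func (o *fetchOp) IncrementTry() { o.tries++ }

// Wait blocks until Done has been called; it can be called from several
// points in the code.
func (o *fetchOp) Wait() { <-o.done }

// Done is idempotent: sync.Once guarantees the channel is closed only once,
// so consecutive calls do not panic.
func (o *fetchOp) Done() { o.once.Do(func() { close(o.done) }) }

var _ harbinger.Operation = (*fetchOp)(nil) // compile-time interface check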

type RingBuffer

type RingBuffer struct {
	// contains filtered or unexported fields
}

func NewRingBuffer

func NewRingBuffer(capacity uint) *RingBuffer

func (*RingBuffer) Close

func (buffer *RingBuffer) Close() error

func (*RingBuffer) Read

func (buffer *RingBuffer) Read(p []byte) (n int, err error)

func (*RingBuffer) Write

func (buffer *RingBuffer) Write(p []byte) (n int, err error)
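
Since RingBuffer exposes Read, Write, and Close with the standard signatures, it can be treated as an io.ReadWriteCloser and composed with io.Copy, bufio, and so on. The sketch below assumes written bytes are later readable in order; the behaviour when the buffer is full is not documented here, and the import path is a placeholder.

package main

import (
	"fmt"
	"log"

	"harbinger" // placeholder import path; substitute the module's real import path
)

func main() {
	buf := harbinger.NewRingBuffer(64) // capacity chosen arbitrarily for this sketch
	if _, err := buf.Write([]byte("hello")); err != nil {
		log.Fatal(err)
	}

	p := make([]byte, 5)
	n, err := buf.Read(p)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("read %d bytes: %q\n", n, p[:n])

	if err := buf.Close(); err != nil {
		log.Fatal(err)
	}
}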

type SubscriptionRing

type SubscriptionRing struct {
	// contains filtered or unexported fields
}

func NewSubscriptionRing

func NewSubscriptionRing() *SubscriptionRing

func (*SubscriptionRing) Add

func (rng *SubscriptionRing) Add(sub *subscription)

Add - adds a subscription as a new ring item at the end of the ring

func (*SubscriptionRing) Current

func (rng *SubscriptionRing) Current() *subscription

func (*SubscriptionRing) Do

func (rng *SubscriptionRing) Do(f func(*subscription))

func (*SubscriptionRing) Len

func (rng *SubscriptionRing) Len() int

Len - returns the number of nodes in the ring (see container/ring.Ring.Len)

func (*SubscriptionRing) Next

func (rng *SubscriptionRing) Next()

Next - advances the underlying ring node forward by one

func (*SubscriptionRing) Remove

func (rng *SubscriptionRing) Remove(sub *subscription) bool

Remove - removes the first occurrence, if any, of a given subscription and returns true. If no node is removed, the function returns false.

type Worker

type Worker interface {

	// Init - initialises the worker before it starts processing operations.
	Init() error
	Process(op Operation) (bool, error)
	HandleError(error, Operation)
	Equal(Worker) bool
	Cleanup()
}

Worker - an interface representing a client or processor that handles requests (operations).

For example, if you want to define a pool of map/reduce processes, then a worker may be a logical unit (possibly composed of several smaller units) that processes a request. If you want to define a pool of database clients, then a worker may be a struct that accepts a database query as an op and returns a seeker over the resulting rows.

Such a client/processor should know how to handle errors and clean up after itself. The main method here is Process, which processes a given operation and returns whether the operation should be retried, along with any error.
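
A hedged sketch of a Worker, reusing the hypothetical fetchOp from the Operation section above (all names are illustrative, not part of the package); the bool returned by Process is read here as "the operation should be retried", following the description above.

package ops

import (
	"fmt"
	"log"

	"harbinger" // placeholder import path; substitute the module's real import path
)

// fetchWorker is a hypothetical Worker that fills in fetchOp results.
type fetchWorker struct {
	id int
}

func (w *fetchWorker) Init() error { return nil } // open clients or connections here

// Process handles one operation; the bool is assumed to mean
// "retry this operation".
func (w *fetchWorker) Process(op harbinger.Operation) (bool, error) {
	f, ok := op.(*fetchOp)
	if !ok {
		return false, fmt.Errorf("unexpected operation type %T", op)
	}
	f.Body = []byte("fetched " + f.URL) // capture the output on the op itself
	return false, nil
}

func (w *fetchWorker) HandleError(err error, op harbinger.Operation) {
	log.Printf("worker %d failed %v: %v", w.id, op, err)
}

func (w *fetchWorker) Equal(other harbinger.Worker) bool {
	o, ok := other.(*fetchWorker)
	return ok && o.id == w.id
}

func (w *fetchWorker) Cleanup() {} // close clients, flush buffers, etc.

var _ harbinger.Worker = (*fetchWorker)(nil) // compile-time interface check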

type WorkerPool

type WorkerPool struct {
	Workers []Worker
	// contains filtered or unexported fields
}

WorkerPool - represents a pool of (possibly heterogeneous) Workers that read messages off a queue and process them. The idea is that, without curation and dispatch, messages may go through several passes before being handled by the correct worker.

What the worker pool provides, then, is basic control around spinning up the workers, assigning tasks to them, keeping them up, and shutting down the pool when everything is done.

Its core methods are Start, Execute, and Shutdown. With these three we should be able to initialise the workers, push tasks to them, and reclaim resources when done.

func NewPool

func NewPool(workers []Worker) *WorkerPool

NewPool - creates a new pool of workers. Given a list of workers, each will be initialised, registered to receive messages from a queue, and restarted when an error occurs. Each will be shut down appropriately when the shutdown sequence is called.

func (*WorkerPool) Do

func (pool *WorkerPool) Do(op Operation) error

Do - executes a single operation asynchronously. It returns an error if the worker pool is not running (e.g. it has not been started or has been shut down), because then the channels are all closed.
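
A hedged sketch, continuing in the same hypothetical package as the earlier examples: because Do is asynchronous, a caller that needs the result can block on the operation itself via Wait(), assuming the pool (or its worker) calls Done() once processing finishes. newFetchOp is the hypothetical constructor from the Operation section.

func doOne(pool *harbinger.WorkerPool) error {
	op := newFetchOp("https://example.com")
	if err := pool.Do(op); err != nil {
		return err // the pool is not running
	}
	op.Wait() // blocks until the operation has been marked as done
	return nil
}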

func (*WorkerPool) Execute

func (pool *WorkerPool) Execute(ops []Operation) (<-chan Operation, error)

Execute - executes a collection of operations (requests). It returns a channel that is closed once all the operations have either been processed successfully or failed.

If an error is returned, the channel is closed.
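
A hedged end-to-end sketch in the same hypothetical package as the earlier fetchOp and fetchWorker examples: start the pool, execute a batch, and range over the returned channel until it is closed.

func runBatch() error {
	pool := harbinger.NewPool([]harbinger.Worker{
		&fetchWorker{id: 1},
		&fetchWorker{id: 2},
	})
	if err := pool.Start(); err != nil {
		return err
	}
	defer pool.Shutdown()

	done, err := pool.Execute([]harbinger.Operation{
		newFetchOp("https://example.com/a"),
		newFetchOp("https://example.com/b"),
	})
	if err != nil {
		return err
	}
	for op := range done { // closed once every operation has finished or failed
		if f, ok := op.(*fetchOp); ok {
			log.Printf("got %d bytes from %s", len(f.Body), f.URL)
		}
	}
	return nil
}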

func (*WorkerPool) Shutdown

func (pool *WorkerPool) Shutdown()

Shutdown - shuts down the worker pool so that (some of) its resources can be reused; the workers are notified to shut down.

This prevents any further requests from being executed; it does not affect requests that have already begun processing.

func (*WorkerPool) Start

func (pool *WorkerPool) Start() error

Start - starts the pool by setting up the workers to listen for requests. A pool that has not been started cannot process any requests.

func (*WorkerPool) Wrap

func (pool *WorkerPool) Wrap(inStream <-chan Operation) (<-chan Operation, error)

Wrap - wraps an input stream into an output stream. If the client has a streaming input, then rather than being forced to batch operations and call Execute, it can send operations directly into an input channel; the pool handles processing the operations and, as each one is done, puts it onto the output channel.

Closing the input stream does not immediately close the output stream. The output stream is only closed once all the inbound requests have been handled, either successfully or unsuccessfully (and marked as done).
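
A hedged streaming sketch, again reusing the hypothetical helpers above; the pool is assumed to have been created with NewPool and started with Start. The producer closes the input channel when it runs out of operations, and the output channel closes only after every in-flight operation has been marked done.

func runStream(pool *harbinger.WorkerPool, urls []string) error {
	in := make(chan harbinger.Operation)
	out, err := pool.Wrap(in)
	if err != nil {
		return err
	}

	go func() {
		defer close(in) // no more input; the output drains and then closes
		for _, u := range urls {
			in <- newFetchOp(u)
		}
	}()

	for op := range out {
		if f, ok := op.(*fetchOp); ok {
			log.Printf("finished %s", f.URL) // op is done by the time it arrives here
		}
	}
	return nil
}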

Directories

Path Synopsis
bin
app
