fusion

v0.1.3

This package is not in the latest version of its module.

Published: Jul 21, 2020 License: MIT Imports: 8 Imported by: 2

README

💥 Fusion

WIP

Fusion is a tiny actor library/tool written in Go. It provides an actor that consumes messages from a stream and calls a registered processor function for each one, retrying failed messages when necessary.

Fusion can be used as a library inside a tool (for example, for file processing), or to build distributed actor systems on top of stream/queue implementations backed by Kafka, Redis, etc.

Architecture

Features

  • Automatic retries with configurable backoff strategies (Constant, Exponential or custom).
  • Streams: Line stream from any io.Reader, Kafka stream or custom (Implement fusion.Stream interface).
  • Delay Queues: In-Memory queue for simple use cases, Redis (WIP) queue for distributed use cases or custom (fusion.DelayQueue interface).
  • Concurrent workers.

Usage

As Library
package main

import (
    "context"
    "os"
    "time"

    "github.com/spy16/fusion"
    "github.com/spy16/fusion/stream"
)

func main() {
    actor := fusion.New(fusion.Options{
        Workers:    10,
        MaxRetries: 10,
        // Per the Options docs, retries stay disabled unless a Backoff is set.
        Backoff: fusion.ConstBackoff(time.Second),
        Stream:  &stream.Lines{From: os.Stdin},
        Processor: func(ctx context.Context, msg fusion.Message) error {
            // Process the message. Returning an error triggers a retry if possible.
            // Return fusion.Skip to indicate that the message should be ignored.
            return nil
        },
        OnFailure: func(msg fusion.Message, err error) {
            // The message exhausted all retries. Push it to a dead-letter
            // queue or just log it.
        },
    })
    // Run blocks until the context is cancelled or the stream and queue drain.
    _ = actor.Run(context.Background())
}

Documentation


Variables

var (
	// ErrNoMessage can be returned from stream/queue implementations to indicate that
	// the stream/queue currently has no message. Actor might switch to polling mode in
	// this case.
	ErrNoMessage = errors.New("no message available")

	// Skip can be returned by processor functions to indicate that the message
	// should be ignored.
	Skip = errors.New("skip message")

	// Failed can be returned from processor functions to indicate that the message
	// must be failed immediately (i.e., skip retries even if available). Processors
	// can wrap this message using '%w' directive in fmt functions.
	Failed = errors.New("fail message")
)
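As a rough illustration of how a processor might use these sentinel values, the sketch below returns Skip for empty payloads and wraps Failed with '%w' for unrecoverable ones. The local Skip and Failed variables only mirror the fusion values so the example compiles on its own; process and classify are hypothetical helpers, and classify is just one plausible way a caller could interpret the result, not the actor's actual logic.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring fusion.Skip and fusion.Failed so this sketch
// compiles without the library. Real code would use the fusion values.
var (
	Skip   = errors.New("skip message")
	Failed = errors.New("fail message")
)

// process is a hypothetical processor body that classifies a payload.
func process(val string) error {
	switch val {
	case "":
		return Skip // nothing to do, ignore the message
	case "corrupt":
		// Wrap Failed with %w so it stays detectable through errors.Is.
		return fmt.Errorf("payload %q cannot be parsed: %w", val, Failed)
	case "flaky":
		return errors.New("temporary upstream error") // eligible for retry
	}
	return nil
}

// classify shows one way a caller could interpret a processor error.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, Skip):
		return "skip"
	case errors.Is(err, Failed):
		return "fail"
	default:
		return "retry"
	}
}

func main() {
	for _, v := range []string{"hello", "", "corrupt", "flaky"} {
		fmt.Printf("%q -> %s\n", v, classify(process(v)))
	}
}
```

Because the wrapped error is matched with errors.Is, the extra context added by fmt.Errorf does not hide the Failed sentinel.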


Types

type Actor

type Actor struct {
	Logger
	// contains filtered or unexported fields
}

Actor represents an entity that consumes messages from the stream and acts on them. If processing a message fails and retries are enabled, the message is queued in the configured delay queue. If retries are not enabled, the message is passed to the configured OnFailure handler.

func New

func New(opts Options) *Actor

New returns a new actor with the given configuration. If the processor (opts.Processor) is nil, the actor still consumes messages but skips all of them. If the queue (opts.Queue) is nil, an in-memory queue is used when retries are enabled.

func (*Actor) Run

func (actor *Actor) Run(ctx context.Context) error

Run spawns all the worker goroutines and blocks until all of them return. Workers will exit when context is cancelled or when stream and delay-queue both return EOF.

type Backoff

type Backoff interface {
	// RetryAfter should return the duration that should elapse
	// before the message is retried next.
	RetryAfter(msg Message) time.Duration
}

Backoff represents a backoff strategy to be used by the actor.
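A custom strategy only needs a RetryAfter method. The sketch below is a hypothetical linear backoff that grows the delay with msg.Attempts and caps it at a maximum. The Message and Backoff types are trimmed local mirrors of the fusion types so the example is self-contained; linearBackoff and delayMillis are illustrative names that are not part of the library.

```go
package main

import (
	"fmt"
	"time"
)

// Message mirrors only the fusion.Message field used by this sketch.
type Message struct {
	Attempts int
}

// Backoff mirrors the fusion.Backoff interface.
type Backoff interface {
	RetryAfter(msg Message) time.Duration
}

// linearBackoff is a hypothetical strategy: step * attempts, capped at max.
type linearBackoff struct {
	step, max time.Duration
}

// RetryAfter returns the delay before the next retry of msg.
func (b linearBackoff) RetryAfter(msg Message) time.Duration {
	attempts := msg.Attempts
	if attempts < 1 {
		attempts = 1 // treat unprocessed messages like a first attempt
	}
	d := time.Duration(attempts) * b.step
	if d > b.max {
		return b.max
	}
	return d
}

// delayMillis reports the delay in milliseconds for a given attempt count,
// using a 500ms step and a 2s cap.
func delayMillis(attempts int) int64 {
	var b Backoff = linearBackoff{step: 500 * time.Millisecond, max: 2 * time.Second}
	return b.RetryAfter(Message{Attempts: attempts}).Milliseconds()
}

func main() {
	for i := 1; i <= 5; i++ {
		fmt.Printf("attempt %d -> %dms\n", i, delayMillis(i))
	}
}
```

With the real library, a value like this would presumably be assigned to Options.Backoff in place of ConstBackoff or ExpBackoff.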

func ConstBackoff

func ConstBackoff(interval time.Duration) Backoff

ConstBackoff implements a constant interval Backoff strategy.

func ExpBackoff

func ExpBackoff(base float64, initialTimeout, maxTimeout time.Duration) Backoff

ExpBackoff provides a simple exponential Backoff strategy for retries.

type DelayQueue

type DelayQueue interface {
	// Enqueue must save the message with priority based on the timestamp set.
	// If no timestamp is set, current timestamp should be assumed.
	Enqueue(msg Message) error

	// Dequeue should read one message that has an expired timestamp and call
	// readFn with it. Success/failure from readFn must be considered as ACK
	// or nACK respectively. When message is not available, Dequeue should not
	// block but return ErrNoMessage. Queue can return EOF to indicate that the
	// queue is fully drained. Other errors from the queue will be logged and
	// ignored.
	Dequeue(ctx context.Context, readFn ReadFn) error
}

A DelayQueue implementation maintains messages in timestamp order. The actor uses it for retries and for manually enqueued messages.

type InMemQ

type InMemQ struct {
	// contains filtered or unexported fields
}

InMemQ implements an in-memory min-heap based message queue.

func (*InMemQ) Dequeue

func (q *InMemQ) Dequeue(ctx context.Context, readFn ReadFn) error

Dequeue reads a message from the in-mem heap if available and calls readFn with it. Otherwise returns ErrNoMessage.

func (*InMemQ) Enqueue

func (q *InMemQ) Enqueue(msg Message) error

Enqueue pushes the message into the in-mem heap with timestamp as its priority. If timestamp is not set, current timestamp will be assumed.
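To make the DelayQueue contract more concrete, here is a minimal sketch of a min-heap based queue built on container/heap. It is not the InMemQ source; sketchQ and demo are hypothetical names, and the types are trimmed local mirrors of the fusion ones. It follows the documented rules: a zero timestamp defaults to the current time, Dequeue never blocks, and ErrNoMessage is returned when no message is due. Re-enqueueing on failure is an assumption about what a nACK could mean.

```go
package main

import (
	"container/heap"
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrNoMessage mirrors fusion.ErrNoMessage.
var ErrNoMessage = errors.New("no message available")

// Message and ReadFn are trimmed mirrors of the fusion types.
type Message struct {
	Key  []byte
	Time time.Time
}

type ReadFn func(ctx context.Context, msg Message) error

// msgHeap orders messages by Time, earliest first.
type msgHeap []Message

func (h msgHeap) Len() int            { return len(h) }
func (h msgHeap) Less(i, j int) bool  { return h[i].Time.Before(h[j].Time) }
func (h msgHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *msgHeap) Push(x interface{}) { *h = append(*h, x.(Message)) }
func (h *msgHeap) Pop() interface{} {
	old := *h
	m := old[len(old)-1]
	*h = old[:len(old)-1]
	return m
}

// sketchQ is a hypothetical heap-backed delay queue.
type sketchQ struct {
	mu sync.Mutex
	h  msgHeap
}

// Enqueue stores msg with its timestamp as priority (now if unset).
func (q *sketchQ) Enqueue(msg Message) error {
	if msg.Time.IsZero() {
		msg.Time = time.Now()
	}
	q.mu.Lock()
	defer q.mu.Unlock()
	heap.Push(&q.h, msg)
	return nil
}

// Dequeue hands the earliest due message to readFn without blocking.
func (q *sketchQ) Dequeue(ctx context.Context, readFn ReadFn) error {
	q.mu.Lock()
	if len(q.h) == 0 || q.h[0].Time.After(time.Now()) {
		q.mu.Unlock()
		return ErrNoMessage
	}
	msg := heap.Pop(&q.h).(Message)
	q.mu.Unlock()

	if err := readFn(ctx, msg); err != nil {
		_ = q.Enqueue(msg) // assumed nACK handling: put the message back
		return err
	}
	return nil
}

// demo enqueues three messages and records what three Dequeue calls yield.
func demo() []string {
	q := &sketchQ{}
	now := time.Now()
	_ = q.Enqueue(Message{Key: []byte("late"), Time: now.Add(time.Hour)})
	_ = q.Enqueue(Message{Key: []byte("b"), Time: now.Add(-time.Second)})
	_ = q.Enqueue(Message{Key: []byte("a"), Time: now.Add(-2 * time.Second)})

	var out []string
	for i := 0; i < 3; i++ {
		err := q.Dequeue(context.Background(), func(_ context.Context, m Message) error {
			out = append(out, string(m.Key))
			return nil
		})
		if err != nil {
			out = append(out, err.Error())
		}
	}
	return out
}

func main() {
	fmt.Println(demo())
}
```

The message scheduled an hour ahead stays in the heap, so the third Dequeue call reports ErrNoMessage instead of blocking.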

type Logger

type Logger interface {
	Debugf(msg string, args ...interface{})
	Infof(msg string, args ...interface{})
	Warnf(msg string, args ...interface{})
	Errorf(msg string, args ...interface{})
}

Logger implementations provide logging facilities for Actor.
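Any type with these four methods can act as a Logger. As a sketch, the adapter below forwards each level to a standard library *log.Logger with a level prefix. The Logger interface is a local mirror of fusion.Logger, and stdLogger and render are hypothetical names chosen for this example.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger mirrors the fusion.Logger interface.
type Logger interface {
	Debugf(msg string, args ...interface{})
	Infof(msg string, args ...interface{})
	Warnf(msg string, args ...interface{})
	Errorf(msg string, args ...interface{})
}

// stdLogger is a hypothetical adapter over the standard library logger.
type stdLogger struct {
	l *log.Logger
}

func (s stdLogger) Debugf(msg string, args ...interface{}) { s.l.Printf("DEBUG "+msg, args...) }
func (s stdLogger) Infof(msg string, args ...interface{})  { s.l.Printf("INFO "+msg, args...) }
func (s stdLogger) Warnf(msg string, args ...interface{})  { s.l.Printf("WARN "+msg, args...) }
func (s stdLogger) Errorf(msg string, args ...interface{}) { s.l.Printf("ERROR "+msg, args...) }

// render logs one warning into a buffer and returns the written text.
func render() string {
	var buf bytes.Buffer
	var lg Logger = stdLogger{l: log.New(&buf, "", 0)}
	lg.Warnf("retrying message %s (attempt %d)", "k1", 2)
	return buf.String()
}

func main() {
	fmt.Print(render())
}
```

With the real library, such an adapter would presumably be passed through Options.Logger.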

type Message

type Message struct {
	Key []byte `json:"key" xml:"key"`
	Val []byte `json:"val" xml:"val"`

	// Time at which message arrived or should be processed when scheduled by
	// retrying logic. (Managed by the actor)
	Time time.Time `json:"time" xml:"time"`

	// Attempts is incremented by the actor every time an attempt is done to
	// process the message.
	Attempts int `json:"attempts" xml:"attempts"`
}

Message represents a message from the stream. Contents of key and value are not validated by the framework itself, but may be validated by the processor functions.

type Options

type Options struct {
	// Stream is the primary source of messages for the actor. Worker
	// threads read from stream continuously and process the messages.
	// If stream is not set, actor will rely entirely on the queue and
	// manually enqueued messages using actor.Enqueue().
	Stream Stream

	// Queue is used for retries and for manually enqueuing messages
	// for the actor. If queue is not set, an in-memory queue will be
	// used.
	Queue DelayQueue

	// Processor function to use for processing the messages. If not
	// set, a no-op processor will be used that skips everything.
	Processor Processor

	// Workers is the number of worker goroutines to spawn when actor
	// starts. Defaults to 1.
	Workers int

	// MaxRetries is the number of retry attempts allowed. If not set,
	// retries are disabled.
	MaxRetries int

	// Backoff strategy to be used for retries. Messages that need to
	// be retried are re-queued to a future time based on the delay
	// returned by backoff. If not set, retries are disabled.
	Backoff Backoff

	// OnFailure if set, will be called when all retries are exhausted
	// and the processing has not succeeded. If not set, such messages
	// will simply be logged and ignored. err argument will have error
	// that occurred in processor in the last retry attempt.
	OnFailure func(msg Message, err error)

	// Logger can be overridden to use custom logger.
	Logger Logger

	// PollInterval to use when non-blocking queues/streams return ErrNoMessage.
	// Defaults to 300ms.
	PollInterval time.Duration
}

Options represents the configuration options for an actor instance.

type Processor

type Processor func(ctx context.Context, msg Message) error

Processor implementations define the logic to be executed by actor on receiving a message from the stream.

type ReadFn

type ReadFn func(ctx context.Context, msg Message) error

ReadFn implementation is called by the message queue to handle a message.

type Stream

type Stream interface {
	// Read should read the next message available in the stream and
	// call readFn with it. Success/Failure of the readFn invocation
	// should be used as ACK/NACK respectively.
	Read(ctx context.Context, readFn ReadFn) error
}

Stream represents an immutable stream of messages.
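A custom stream only has to implement Read. The following sketch serves messages from a fixed slice and returns io.EOF once it is drained. Treating io.EOF as the end-of-stream signal is an assumption based on the Run documentation; sliceStream and drain are hypothetical names, and Message and ReadFn are trimmed local mirrors of the fusion types.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
)

// Message and ReadFn are trimmed mirrors of the fusion types.
type Message struct {
	Key, Val []byte
}

type ReadFn func(ctx context.Context, msg Message) error

// sliceStream is a hypothetical stream backed by a fixed slice.
type sliceStream struct {
	mu   sync.Mutex
	msgs []Message
	next int
}

// Read passes the next message to readFn, or returns io.EOF when drained.
func (s *sliceStream) Read(ctx context.Context, readFn ReadFn) error {
	s.mu.Lock()
	if s.next >= len(s.msgs) {
		s.mu.Unlock()
		return io.EOF // assumption: io.EOF marks a fully consumed stream
	}
	msg := s.msgs[s.next]
	s.next++
	s.mu.Unlock()

	// The result of readFn is the ACK/NACK. A broker-backed stream would
	// commit or redeliver here; this sketch only reports it to the caller.
	return readFn(ctx, msg)
}

// drain reads the stream until EOF and returns the values it saw.
func drain() (vals []string, sawEOF bool) {
	s := &sliceStream{msgs: []Message{
		{Val: []byte("one")},
		{Val: []byte("two")},
	}}
	for {
		err := s.Read(context.Background(), func(_ context.Context, m Message) error {
			vals = append(vals, string(m.Val))
			return nil
		})
		if errors.Is(err, io.EOF) {
			return vals, true
		}
		if err != nil {
			return vals, false
		}
	}
}

func main() {
	vals, eof := drain()
	fmt.Println(vals, eof)
}
```

The mutex keeps the cursor safe if several workers call Read concurrently, which matters because the actor runs multiple worker goroutines.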

Directories

Path          Synopsis
cmd/reactor   (module)
