Version: v0.3.2

Published: Jan 21, 2021 License: MIT Imports: 8 Imported by: 2


💥 Fusion


Fusion is a tiny stream processing library written in Go.

See reactor for a stream processing tool built using fusion.


  • Simple & lightweight.
  • Highly composable. Compose Proc implementations in a middleware-like fashion to get concurrent processing, automatic retries, and so on.
  • Suitable for a simple single-node setup or a more complex distributed one, by using different fusion.Stream and fusion.Proc implementations.
  • Zero dependencies.
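
The middleware-style composition mentioned above can be sketched roughly as follows. To stay runnable without the library, this uses local stand-in types (Msg, Proc, ProcFn) that mirror the signatures documented further down this page; dropEmpty and runPipeline are hypothetical helpers, not part of fusion.

```go
package main

import (
	"context"
	"fmt"
)

// Msg, Proc and ProcFn are local stand-ins mirroring the documented types.
type Msg struct {
	Key, Val []byte
	Ack      func(err error)
}

type Proc interface {
	Run(ctx context.Context, stream <-chan Msg) error
}

type ProcFn func(ctx context.Context, stream <-chan Msg) error

func (pf ProcFn) Run(ctx context.Context, stream <-chan Msg) error { return pf(ctx, stream) }

// dropEmpty is a hypothetical middleware: it wraps next and forwards only
// messages with a non-empty Val, acknowledging the rest itself.
func dropEmpty(next Proc) Proc {
	return ProcFn(func(ctx context.Context, stream <-chan Msg) error {
		out := make(chan Msg)
		go func() {
			defer close(out)
			for msg := range stream {
				if len(msg.Val) == 0 {
					msg.Ack(nil) // handled here; nothing to forward
					continue
				}
				select {
				case out <- msg:
				case <-ctx.Done():
					return
				}
			}
		}()
		return next.Run(ctx, out)
	})
}

// runPipeline feeds vals through dropEmpty and reports how many messages
// reached the inner Proc.
func runPipeline(vals []string) int {
	seen := 0
	inner := ProcFn(func(ctx context.Context, stream <-chan Msg) error {
		for msg := range stream {
			seen++
			msg.Ack(nil)
		}
		return nil
	})

	in := make(chan Msg, len(vals))
	for _, v := range vals {
		in <- Msg{Val: []byte(v), Ack: func(error) {}}
	}
	close(in)

	_ = dropEmpty(inner).Run(context.Background(), in)
	return seen
}

func main() {
	fmt.Println(runPipeline([]string{"a", "", "b"})) // prints 2
}
```

Because a wrapper both accepts and returns a Proc, wrappers of this shape can be stacked in any order.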


A simple line counter implementation:

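package main

import (
	"context"
	"fmt"
	"os"
	"sync/atomic"

	// Import path assumed; the original listing truncated the import block.
	"github.com/spy16/fusion"
)

func main() {
	count := int64(0)
	runner := fusion.Runner{
		Stream: &fusion.LineStream{From: os.Stdin},
		Proc: &fusion.Fn{
			Workers: 5,
			// Func runs concurrently on 5 workers, so count is updated atomically.
			Func: func(ctx context.Context, msg fusion.Msg) error {
				atomic.AddInt64(&count, 1)
				return nil
			},
		},
	}
	_ = runner.Run(context.Background())
	fmt.Printf("Count=%d\n", count)
}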




var (
	// Skip can be passed as argument to the Ack method of Msg to signal
	// that the message should be skipped.
	Skip = errors.New("skip message")

	// Fail can be passed as argument to the Ack method of Msg to signal
	// that the message should be failed immediately without retries.
	Fail = errors.New("fail message")

	// Retry can be returned from a proc implementation when processing
	// a message failed and should be retried later.
	Retry = errors.New("retry message")
)




type Fn added in v0.3.0

type Fn struct {
	// Number of worker threads to launch for processing messages.
	// If not set, defaults to 1.
	Workers int

	// Func is the function to invoke for each message. If not set,
	// uses a no-op func.
	Func func(ctx context.Context, msg Msg) error
}

Fn implements a concurrent Proc using a custom processor function.

func (*Fn) Run added in v0.3.0

func (fn *Fn) Run(ctx context.Context, stream <-chan Msg) error

Run spawns the configured number of worker threads.
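
The general shape of such a worker pool might look like the sketch below. It is not the library's implementation: Msg is a local stand-in, runWorkers and process are hypothetical names, and acknowledging each message with the function's return value is an assumption based on the Proc contract described on this page.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// Msg is a local stand-in for the documented message type.
type Msg struct {
	Val []byte
	Ack func(err error)
}

// runWorkers starts the given number of goroutines, each consuming from
// stream until it is closed or ctx is cancelled, and blocks until all exit.
func runWorkers(ctx context.Context, workers int, stream <-chan Msg, fn func(context.Context, Msg) error) {
	if workers <= 0 {
		workers = 1 // mirrors the documented default of one worker
	}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case msg, ok := <-stream:
					if !ok {
						return // stream closed
					}
					msg.Ack(fn(ctx, msg)) // assumption: ack with fn's result
				}
			}
		}()
	}
	wg.Wait()
}

// process pushes n messages through the pool and returns how many were handled.
func process(n, workers int) int64 {
	stream := make(chan Msg, n)
	for i := 0; i < n; i++ {
		stream <- Msg{Ack: func(error) {}}
	}
	close(stream)

	var count int64
	runWorkers(context.Background(), workers, stream, func(context.Context, Msg) error {
		atomic.AddInt64(&count, 1)
		return nil
	})
	return count
}

func main() {
	fmt.Println(process(100, 5)) // prints 100
}
```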

type LineStream added in v0.1.4

type LineStream struct {
	From   io.Reader // From is the reader to use.
	Offset int       // Offset to start at.
	Size   int       // Number of lines (from offset) to stream.
	Buffer int       // Stream channel buffer size.
	// contains filtered or unexported fields
}

LineStream implements a Stream using io.Reader. This implementation scans the reader line by line and streams each line as a message. If Offset is set, that many lines are read and skipped first. If Size is set, only Size lines are read after the offset, after which the source returns EOF.

func (*LineStream) Err added in v0.2.0

func (rd *LineStream) Err() error

Err returns the error that caused the source to end.

func (*LineStream) Out added in v0.3.0

func (rd *LineStream) Out(ctx context.Context) (<-chan Msg, error)

Out sets up the source channel and starts the goroutines that write to it.

type Log added in v0.3.1

type Log func(_ map[string]interface{})

Log implementation provides structured logging facilities for fusion components.

func LogFrom added in v0.3.1

func LogFrom(ctx context.Context) Log

LogFrom extracts log function set by fusion Runner from the context.

type Msg added in v0.2.4

type Msg struct {
	Key     []byte            `json:"key"`
	Val     []byte            `json:"val"`
	Attribs map[string]string `json:"attribs"`

	// Ack will be used to signal an ACK/nACK when message has passed
	// through the pipeline. A no-op value must be set when there is
	// no need for ack. Ack must be idempotent. If message was handled
	// successfully, then Ack will be called without error.
	Ack func(err error)
}

Msg represents a message with one or more payloads.
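
Since Ack must be idempotent, a stream implementation could guard its callback so that repeated calls have no further effect. The sketch below shows one possible approach using sync.Once; onceAck is a hypothetical helper and not part of the library.

```go
package main

import (
	"fmt"
	"sync"
)

// onceAck wraps ack so that only the first call takes effect; later calls
// are silently ignored.
func onceAck(ack func(err error)) func(err error) {
	var once sync.Once
	return func(err error) {
		once.Do(func() { ack(err) })
	}
}

func main() {
	calls := 0
	ack := onceAck(func(err error) { calls++ })

	ack(nil)
	ack(nil) // ignored: the message was already acknowledged

	fmt.Println(calls) // prints 1
}
```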

func (*Msg) Clone added in v0.2.4

func (msg *Msg) Clone() Msg

Clone returns a clone of the original message. Ack function will be set to no-op in the clone.
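
A clone with these properties could be built as in the sketch below. Whether the library copies the byte slices and the map deeply is not stated here, so the deep copy is an assumption; Msg is a local stand-in and clone is a hypothetical free function.

```go
package main

import "fmt"

// Msg is a local stand-in mirroring the documented fields.
type Msg struct {
	Key     []byte
	Val     []byte
	Attribs map[string]string
	Ack     func(err error)
}

// clone returns a copy of msg that shares no memory with it and has a
// no-op Ack. Assumption: a deep copy is wanted.
func clone(msg Msg) Msg {
	c := Msg{
		Key:     append([]byte(nil), msg.Key...),
		Val:     append([]byte(nil), msg.Val...),
		Attribs: make(map[string]string, len(msg.Attribs)),
		Ack:     func(error) {},
	}
	for k, v := range msg.Attribs {
		c.Attribs[k] = v
	}
	return c
}

func main() {
	orig := Msg{Key: []byte("k"), Val: []byte("v"), Attribs: map[string]string{"a": "1"}}
	c := clone(orig)

	// Mutating the clone leaves the original untouched.
	c.Val[0] = 'X'
	c.Attribs["a"] = "2"

	fmt.Println(string(orig.Val), orig.Attribs["a"]) // prints v 1
}
```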

type Proc added in v0.2.1

type Proc interface {
	// Run should spawn the worker threads that consume from 'stream' and
	// process messages. Run should block until all workers exit. Proc is
	// responsible for acknowledging the message based on success/failure
	// in handling. Proc must stop all workers when ctx is cancelled or
	// 'stream' is closed.
	Run(ctx context.Context, stream <-chan Msg) error
}

Proc represents a processor in the stream pipeline.

type ProcFn added in v0.3.1

type ProcFn func(ctx context.Context, stream <-chan Msg) error

ProcFn implements Proc using a simple Go function.

func (ProcFn) Run added in v0.3.1

func (pf ProcFn) Run(ctx context.Context, stream <-chan Msg) error

Run dispatches the stream to the wrapped function.

type Runner added in v0.3.0

type Runner struct {
	// Proc to use for processing the messages.
	Proc Proc

	// Stream to read messages from.
	Stream Stream

	// DrainTime is the timeout for draining the messages from the stream
	// channel when the Proc exits prematurely. If not set, channel will
	// not be drained.
	DrainTime time.Duration

	// Log to be used by the Runner. If not set, a no-op value will be
	// used.
	Log Log
}

Runner represents a fusion streaming pipeline. A fusion instance has a stream and a proc for processing it. If the proc is not set, a no-op proc will be used.

func (Runner) Run added in v0.3.0

func (fu Runner) Run(ctx context.Context) error

Run spawns all the worker goroutines and blocks until all of them exit. Workers exit when the context is cancelled or when the source closes. It returns any error that was returned from the source.

type Stream

type Stream interface {
	// Out should return a channel to which it independently writes the data
	// stream. Stream is responsible for closing the returned channel once
	// the data is exhausted or when the stream worker exits. All goroutines
	// spawned by the stream must exit when the given context is cancelled.
	Out(ctx context.Context) (<-chan Msg, error)
}

Stream implementation is the source of data in a pipeline.

type StreamFn added in v0.3.0

type StreamFn func(ctx context.Context) (*Msg, error)

StreamFn implements a source using a Go function value.

func (StreamFn) Out added in v0.3.0

func (sf StreamFn) Out(ctx context.Context) (<-chan Msg, error)

Out launches a goroutine that continuously calls the wrapped function and writes the returned message to the channel. It stops when ctx is cancelled or the function returns an error.
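
The behaviour described above might be implemented along these lines. This is a sketch with local stand-in types, not the library's code: out and collect are hypothetical names, and skipping a nil message is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// Msg and StreamFn are local stand-ins mirroring the documented types.
type Msg struct {
	Val []byte
	Ack func(err error)
}

type StreamFn func(ctx context.Context) (*Msg, error)

// out calls sf in a loop on its own goroutine and forwards each message to
// the returned channel, closing it when sf fails or ctx is cancelled.
func out(ctx context.Context, sf StreamFn) <-chan Msg {
	ch := make(chan Msg)
	go func() {
		defer close(ch)
		for ctx.Err() == nil {
			msg, err := sf(ctx)
			if err != nil {
				return
			}
			if msg == nil {
				continue // assumption: nil means nothing to emit
			}
			select {
			case ch <- *msg:
			case <-ctx.Done():
				return
			}
		}
	}()
	return ch
}

// collect streams n generated messages and returns their values in order.
func collect(n int) []string {
	i := 0
	src := StreamFn(func(context.Context) (*Msg, error) {
		if i >= n {
			return nil, io.EOF // signal that the source is exhausted
		}
		i++
		return &Msg{Val: []byte(fmt.Sprintf("msg-%d", i)), Ack: func(error) {}}, nil
	})

	var vals []string
	for msg := range out(context.Background(), src) {
		vals = append(vals, string(msg.Val))
	}
	return vals
}

func main() {
	fmt.Println(collect(3)) // prints [msg-1 msg-2 msg-3]
}
```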


Path Synopsis
reactor module
