Documentation
Constants ¶
This section is empty.
Variables ¶
var (
	// Skip can be passed as an argument to the Ack method of Msg to signal
	// that the message should be skipped.
	Skip = errors.New("skip message")

	// Fail can be passed as an argument to the Ack method of Msg to signal
	// that the message should be failed immediately without retries.
	Fail = errors.New("fail message")

	// Retry can be returned from Proc implementations when processing
	// a message failed and should be retried later.
	Retry = errors.New("retry message")
)
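As a rough illustration of how calling code might tell these sentinels apart, the sketch below redeclares them locally so that it compiles on its own. The `describe` helper is hypothetical and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors, redeclared here only
// so that this sketch is self-contained.
var (
	Skip  = errors.New("skip message")
	Fail  = errors.New("fail message")
	Retry = errors.New("retry message")
)

// describe is a hypothetical helper that classifies an error by comparing
// it against the sentinels with errors.Is.
func describe(err error) string {
	switch {
	case err == nil:
		return "done"
	case errors.Is(err, Skip):
		return "skipped"
	case errors.Is(err, Fail):
		return "failed"
	case errors.Is(err, Retry):
		return "retry"
	default:
		return "unknown"
	}
}

func main() {
	// Wrapping with %w keeps the sentinel detectable through errors.Is.
	wrapped := fmt.Errorf("decode payload: %w", Fail)
	fmt.Println(describe(nil), describe(Skip), describe(wrapped), describe(Retry))
}
```

Using `errors.Is` rather than `==` means a sentinel is still recognized after it has been wrapped with extra context.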
Functions ¶
This section is empty.
Types ¶
type Fn ¶ added in v0.3.0
type Fn struct {
// Number of worker threads to launch for processing messages.
// If not set, defaults to 1.
Workers int
// Func is the function to invoke for each message. If not set,
// uses a no-op func.
Func func(ctx context.Context, msg Msg) error
}
Fn implements a concurrent Proc using a custom processor function.
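The documented defaults (one worker, a no-op function) suggest a worker-pool shape. The sketch below is one way such a Proc could be written; it is not the package's actual implementation. `workerPool` and `runAll` are hypothetical names, and `Msg` is a trimmed local stand-in.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// Msg is a trimmed local stand-in for the package's Msg type.
type Msg struct {
	Val []byte
	Ack func(err error) // assumed non-nil, as the Msg docs require
}

// workerPool is a hypothetical stand-in that mirrors Fn's fields.
type workerPool struct {
	Workers int
	Func    func(ctx context.Context, msg Msg) error
}

// Run starts the workers and blocks until they all exit.
func (p workerPool) Run(ctx context.Context, stream <-chan Msg) error {
	workers := p.Workers
	if workers <= 0 {
		workers = 1 // documented default
	}
	fn := p.Func
	if fn == nil {
		fn = func(context.Context, Msg) error { return nil } // no-op
	}

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case msg, ok := <-stream:
					if !ok {
						return // stream closed
					}
					msg.Ack(fn(ctx, msg))
				}
			}
		}()
	}
	wg.Wait()
	return ctx.Err()
}

// runAll feeds n messages to the pool and counts successful acks.
func runAll(n, workers int) int64 {
	var acked int64
	stream := make(chan Msg, n)
	for i := 0; i < n; i++ {
		stream <- Msg{Val: []byte("x"), Ack: func(err error) {
			if err == nil {
				atomic.AddInt64(&acked, 1)
			}
		}}
	}
	close(stream)
	_ = workerPool{Workers: workers}.Run(context.Background(), stream)
	return atomic.LoadInt64(&acked)
}

func main() {
	fmt.Println(runAll(5, 3))
}
```

The ack counter uses `sync/atomic` because several workers may invoke `Ack` concurrently.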
type LineStream ¶ added in v0.1.4
type LineStream struct {
From io.Reader // From is the reader to use.
Offset int // Offset to start at.
Size int // Number of lines (from offset) to stream.
Buffer int // Stream channel buffer size.
// contains filtered or unexported fields
}
LineStream implements a Stream using an io.Reader. It scans the reader line by line and streams each line as a message. If Offset is set, that many lines are read and skipped first. If Size is set, only that many lines are streamed, after which the source returns EOF.
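The offset and size semantics can be pictured with a plain `bufio.Scanner` loop. This is a sketch of the described behavior, not LineStream's code. `readLines` and `demoLines` are hypothetical helpers, and treating a size of zero as "no limit" is an assumption.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// readLines skips the first offset lines, then returns at most size lines.
// A size of zero or less is treated as "no limit" (an assumption).
func readLines(from io.Reader, offset, size int) ([]string, error) {
	sc := bufio.NewScanner(from)
	var out []string
	for n := 0; sc.Scan(); n++ {
		if n < offset {
			continue // still inside the skipped prefix
		}
		if size > 0 && len(out) >= size {
			break // requested number of lines reached
		}
		out = append(out, sc.Text())
	}
	return out, sc.Err()
}

// demoLines runs readLines over a fixed five-line input.
func demoLines(offset, size int) []string {
	lines, err := readLines(strings.NewReader("a\nb\nc\nd\ne"), offset, size)
	if err != nil {
		return nil
	}
	return lines
}

func main() {
	fmt.Println(demoLines(1, 2))
}
```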
func (*LineStream) Err ¶ added in v0.2.0
func (rd *LineStream) Err() error
Err returns the error that caused the source to end.
type Log ¶ added in v0.3.1
type Log func(_ map[string]interface{})
A Log implementation provides structured logging for fusion components.
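Because Log is a plain function type, any function with a matching signature can serve as a logger. The sketch below, which assumes JSON output is acceptable, redeclares `Log` locally; the `format` helper is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Log is a local copy of the package's Log type.
type Log func(_ map[string]interface{})

// format renders the fields as a JSON object. encoding/json sorts map
// keys, so the output order is stable.
func format(fields map[string]interface{}) string {
	b, err := json.Marshal(fields)
	if err != nil {
		return fmt.Sprintf(`{"log_error":%q}`, err.Error())
	}
	return string(b)
}

func main() {
	var logger Log = func(fields map[string]interface{}) {
		fmt.Println(format(fields))
	}
	logger(map[string]interface{}{"level": "info", "msg": "stream started"})
}
```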
type Msg ¶ added in v0.2.4
type Msg struct {
Key []byte `json:"key"`
Val []byte `json:"val"`
Attribs map[string]string `json:"attribs"`
// Ack will be used to signal an ACK/nACK when message has passed
// through the pipeline. A no-op value must be set when there is
// no need for ack. Ack must be idempotent. If message was handled
// successfully, then Ack will be called without error.
Ack func(err error)
}
Msg represents a message with one or more payloads.
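Since Ack must be idempotent, one simple way to satisfy that requirement is to guard the underlying callback with `sync.Once`. The sketch below redeclares `Msg` locally; `onceAck` and `ackTwice` are hypothetical helpers, not part of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// Msg is a local copy of the package's Msg type.
type Msg struct {
	Key     []byte            `json:"key"`
	Val     []byte            `json:"val"`
	Attribs map[string]string `json:"attribs"`
	Ack     func(err error)
}

// onceAck wraps fn so that only the first call has any effect.
func onceAck(fn func(err error)) func(err error) {
	var once sync.Once
	return func(err error) {
		once.Do(func() { fn(err) })
	}
}

// ackTwice acknowledges the same message twice and reports how many
// times the underlying callback actually ran.
func ackTwice() int {
	calls := 0
	msg := Msg{
		Key: []byte("k"),
		Val: []byte("v"),
		Ack: onceAck(func(error) { calls++ }),
	}
	msg.Ack(nil)
	msg.Ack(nil)
	return calls
}

func main() {
	fmt.Println(ackTwice())
}
```

With this wrapper only the first acknowledgement counts; later calls, including ones carrying a different error, are ignored.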
type Proc ¶ added in v0.2.1
type Proc interface {
// Run should spawn the worker threads that consume from 'stream' and
// process messages. Run should block until all workers exit. Proc is
// responsible for acknowledging the message based on success/failure
// in handling. Proc must stop all workers when ctx is cancelled or
// 'stream' is closed.
Run(ctx context.Context, stream <-chan Msg) error
}
Proc represents a processor in the stream pipeline.
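A minimal single-worker Proc that follows this contract might look like the sketch below. It acknowledges empty payloads with Skip and everything else with nil. `skipEmpty` and `process` are hypothetical names, and `Msg` and `Skip` are local stand-ins for the package's declarations.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins so the sketch compiles on its own.
var Skip = errors.New("skip message")

type Msg struct {
	Val []byte
	Ack func(err error)
}

// skipEmpty is a hypothetical Proc that skips messages with no payload.
type skipEmpty struct{}

// Run consumes the stream until it is closed or ctx is cancelled.
func (skipEmpty) Run(ctx context.Context, stream <-chan Msg) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg, ok := <-stream:
			if !ok {
				return nil // stream closed, nothing left to do
			}
			if len(msg.Val) == 0 {
				msg.Ack(Skip)
				continue
			}
			msg.Ack(nil) // handled successfully
		}
	}
}

// process runs skipEmpty over vals and counts each kind of ack.
func process(vals ...string) (acked, skipped int) {
	stream := make(chan Msg, len(vals))
	for _, v := range vals {
		stream <- Msg{Val: []byte(v), Ack: func(err error) {
			if errors.Is(err, Skip) {
				skipped++
			} else if err == nil {
				acked++
			}
		}}
	}
	close(stream)
	_ = skipEmpty{}.Run(context.Background(), stream)
	return acked, skipped
}

func main() {
	fmt.Println(process("a", "", "b"))
}
```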
type Runner ¶ added in v0.3.0
type Runner struct {
// Proc to use for processing the messages.
Proc Proc
// Stream to read messages from.
Stream Stream
// DrainTime is the timeout for draining the messages from the stream
// channel when the Proc exits prematurely. If not set, the channel will
// not be drained.
DrainTime time.Duration
// Log to be used by the Runner. If not set, a no-op value will be
// used.
Log Log
}
Runner represents a fusion streaming pipeline. A fusion instance has a stream and a proc for processing it. If the proc is not set, a no-op proc will be used.
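The draining behavior described for DrainTime can be pictured roughly as follows. This is a sketch under stated assumptions, not the Runner's actual code. `drain` and `drainDemo` are hypothetical names, and what the real Runner does with each drained message is not specified here, so the sketch only counts them.

```go
package main

import (
	"fmt"
	"time"
)

// Msg is a trimmed local stand-in for the package's Msg type.
type Msg struct {
	Val []byte
	Ack func(err error)
}

// drain discards messages from stream until it is closed or the timeout
// elapses, and returns how many messages were discarded. A timeout of
// zero or less disables draining.
func drain(stream <-chan Msg, timeout time.Duration) int {
	if timeout <= 0 {
		return 0
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	n := 0
	for {
		select {
		case <-timer.C:
			return n // gave up waiting
		case _, ok := <-stream:
			if !ok {
				return n // stream closed, fully drained
			}
			n++
		}
	}
}

// drainDemo drains a closed, buffered channel holding n messages.
func drainDemo(n int) int {
	stream := make(chan Msg, n)
	for i := 0; i < n; i++ {
		stream <- Msg{Val: []byte("x"), Ack: func(error) {}}
	}
	close(stream)
	return drain(stream, time.Second)
}

func main() {
	fmt.Println(drainDemo(3))
}
```

Draining like this lets a stream goroutine that is blocked on a send finish and close its channel instead of leaking.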
type Stream ¶
type Stream interface {
// Out should return a channel to which the stream independently writes its
// data. Stream is responsible for closing the returned channel once
// the data is exhausted or when the stream worker exits. All goroutines
// spawned by the stream must exit when the given context is cancelled.
Out(ctx context.Context) (<-chan Msg, error)
}
A Stream implementation is the source of data in a pipeline.
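A custom Stream that follows this contract could be as small as the sketch below, which emits a fixed slice of values, closes its channel when done, and exits early if the context is cancelled. `sliceStream` and `collect` are hypothetical names, and `Msg` is a trimmed local stand-in.

```go
package main

import (
	"context"
	"fmt"
)

// Msg is a trimmed local stand-in for the package's Msg type.
type Msg struct {
	Val []byte
	Ack func(err error)
}

// sliceStream is a hypothetical Stream backed by an in-memory slice.
type sliceStream struct {
	vals []string
}

// Out starts a goroutine that writes each value to the returned channel.
func (s sliceStream) Out(ctx context.Context) (<-chan Msg, error) {
	out := make(chan Msg)
	go func() {
		defer close(out) // the stream owns and closes the channel
		for _, v := range s.vals {
			msg := Msg{Val: []byte(v), Ack: func(error) {}} // no-op ack
			select {
			case <-ctx.Done():
				return // stop when the context is cancelled
			case out <- msg:
			}
		}
	}()
	return out, nil
}

// collect reads the whole stream and returns the payloads in order.
func collect(vals ...string) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch, err := sliceStream{vals: vals}.Out(ctx)
	if err != nil {
		return nil
	}
	var got []string
	for msg := range ch {
		got = append(got, string(msg.Val))
	}
	return got
}

func main() {
	fmt.Println(collect("a", "b", "c"))
}
```

Selecting on `ctx.Done()` alongside the send keeps the goroutine from blocking forever when the consumer stops reading.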