client

v0.2.7 (Latest)
Warning

This package is not in the latest version of its module.

Published: Jan 21, 2020 License: Apache-2.0 Imports: 27 Imported by: 0

README

Raven-worker package

Description

Raven-worker is used as the base of every worker in the Raven framework.
A worker is a building block that performs a specific task in a streaming data flow.
A data stream composed of one or more workers is called a flow.
The goal of a flow is to filter, extract, enrich and store streaming data, so that all your (streaming) data is easy to manage.

Worker types

Workers come in three types: extract, transform and load. These types are based on graph theory, where each worker represents a node.
Every Raven flow can be considered a directed acyclic graph (DAG).
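To make the graph view concrete, here is a minimal sketch that models a flow as a DAG of typed nodes and validates it with Kahn's topological-sort algorithm. Node, Kind and validateFlow are hypothetical names, not part of the raven-worker API. The rules that extract workers have no inputs and load workers have no outputs are assumptions based on the usual extract/transform/load roles.

```go
package main

import (
	"errors"
	"fmt"
)

// Kind is the type of a worker node (hypothetical; mirrors the three worker types).
type Kind int

const (
	Extract Kind = iota
	Transform
	Load
)

// Node is a hypothetical worker node; Next lists the IDs of downstream nodes.
type Node struct {
	ID   string
	Kind Kind
	Next []string
}

// validateFlow checks the (assumed) role rules and that the flow has no cycles.
func validateFlow(nodes []Node) error {
	inDegree := make(map[string]int, len(nodes))
	byID := make(map[string]Node, len(nodes))
	for _, n := range nodes {
		byID[n.ID] = n
		inDegree[n.ID] += 0 // make sure every node has an entry
	}
	for _, n := range nodes {
		if n.Kind == Load && len(n.Next) > 0 {
			return fmt.Errorf("load worker %q must not have outputs", n.ID)
		}
		for _, next := range n.Next {
			if _, ok := byID[next]; !ok {
				return fmt.Errorf("unknown node %q", next)
			}
			inDegree[next]++
		}
	}
	for _, n := range nodes {
		if n.Kind == Extract && inDegree[n.ID] > 0 {
			return fmt.Errorf("extract worker %q must not have inputs", n.ID)
		}
	}
	// Kahn's algorithm: repeatedly remove nodes that have no remaining inputs.
	var queue []string
	for id, d := range inDegree {
		if d == 0 {
			queue = append(queue, id)
		}
	}
	visited := 0
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		visited++
		for _, next := range byID[id].Next {
			inDegree[next]--
			if inDegree[next] == 0 {
				queue = append(queue, next)
			}
		}
	}
	if visited != len(nodes) {
		return errors.New("flow contains a cycle")
	}
	return nil
}

func main() {
	flow := []Node{
		{ID: "in", Kind: Extract, Next: []string{"enrich"}},
		{ID: "enrich", Kind: Transform, Next: []string{"store"}},
		{ID: "store", Kind: Load},
	}
	fmt.Println(validateFlow(flow)) // <nil>
}
```

If any node is left unvisited after the sort, it must lie on a cycle, which is exactly what "acyclic" rules out.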

About Workers

  • Workers should be as simple as possible and only do their job. Everything else is handled by Raven.
  • If an error occurs, just panic. Raven handles monitoring and restarting workers. The Raven Worker base package uses a sequential backoff algorithm to handle incidental errors.
  • If there is not enough work for a longer period, just stop. Raven monitors backlogs and starts new workers when necessary.

How to use

The following functions are exposed by the package:

New

New initializes the worker.

To work with the Raven framework, a worker needs three settings:

  • ravenworker.WithRavenURL connects the worker to the Raven framework. Multiple values can be given; they are used in a round-robin configuration.
  • ravenworker.WithWorkerID lets the worker identify itself and get new jobs.
  • ravenworker.WithFlowID lets the worker get the right jobs for the flow it belongs to, and therefore receive the right events (messages) to process.
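Round-robin selection over several Raven URLs can be illustrated with a small rotating counter. This is a minimal sketch: roundRobin, its Next method and the example addresses are hypothetical and do not describe how the package implements the rotation internally.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin hands out addresses in rotation and is safe for concurrent use.
// addrs must not be empty.
type roundRobin struct {
	addrs []string
	next  uint64
}

// Next returns the next address, wrapping around at the end of the list.
func (r *roundRobin) Next() string {
	n := atomic.AddUint64(&r.next, 1) - 1
	return r.addrs[n%uint64(len(r.addrs))]
}

func main() {
	rr := &roundRobin{addrs: []string{"raven-1:8023", "raven-2:8023"}}
	for i := 0; i < 3; i++ {
		fmt.Println(rr.Next())
	}
	// raven-1:8023
	// raven-2:8023
	// raven-1:8023
}
```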

For convenience, the ravenworker.DefaultEnvironment() option loads the configuration from environment variables.

Example:

    c, err := ravenworker.New(
        ravenworker.DefaultEnvironment(),
    )

    if err != nil {
        // handle error
    }

ravenworker.CustomEnvironment can be used when environment variables are not available. It takes three string arguments that set the Raven workflow URL, the flow ID and the worker ID.

Example:

    c, err := ravenworker.New(
        ravenworker.CustomEnvironment("localhost:8023", "568e8bee-aca8-40c2-bfed-504ce103d4b6", "b557f6b3-b436-4635-9ae0-010fed184168"),
    )

    if err != nil {
        // handle error
    }

Note: when specifying the Raven workflow URL, leave out the scheme (http/https). Raven-worker uses the Cap'n Proto protocol, and the function adds the proper scheme for you.
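Because the scheme is added for you, a URL such as "http://localhost:8023" is the wrong input. A minimal sketch of an up-front check follows. checkRavenAddr is a hypothetical helper, not part of the package, and it assumes the expected form is host:port, as in the example above.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// checkRavenAddr rejects addresses that include a scheme and requires host:port.
func checkRavenAddr(addr string) error {
	if strings.Contains(addr, "://") {
		return fmt.Errorf("address %q must not include a scheme", addr)
	}
	if _, _, err := net.SplitHostPort(addr); err != nil {
		return fmt.Errorf("address %q is not host:port: %w", addr, err)
	}
	return nil
}

func main() {
	fmt.Println(checkRavenAddr("localhost:8023"))               // <nil>
	fmt.Println(checkRavenAddr("http://localhost:8023") != nil) // true
}
```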

You can now use the following methods:

Consume

Transform and load workers use Consume to retrieve a reference to the next message from the stream.

Example:

    ref, err := c.Consume(context.Background())
    if err != nil {
        // handle error
    }

Get

Get retrieves the actual message for a reference returned by Consume.

Example:

    msg, err := c.Get(ref)
    if err != nil {
        // handle error
    }

Ack

Ack acknowledges the message and advances the flow.

Example:

    // msg and err already exist after Get, so assign instead of redeclaring them.
    msg, err = c.Ack(ref, WithMessage(msg), WithFilter())
    if err != nil {
        // handle error
    }

Produce

When a worker is of type extract or transform, use Produce to put the new message or ack the message.
The payload is stored in message.Content, which is a byte slice (type Content []byte). Convenience functions such as JsonContent (which encodes a value as a JSON byte slice) and StringContent are available.

Example:

    message := NewMessage()
    message.Content = JsonContent(obj)

    if err := c.Produce(message); err != nil {
        // handle error
    }

Documentation

Index

Constants

const ITEM_CAPACITY = 10000
const ITEM_SIZE = 2048

Variables

var DefaultLogger = NewDefaultLogger()
var (
	ErrChecksumFailed = errors.New("Checksum failed")
)

Functions

func CopyN added in v0.2.7

func CopyN(w io.Writer, r io.Reader, h io.Writer, size uint64, buffer []byte) error

func NewDefaultLogger added in v0.2.3

func NewDefaultLogger() *defaultLogger

NewDefaultLogger creates a JSON logger that outputs to an HTTP endpoint. Provide an empty string as the endpoint to log to stdout.

func NewLogUploader added in v0.2.3

func NewLogUploader(ctx context.Context, endpoint string) (*logUploader, error)

func NewRawSegment added in v0.2.7

func NewRawSegment() *rawSegment

Types

type BackOffFunc added in v0.2.2

type BackOffFunc func() backoff.BackOff

type Config

type Config struct {
	// contains filtered or unexported fields
}

type Content added in v0.2.2

type Content []byte

func JsonContent added in v0.2.2

func JsonContent(v interface{}) Content

func StringContent added in v0.2.2

func StringContent(s string) Content
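JsonContent returns a Content without an error, so encoding failures have to be handled inside it. The following is a local re-implementation sketch, not the package's source. Panicking on a value that cannot be encoded is an assumption, chosen to match the "just panic" guideline.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Content mirrors the documented type Content []byte.
type Content []byte

// JsonContent encodes v as JSON. Sketch: it panics if v cannot be encoded.
func JsonContent(v interface{}) Content {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return Content(b)
}

// StringContent converts a string payload to Content.
func StringContent(s string) Content {
	return Content(s)
}

func main() {
	fmt.Println(string(JsonContent(map[string]int{"count": 3}))) // {"count":3}
	fmt.Println(string(StringContent("hello")))                  // hello
}
```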

type DefaultWorker added in v0.2.2

type DefaultWorker struct {
	Config
}

func (*DefaultWorker) Close added in v0.2.3

func (w *DefaultWorker) Close() error

type IndexEntry added in v0.2.7

type IndexEntry [64]byte

We could use protobuf here, but we don't want to unmarshal the bytes or use reflection.

func (IndexEntry) Checksum added in v0.2.7

func (ie IndexEntry) Checksum() uint32

func (IndexEntry) EventID added in v0.2.7

func (ie IndexEntry) EventID() uuid.UUID

func (IndexEntry) LSN added in v0.2.7

func (ie IndexEntry) LSN() uint64

func (IndexEntry) Offset added in v0.2.7

func (ie IndexEntry) Offset() uint64

func (*IndexEntry) ReadFrom added in v0.2.7

func (ie *IndexEntry) ReadFrom(r io.Reader) (int, error)

func (*IndexEntry) SetChecksum added in v0.2.7

func (ie *IndexEntry) SetChecksum(checksum uint32)

func (*IndexEntry) SetEventID added in v0.2.7

func (ie *IndexEntry) SetEventID(eventID uuid.UUID)

func (*IndexEntry) SetLSN added in v0.2.7

func (ie *IndexEntry) SetLSN(lsn uint64)

func (*IndexEntry) SetOffset added in v0.2.7

func (ie *IndexEntry) SetOffset(offset uint64)

func (*IndexEntry) SetSize added in v0.2.7

func (ie *IndexEntry) SetSize(size uint64)

func (*IndexEntry) SetTimestamp added in v0.2.7

func (ie *IndexEntry) SetTimestamp(value uint64)

func (IndexEntry) Size added in v0.2.7

func (ie IndexEntry) Size() uint64

TODO: add compression type.

func (IndexEntry) Timestamp added in v0.2.7

func (ie IndexEntry) Timestamp() uint64

func (IndexEntry) WriteTo added in v0.2.7

func (ie IndexEntry) WriteTo(w io.Writer) (int, error)
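The IndexEntry comment describes the design: a fixed 64-byte array whose fields are read and written in place instead of being unmarshalled. Below is a sketch of that technique with encoding/binary. The indexEntry type, the byte offsets, the little-endian byte order and the plain [16]byte used in place of uuid.UUID are all assumptions, because the package's actual layout is not documented here.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// indexEntry is a fixed 64-byte record whose fields are read in place.
type indexEntry [64]byte

// Hypothetical field offsets; bytes 52..63 are left unused.
const (
	offEventID   = 0  // 16 bytes (a UUID)
	offLSN       = 16 // 8 bytes
	offOffset    = 24 // 8 bytes
	offSize      = 32 // 8 bytes
	offTimestamp = 40 // 8 bytes
	offChecksum  = 48 // 4 bytes
)

func (ie indexEntry) u64(off int) uint64 { return binary.LittleEndian.Uint64(ie[off : off+8]) }
func (ie *indexEntry) putU64(off int, v uint64) {
	binary.LittleEndian.PutUint64(ie[off:off+8], v)
}

func (ie indexEntry) EventID() (id [16]byte) {
	copy(id[:], ie[offEventID:offEventID+16])
	return id
}
func (ie *indexEntry) SetEventID(id [16]byte) { copy(ie[offEventID:offEventID+16], id[:]) }

func (ie indexEntry) LSN() uint64             { return ie.u64(offLSN) }
func (ie *indexEntry) SetLSN(v uint64)        { ie.putU64(offLSN, v) }
func (ie indexEntry) Offset() uint64          { return ie.u64(offOffset) }
func (ie *indexEntry) SetOffset(v uint64)     { ie.putU64(offOffset, v) }
func (ie indexEntry) Size() uint64            { return ie.u64(offSize) }
func (ie *indexEntry) SetSize(v uint64)       { ie.putU64(offSize, v) }
func (ie indexEntry) Timestamp() uint64       { return ie.u64(offTimestamp) }
func (ie *indexEntry) SetTimestamp(v uint64)  { ie.putU64(offTimestamp, v) }

func (ie indexEntry) Checksum() uint32 {
	return binary.LittleEndian.Uint32(ie[offChecksum : offChecksum+4])
}
func (ie *indexEntry) SetChecksum(v uint32) {
	binary.LittleEndian.PutUint32(ie[offChecksum:offChecksum+4], v)
}

func main() {
	var e indexEntry
	e.SetLSN(42)
	e.SetSize(2048)
	e.SetChecksum(0xdeadbeef)
	fmt.Println(e.LSN(), e.Size(), e.Checksum()) // 42 2048 3735928559
}
```

Because the record is a plain array, writing it out is just w.Write(e[:]), with no encoding step.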

type LogCloser added in v0.2.3

type LogCloser interface {
	Close() error
}

type Logger added in v0.2.2

type Logger interface {
	Debugf(msg string, args ...interface{})
	Infof(msg string, args ...interface{})
	Errorf(msg string, args ...interface{})
	Fatalf(msg string, args ...interface{})
}

type Message

type Message struct {
	MetaData []Metadata

	Content Content
}

func NewMessage added in v0.2.2

func NewMessage() Message

NewMessage returns a new, empty Message struct.

message := NewMessage()

func (Message) MarshalJSON added in v0.2.2

func (r Message) MarshalJSON() ([]byte, error)

func (*Message) UnmarshalJSON added in v0.2.2

func (r *Message) UnmarshalJSON(data []byte) error

type Metadata added in v0.2.2

type Metadata struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

type NullLogCloser added in v0.2.3

type NullLogCloser struct {
}

func (NullLogCloser) Close added in v0.2.3

func (NullLogCloser) Close() error

type OptionFunc added in v0.2.2

type OptionFunc func(*Config) error

func DefaultEnvironment added in v0.2.2

func DefaultEnvironment() OptionFunc

DefaultEnvironment returns an OptionFunc that expects the environment variables 'RAVEN_URL', 'FLOW_ID' and 'WORKER_ID'. If 'CONSUME_TIMEOUT' is set, it overrides the default consume timeout. DefaultLogger is set as the logger.

func WithBackOff added in v0.2.2

func WithBackOff(fn BackOffFunc) OptionFunc

func WithCloser added in v0.2.3

func WithCloser(closer io.Closer) OptionFunc

WithCloser adds an 'io.Closer' to the list.

func WithConsumeTimeout added in v0.2.3

func WithConsumeTimeout(s string) (OptionFunc, error)

WithConsumeTimeout sets the time frame to wait for a new message. If it is not set, the worker waits forever.

func WithLogger added in v0.2.2

func WithLogger(l Logger) (OptionFunc, error)

func WithMaxIntake added in v0.2.3

func WithMaxIntake(num string) OptionFunc

WithMaxIntake makes the worker ingest messages until maxIntake is reached.

func WithName added in v0.2.7

func WithName(s string) OptionFunc

func WithRavenURL added in v0.2.2

func WithRavenURL(urlStr string) (OptionFunc, error)

func WithSubscription added in v0.2.7

func WithSubscription(s string) OptionFunc

func WithTopicIn added in v0.2.7

func WithTopicIn(s string) OptionFunc

func WithTopicOut added in v0.2.7

func WithTopicOut(s string) OptionFunc

type Producer added in v0.2.7

type Producer interface {
	Produce(context.Context, Segment) error
	Close() error
}

func NewProducer added in v0.2.7

func NewProducer(opts ...OptionFunc) (Producer, error)

type Reference

type Reference struct {
	AckID   uuid.UUID
	EventID uuid.UUID
}

type Segment added in v0.2.7

type Segment interface {
	SegmentReader
	SegmentWriter
	SegmentCloser
}

type SegmentCloser added in v0.2.7

type SegmentCloser interface {
	Close()
}

type SegmentHeaderer added in v0.2.7

type SegmentHeaderer interface {
	Header() IndexEntry
	SetHeader(IndexEntry)
}

Are there other options? We don't really like this.

type SegmentReader added in v0.2.7

type SegmentReader interface {
	Read([]byte) (int, error)
}

type SegmentWriter added in v0.2.7

type SegmentWriter interface {
	Write([]byte) (int, error)
}

type Subscriber added in v0.2.7

type Subscriber interface {
	Consume(context.Context, Segment) error
	Ack(Segment) error
	Close() error
}

TODO: reconnect on failure and continue delivering non-acked segments. Should there be something like an adjustment or delta for acks, since a new connection will be built? Or should it sync the current segment ID within the connection?

func NewSubscriber added in v0.2.7

func NewSubscriber(opts ...OptionFunc) (Subscriber, error)

type Worker added in v0.2.2

type Worker interface {
}

func New added in v0.2.2

func New(opts ...OptionFunc) (Worker, error)

New returns a new, configured Raven Worker client.
