machine

package module
v0.10.1
Published: Feb 6, 2021 License: MIT Imports: 19 Imported by: 13

README

Machine is a library for creating data workflows. These workflows can be very concise or quite complex, and may even contain cycles for flows that need retry or self-healing mechanisms.

Installation

Add the primary library to your project

  go get -u github.com/whitaker-io/machine

Foundry is a tool for generating new projects quickly.


Subscription implementations can be found under Subscriptions.



Documentation

Docs


Example

Redis Subscription with basic receive -> process -> send Stream


  // logger allows for logs to be transmitted to your log provider
  var logger machine.Logger

  // logStore allows for running a cluster and handles communication
  var logStore machine.LogStore

  // pool is a redigo Pool for a redis cluster to read the stream from
  // see also the Google Pub/Sub, Kafka, and SQS implementations
  var pool *redigo.Pool

  redisStream := redis.New(pool, logger)
  
  // NewPipe creates a pipe in which you can run multiple streams
  // the id is the instance identifier for the cluster
  p := NewPipe(uuid.New().String(), logger, logStore, fiber.Config{
    ReadTimeout: time.Second,
    WriteTimeout: time.Second,
    BodyLimit: 4 * 1024 * 1024,
    DisableKeepalive: true,
  })

  // StreamSubscription takes an instance of machine.Subscription
  // and the time interval at which to read from it.
  // The id must be the same on all nodes for clustering to work.
  // boolP and intP are helpers (not shown) returning *bool and *int.
  // StreamSubscription is documented as returning a Builder directly.
  builder := p.StreamSubscription("unique_stream_id", redisStream, 5*time.Millisecond,
    &Option{FIFO: boolP(false)},
    &Option{Injectable: boolP(true)},
    &Option{Metrics: boolP(true)},
    &Option{Span: boolP(false)},
    &Option{BufferSize: intP(0)},
  )

  builder.Map("unique_id2", 
      func(m Data) error {
        var err error

        // ...do some processing

        return err
      },
    ).
    Transmit("unique_id3", 
      func(d []Data) error {
        // send a copy of the data somewhere

        return nil
      },
    )

  // Run requires a context, the port to run the fiber.App,
  // and the timeout for graceful shutdown
  if err := p.Run(context.Background(), ":5000", 10 * time.Second); err != nil {
    // Run will return an error if one of the paths is not
    // terminated (for example, it is missing a Transmit or Link)
    panic(err)
  }

🤝 Contributing

Contributions, issues and feature requests are welcome.
Feel free to check the issues page if you want to contribute.
Check the contributing guide.

Author

👤 Jonathan Whitaker

Show your support

Please ⭐️ this repository if this project helped you!


License

Machine is provided under the MIT License.

The MIT License (MIT)

Copyright (c) 2020 Jonathan Whitaker

Documentation

Types

type Applicative (added in v0.3.0)

type Applicative func(data Data) error

Applicative is a function that is applied on an individual basis for each Packet in the payload. The data may be modified or checked for correctness. Any resulting error is combined with current errors in the wrapping Packet.
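
As a minimal sketch of what an Applicative might look like, the example below checks one field and modifies the data in place. Data and Applicative are declared locally as stand-ins that mirror the documented signatures (Data is assumed here to behave like a map[string]interface{}), and requireName is a hypothetical function, not part of the library.

```go
package main

import (
	"errors"
	"fmt"
)

// Data and Applicative are local stand-ins mirroring the documented
// signatures so the sketch compiles without importing the library.
type Data map[string]interface{}

type Applicative func(data Data) error

// requireName is a hypothetical Applicative: it verifies that "name" is a
// non-empty string and adds a derived "greeting" field to the same Data.
var requireName Applicative = func(d Data) error {
	name, ok := d["name"].(string)
	if !ok || name == "" {
		return errors.New("missing name")
	}
	d["greeting"] = "hello " + name
	return nil
}

func main() {
	good := Data{"name": "ada"}
	fmt.Println(requireName(good), good["greeting"])
	fmt.Println(requireName(Data{}))
}
```

With the real library, a function of this shape would be passed to Builder.Map, and any returned error would be combined into the wrapping Packet as described above.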

type Builder

type Builder interface {
	Map(id string, a Applicative, options ...*Option) Builder
	FoldLeft(id string, f Fold, options ...*Option) Builder
	FoldRight(id string, f Fold, options ...*Option) Builder
	Fork(id string, f Fork, options ...*Option) (Builder, Builder)
	Loop(id string, x Fork, options ...*Option) (loop LoopBuilder, out Builder)
	Link(id, target string, options ...*Option)
	Transmit(id string, s Sender, options ...*Option)
}

Builder is the interface provided for creating a data processing stream.

type Data (added in v0.4.0)

type Data typed.Typed

Data wrapper on typed.Typed.

func (*Data) As (added in v0.9.11)

func (d *Data) As(i interface{}) error

As is a helper function for converting the Data into the given struct or map. It uses github.com/mitchellh/mapstructure under the covers, and the input must be a pointer to a map or struct.

type DebugInfo (added in v0.9.13)

type DebugInfo struct {
	ID       string    `json:"vertex_id"`
	Snapshot Data      `json:"snapshot"`
	Start    time.Time `json:"start"`
	End      time.Time `json:"end"`
}

DebugInfo holds information when the Option.Debug flag is set to true

type Fold (added in v0.5.0)

type Fold func(aggregate, next Data) Data

Fold is a function used to combine a payload into a single Packet. It may be used with either a Fold Left or Fold Right operation, which starts at the corresponding side and moves through the payload. The returned instance of Data is used as the aggregate in the subsequent call.
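
The difference between the two directions can be sketched as follows. Data and Fold are local stand-ins, concat is a hypothetical Fold, and foldLeft and foldRight are illustrative helpers written for this example. Seeding the aggregate with the first (or last) element is an assumption of the sketch, not a statement about how the library seeds it.

```go
package main

import "fmt"

// Data and Fold are local stand-ins mirroring the documented signatures.
type Data map[string]interface{}

type Fold func(aggregate, next Data) Data

// concat is a hypothetical Fold that appends next's "v" to the aggregate's "v".
var concat Fold = func(aggregate, next Data) Data {
	a, _ := aggregate["v"].(string)
	n, _ := next["v"].(string)
	return Data{"v": a + n}
}

// foldLeft walks the payload from the first element to the last.
// Seeding with the first element is an assumption of this sketch.
func foldLeft(payload []Data, f Fold) Data {
	if len(payload) == 0 {
		return nil
	}
	agg := payload[0]
	for _, next := range payload[1:] {
		agg = f(agg, next)
	}
	return agg
}

// foldRight walks the payload from the last element to the first.
func foldRight(payload []Data, f Fold) Data {
	if len(payload) == 0 {
		return nil
	}
	agg := payload[len(payload)-1]
	for i := len(payload) - 2; i >= 0; i-- {
		agg = f(agg, payload[i])
	}
	return agg
}

func main() {
	payload := []Data{{"v": "a"}, {"v": "b"}, {"v": "c"}}
	fmt.Println(foldLeft(payload, concat)["v"])
	fmt.Println(foldRight(payload, concat)["v"])
}
```

A non-commutative Fold such as concat makes the direction visible; for a commutative one such as a sum, both directions give the same result.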

type Fork (added in v0.5.0)

type Fork func(list []*Packet) (a, b []*Packet)

Fork is a function for splitting the payload into 2 separate paths. Default Forks for duplication and error checking are provided by ForkDuplicate and ForkError respectively.

var (
	// ForkDuplicate is a Fork that creates a deep copy of the
	// payload and sends it down both branches.
	ForkDuplicate Fork = func(payload []*Packet) (a, b []*Packet) {
		payload2 := []*Packet{}
		buf := &bytes.Buffer{}
		enc, dec := gob.NewEncoder(buf), gob.NewDecoder(buf)

		_ = enc.Encode(payload)
		_ = dec.Decode(&payload2)

		for i, packet := range payload {
			payload2[i].span = packet.span
		}

		return payload, payload2
	}

	// ForkError is a Fork that splits machine.Packets based on
	// the presence of an error. Errors are sent down the
	// right side path.
	ForkError Fork = func(payload []*Packet) (s, f []*Packet) {
		s = []*Packet{}
		f = []*Packet{}

		for _, packet := range payload {
			if packet.Error != nil {
				f = append(f, packet)
			} else {
				s = append(s, packet)
			}
		}

		return s, f
	}
)

type ForkRule (added in v0.5.0)

type ForkRule func(data Data) bool

ForkRule is a function that can be converted to a Fork via the Handler method allowing for Forking based on the contents of the data.

func (ForkRule) Handler (added in v0.5.0)

func (r ForkRule) Handler(payload []*Packet) (t, f []*Packet)

Handler is a method for turning the ForkRule into an instance of Fork

type HealthInfo (added in v0.7.0)

type HealthInfo struct {
	StreamID    string    `json:"stream_id"`
	LastPayload time.Time `json:"last_payload"`
	// contains filtered or unexported fields
}

HealthInfo is the type used for providing basic healthcheck information about the last start time of payloads.

type InjectionCallback (added in v0.7.0)

type InjectionCallback func(logs ...*Log)

InjectionCallback is a function provided to the LogStore Join method so that the cluster may restart work that has been dropped by one of the workers. Injections will only be processed for vertices that have the Injectable option set to true, which is the default.

type Log (added in v0.7.0)

type Log struct {
	OwnerID    string    `json:"owner_id"`
	StreamID   string    `json:"stream_id"`
	VertexID   string    `json:"vertex_id"`
	VertexType string    `json:"vertex_type"`
	State      string    `json:"state"`
	Packet     *Packet   `json:"packet"`
	When       time.Time `json:"when"`
}

Log type for holding the data that is recorded from the streams and sent to the LogStore instance

type LogStore (added in v0.7.0)

type LogStore interface {
	Join(id string, callback InjectionCallback, streamIDs ...string) error
	Write(logs ...*Log)
	Leave(id string) error
}

LogStore is an interface for enabling a distributed cluster of workers. It requires three methods: Join, Write, and Leave.

Join is called during the Run method of the Pipe and is used for announcing membership to the cluster with an identifier, a callback for injection, and the list of Streams that will be running in the Pipe. Injection is how work can be restarted in the system; it is the responsibility of the implementation to decide when and how it is reinitiated.

Write is called at the beginning of every vertex and provides the current payload about to be run. The implementation is considered the owner of that payload and may modify the data in special known circumstances if need be.

Leave is called during a graceful termination and any errors are logged.
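
To make the three methods concrete, here is a minimal single-process sketch of a type that satisfies the interface. The Packet, Log, InjectionCallback, and LogStore declarations are local stand-ins copied from the signatures above, and memoryStore is a hypothetical implementation. It never triggers injection, so it shows only the shape of the contract, not real cluster coordination.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Local stand-ins mirroring the documented types.
type Packet struct{ ID string }

type Log struct {
	OwnerID, StreamID, VertexID, VertexType, State string
	Packet                                          *Packet
	When                                            time.Time
}

type InjectionCallback func(logs ...*Log)

type LogStore interface {
	Join(id string, callback InjectionCallback, streamIDs ...string) error
	Write(logs ...*Log)
	Leave(id string) error
}

// memoryStore is a hypothetical in-memory LogStore. It records members and
// logs but never calls the injection callbacks.
type memoryStore struct {
	mu      sync.Mutex
	members map[string]InjectionCallback
	logs    []*Log
}

func newMemoryStore() *memoryStore {
	return &memoryStore{members: map[string]InjectionCallback{}}
}

func (s *memoryStore) Join(id string, cb InjectionCallback, streamIDs ...string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.members[id]; ok {
		return fmt.Errorf("member %q already joined", id)
	}
	s.members[id] = cb
	return nil
}

func (s *memoryStore) Write(logs ...*Log) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.logs = append(s.logs, logs...)
}

func (s *memoryStore) Leave(id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.members[id]; !ok {
		return fmt.Errorf("member %q is not joined", id)
	}
	delete(s.members, id)
	return nil
}

// Compile-time check that memoryStore satisfies the interface.
var _ LogStore = (*memoryStore)(nil)

func main() {
	store := newMemoryStore()
	fmt.Println(store.Join("node-1", func(...*Log) {}, "unique_stream_id"))
	store.Write(&Log{OwnerID: "node-1", State: "start", When: time.Now()})
	fmt.Println(len(store.logs), store.Leave("node-1"))
}
```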

type Logger (added in v0.7.0)

type Logger interface {
	Error(...interface{})
	Info(...interface{})
}

Logger is an interface for sending log messages to an outside system.
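
One simple way to satisfy this interface is an adapter over the standard library's log package, sketched below. The Logger interface is redeclared locally, and stdLogger and capture are hypothetical names introduced for the example.

```go
package main

import (
	"bytes"
	"log"
	"os"
)

// Logger is a local copy of the documented interface.
type Logger interface {
	Error(...interface{})
	Info(...interface{})
}

// stdLogger is a hypothetical adapter that forwards messages to a
// standard library *log.Logger with a level prefix.
type stdLogger struct{ out *log.Logger }

func (l stdLogger) Error(args ...interface{}) {
	l.out.Println(append([]interface{}{"ERROR"}, args...)...)
}

func (l stdLogger) Info(args ...interface{}) {
	l.out.Println(append([]interface{}{"INFO"}, args...)...)
}

// capture runs fn with a Logger that writes to memory and returns
// everything that was logged. It exists only to demonstrate the adapter.
func capture(fn func(Logger)) string {
	var buf bytes.Buffer
	fn(stdLogger{out: log.New(&buf, "", 0)})
	return buf.String()
}

func main() {
	var logger Logger = stdLogger{out: log.New(os.Stderr, "machine ", log.LstdFlags)}
	logger.Info("pipe started")
	logger.Error("stream failed:", "timeout")
}
```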

type LoopBuilder (added in v0.9.15)

type LoopBuilder interface {
	Map(id string, a Applicative, options ...*Option) LoopBuilder
	FoldLeft(id string, f Fold, options ...*Option) LoopBuilder
	FoldRight(id string, f Fold, options ...*Option) LoopBuilder
	Fork(id string, f Fork, options ...*Option) (LoopBuilder, LoopBuilder)
	Loop(id string, x Fork, options ...*Option) (loop LoopBuilder, out LoopBuilder)
	Transmit(id string, s Sender, options ...*Option)
	Done()
}

LoopBuilder is the interface provided for creating a data processing stream that loops.

type Option (added in v0.2.0)

type Option struct {
	// DeepCopy uses encoding/gob to create a deep copy of the payload
	// before the processing to ensure concurrent map exceptions cannot
	// happen. Is fairly resource intensive so use with caution.
	// Default: false
	DeepCopy *bool
	// FIFO controls the processing order of the payloads
	// If set to true the system will wait for one payload
	// to be processed before starting the next.
	// Default: false
	FIFO *bool
	// Injectable controls whether the vertex accepts injection calls
	// if set to false the data will be logged and processing will not
	// take place.
	// Default: true
	Injectable *bool
	// BufferSize sets the buffer size on the edge channels between the
	// vertices, this setting can be useful when processing large amounts
	// of data with FIFO turned on.
	// Default: 0
	BufferSize *int
	// Span controls whether opentelemetry spans are created for tracing
	// Packets processed by the system.
	// Default: true
	Span *bool
	// Metrics controls whether opentelemetry metrics are recorded for
	// Packets processed by the system.
	// Default: true
	Metrics *bool
	// TraceID adds __traceID to the data if missing and uses incoming
	// __traceID value as Packet.ID if provided
	// Default: false
	TraceID *bool
	// Debug adds traces of changes to the packet, useful for debugging,
	// but very costly and production use is not advised unless the overhead
	// is acceptable
	// Default: false
	Debug *bool
}

Option type for holding machine settings.
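
Because every field is a pointer, an unset value (nil) can be told apart from an explicit false or 0. The sketch below uses a trimmed local stand-in for Option and shows one plausible definition of the boolP and intP helpers that the README example calls; those helpers are not part of the exported API listed here, so their definitions are an assumption, as are the fifoEnabled and metricsEnabled readers.

```go
package main

import "fmt"

// Option is a trimmed local stand-in with three of the documented fields.
type Option struct {
	FIFO       *bool
	BufferSize *int
	Metrics    *bool
}

// boolP and intP return a pointer to a copy of their argument.
// These definitions are assumed; the README only shows them being called.
func boolP(v bool) *bool { return &v }
func intP(v int) *int    { return &v }

// fifoEnabled reads a pointer field, treating nil as the documented
// default for FIFO (false).
func fifoEnabled(o *Option) bool {
	return o != nil && o.FIFO != nil && *o.FIFO
}

// metricsEnabled treats nil as the documented default for Metrics (true).
func metricsEnabled(o *Option) bool {
	if o == nil || o.Metrics == nil {
		return true
	}
	return *o.Metrics
}

func main() {
	unset := &Option{}
	explicit := &Option{FIFO: boolP(true), Metrics: boolP(false), BufferSize: intP(16)}
	fmt.Println(fifoEnabled(unset), metricsEnabled(unset))
	fmt.Println(fifoEnabled(explicit), metricsEnabled(explicit), *explicit.BufferSize)
}
```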

type Packet

type Packet struct {
	ID        string       `json:"id"`
	Data      Data         `json:"data"`
	Error     error        `json:"error"`
	Snapshots []*DebugInfo `json:"snapshots,omitempty"`
	// contains filtered or unexported fields
}

Packet type that holds information traveling through the machine.

func (*Packet) As (added in v0.9.11)

func (p *Packet) As(i interface{}) error

As is a helper function for converting the Packet into the given struct or map. It uses github.com/mitchellh/mapstructure under the covers, and the input must be a pointer to a map or struct.

type Pipe (added in v0.7.0)

type Pipe struct {
	// contains filtered or unexported fields
}

Pipe is the representation of the system. It can run multiple Streams and controls the start and stop functionality of the system.

func NewPipe (added in v0.7.0)

func NewPipe(id string, logger Logger, store LogStore, config ...fiber.Config) *Pipe

NewPipe is a function for creating a new Pipe. If logger or store is nil, the associated feature is disabled.

func (*Pipe) Load (added in v0.9.0)

func (pipe *Pipe) Load(lc *Serialization) error

Load method loads a stream based on github.com/traefik/yaegi

func (*Pipe) Run (added in v0.7.0)

func (pipe *Pipe) Run(ctx context.Context, port string, gracePeriod time.Duration) error

Run starts the Pipe and its Streams. It requires a context, a port on which to run an instance of fiber.App (which hosts the /health endpoint and any HTTP-based streams at /stream/:id), and a gracePeriod within which graceful shutdown can take place.

func (*Pipe) Stream (added in v0.8.0)

func (pipe *Pipe) Stream(stream Stream) Builder

Stream is a method for adding a generic developer defined Stream. New Streams are created by the appropriately named NewStream function.

func (*Pipe) StreamHTTP (added in v0.7.0)

func (pipe *Pipe) StreamHTTP(id string, opts ...*Option) Builder

StreamHTTP is a method that creates a Stream at the path /stream/:id, which is hosted by the Pipe's fiber.App.

func (*Pipe) StreamSubscription (added in v0.7.0)

func (pipe *Pipe) StreamSubscription(id string, sub Subscription, interval time.Duration, opts ...*Option) Builder

StreamSubscription is a method for creating a Stream based on the provided Subscription, which has its Read method called at the end of each interval period.

func (*Pipe) Use (added in v0.7.0)

func (pipe *Pipe) Use(args ...interface{})

Use wraps fiber.App.Use.

Use registers a middleware route that will match requests with the provided prefix (which is optional and defaults to "/").

app.Use(func(c *fiber.Ctx) error {
   return c.Next()
})
app.Use("/api", func(c *fiber.Ctx) error {
   return c.Next()
})
app.Use("/api", handler, func(c *fiber.Ctx) error {
   return c.Next()
})

This method will match all HTTP verbs: GET, POST, PUT, HEAD etc...

type Retriever (added in v0.3.0)

type Retriever func(ctx context.Context) chan []Data

Retriever is a function that provides data to a generic Stream. It must stop when the context receives a done signal.
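
The requirement to stop on context cancellation can be sketched with a ticker-driven Retriever. Data and Retriever are local stand-ins, and tickRetriever and collectFor are hypothetical names. Closing the channel on shutdown is a choice made in this sketch; the documentation above does not say whether the library expects it.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Data and Retriever are local stand-ins mirroring the documented signatures.
type Data map[string]interface{}

type Retriever func(ctx context.Context) chan []Data

// tickRetriever returns a hypothetical Retriever that emits one single-item
// batch per interval and stops as soon as ctx is done.
func tickRetriever(interval time.Duration) Retriever {
	return func(ctx context.Context) chan []Data {
		out := make(chan []Data)
		go func() {
			defer close(out) // closing on exit is a choice of this sketch
			ticker := time.NewTicker(interval)
			defer ticker.Stop()
			for n := 0; ; n++ {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					// Guard the send as well so a slow reader cannot
					// keep the goroutine alive after cancellation.
					select {
					case out <- []Data{{"seq": n}}:
					case <-ctx.Done():
						return
					}
				}
			}
		}()
		return out
	}
}

// collectFor drains a tickRetriever until its context times out and
// returns the number of batches received.
func collectFor(intervalMs, totalMs int) int {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(totalMs)*time.Millisecond)
	defer cancel()
	n := 0
	for range tickRetriever(time.Duration(intervalMs) * time.Millisecond)(ctx) {
		n++
	}
	return n
}

func main() {
	fmt.Println("batches received:", collectFor(10, 55))
}
```

With the real library, a function of this shape would be passed to NewStream as the retriever argument.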

type Sender (added in v0.3.0)

type Sender func(payload []Data) error

Sender is a function used to transmit the payload to a different system. The returned error is logged, but not specifically acted upon.
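
As an illustration, a Sender can be built as a closure over its destination. In the sketch below, Data and Sender are local stand-ins, and jsonLines and render are hypothetical helpers; the destination is any io.Writer, with one JSON document written per line.

```go
package main

import (
	"encoding/json"
	"io"
	"os"
	"strings"
)

// Data and Sender are local stand-ins mirroring the documented signatures.
type Data map[string]interface{}

type Sender func(payload []Data) error

// jsonLines returns a hypothetical Sender that writes each item of the
// payload to w as one line of JSON, stopping at the first encoding error.
func jsonLines(w io.Writer) Sender {
	enc := json.NewEncoder(w)
	return func(payload []Data) error {
		for _, d := range payload {
			if err := enc.Encode(d); err != nil {
				return err
			}
		}
		return nil
	}
}

// render sends the payload to an in-memory writer and returns the output.
// It exists only to make the behavior easy to inspect.
func render(payload []Data) (string, error) {
	var sb strings.Builder
	err := jsonLines(&sb)(payload)
	return sb.String(), err
}

func main() {
	_ = jsonLines(os.Stdout)([]Data{{"id": 1}, {"id": 2}})
}
```

Since the returned error is only logged, a Sender that needs retries or dead-lettering has to handle that itself before returning.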

type Serialization (added in v0.9.0)

type Serialization struct {
	// ID unique identifier for the stream.
	ID string `json:"id,omitempty" mapstructure:"id,omitempty"`
	// Type type of stream to create.
	//
	// For root serializations valid values are 'http', 'subscription', or 'stream'.
	//
	// For child serializations valid values are 'map', 'fold_left', 'fold_right', 'fork',
	// 'link', and 'transmit'.
	Type string `json:"type,omitempty" mapstructure:"type,omitempty"`
	// Interval is the duration between pulls in a 'subscription' Type. It is only read
	// if the Type is 'subscription'.
	Interval time.Duration `json:"interval,omitempty" mapstructure:"interval,omitempty"`
	// Symbol is the name of the golang symbol that provides the target of the script,
	// this is typically the var/func name for the vertex in the script.
	Symbol string `json:"symbol,omitempty" mapstructure:"symbol,omitempty"`
	// Script is the yaegi script that contains the code for the vertex.
	// Symbols for the stdlib and machine are provided.
	Script string `json:"script,omitempty" mapstructure:"script,omitempty"`
	// Options are a slice of machine.Option https://godoc.org/github.com/whitaker-io/machine#Option
	Options []*Option `json:"options,omitempty" mapstructure:"options,omitempty"`
	// To is a reference ID used for the 'link' type. The value must be the ID of a predecessor
	// of this vertex
	To string `json:"to,omitempty" mapstructure:"to,omitempty"`
	// Next is the child vertex for every type other than Fork.
	Next *Serialization `json:"next,omitempty" mapstructure:"next,omitempty"`
	// Left is the left side child vertex for a Fork. It is only read for Forks.
	Left *Serialization `json:"left,omitempty" mapstructure:"left,omitempty"`
	// Right is the right side child vertex for a Fork. It is only read for Forks.
	Right *Serialization `json:"right,omitempty" mapstructure:"right,omitempty"`
}

Serialization type for holding information about github.com/traefik/yaegi based streams

type Stream (added in v0.5.0)

type Stream interface {
	ID() string
	Run(ctx context.Context, recorders ...recorder) error
	Inject(ctx context.Context, events map[string][]*Packet)
	Builder() Builder
}

Stream is a representation of a data stream and its associated logic. It may be used individually or hosted by a Pipe. Creating a new Stream is handled by the appropriately named NewStream function.

The Builder method is the entrypoint into creating the data processing flow. All branches of the Stream are required to end in either a Transmit or a Link in order to be considered valid.

func NewStream (added in v0.5.0)

func NewStream(id string, retriever Retriever, options ...*Option) Stream

NewStream is a function for creating a new Stream. It takes an id, a Retriever function, and a list of Options that can override the defaults and set new defaults for the subsequent vertices in the Stream.

type Subscription (added in v0.7.0)

type Subscription interface {
	Read(ctx context.Context) []Data
	Close() error
}

Subscription is an interface for creating a pull-based stream. It requires two methods: Read and Close.

Read is called when the interval passes and the resulting payload is sent down the Stream.

Close is called during a graceful termination and any errors are logged.
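
A minimal in-memory implementation helps show the pull-based contract. Data and Subscription are local stand-ins, and queueSub, Push, and drain are hypothetical names. Returning nil when there is nothing to read is an assumption of this sketch.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Data and Subscription are local stand-ins mirroring the documented types.
type Data map[string]interface{}

type Subscription interface {
	Read(ctx context.Context) []Data
	Close() error
}

// queueSub is a hypothetical Subscription backed by an in-memory queue.
type queueSub struct {
	mu     sync.Mutex
	queue  []Data
	closed bool
}

// Push adds items that a later Read will return.
func (q *queueSub) Push(items ...Data) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.queue = append(q.queue, items...)
}

// Read returns everything queued since the previous call, or nil when the
// queue is empty, the subscription is closed, or ctx is already done.
func (q *queueSub) Read(ctx context.Context) []Data {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed || ctx.Err() != nil || len(q.queue) == 0 {
		return nil
	}
	batch := q.queue
	q.queue = nil
	return batch
}

// Close marks the subscription closed so later reads return nothing.
func (q *queueSub) Close() error {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	return nil
}

// Compile-time check that queueSub satisfies the interface.
var _ Subscription = (*queueSub)(nil)

// drain performs one Read with a background context and returns the
// number of items received.
func drain(q *queueSub) int {
	return len(q.Read(context.Background()))
}

func main() {
	q := &queueSub{}
	q.Push(Data{"id": 1}, Data{"id": 2})
	fmt.Println(drain(q), drain(q))
	fmt.Println(q.Close())
}
```

With the real library, a value like this would be passed to Pipe.StreamSubscription together with the polling interval.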

Directories

Path       Synopsis
cmd        module
common     module
edge
http       module
pubsub     module
telemetry  module