machine

package module
v1.0.0-RC5
Warning

This package is not in the latest version of its module.

Published: Sep 21, 2021 License: MIT Imports: 21 Imported by: 13

README


Machine

Machine is a library for creating data workflows. These workflows can be either very concise or quite complex, even allowing for cycles for flows that need retry or self healing mechanisms.

It supports OpenTelemetry spans and metrics out of the box.

It also supports building dynamic pipelines using

Components is a repository of different vertex and plugin implementations


Installation

Add the primary library to your project

  go get -u github.com/whitaker-io/machine

Data is a library for getting and setting values in a map[string]interface{}



Example

Basic receive -> process -> send Flow

  // boolP and publishFN are not exported by the package. They are assumed
  // here to be small helpers: boolP returns a pointer to a bool, and
  // publishFN adapts a func([]data.Data) error to the Publisher interface.
  stream := NewStream("unique_id1",
    func(c context.Context) chan []data.Data {
      channel := make(chan []data.Data)

      // setup channel to collect data as long as
      // the context has not completed

      return channel
    },
    &Option{FIFO: boolP(false)},
    &Option{Metrics: boolP(true)},
    &Option{Span: boolP(false)},
  )

  // Map takes an Applicative, which returns the data that replaces the input.
  stream.Builder().Map("unique_id2",
    func(m data.Data) data.Data {
      // ...do some processing

      return m
    },
  ).Publish("publish_left_id", publishFN(func(d []data.Data) error {
    // send the data somewhere

    return nil
  }))

  if err := stream.Run(context.Background()); err != nil {
    // Run will return an error in the case that
    // one of the paths is not terminated (i.e. missing a Publish)
    panic(err)
  }

🤝 Contributing

Contributions, issues and feature requests are welcome.
Feel free to check the issues page if you want to contribute.
Check the contributing guide.

Author

👤 Jonathan Whitaker

Show your support

Please ⭐️ this repository if this project helped you!


License

Machine is provided under the MIT License.

The MIT License (MIT)

Copyright (c) 2020 Jonathan Whitaker

Documentation

Overview

Package machine - Copyright © 2020 Jonathan Whitaker <github@whitaker.io>.

Use of this source code is governed by an MIT-style license that can be found in the LICENSE file.


Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RegisterPluginProvider added in v0.11.0

func RegisterPluginProvider(name string, p PluginProvider)

RegisterPluginProvider registers a PluginProvider under the given name so that it can be used for loading VertexProviders.

Types

type Applicative added in v0.3.0

type Applicative func(d data.Data) data.Data

Applicative is a function that is applied individually to each Packet in the payload. The resulting data replaces the old data.
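As a minimal sketch of what "applied individually" means, the program below runs one Applicative over every item of a payload. The Data type is a local stand-in for data.Data (assumed here to be a map[string]interface{}), and applyAll is a hypothetical helper written for illustration, not the library's own implementation.

```go
package main

import "fmt"

// Data is a local stand-in for data.Data (assumption: a map[string]interface{}).
type Data map[string]interface{}

// Applicative mirrors the documented signature, using the stand-in Data type.
type Applicative func(d Data) Data

// applyAll is a hypothetical helper: it calls a once per item and
// collects the returned values, which replace the originals.
func applyAll(a Applicative, payload []Data) []Data {
	out := make([]Data, len(payload))
	for i, d := range payload {
		out[i] = a(d)
	}
	return out
}

func main() {
	// markSeen tags each item and returns it as the replacement value.
	markSeen := Applicative(func(d Data) Data {
		d["seen"] = true
		return d
	})

	out := applyAll(markSeen, []Data{{"name": "a"}, {"name": "b"}})
	fmt.Println(len(out), out[0]["seen"], out[1]["seen"])
}
```

With the real package, a function of this shape would be passed to Builder.Map.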

type Builder

type Builder interface {
	Map(id string, a Applicative) Builder
	MapPlugin(v *VertexSerialization) (Builder, error)
	Window(id string, x Window) Builder
	WindowPlugin(v *VertexSerialization) (Builder, error)
	Sort(id string, x Comparator) Builder
	SortPlugin(v *VertexSerialization) (Builder, error)
	Remove(id string, x Remover) Builder
	RemovePlugin(v *VertexSerialization) (Builder, error)
	FoldLeft(id string, f Fold) Builder
	FoldLeftPlugin(v *VertexSerialization) (Builder, error)
	FoldRight(id string, f Fold) Builder
	FoldRightPlugin(v *VertexSerialization) (Builder, error)
	Fork(id string, f Fork) (Builder, Builder)
	ForkPlugin(v *VertexSerialization) (left, right Builder, err error)
	Loop(id string, x Fork) (loop, out Builder)
	LoopPlugin(v *VertexSerialization) (loop, out Builder, err error)
	Publish(id string, s Publisher)
	PublishPlugin(v *VertexSerialization) error
	// contains filtered or unexported methods
}

Builder is the interface provided for creating a data processing stream.

type Comparator added in v0.16.5

type Comparator func(a data.Data, b data.Data) int

Comparator is a function that compares two data.Data values.
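The documentation does not say how the returned int is interpreted. The sketch below assumes the common convention that a negative result means a sorts before b, and uses the standard library's sort package to show a Comparator in action. Data is a local stand-in for data.Data, and byAge and sortData are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// Data is a local stand-in for data.Data (assumption: a map[string]interface{}).
type Data map[string]interface{}

// Comparator mirrors the documented signature.
type Comparator func(a, b Data) int

// byAge compares the int stored under "age".
// Assumption: negative means a sorts first, positive means b sorts first.
func byAge(a, b Data) int {
	x, _ := a["age"].(int)
	y, _ := b["age"].(int)
	switch {
	case x < y:
		return -1
	case x > y:
		return 1
	default:
		return 0
	}
}

// sortData is a hypothetical helper that sorts in place using a Comparator.
func sortData(list []Data, c Comparator) {
	sort.SliceStable(list, func(i, j int) bool {
		return c(list[i], list[j]) < 0
	})
}

func main() {
	list := []Data{{"age": 41}, {"age": 7}, {"age": 23}}
	sortData(list, byAge)
	fmt.Println(list)
}
```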

type Edge added in v0.17.0

type Edge interface {
	Send(ctx context.Context, channel chan []*Packet)
	Next(payload ...*Packet)
}

Edge is an interface that is used for transferring data between vertices

type EdgeProvider added in v0.17.0

type EdgeProvider interface {
	New(ctx context.Context, id string, options *Option) Edge
}

EdgeProvider is an interface that is used for providing new instances of the Edge interface given the *Option set in the Stream

type Error added in v0.16.0

type Error struct {
	Err        error     `json:"err"`
	StreamID   string    `json:"stream_id"`
	VertexID   string    `json:"vertex_id"`
	VertexType string    `json:"vertex_type"`
	Packets    []*Packet `json:"payload"`
	Time       time.Time `json:"time"`
}

Error type for wrapping errors coming from the Stream

func (*Error) Error added in v0.16.0

func (e *Error) Error() string

type Fold added in v0.5.0

type Fold func(aggregate, next data.Data) data.Data

Fold is a function used to combine a payload into a single Packet. It may be used with either a Fold Left or Fold Right operation, which starts at the corresponding side and moves through the payload. The returned instance of data.Data is used as the aggregate in the subsequent call.
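The difference between the two directions is easiest to see with a non-commutative Fold. The sketch below is an illustration only: Data is a local stand-in for data.Data, foldLeft and foldRight are hypothetical helpers, and seeding the aggregate with the first (or last) item is an assumption, since the source does not state how the library picks the initial aggregate.

```go
package main

import "fmt"

// Data is a local stand-in for data.Data (assumption: a map[string]interface{}).
type Data map[string]interface{}

// Fold mirrors the documented signature.
type Fold func(aggregate, next Data) Data

// foldLeft starts at the left side and moves right.
// Assumption: the first item seeds the aggregate.
func foldLeft(f Fold, payload []Data) Data {
	if len(payload) == 0 {
		return nil
	}
	agg := payload[0]
	for _, next := range payload[1:] {
		agg = f(agg, next)
	}
	return agg
}

// foldRight starts at the right side and moves left.
// Assumption: the last item seeds the aggregate.
func foldRight(f Fold, payload []Data) Data {
	if len(payload) == 0 {
		return nil
	}
	agg := payload[len(payload)-1]
	for i := len(payload) - 2; i >= 0; i-- {
		agg = f(agg, payload[i])
	}
	return agg
}

// concat appends the "s" string of next to the "s" string of aggregate.
func concat(aggregate, next Data) Data {
	a, _ := aggregate["s"].(string)
	n, _ := next["s"].(string)
	return Data{"s": a + n}
}

func main() {
	payload := []Data{{"s": "a"}, {"s": "b"}, {"s": "c"}}
	fmt.Println(foldLeft(concat, payload)["s"])  // abc
	fmt.Println(foldRight(concat, payload)["s"]) // cba
}
```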

type Fork added in v0.5.0

type Fork func(list []*Packet) (a, b []*Packet)

Fork is a function for splitting the payload into 2 separate paths. Default Forks for duplication and error checking are provided by ForkDuplicate and ForkError respectively.

var (
	// ForkDuplicate is a Fork that creates a deep copy of the
	// payload and sends it down both branches.
	ForkDuplicate Fork = func(payload []*Packet) (a, b []*Packet) {
		return payload, deepCopyPayload(payload)
	}

	// ForkError is a Fork that splits machine.Packets based on
	// the presence of an error. Errors are sent down the
	// right side path.
	ForkError Fork = func(payload []*Packet) (s, f []*Packet) {
		s = []*Packet{}
		f = []*Packet{}

		for _, packet := range payload {
			if len(packet.Errors) > 0 {
				f = append(f, packet)
			} else {
				s = append(s, packet)
			}
		}

		return s, f
	}
)

type ForkRule added in v0.5.0

type ForkRule func(d data.Data) bool

ForkRule is a function that can be converted to a Fork via the Handler method allowing for Forking based on the contents of the data.

func (ForkRule) Handler added in v0.5.0

func (r ForkRule) Handler(payload []*Packet) (t, f []*Packet)

Handler is a method for turning the ForkRule into an instance of Fork

type HTTPStream added in v0.16.0

type HTTPStream interface {
	Stream
	Handler() fiber.Handler
	InjectionHandlers() map[string]fiber.Handler
}

HTTPStream is a Stream that also provides a fiber.Handler for receiving data

func NewHTTPStream added in v0.16.0

func NewHTTPStream(id string, opts ...*Option) HTTPStream

NewHTTPStream is a function that creates a Stream which takes in data through a fiber.Handler.

func NewHTTPStreamPlugin

func NewHTTPStreamPlugin(v *VertexSerialization) (HTTPStream, error)

NewHTTPStreamPlugin is a function that creates a Stream which takes in data through a fiber.Handler, configured from a *VertexSerialization.

func NewWebsocketStream added in v0.16.0

func NewWebsocketStream(id string, opts ...*Option) HTTPStream

NewWebsocketStream is a function that creates a Stream which takes in data through a fiber.Handler that runs a websocket.

func NewWebsocketStreamPlugin

func NewWebsocketStreamPlugin(v *VertexSerialization) (HTTPStream, error)

NewWebsocketStreamPlugin is a function that creates a Stream which takes in data through a fiber.Handler that runs a websocket, configured from a *VertexSerialization.

type Option added in v0.2.0

type Option struct {
	// DeepCopy uses encoding/gob to create a deep copy of the payload
	// before the processing to ensure concurrent map exceptions cannot
	// happen. Is fairly resource intensive so use with caution.
	// Default: false
	DeepCopy *bool `json:"deep_copy,omitempty" mapstructure:"deep_copy,omitempty"`
	// FIFO controls the processing order of the payloads
	// If set to true the system will wait for one payload
	// to be processed before starting the next.
	// Default: false
	FIFO *bool `json:"fifo,omitempty" mapstructure:"fifo,omitempty"`
	// BufferSize sets the buffer size on the edge channels between the
	// vertices, this setting can be useful when processing large amounts
	// of data with FIFO turned on.
	// Default: 0
	BufferSize *int `json:"buffer_size,omitempty" mapstructure:"buffer_size,omitempty"`
	// Span controls whether opentelemetry spans are created for tracing
	// Packets processed by the system.
	// Default: true
	Span *bool `json:"spans_enabled,omitempty" mapstructure:"spans_enabled,omitempty"`
	// Metrics controls whether opentelemetry metrics are recorded for
	// Packets processed by the system.
	// Default: true
	Metrics *bool `json:"metrics_enabled,omitempty" mapstructure:"metrics_enabled,omitempty"`
	// Provider determines the edge type to be used, logic for what type of edge
	// for a given id is required if not using homogeneous edges
	// Default: nil
	Provider EdgeProvider `json:"-" mapstructure:"-"`
	// Validators are used to ensure the incoming Data is compliant
	// they are run at the start of the stream before creation of Packets
	// Default: nil
	Validators map[string]ForkRule `json:"-" mapstructure:"-"`
}

Option type for holding machine settings.
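The boolean settings are pointers so that "not set" (nil) can be told apart from an explicit false. The sketch below shows one way to build and read such values. Option here is a trimmed local stand-in with three of the documented fields, and boolP and boolOr are hypothetical helpers. The defaults passed to boolOr are the ones stated in the field comments above.

```go
package main

import "fmt"

// Option is a trimmed local stand-in holding three of the documented fields.
type Option struct {
	FIFO    *bool
	Span    *bool
	Metrics *bool
}

// boolP is a hypothetical helper returning a pointer to v.
func boolP(v bool) *bool { return &v }

// boolOr is a hypothetical helper: it returns def when p is nil
// (the option was not set) and the pointed-to value otherwise.
func boolOr(p *bool, def bool) bool {
	if p == nil {
		return def
	}
	return *p
}

func main() {
	// Only Span is set explicitly; FIFO and Metrics fall back to
	// their documented defaults (false and true).
	o := &Option{Span: boolP(false)}

	fmt.Println(
		boolOr(o.FIFO, false),
		boolOr(o.Span, true),
		boolOr(o.Metrics, true),
	)
}
```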

type Packet

type Packet struct {
	ID     string           `json:"id"`
	Data   data.Data        `json:"data"`
	Errors map[string]error `json:"errors"`
}

Packet type that holds information traveling through the machine.

type PluginProvider added in v0.11.0

type PluginProvider func(attributes map[string]interface{}) (interface{}, error)

PluginProvider is a function for loading plugins. It must return one of the following types:

Subscription
Retriever
Applicative
Comparator
Remover
Fold
Fork
Publisher
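As an illustration of the contract, the sketch below defines a provider that builds an Applicative from its attributes. The types are local stand-ins for the package's own, the providers map and register function are a hypothetical substitute for RegisterPluginProvider, and the "set_field" name and its "key" and "value" attributes are invented for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's types.
type Data map[string]interface{}
type Applicative func(d Data) Data
type PluginProvider func(attributes map[string]interface{}) (interface{}, error)

// providers is a hypothetical registry standing in for the one
// the package keeps behind RegisterPluginProvider.
var providers = map[string]PluginProvider{}

func register(name string, p PluginProvider) { providers[name] = p }

// setField is a hypothetical provider. It reads "key" and "value" from
// the attributes and returns an Applicative that sets that field.
func setField(attributes map[string]interface{}) (interface{}, error) {
	key, ok := attributes["key"].(string)
	if !ok {
		return nil, errors.New(`set_field: attribute "key" must be a string`)
	}
	value := attributes["value"]

	return Applicative(func(d Data) Data {
		d[key] = value
		return d
	}), nil
}

func main() {
	register("set_field", setField)

	v, err := providers["set_field"](map[string]interface{}{"key": "env", "value": "dev"})
	if err != nil {
		panic(err)
	}

	// The caller checks which of the allowed types came back.
	a, ok := v.(Applicative)
	if !ok {
		panic("unexpected plugin type")
	}
	fmt.Println(a(Data{"id": 1}))
}
```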

type Publisher added in v0.12.0

type Publisher interface {
	Send([]data.Data) error
}

Publisher is an interface for sending data out of the Stream
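Because Publisher has a single method, a plain function can satisfy it through a small adapter type. The sketch below shows that pattern. The name publishFN matches the helper used in the README example, but this definition is an assumption about its shape, and Data is a local stand-in for data.Data.

```go
package main

import "fmt"

// Data is a local stand-in for data.Data (assumption: a map[string]interface{}).
type Data map[string]interface{}

// Publisher mirrors the documented interface.
type Publisher interface {
	Send([]Data) error
}

// publishFN is an assumed adapter: a function type with a Send method,
// so that any func([]Data) error can be used as a Publisher.
type publishFN func([]Data) error

// Send calls the underlying function.
func (p publishFN) Send(payload []Data) error { return p(payload) }

func main() {
	var p Publisher = publishFN(func(payload []Data) error {
		fmt.Println("sending", len(payload), "items")
		return nil
	})

	if err := p.Send([]Data{{"id": 1}, {"id": 2}}); err != nil {
		panic(err)
	}
}
```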

type Remover added in v0.16.5

type Remover func(index int, d data.Data) bool

Remover is a function used to remove Data: an item is removed when the function returns true.
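A short sketch of the "remove on true" rule follows. Data is a local stand-in for data.Data, and removeWhere is a hypothetical helper that shows the filtering, not the library's implementation.

```go
package main

import "fmt"

// Data is a local stand-in for data.Data (assumption: a map[string]interface{}).
type Data map[string]interface{}

// Remover mirrors the documented signature.
type Remover func(index int, d Data) bool

// removeWhere is a hypothetical helper that keeps only the items
// for which r returns false.
func removeWhere(r Remover, payload []Data) []Data {
	kept := make([]Data, 0, len(payload))
	for i, d := range payload {
		if !r(i, d) {
			kept = append(kept, d)
		}
	}
	return kept
}

func main() {
	// missingID removes every item that has no "id" field.
	missingID := Remover(func(_ int, d Data) bool {
		_, ok := d["id"]
		return !ok
	})

	out := removeWhere(missingID, []Data{{"id": 1}, {"name": "x"}, {"id": 2}})
	fmt.Println(len(out))
}
```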

type Retriever added in v0.3.0

type Retriever func(ctx context.Context) chan []data.Data

Retriever is a function that provides data to a generic Stream. It must stop when the context receives a done signal.

type Stream added in v0.5.0

type Stream interface {
	ID() string
	Run(ctx context.Context) error
	Inject(id string, payload ...*Packet)
	VertexIDs() []string
	Builder() Builder
	Errors() chan error
}

Stream is a representation of a data stream and its associated logic. Creating a new Stream is handled by the appropriately named NewStream function.

The Builder method is the entrypoint into creating the data processing flow. All branches of the Stream are required to end in either a Publish or a Link in order to be considered valid.

func NewStream added in v0.5.0

func NewStream(id string, retriever Retriever, options ...*Option) Stream

NewStream is a function for creating a new Stream. It takes an id, a Retriever function, and a list of Options that can override the defaults and set new defaults for the subsequent vertices in the Stream.

func NewStreamPlugin

func NewStreamPlugin(v *VertexSerialization) (Stream, error)

NewStreamPlugin is a function for creating a new Stream from a *VertexSerialization, the config-based definition of a vertex, instead of taking an id, a Retriever function, and Options directly.

func NewSubscriptionStream added in v0.16.0

func NewSubscriptionStream(id string, sub Subscription, interval time.Duration, opts ...*Option) Stream

NewSubscriptionStream creates a Stream from the provided Subscription and pulls data repeatedly, once per interval.

func NewSubscriptionStreamPlugin

func NewSubscriptionStreamPlugin(v *VertexSerialization) (Stream, error)

NewSubscriptionStreamPlugin creates a Stream from a Subscription defined by a *VertexSerialization and pulls data repeatedly, once per interval.

type Subscription added in v0.7.0

type Subscription interface {
	Read(ctx context.Context) []data.Data
	Close() error
}

Subscription is an interface for creating a pull-based stream. It requires two methods: Read and Close.

Read is called when the interval passes and the resulting payload is sent down the Stream.

Close is called during a graceful termination and any errors are logged.

type VertexSerialization added in v0.11.0

type VertexSerialization struct {
	// ID unique identifier for the stream.
	ID string `json:"id,omitempty" mapstructure:"id,omitempty"`

	// Provider is the name of the PluginProvider to use.
	Provider string `json:"provider,omitempty" mapstructure:"provider,omitempty"`

	// Attributes are a map[string]interface{} of properties to be used with the PluginProvider.
	Attributes map[string]interface{} `json:"attributes,omitempty" mapstructure:"attributes,omitempty"`
	// contains filtered or unexported fields
}

VertexSerialization is a config-based definition for a stream vertex.

func (*VertexSerialization) MarshalJSON

func (v *VertexSerialization) MarshalJSON() ([]byte, error)

MarshalJSON implementation to marshal json

func (*VertexSerialization) MarshalYAML

func (v *VertexSerialization) MarshalYAML() (interface{}, error)

MarshalYAML implementation to marshal yaml

func (*VertexSerialization) UnmarshalJSON

func (v *VertexSerialization) UnmarshalJSON(bytez []byte) error

UnmarshalJSON implementation to unmarshal json

func (*VertexSerialization) UnmarshalYAML

func (v *VertexSerialization) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML implementation to unmarshal yaml

type Window added in v1.0.0

type Window func(list ...*Packet) []*Packet

Window is a function that is applied to the entire payload. The resulting data replaces the old data.
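Unlike an Applicative, a Window sees every Packet at once, so it can drop, reorder, or merge them. The sketch below shows a Window that removes Packets with a repeated ID. Packet and Data are local stand-ins for the package's types, and dedupeByID is a hypothetical example.

```go
package main

import "fmt"

// Data is a local stand-in for data.Data (assumption: a map[string]interface{}).
type Data map[string]interface{}

// Packet mirrors the documented struct without the JSON tags.
type Packet struct {
	ID     string
	Data   Data
	Errors map[string]error
}

// Window mirrors the documented signature.
type Window func(list ...*Packet) []*Packet

// dedupeByID is a hypothetical Window that keeps the first Packet
// seen for each ID and drops later ones.
func dedupeByID(list ...*Packet) []*Packet {
	seen := make(map[string]bool, len(list))
	out := make([]*Packet, 0, len(list))
	for _, p := range list {
		if seen[p.ID] {
			continue
		}
		seen[p.ID] = true
		out = append(out, p)
	}
	return out
}

func main() {
	var w Window = dedupeByID

	out := w(&Packet{ID: "a"}, &Packet{ID: "b"}, &Packet{ID: "a"})
	fmt.Println(len(out))
}
```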

Directories

Path Synopsis
cmd module
common module
edge
http module
pubsub module
telemetry module
