machine

Version: v2.0.3
Warning

This package is not in the latest version of its module.

Published: Oct 31, 2022 License: MIT Imports: 4 Imported by: 0

README

Machine

Machine is a library for creating data workflows. These workflows can be very concise or quite complex, and they can contain cycles for flows that need retry or self-healing mechanisms.


Installation

Add the primary library to your project:

  go get github.com/whitaker-io/machine/v2

The main function types are:

// Applicative is a function that is applied to the payload and used for transformations.
type Applicative[T any] func(d T) T

// Test is a function used in composition of And/Or operations and used to
// filter results down different branches with transformations
type Test[T any] func(d T) (T, error)

// Filter is a function that can be used to filter the payload.
type Filter[T any] func(d T) bool
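As a rough illustration of how these three shapes differ, the sketch below redeclares the types locally (so it compiles without the library) and writes one function of each kind. The `Order` payload and the function names are hypothetical and exist only for this example.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Local copies of the documented function types, redeclared here only so
// the sketch compiles on its own. In real code they come from the library.
type Applicative[T any] func(d T) T
type Test[T any] func(d T) (T, error)
type Filter[T any] func(d T) bool

// Order is a hypothetical payload type used only for illustration.
type Order struct {
	ID    string
	Total float64
}

// normalizeID has the Applicative shape: it transforms the payload.
func normalizeID(o Order) Order {
	o.ID = strings.ToUpper(strings.TrimSpace(o.ID))
	return o
}

// requireID has the Test shape: it returns the payload and an error
// when the check fails.
func requireID(o Order) (Order, error) {
	if o.ID == "" {
		return o, errors.New("order has no ID")
	}
	return o, nil
}

// isLarge has the Filter shape: a plain predicate over the payload.
func isLarge(o Order) bool { return o.Total >= 100 }

func main() {
	// The assignments check that each function matches its type.
	var apply Applicative[Order] = normalizeID
	var check Test[Order] = requireID
	var keep Filter[Order] = isLarge

	o := apply(Order{ID: " ab-1 ", Total: 120})
	_, err := check(o)
	fmt.Println(o.ID, err == nil, keep(o))
}
```

The difference between Test and Filter in this sketch is only the signature: a Test may modify the payload and report why it failed, while a Filter answers yes or no.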

These are used in the Builder type provided by the Stream type:

// Stream is a representation of a data stream and its associated logic.
//
// The Builder method is the entrypoint into creating the data processing flow.
// All branches of the Stream are required to end in an OutputTo call.
type Stream[T any] interface {
	Start(ctx context.Context, input chan T) error
	Builder() Builder[T]
}

// Builder is the interface provided for creating a data processing stream.
type Builder[T any] interface {
	Then(a Applicative[T]) Builder[T]
	Or(x ...Test[T]) (Builder[T], Builder[T])
	And(x ...Test[T]) (Builder[T], Builder[T])
	Filter(f Filter[T]) (Builder[T], Builder[T])
	Duplicate() (Builder[T], Builder[T])
	Loop(x Filter[T]) (loop, out Builder[T])
	Drop()
	Distribute(Edge[T]) Builder[T]
	OutputTo(x chan T)
}

Distribute is a special method used for fan-out operations. It takes an instance of Edge[T] and is most typically used to distribute work through a Pub/Sub system. The Edge[T] interface is as follows:

// Edge is an interface that is used for transferring data between vertices
type Edge[T any] interface {
	ReceiveOn(ctx context.Context, channel chan T)
	Send(payload T)
}

The Send method is used for data leaving the associated vertex and the ReceiveOn method is used by the following vertex to receive data. The context.Context used is the same as the one used to start the Stream.


You can also set up Telemetry and other options by passing in the Option type:

// Option type for holding machine settings.
type Option[T any] struct {
	// FIFO controls the processing order of the payloads
	// If set to true the system will wait for one payload
	// to be processed before starting the next.
	FIFO bool `json:"fifo,omitempty"`
	// BufferSize sets the buffer size on the edge channels between the
	// vertices, this setting can be useful when processing large amounts
	// of data with FIFO turned on.
	BufferSize int `json:"buffer_size,omitempty"`
	// Telemetry provides the ability to enable and configure telemetry
	Telemetry Telemetry[T] `json:"telemetry,omitempty"`
	// PanicHandler is a function that is called when a panic occurs
	PanicHandler func(err error, payload T) `json:"-"`
	// DeepCopyBetweenVerticies controls whether DeepCopy is performed between vertices.
	// This is useful if the functions applied are holding copies of the payload for
	// longer than they process it. DeepCopy must be set when this is enabled.
	DeepCopyBetweenVerticies bool `json:"deep_copy_between_vetricies,omitempty"`
	// DeepCopy is a function to perform a deep copy of the payload.
	DeepCopy func(T) T `json:"-"`
}
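The DeepCopy field expects a function of the form func(T) T. The sketch below shows one way such a function might look for a payload with slice and map fields, where a plain struct assignment would still share the underlying data. The `Event` type and `deepCopyEvent` name are hypothetical and not part of the library.

```go
package main

import "fmt"

// Event is a hypothetical payload with reference-typed fields.
type Event struct {
	Name string
	Tags []string
	Meta map[string]string
}

// deepCopyEvent has the shape func(T) T that the DeepCopy field expects.
// It copies the slice and map so the result shares no memory with the input.
func deepCopyEvent(e Event) Event {
	out := Event{Name: e.Name}
	if e.Tags != nil {
		out.Tags = append([]string(nil), e.Tags...)
	}
	if e.Meta != nil {
		out.Meta = make(map[string]string, len(e.Meta))
		for k, v := range e.Meta {
			out.Meta[k] = v
		}
	}
	return out
}

func main() {
	orig := Event{Name: "a", Tags: []string{"x"}, Meta: map[string]string{"k": "v"}}
	cp := deepCopyEvent(orig)

	// Mutating the copy leaves the original untouched.
	cp.Tags[0] = "changed"
	cp.Meta["k"] = "changed"
	fmt.Println(orig.Tags[0], orig.Meta["k"])
}
```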

// Telemetry type for holding telemetry settings.
type Telemetry[T any] interface {
	IncrementPayloadCount(vertexName string)
	IncrementErrorCount(vertexName string)
	Duration(vertexName string, duration time.Duration)
	RecordPayload(vertexName string, payload T)
	RecordError(vertexName string, payload T, err error)
}


🤝 Contributing

Contributions, issues, and feature requests are welcome.
Feel free to check the issues page if you want to contribute.
See the contributing guide for details.

Author

👤 Jonathan Whitaker

Show your support

Please ⭐️ this repository if this project helped you!


License

Machine is provided under the MIT License.

The MIT License (MIT)

Copyright (c) 2020 Jonathan Whitaker

Documentation

Overview

Package machine - Copyright © 2020 Jonathan Whitaker <github@whitaker.io>.

Use of this source code is governed by an MIT-style license that can be found in the LICENSE file.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Applicative

type Applicative[T any] func(d T) T

Applicative is a function that is applied to the payload and used for transformations.

func (Applicative[T]) Component

func (x Applicative[T]) Component(output chan T) Vertex[T]

Component is a function for providing a vertex that can be used to run individual components on the payload.
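One plausible reading of this method is that the returned vertex applies the function to each payload and forwards the result to the output channel. The sketch below illustrates that idea with locally declared types and a hypothetical free function `component`; it is not the library's implementation, which may also handle options, telemetry, or panics.

```go
package main

import "fmt"

// Local copies of the documented types, declared so the sketch is standalone.
type Applicative[T any] func(d T) T
type Vertex[T any] func(payload T)

// component is a hypothetical stand-in for Applicative.Component: the vertex
// it returns applies x to the payload and sends the result to output.
func component[T any](x Applicative[T], output chan T) Vertex[T] {
	return func(payload T) {
		output <- x(payload)
	}
}

func main() {
	double := Applicative[int](func(d int) int { return d * 2 })
	out := make(chan int, 1)

	v := component(double, out)
	v(21)
	fmt.Println(<-out)
}
```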

type Builder

type Builder[T any] interface {
	Then(a Applicative[T]) Builder[T]
	Or(x ...Test[T]) (Builder[T], Builder[T])
	And(x ...Test[T]) (Builder[T], Builder[T])
	Filter(f Filter[T]) (Builder[T], Builder[T])
	Duplicate() (Builder[T], Builder[T])
	Loop(x Filter[T]) (loop, out Builder[T])
	Drop()
	Distribute(Edge[T]) Builder[T]
	OutputTo(x chan T)
}

Builder is the interface provided for creating a data processing stream.

type Component

type Component[T any] interface {
	Component(e chan T) Vertex[T]
}

Component is an interface for providing a vertex that can be used to run individual components on the payload.

type Edge

type Edge[T any] interface {
	ReceiveOn(ctx context.Context, channel chan T)
	Send(payload T)
}

Edge is an interface that is used for transferring data between vertices

type Filter

type Filter[T any] func(d T) bool

Filter is a function that can be used to filter the payload.

func (Filter[T]) Component

func (x Filter[T]) Component(left, right chan T, _ *Option[T]) Vertex[T]

Component is a function for providing a vertex that can be used to run individual components on the payload.

type Option

type Option[T any] struct {
	// FIFO controls the processing order of the payloads
	// If set to true the system will wait for one payload
	// to be processed before starting the next.
	FIFO bool `json:"fifo,omitempty"`
	// BufferSize sets the buffer size on the edge channels between the
	// vertices, this setting can be useful when processing large amounts
	// of data with FIFO turned on.
	BufferSize int `json:"buffer_size,omitempty"`
	// Telemetry provides the ability to enable and configure telemetry
	Telemetry Telemetry[T] `json:"telemetry,omitempty"`
	// PanicHandler is a function that is called when a panic occurs
	PanicHandler func(err error, payload T) `json:"-"`
	// DeepCopyBetweenVerticies controls whether DeepCopy is performed between vertices.
	// This is useful if the functions applied are holding copies of the payload for
	// longer than they process it. DeepCopy must be set when this is enabled.
	DeepCopyBetweenVerticies bool `json:"deep_copy_between_vetricies,omitempty"`
	// DeepCopy is a function to perform a deep copy of the payload.
	DeepCopy func(T) T `json:"-"`
}

Option type for holding machine settings.

type Stream

type Stream[T any] interface {
	Start(ctx context.Context, input chan T) error
	Builder() Builder[T]
}

Stream is a representation of a data stream and its associated logic.

The Builder method is the entrypoint into creating the data processing flow. All branches of the Stream are required to end in an OutputTo call.

func New

func New[T any](name string, options *Option[T]) Stream[T]

New is a function for creating a new Stream.

Parameters: name string, options *Option[T]

type Telemetry

type Telemetry[T any] interface {
	IncrementPayloadCount(vertexName string)
	IncrementErrorCount(vertexName string)
	Duration(vertexName string, duration time.Duration)
	RecordPayload(vertexName string, payload T)
	RecordError(vertexName string, payload T, err error)
}

Telemetry type for holding telemetry settings.

type Test

type Test[T any] func(d T) (T, error)

Test is a function used in composition of And/Or operations and used to filter results down different branches with transformations

func (Test[T]) Component

func (x Test[T]) Component(left, right chan T, _ *Option[T]) Vertex[T]

Component is a function for providing a vertex that can be used to run individual components on the payload.

type Vertex

type Vertex[T any] func(payload T)

Vertex is a type used to process data for a stream.

func (Vertex[T]) Run

func (x Vertex[T]) Run(ctx context.Context, name string, channel chan T, option *Option[T])

Run starts a goroutine that processes the data in the channel until the context is canceled.
