machine

v2.1.0
Published: Apr 20, 2023 License: MIT Imports: 4 Imported by: 0

Machine

Machine is a library for creating data workflows. These workflows can be very concise or quite complex, and can even contain cycles for flows that need retry or self-healing mechanisms.


Installation

Add the primary library to your project:

  go get github.com/whitaker-io/machine/v2

The main function types are:

// Applicative is a function that is applied to payload and used for transformations
type Applicative[T any] func(d T) T

// Test is a function used in composition of And/Or operations and used to
// filter results down different branches with transformations
type Test[T any] func(d T) (T, error)

// Filter is a function that can be used to filter the payload.
type Filter[T any] func(d T) bool

// Transform is a function used by the Y Combinator to perform a recursion
// on the payload.
// Example:
// func(f Applicative[int]) Applicative[int] {
// 	return func(x int) int {
// 		if x < 1 {
// 			return 1
// 		}
// 		return x * f(x-1)
// 	}
// }
type Transform[T any] func(d Applicative[T]) Applicative[T]
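
To make the relationship between these four types concrete, here is a minimal sketch that re-declares them locally so it runs without the library. The helper `fix` and the sample functions `double`, `isEven` and `nonNegative` are hypothetical names introduced for illustration; in particular, `fix` only shows one way a Transform could be tied into a recursive Applicative and is not necessarily how the library's `Y` method works.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the function types listed above, so the sketch is self-contained.
type Applicative[T any] func(d T) T
type Test[T any] func(d T) (T, error)
type Filter[T any] func(d T) bool
type Transform[T any] func(d Applicative[T]) Applicative[T]

// fix is a hypothetical helper (not part of the library). It ties the
// recursive knot: the Applicative handed to t calls back into the result.
func fix[T any](t Transform[T]) Applicative[T] {
	var self Applicative[T]
	self = t(func(d T) T { return self(d) })
	return self
}

// double has the shape of an Applicative: it transforms the payload.
func double(x int) int { return x * 2 }

// isEven has the shape of a Filter: it only inspects the payload.
func isEven(x int) bool { return x%2 == 0 }

// nonNegative has the shape of a Test: it returns the payload or an error.
func nonNegative(x int) (int, error) {
	if x < 0 {
		return x, errors.New("negative payload")
	}
	return x, nil
}

// factorial is the Transform from the comment above.
func factorial(f Applicative[int]) Applicative[int] {
	return func(x int) int {
		if x < 1 {
			return 1
		}
		return x * f(x-1)
	}
}

func main() {
	var a Applicative[int] = double
	var f Filter[int] = isEven
	var t Test[int] = nonNegative
	_, err := t(-1)
	fmt.Println(a(21), f(3), err != nil, fix[int](factorial)(5))
}
```

Plain functions with matching signatures are assignable to these named types, so existing helpers can usually be passed without wrappers.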

These are used in the Builder type provided by the Stream type:

// Stream is a representation of a data stream and its associated logic.
//
// The Builder method is the entrypoint into creating the data processing flow.
// All branches of the Stream are required to end in an OutputTo call.
type Stream[T any] interface {
	Start(ctx context.Context, input chan T) error
	Builder() Builder[T]
}

// Builder is the interface provided for creating a data processing stream.
type Builder[T any] interface {
	// Then applies a mutation to each individual element of the payload.
	Then(a Applicative[T]) Builder[T]
	// Y applies a recursive function to the payload through a Y Combinator.
	Y(x Transform[T]) Builder[T]
	// Or runs the functions until one succeeds; otherwise it sends the payload to the right branch
	Or(x ...Test[T]) (Builder[T], Builder[T])
	// And runs all of the functions and, if one doesn't succeed, sends the payload to the right branch
	And(x ...Test[T]) (Builder[T], Builder[T])
	// Filter splits the data into multiple stream branches
	Filter(f Filter[T]) (Builder[T], Builder[T])
	// When applies a series of Filters to the payload and returns a list of Builders
	// the last one being for any unmatched payloads.
	When(fns ...Filter[T]) []Builder[T]
	// Duplicate splits the data into multiple stream branches
	Duplicate() (Builder[T], Builder[T])
	// Loop creates a loop in the stream based on the filter
	Loop(x Filter[T]) (loop, out Builder[T])
	// Drop terminates the data from further processing without passing it on
	Drop()
	// Distribute is a function used for fanout
	Distribute(Edge[T]) Builder[T]
	// OutputTo caps the builder and sends the output to the provided channel
	OutputTo(x chan T)
}
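
The comments on Or and And above describe how a payload is routed between the two returned branches. The following sketch is one possible reading of those comments, written with a local copy of the Test type so that it runs on its own. The helpers `anyOf` and `allOf` are hypothetical and not part of the library. The sketch assumes that the first returned Builder is the "success" branch and that And passes each Test's output to the next one; the library may behave differently on both points.

```go
package main

import (
	"errors"
	"fmt"
)

// Test is a local copy of the type shown above.
type Test[T any] func(d T) (T, error)

// anyOf mimics the described Or behaviour: the first Test that succeeds wins
// (true = left branch); if every Test fails the original payload goes right.
func anyOf[T any](tests ...Test[T]) func(T) (T, bool) {
	return func(d T) (T, bool) {
		for _, t := range tests {
			if out, err := t(d); err == nil {
				return out, true
			}
		}
		return d, false
	}
}

// allOf mimics the described And behaviour: every Test must succeed
// (true = left branch); on the first failure the original payload goes right.
// Assumption: each Test receives the output of the previous one.
func allOf[T any](tests ...Test[T]) func(T) (T, bool) {
	return func(d T) (T, bool) {
		out := d
		for _, t := range tests {
			next, err := t(out)
			if err != nil {
				return d, false
			}
			out = next
		}
		return out, true
	}
}

func positive(x int) (int, error) {
	if x <= 0 {
		return x, errors.New("not positive")
	}
	return x, nil
}

func even(x int) (int, error) {
	if x%2 != 0 {
		return x, errors.New("not even")
	}
	return x, nil
}

func main() {
	_, orLeft := anyOf[int](positive, even)(3)  // positive succeeds
	_, andLeft := allOf[int](positive, even)(3) // even fails
	fmt.Println(orLeft, andLeft)
}
```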

Distribute is a special method used for fan-out operations. It takes an instance of Edge[T] and is most typically used to distribute work via a Pub/Sub system. The Edge[T] interface is as follows:

// Edge is an interface that is used for transferring data between vertices
type Edge[T any] interface {
	ReceiveOn(ctx context.Context, channel chan T)
	Send(payload T)
}

The Send method is used for data leaving the associated vertex and the ReceiveOn method is used by the following vertex to receive data. The context.Context used is the same as the one used to start the Stream.

An example using Google Pub/Sub can be found in the edge/pubsub directory.


You can also set up Telemetry and other options by passing in the Option type:

// Option type for holding machine settings.
type Option[T any] struct {
	// FIFO controls the processing order of the payloads
	// If set to true the system will wait for one payload
	// to be processed before starting the next.
	FIFO bool `json:"fifo,omitempty"`
	// BufferSize sets the buffer size on the edge channels between the
	// vertices, this setting can be useful when processing large amounts
	// of data with FIFO turned on.
	BufferSize int `json:"buffer_size,omitempty"`
	// Telemetry provides the ability to enable and configure telemetry
	Telemetry Telemetry[T] `json:"telemetry,omitempty"`
	// PanicHandler is a function that is called when a panic occurs
	PanicHandler func(err error, payload T) `json:"-"`
	// DeepCopyBetweenVerticies controls whether DeepCopy is performed between vertices.
	// This is useful if the functions applied hold copies of the payload for
	// longer than they process it. DeepCopy must be set.
	DeepCopyBetweenVerticies bool `json:"deep_copy_between_vetricies,omitempty"`
	// DeepCopy is a function to perform a deep copy of the payload
	DeepCopy func(T) T `json:"-"`
}

// Telemetry type for holding telemetry settings.
type Telemetry[T any] interface {
	IncrementPayloadCount(vertexName string)
	IncrementErrorCount(vertexName string)
	Duration(vertexName string, duration time.Duration)
	RecordPayload(vertexName string, payload T)
	RecordError(vertexName string, payload T, err error)
}


🤝 Contributing

Contributions, issues and feature requests are welcome.
Feel free to check the issues page if you want to contribute.
Check the contributing guide.

Author

👤 Jonathan Whitaker

Show your support

Please ⭐️ this repository if this project helped you!


License

Machine is provided under the MIT License.

The MIT License (MIT)

Copyright (c) 2020 Jonathan Whitaker

Documentation

Overview

Package machine - Copyright © 2020 Jonathan Whitaker <github@whitaker.io>.

Use of this source code is governed by an MIT-style license that can be found in the LICENSE file.

Types

type Applicative

type Applicative[T any] func(d T) T

Applicative is a function that is applied to payload and used for transformations

func (Applicative[T]) Component

func (x Applicative[T]) Component(output chan T) Vertex[T]

Component is a function for providing a vertex that can be used to run individual components on the payload.

type Builder

type Builder[T any] interface {
	// Then applies a mutation to each individual element of the payload.
	Then(a Applicative[T]) Builder[T]
	// Y applies a recursive function to the payload through a Y Combinator.
	Y(x Transform[T]) Builder[T]
	// Or runs the functions until one succeeds; otherwise it sends the payload to the right branch
	Or(x ...Test[T]) (Builder[T], Builder[T])
	// And runs all of the functions and, if one doesn't succeed, sends the payload to the right branch
	And(x ...Test[T]) (Builder[T], Builder[T])
	// Filter splits the data into multiple stream branches
	Filter(f Filter[T]) (Builder[T], Builder[T])
	// When applies a series of Filters to the payload and returns a list of Builders
	// the last one being for any unmatched payloads.
	When(fns ...Filter[T]) []Builder[T]
	// Duplicate splits the data into multiple stream branches
	Duplicate() (Builder[T], Builder[T])
	// Loop creates a loop in the stream based on the filter
	Loop(x Filter[T]) (loop, out Builder[T])
	// Drop terminates the data from further processing without passing it on
	Drop()
	// Distribute is a function used for fanout
	Distribute(Edge[T]) Builder[T]
	// OutputTo caps the builder and sends the output to the provided channel
	OutputTo(x chan T)
}

Builder is the interface provided for creating a data processing stream.
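
The When method returns one Builder per Filter plus a trailing Builder for unmatched payloads. The sketch below illustrates that indexing with a hypothetical `route` helper that is not part of the library. It assumes the first matching Filter wins, which the documentation above does not state explicitly.

```go
package main

import "fmt"

// Filter is a local copy of the type documented on this page.
type Filter[T any] func(d T) bool

// route returns the index of the first Filter that accepts d, or len(fns)
// when none match (the position of the trailing "unmatched" branch).
func route[T any](d T, fns ...Filter[T]) int {
	for i, f := range fns {
		if f(d) {
			return i
		}
	}
	return len(fns)
}

func main() {
	small := Filter[int](func(x int) bool { return x < 10 })
	medium := Filter[int](func(x int) bool { return x < 100 })

	for _, x := range []int{5, 50, 500} {
		fmt.Println(x, "-> branch", route(x, small, medium))
	}
}
```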

type Component

type Component[T any] interface {
	Component(e chan T) Vertex[T]
}

Component is an interface for providing a vertex that can be used to run individual components on the payload.

type Edge

type Edge[T any] interface {
	ReceiveOn(ctx context.Context, channel chan T)
	Send(payload T)
}

Edge is an interface that is used for transferring data between vertices

type Filter

type Filter[T any] func(d T) bool

Filter is a function that can be used to filter the payload.

func (Filter[T]) Component

func (x Filter[T]) Component(left, right chan T, _ *Option[T]) Vertex[T]

Component is a function for providing a vertex that can be used to run individual components on the payload.

type Option

type Option[T any] struct {
	// FIFO controls the processing order of the payloads
	// If set to true the system will wait for one payload
	// to be processed before starting the next.
	FIFO bool `json:"fifo,omitempty"`
	// BufferSize sets the buffer size on the edge channels between the
	// vertices, this setting can be useful when processing large amounts
	// of data with FIFO turned on.
	BufferSize int `json:"buffer_size,omitempty"`
	// Telemetry provides the ability to enable and configure telemetry
	Telemetry Telemetry[T] `json:"telemetry,omitempty"`
	// PanicHandler is a function that is called when a panic occurs
	PanicHandler func(err error, payload T) `json:"-"`
	// DeepCopyBetweenVerticies controls whether DeepCopy is performed between vertices.
	// This is useful if the functions applied hold copies of the payload for
	// longer than they process it. DeepCopy must be set.
	DeepCopyBetweenVerticies bool `json:"deep_copy_between_vetricies,omitempty"`
	// DeepCopy is a function to perform a deep copy of the payload
	DeepCopy func(T) T `json:"-"`
}

Option type for holding machine settings.
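
The DeepCopy field expects a function of shape func(T) T. As a minimal sketch of what such a function might look like, the example below uses a hypothetical payload type `order` whose slice field would otherwise be shared between copies. A function like `copyOrder` could then be assigned to DeepCopy, with DeepCopyBetweenVerticies set to true.

```go
package main

import "fmt"

// order is a hypothetical payload type. Copying it by plain assignment
// would leave both copies sharing the same Items backing array.
type order struct {
	ID    string
	Items []string
}

// copyOrder has the func(T) T shape of the DeepCopy field, with T = order.
func copyOrder(o order) order {
	items := make([]string, len(o.Items))
	copy(items, o.Items)
	return order{ID: o.ID, Items: items}
}

func main() {
	original := order{ID: "a-1", Items: []string{"pen"}}
	clone := copyOrder(original)
	clone.Items[0] = "ink" // does not affect original
	fmt.Println(original.Items[0], clone.Items[0])
}
```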

type Stream

type Stream[T any] interface {
	Start(ctx context.Context, input chan T) error
	Builder() Builder[T]
}

Stream is a representation of a data stream and its associated logic.

The Builder method is the entrypoint into creating the data processing flow. All branches of the Stream are required to end in an OutputTo call.

func New

func New[T any](name string, options *Option[T]) Stream[T]

New is a function for creating a new Stream.

Parameters: name string, options *Option[T]

type Telemetry

type Telemetry[T any] interface {
	IncrementPayloadCount(vertexName string)
	IncrementErrorCount(vertexName string)
	Duration(vertexName string, duration time.Duration)
	RecordPayload(vertexName string, payload T)
	RecordError(vertexName string, payload T, err error)
}

Telemetry type for holding telemetry settings.

type Test

type Test[T any] func(d T) (T, error)

Test is a function used in composition of And/Or operations and used to filter results down different branches with transformations

func (Test[T]) Component

func (x Test[T]) Component(left, right chan T, _ *Option[T]) Vertex[T]

Component is a function for providing a vertex that can be used to run individual components on the payload.

type Transform (added in v2.0.5)

type Transform[T any] func(d Applicative[T]) Applicative[T]

Transform is a function used by the Y Combinator to perform a recursion on the payload. Example:

func(f Applicative[int]) Applicative[int] {
	return func(x int) int {
		if x <= 0 {
			return 1
		}
		return x * f(x-1)
	}
}

func (Transform[T]) Component (added in v2.0.5)

func (x Transform[T]) Component(output chan T) Vertex[T]

Component is a function for providing a vertex that can be used to run individual components on the payload.

type Vertex

type Vertex[T any] func(payload T)

Vertex is a type used to process data for a stream.

func (Vertex[T]) Run

func (x Vertex[T]) Run(ctx context.Context, name string, channel chan T, option *Option[T])

Run starts a goroutine that processes the data in the channel until the context is canceled.
