Documentation ¶
Overview ¶
Package machine - Copyright © 2020 Jonathan Whitaker <github@whitaker.io>.
Use of this source code is governed by an MIT-style license that can be found in the LICENSE file.
Index ¶
- func RegisterPluginProvider(name string, p PluginProvider)
- type Applicative
- type Builder
- type Comparator
- type Edge
- type EdgeProvider
- type Error
- type Fold
- type Fork
- type ForkRule
- type HTTPStream
- type Option
- type Packet
- type PluginProvider
- type Publisher
- type Remover
- type Retriever
- type Stream
- func NewStream(id string, retriever Retriever, options ...*Option) Stream
- func NewStreamPlugin(v *VertexSerialization) (Stream, error)
- func NewSubscriptionStream(id string, sub Subscription, interval time.Duration, opts ...*Option) Stream
- func NewSubscriptionStreamPlugin(v *VertexSerialization) (Stream, error)
- type Subscription
- type VertexSerialization
- type Window
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RegisterPluginProvider ¶ added in v0.11.0
func RegisterPluginProvider(name string, p PluginProvider)
RegisterPluginProvider registers a PluginProvider under the given name so that it can be used for loading VertexProviders.
Types ¶
type Applicative ¶ added in v0.3.0
Applicative is a function that is applied individually to each Packet in the payload. The resulting data replaces the old data.
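The per-Packet behavior described above can be sketched as follows. This is a minimal, self-contained illustration, not the package's implementation: the `packet`, `applicative`, `applyEach`, and `upperName` names are hypothetical, and data.Data is assumed to behave like a `map[string]interface{}`.

```go
package main

import (
	"fmt"
	"strings"
)

// packet is a simplified, hypothetical stand-in for machine.Packet.
type packet struct {
	ID     string
	Data   map[string]interface{}
	Errors map[string]error
}

// applicative is an assumed shape: it receives one packet's data and
// returns the data that replaces it.
type applicative func(d map[string]interface{}) map[string]interface{}

// applyEach runs fn once per packet and replaces the old data with the result.
func applyEach(payload []*packet, fn applicative) {
	for _, p := range payload {
		p.Data = fn(p.Data)
	}
}

// upperName copies the data and upper-cases the "name" field if it is a string.
func upperName(d map[string]interface{}) map[string]interface{} {
	out := map[string]interface{}{}
	for k, v := range d {
		out[k] = v
	}
	if s, ok := d["name"].(string); ok {
		out["name"] = strings.ToUpper(s)
	}
	return out
}

func main() {
	payload := []*packet{
		{ID: "1", Data: map[string]interface{}{"name": "a"}},
		{ID: "2", Data: map[string]interface{}{"name": "b"}},
	}
	applyEach(payload, upperName)
	for _, p := range payload {
		fmt.Println(p.ID, p.Data["name"])
	}
}
```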
type Builder ¶
type Builder interface {
    Map(id string, a Applicative) Builder
    MapPlugin(v *VertexSerialization) (Builder, error)
    Window(id string, x Window) Builder
    WindowPlugin(v *VertexSerialization) (Builder, error)
    Sort(id string, x Comparator) Builder
    SortPlugin(v *VertexSerialization) (Builder, error)
    Remove(id string, x Remover) Builder
    RemovePlugin(v *VertexSerialization) (Builder, error)
    FoldLeft(id string, f Fold) Builder
    FoldLeftPlugin(v *VertexSerialization) (Builder, error)
    FoldRight(id string, f Fold) Builder
    FoldRightPlugin(v *VertexSerialization) (Builder, error)
    Fork(id string, f Fork) (Builder, Builder)
    ForkPlugin(v *VertexSerialization) (left, right Builder, err error)
    Loop(id string, x Fork) (loop, out Builder)
    LoopPlugin(v *VertexSerialization) (loop, out Builder, err error)
    Publish(id string, s Publisher)
    PublishPlugin(v *VertexSerialization) error
    // contains filtered or unexported methods
}
Builder is the interface provided for creating a data processing stream.
type Comparator ¶ added in v0.16.5
Comparator is a function that compares two data.Data values.
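As a rough illustration of how a comparator can drive a sort, the sketch below assumes a three-way result (negative, zero, positive). The `record`, `comparator`, `byAge`, and `sortPayload` names are hypothetical, and the real Comparator signature is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sort"
)

// record is a hypothetical stand-in for data.Data.
type record map[string]interface{}

// comparator is an assumed shape: negative when a sorts before b,
// zero when they are equal, positive when a sorts after b.
type comparator func(a, b record) int

// byAge orders records by their integer "age" field.
func byAge(a, b record) int {
	x, _ := a["age"].(int)
	y, _ := b["age"].(int)
	return x - y
}

// sortPayload sorts the payload in place using cmp, keeping equal items
// in their original relative order.
func sortPayload(payload []record, cmp comparator) {
	sort.SliceStable(payload, func(i, j int) bool {
		return cmp(payload[i], payload[j]) < 0
	})
}

func main() {
	payload := []record{{"age": 40}, {"age": 25}, {"age": 31}}
	sortPayload(payload, byAge)
	for _, r := range payload {
		fmt.Println(r["age"])
	}
}
```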
type Edge ¶ added in v0.17.0
Edge is an interface that is used for transferring data between vertices.
type EdgeProvider ¶ added in v0.17.0
EdgeProvider is an interface that provides new instances of the Edge interface, given the *Option set in the Stream.
type Error ¶ added in v0.16.0
type Error struct {
    Err        error     `json:"err"`
    StreamID   string    `json:"stream_id"`
    VertexID   string    `json:"vertex_id"`
    VertexType string    `json:"vertex_type"`
    Packets    []*Packet `json:"payload"`
    Time       time.Time `json:"time"`
}
Error is a type for wrapping errors coming from the Stream.
type Fold ¶ added in v0.5.0
Fold is a function used to combine a payload into a single Packet. It may be used with either a Fold Left or Fold Right operation, which starts at the corresponding side and moves through the payload. The returned instance of data.Data is used as the aggregate in the subsequent call.
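The difference between the two directions can be sketched as below. This is an illustration under stated assumptions rather than the package's code: the `record`, `fold`, `foldLeft`, `foldRight`, and `concat` names are hypothetical, and the aggregate is assumed to be seeded with the first element on the starting side.

```go
package main

import "fmt"

// record is a hypothetical stand-in for data.Data.
type record map[string]interface{}

// fold is an assumed shape: combine the running aggregate with the next value.
type fold func(aggregate, next record) record

// foldLeft starts at the left side of the payload and moves right.
func foldLeft(payload []record, f fold) record {
	if len(payload) == 0 {
		return nil
	}
	agg := payload[0]
	for _, next := range payload[1:] {
		agg = f(agg, next)
	}
	return agg
}

// foldRight starts at the right side of the payload and moves left.
func foldRight(payload []record, f fold) record {
	if len(payload) == 0 {
		return nil
	}
	agg := payload[len(payload)-1]
	for i := len(payload) - 2; i >= 0; i-- {
		agg = f(agg, payload[i])
	}
	return agg
}

// concat appends next's "s" to the aggregate's "s", which makes the
// traversal order visible in the result.
func concat(aggregate, next record) record {
	a, _ := aggregate["s"].(string)
	n, _ := next["s"].(string)
	return record{"s": a + n}
}

func main() {
	payload := []record{{"s": "a"}, {"s": "b"}, {"s": "c"}}
	fmt.Println(foldLeft(payload, concat)["s"])  // abc
	fmt.Println(foldRight(payload, concat)["s"]) // cba
}
```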
type Fork ¶ added in v0.5.0
Fork is a function for splitting the payload into 2 separate paths. Default Forks for duplication and error checking are provided by ForkDuplicate and ForkError respectively.
var (
    // ForkDuplicate is a Fork that creates a deep copy of the
    // payload and sends it down both branches.
    ForkDuplicate Fork = func(payload []*Packet) (a, b []*Packet) {
        return payload, deepCopyPayload(payload)
    }

    // ForkError is a Fork that splits machine.Packets based on
    // the presence of an error. Errors are sent down the
    // right side path.
    ForkError Fork = func(payload []*Packet) (s, f []*Packet) {
        s = []*Packet{}
        f = []*Packet{}

        for _, packet := range payload {
            if len(packet.Errors) > 0 {
                f = append(f, packet)
            } else {
                s = append(s, packet)
            }
        }

        return s, f
    }
)
type ForkRule ¶ added in v0.5.0
ForkRule is a function that can be converted to a Fork via the Handler method, allowing for forking based on the contents of the data.
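One way such a conversion can work is sketched below. The rule signature (a predicate over one item's data) and the convention that matches go left are assumptions for illustration; `record`, `packet`, `fork`, `forkRule`, and `isAdult` are hypothetical names.

```go
package main

import "fmt"

// record is a hypothetical stand-in for data.Data.
type record map[string]interface{}

// packet is a simplified stand-in for machine.Packet.
type packet struct {
	ID   string
	Data record
}

// fork and forkRule are assumed shapes, not the package's definitions.
type fork func(payload []*packet) (a, b []*packet)

type forkRule func(d record) bool

// Handler adapts the rule to the fork shape: packets whose data satisfies
// the rule go left, the rest go right.
func (r forkRule) Handler(payload []*packet) (t, f []*packet) {
	t, f = []*packet{}, []*packet{}
	for _, p := range payload {
		if r(p.Data) {
			t = append(t, p)
		} else {
			f = append(f, p)
		}
	}
	return t, f
}

// isAdult is an example rule based on the contents of the data.
func isAdult(d record) bool {
	age, ok := d["age"].(int)
	return ok && age >= 18
}

func main() {
	var split fork = forkRule(isAdult).Handler
	left, right := split([]*packet{
		{ID: "1", Data: record{"age": 30}},
		{ID: "2", Data: record{"age": 12}},
	})
	fmt.Println(left[0].ID, right[0].ID) // 1 2
}
```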
type HTTPStream ¶ added in v0.16.0
type HTTPStream interface {
    Stream
    Handler() fiber.Handler
    InjectionHandlers() map[string]fiber.Handler
}
HTTPStream is a Stream that also provides a fiber.Handler for receiving data
func NewHTTPStream ¶ added in v0.16.0
func NewHTTPStream(id string, opts ...*Option) HTTPStream
NewHTTPStream is a function that creates a Stream which takes in data through a fiber.Handler.
func NewHTTPStreamPlugin ¶
func NewHTTPStreamPlugin(v *VertexSerialization) (HTTPStream, error)
NewHTTPStreamPlugin is a function that creates, from a *VertexSerialization, a Stream which takes in data through a fiber.Handler.
func NewWebsocketStream ¶ added in v0.16.0
func NewWebsocketStream(id string, opts ...*Option) HTTPStream
NewWebsocketStream is a function that creates a Stream which takes in data through a fiber.Handler that runs a websocket.
func NewWebsocketStreamPlugin ¶
func NewWebsocketStreamPlugin(v *VertexSerialization) (HTTPStream, error)
NewWebsocketStreamPlugin is a function that creates, from a *VertexSerialization, a Stream which takes in data through a fiber.Handler that runs a websocket.
type Option ¶ added in v0.2.0
type Option struct {
    // DeepCopy uses encoding/gob to create a deep copy of the payload
    // before the processing to ensure concurrent map exceptions cannot
    // happen. Is fairly resource intensive so use with caution.
    // Default: false
    DeepCopy *bool `json:"deep_copy,omitempty" mapstructure:"deep_copy,omitempty"`

    // FIFO controls the processing order of the payloads
    // If set to true the system will wait for one payload
    // to be processed before starting the next.
    // Default: false
    FIFO *bool `json:"fifo,omitempty" mapstructure:"fifo,omitempty"`

    // BufferSize sets the buffer size on the edge channels between the
    // vertices, this setting can be useful when processing large amounts
    // of data with FIFO turned on.
    // Default: 0
    BufferSize *int `json:"buffer_size,omitempty" mapstructure:"buffer_size,omitempty"`

    // Span controls whether opentelemetry spans are created for tracing
    // Packets processed by the system.
    // Default: true
    Span *bool `json:"spans_enabled,omitempty" mapstructure:"spans_enabled,omitempty"`

    // Metrics controls whether opentelemetry metrics are recorded for
    // Packets processed by the system.
    // Default: true
    Metrics *bool `json:"metrics_enabled,omitempty" mapstructure:"metrics_enabled,omitempty"`

    // Provider determines the edge type to be used, logic for what type of edge
    // for a given id is required if not using homogeneous edges
    // Default: nil
    Provider EdgeProvider `json:"-" mapstructure:"-"`

    // Validators are used to ensure the incoming Data is compliant
    // they are run at the start of the stream before creation of Packets
    // Default: nil
    Validators map[string]ForkRule `json:"-" mapstructure:"-"`
}
Option is a type for holding machine settings.
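Because the settings are pointer fields, a caller needs an addressable value for each one it sets. The sketch below shows one common way to do that and how the `omitempty` tags treat nil and non-nil pointers when encoding. The `option` struct copies only three of the documented fields and their JSON tags; `boolPtr`, `intPtr`, and `encode` are hypothetical helpers, not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// option mirrors three of the documented Option fields and their JSON tags.
type option struct {
	DeepCopy   *bool `json:"deep_copy,omitempty"`
	FIFO       *bool `json:"fifo,omitempty"`
	BufferSize *int  `json:"buffer_size,omitempty"`
}

// boolPtr and intPtr return pointers to copies of their arguments.
func boolPtr(b bool) *bool { return &b }
func intPtr(i int) *int    { return &i }

// encode marshals the option to JSON, returning "" on failure.
func encode(o option) string {
	b, err := json.Marshal(o)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	// Unset (nil) fields are omitted from the output.
	fmt.Println(encode(option{FIFO: boolPtr(true), BufferSize: intPtr(100)}))
	// An explicit false is kept because the pointer itself is not nil.
	fmt.Println(encode(option{DeepCopy: boolPtr(false)}))
}
```

With pointers, a nil field can be read as "not set" while a pointer to false is an explicit choice, which is presumably why the struct uses them for settings that have defaults.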
type Packet ¶
type Packet struct {
    ID     string           `json:"id"`
    Data   data.Data        `json:"data"`
    Errors map[string]error `json:"errors"`
}
Packet is a type that holds information traveling through the machine.
type PluginProvider ¶ added in v0.11.0
PluginProvider is a function for loading plugins. It must return one of the following types:
- Subscription
- Retriever
- Applicative
- Comparator
- Remover
- Fold
- Fork
- Publisher
type Retriever ¶ added in v0.3.0
Retriever is a function that provides data to a generic Stream. It must stop when the context receives a done signal.
type Stream ¶ added in v0.5.0
type Stream interface {
    ID() string
    Run(ctx context.Context) error
    Inject(id string, payload ...*Packet)
    VertexIDs() []string
    Builder() Builder
    Errors() chan error
}
Stream is a representation of a data stream and its associated logic. Creating a new Stream is handled by the appropriately named NewStream function.
The Builder method is the entrypoint into creating the data processing flow. All branches of the Stream are required to end in either a Publish or a Link in order to be considered valid.
func NewStream ¶ added in v0.5.0
func NewStream(id string, retriever Retriever, options ...*Option) Stream
NewStream is a function for creating a new Stream. It takes an id, a Retriever function, and a list of Options that can override the defaults and set new defaults for the subsequent vertices in the Stream.
func NewStreamPlugin ¶
func NewStreamPlugin(v *VertexSerialization) (Stream, error)
NewStreamPlugin is a function for creating a new Stream from a *VertexSerialization, the config-based definition of a vertex, instead of from an explicit id, Retriever, and list of Options.
func NewSubscriptionStream ¶ added in v0.16.0
func NewSubscriptionStream(id string, sub Subscription, interval time.Duration, opts ...*Option) Stream
NewSubscriptionStream creates a Stream from the provided Subscription and pulls data from it each time the interval elapses.
func NewSubscriptionStreamPlugin ¶
func NewSubscriptionStreamPlugin(v *VertexSerialization) (Stream, error)
NewSubscriptionStreamPlugin creates a Stream from a Subscription defined by the given *VertexSerialization and pulls data from it each time the interval elapses.
type Subscription ¶ added in v0.7.0
Subscription is an interface for creating a pull-based stream. It requires two methods: Read and Close.
Read is called when the interval passes and the resulting payload is sent down the Stream.
Close is called during a graceful termination and any errors are logged.
type VertexSerialization ¶ added in v0.11.0
type VertexSerialization struct {
    // ID is the unique identifier for the stream.
    ID string `json:"id,omitempty" mapstructure:"id,omitempty"`

    // Provider is the name of the PluginProvider to use.
    Provider string `json:"provider,omitempty" mapstructure:"provider,omitempty"`

    // Attributes are a map[string]interface{} of properties to be used with the PluginProvider.
    Attributes map[string]interface{} `json:"attributes,omitempty" mapstructure:"attributes,omitempty"`
    // contains filtered or unexported fields
}
VertexSerialization is a config-based definition for a stream vertex.
func (*VertexSerialization) MarshalJSON ¶
func (v *VertexSerialization) MarshalJSON() ([]byte, error)
MarshalJSON marshals the VertexSerialization to JSON.
func (*VertexSerialization) MarshalYAML ¶
func (v *VertexSerialization) MarshalYAML() (interface{}, error)
MarshalYAML marshals the VertexSerialization to YAML.
func (*VertexSerialization) UnmarshalJSON ¶
func (v *VertexSerialization) UnmarshalJSON(bytez []byte) error
UnmarshalJSON unmarshals JSON into the VertexSerialization.
func (*VertexSerialization) UnmarshalYAML ¶
func (v *VertexSerialization) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML unmarshals YAML into the VertexSerialization.