Documentation ¶
Overview ¶
Package machine - Copyright © 2020 Jonathan Whitaker <github@whitaker.io>.
Use of this source code is governed by an MIT-style license that can be found in the LICENSE file.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Edge ¶
type Edge[T any] interface {
	Output() chan T
	Send(payload T)
}
Edge is an interface used for transferring data between vertices.
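To make the contract concrete, here is a minimal sketch of an Edge backed by one buffered channel. The Edge interface is copied from above so the example compiles on its own; `chanEdge` and `newChanEdge` are hypothetical names for illustration and are not part of this package.

```go
package main

import "fmt"

// Edge mirrors the interface documented above.
type Edge[T any] interface {
	Output() chan T
	Send(payload T)
}

// chanEdge is a hypothetical Edge backed by a single buffered channel.
type chanEdge[T any] struct {
	ch chan T
}

// newChanEdge is a hypothetical constructor; size is the channel buffer size.
func newChanEdge[T any](size int) Edge[T] {
	return &chanEdge[T]{ch: make(chan T, size)}
}

// Output exposes the channel that a downstream vertex would read from.
func (e *chanEdge[T]) Output() chan T { return e.ch }

// Send places a payload on the channel, blocking while the buffer is full.
func (e *chanEdge[T]) Send(payload T) { e.ch <- payload }

func main() {
	edge := newChanEdge[string](1)
	edge.Send("hello")
	fmt.Println(<-edge.Output()) // prints "hello"
}
```

A real Edge could just as well wrap a message queue or a network connection; the only requirement visible in the interface is that payloads passed to Send become readable from Output.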
type Machine ¶ added in v2.2.0
type Machine[T any] interface {
	// Name returns the name of the Machine path. Useful for debugging or reasoning about the path.
	Name() string

	// Then applies a mutation to each individual element of the payload.
	Then(a Monad[T]) Machine[T]

	// Recurse applies a recursive function to the payload through a Y Combinator.
	// x is a function used by the Y Combinator to perform a recursion
	// on the payload.
	// Example:
	//
	//	func(f Monad[int]) Monad[int] {
	//		return func(x int) int {
	//			if x <= 0 {
	//				return 1
	//			} else {
	//				return x * f(x-1)
	//			}
	//		}
	//	}
	Recurse(x Monad[Monad[T]]) Machine[T]

	// Memoize applies a recursive function to the payload through a Y Combinator
	// and memoizes the results based on the index func.
	// x is a function used by the Y Combinator to perform a recursion
	// on the payload.
	// Example:
	//
	//	func(f Monad[int]) Monad[int] {
	//		return func(x int) int {
	//			if x <= 0 {
	//				return 1
	//			} else {
	//				return x * f(x-1)
	//			}
	//		}
	//	}
	Memoize(x Monad[Monad[T]], index func(T) string) Machine[T]

	// Or runs all of the functions until one succeeds or sends the payload to the right branch.
	Or(x ...Filter[T]) (Machine[T], Machine[T])

	// And runs all of the functions and if one doesn't succeed sends the payload to the right branch.
	And(x ...Filter[T]) (Machine[T], Machine[T])

	// If splits the data into two stream branches.
	If(f Filter[T]) (Machine[T], Machine[T])

	// Select applies a series of Filters to the payload and returns a list of Machines,
	// the last one being for any unmatched payloads.
	Select(fns ...Filter[T]) []Machine[T]

	// Duplicate splits the data into multiple stream branches.
	Duplicate() (Machine[T], Machine[T])

	// While creates a loop in the stream based on the filter.
	While(x Filter[T]) (loop, out Machine[T])

	// Drop terminates the data from further processing without passing it on.
	Drop()

	// Distribute is a function used for fanout.
	Distribute(Edge[T]) Machine[T]

	// Output returns the output channel.
	Output() chan T

	// AsEdge converts the Machine to an Edge. It is important to note
	// that only payloads sent to this Machine will be output.
	// The startFn returned by New must be called to start
	// this Machine before calling Send on this Edge.
	AsEdge() Edge[T]
}
Machine is the interface provided for creating a data processing stream.
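The Recurse and Memoize comments describe feeding a `Monad[Monad[T]]` through a Y Combinator. The sketch below shows one way such a fixed point can be written in Go, using the factorial function from the doc comment. It is an illustration under assumptions, not the package's implementation: `fix`, `memoFix`, and `key` are hypothetical helpers, and the cache in `memoFix` is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"strconv"
)

// Monad mirrors the type documented in this package.
type Monad[T any] func(d T) T

// fix is a hypothetical fixed-point helper: it hands f a Monad that
// re-enters f, so f can recurse without referring to itself by name.
func fix[T any](f Monad[Monad[T]]) Monad[T] {
	return func(x T) T {
		return f(fix[T](f))(x)
	}
}

// memoFix is a hypothetical memoizing variant: each result is cached
// under the string key produced by index. Not safe for concurrent use.
func memoFix[T any](f Monad[Monad[T]], index func(T) string) Monad[T] {
	cache := map[string]T{}
	var self Monad[T]
	self = func(x T) T {
		k := index(x)
		if v, ok := cache[k]; ok {
			return v
		}
		v := f(self)(x)
		cache[k] = v
		return v
	}
	return self
}

// factorial is the example from the Recurse doc comment.
func factorial(f Monad[int]) Monad[int] {
	return func(x int) int {
		if x <= 0 {
			return 1
		}
		return x * f(x-1)
	}
}

// key is a hypothetical index func that uses the decimal form of x.
func key(x int) string { return strconv.Itoa(x) }

func main() {
	fmt.Println(fix[int](factorial)(5))           // prints 120
	fmt.Println(memoFix[int](factorial, key)(10)) // prints 3628800
}
```

The index func matters because T is not required to be comparable; mapping each payload to a string gives the cache a usable key for any payload type.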
type Monad ¶ added in v2.2.0
type Monad[T any] func(d T) T
Monad is a function that is applied to a payload to transform it.
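Because a Monad takes a T and returns a T, several of them can be applied one after another. The sketch below illustrates that property with a hypothetical `chain` helper; it is not part of this package and only approximates what successive transformation steps would do to a single payload.

```go
package main

import (
	"fmt"
	"strings"
)

// Monad mirrors the type documented above.
type Monad[T any] func(d T) T

// chain is a hypothetical helper that applies each Monad in order,
// feeding the result of one into the next.
func chain[T any](fns ...Monad[T]) Monad[T] {
	return func(d T) T {
		for _, fn := range fns {
			d = fn(d)
		}
		return d
	}
}

func main() {
	// Any func(string) string is assignable to Monad[string].
	normalize := chain[string](strings.TrimSpace, strings.ToUpper)
	fmt.Println(normalize("  hello  ")) // prints "HELLO"
}
```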
type Option ¶
type Option[T any] struct {
	// FIFO controls the processing order of the payloads.
	// If set to true the system will wait for one payload
	// to be processed before starting the next.
	FIFO bool `json:"fifo,omitempty"`

	// BufferSize sets the buffer size on the edge channels between the
	// vertices. This setting can be useful when processing large amounts
	// of data with FIFO turned on.
	BufferSize int `json:"buffer_size,omitempty"`

	// Telemetry provides the ability to enable and configure telemetry.
	Telemetry Telemetry[T] `json:"telemetry,omitempty"`

	// PanicHandler is a function that is called when a panic occurs.
	PanicHandler func(err error, payload T) `json:"-"`

	// DeepCopyBetweenVerticies controls whether DeepCopy is performed between vertices.
	// This is useful if the functions applied are holding copies of the payload for
	// longer than they process it. DeepCopy must be set.
	DeepCopyBetweenVerticies bool `json:"deep_copy_between_vetricies,omitempty"`

	// DeepCopy is a function to perform a deep copy of the payload.
	DeepCopy Monad[T] `json:"-"`
}
Option type for holding machine settings.
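The DeepCopy field is a `Monad[T]`, so any function that returns an independent copy of the payload fits. The sketch below shows why that matters for a payload containing a slice. The `order` type, `copyOrder`, and `deepCopy` are hypothetical names chosen for this example.

```go
package main

import "fmt"

// Monad mirrors the type documented in this package.
type Monad[T any] func(d T) T

// order is a hypothetical payload type. It holds a slice, so a plain
// assignment would share the backing array between both values.
type order struct {
	ID    string
	Items []string
}

// copyOrder returns an order whose Items slice is independent of the input.
func copyOrder(o order) order {
	items := make([]string, len(o.Items))
	copy(items, o.Items)
	return order{ID: o.ID, Items: items}
}

// deepCopy has the shape expected by the DeepCopy field (a Monad[T]).
var deepCopy Monad[order] = copyOrder

func main() {
	original := order{ID: "a1", Items: []string{"tea"}}
	clone := deepCopy(original)
	clone.Items[0] = "coffee"
	fmt.Println(original.Items[0], clone.Items[0]) // prints "tea coffee"
}
```

Per the field comments above, a function like this would be assigned to DeepCopy together with DeepCopyBetweenVerticies set to true, so that a vertex holding on to a payload does not see later mutations.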
type Telemetry ¶
type Telemetry[T any] interface {
	IncrementPayloadCount(vertexName string)
	IncrementErrorCount(vertexName string)
	Duration(vertexName string, duration time.Duration)
	RecordPayload(vertexName string, payload T)
	RecordError(vertexName string, payload T, err error)
}
Telemetry type for holding telemetry settings.
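As an illustration of what satisfying this interface can look like, here is a minimal in-memory sketch that keeps per-vertex counters. The interface is copied from above so the example compiles on its own; `memTelemetry` and `newMemTelemetry` are hypothetical, and a production implementation would more likely forward these calls to a metrics or tracing backend.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Telemetry mirrors the interface documented above.
type Telemetry[T any] interface {
	IncrementPayloadCount(vertexName string)
	IncrementErrorCount(vertexName string)
	Duration(vertexName string, duration time.Duration)
	RecordPayload(vertexName string, payload T)
	RecordError(vertexName string, payload T, err error)
}

// memTelemetry is a hypothetical implementation that stores counters in maps.
type memTelemetry[T any] struct {
	mu       sync.Mutex
	payloads map[string]int
	errors   map[string]int
	elapsed  map[string]time.Duration
}

// newMemTelemetry is a hypothetical constructor that allocates the maps.
func newMemTelemetry[T any]() *memTelemetry[T] {
	return &memTelemetry[T]{
		payloads: map[string]int{},
		errors:   map[string]int{},
		elapsed:  map[string]time.Duration{},
	}
}

func (m *memTelemetry[T]) IncrementPayloadCount(vertexName string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.payloads[vertexName]++
}

func (m *memTelemetry[T]) IncrementErrorCount(vertexName string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.errors[vertexName]++
}

// Duration accumulates the total time reported for a vertex.
func (m *memTelemetry[T]) Duration(vertexName string, duration time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.elapsed[vertexName] += duration
}

// RecordPayload and RecordError are no-ops in this sketch.
func (m *memTelemetry[T]) RecordPayload(vertexName string, payload T)          {}
func (m *memTelemetry[T]) RecordError(vertexName string, payload T, err error) {}

// Compile-time check that memTelemetry satisfies Telemetry.
var _ Telemetry[string] = (*memTelemetry[string])(nil)

func main() {
	m := newMemTelemetry[string]()
	var tel Telemetry[string] = m
	tel.IncrementPayloadCount("parse")
	tel.IncrementPayloadCount("parse")
	tel.Duration("parse", 5*time.Millisecond)
	fmt.Println(m.payloads["parse"], m.elapsed["parse"]) // prints "2 5ms"
}
```

The mutex is there on the assumption that vertices may report from multiple goroutines; the source does not state this, so treat it as a defensive choice.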