Documentation
Index
- func RegisterPluginProvider(name string, p PluginProvider)
- type Applicative
- type Builder
- type Data
- type Fold
- type Fork
- type ForkRule
- type HealthInfo
- type InjectionCallback
- type Log
- type LogStore
- type Option
- type Packet
- type Pipe
- func (pipe *Pipe) Add(method, path string, handlers ...func(*fiber.Ctx) error) fiber.Router
- func (pipe *Pipe) Load(streams []*StreamSerialization) error
- func (pipe *Pipe) Run(ctx context.Context, port string, gracePeriod time.Duration) error
- func (pipe *Pipe) Stream(stream Stream) Builder
- func (pipe *Pipe) StreamHTTP(id string, opts ...*Option) Builder
- func (pipe *Pipe) StreamSubscription(id string, sub Subscription, interval time.Duration, opts ...*Option) Builder
- func (pipe *Pipe) StreamWebsocket(id string, opts ...*Option) Builder
- func (pipe *Pipe) Use(args ...interface{}) fiber.Router
- type PluginDefinition
- type PluginProvider
- type Publisher
- type Retriever
- type Stream
- type StreamSerialization
- type Subscription
- type VertexSerialization
Constants
This section is empty.
Variables
This section is empty.
Functions
func RegisterPluginProvider (added in v0.11.0)
func RegisterPluginProvider(name string, p PluginProvider)
RegisterPluginProvider registers a PluginProvider under the given name so that it can be used for loading VertexProviders.
Types
type Applicative (added in v0.3.0)
Applicative is a function that is applied on an individual basis for each Packet in the payload. The data may be modified or checked for correctness. Any resulting error is combined with current errors in the wrapping Packet.
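The per-Packet behavior described above can be sketched as follows. This is a minimal illustration, not the library's code: the Applicative signature is an assumption (this page does not show it), Data and Packet are local stand-ins, and applyEach and requireName are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Data and Packet are local stand-ins so the sketch compiles on its own.
type Data map[string]interface{}

type Packet struct {
	ID    string
	Data  Data
	Error error
}

// Applicative uses an ASSUMED signature; the real one is not shown on this page.
type Applicative func(d Data) error

// applyEach (hypothetical) runs a against every Packet and combines any new
// error with the error the Packet already carries.
func applyEach(payload []*Packet, a Applicative) {
	for _, p := range payload {
		err := a(p.Data)
		if err == nil {
			continue
		}
		if p.Error != nil {
			p.Error = fmt.Errorf("%v; %w", p.Error, err)
		} else {
			p.Error = err
		}
	}
}

// requireName (hypothetical) checks the data for correctness and modifies it.
func requireName(d Data) error {
	if _, ok := d["name"].(string); !ok {
		return errors.New("missing name")
	}
	d["checked"] = true
	return nil
}

func main() {
	payload := []*Packet{
		{ID: "1", Data: Data{"name": "a"}},
		{ID: "2", Data: Data{}},
	}
	applyEach(payload, requireName)
	for _, p := range payload {
		fmt.Println(p.ID, p.Data, p.Error)
	}
}
```

Keeping the error on the Packet instead of aborting lets later vertices, such as a Fork on errors, decide what to do with failed items.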
type Builder
type Builder interface {
	Map(id string, a Applicative, options ...*Option) Builder
	FoldLeft(id string, f Fold, options ...*Option) Builder
	FoldRight(id string, f Fold, options ...*Option) Builder
	Fork(id string, f Fork, options ...*Option) (Builder, Builder)
	Loop(id string, x Fork, options ...*Option) (loop, out Builder)
	Publish(id string, s Publisher, options ...*Option)
}
Builder is the interface provided for creating a data processing stream.
type Data (added in v0.4.0)
Data is a map[string]interface{} used to hold the data flowing into the Stream. It can be converted to a struct using the As method, though this is temporary and can be costly.
type Fold (added in v0.5.0)
Fold is a function used to combine a payload into a single Packet. It may be used with either a Fold Left or Fold Right operation, which starts at the corresponding side and moves through the payload. The returned instance of Data is used as the aggregate in the subsequent call.
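The difference between the two directions can be sketched as follows. The Fold signature here is an assumption (this page does not show it), and foldLeft, foldRight, and concat are hypothetical helpers operating on a local stand-in for Data.

```go
package main

import "fmt"

// Data is a local stand-in for the package's Data type.
type Data map[string]interface{}

// Fold uses an ASSUMED signature: the running aggregate and the next item.
type Fold func(aggregate, next Data) Data

// foldLeft (hypothetical) starts at the first item and moves right.
func foldLeft(items []Data, f Fold) Data {
	if len(items) == 0 {
		return nil
	}
	aggregate := items[0]
	for _, next := range items[1:] {
		aggregate = f(aggregate, next)
	}
	return aggregate
}

// foldRight (hypothetical) starts at the last item and moves left.
func foldRight(items []Data, f Fold) Data {
	if len(items) == 0 {
		return nil
	}
	aggregate := items[len(items)-1]
	for i := len(items) - 2; i >= 0; i-- {
		aggregate = f(aggregate, items[i])
	}
	return aggregate
}

// concat (hypothetical) joins the "word" values of both arguments.
func concat(aggregate, next Data) Data {
	return Data{"word": aggregate["word"].(string) + next["word"].(string)}
}

func main() {
	items := []Data{{"word": "a"}, {"word": "b"}, {"word": "c"}}
	fmt.Println(foldLeft(items, concat)["word"])  // abc
	fmt.Println(foldRight(items, concat)["word"]) // cba
}
```

The direction only matters when the combining function is not commutative, as with the string concatenation used here.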
type Fork (added in v0.5.0)
Fork is a function for splitting the payload into 2 separate paths. Default Forks for duplication and error checking are provided by ForkDuplicate and ForkError respectively.
var (
	// ForkDuplicate is a Fork that creates a deep copy of the
	// payload and sends it down both branches.
	ForkDuplicate Fork = func(payload []*Packet) (a, b []*Packet) {
		payload2 := []*Packet{}
		buf := &bytes.Buffer{}
		enc, dec := gob.NewEncoder(buf), gob.NewDecoder(buf)

		_ = enc.Encode(payload)
		_ = dec.Decode(&payload2)

		for i, packet := range payload {
			payload2[i].span = packet.span
		}

		return payload, payload2
	}

	// ForkError is a Fork that splits machine.Packets based on
	// the presence of an error. Errors are sent down the
	// right side path.
	ForkError Fork = func(payload []*Packet) (s, f []*Packet) {
		s = []*Packet{}
		f = []*Packet{}

		for _, packet := range payload {
			if packet.Error != nil {
				f = append(f, packet)
			} else {
				s = append(s, packet)
			}
		}

		return s, f
	}
)
type ForkRule (added in v0.5.0)
ForkRule is a function that can be converted to a Fork via the Handler method, which allows forking based on the contents of the data.
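A rule-to-Fork conversion of this kind might look like the sketch below. The Fork signature is taken from the ForkDuplicate and ForkError declarations; the ForkRule signature and the body of Handler are assumptions, and isLarge is a hypothetical rule.

```go
package main

import "fmt"

// Data and Packet are local stand-ins for the package's types.
type Data map[string]interface{}

type Packet struct {
	ID   string
	Data Data
}

// Fork repeats the signature visible in the ForkDuplicate and ForkError declarations.
type Fork func(payload []*Packet) (a, b []*Packet)

// ForkRule is ASSUMED to be a predicate over Data.
type ForkRule func(d Data) bool

// Handler is a sketch of the conversion: Packets that satisfy the rule go
// down the first branch, all others go down the second.
func (r ForkRule) Handler(payload []*Packet) (a, b []*Packet) {
	for _, p := range payload {
		if r(p.Data) {
			a = append(a, p)
		} else {
			b = append(b, p)
		}
	}
	return a, b
}

// isLarge (hypothetical) matches records whose "size" is an int above 10.
func isLarge(d Data) bool {
	n, ok := d["size"].(int)
	return ok && n > 10
}

func main() {
	var fork Fork = ForkRule(isLarge).Handler
	large, small := fork([]*Packet{
		{ID: "1", Data: Data{"size": 50}},
		{ID: "2", Data: Data{"size": 3}},
	})
	fmt.Println(len(large), len(small))
}
```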
type HealthInfo (added in v0.7.0)
type HealthInfo struct {
	StreamID    string    `json:"stream_id"`
	LastPayload time.Time `json:"last_payload"`
	// contains filtered or unexported fields
}
HealthInfo is the type used for providing basic healthcheck information about the last start time of payloads.
type InjectionCallback (added in v0.7.0)
type InjectionCallback func(logs ...*Log)
InjectionCallback is a function provided to the LogStore Join method so that the cluster may restart work that has been dropped by one of the workers. Injections will only be processed for vertices that have the Injectable option set to true, which is the default.
type Log (added in v0.7.0)
type Log struct {
	OwnerID    string    `json:"owner_id"`
	StreamID   string    `json:"stream_id"`
	VertexID   string    `json:"vertex_id"`
	VertexType string    `json:"vertex_type"`
	State      string    `json:"state"`
	Packet     *Packet   `json:"packet"`
	When       time.Time `json:"when"`
}
Log holds the data that is recorded from the streams and sent to the LogStore instance.
type LogStore (added in v0.7.0)
type LogStore interface {
	Join(id string, callback InjectionCallback, streamIDs ...string) error
	Write(logs ...*Log)
	Leave(id string) error
}
LogStore is an interface that allows for a distributed cluster of workers. It requires three methods: Join, Write, and Leave.
Join is called during the Run method of the Pipe and is used for announcing membership in the cluster with an identifier, a callback for injection, and the list of Streams that will be running in the Pipe. Injection is how work can be restarted in the system, and it is the responsibility of the implementation to decide when and how it is reinitiated.
Write is called at the beginning of every vertex and provides the current payload about to be run. The implementation is considered the owner of that payload and may modify the data in special known circumstances if need be.
Leave is called during a graceful termination and any errors are logged.
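To make the three methods concrete, here is a minimal single-process sketch. The LogStore and InjectionCallback declarations repeat the ones documented above; Packet and Log are trimmed local stand-ins, and memoryStore together with its reinject policy is a hypothetical illustration rather than anything the package provides.

```go
package main

import (
	"fmt"
	"sync"
)

// Packet and Log are trimmed local stand-ins for the documented types.
type Packet struct{ ID string }

type Log struct {
	OwnerID  string
	StreamID string
	VertexID string
	State    string
	Packet   *Packet
}

// InjectionCallback and LogStore repeat the documented declarations.
type InjectionCallback func(logs ...*Log)

type LogStore interface {
	Join(id string, callback InjectionCallback, streamIDs ...string) error
	Write(logs ...*Log)
	Leave(id string) error
}

// memoryStore is a HYPOTHETICAL in-memory LogStore.
type memoryStore struct {
	mu      sync.Mutex
	members map[string]InjectionCallback
	logs    []*Log
}

var _ LogStore = (*memoryStore)(nil)

func newMemoryStore() *memoryStore {
	return &memoryStore{members: map[string]InjectionCallback{}}
}

// Join records the member and the callback used to hand work back to it.
func (m *memoryStore) Join(id string, callback InjectionCallback, streamIDs ...string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.members[id]; ok {
		return fmt.Errorf("member %q already joined", id)
	}
	m.members[id] = callback
	return nil
}

// Write stores the payloads that vertices are about to process.
func (m *memoryStore) Write(logs ...*Log) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.logs = append(m.logs, logs...)
}

// Leave removes the member from the cluster.
func (m *memoryStore) Leave(id string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.members[id]; !ok {
		return fmt.Errorf("member %q is not joined", id)
	}
	delete(m.members, id)
	return nil
}

// reinject is an ASSUMED recovery policy: every log owned by lostOwner is
// handed to the callback of member to. It returns the number of logs sent.
func (m *memoryStore) reinject(lostOwner, to string) int {
	m.mu.Lock()
	callback, ok := m.members[to]
	var pending []*Log
	for _, l := range m.logs {
		if l.OwnerID == lostOwner {
			pending = append(pending, l)
		}
	}
	m.mu.Unlock()
	if !ok || len(pending) == 0 {
		return 0
	}
	callback(pending...)
	return len(pending)
}

func main() {
	store := newMemoryStore()
	_ = store.Join("worker-1", func(logs ...*Log) {
		fmt.Println("re-injected", len(logs), "log(s)")
	}, "stream-a")
	store.Write(&Log{OwnerID: "worker-2", StreamID: "stream-a", VertexID: "v1", State: "start", Packet: &Packet{ID: "p1"}})
	fmt.Println(store.reinject("worker-2", "worker-1"))
	fmt.Println(store.Leave("worker-1"))
}
```

A real implementation would also need to decide when a worker counts as lost, which the interface leaves to the implementer.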
type Option (added in v0.2.0)
type Option struct {
	// DeepCopy uses encoding/gob to create a deep copy of the payload
	// before the processing to ensure concurrent map exceptions cannot
	// happen. Is fairly resource intensive so use with caution.
	// Default: false
	DeepCopy *bool
	// FIFO controls the processing order of the payloads
	// If set to true the system will wait for one payload
	// to be processed before starting the next.
	// Default: false
	FIFO *bool
	// Injectable controls whether the vertex accepts injection calls
	// if set to false the data will be logged and processing will not
	// take place.
	// Default: true
	Injectable *bool
	// BufferSize sets the buffer size on the edge channels between the
	// vertices, this setting can be useful when processing large amounts
	// of data with FIFO turned on.
	// Default: 0
	BufferSize *int
	// Span controls whether opentelemetry spans are created for tracing
	// Packets processed by the system.
	// Default: true
	Span *bool
	// Metrics controls whether opentelemetry metrics are recorded for
	// Packets processed by the system.
	// Default: true
	Metrics *bool
}
Option type for holding machine settings.
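Because every field is a pointer, an unset field can be told apart from an explicit false or 0. The sketch below shows one way that distinction could be used when several options are passed. It copies only three of the documented fields into a local stand-in, and boolPtr, intPtr, and merge are hypothetical helpers; how the package itself combines options is not stated on this page.

```go
package main

import "fmt"

// Option is a local stand-in with three of the documented fields.
type Option struct {
	FIFO       *bool
	Injectable *bool
	BufferSize *int
}

// boolPtr and intPtr (hypothetical) make it easy to set pointer fields inline.
func boolPtr(b bool) *bool { return &b }
func intPtr(i int) *int    { return &i }

// merge (hypothetical) starts from the documented defaults and lets later
// options override earlier ones, but only for fields that are set.
func merge(opts ...*Option) *Option {
	out := &Option{
		FIFO:       boolPtr(false), // documented default
		Injectable: boolPtr(true),  // documented default
		BufferSize: intPtr(0),      // documented default
	}
	for _, o := range opts {
		if o == nil {
			continue
		}
		if o.FIFO != nil {
			out.FIFO = o.FIFO
		}
		if o.Injectable != nil {
			out.Injectable = o.Injectable
		}
		if o.BufferSize != nil {
			out.BufferSize = o.BufferSize
		}
	}
	return out
}

func main() {
	merged := merge(
		&Option{FIFO: boolPtr(true)},
		&Option{BufferSize: intPtr(8)},
	)
	fmt.Println(*merged.FIFO, *merged.Injectable, *merged.BufferSize)
}
```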
type Packet
type Packet struct {
	ID    string `json:"id"`
	Data  Data   `json:"data"`
	Error error  `json:"error"`
	// contains filtered or unexported fields
}
Packet holds the information traveling through the machine.
type Pipe (added in v0.7.0)
type Pipe struct {
	// contains filtered or unexported fields
}
Pipe is the representation of the system. It can run multiple Streams and controls the start and stop functionality of the system.
func NewPipe (added in v0.7.0)
NewPipe is a function for creating a new Pipe. If logStore is nil then the associated feature will be disabled.
func NewPipeWithFiber (added in v0.14.3)
NewPipeWithFiber is a function for creating a new Pipe. If logStore is nil then the associated feature will be disabled.
If NewPipeWithFiber is used, all pipe paths are prefixed with /pipe/:pipe_id.
func (*Pipe) Add (added in v0.14.2)
Add wraps fiber.App.Add.
Add allows you to specify an HTTP method when registering a route.
func (*Pipe) Load (added in v0.9.0)
func (pipe *Pipe) Load(streams []*StreamSerialization) error
Load loads the streams described by the provided StreamSerialization values.
func (*Pipe) Run (added in v0.7.0)
Run starts the Pipe and subsequent Streams. It requires a context, a port on which to run an instance of fiber.App (which hosts the /health endpoint and any HTTP based streams at /stream/:id), and a gracePeriod during which graceful shutdown can take place.
If NewPipeWithFiber is used, port is ignored and the previously mentioned paths are prefixed with /pipe/:pipe_id.
func (*Pipe) Stream (added in v0.8.0)
Stream is a method for adding a generic developer defined Stream. New Streams are created by the appropriately named NewStream function.
func (*Pipe) StreamHTTP (added in v0.7.0)
StreamHTTP is a method that creates a Stream at the path /stream/:id, which is hosted by the Pipe's fiber.App.
func (*Pipe) StreamSubscription (added in v0.7.0)
func (pipe *Pipe) StreamSubscription(id string, sub Subscription, interval time.Duration, opts ...*Option) Builder
StreamSubscription is a method for creating a Stream based on the provided Subscription, which has its Read method called at the end of each interval period.
func (*Pipe) StreamWebsocket (added in v0.14.0)
StreamWebsocket is a method that creates a Stream at the path /ws/:id, which is hosted by the Pipe's fiber.App.
func (*Pipe) Use (added in v0.7.0)
Use wraps fiber.App.Use.
Use registers a middleware route that will match requests with the provided prefix (which is optional and defaults to "/").
app.Use(func(c *fiber.Ctx) error {
	return c.Next()
})
app.Use("/api", func(c *fiber.Ctx) error {
	return c.Next()
})
app.Use("/api", handler, func(c *fiber.Ctx) error {
	return c.Next()
})
This method will match all HTTP verbs: GET, POST, PUT, HEAD, etc.
type PluginDefinition (added in v0.11.0)
type PluginDefinition struct {
	// Type is the name of the PluginProvider to use.
	Type string `json:"type" mapstructure:"type"`
	// Payload is the location, script, etc provided to load the plugin.
	// Depends on the PluginProvider.
	Payload string `json:"payload" mapstructure:"payload"`
	// Symbol is the name of the symbol to be loaded from the plugin.
	Symbol string `json:"symbol" mapstructure:"symbol"`
	// Attributes are a map[string]interface{} of properties to be used with the PluginProvider.
	Attributes map[string]interface{} `json:"attributes" mapstructure:"attributes"`
}
PluginDefinition declares the path and symbol for a Go plugin containing the Provider.
type PluginProvider (added in v0.11.0)
type PluginProvider interface {
	Load(*PluginDefinition) (interface{}, error)
}
PluginProvider is an interface for loading plugins. Load must return one of the following types:
- Subscription
- Retriever
- Applicative
- Fold
- Fork
- Publisher
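Since Load returns interface{}, whoever calls it has to check the concrete type of the result. The sketch below illustrates that check with local stand-ins. PluginDefinition and PluginProvider repeat the documented declarations, the Applicative signature is an assumption, and tableProvider and loadApplicative are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Data is a local stand-in; the Applicative signature is ASSUMED.
type Data map[string]interface{}
type Applicative func(d Data) error

// PluginDefinition and PluginProvider repeat the documented declarations
// (struct tags omitted for brevity).
type PluginDefinition struct {
	Type       string
	Payload    string
	Symbol     string
	Attributes map[string]interface{}
}

type PluginProvider interface {
	Load(*PluginDefinition) (interface{}, error)
}

// tableProvider (hypothetical) resolves Symbol against an in-memory table
// instead of loading anything from disk.
type tableProvider struct {
	symbols map[string]interface{}
}

func (t tableProvider) Load(def *PluginDefinition) (interface{}, error) {
	v, ok := t.symbols[def.Symbol]
	if !ok {
		return nil, fmt.Errorf("symbol %q not found", def.Symbol)
	}
	return v, nil
}

// loadApplicative (hypothetical) shows the caller-side type check.
func loadApplicative(p PluginProvider, def *PluginDefinition) (Applicative, error) {
	v, err := p.Load(def)
	if err != nil {
		return nil, err
	}
	a, ok := v.(Applicative)
	if !ok {
		return nil, errors.New("symbol is not an Applicative")
	}
	return a, nil
}

func main() {
	provider := tableProvider{symbols: map[string]interface{}{
		"Mark":  Applicative(func(d Data) error { d["seen"] = true; return nil }),
		"Wrong": 42,
	}}

	mark, err := loadApplicative(provider, &PluginDefinition{Type: "table", Symbol: "Mark"})
	fmt.Println(mark != nil, err)

	_, err = loadApplicative(provider, &PluginDefinition{Type: "table", Symbol: "Wrong"})
	fmt.Println(err)
}
```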
type Retriever (added in v0.3.0)
Retriever is a function that provides data to a generic Stream. It must stop when the context receives a done signal.
type Stream (added in v0.5.0)
type Stream interface {
	ID() string
	Run(ctx context.Context, recorders ...recorder) error
	Inject(ctx context.Context, events map[string][]*Packet)
	Builder() Builder
}
Stream is a representation of a data stream and its associated logic. It may be used individually or hosted by a Pipe. Creating a new Stream is handled by the appropriately named NewStream function.
The Builder method is the entrypoint into creating the data processing flow. All branches of the Stream are required to end in either a Publish or a Link in order to be considered valid.
type StreamSerialization (added in v0.11.0)
type StreamSerialization struct {
	// Type type of stream to create.
	//
	// For root serializations valid values are 'http', 'subscription', or 'stream'.
	Type string `json:"type,omitempty" mapstructure:"type,omitempty"`
	// Interval is the duration in nanoseconds between pulls in a 'subscription' Type. It is only read
	// if the Type is 'subscription'.
	Interval time.Duration `json:"interval,omitempty" mapstructure:"interval,omitempty"`

	*VertexSerialization
}
StreamSerialization is a config-based definition for a stream.
func (*StreamSerialization) MarshalJSON (added in v0.12.4)
func (s *StreamSerialization) MarshalJSON() ([]byte, error)
MarshalJSON marshals the StreamSerialization to JSON.
func (*StreamSerialization) MarshalYAML (added in v0.11.0)
func (s *StreamSerialization) MarshalYAML() (interface{}, error)
MarshalYAML marshals the StreamSerialization to YAML.
func (*StreamSerialization) UnmarshalJSON (added in v0.12.4)
func (s *StreamSerialization) UnmarshalJSON(data []byte) error
UnmarshalJSON unmarshals the StreamSerialization from JSON.
func (*StreamSerialization) UnmarshalYAML (added in v0.11.0)
func (s *StreamSerialization) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML unmarshals the StreamSerialization from YAML.
type Subscription (added in v0.7.0)
Subscription is an interface for creating a pull based stream. It requires two methods: Read and Close.
Read is called when the interval passes and the resulting payload is sent down the Stream.
Close is called during a graceful termination and any errors are logged.
type VertexSerialization (added in v0.11.0)
type VertexSerialization struct {
	// ID unique identifier for the stream.
	ID string `json:"id,omitempty" mapstructure:"id,omitempty"`
	// Provider Plugin information to load
	Provider *PluginDefinition `json:"provider,omitempty" mapstructure:"provider,omitempty"`
	// Options are a slice of machine.Option https://godoc.org/github.com/whitaker-io/machine#Option
	Options []*Option `json:"options,omitempty" mapstructure:"options,omitempty"`
	// contains filtered or unexported fields
}
VertexSerialization is a config-based definition for a stream vertex.