Documentation
¶
Overview ¶
Package streamdal is a library for running Streamdal data pipelines against data. It is designed to be included in Go message bus libraries. The primary public method is Process(), which runs pipelines against data; Close() releases the client's resources once it is no longer needed.
Use of this package requires a running instance of a streamdal server. The server can be downloaded at https://github.com/streamdal/streamdal/tree/main/apps/server
The following environment variables must be set:
- STREAMDAL_URL: The address of the Streamdal server
- STREAMDAL_TOKEN: The token to use when connecting to the Streamdal server
- STREAMDAL_SERVICE_NAME: The name of the service to identify it in the Streamdal console
Optional parameters:
- STREAMDAL_DRY_RUN: If true, rule hits will only be logged; no failure modes will be run
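The variables listed above could be read and checked at startup along the following lines. This is a minimal sketch using only the standard library: the `envSettings` type and the `loadEnvSettings` function are hypothetical names invented for illustration and are not part of this package, and treating STREAMDAL_DRY_RUN as a `strconv.ParseBool` value is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strconv"
)

// envSettings is a hypothetical holder for the documented environment
// variables; it is not a type exported by the streamdal package.
type envSettings struct {
	URL         string
	Token       string
	ServiceName string
	DryRun      bool
}

// loadEnvSettings reads the documented variables through getenv so the
// lookup can be replaced in tests. Missing required values are an error.
func loadEnvSettings(getenv func(string) string) (envSettings, error) {
	s := envSettings{
		URL:         getenv("STREAMDAL_URL"),
		Token:       getenv("STREAMDAL_TOKEN"),
		ServiceName: getenv("STREAMDAL_SERVICE_NAME"),
	}
	if s.URL == "" || s.Token == "" || s.ServiceName == "" {
		return s, errors.New("STREAMDAL_URL, STREAMDAL_TOKEN and STREAMDAL_SERVICE_NAME must be set")
	}

	// STREAMDAL_DRY_RUN is optional; an unset value is treated as false.
	// Parsing it with strconv.ParseBool is an assumption of this sketch.
	if raw := getenv("STREAMDAL_DRY_RUN"); raw != "" {
		dry, err := strconv.ParseBool(raw)
		if err != nil {
			return s, fmt.Errorf("invalid STREAMDAL_DRY_RUN %q: %w", raw, err)
		}
		s.DryRun = dry
	}
	return s, nil
}

func main() {
	s, err := loadEnvSettings(os.Getenv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Printf("service %q will connect to %s (dry run: %t)\n", s.ServiceName, s.URL, s.DryRun)
}
```

Passing the lookup function in, rather than calling os.Getenv directly, keeps the check easy to exercise without touching the process environment.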
Index ¶
Constants ¶
const (
	// DefaultPipelineTimeoutDurationStr is the default timeout for a pipeline execution
	DefaultPipelineTimeoutDurationStr = "100ms"

	// DefaultStepTimeoutDurationStr is the default timeout for a single step.
	DefaultStepTimeoutDurationStr = "10ms"

	// ReconnectSleep determines the length of time to wait between reconnect attempts to streamdal server
	ReconnectSleep = time.Second * 5

	// MaxWASMPayloadSize is the maximum size of data that can be sent to the WASM module
	MaxWASMPayloadSize = 1024 * 1024 // 1Mi

	// ClientTypeSDK & ClientTypeShim are referenced by shims and SDKs to indicate
	// in what context this SDK is being used.
	ClientTypeSDK  ClientType = 1
	ClientTypeShim ClientType = 2

	// OperationTypeConsumer and OperationTypeProducer are used to indicate the
	// type of operation the Process() call is performing.
	OperationTypeConsumer OperationType = 1
	OperationTypeProducer OperationType = 2

	AbortAllStr     = "aborted all pipelines"
	AbortCurrentStr = "aborted current pipeline"
	AbortNoneStr    = "no abort condition"

	// ExecStatusTrue ExecStatusFalse ExecStatusError are used to indicate
	// the execution status of the _last_ step in the _last_ pipeline.
	ExecStatusTrue  = protos.ExecStatus_EXEC_STATUS_TRUE
	ExecStatusFalse = protos.ExecStatus_EXEC_STATUS_FALSE
	ExecStatusError = protos.ExecStatus_EXEC_STATUS_ERROR
)
const (
	// NumTailWorkers is the number of tail workers to start for each tail request
	// The workers are responsible for reading from the tail channel and streaming
	// TailResponse messages to the server
	NumTailWorkers = 2

	// MinTailResponseIntervalMS is how often we send a TailResponse to the server
	// If this rate is exceeded, we will drop messages rather than flooding the server
	// This is an int to avoid a .Milliseconds() call
	MinTailResponseIntervalMS = 10
)
Variables ¶
var (
	ErrEmptyConfig          = errors.New("config cannot be empty")
	ErrEmptyServiceName     = errors.New("data source cannot be empty")
	ErrEmptyOperationName   = errors.New("operation name cannot be empty")
	ErrInvalidOperationType = errors.New("operation type must be set to either OperationTypeConsumer or OperationTypeProducer")
	ErrEmptyComponentName   = errors.New("component name cannot be empty")
	ErrEmptyCommand         = errors.New("command cannot be empty")
	ErrEmptyProcessRequest  = errors.New("process request cannot be empty")

	// ErrMaxPayloadSizeExceeded is returned when the payload is bigger than MaxWASMPayloadSize
	ErrMaxPayloadSizeExceeded = fmt.Errorf("payload size exceeds maximum of '%d' bytes", MaxWASMPayloadSize)

	// ErrPipelineTimeout is returned when a pipeline exceeds the configured timeout
	ErrPipelineTimeout = errors.New("pipeline timeout exceeded")

	// ErrClosedClient is returned when .Process() is called after .Close()
	ErrClosedClient = errors.New("client is closed")
)
Functions ¶
This section is empty.
Types ¶
type Audience ¶
type Audience struct {
	ComponentName string
	OperationType OperationType
	OperationName string
}
Audience is used to announce an audience to the Streamdal server on library initialization. We use this to avoid end users having to import our protos.
type ClientType ¶
type ClientType int
ClientType is used to indicate if this library is being used by a shim or directly (as an SDK)
type Config ¶
type Config struct {
	// ServerURL is the hostname and port for the gRPC API of Streamdal Server
	// If this value is left empty, the library will not attempt to connect to the server
	// and New() will return nil
	ServerURL string

	// ServerToken is the authentication token for the gRPC API of the Streamdal server
	// If this value is left empty, the library will not attempt to connect to the server
	// and New() will return nil
	ServerToken string

	// ServiceName is the name that this library will identify as in the UI. Required
	ServiceName string

	// PipelineTimeout defines how long this library will allow a pipeline to
	// run. Optional; default: 100ms
	PipelineTimeout time.Duration

	// StepTimeout defines how long this library will allow a single step to run.
	// Optional; default: 10ms
	StepTimeout time.Duration

	// IgnoreStartupError defines how to handle an error on initial startup via
	// New(). If left as false, failure to complete startup (such as bad auth)
	// will cause New() to return an error. If true, the library will block and
	// continue trying to initialize. You may want to adjust this if you want
	// your application to behave a certain way on startup when the server
	// is unavailable. Optional; default: false
	IgnoreStartupError bool

	// If specified, library will connect to the server but won't apply any
	// pipelines. Optional; default: false
	DryRun bool

	// ShutdownCtx is a context that the library will listen to for cancellation
	// notices. Upon cancelling this context, SDK will stop all active goroutines
	// and free up all used resources. Optional; default: nil
	ShutdownCtx context.Context

	// Logger is a logger you can inject (such as logrus) to allow this library
	// to log output. Optional; default: nil
	Logger logger.Logger

	// Audiences is a list of audiences you can specify at registration time.
	// This is useful if you know your audiences in advance and want to populate
	// service groups in the Streamdal UI _before_ your code executes any .Process()
	// calls. Optional; default: nil
	Audiences []*Audience

	// ClientType specifies whether this instance of the SDK is used in a shim
	// library or as a standalone SDK. This information is used for both debug
	// info and to help the library determine whether ServerURL and ServerToken
	// should be optional or required. Optional; default: ClientTypeSDK
	ClientType ClientType

	// EnableStdout enables ability for wasm modules to write to stdout
	EnableStdout bool

	// EnableStderr enables ability for wasm modules to write to stderr
	EnableStderr bool
}
type IStreamdal ¶
type IStreamdal interface {
	// Process is used to run data pipelines against data
	Process(ctx context.Context, req *ProcessRequest) *ProcessResponse

	// Close should be called when streamdal client is no longer being used;
	// will stop all active goroutines and clean up resources; client should NOT
	// be re-used after Close() is called.
	Close()
}
type OperationType ¶
type OperationType int
OperationType is used to indicate if the operation is a consumer or a producer
type ProcessRequest ¶
type ProcessRequest struct {
	ComponentName string
	OperationType OperationType
	OperationName string
	Data          []byte
}
ProcessRequest is used to maintain a consistent API for the Process() call
type ProcessResponse ¶
type ProcessResponse protos.SDKResponse
ProcessResponse is the response struct from a Process() call
type Streamdal ¶
type Streamdal struct {
// contains filtered or unexported fields
}
Streamdal is the main struct for this library
func (*Streamdal) Process ¶
func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) *ProcessResponse
type Tail ¶
type Tail struct {
	Request    *protos.Command
	CancelFunc context.CancelFunc
	// contains filtered or unexported fields
}
func (*Tail) ShipResponse ¶
func (t *Tail) ShipResponse(tr *protos.TailResponse)
func (*Tail) ShouldSend ¶
Directories
¶
Path | Synopsis |
---|---|
helper | Package helper contains WASM-related helper functions and methods. |
hostfunc | Package hostfunc contains host function methods. |
kvfakes | Code generated by counterfeiter. |
loggerfakes | Code generated by counterfeiter. |
metrics | Package metrics is responsible for tracking and publishing metrics to the Streamdal server. |
metricsfakes | Code generated by counterfeiter. |
server | Package server is a wrapper for the Client gRPC API. |
serverfakes | Code generated by counterfeiter. |