Documentation
¶
Overview ¶
Package streamdal is a library that allows running of Client data pipelines against data This package is designed to be included in golang message bus libraries. The only public method is Process() which is used to run pipelines against data.
Use of this package requires a running instance of a streamdal server©. The server can be downloaded at https://github.com/streamdal/streamdal/tree/main/apps/server
The following environment variables must be set: - STREAMDAL_URL: The address of the Client server - STREAMDAL_TOKEN: The token to use when connecting to the Client server - STREAMDAL_SERVICE_NAME: The name of the service to identify it in the streamdal console
Optional parameters: - STREAMDAL_DRY_RUN: If true, rule hits will only be logged, no failure modes will be ran
Index ¶
Constants ¶
const ( // DefaultPipelineTimeoutDurationStr is the default timeout for a pipeline execution. DefaultPipelineTimeoutDurationStr = "100ms" // DefaultStepTimeoutDurationStr is the default timeout for a single step. DefaultStepTimeoutDurationStr = "10ms" // DefaultSamplingRate is the default number at which messages will be sampled // per SamplingIntervalSeconds if SamplingEnabled is set to true and SampleRate // is not specified in the config. DefaultSamplingRate = 1 // DefaultSamplingIntervalSeconds is the default interval at which sampling will be done // if SamplingEnabled is set to true and SamplingIntervalSeconds is not specified in the config. DefaultSamplingIntervalSeconds = 1 // DefaultModeAsyncWorkers is the default number of workers that will be spawned // by the SDK when config.Mode is set to ModeAsync. // // Higher amount == improved .Process() speeds but heavier resource usage. DefaultModeAsyncWorkers = 2 // DefaultMetricsNumWorkers is the default number of workers that the SDK will // spawn for collecting and sending metrics to the configured Streamdal server. // // NOTE: Metrics are NOT optional. The server uses metrics sent by the SDK // clients to determine if clients are alive, display usage stats and many // other features. DefaultMetricsNumWorkers = 1 // ReconnectSleep determines the length of time the SDK will wait between // reconnect attempts to streamdal server after the streamdal server goes away. ReconnectSleep = time.Second * 5 // MaxWASMPayloadSize is the maximum size of data that can be sent to the WASM module MaxWASMPayloadSize = 1024 * 1024 // 1Mi // ClientTypeSDK & ClientTypeShim are referenced by shims and SDKs to indicate // in what context this SDK is being used. ClientTypeSDK ClientType = 1 ClientTypeShim ClientType = 2 // OperationTypeConsumer and OperationTypeProducer are used to indicate the // type of operation the Process() call is performing. OperationTypeConsumer OperationType = 1 OperationTypeProducer OperationType = 2 ModeSync SDKMode = SDKMode(protos.SDKMode_SDK_MODE_SYNC) ModeAsync SDKMode = SDKMode(protos.SDKMode_SDK_MODE_ASYNC) AbortAllStr = "aborted all pipelines" AbortCurrentStr = "aborted current pipeline" AbortNoneStr = "no abort condition" // ExecStatusTrue ExecStatusFalse ExecStatusError are used to indicate // the execution status of the _last_ step in the _last_ pipeline. ExecStatusTrue = protos.ExecStatus_EXEC_STATUS_TRUE ExecStatusFalse = protos.ExecStatus_EXEC_STATUS_FALSE ExecStatusError = protos.ExecStatus_EXEC_STATUS_ERROR ExecStatusAsync = protos.ExecStatus_EXEC_STATUS_ASYNC ExecStatusSampling = protos.ExecStatus_EXEC_STATUS_SAMPLING WazeroExecutionModeCompiler WazeroExecutionMode = 1 WazeroExecutionModeInterpreter WazeroExecutionMode = 2 DefaultWazeroExecutionMode = WazeroExecutionModeCompiler )
const ( // NumTailWorkers is the number of tail workers to start for each tail request // The workers are responsible for reading from the tail channel and streaming // TailResponse messages to the server NumTailWorkers = 2 // MinTailResponseIntervalMS is how often we send a TailResponse to the server // If this rate is exceeded, we will drop messages rather than flooding the server // This is an int to avoid a .Milliseconds() call MinTailResponseIntervalMS = 10 )
Variables ¶
var ( ErrEmptyConfig = errors.New("config cannot be empty") ErrEmptyServiceName = errors.New("data source cannot be empty") ErrEmptyOperationName = errors.New("operation name cannot be empty") ErrInvalidOperationType = errors.New("operation type must be set to either OperationTypeConsumer or OperationTypeProducer") ErrEmptyComponentName = errors.New("component name cannot be empty") ErrEmptyCommand = errors.New("command cannot be empty") ErrEmptyProcessRequest = errors.New("process request cannot be empty") // ErrMaxPayloadSizeExceeded is returned when the payload is bigger than MaxWASMPayloadSize ErrMaxPayloadSizeExceeded = fmt.Errorf("payload size exceeds maximum of '%d' bytes", MaxWASMPayloadSize) // ErrPipelineTimeout is returned when a pipeline exceeds the configured timeout ErrPipelineTimeout = errors.New("pipeline timeout exceeded") // ErrClosedClient is returned when .Process() is called after .Close() ErrClosedClient = errors.New("client is closed") )
Functions ¶
This section is empty.
Types ¶
type Audience ¶
type Audience struct { ComponentName string OperationType OperationType OperationName string }
Audience is used to announce an audience to the Streamdal server on library initialization We use this to avoid end users having to import our protos
type ClientType ¶
type ClientType int
ClientType is used to indicate if this library is being used by a shim or directly (as an SDK)
type Config ¶
type Config struct { // ServerURL the hostname and port for the gRPC API of Streamdal Server. // If this value is left empty, the library will not attempt to connect to // the server and New() will return nil. // // REQUIRED ServerURL string // ServerToken is the authentication token for the gRPC API of the Streamdal server // If this value is left empty, the library will not attempt to connect to the server // and New() will return nil. // // REQUIRED ServerToken string // ServiceName is the name that this library will identify as in the UI. // // REQUIRED ServiceName string // PipelineTimeout defines how long this library will allow a pipeline to run. // // Optional; default: 100ms PipelineTimeout time.Duration // StepTimeout defines how long this library will allow a single step to run. // // Optional; default: 10ms StepTimeout time.Duration // IgnoreStartupError defines how to handle an error on initial startup via // New(). If left as false, failure to complete startup (such as bad auth) // will cause New() to return an error. If true, the library will block and // continue trying to initialize. You may want to adjust this if you want // your application to behave a certain way on startup when the server // is unavailable. // // Optional; default: false IgnoreStartupError bool // If specified, library will connect to the server but won't apply any // pipelines. // // Optional; default: false DryRun bool // ShutdownCtx is a context that the library will listen to for cancellation // notices. Upon cancelling this context, SDK will stop all active goroutines // and free up all used resources. // // Optional; default: nil ShutdownCtx context.Context // Logger is a logger you can inject (such as logrus) to allow this library // to log output. // // Optional; default: nil Logger logger.Logger // Audiences is a list of audiences you can specify at registration time. // This is useful if you know your audiences in advance and want to populate // service groups in the Streamdal UI _before_ your code executes any .Process() // calls. // // NOTE: If you do not specify audiences here, they will only show up in // the UI _after_ the first call to .Process() completes! // // Optional; default: nil Audiences []*Audience // ClientType specifies whether this of the SDK is used in a shim library or // as a standalone SDK. This information is used for both debug info and to // help the library determine whether ServerURL and ServerToken should be // optional or required. // // Optional; default: ClientTypeSDK ClientType ClientType // EnableStdout enables ability for wasm modules to write to stdout EnableStdout bool // EnableStderr enables ability for wasm modules to write to stderr EnableStderr bool // SamplingEnabled enables sampling of data. This will cause Process() to // ignore some messages if the rate exceeds the SamplingRate per SamplingIntervalSeconds. // This is useful for high-throughput systems where you cannot .Process() // every message. SamplingEnabled bool // SamplingRate is the rate at which messages will be sampled per SamplingIntervalSeconds // // Default: 1 SamplingRate int // SamplingIntervalSeconds is the interval at which sampling will be done. // // Default: 1 SamplingIntervalSeconds int // SDKMode is how Process() will handle messages. // ModeSync = Process() will block until the pipeline is complete // ModeAsync = Process() will return immediately and the pipeline will run in the background via workers // // Default: ModeSync Mode SDKMode // ModeAsyncNumWorkers is the number of workers SDK will spawn when Mode is // set to ModeAsync. // // Default: 4 ModeAsyncNumWorkers int // Number of workers that the SDK will run for collecting and sending metrics // // Default: 1 MetricsNumWorkers int // WazeroExecutionMode defines how the SDK will execute wasm modules. // // WazeroExecutionModeCompiler = slow instantiation, faster execution // WazeroExecutionModeInterpreter = fast instantiation, slower execution // // Default: WazeroExecutionModeCompiler WazeroExecutionMode WazeroExecutionMode }
type IStreamdal ¶
type IStreamdal interface { // Process is used to run data pipelines against data Process(ctx context.Context, req *ProcessRequest) *ProcessResponse // Close should be called when streamdal client is no longer being used; // will stop all active goroutines and clean up resources; client should NOT // be re-used after Close() is called. Close() }
type OperationType ¶
type OperationType int
OperationType is used to indicate if the operation is a consumer or a producer
type ProcessRequest ¶
type ProcessRequest struct { ComponentName string OperationType OperationType OperationName string Data []byte }
ProcessRequest is used to maintain a consistent API for the Process() call
type ProcessResponse ¶
type ProcessResponse protos.SDKResponse
ProcessResponse is the response struct from a Process() call
type SDKMode ¶ added in v0.1.25
type SDKMode int
SDKMode is how Process() will handle messages ModeSync = Process() will block until the pipeline is complete ModeAsync = Process() will return immediately and the pipeline will run in the background
type Streamdal ¶
type Streamdal struct {
// contains filtered or unexported fields
}
Streamdal is the main struct for this library
func (*Streamdal) Process ¶
func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) *ProcessResponse
type Tail ¶
type Tail struct { Request *protos.Command CancelFunc context.CancelFunc // contains filtered or unexported fields }
func (*Tail) ShipResponse ¶
func (t *Tail) ShipResponse(tr *protos.TailResponse)
func (*Tail) ShouldSend ¶
type WazeroExecutionMode ¶ added in v0.1.31
type WazeroExecutionMode int
WazeroExecutionMode is used to indicate how the SDK will execute wasm modules
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
Package helper contains WASM-related helper functions and methods.
|
Package helper contains WASM-related helper functions and methods. |
Package hostfunc contains host function methods.
|
Package hostfunc contains host function methods. |
kvfakes
Code generated by counterfeiter.
|
Code generated by counterfeiter. |
loggerfakes
Code generated by counterfeiter.
|
Code generated by counterfeiter. |
Package metrics is responsible for tracking and publishing metrics to the Streamdal server.
|
Package metrics is responsible for tracking and publishing metrics to the Streamdal server. |
metricsfakes
Code generated by counterfeiter.
|
Code generated by counterfeiter. |
Package server is a wrapper for the Client gRPC API.
|
Package server is a wrapper for the Client gRPC API. |
serverfakes
Code generated by counterfeiter.
|
Code generated by counterfeiter. |