Documentation ¶
Overview ¶
Package pipeline provides configuration option types for analysis pipeline items and composable building blocks for concurrent pipeline construction: RunPC (producer-consumer goroutine skeleton), Phase/RunPhases (chain-of-responsibility), Batcher (batching strategies), DispatchFunc (dispatch strategy), and Fetcher (cache decorator pattern).
Index ¶
- func RunPhases[S any](ctx context.Context, s S, phases ...Phase[S]) (S, error)
- func SignalOnDrain[T any](src <-chan T) (forwarded <-chan T, drained <-chan struct{})
- type Batcher
- type ConfigurationOption
- type ConfigurationOptionType
- type DispatchFunc
- type Fetcher
- type FetcherFunc
- type PassthroughBatcher
- type Phase
- type PhaseFunc
- type RunPC
- type SharedResponse
- type ThresholdBatcher
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RunPhases ¶
func RunPhases[S any](ctx context.Context, s S, phases ...Phase[S]) (S, error)
RunPhases executes phases sequentially, threading state through each one. It returns immediately on the first error, preserving the partial state, and returns the input state unchanged when no phases are provided.
func SignalOnDrain ¶
func SignalOnDrain[T any](src <-chan T) (forwarded <-chan T, drained <-chan struct{})
SignalOnDrain forwards items from src to the returned forwarded channel and closes the returned drained channel once src is exhausted. This enables ending pipeline stage spans independently.
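A minimal sketch of the forward-and-signal pattern, reimplemented here for illustration (the lowercase signalOnDrain is not the package's exported function):

```go
package main

import "fmt"

// signalOnDrain forwards items from src to a new channel and closes the
// drained channel once src is exhausted and all items are forwarded.
func signalOnDrain[T any](src <-chan T) (<-chan T, <-chan struct{}) {
	forwarded := make(chan T)
	drained := make(chan struct{})
	go func() {
		defer close(forwarded)
		defer close(drained)
		for v := range src {
			forwarded <- v
		}
	}()
	return forwarded, drained
}

func main() {
	src := make(chan int, 2)
	src <- 1
	src <- 2
	close(src)

	fwd, drained := signalOnDrain(src)
	for v := range fwd {
		fmt.Println(v)
	}
	<-drained // safe point to end this stage's span
	fmt.Println("drained")
}
```

The separate drained channel lets an observability layer end a stage's span as soon as the stage has emitted everything, without waiting for downstream consumers.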
Types ¶
type Batcher ¶
type Batcher[In, Batch any] interface {
	// Add adds an item. Returns true if the batch is ready to flush.
	Add(In) bool
	// Flush returns the current batch and resets. Returns false if empty.
	Flush() (Batch, bool)
}
Batcher accumulates input items and produces batches.
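The typical consume loop against this interface looks like the sketch below. The sliceBatcher type is a hypothetical threshold-style implementation written here only to exercise the interface:

```go
package main

import "fmt"

// Batcher as documented: Add reports readiness, Flush drains and resets.
type Batcher[In, Batch any] interface {
	Add(In) bool
	Flush() (Batch, bool)
}

// sliceBatcher is an illustrative implementation that batches ints
// until a fixed size is reached.
type sliceBatcher struct {
	buf  []int
	size int
}

func (b *sliceBatcher) Add(v int) bool {
	b.buf = append(b.buf, v)
	return len(b.buf) >= b.size
}

func (b *sliceBatcher) Flush() ([]int, bool) {
	if len(b.buf) == 0 {
		return nil, false
	}
	out := b.buf
	b.buf = nil
	return out, true
}

func main() {
	var b Batcher[int, []int] = &sliceBatcher{size: 2}
	for _, v := range []int{1, 2, 3} {
		if b.Add(v) { // batch ready?
			batch, _ := b.Flush()
			fmt.Println(batch)
		}
	}
	// Final flush picks up any partial batch.
	if tail, ok := b.Flush(); ok {
		fmt.Println("tail:", tail)
	}
}
```

Note the final Flush after the input is exhausted: without it, a partial batch would be silently dropped.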
type ConfigurationOption ¶
type ConfigurationOption struct {
// Default is the initial value of the configuration option.
Default any
// Name identifies the configuration option in facts.
Name string
// Description represents the help text about the configuration option.
Description string
// Flag corresponds to the CLI token with "--" prepended.
Flag string
// Type specifies the kind of the configuration option's value.
Type ConfigurationOptionType
}
ConfigurationOption provides a unified way to configure PipelineItem-s.
func (ConfigurationOption) FormatDefault ¶
func (opt ConfigurationOption) FormatDefault() string
FormatDefault converts the default value of ConfigurationOption to string. Used in the command line interface to show the argument's default value.
type ConfigurationOptionType ¶
type ConfigurationOptionType int
ConfigurationOptionType represents the possible types of a ConfigurationOption's value.
const (
	// BoolConfigurationOption reflects the boolean value type.
	BoolConfigurationOption ConfigurationOptionType = iota
	// IntConfigurationOption reflects the integer value type.
	IntConfigurationOption
	// StringConfigurationOption reflects the string value type.
	StringConfigurationOption
	// FloatConfigurationOption reflects a floating point value type.
	FloatConfigurationOption
	// StringsConfigurationOption reflects the array of strings value type.
	StringsConfigurationOption
	// PathConfigurationOption reflects the file system path value type.
	PathConfigurationOption
)
func (ConfigurationOptionType) String ¶
func (opt ConfigurationOptionType) String() string
String returns an empty string for the boolean type, "int" for integers, and "string" for strings. It is used in the command line interface to show the argument's type.
type DispatchFunc ¶
DispatchFunc sends a request to a worker pool. The worker channel is captured in the closure, keeping the dispatch strategy decoupled from request semantics.
type Fetcher ¶
Fetcher retrieves a response for a given request. It serves as the base interface for the cache decorator pattern: wrap a Fetcher with "check cache → fetch misses → update cache" logic.
type FetcherFunc ¶
FetcherFunc adapts a plain function to the Fetcher interface.
type PassthroughBatcher ¶
type PassthroughBatcher[T any] struct { // contains filtered or unexported fields }
PassthroughBatcher wraps each input item as a single-element batch. Add always returns true, meaning every item is immediately ready.
func (*PassthroughBatcher[T]) Add ¶
func (b *PassthroughBatcher[T]) Add(item T) bool
Add stores the item and returns true (always ready).
func (*PassthroughBatcher[T]) Flush ¶
func (b *PassthroughBatcher[T]) Flush() ([]T, bool)
Flush returns the stored item as a single-element slice and resets. Returns false if Add was not called since the last flush.
type RunPC ¶
type RunPC[In, Out, Job any] struct {
	// Buffer sets the capacity of the internal jobs channel.
	// Values below 1 are clamped to 1.
	Buffer int
	// Produce reads the input and sends work items on the jobs channel.
	Produce func(ctx context.Context, in In, jobs chan<- Job)
	// Consume reads work items from jobs and sends results on out.
	Consume func(ctx context.Context, jobs <-chan Job, out chan<- Out)
}
RunPC is a producer-consumer micro-skeleton that owns the goroutine topology: channel creation, goroutine spawning, and orderly shutdown.
Type parameters:
- In: the input consumed by the producer
- Out: the output emitted by the consumer
- Job: the internal work item flowing from producer to consumer
The Produce function reads from in and writes jobs to the jobs channel. The Consume function reads jobs and writes results to the out channel. Neither function should close its output channel; RunPC handles that.
type SharedResponse ¶
type SharedResponse[T any] struct { // contains filtered or unexported fields }
SharedResponse evaluates a computation exactly once and caches the result for concurrent access by multiple goroutines. The computation receives a context.Context for cancellation support.
func NewSharedResponse ¶
func NewSharedResponse[T any](compute func(context.Context) (T, error)) *SharedResponse[T]
NewSharedResponse creates a SharedResponse that will evaluate compute on the first call to SharedResponse.Get. The compute function must not be nil.
func (*SharedResponse[T]) Get ¶
func (s *SharedResponse[T]) Get(ctx context.Context) (T, error)
Get evaluates the compute function exactly once (via sync.Once) and returns the cached (result, error) pair. Subsequent calls return the same values without re-evaluation, regardless of the context passed.
type ThresholdBatcher ¶
type ThresholdBatcher[T any] struct { // contains filtered or unexported fields }
ThresholdBatcher accumulates items into a slice until the count reaches the configured threshold, at which point Add returns true.
func NewThresholdBatcher ¶
func NewThresholdBatcher[T any](threshold int) *ThresholdBatcher[T]
NewThresholdBatcher creates a batcher that signals readiness after threshold items. Threshold values below 1 are clamped to 1.
func (*ThresholdBatcher[T]) Add ¶
func (b *ThresholdBatcher[T]) Add(item T) bool
Add appends an item. Returns true when the batch reaches the threshold.
func (*ThresholdBatcher[T]) Flush ¶
func (b *ThresholdBatcher[T]) Flush() ([]T, bool)
Flush returns the accumulated items and resets the internal buffer. Returns false if no items have been added since the last flush.
type WorkerPool ¶
type WorkerPool[T any] struct {
	// MaxParallel is the maximum number of concurrent goroutines.
	// Zero defaults to runtime.NumCPU().
	MaxParallel int
	// Work processes a single item. Must not be nil.
	Work func(ctx context.Context, item T) error
}
WorkerPool runs Work on each item with at most MaxParallel concurrent goroutines. It returns the first non-nil error encountered, or nil; once an error occurs, the remaining goroutines observe context cancellation.