Documentation ¶
Overview ¶
Package flux provides reactive configuration synchronization primitives.
The core type is Capacitor, which watches external sources for changes, deserializes and validates the data, and delivers it to application code with automatic rollback on failure.
Capacitor ¶
A Capacitor monitors a source for changes and processes them through a pipeline:
Source → Deserialize → Validate → Pipeline → Store
If any step fails, the previous valid configuration is retained and the Capacitor enters a degraded state while continuing to watch for valid updates.
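The retain-on-failure behaviour can be sketched with the standard library alone. This is an illustration under assumptions, not flux's implementation: the `holder` type, its `process` method, and the `Config` struct are hypothetical names introduced here.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Config is a hypothetical configuration type used only for this sketch.
type Config struct {
	Port int `json:"port"`
}

// Validate follows the Validator contract: a nil error means the value is acceptable.
func (c Config) Validate() error {
	if c.Port < 1 || c.Port > 65535 {
		return errors.New("port must be between 1 and 65535")
	}
	return nil
}

// holder keeps the last configuration that passed every step.
type holder struct {
	current Config
	valid   bool
}

// process runs deserialize -> validate -> apply and stores the result only
// if all three succeed; on any error the stored value is left untouched.
func (h *holder) process(raw []byte, apply func(prev, curr Config) error) error {
	var next Config
	if err := json.Unmarshal(raw, &next); err != nil {
		return fmt.Errorf("deserialize: %w", err)
	}
	if err := next.Validate(); err != nil {
		return fmt.Errorf("validate: %w", err)
	}
	if err := apply(h.current, next); err != nil {
		return fmt.Errorf("apply: %w", err)
	}
	h.current, h.valid = next, true
	return nil
}

func main() {
	h := &holder{}
	apply := func(prev, curr Config) error { return nil }
	fmt.Println(h.process([]byte(`{"port": 8080}`), apply)) // accepted
	fmt.Println(h.process([]byte(`{"port": 0}`), apply))    // rejected by Validate
	fmt.Println(h.current.Port)                              // previous value is still in place
}
```

Because the new value is stored only after the last step succeeds, no explicit undo is needed: a failed update never replaces the stored value.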
Validation ¶
Configuration types must implement the Validator interface:
type Validator interface {
Validate() error
}
This gives full control over validation logic. For simple cases, you can delegate to a validation library like go-playground/validator within your Validate method.
State Machine ¶
A Capacitor is always in exactly one of four states:
- Loading: Initial state, no config yet
- Healthy: Valid config applied
- Degraded: Last change failed, previous config still active
- Empty: Initial load failed, no valid config ever obtained
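The transitions implied by these descriptions can be written as a small function. This is a sketch inferred from the state descriptions above, not taken from the flux source; the local `State` constants and the `next` function are hypothetical.

```go
package main

import "fmt"

// State mirrors the four states listed above; the names are local to this sketch.
type State int

const (
	Loading State = iota
	Healthy
	Degraded
	Empty
)

// next returns the state after one processing attempt.
// ok reports whether the attempt produced a valid, applied configuration.
func next(s State, ok bool) State {
	if ok {
		return Healthy // any successful update becomes the active config
	}
	switch s {
	case Loading, Empty:
		return Empty // nothing valid has ever been applied
	default:
		return Degraded // a previous valid config is still active
	}
}

func main() {
	s := Loading
	for _, ok := range []bool{false, true, false, true} {
		s = next(s, ok)
		fmt.Println(s)
	}
}
```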
Watchers ¶
The Watcher interface abstracts change sources. The core package provides ChannelWatcher for testing. Additional watchers are available in pkg/:
- pkg/file: File watcher using fsnotify
- pkg/redis: Redis keyspace notifications
- pkg/consul: Consul blocking queries
- pkg/etcd: etcd Watch API
- pkg/nats: NATS JetStream KV
- pkg/kubernetes: ConfigMap/Secret watch
- pkg/zookeeper: ZooKeeper node watch
- pkg/firestore: Firestore realtime listeners
Example ¶
type AppConfig struct {
Port int `json:"port"`
Host string `json:"host"`
}
func (c AppConfig) Validate() error {
if c.Port < 1 || c.Port > 65535 {
return errors.New("port must be between 1 and 65535")
}
if c.Host == "" {
return errors.New("host is required")
}
return nil
}
capacitor := flux.New[AppConfig](
file.New("/etc/myapp/config.json"),
func(ctx context.Context, prev, curr AppConfig) error {
log.Printf("config changed: %+v -> %+v", prev, curr)
return app.Reconfigure(curr)
},
)
if err := capacitor.Start(ctx); err != nil {
log.Printf("initial config failed: %v", err)
}
Index ¶
- Constants
- Variables
- func UseApply[T Validator](identity pipz.Identity, ...) pipz.Chainable[*Request[T]]
- func UseBackoff[T Validator](maxAttempts int, baseDelay time.Duration, ...) pipz.Chainable[*Request[T]]
- func UseEffect[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) error) pipz.Chainable[*Request[T]]
- func UseEnrich[T Validator](identity pipz.Identity, ...) pipz.Chainable[*Request[T]]
- func UseFallback[T Validator](primary pipz.Chainable[*Request[T]], fallbacks ...pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]
- func UseFilter[T Validator](identity pipz.Identity, condition func(context.Context, *Request[T]) bool, ...) pipz.Chainable[*Request[T]]
- func UseMutate[T Validator](identity pipz.Identity, ...) pipz.Chainable[*Request[T]]
- func UseRateLimit[T Validator](rate float64, burst int, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]
- func UseRetry[T Validator](maxAttempts int, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]
- func UseTimeout[T Validator](d time.Duration, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]
- func UseTransform[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) *Request[T]) pipz.Chainable[*Request[T]]
- type Capacitor
- func (c *Capacitor[T]) Clock(clock clockz.Clock) *Capacitor[T]
- func (c *Capacitor[T]) Codec(codec Codec) *Capacitor[T]
- func (c *Capacitor[T]) Current() (T, bool)
- func (c *Capacitor[T]) Debounce(d time.Duration) *Capacitor[T]
- func (c *Capacitor[T]) ErrorHistory() []error
- func (c *Capacitor[T]) ErrorHistorySize(n int) *Capacitor[T]
- func (c *Capacitor[T]) LastError() error
- func (c *Capacitor[T]) Metrics(provider MetricsProvider) *Capacitor[T]
- func (c *Capacitor[T]) OnStop(fn func(State)) *Capacitor[T]
- func (c *Capacitor[T]) Process(ctx context.Context) bool
- func (c *Capacitor[T]) Start(ctx context.Context) error
- func (c *Capacitor[T]) StartupTimeout(d time.Duration) *Capacitor[T]
- func (c *Capacitor[T]) State() State
- func (c *Capacitor[T]) SyncMode() *Capacitor[T]
- type ChannelWatcher
- type Codec
- type CompositeCapacitor
- func (c *CompositeCapacitor[T]) Clock(clock clockz.Clock) *CompositeCapacitor[T]
- func (c *CompositeCapacitor[T]) Codec(codec Codec) *CompositeCapacitor[T]
- func (c *CompositeCapacitor[T]) Current() (T, bool)
- func (c *CompositeCapacitor[T]) Debounce(d time.Duration) *CompositeCapacitor[T]
- func (c *CompositeCapacitor[T]) ErrorHistory() []error
- func (c *CompositeCapacitor[T]) ErrorHistorySize(n int) *CompositeCapacitor[T]
- func (c *CompositeCapacitor[T]) LastError() error
- func (c *CompositeCapacitor[T]) Metrics(provider MetricsProvider) *CompositeCapacitor[T]
- func (c *CompositeCapacitor[T]) OnStop(fn func(State)) *CompositeCapacitor[T]
- func (c *CompositeCapacitor[T]) Process(ctx context.Context) bool
- func (c *CompositeCapacitor[T]) SourceErrors() []SourceError
- func (c *CompositeCapacitor[T]) Start(ctx context.Context) error
- func (c *CompositeCapacitor[T]) StartupTimeout(d time.Duration) *CompositeCapacitor[T]
- func (c *CompositeCapacitor[T]) State() State
- func (c *CompositeCapacitor[T]) SyncMode() *CompositeCapacitor[T]
- type JSONCodec
- type MetricsProvider
- type NoOpMetricsProvider
- type Option
- func WithBackoff[T Validator](maxAttempts int, baseDelay time.Duration) Option[T]
- func WithCircuitBreaker[T Validator](failures int, recovery time.Duration) Option[T]
- func WithErrorHandler[T Validator](handler pipz.Chainable[*pipz.Error[*Request[T]]]) Option[T]
- func WithFallback[T Validator](fallbacks ...pipz.Chainable[*Request[T]]) Option[T]
- func WithMiddleware[T Validator](processors ...pipz.Chainable[*Request[T]]) Option[T]
- func WithPipeline[T Validator](identity pipz.Identity) Option[T]
- func WithRetry[T Validator](maxAttempts int) Option[T]
- func WithTimeout[T Validator](d time.Duration) Option[T]
- type Reducer
- type Request
- type SourceError
- type State
- type Validator
- type Watcher
- type YAMLCodec
Constants ¶
const DefaultDebounce = 100 * time.Millisecond
DefaultDebounce is the default debounce duration for change processing.
Variables ¶
var (
	// KeyState is the current state of the Capacitor.
	KeyState = capitan.NewStringKey("state")
	// KeyOldState is the previous state before a transition.
	KeyOldState = capitan.NewStringKey("old_state")
	// KeyNewState is the new state after a transition.
	KeyNewState = capitan.NewStringKey("new_state")
	// KeyError is the error message when an operation fails.
	KeyError = capitan.NewStringKey("error")
	// KeyDebounce is the configured debounce duration.
	KeyDebounce = capitan.NewDurationKey("debounce")
)
Field keys for Capacitor events.
var (
	// CapacitorStarted is emitted when a Capacitor begins watching.
	CapacitorStarted = capitan.NewSignal(
		"flux.capacitor.started",
		"Capacitor watching started",
	)
	// CapacitorStopped is emitted when a Capacitor stops watching.
	CapacitorStopped = capitan.NewSignal(
		"flux.capacitor.stopped",
		"Capacitor watching stopped",
	)
	// CapacitorStateChanged is emitted when a Capacitor transitions between states.
	CapacitorStateChanged = capitan.NewSignal(
		"flux.capacitor.state.changed",
		"Capacitor state transition",
	)
)
Capacitor lifecycle signals.
var (
	// CapacitorChangeReceived is emitted when raw data is received from the watcher.
	CapacitorChangeReceived = capitan.NewSignal(
		"flux.capacitor.change.received",
		"Raw change received from watcher",
	)
	// CapacitorTransformFailed is emitted when the transform function fails.
	CapacitorTransformFailed = capitan.NewSignal(
		"flux.capacitor.transform.failed",
		"Transform function failed",
	)
	// CapacitorValidationFailed is emitted when validation fails.
	CapacitorValidationFailed = capitan.NewSignal(
		"flux.capacitor.validation.failed",
		"Validation failed",
	)
	// CapacitorApplyFailed is emitted when the apply function fails.
	CapacitorApplyFailed = capitan.NewSignal(
		"flux.capacitor.apply.failed",
		"Apply function failed",
	)
	// CapacitorApplySucceeded is emitted when configuration is successfully applied.
	CapacitorApplySucceeded = capitan.NewSignal(
		"flux.capacitor.apply.succeeded",
		"Config applied successfully",
	)
)
Change processing signals.
Functions ¶
func UseApply ¶
func UseApply[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) (*Request[T], error)) pipz.Chainable[*Request[T]]
UseApply creates a processor that can transform the request and fail. Use for operations like enrichment, validation, or transformation that may produce errors.
func UseBackoff ¶
func UseBackoff[T Validator](maxAttempts int, baseDelay time.Duration, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]
UseBackoff wraps a processor with exponential backoff retry logic. Failed operations are retried with increasing delays.
func UseEffect ¶
func UseEffect[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) error) pipz.Chainable[*Request[T]]
UseEffect creates a processor that performs a side effect. The request passes through unchanged. Use for logging, metrics, or notifications that should not affect the configuration value.
func UseEnrich ¶
func UseEnrich[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) (*Request[T], error)) pipz.Chainable[*Request[T]]
UseEnrich creates a processor that attempts optional enhancement. If the enrichment fails, the error is logged but processing continues with the original request. Use for non-critical enhancements.
func UseFallback ¶
func UseFallback[T Validator](primary pipz.Chainable[*Request[T]], fallbacks ...pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]
UseFallback wraps a processor with fallback alternatives. If the primary fails, each fallback is tried in order.
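The try-in-order behaviour can be illustrated without pipz. In this sketch the `step` type stands in for a processor and `withFallback`, `failing`, and `exclaim` are hypothetical names; returning the last error when every alternative fails is an assumption, since the text above does not say what happens in that case.

```go
package main

import (
	"errors"
	"fmt"
)

// step is a stand-in for a pipeline processor in this sketch.
type step func(input string) (string, error)

// withFallback tries primary first, then each fallback in order,
// returning the first success or the last error if all of them fail.
func withFallback(primary step, fallbacks ...step) step {
	return func(input string) (string, error) {
		out, err := primary(input)
		for _, fb := range fallbacks {
			if err == nil {
				break // a previous step succeeded; skip remaining fallbacks
			}
			out, err = fb(input)
		}
		return out, err
	}
}

// failing always reports an error.
func failing(string) (string, error) { return "", errors.New("unavailable") }

// exclaim always succeeds.
func exclaim(s string) (string, error) { return s + "!", nil }

func main() {
	out, err := withFallback(failing, failing, exclaim)("ok")
	fmt.Println(out, err)
}
```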
func UseFilter ¶
func UseFilter[T Validator](identity pipz.Identity, condition func(context.Context, *Request[T]) bool, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]
UseFilter wraps a processor with a condition. If the condition returns false, the request passes through unchanged.
func UseMutate ¶
func UseMutate[T Validator](identity pipz.Identity, transformer func(context.Context, *Request[T]) *Request[T], condition func(context.Context, *Request[T]) bool) pipz.Chainable[*Request[T]]
UseMutate creates a processor that conditionally transforms the request. The transformer is only applied if the condition returns true.
func UseRateLimit ¶
func UseRateLimit[T Validator](rate float64, burst int, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]
UseRateLimit wraps a processor with rate limiting. Uses a token bucket algorithm with the specified rate (tokens per second) and burst size. When tokens are exhausted, requests wait for availability.
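For readers unfamiliar with token buckets, the following is a minimal standalone sketch of the algorithm. It is not the limiter flux or pipz uses; the `bucket` type and `reserve` method are hypothetical, and the current time is passed in explicitly so the behaviour can be checked without sleeping.

```go
package main

import (
	"fmt"
	"time"
)

// bucket is a minimal token bucket; it is not safe for concurrent use.
type bucket struct {
	rate   float64   // tokens added per second
	burst  float64   // maximum tokens held
	tokens float64   // tokens currently available (negative means owed)
	last   time.Time // time of the last refill
}

func newBucket(rate float64, burst int, now time.Time) *bucket {
	return &bucket{rate: rate, burst: float64(burst), tokens: float64(burst), last: now}
}

// reserve takes one token and returns how long the caller should wait
// before proceeding (zero if a token was immediately available).
func (b *bucket) reserve(now time.Time) time.Duration {
	// Refill according to elapsed time, capped at the burst size.
	b.tokens += now.Sub(b.last).Seconds() * b.rate
	if b.tokens > b.burst {
		b.tokens = b.burst
	}
	b.last = now
	b.tokens-- // may go negative: the deficit determines the wait
	if b.tokens >= 0 {
		return 0
	}
	return time.Duration(-b.tokens / b.rate * float64(time.Second))
}

func main() {
	start := time.Now()
	b := newBucket(10, 2, start) // 10 tokens per second, burst of 2
	for i := 0; i < 4; i++ {
		// The first two calls are free; later calls report a growing wait.
		fmt.Println(b.reserve(start))
	}
}
```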
func UseRetry ¶
func UseRetry[T Validator](maxAttempts int, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]
UseRetry wraps a processor with retry logic. Failed operations are retried immediately up to maxAttempts times.
Types ¶
type Capacitor ¶
type Capacitor[T Validator] struct {
	// contains filtered or unexported fields
}
Capacitor watches a source for changes, unmarshals and validates the data, and delivers it to application code with automatic rollback on failure.
func New ¶
func New[T Validator](
	watcher Watcher,
	fn func(ctx context.Context, prev, curr T) error,
	opts ...Option[T],
) *Capacitor[T]
New creates a Capacitor that watches a source for configuration changes.
The watcher emits raw bytes when the source changes. Bytes are automatically unmarshaled to type T using the configured codec. The struct is validated by calling T.Validate(). On success, the callback is invoked with previous and current values.
Pipeline options (With*) configure the processing pipeline. Instance configuration uses chainable methods before calling Start().
Example:
capacitor := flux.New[Config](
file.New("config.json"),
func(ctx context.Context, prev, curr Config) error {
log.Printf("config changed: port %d -> %d", prev.Port, curr.Port)
return nil
},
flux.WithRetry[Config](3),
).Debounce(200 * time.Millisecond)
func (*Capacitor[T]) Clock ¶
Clock sets a custom clock for time operations. Use this with clockz.FakeClock for deterministic debounce testing. Must be called before Start().
func (*Capacitor[T]) Codec ¶
Codec sets the codec for deserializing configuration data. Default: JSONCodec. Must be called before Start().
func (*Capacitor[T]) Current ¶
Current returns the current valid configuration and true, or the zero value and false if no valid configuration has been applied.
func (*Capacitor[T]) Debounce ¶
Debounce sets the debounce duration for change processing. Changes arriving within this duration are coalesced into a single update. Default: 100ms. Must be called before Start().
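Coalescing can be pictured as restarting a timer on every change and delivering only the value that is pending when the timer finally fires. The sketch below shows that idea with plain channels; the `debounce` function is hypothetical and is not how flux implements it (flux uses an injectable clock, see Clock).

```go
package main

import (
	"fmt"
	"time"
)

// debounce forwards the most recent value from in once no newer value has
// arrived for d. It closes the returned channel when in is closed.
func debounce(in <-chan string, d time.Duration) <-chan string {
	out := make(chan string, 1)
	go func() {
		defer close(out)
		var (
			pending string
			timer   <-chan time.Time // nil (blocks forever) until a value is pending
		)
		for {
			select {
			case v, ok := <-in:
				if !ok {
					return // a still-pending value is dropped on close in this sketch
				}
				pending = v
				timer = time.After(d) // restart the quiet period
			case <-timer:
				out <- pending
				timer = nil
			}
		}
	}()
	return out
}

func main() {
	in := make(chan string)
	out := debounce(in, 50*time.Millisecond)
	for _, v := range []string{"v1", "v2", "v3"} {
		in <- v // three changes in quick succession
	}
	fmt.Println(<-out) // the burst collapses into a single delivery
	close(in)
}
```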
func (*Capacitor[T]) ErrorHistory ¶
ErrorHistory returns the recent error history, oldest first. Returns nil if error history is not enabled (see ErrorHistorySize).
func (*Capacitor[T]) ErrorHistorySize ¶
ErrorHistorySize sets the number of recent errors to retain. When set, ErrorHistory() returns up to this many recent errors. Use 0 (default) to only retain the most recent error via LastError(). Must be called before Start().
func (*Capacitor[T]) LastError ¶
LastError returns the last error encountered, or nil if no error occurred.
func (*Capacitor[T]) Metrics ¶
func (c *Capacitor[T]) Metrics(provider MetricsProvider) *Capacitor[T]
Metrics sets a metrics provider for observability integration. The provider receives callbacks on state changes, processing success/failure, and change events. Must be called before Start().
func (*Capacitor[T]) OnStop ¶
OnStop sets a callback that is invoked when the capacitor stops watching. The callback receives the final state of the capacitor. This is useful for graceful shutdown scenarios where cleanup is needed. Must be called before Start().
func (*Capacitor[T]) Process ¶
Process reads and processes the next value from the watcher. This is only available in sync mode and is used for deterministic testing. Returns false if no value is available or the channel is closed.
func (*Capacitor[T]) Start ¶
Start begins watching for changes. It blocks until the first configuration is processed (success or failure), then continues watching asynchronously.
If the initial configuration fails, Start returns the error but continues watching in the background for valid updates.
In sync mode, Start only processes the initial value. Use Process() to manually trigger processing of subsequent values.
Start can only be called once. Subsequent calls return an error.
func (*Capacitor[T]) StartupTimeout ¶
StartupTimeout sets the maximum duration to wait for the initial configuration value from the watcher. If the watcher fails to emit within this duration, Start() returns an error. Default: no timeout (wait indefinitely). Must be called before Start().
type ChannelWatcher ¶
type ChannelWatcher struct {
// contains filtered or unexported fields
}
ChannelWatcher wraps an existing byte channel as a Watcher. Useful for testing and custom sources that already produce bytes.
func NewChannelWatcher ¶
func NewChannelWatcher(ch <-chan []byte) *ChannelWatcher
NewChannelWatcher creates a ChannelWatcher that forwards values from the given channel through an internal goroutine.
func NewSyncChannelWatcher ¶
func NewSyncChannelWatcher(ch <-chan []byte) *ChannelWatcher
NewSyncChannelWatcher creates a ChannelWatcher that returns the source channel directly without an intermediate goroutine. Use with SyncMode() for deterministic testing.
type Codec ¶
type Codec interface {
// Unmarshal deserializes bytes into a value.
Unmarshal(data []byte, v any) error
// ContentType returns the MIME type for observability and debugging.
ContentType() string
}
Codec defines the deserialization contract for configuration data. Implement this interface to use alternative formats like TOML, HCL, or custom binary formats.
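As an illustration of implementing the interface, here is a sketch of a codec for a simple "key=value" line format. `KVCodec` is hypothetical and the `Codec` interface is re-declared locally so the example compiles on its own. It decodes only into `*map[string]string`, whereas a codec used with flux would need to fill the configuration type T.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"strings"
)

// Codec is copied from the interface above so the sketch is self-contained.
type Codec interface {
	Unmarshal(data []byte, v any) error
	ContentType() string
}

// KVCodec is a hypothetical codec for "key=value" lines.
type KVCodec struct{}

// Unmarshal parses data into *map[string]string and leaves the target
// untouched if any line is malformed.
func (KVCodec) Unmarshal(data []byte, v any) error {
	dst, ok := v.(*map[string]string)
	if !ok {
		return fmt.Errorf("kvcodec: unsupported target %T", v)
	}
	out := make(map[string]string)
	sc := bufio.NewScanner(bytes.NewReader(data))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue // skip blank lines and comments
		}
		key, value, found := strings.Cut(line, "=")
		if !found {
			return fmt.Errorf("kvcodec: malformed line %q", line)
		}
		out[strings.TrimSpace(key)] = strings.TrimSpace(value)
	}
	if err := sc.Err(); err != nil {
		return err
	}
	*dst = out
	return nil
}

// ContentType reports a generic text MIME type for this ad hoc format.
func (KVCodec) ContentType() string { return "text/plain" }

func main() {
	var codec Codec = KVCodec{}
	var cfg map[string]string
	err := codec.Unmarshal([]byte("host = localhost\nport = 8080\n"), &cfg)
	fmt.Println(cfg["host"], cfg["port"], err)
}
```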
type CompositeCapacitor ¶
type CompositeCapacitor[T Validator] struct {
	// contains filtered or unexported fields
}
CompositeCapacitor watches multiple sources, unmarshals and validates each, and delivers the slice of parsed values to a reducer for merging.
func Compose ¶
func Compose[T Validator](
	reducer Reducer[T],
	sources []Watcher,
	opts ...Option[T],
) *CompositeCapacitor[T]
Compose creates a CompositeCapacitor for multiple sources.
Each source emits raw bytes when it changes. Bytes from each source are automatically unmarshaled to type T using the configured codec. Each parsed value is validated by calling T.Validate(). When all sources are ready, the reducer receives the previous and new slices of parsed values in the same order as the sources. On initial load, prev will be nil. The reducer merges them and returns the final configuration.
Pipeline options (With*) configure the processing pipeline. Instance configuration uses chainable methods before calling Start().
Example:
type Config struct {
Port int `json:"port"`
Timeout int `json:"timeout"`
}
func (c Config) Validate() error {
if c.Port < 1 || c.Port > 65535 {
return errors.New("port must be between 1 and 65535")
}
return nil
}
capacitor := flux.Compose[Config](
func(ctx context.Context, prev, curr []Config) (Config, error) {
merged := curr[0] // defaults
if curr[1].Port != 0 {
merged.Port = curr[1].Port // file overrides
}
return merged, nil
},
[]flux.Watcher{defaultsWatcher, fileWatcher},
).Debounce(200 * time.Millisecond)
func (*CompositeCapacitor[T]) Clock ¶
func (c *CompositeCapacitor[T]) Clock(clock clockz.Clock) *CompositeCapacitor[T]
Clock sets a custom clock for time operations. Use this with clockz.FakeClock for deterministic debounce testing. Must be called before Start().
func (*CompositeCapacitor[T]) Codec ¶
func (c *CompositeCapacitor[T]) Codec(codec Codec) *CompositeCapacitor[T]
Codec sets the codec for deserializing configuration data. Default: JSONCodec. Must be called before Start().
func (*CompositeCapacitor[T]) Current ¶
func (c *CompositeCapacitor[T]) Current() (T, bool)
Current returns the last successfully merged configuration. Unlike the single-source Capacitor, this returns the value produced by the reducer function, representing the merged result of all sources.
func (*CompositeCapacitor[T]) Debounce ¶
func (c *CompositeCapacitor[T]) Debounce(d time.Duration) *CompositeCapacitor[T]
Debounce sets the debounce duration for change processing. Changes arriving within this duration are coalesced into a single update. Default: 100ms. Must be called before Start().
func (*CompositeCapacitor[T]) ErrorHistory ¶
func (c *CompositeCapacitor[T]) ErrorHistory() []error
ErrorHistory returns the recent error history, oldest first. Returns nil if error history is not enabled (see ErrorHistorySize).
func (*CompositeCapacitor[T]) ErrorHistorySize ¶
func (c *CompositeCapacitor[T]) ErrorHistorySize(n int) *CompositeCapacitor[T]
ErrorHistorySize sets the number of recent errors to retain. When set, ErrorHistory() returns up to this many recent errors. Use 0 (default) to only retain the most recent error via LastError(). Must be called before Start().
func (*CompositeCapacitor[T]) LastError ¶
func (c *CompositeCapacitor[T]) LastError() error
LastError returns the last error encountered.
func (*CompositeCapacitor[T]) Metrics ¶
func (c *CompositeCapacitor[T]) Metrics(provider MetricsProvider) *CompositeCapacitor[T]
Metrics sets a metrics provider for observability integration. The provider receives callbacks on state changes, processing success/failure, and change events. Must be called before Start().
func (*CompositeCapacitor[T]) OnStop ¶
func (c *CompositeCapacitor[T]) OnStop(fn func(State)) *CompositeCapacitor[T]
OnStop sets a callback that is invoked when the capacitor stops watching. The callback receives the final state of the capacitor. This is useful for graceful shutdown scenarios where cleanup is needed. Must be called before Start().
func (*CompositeCapacitor[T]) Process ¶
func (c *CompositeCapacitor[T]) Process(ctx context.Context) bool
Process manually processes pending changes in sync mode.
func (*CompositeCapacitor[T]) SourceErrors ¶
func (c *CompositeCapacitor[T]) SourceErrors() []SourceError
SourceErrors returns errors from individual sources, if any. This provides granular insight into which sources are failing.
func (*CompositeCapacitor[T]) Start ¶
func (c *CompositeCapacitor[T]) Start(ctx context.Context) error
Start begins watching all sources.
func (*CompositeCapacitor[T]) StartupTimeout ¶
func (c *CompositeCapacitor[T]) StartupTimeout(d time.Duration) *CompositeCapacitor[T]
StartupTimeout sets the maximum duration to wait for the initial configuration value from each source. If any source fails to emit within this duration, Start() returns an error. Default: no timeout (wait indefinitely). Must be called before Start().
func (*CompositeCapacitor[T]) State ¶
func (c *CompositeCapacitor[T]) State() State
State returns the current state of the CompositeCapacitor.
func (*CompositeCapacitor[T]) SyncMode ¶
func (c *CompositeCapacitor[T]) SyncMode() *CompositeCapacitor[T]
SyncMode enables synchronous processing for testing. In sync mode, changes are processed immediately without debouncing or async goroutines, making tests deterministic. Must be called before Start().
type JSONCodec ¶
type JSONCodec struct{}
JSONCodec implements Codec using encoding/json.
func (JSONCodec) ContentType ¶
ContentType returns the JSON MIME type.
type MetricsProvider ¶
type MetricsProvider interface {
// OnStateChange is called when the capacitor transitions between states.
OnStateChange(from, to State)
// OnProcessSuccess is called when a configuration is successfully processed.
// Duration is the time taken to process (unmarshal, validate, callback).
OnProcessSuccess(duration time.Duration)
// OnProcessFailure is called when processing fails at any stage.
// Stage indicates where the failure occurred: "unmarshal", "validate", or "callback".
OnProcessFailure(stage string, duration time.Duration)
// OnChangeReceived is called when raw data is received from the watcher.
OnChangeReceived()
}
MetricsProvider allows integration with metrics systems like Prometheus, StatsD, etc. Implement this interface to receive callbacks on key capacitor events.
type NoOpMetricsProvider ¶
type NoOpMetricsProvider struct{}
NoOpMetricsProvider is a no-op implementation of MetricsProvider. Use this as an embedded type to implement only the methods you need.
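The embedding pattern looks roughly like this. `State`, `MetricsProvider`, and `NoOpMetricsProvider` are re-declared locally, mirroring the declarations in this document, so the sketch is self-contained; `failureCounter` is a hypothetical provider that only cares about failures and is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"time"
)

// Local mirrors of the flux declarations, for a self-contained example.
type State int32

type MetricsProvider interface {
	OnStateChange(from, to State)
	OnProcessSuccess(duration time.Duration)
	OnProcessFailure(stage string, duration time.Duration)
	OnChangeReceived()
}

type NoOpMetricsProvider struct{}

func (NoOpMetricsProvider) OnStateChange(_, _ State)                   {}
func (NoOpMetricsProvider) OnProcessSuccess(_ time.Duration)           {}
func (NoOpMetricsProvider) OnProcessFailure(_ string, _ time.Duration) {}
func (NoOpMetricsProvider) OnChangeReceived()                          {}

// failureCounter counts failures per stage; the embedded no-op type
// supplies the three methods it does not define itself.
type failureCounter struct {
	NoOpMetricsProvider
	byStage map[string]int
}

// OnProcessFailure shadows the embedded no-op method.
func (f *failureCounter) OnProcessFailure(stage string, _ time.Duration) {
	f.byStage[stage]++
}

func main() {
	fc := &failureCounter{byStage: map[string]int{}}
	var p MetricsProvider = fc // satisfies the full interface
	p.OnChangeReceived()       // handled by the embedded no-op
	p.OnProcessFailure("validate", 3*time.Millisecond)
	fmt.Println(fc.byStage["validate"])
}
```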
func (NoOpMetricsProvider) OnChangeReceived ¶
func (NoOpMetricsProvider) OnChangeReceived()
OnChangeReceived implements MetricsProvider.
func (NoOpMetricsProvider) OnProcessFailure ¶
func (NoOpMetricsProvider) OnProcessFailure(_ string, _ time.Duration)
OnProcessFailure implements MetricsProvider.
func (NoOpMetricsProvider) OnProcessSuccess ¶
func (NoOpMetricsProvider) OnProcessSuccess(_ time.Duration)
OnProcessSuccess implements MetricsProvider.
func (NoOpMetricsProvider) OnStateChange ¶
func (NoOpMetricsProvider) OnStateChange(_, _ State)
OnStateChange implements MetricsProvider.
type Option ¶
Option configures the processing pipeline for a Capacitor or CompositeCapacitor. Pipeline options wrap the callback with middleware for retry, timeout, circuit breaking, and other reliability patterns.
Instance configuration (debounce, sync mode, codec, etc.) is handled via chainable methods on the Capacitor/CompositeCapacitor before calling Start().
func WithBackoff ¶
WithBackoff wraps the pipeline with exponential backoff retry logic. Failed operations are retried with increasing delays: baseDelay, 2*baseDelay, 4*baseDelay, etc.
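The delay schedule described above can be computed as follows. `backoffDelays` is a hypothetical helper, and it assumes one wait between consecutive attempts, so maxAttempts attempts produce maxAttempts-1 delays; the library may count differently.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelays returns the waits between attempts: base, 2*base, 4*base, ...
func backoffDelays(maxAttempts int, base time.Duration) []time.Duration {
	var delays []time.Duration
	d := base
	for i := 1; i < maxAttempts; i++ {
		delays = append(delays, d)
		d *= 2 // double the wait after every failed attempt
	}
	return delays
}

func main() {
	fmt.Println(backoffDelays(4, 100*time.Millisecond)) // prints [100ms 200ms 400ms]
}
```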
func WithCircuitBreaker ¶
WithCircuitBreaker wraps the pipeline with circuit breaker protection. After 'failures' consecutive failures, the circuit opens and rejects further requests until 'recovery' time has passed.
The circuit breaker has three states:
- Closed: Normal operation, requests pass through
- Open: After threshold failures, requests are rejected immediately
- Half-Open: After recovery timeout, one request is allowed to test recovery
Note: Circuit breaker is stateful and protects the entire pipeline. There is no Use* equivalent - it only makes sense as a wrapper.
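The three states can be sketched in a few lines. This is a simplified, non-concurrent illustration rather than the pipz implementation; `breaker`, `call`, `errOpen`, and `errBoom` are hypothetical names, and the current time is passed in so the example needs no sleeping.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errOpen = errors.New("circuit open")
	errBoom = errors.New("boom")
)

// breaker is a minimal circuit breaker; it is not safe for concurrent use.
type breaker struct {
	threshold int           // consecutive failures before opening
	recovery  time.Duration // how long to stay open
	failures  int
	openedAt  time.Time
	open      bool
}

// call runs fn unless the circuit is open and the recovery time has not elapsed.
func (b *breaker) call(now time.Time, fn func() error) error {
	if b.open && now.Sub(b.openedAt) < b.recovery {
		return errOpen // Open: reject immediately
	}
	// Closed, or Half-Open (recovery elapsed): let the request through.
	if err := fn(); err != nil {
		b.failures++
		if b.open || b.failures >= b.threshold {
			b.open, b.openedAt = true, now // trip, or re-open after a failed probe
		}
		return err
	}
	b.open, b.failures = false, 0 // success closes the circuit
	return nil
}

func main() {
	b := &breaker{threshold: 2, recovery: 30 * time.Second}
	fail := func() error { return errBoom }
	succeed := func() error { return nil }
	t0 := time.Now()
	fmt.Println(b.call(t0, fail))
	fmt.Println(b.call(t0, fail))                       // second failure trips the breaker
	fmt.Println(b.call(t0.Add(time.Second), fail))      // rejected without calling fail
	fmt.Println(b.call(t0.Add(31*time.Second), succeed)) // probe after recovery succeeds
}
```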
func WithErrorHandler ¶
WithErrorHandler adds error observation to the pipeline. Errors are passed to the handler for logging, metrics, or alerting, but the error still propagates. Use this for observability, not recovery.
Note: There is no Use* equivalent - error handling wraps the pipeline.
func WithFallback ¶
WithFallback wraps the pipeline with fallback processors. If the primary pipeline fails, each fallback is tried in order until one succeeds.
func WithMiddleware ¶
WithMiddleware wraps the pipeline with a sequence of processors. Processors execute in order, with the wrapped pipeline (callback) last.
Use the Use* functions to create processors for common patterns, or provide custom pipz.Chainable implementations directly.
Example:
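// logID and enrichID are pipz.Identity values (for example, created with pipz.NewIdentity).
flux.New[Config](
	watcher,
	callback,
	flux.WithMiddleware(
		flux.UseEffect[Config](logID, logFn),
		// UseRateLimit wraps a processor, so the enrich step is its third argument.
		flux.UseRateLimit[Config](10, 5, flux.UseApply[Config](enrichID, enrichFn)),
	),
	flux.WithCircuitBreaker[Config](5, 30*time.Second),
).Debounce(200 * time.Millisecond)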
func WithPipeline ¶
WithPipeline wraps the entire processing pipeline with a pipz.Pipeline for correlated tracing. Each Process() call generates a unique execution ID, while the pipeline ID remains stable (derived from the identity).
Use pipz.ExecutionIDFromContext and pipz.PipelineIDFromContext in middleware or signal handlers to extract correlation IDs for observability.
This option should typically be applied last (outermost) to ensure all nested processors have access to the correlation context.
Example:
var configPipelineID = pipz.NewIdentity("myapp:config", "Configuration pipeline")
capacitor := flux.New[Config](
watcher,
callback,
flux.WithRetry[Config](3),
flux.WithPipeline[Config](configPipelineID),
)
type Reducer ¶
Reducer merges multiple configuration sources into a single configuration. It receives the previous merged values (nil on first call) and the current parsed values from each source in the same order as the sources were provided.
type Request ¶
type Request[T Validator] struct {
	// Previous is the last successfully applied configuration.
	// On initial load, this will be the zero value of T.
	Previous T
	// Current is the newly parsed and validated configuration.
	// Pipeline stages may modify this value before it is stored.
	Current T
	// Raw contains the original bytes received from the watcher.
	// This is useful for debugging or logging purposes.
	Raw []byte
}
Request carries configuration data through the processing pipeline. It provides access to both the previous and current configuration values, allowing pipeline stages to make decisions based on what changed.
type SourceError ¶
SourceError represents an error from a specific source in a CompositeCapacitor.
type State ¶
type State int32
State represents the current state of a Capacitor.
const (
	// StateLoading indicates the Capacitor is initializing and has not yet
	// processed any configuration.
	StateLoading State = iota
	// StateHealthy indicates the Capacitor has a valid configuration applied.
	StateHealthy
	// StateDegraded indicates the last configuration change failed validation
	// or application. The previous valid configuration remains active.
	StateDegraded
	// StateEmpty indicates the initial configuration load failed and no valid
	// configuration has ever been obtained. The Capacitor continues watching
	// for valid updates.
	StateEmpty
)
type Validator ¶
type Validator interface {
Validate() error
}
Validator is the interface that configuration types must implement. This allows users to define their own validation logic.
type Watcher ¶
type Watcher interface {
// Watch begins observing the source and returns a channel that emits
// raw bytes when changes occur. The channel is closed when the context
// is canceled or an unrecoverable error occurs.
//
// Implementations should emit the current value immediately to support
// initial configuration loading.
Watch(ctx context.Context) (<-chan []byte, error)
}
Watcher observes a source for changes and emits raw bytes on a channel. Implementations must emit the current value immediately upon Watch() being called to support initial configuration loading.
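A custom watcher mainly has to honour two rules from the contract above: emit the current value right away, and close the channel when the context is canceled. The sketch below shows one way to do that for an in-memory source. `memoryWatcher` and `run` are hypothetical, and `Watcher` is re-declared locally so the example is self-contained.

```go
package main

import (
	"context"
	"fmt"
)

// Watcher is copied from the interface above so the sketch is self-contained.
type Watcher interface {
	Watch(ctx context.Context) (<-chan []byte, error)
}

// memoryWatcher emits an initial value as soon as Watch is called,
// then forwards anything received on updates.
type memoryWatcher struct {
	initial []byte
	updates <-chan []byte
}

func (w memoryWatcher) Watch(ctx context.Context) (<-chan []byte, error) {
	out := make(chan []byte, 1)
	out <- w.initial // current value is available immediately
	go func() {
		defer close(out) // closed when ctx is canceled or updates is closed
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-w.updates:
				if !ok {
					return
				}
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out, nil
}

// run exercises the watcher: initial value, one update, then cancellation.
func run() (first, second string, closed bool) {
	ctx, cancel := context.WithCancel(context.Background())
	updates := make(chan []byte)
	var w Watcher = memoryWatcher{initial: []byte(`{"port":8080}`), updates: updates}
	ch, err := w.Watch(ctx)
	if err != nil {
		cancel()
		return "", "", false
	}
	first = string(<-ch)
	updates <- []byte(`{"port":9090}`)
	second = string(<-ch)
	cancel()
	_, open := <-ch // blocks until the goroutine closes the channel
	return first, second, !open
}

func main() {
	fmt.Println(run())
}
```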
type YAMLCodec ¶
type YAMLCodec struct{}
YAMLCodec implements Codec using gopkg.in/yaml.v3.
func (YAMLCodec) ContentType ¶
ContentType returns the YAML MIME type.