flux

package module
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2026 License: MIT Imports: 10 Imported by: 0

README

flux

CI Status codecov Go Report Card CodeQL Go Reference License Go Version Release

Reactive configuration synchronization for Go.

Watch external sources, validate changes, and apply them safely with automatic rollback on failure.

Watch, Validate, Apply

A capacitor watches a source, validates incoming data, and calls you back when it changes.

type Config struct {
    Port int    `json:"port"`
    Host string `json:"host"`
}

func (c Config) Validate() error {
    if c.Port < 1 || c.Port > 65535 {
        return errors.New("invalid port")
    }
    return nil
}

capacitor := flux.New[Config](
    file.New("/etc/app/config.json"),
    func(ctx context.Context, prev, curr Config) error {
        log.Printf("Port changed: %d -> %d", prev.Port, curr.Port)
        return reconfigureServer(curr)
    },
)

Start once, react forever. Invalid config is rejected, previous valid config is retained.

if err := capacitor.Start(ctx); err != nil {
    log.Fatal("Initial load failed:", err)
}

// State machine tracks health
capacitor.State()     // Loading -> Healthy -> Degraded -> Empty

// Current config always available
if cfg, ok := capacitor.Current(); ok {
    server.Listen(cfg.Host, cfg.Port)
}

// Last error for observability
if err := capacitor.LastError(); err != nil {
    metrics.RecordConfigError(err)
}

Multiple sources? Compose them with a reducer.

capacitor := flux.Compose[Config](
    func(ctx context.Context, prev, curr []Config) (Config, error) {
        // Merge defaults, file config, and environment overrides
        return mergeConfigs(curr[0], curr[1], curr[2]), nil
    },
    file.New("/etc/app/defaults.json"),
    file.New("/etc/app/config.json"),
    env.New("APP_"),
)

Same guarantees: validate first, reject invalid, retain previous, call back with valid.

Install

go get github.com/zoobz-io/flux

Requires Go 1.24+.

Quick Start

package main

import (
    "context"
    "errors"
    "log"

    "github.com/zoobz-io/flux"
    "github.com/zoobz-io/flux/file"
)

type Config struct {
    Port int    `json:"port"`
    Host string `json:"host"`
}

func (c Config) Validate() error {
    if c.Port < 1 || c.Port > 65535 {
        return errors.New("port must be between 1 and 65535")
    }
    if c.Host == "" {
        return errors.New("host is required")
    }
    return nil
}

func main() {
    ctx := context.Background()

    capacitor := flux.New[Config](
        file.New("/etc/myapp/config.json"),
        func(ctx context.Context, prev, curr Config) error {
            log.Printf("Config updated: %+v", curr)
            return nil
        },
    )

    if err := capacitor.Start(ctx); err != nil {
        log.Fatalf("Initial load failed: %v", err)
    }

    log.Printf("State: %s", capacitor.State())

    if cfg, ok := capacitor.Current(); ok {
        log.Printf("Listening on %s:%d", cfg.Host, cfg.Port)
    }

    // Capacitor watches in background until context is cancelled
    <-ctx.Done()
}

Capabilities

Feature Description Docs
State Machine Loading, Healthy, Degraded, Empty with clear transitions Concepts
Multi-Source Composition Merge configs from multiple watchers with custom reducers Multi-Source
Pluggable Providers File, Redis, Consul, etcd, NATS, Kubernetes, ZooKeeper, Firestore Providers
Validation Pipeline Type-safe validation with automatic rejection and rollback Architecture
Debouncing Configurable delay to batch rapid changes Best Practices
Signal Observability State changes and errors via capitan Fields
Testing Utilities Sync mode and channel watchers for deterministic tests Testing

Why flux?

  • Safe by default — Invalid config rejected, previous retained, callback only sees valid data
  • Four-state machine — Loading, Healthy, Degraded, Empty with clear transitions
  • Multi-source composition — Merge configs from files, Redis, Kubernetes, environment
  • Pluggable providers — File, Redis, Consul, etcd, NATS, Kubernetes, ZooKeeper, Firestore
  • Observablecapitan signals for state changes and failures
  • Testable — Sync mode and channel watchers for deterministic tests

Configuration as a Service

Flux enables a pattern: define once, update anywhere, validate always.

Your configuration lives in external sources — files, Redis, Kubernetes ConfigMaps. Flux watches, validates, and delivers changes. Your application just reacts.

// In your infrastructure
configMap := &corev1.ConfigMap{
    Data: map[string]string{
        "config": `{"port": 8080, "host": "0.0.0.0"}`,
    },
}

// In your application
capacitor := flux.New[Config](
    kubernetes.New(client, "default", "app-config", "config"),
    func(ctx context.Context, prev, curr Config) error {
        return server.Reconfigure(curr)
    },
)

Update the ConfigMap, flux delivers the change. Invalid update? Rejected. Previous config retained. Your application never sees bad data.

Documentation

  • Overview — Design philosophy and architecture
Learn
Guides
  • Testing — Sync mode, channel watchers, deterministic tests
  • Providers — Configuring file, Redis, Kubernetes, and other watchers
  • State Management — State transitions, error recovery, circuit breakers
  • Best Practices — Validation design, graceful degradation, observability
Cookbook
Reference

Contributing

See CONTRIBUTING.md for guidelines. Run make help for available commands.

License

MIT License — see LICENSE for details.

Documentation

Overview

Package flux provides reactive configuration synchronization primitives.

The core type is Capacitor, which watches external sources for changes, deserializes and validates the data, and delivers it to application code with automatic rollback on failure.

Capacitor

A Capacitor monitors a source for changes and processes them through a pipeline:

Source → Deserialize → Validate → Pipeline → Store

If any step fails, the previous valid configuration is retained and the Capacitor enters a degraded state while continuing to watch for valid updates.

Validation

Configuration types must implement the Validator interface:

type Validator interface {
    Validate() error
}

This gives full control over validation logic. For simple cases, you can delegate to a validation library like go-playground/validator within your Validate method.

State Machine

Capacitor maintains one of four states:

  • Loading: Initial state, no config yet
  • Healthy: Valid config applied
  • Degraded: Last change failed, previous config still active
  • Empty: Initial load failed, no valid config ever obtained

Watchers

The Watcher interface abstracts change sources. The core package provides ChannelWatcher for testing. Additional watchers are available in pkg/:

  • pkg/file: File watcher using fsnotify
  • pkg/redis: Redis keyspace notifications
  • pkg/consul: Consul blocking queries
  • pkg/etcd: etcd Watch API
  • pkg/nats: NATS JetStream KV
  • pkg/kubernetes: ConfigMap/Secret watch
  • pkg/zookeeper: ZooKeeper node watch
  • pkg/firestore: Firestore realtime listeners

Example

type AppConfig struct {
    Port int    `json:"port"`
    Host string `json:"host"`
}

func (c AppConfig) Validate() error {
    if c.Port < 1 || c.Port > 65535 {
        return errors.New("port must be between 1 and 65535")
    }
    if c.Host == "" {
        return errors.New("host is required")
    }
    return nil
}

capacitor := flux.New[AppConfig](
    file.New("/etc/myapp/config.json"),
    func(ctx context.Context, prev, curr AppConfig) error {
        log.Printf("config changed: %+v -> %+v", prev, curr)
        return app.Reconfigure(curr)
    },
)

if err := capacitor.Start(ctx); err != nil {
    log.Printf("initial config failed: %v", err)
}

Index

Constants

View Source
const DefaultDebounce = 100 * time.Millisecond

DefaultDebounce is the default debounce duration for change processing.

Variables

View Source
var (
	// KeyState is the current state of the Capacitor.
	KeyState = capitan.NewStringKey("state")

	// KeyOldState is the previous state before a transition.
	KeyOldState = capitan.NewStringKey("old_state")

	// KeyNewState is the new state after a transition.
	KeyNewState = capitan.NewStringKey("new_state")

	// KeyError is the error message when an operation fails.
	KeyError = capitan.NewStringKey("error")

	// KeyDebounce is the configured debounce duration.
	KeyDebounce = capitan.NewDurationKey("debounce")
)

Field keys for Capacitor events.

View Source
var (
	// CapacitorStarted is emitted when a Capacitor begins watching.
	CapacitorStarted = capitan.NewSignal(
		"flux.capacitor.started",
		"Capacitor watching started",
	)

	// CapacitorStopped is emitted when a Capacitor stops watching.
	CapacitorStopped = capitan.NewSignal(
		"flux.capacitor.stopped",
		"Capacitor watching stopped",
	)

	// CapacitorStateChanged is emitted when a Capacitor transitions between states.
	CapacitorStateChanged = capitan.NewSignal(
		"flux.capacitor.state.changed",
		"Capacitor state transition",
	)
)

Capacitor lifecycle signals.

View Source
var (
	// CapacitorChangeReceived is emitted when raw data is received from the watcher.
	CapacitorChangeReceived = capitan.NewSignal(
		"flux.capacitor.change.received",
		"Raw change received from watcher",
	)

	// CapacitorTransformFailed is emitted when the transform function fails.
	CapacitorTransformFailed = capitan.NewSignal(
		"flux.capacitor.transform.failed",
		"Transform function failed",
	)

	// CapacitorValidationFailed is emitted when validation fails.
	CapacitorValidationFailed = capitan.NewSignal(
		"flux.capacitor.validation.failed",
		"Validation failed",
	)

	// CapacitorApplyFailed is emitted when the apply function fails.
	CapacitorApplyFailed = capitan.NewSignal(
		"flux.capacitor.apply.failed",
		"Apply function failed",
	)

	// CapacitorApplySucceeded is emitted when configuration is successfully applied.
	CapacitorApplySucceeded = capitan.NewSignal(
		"flux.capacitor.apply.succeeded",
		"Config applied successfully",
	)
)

Change processing signals.

Functions

func UseApply

func UseApply[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) (*Request[T], error)) pipz.Chainable[*Request[T]]

UseApply creates a processor that can transform the request and fail. Use for operations like enrichment, validation, or transformation that may produce errors.

func UseBackoff

func UseBackoff[T Validator](maxAttempts int, baseDelay time.Duration, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]

UseBackoff wraps a processor with exponential backoff retry logic. Failed operations are retried with increasing delays.

func UseEffect

func UseEffect[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) error) pipz.Chainable[*Request[T]]

UseEffect creates a processor that performs a side effect. The request passes through unchanged. Use for logging, metrics, or notifications that should not affect the configuration value.

func UseEnrich

func UseEnrich[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) (*Request[T], error)) pipz.Chainable[*Request[T]]

UseEnrich creates a processor that attempts optional enhancement. If the enrichment fails, the error is logged but processing continues with the original request. Use for non-critical enhancements.

func UseFallback

func UseFallback[T Validator](primary pipz.Chainable[*Request[T]], fallbacks ...pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]

UseFallback wraps a processor with fallback alternatives. If the primary fails, each fallback is tried in order.

func UseFilter

func UseFilter[T Validator](identity pipz.Identity, condition func(context.Context, *Request[T]) bool, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]

UseFilter wraps a processor with a condition. If the condition returns false, the request passes through unchanged.

func UseMutate

func UseMutate[T Validator](identity pipz.Identity, transformer func(context.Context, *Request[T]) *Request[T], condition func(context.Context, *Request[T]) bool) pipz.Chainable[*Request[T]]

UseMutate creates a processor that conditionally transforms the request. The transformer is only applied if the condition returns true.

func UseRateLimit

func UseRateLimit[T Validator](rate float64, burst int, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]

UseRateLimit wraps a processor with rate limiting. Uses a token bucket algorithm with the specified rate (tokens per second) and burst size. When tokens are exhausted, requests wait for availability.

func UseRetry

func UseRetry[T Validator](maxAttempts int, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]

UseRetry wraps a processor with retry logic. Failed operations are retried immediately up to maxAttempts times.

func UseTimeout

func UseTimeout[T Validator](d time.Duration, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]

UseTimeout wraps a processor with a deadline. If processing takes longer than the specified duration, the operation fails.

func UseTransform

func UseTransform[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) *Request[T]) pipz.Chainable[*Request[T]]

UseTransform creates a processor that transforms the request. Cannot fail. Use for pure transformations that always succeed.

Types

type Capacitor

type Capacitor[T Validator] struct {
	// contains filtered or unexported fields
}

Capacitor watches a source for changes, unmarshals and validates the data, and delivers it to application code with automatic rollback on failure.

func New

func New[T Validator](
	watcher Watcher,
	fn func(ctx context.Context, prev, curr T) error,
	opts ...Option[T],
) *Capacitor[T]

New creates a Capacitor that watches a source for configuration changes.

The watcher emits raw bytes when the source changes. Bytes are automatically unmarshaled to type T using the configured codec. The struct is validated by calling T.Validate(). On success, the callback is invoked with previous and current values.

Pipeline options (With*) configure the processing pipeline. Instance configuration uses chainable methods before calling Start().

Example:

capacitor := flux.New[Config](
    file.New("config.json"),
    func(ctx context.Context, prev, curr Config) error {
        log.Printf("config changed: port %d -> %d", prev.Port, curr.Port)
        return nil
    },
    flux.WithRetry(3),
).Debounce(200 * time.Millisecond)

func (*Capacitor[T]) Clock

func (c *Capacitor[T]) Clock(clock clockz.Clock) *Capacitor[T]

Clock sets a custom clock for time operations. Use this with clockz.FakeClock for deterministic debounce testing. Must be called before Start().

func (*Capacitor[T]) Codec

func (c *Capacitor[T]) Codec(codec Codec) *Capacitor[T]

Codec sets the codec for deserializing configuration data. Default: JSONCodec. Must be called before Start().

func (*Capacitor[T]) Current

func (c *Capacitor[T]) Current() (T, bool)

Current returns the current valid configuration and true, or the zero value and false if no valid configuration has been applied.

func (*Capacitor[T]) Debounce

func (c *Capacitor[T]) Debounce(d time.Duration) *Capacitor[T]

Debounce sets the debounce duration for change processing. Changes arriving within this duration are coalesced into a single update. Default: 100ms. Must be called before Start().

func (*Capacitor[T]) ErrorHistory

func (c *Capacitor[T]) ErrorHistory() []error

ErrorHistory returns the recent error history, oldest first. Returns nil if error history is not enabled (see WithErrorHistory).

func (*Capacitor[T]) ErrorHistorySize

func (c *Capacitor[T]) ErrorHistorySize(n int) *Capacitor[T]

ErrorHistorySize sets the number of recent errors to retain. When set, ErrorHistory() returns up to this many recent errors. Use 0 (default) to only retain the most recent error via LastError(). Must be called before Start().

func (*Capacitor[T]) LastError

func (c *Capacitor[T]) LastError() error

LastError returns the last error encountered, or nil if no error occurred.

func (*Capacitor[T]) Metrics

func (c *Capacitor[T]) Metrics(provider MetricsProvider) *Capacitor[T]

Metrics sets a metrics provider for observability integration. The provider receives callbacks on state changes, processing success/failure, and change events. Must be called before Start().

func (*Capacitor[T]) OnStop

func (c *Capacitor[T]) OnStop(fn func(State)) *Capacitor[T]

OnStop sets a callback that is invoked when the capacitor stops watching. The callback receives the final state of the capacitor. This is useful for graceful shutdown scenarios where cleanup is needed. Must be called before Start().

func (*Capacitor[T]) Process

func (c *Capacitor[T]) Process(ctx context.Context) bool

Process reads and processes the next value from the watcher. This is only available in sync mode and is used for deterministic testing. Returns false if no value is available or the channel is closed.

func (*Capacitor[T]) Start

func (c *Capacitor[T]) Start(ctx context.Context) error

Start begins watching for changes. It blocks until the first configuration is processed (success or failure), then continues watching asynchronously.

If the initial configuration fails, Start returns the error but continues watching in the background for valid updates.

In sync mode, Start only processes the initial value. Use Process() to manually trigger processing of subsequent values.

Start can only be called once. Subsequent calls return an error.

func (*Capacitor[T]) StartupTimeout

func (c *Capacitor[T]) StartupTimeout(d time.Duration) *Capacitor[T]

StartupTimeout sets the maximum duration to wait for the initial configuration value from the watcher. If the watcher fails to emit within this duration, Start() returns an error. Default: no timeout (wait indefinitely). Must be called before Start().

func (*Capacitor[T]) State

func (c *Capacitor[T]) State() State

State returns the current state of the Capacitor.

func (*Capacitor[T]) SyncMode

func (c *Capacitor[T]) SyncMode() *Capacitor[T]

SyncMode enables synchronous processing for testing. In sync mode, changes are processed immediately without debouncing or async goroutines, making tests deterministic. Must be called before Start().

type ChannelWatcher

type ChannelWatcher struct {
	// contains filtered or unexported fields
}

ChannelWatcher wraps an existing byte channel as a Watcher. Useful for testing and custom sources that already produce bytes.

func NewChannelWatcher

func NewChannelWatcher(ch <-chan []byte) *ChannelWatcher

NewChannelWatcher creates a ChannelWatcher that forwards values from the given channel through an internal goroutine.

func NewSyncChannelWatcher

func NewSyncChannelWatcher(ch <-chan []byte) *ChannelWatcher

NewSyncChannelWatcher creates a ChannelWatcher that returns the source channel directly without an intermediate goroutine. Use with WithSyncMode() for deterministic testing.

func (*ChannelWatcher) Watch

func (w *ChannelWatcher) Watch(ctx context.Context) (<-chan []byte, error)

Watch returns a channel that emits values from the wrapped channel.

type Codec

type Codec interface {
	// Unmarshal deserializes bytes into a value.
	Unmarshal(data []byte, v any) error

	// ContentType returns the MIME type for observability and debugging.
	ContentType() string
}

Codec defines the deserialization contract for configuration data. Implement this interface to use alternative formats like TOML, HCL, or custom binary formats.

type CompositeCapacitor

type CompositeCapacitor[T Validator] struct {
	// contains filtered or unexported fields
}

CompositeCapacitor watches multiple sources, unmarshals and validates each, and delivers the slice of parsed values to a reducer for merging.

func Compose

func Compose[T Validator](
	reducer Reducer[T],
	sources []Watcher,
	opts ...Option[T],
) *CompositeCapacitor[T]

Compose creates a CompositeCapacitor for multiple sources.

Each source emits raw bytes when it changes. Bytes from each source are automatically unmarshaled to type T using the configured codec. Each parsed value is validated by calling T.Validate(). When all sources are ready, the reducer receives the previous and new slices of parsed values in the same order as the sources. On initial load, prev will be nil. The reducer merges them and returns the final configuration.

Pipeline options (With*) configure the processing pipeline. Instance configuration uses chainable methods before calling Start().

Example:

type Config struct {
    Port    int `json:"port"`
    Timeout int `json:"timeout"`
}

func (c Config) Validate() error {
    if c.Port < 1 || c.Port > 65535 {
        return errors.New("port must be between 1 and 65535")
    }
    return nil
}

capacitor := flux.Compose[Config](
    func(ctx context.Context, prev, curr []Config) (Config, error) {
        merged := curr[0]  // defaults
        if curr[1].Port != 0 {
            merged.Port = curr[1].Port  // file overrides
        }
        return merged, nil
    },
    []flux.Watcher{defaultsWatcher, fileWatcher},
).Debounce(200 * time.Millisecond)

func (*CompositeCapacitor[T]) Clock

func (c *CompositeCapacitor[T]) Clock(clock clockz.Clock) *CompositeCapacitor[T]

Clock sets a custom clock for time operations. Use this with clockz.FakeClock for deterministic debounce testing. Must be called before Start().

func (*CompositeCapacitor[T]) Codec

func (c *CompositeCapacitor[T]) Codec(codec Codec) *CompositeCapacitor[T]

Codec sets the codec for deserializing configuration data. Default: JSONCodec. Must be called before Start().

func (*CompositeCapacitor[T]) Current

func (c *CompositeCapacitor[T]) Current() (T, bool)

Current returns the last successfully merged configuration. Unlike the single-source Capacitor, this returns the value produced by the reducer function, representing the merged result of all sources.

func (*CompositeCapacitor[T]) Debounce

func (c *CompositeCapacitor[T]) Debounce(d time.Duration) *CompositeCapacitor[T]

Debounce sets the debounce duration for change processing. Changes arriving within this duration are coalesced into a single update. Default: 100ms. Must be called before Start().

func (*CompositeCapacitor[T]) ErrorHistory

func (c *CompositeCapacitor[T]) ErrorHistory() []error

ErrorHistory returns the recent error history, oldest first. Returns nil if error history is not enabled (see WithErrorHistory).

func (*CompositeCapacitor[T]) ErrorHistorySize

func (c *CompositeCapacitor[T]) ErrorHistorySize(n int) *CompositeCapacitor[T]

ErrorHistorySize sets the number of recent errors to retain. When set, ErrorHistory() returns up to this many recent errors. Use 0 (default) to only retain the most recent error via LastError(). Must be called before Start().

func (*CompositeCapacitor[T]) LastError

func (c *CompositeCapacitor[T]) LastError() error

LastError returns the last error encountered.

func (*CompositeCapacitor[T]) Metrics

func (c *CompositeCapacitor[T]) Metrics(provider MetricsProvider) *CompositeCapacitor[T]

Metrics sets a metrics provider for observability integration. The provider receives callbacks on state changes, processing success/failure, and change events. Must be called before Start().

func (*CompositeCapacitor[T]) OnStop

func (c *CompositeCapacitor[T]) OnStop(fn func(State)) *CompositeCapacitor[T]

OnStop sets a callback that is invoked when the capacitor stops watching. The callback receives the final state of the capacitor. This is useful for graceful shutdown scenarios where cleanup is needed. Must be called before Start().

func (*CompositeCapacitor[T]) Process

func (c *CompositeCapacitor[T]) Process(ctx context.Context) bool

Process manually processes pending changes in sync mode.

func (*CompositeCapacitor[T]) SourceErrors

func (c *CompositeCapacitor[T]) SourceErrors() []SourceError

SourceErrors returns errors from individual sources, if any. This provides granular insight into which sources are failing.

func (*CompositeCapacitor[T]) Start

func (c *CompositeCapacitor[T]) Start(ctx context.Context) error

Start begins watching all sources.

func (*CompositeCapacitor[T]) StartupTimeout

func (c *CompositeCapacitor[T]) StartupTimeout(d time.Duration) *CompositeCapacitor[T]

StartupTimeout sets the maximum duration to wait for the initial configuration value from each source. If any source fails to emit within this duration, Start() returns an error. Default: no timeout (wait indefinitely). Must be called before Start().

func (*CompositeCapacitor[T]) State

func (c *CompositeCapacitor[T]) State() State

State returns the current state of the CompositeCapacitor.

func (*CompositeCapacitor[T]) SyncMode

func (c *CompositeCapacitor[T]) SyncMode() *CompositeCapacitor[T]

SyncMode enables synchronous processing for testing. In sync mode, changes are processed immediately without debouncing or async goroutines, making tests deterministic. Must be called before Start().

type JSONCodec

type JSONCodec struct{}

JSONCodec implements Codec using encoding/json.

func (JSONCodec) ContentType

func (JSONCodec) ContentType() string

ContentType returns the JSON MIME type.

func (JSONCodec) Unmarshal

func (JSONCodec) Unmarshal(data []byte, v any) error

Unmarshal deserializes JSON bytes into v.

type MetricsProvider

type MetricsProvider interface {
	// OnStateChange is called when the capacitor transitions between states.
	OnStateChange(from, to State)

	// OnProcessSuccess is called when a configuration is successfully processed.
	// Duration is the time taken to process (unmarshal, validate, callback).
	OnProcessSuccess(duration time.Duration)

	// OnProcessFailure is called when processing fails at any stage.
	// Stage indicates where the failure occurred: "unmarshal", "validate", or "callback".
	OnProcessFailure(stage string, duration time.Duration)

	// OnChangeReceived is called when raw data is received from the watcher.
	OnChangeReceived()
}

MetricsProvider allows integration with metrics systems like Prometheus, StatsD, etc. Implement this interface to receive callbacks on key capacitor events.

type NoOpMetricsProvider

type NoOpMetricsProvider struct{}

NoOpMetricsProvider is a no-op implementation of MetricsProvider. Use this as an embedded type to implement only the methods you need.

func (NoOpMetricsProvider) OnChangeReceived

func (NoOpMetricsProvider) OnChangeReceived()

OnChangeReceived implements MetricsProvider.

func (NoOpMetricsProvider) OnProcessFailure

func (NoOpMetricsProvider) OnProcessFailure(_ string, _ time.Duration)

OnProcessFailure implements MetricsProvider.

func (NoOpMetricsProvider) OnProcessSuccess

func (NoOpMetricsProvider) OnProcessSuccess(_ time.Duration)

OnProcessSuccess implements MetricsProvider.

func (NoOpMetricsProvider) OnStateChange

func (NoOpMetricsProvider) OnStateChange(_, _ State)

OnStateChange implements MetricsProvider.

type Option

type Option[T Validator] func(pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]

Option configures the processing pipeline for a Capacitor or CompositeCapacitor. Pipeline options wrap the callback with middleware for retry, timeout, circuit breaking, and other reliability patterns.

Instance configuration (debounce, sync mode, codec, etc.) is handled via chainable methods on the Capacitor/CompositeCapacitor before calling Start().

func WithBackoff

func WithBackoff[T Validator](maxAttempts int, baseDelay time.Duration) Option[T]

WithBackoff wraps the pipeline with exponential backoff retry logic. Failed operations are retried with increasing delays: baseDelay, 2*baseDelay, 4*baseDelay, etc.

func WithCircuitBreaker

func WithCircuitBreaker[T Validator](failures int, recovery time.Duration) Option[T]

WithCircuitBreaker wraps the pipeline with circuit breaker protection. After 'failures' consecutive failures, the circuit opens and rejects further requests until 'recovery' time has passed.

The circuit breaker has three states:

  • Closed: Normal operation, requests pass through
  • Open: After threshold failures, requests are rejected immediately
  • Half-Open: After recovery timeout, one request is allowed to test recovery

Note: Circuit breaker is stateful and protects the entire pipeline. There is no Use* equivalent - it only makes sense as a wrapper.

func WithErrorHandler

func WithErrorHandler[T Validator](handler pipz.Chainable[*pipz.Error[*Request[T]]]) Option[T]

WithErrorHandler adds error observation to the pipeline. Errors are passed to the handler for logging, metrics, or alerting, but the error still propagates. Use this for observability, not recovery.

Note: There is no Use* equivalent - error handling wraps the pipeline.

func WithFallback

func WithFallback[T Validator](fallbacks ...pipz.Chainable[*Request[T]]) Option[T]

WithFallback wraps the pipeline with fallback processors. If the primary pipeline fails, each fallback is tried in order until one succeeds.

func WithMiddleware

func WithMiddleware[T Validator](processors ...pipz.Chainable[*Request[T]]) Option[T]

WithMiddleware wraps the pipeline with a sequence of processors. Processors execute in order, with the wrapped pipeline (callback) last.

Use the Use* functions to create processors for common patterns, or provide custom pipz.Chainable implementations directly.

Example:

flux.New[Config](
    watcher,
    callback,
    flux.WithMiddleware(
        flux.UseEffect[Config]("log", logFn),
        flux.UseApply[Config]("enrich", enrichFn),
        flux.UseRateLimit[Config](10, 5),
    ),
    flux.WithCircuitBreaker[Config](5, 30*time.Second),
).Debounce(200 * time.Millisecond)

func WithPipeline

func WithPipeline[T Validator](identity pipz.Identity) Option[T]

WithPipeline wraps the entire processing pipeline with a pipz.Pipeline for correlated tracing. Each Process() call generates a unique execution ID, while the pipeline ID remains stable (derived from the identity).

Use pipz.ExecutionIDFromContext and pipz.PipelineIDFromContext in middleware or signal handlers to extract correlation IDs for observability.

This option should typically be applied last (outermost) to ensure all nested processors have access to the correlation context.

Example:

var configPipelineID = pipz.NewIdentity("myapp:config", "Configuration pipeline")

capacitor := flux.New[Config](
    watcher,
    callback,
    flux.WithRetry[Config](3),
    flux.WithPipeline[Config](configPipelineID),
)

func WithRetry

func WithRetry[T Validator](maxAttempts int) Option[T]

WithRetry wraps the pipeline with retry logic. Failed operations are retried immediately up to maxAttempts times. For exponential backoff between retries, use WithBackoff instead.

func WithTimeout

func WithTimeout[T Validator](d time.Duration) Option[T]

WithTimeout wraps the pipeline with a timeout. If processing takes longer than the specified duration, the operation fails with a timeout error.

type Reducer

type Reducer[T Validator] func(ctx context.Context, prev, curr []T) (T, error)

Reducer merges multiple configuration sources into a single configuration. It receives the previous merged values (nil on first call) and the current parsed values from each source in the same order as the sources were provided.

type Request

type Request[T Validator] struct {
	// Previous is the last successfully applied configuration.
	// On initial load, this will be the zero value of T.
	Previous T

	// Current is the newly parsed and validated configuration.
	// Pipeline stages may modify this value before it is stored.
	Current T

	// Raw contains the original bytes received from the watcher.
	// This is useful for debugging or logging purposes.
	Raw []byte
}

Request carries configuration data through the processing pipeline. It provides access to both the previous and current configuration values, allowing pipeline stages to make decisions based on what changed.

type SourceError

type SourceError struct {
	Index int
	Error error
}

SourceError represents an error from a specific source in a CompositeCapacitor.

type State

type State int32

State represents the current state of a Capacitor.

const (
	// StateLoading indicates the Capacitor is initializing and has not yet
	// processed any configuration.
	StateLoading State = iota

	// StateHealthy indicates the Capacitor has a valid configuration applied.
	StateHealthy

	// StateDegraded indicates the last configuration change failed validation
	// or application. The previous valid configuration remains active.
	StateDegraded

	// StateEmpty indicates the initial configuration load failed and no valid
	// configuration has ever been obtained. The Capacitor continues watching
	// for valid updates.
	StateEmpty
)

func (State) String

func (s State) String() string

String returns the string representation of the state.

type Validator

type Validator interface {
	Validate() error
}

Validator is the interface that configuration types must implement. This allows users to define their own validation logic.

type Watcher

type Watcher interface {
	// Watch begins observing the source and returns a channel that emits
	// raw bytes when changes occur. The channel is closed when the context
	// is canceled or an unrecoverable error occurs.
	//
	// Implementations should emit the current value immediately to support
	// initial configuration loading.
	Watch(ctx context.Context) (<-chan []byte, error)
}

Watcher observes a source for changes and emits raw bytes on a channel. Implementations must emit the current value immediately upon Watch() being called to support initial configuration loading.

type YAMLCodec

type YAMLCodec struct{}

YAMLCodec implements Codec using gopkg.in/yaml.v3.

func (YAMLCodec) ContentType

func (YAMLCodec) ContentType() string

ContentType returns the YAML MIME type.

func (YAMLCodec) Unmarshal

func (YAMLCodec) Unmarshal(data []byte, v any) error

Unmarshal deserializes YAML bytes into v.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL