Documentation ¶
Index ¶
- Variables
- func GlobalEmit(ctx context.Context, body []byte, attrKVs ...any) error
- func IsGlobalSignerSet() bool
- func NewAuthHeaderProvider(cfg AuthConfig) (chipingress.HeaderProvider, error)
- func SetGlobalEmitter(d *DurableEmitter)
- func SetGlobalSigner(signer Signer)
- type AuthConfig
- type BatchEmitter
- type BatchInserter
- type Config
- type DurableEmitter
- type DurableEmitterMetricsConfig
- type DurableEvent
- type DurableEventStore
- type DurableQueueObserver
- type DurableQueueStats
- type Hooks
- type PgDurableEventStore
- func (s *PgDurableEventStore) Delete(ctx context.Context, id int64) error
- func (s *PgDurableEventStore) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error)
- func (s *PgDurableEventStore) Insert(ctx context.Context, payload []byte) (int64, error)
- func (s *PgDurableEventStore) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error)
- func (s *PgDurableEventStore) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error)
- func (s *PgDurableEventStore) MarkDelivered(ctx context.Context, id int64) error
- func (s *PgDurableEventStore) MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error)
- func (s *PgDurableEventStore) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (DurableQueueStats, error)
- func (s *PgDurableEventStore) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error)
- type SetupConfig
- type Signer
Constants ¶
This section is empty.
Variables ¶
var (
ErrNotInitialized = errors.New("durable emitter not initialized")
ErrEmitFailed = errors.New("durable emitter emit failed")
)
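The sketch below shows one way a caller might branch on these sentinel errors with errors.Is. It redeclares the two variables locally so that it compiles on its own. The emit and classify helpers are hypothetical, and it is an assumption that callers may see these errors wrapped with additional context.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the package's sentinel errors, so the sketch
// compiles without importing the real package.
var (
	ErrNotInitialized = errors.New("durable emitter not initialized")
	ErrEmitFailed     = errors.New("durable emitter emit failed")
)

// emit is a hypothetical caller-side wrapper that adds context with %w.
func emit(initialized bool) error {
	if !initialized {
		return fmt.Errorf("emit workflow event: %w", ErrNotInitialized)
	}
	return nil
}

// classify maps an error to a coarse action using errors.Is, which also
// matches sentinels that have been wrapped.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNotInitialized):
		return "skip"
	case errors.Is(err, ErrEmitFailed):
		return "retry"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(classify(emit(false)))
	fmt.Println(classify(emit(true)))
}
```

Comparing with errors.Is rather than == keeps the check working if context is added around the sentinel.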
Functions ¶
func GlobalEmit ¶
func GlobalEmit(ctx context.Context, body []byte, attrKVs ...any) error
GlobalEmit emits an event via the global DurableEmitter.
func IsGlobalSignerSet ¶
func IsGlobalSignerSet() bool
IsGlobalSignerSet reports whether rotating DurableEmitter auth has a signer configured.
func NewAuthHeaderProvider ¶
func NewAuthHeaderProvider(cfg AuthConfig) (chipingress.HeaderProvider, error)
NewAuthHeaderProvider builds a chip ingress HeaderProvider for DurableEmitter clients, delegating the static/rotating provider logic to chipingress.NewHeaderProvider.
For rotating auth (AuthHeadersTTL > 0) the signer is wrapped in a lazy holder so the CSA keystore can be injected after startup via SetGlobalSigner.
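The lazy-holder idea can be sketched as follows. This is not the package's implementation: the Signer interface, its Sign method, and the lazySigner type are hypothetical stand-ins, since the real chipingress.Signer method set is not shown here. The point is only that a provider can be built around a holder whose inner signer is filled in later.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Signer is a hypothetical stand-in; the real chipingress.Signer may differ.
type Signer interface {
	Sign(msg []byte) ([]byte, error)
}

var errNoSigner = errors.New("signer not set yet")

// lazySigner lets an auth provider be constructed before the real signer exists.
type lazySigner struct {
	mu    sync.RWMutex
	inner Signer
}

// Set injects the real signer; safe to call from another goroutine.
func (l *lazySigner) Set(s Signer) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.inner = s
}

// IsSet reports whether a signer has been injected.
func (l *lazySigner) IsSet() bool {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.inner != nil
}

// Sign delegates to the injected signer, or fails until one is set.
func (l *lazySigner) Sign(msg []byte) ([]byte, error) {
	l.mu.RLock()
	s := l.inner
	l.mu.RUnlock()
	if s == nil {
		return nil, errNoSigner
	}
	return s.Sign(msg)
}

// echoSigner is a toy signer used only for demonstration.
type echoSigner struct{}

func (echoSigner) Sign(msg []byte) ([]byte, error) {
	return append([]byte("sig:"), msg...), nil
}

func main() {
	h := &lazySigner{}
	_, err := h.Sign([]byte("x"))
	fmt.Println(err)
	h.Set(echoSigner{})
	out, _ := h.Sign([]byte("x"))
	fmt.Println(string(out))
}
```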
func SetGlobalEmitter ¶
func SetGlobalEmitter(d *DurableEmitter)
SetGlobalEmitter sets the global DurableEmitter.
func SetGlobalSigner ¶
func SetGlobalSigner(signer Signer)
SetGlobalSigner injects the CSA keystore used to refresh rotating chip ingress auth headers. No-op when rotating auth is not configured.
Types ¶
type AuthConfig ¶
type AuthConfig struct {
AuthHeaders map[string]string
AuthHeadersTTL time.Duration
AuthPublicKeyHex string
// AuthKeySigner may be nil at init time for LOOP plugins; call SetGlobalSigner
// after the CSA keystore is available.
AuthKeySigner Signer
}
AuthConfig configures chip ingress auth headers for DurableEmitter clients.
type BatchEmitter ¶
type BatchEmitter interface {
// QueueMessage enqueues a single CloudEvent for batched delivery.
// Returns an error only if the internal buffer is full or the client
// has been stopped. Callers must treat a non-nil return as a
// drop (the event is still persisted; retransmit will retry).
QueueMessage(event *chipingress.CloudEventPb, callback func(error)) error
// Start begins background processing. Must be called before QueueMessage.
Start(ctx context.Context)
// Stop flushes any queued events, waits for all in-flight network calls
// and callbacks to complete, then closes the underlying transport.
Stop()
}
BatchEmitter is the transport interface DurableEmitter delegates to for batched delivery of CloudEvents to Chip Ingress.
*batch.Client from pkg/chipingress/batch satisfies this interface and handles seqnum stamping, gRPC size splitting, concurrency limiting, and graceful shutdown with a configurable timeout.
The callback passed to QueueMessage is invoked once after the batch containing the event is sent. A nil error means the RPC succeeded; a non-nil error means the batch was dropped — the event remains in the DB and the retransmit loop will retry it.
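For unit tests it can help to have a fake that follows the contract described above. The sketch below is a minimal one under stated assumptions: Event stands in for *chipingress.CloudEventPb, the fake invokes the callback synchronously instead of after a real batch RPC, and fakeEmitter, deliver, and newStarted are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Event is a hypothetical stand-in for *chipingress.CloudEventPb.
type Event struct{ ID string }

var (
	errNotRunning = errors.New("emitter not running")
	errSend       = errors.New("send failed")
)

// fakeEmitter mimics the BatchEmitter contract for tests: it accepts events
// only between Start and Stop and reports sendErr through each callback.
type fakeEmitter struct {
	mu      sync.Mutex
	running bool
	sendErr error
	queued  []string
}

func (f *fakeEmitter) Start(context.Context) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.running = true
}

func (f *fakeEmitter) Stop() {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.running = false
}

func (f *fakeEmitter) QueueMessage(ev *Event, callback func(error)) error {
	f.mu.Lock()
	if !f.running {
		f.mu.Unlock()
		return errNotRunning
	}
	f.queued = append(f.queued, ev.ID)
	err := f.sendErr
	f.mu.Unlock()
	if callback != nil {
		callback(err) // a real client would call this after the batch RPC
	}
	return nil
}

// deliver queues one event and returns the enqueue error and the callback error.
func deliver(f *fakeEmitter, id string) (queueErr, sendErr error) {
	queueErr = f.QueueMessage(&Event{ID: id}, func(err error) { sendErr = err })
	return queueErr, sendErr
}

// newStarted returns a fake that is already accepting events.
func newStarted(sendErr error) *fakeEmitter {
	f := &fakeEmitter{sendErr: sendErr}
	f.Start(context.Background())
	return f
}

func main() {
	fmt.Println(deliver(newStarted(nil), "a"))
	bad := newStarted(errSend)
	fmt.Println(deliver(bad, "b"))
	bad.Stop()
	fmt.Println(deliver(bad, "c"))
}
```

The two error paths are kept separate on purpose: the return value of QueueMessage reports an enqueue failure, while the callback reports the outcome of the send.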
type BatchInserter ¶
type BatchInserter interface {
InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error)
}
BatchInserter is optionally implemented by DurableEventStore implementations to support multi-row inserts for higher throughput. When the store implements this interface and InsertBatchSize > 0, DurableEmitter coalesces Emit() calls into batched INSERTs, dramatically reducing per-event transaction overhead.
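Optional interfaces like this are usually detected with a type assertion. The sketch below shows the pattern with local stand-in types; Store, plainStore, batchStore, and coalescingEnabled are hypothetical names, and the real emitter's detection logic may differ.

```go
package main

import (
	"context"
	"fmt"
)

// Store is a reduced stand-in for DurableEventStore.
type Store interface {
	Insert(ctx context.Context, payload []byte) (int64, error)
}

// BatchInserter mirrors the optional interface documented above.
type BatchInserter interface {
	InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error)
}

// plainStore supports single-row inserts only.
type plainStore struct{ next int64 }

func (s *plainStore) Insert(_ context.Context, _ []byte) (int64, error) {
	s.next++
	return s.next, nil
}

// batchStore adds InsertBatch on top of plainStore.
type batchStore struct{ plainStore }

func (s *batchStore) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error) {
	ids := make([]int64, 0, len(payloads))
	for _, p := range payloads {
		id, err := s.Insert(ctx, p)
		if err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}
	return ids, nil
}

// coalescingEnabled reports whether batched inserts would be used: the store
// must implement BatchInserter and the configured batch size must be positive.
func coalescingEnabled(s Store, insertBatchSize int) bool {
	_, ok := s.(BatchInserter)
	return ok && insertBatchSize > 0
}

func main() {
	fmt.Println(coalescingEnabled(&plainStore{}, 100))
	fmt.Println(coalescingEnabled(&batchStore{}, 100))
	fmt.Println(coalescingEnabled(&batchStore{}, 0))
}
```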
type Config ¶
type Config struct {
// RetransmitInterval controls how often the retransmit loop ticks.
RetransmitInterval time.Duration
// RetransmitAfter is the minimum age of an event before the retransmit
// loop considers it. This gives the batch publish path time to succeed.
RetransmitAfter time.Duration
// RetransmitBatchSize caps how many pending rows are listed per retransmit tick.
RetransmitBatchSize int
// ExpiryInterval controls how often the expiry loop ticks.
ExpiryInterval time.Duration
// EventTTL is the maximum age of an event before it is expired.
EventTTL time.Duration
// PublishTimeout is the deadline for DB operations in delivery callbacks
// (MarkDeliveredBatch). The actual gRPC publish timeout is configured on
// the BatchEmitter (batch.Client) directly.
PublishTimeout time.Duration
// PurgeInterval is how often the purge loop runs to batch-delete rows that
// were marked delivered (Postgres). Zero defaults to 250ms.
PurgeInterval time.Duration
// PurgeBatchSize is the maximum rows removed per PurgeDelivered call. Zero defaults to 500.
PurgeBatchSize int
// DisablePruning disables the background purge (PurgeDelivered) and expiry
// (DeleteExpired) loops. Events remain in the DB after delivery. Useful for
// post-test analysis of created_at / delivered_at timestamps.
DisablePruning bool
// Hooks is optional instrumentation (load tests, profiling). Nil fields are skipped.
// Callbacks may run from many goroutines; implementations must be thread-safe.
Hooks *Hooks
// Metrics enables OpenTelemetry instruments (queue, publish, store, optional process stats).
// When non-nil, a meter must be supplied to NewDurableEmitter; nil disables instrumentation.
Metrics *DurableEmitterMetricsConfig
// InsertBatchSize enables write coalescing when > 0 and the store implements
// BatchInserter. Multiple concurrent Emit() calls are grouped into a single
// multi-row INSERT, dramatically reducing per-event transaction overhead.
// Each coalescer worker collects up to InsertBatchSize payloads before flushing.
InsertBatchSize int
// InsertBatchFlushInterval is the linger time after the first payload arrives
// in a coalescing batch. Zero defaults to 2ms.
InsertBatchFlushInterval time.Duration
// InsertBatchWorkers is the number of concurrent batch-insert goroutines.
// Zero defaults to 4.
InsertBatchWorkers int
}
Config configures the DurableEmitter behaviour.
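Several fields above document a default for their zero value. The sketch below illustrates that zero-means-default convention on a reduced local copy of the struct. The withDefaults helper is hypothetical and is not part of the package; only the default values themselves come from the field comments.

```go
package main

import (
	"fmt"
	"time"
)

// Config is a reduced local copy holding only the fields with documented
// zero-value defaults.
type Config struct {
	PurgeInterval            time.Duration
	PurgeBatchSize           int
	InsertBatchFlushInterval time.Duration
	InsertBatchWorkers       int
}

// withDefaults returns a copy of c with zero-valued fields replaced by the
// defaults named in the field comments. Hypothetical helper for illustration.
func withDefaults(c Config) Config {
	if c.PurgeInterval == 0 {
		c.PurgeInterval = 250 * time.Millisecond
	}
	if c.PurgeBatchSize == 0 {
		c.PurgeBatchSize = 500
	}
	if c.InsertBatchFlushInterval == 0 {
		c.InsertBatchFlushInterval = 2 * time.Millisecond
	}
	if c.InsertBatchWorkers == 0 {
		c.InsertBatchWorkers = 4
	}
	return c
}

func main() {
	c := withDefaults(Config{PurgeBatchSize: 1000})
	fmt.Printf("%+v\n", c)
}
```

One consequence of this convention is that a field cannot be set to an explicit zero; a caller who wants no purging at all would use DisablePruning rather than a zero interval.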
func DefaultConfig ¶
func DefaultConfig() Config
type DurableEmitter ¶
func GetGlobalEmitter ¶
func GetGlobalEmitter() *DurableEmitter
GetGlobalEmitter returns the global DurableEmitter, or nil if none has been registered (via Setup or SetGlobalEmitter).
func NewDurableEmitter ¶
func NewDurableEmitter(
store DurableEventStore,
batchEmitter BatchEmitter,
fallbackClient chipingress.Client,
retransmitEnabled bool,
cfg Config,
lggr logger.Logger,
meter metric.Meter,
) (*DurableEmitter, error)
NewDurableEmitter constructs a DurableEmitter as a service.
batchEmitter is the transport layer (typically *batch.Client from pkg/chipingress/batch) responsible for batched gRPC delivery, seqnum stamping, size splitting, and concurrency limiting.
fallbackClient, when non-nil, is used to retry individual events via a direct unary Publish RPC whenever the batch emitter reports a delivery failure. This gives a fast second-chance path before the DB-backed retransmit loop kicks in. Pass nil to disable single-event fallback (events are left in the DB and delivered by the retransmit loop).
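The three delivery outcomes described above (batch success, unary fallback success, or leaving the row for the retransmit loop) can be sketched as a small decision function. This is an illustration of the described behaviour, not the package's code: publishFn stands in for a unary Publish call, and outcome, onBatchResult, okPublish, and failPublish are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

type outcome string

const (
	deliveredByBatch    outcome = "batch"
	deliveredByFallback outcome = "fallback"
	leftForRetransmit   outcome = "retransmit"
)

var errBoom = errors.New("boom")

// publishFn is a hypothetical stand-in for a direct unary Publish RPC.
type publishFn func(payload []byte) error

func okPublish([]byte) error   { return nil }
func failPublish([]byte) error { return errBoom }

// onBatchResult decides what happens to one event after its batch callback fires.
// A nil fallback models passing a nil fallbackClient.
func onBatchResult(batchErr error, payload []byte, fallback publishFn) outcome {
	if batchErr == nil {
		return deliveredByBatch
	}
	if fallback != nil {
		if err := fallback(payload); err == nil {
			return deliveredByFallback
		}
	}
	// The row stays in the store; the retransmit loop will pick it up later.
	return leftForRetransmit
}

func main() {
	fmt.Println(onBatchResult(nil, []byte("e"), okPublish))
	fmt.Println(onBatchResult(errBoom, []byte("e"), okPublish))
	fmt.Println(onBatchResult(errBoom, []byte("e"), failPublish))
	fmt.Println(onBatchResult(errBoom, []byte("e"), nil))
}
```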
func Setup ¶
func Setup(
store DurableEventStore,
cfg SetupConfig,
lggr logger.Logger,
) (*DurableEmitter, error)
Setup creates a DurableEmitter with dedicated batch and fallback chip ingress clients, registers it as the global emitter, and returns it unstarted.
func (*DurableEmitter) Emit ¶
Emit persists the event then hands it to the BatchEmitter for async delivery. Returns nil once the insert is accepted (or the coalesced insert path completes successfully). Returns an error when the service is not in the Started state (e.g. before Start or after Close).
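The coalesced insert path mentioned above (see Config.InsertBatchSize and Config.InsertBatchFlushInterval) can be sketched as a worker that gathers requests until the batch is full or a linger timer fires, then flushes once and reports the result to every waiting caller. This is a simplified illustration, not the package's implementation; request, coalesce, and batchSizes are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// request is one pending Emit: its payload and a channel for the insert result.
type request struct {
	payload []byte
	done    chan error
}

// coalesce groups requests into batches of at most maxBatch. A batch is flushed
// when it is full, when linger elapses after its first request, or when the
// input channel is closed.
func coalesce(in <-chan request, maxBatch int, linger time.Duration, flush func([][]byte) error) {
	for first := range in {
		batch := []request{first}
		timer := time.NewTimer(linger)
	collect:
		for len(batch) < maxBatch {
			select {
			case r, ok := <-in:
				if !ok {
					break collect
				}
				batch = append(batch, r)
			case <-timer.C:
				break collect
			}
		}
		timer.Stop()
		payloads := make([][]byte, len(batch))
		for i, r := range batch {
			payloads[i] = r.payload
		}
		err := flush(payloads)
		for _, r := range batch {
			r.done <- err // every caller in the batch sees the same result
		}
	}
}

// batchSizes pre-fills n requests, closes the input, and records the size of
// each flushed batch. The long linger means only size and close trigger flushes.
func batchSizes(n, maxBatch int) []int {
	in := make(chan request, n)
	for i := 0; i < n; i++ {
		in <- request{payload: []byte{byte(i)}, done: make(chan error, 1)}
	}
	close(in)
	var sizes []int
	coalesce(in, maxBatch, time.Hour, func(p [][]byte) error {
		sizes = append(sizes, len(p))
		return nil
	})
	return sizes
}

func main() {
	fmt.Println(batchSizes(5, 2)) // [2 2 1]
}
```

The linger interval trades a small amount of added latency per Emit for fewer, larger INSERT statements.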
type DurableEmitterMetricsConfig ¶
type DurableEmitterMetricsConfig struct {
// PollInterval is how often queue and optional process gauges refresh. Zero = 10s.
PollInterval time.Duration
// NearExpiryLead is the window before EventTTL used for queue.near_ttl (DLQ pressure proxy). Zero = 5m.
NearExpiryLead time.Duration
// MaxQueuePayloadBytes, if > 0, records capacity_usage_ratio = queue_payload_bytes / max.
MaxQueuePayloadBytes int64
}
DurableEmitterMetricsConfig enables OpenTelemetry metrics for DurableEmitter. Set on Config.Metrics; nil disables instrumentation.
When non-nil, an otel Meter must be supplied to NewDurableEmitter so that instruments can be registered. DurableEmitter does not look up a global meter on its own — callers are responsible for supplying one (usually via otel.Meter("durableemitter") or an equivalently scoped meter from their telemetry stack).
type DurableEvent ¶
type DurableEvent struct {
ID int64
Payload []byte // serialized CloudEventPb proto
CreatedAt time.Time
}
DurableEvent represents a persisted event awaiting delivery to Chip.
type DurableEventStore ¶
type DurableEventStore interface {
// Insert persists a serialized event and returns its assigned ID.
Insert(ctx context.Context, payload []byte) (int64, error)
// Delete physically removes a row (corrupt payloads, policy drops, tests).
Delete(ctx context.Context, id int64) error
// MarkDelivered records successful delivery to Chip. The row must no longer
// appear in ListPending. Postgres implementations typically set delivered_at;
// a background PurgeDelivered removes rows later. MemDurableEventStore removes
// the row immediately (same as Delete).
MarkDelivered(ctx context.Context, id int64) error
// MarkDeliveredBatch marks multiple events as delivered in a single operation.
// Semantically equivalent to calling MarkDelivered for each id.
MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error)
// PurgeDelivered deletes up to batchLimit rows already marked delivered.
// Implementations that remove rows in MarkDelivered may return 0, nil always.
PurgeDelivered(ctx context.Context, batchLimit int) (deleted int64, err error)
// ListPending returns events created before the given cutoff, ordered by
// creation time ascending, up to limit rows.
ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error)
// DeleteExpired removes undelivered events older than ttl and returns the
// count deleted. Implementations MUST NOT delete or count already-delivered
// rows here (those are reclaimed by PurgeDelivered): the returned count is
// used to decrement the in-memory pending/queue-depth counter, which only
// tracks undelivered events. Counting delivered rows would double-subtract
// them and drive the queue-depth gauge negative.
DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error)
}
DurableEventStore abstracts the persistence layer for durable chip events. Implementations must be safe for concurrent use.
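To make the contract concrete, here is a minimal in-memory sketch of the interface. It is not the package's MemDurableEventStore; memStore and demo are hypothetical. It assumes, as the comments above allow, that MarkDelivered removes rows immediately so PurgeDelivered always returns 0, and it assumes the count returned by MarkDeliveredBatch is the number of rows affected.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
	"time"
)

type DurableEvent struct {
	ID        int64
	Payload   []byte
	CreatedAt time.Time
}

// memStore is a hypothetical in-memory store; delivered rows are removed at once.
type memStore struct {
	mu     sync.Mutex
	nextID int64
	rows   map[int64]DurableEvent
}

func newMemStore() *memStore { return &memStore{rows: map[int64]DurableEvent{}} }

func (s *memStore) Insert(_ context.Context, payload []byte) (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nextID++
	s.rows[s.nextID] = DurableEvent{ID: s.nextID, Payload: append([]byte(nil), payload...), CreatedAt: time.Now()}
	return s.nextID, nil
}

func (s *memStore) Delete(ctx context.Context, id int64) error {
	_, err := s.MarkDeliveredBatch(ctx, []int64{id})
	return err
}

func (s *memStore) MarkDelivered(ctx context.Context, id int64) error { return s.Delete(ctx, id) }

func (s *memStore) MarkDeliveredBatch(_ context.Context, ids []int64) (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var n int64
	for _, id := range ids {
		if _, ok := s.rows[id]; ok {
			delete(s.rows, id)
			n++
		}
	}
	return n, nil
}

func (s *memStore) PurgeDelivered(context.Context, int) (int64, error) { return 0, nil }

func (s *memStore) ListPending(_ context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make([]DurableEvent, 0, len(s.rows))
	for _, ev := range s.rows {
		if ev.CreatedAt.Before(createdBefore) {
			out = append(out, ev)
		}
	}
	// IDs are assigned in insertion order, so sorting by ID is oldest-first.
	sort.Slice(out, func(i, j int) bool { return out[i].ID < out[j].ID })
	if limit >= 0 && len(out) > limit {
		out = out[:limit]
	}
	return out, nil
}

func (s *memStore) DeleteExpired(_ context.Context, ttl time.Duration) (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	cutoff := time.Now().Add(-ttl)
	var n int64
	for id, ev := range s.rows {
		if ev.CreatedAt.Before(cutoff) {
			delete(s.rows, id)
			n++
		}
	}
	return n, nil
}

// demo inserts three events, delivers two, and reports the pending counts.
func demo() (before, marked, after int) {
	ctx, s := context.Background(), newMemStore()
	ids := make([]int64, 3)
	for i := range ids {
		ids[i], _ = s.Insert(ctx, []byte{byte('a' + i)})
	}
	pending, _ := s.ListPending(ctx, time.Now().Add(time.Hour), 10)
	n, _ := s.MarkDeliveredBatch(ctx, ids[:2])
	rest, _ := s.ListPending(ctx, time.Now().Add(time.Hour), 10)
	return len(pending), int(n), len(rest)
}

func main() { fmt.Println(demo()) }
```

Because this store only ever holds undelivered rows, DeleteExpired trivially satisfies the rule that delivered rows must not be counted.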
type DurableQueueObserver ¶
type DurableQueueObserver interface {
// ObserveDurableQueue returns live queue statistics. eventTTL and nearExpiryLead
// match Config (nearExpiryLead should be << eventTTL).
ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (DurableQueueStats, error)
}
DurableQueueObserver is optionally implemented by DurableEventStore implementations so DurableEmitter can export queue depth and age gauges when metrics are enabled.
type DurableQueueStats ¶
type DurableQueueStats struct {
// Depth is the number of undelivered (delivered_at IS NULL) rows — the
// delivery backlog.
Depth int64
// TotalRows is the number of rows physically present in the table, including
// delivered-but-not-yet-purged rows. This is the authoritative "queue depth"
// (actual table count): it is read directly from the DB so it stays correct
// regardless of how many writers share the table or which in-memory delta
// updates were lost to failed/partial DB operations.
TotalRows int64
PayloadBytes int64
OldestPendingAge time.Duration // 0 if the queue is empty
// NearTTLCount is the number of rows within nearExpiryLead of EventTTL (still
// pending, not yet removed by expiry). Serves as a DLQ-pressure proxy; there is
// no separate dead-letter table in the default design.
NearTTLCount int64
}
DurableQueueStats is a point-in-time snapshot of the pending queue for metrics.
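The relationship between these fields can be illustrated by computing them over an in-memory slice of rows. This is a sketch, not the Postgres query: row, stats, observe, capacityRatio, and sample are hypothetical, and the near-TTL rule used here (an undelivered row whose age is at least eventTTL minus nearExpiryLead) is an assumption about the exact boundary. The capacityRatio helper mirrors the capacity_usage_ratio formula from DurableEmitterMetricsConfig.MaxQueuePayloadBytes.

```go
package main

import (
	"fmt"
	"time"
)

// row is a hypothetical in-memory view of one stored event.
type row struct {
	payloadBytes int64
	createdAt    time.Time
	delivered    bool
}

// stats mirrors the fields of DurableQueueStats.
type stats struct {
	Depth, TotalRows, PayloadBytes, NearTTLCount int64
	OldestPendingAge                             time.Duration
}

// observe computes a snapshot at time now. Delivered rows count toward
// TotalRows only; every other field describes the undelivered backlog.
func observe(rows []row, now time.Time, eventTTL, nearExpiryLead time.Duration) stats {
	st := stats{TotalRows: int64(len(rows))}
	nearAge := eventTTL - nearExpiryLead
	for _, r := range rows {
		if r.delivered {
			continue
		}
		age := now.Sub(r.createdAt)
		st.Depth++
		st.PayloadBytes += r.payloadBytes
		if age > st.OldestPendingAge {
			st.OldestPendingAge = age
		}
		if age >= nearAge { // assumed boundary for "near TTL"
			st.NearTTLCount++
		}
	}
	return st
}

// capacityRatio returns payloadBytes / maxBytes, or false when no maximum is set.
func capacityRatio(payloadBytes, maxBytes int64) (float64, bool) {
	if maxBytes <= 0 {
		return 0, false
	}
	return float64(payloadBytes) / float64(maxBytes), true
}

// sample builds a fixed data set: two pending rows and one delivered row.
func sample() stats {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	rows := []row{
		{payloadBytes: 10, createdAt: now.Add(-1 * time.Minute)},
		{payloadBytes: 20, createdAt: now.Add(-58 * time.Minute)},
		{payloadBytes: 5, createdAt: now.Add(-30 * time.Minute), delivered: true},
	}
	return observe(rows, now, time.Hour, 5*time.Minute)
}

func main() {
	st := sample()
	fmt.Printf("%+v\n", st)
	fmt.Println(capacityRatio(st.PayloadBytes, 60))
}
```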
type Hooks ¶
type Hooks struct {
// OnEmitInsert is called after each store.Insert in Emit (the DB write that
// blocks the caller). elapsed covers only the INSERT; err is nil on success.
OnEmitInsert func(elapsed time.Duration, err error)
// OnBatchPublish is called from the delivery callback after each event's
// batch is sent. elapsed is measured from QueueMessage call to callback
// invocation; batchSize is always 1 (one callback per event); err is nil
// on success.
OnBatchPublish func(elapsed time.Duration, batchSize int, err error)
// OnBatchMarkDelivered is called after MarkDeliveredBatch following a successful delivery.
OnBatchMarkDelivered func(elapsed time.Duration, count int)
}
Hooks records delivery latency to locate pipeline bottlenecks.
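Since hook callbacks may run from many goroutines, a recorder needs its own synchronization, and call sites need to skip nil hooks. The sketch below shows both on a reduced local Hooks struct. The recorder, timedInsert, and runConcurrent names are hypothetical, and timedInsert only approximates how an emitter might wrap the insert.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Hooks is a reduced local copy with a single callback.
type Hooks struct {
	OnEmitInsert func(elapsed time.Duration, err error)
}

var errInsert = errors.New("insert failed")

// recorder aggregates hook calls; the mutex makes it safe for concurrent use.
type recorder struct {
	mu       sync.Mutex
	calls    int
	failures int
	total    time.Duration
}

func (r *recorder) onEmitInsert(elapsed time.Duration, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.calls++
	r.total += elapsed
	if err != nil {
		r.failures++
	}
}

func (r *recorder) snapshot() (calls, failures int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.calls, r.failures
}

// timedInsert runs insert and reports its duration to the hook, skipping a nil
// Hooks pointer or a nil callback field.
func timedInsert(h *Hooks, insert func() error) error {
	start := time.Now()
	err := insert()
	if h != nil && h.OnEmitInsert != nil {
		h.OnEmitInsert(time.Since(start), err)
	}
	return err
}

// runConcurrent fires n inserts from separate goroutines; odd-numbered ones fail.
func runConcurrent(n int) (calls, failures int) {
	rec := &recorder{}
	h := &Hooks{OnEmitInsert: rec.onEmitInsert}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = timedInsert(h, func() error {
				if i%2 == 1 {
					return errInsert
				}
				return nil
			})
		}(i)
	}
	wg.Wait()
	return rec.snapshot()
}

func main() {
	fmt.Println(runConcurrent(10)) // 10 5
}
```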
type PgDurableEventStore ¶
type PgDurableEventStore struct {
// contains filtered or unexported fields
}
PgDurableEventStore is a Postgres-backed implementation of DurableEventStore. Tests live in chainlink/core/services/durableemitter as they require DB migrations.
func NewPgDurableEventStore ¶
func NewPgDurableEventStore(ds sqlutil.DataSource) *PgDurableEventStore
func (*PgDurableEventStore) Delete ¶
func (s *PgDurableEventStore) Delete(ctx context.Context, id int64) error
func (*PgDurableEventStore) DeleteExpired ¶
func (s *PgDurableEventStore) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error)
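func (*PgDurableEventStore) Insert ¶
func (s *PgDurableEventStore) Insert(ctx context.Context, payload []byte) (int64, error)
func (*PgDurableEventStore) InsertBatch ¶
func (s *PgDurableEventStore) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error)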
func (*PgDurableEventStore) ListPending ¶
func (s *PgDurableEventStore) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error)
func (*PgDurableEventStore) MarkDelivered ¶
func (s *PgDurableEventStore) MarkDelivered(ctx context.Context, id int64) error
func (*PgDurableEventStore) MarkDeliveredBatch ¶
func (s *PgDurableEventStore) MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error)
func (*PgDurableEventStore) ObserveDurableQueue ¶
func (s *PgDurableEventStore) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (DurableQueueStats, error)
ObserveDurableQueue implements DurableQueueObserver for queue depth / age gauges. cnt/payload_sum/min_created describe only the undelivered backlog, while total is the authoritative physical row count (incl. delivered-but-not-purged rows) used as the queue-depth gauge.
func (*PgDurableEventStore) PurgeDelivered ¶
func (s *PgDurableEventStore) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error)
type SetupConfig ¶
type SetupConfig struct {
// Endpoint is the gRPC address for the Chip Ingress service.
Endpoint string
// InsecureConnection disables TLS when true.
InsecureConnection bool
// Auth configures chip ingress credentials. AuthKeySigner may be nil at init
// for LOOP plugins; call SetGlobalSigner after the CSA keystore is available.
Auth AuthConfig
// RetransmitEnabled controls whether the retransmit and cleanup loops run.
// Set to true for the host (chainlink node) process.
// Set to false for LOOP plugin processes — the host's retransmit loop picks
// up any rows inserted by plugin-side DurableEmitters from the shared DB.
RetransmitEnabled bool
// Batch client tuning — zero values use package defaults.
BatchSize int // default: 50
BatchInterval time.Duration // default: 50ms
MaxConcurrentSends int // default: 4
MaxPublishTimeout time.Duration // default: 5s
ShutdownTimeout time.Duration // default: 30s
// EmitterConfig overrides DefaultConfig when non-nil.
EmitterConfig *Config
// Meter is the OpenTelemetry meter for instrumentation. Nil disables metrics.
Meter metric.Meter
}
SetupConfig holds all configuration required to create and start a DurableEmitter, including its chip ingress transport clients.
type Signer ¶
type Signer = chipingress.Signer
Signer signs auth header payloads using the node's CSA key. It is an alias of chipingress.Signer so DurableEmitter callers don't need to import chipingress directly.