Documentation ¶
Overview ¶
Package events provides event handling for Helix Cluster OS.
This file defines the domain event types, the 5 HELIX_* JetStream streams, the {domain}.{entity_id}.{action} subject convention, and the HelixPublisher / HelixSubscriber interfaces that the real NATS backend implements.
Index ¶
- Constants
- Variables
- func AvroDecode(data []byte, registry *SchemaRegistry, readerSchema *AvroSchema) (map[string]any, error)
- func AvroEncode(schema *AvroSchema, record map[string]any) ([]byte, error)
- func EncodeAuditEventAvro(e *AuditEvent, writer *AvroSchema) ([]byte, error)
- func EncodeNodeEventAvro(ne *NodeEvent, writer *AvroSchema) ([]byte, error)
- func EncodeSchedulerEventAvro(e *SchedulerEvent, writer *AvroSchema) ([]byte, error)
- func EncodeSessionEventAvro(e *SessionEvent, writer *AvroSchema) ([]byte, error)
- func EnsureStreams(ctx context.Context, js JetStreamAdmin) error
- func HelixSubject(domain, entityID, action string) string
- type AuditEvent
- type AvroField
- type AvroSchema
- type AvroType
- type Bus
- type Event
- type HelixBackend
- type HelixBackendConfig
- type HelixPublisher
- type HelixStream
- type HelixStreamDef
- type HelixStreamInfo
- type HelixSubscriber
- type JetStreamAdmin
- type NATSBackend
- func (n *NATSBackend) Close() error
- func (n *NATSBackend) Publish(e Event) error
- func (n *NATSBackend) PublishAuditEvent(e *AuditEvent) error
- func (n *NATSBackend) PublishNodeEvent(ne *NodeEvent) error
- func (n *NATSBackend) PublishSchedulerEvent(e *SchedulerEvent) error
- func (n *NATSBackend) PublishSessionEvent(e *SessionEvent) error
- func (n *NATSBackend) Subscribe(ctx context.Context, eventType string) (<-chan Event, func(), error)
- func (n *NATSBackend) SubscribeAuditEvents(ctx context.Context, subjectID, action string) (<-chan *AuditEvent, func(), error)
- func (n *NATSBackend) SubscribeNodeEvents(ctx context.Context, nodeID, action string) (<-chan *NodeEvent, func(), error)
- func (n *NATSBackend) SubscribeSchedulerEvents(ctx context.Context, entityID, action string) (<-chan *SchedulerEvent, func(), error)
- func (n *NATSBackend) SubscribeSessionEvents(ctx context.Context, sessionID, action string) (<-chan *SessionEvent, func(), error)
- type NATSBackendOption
- func WithAvro(writer *AvroSchema, registry *SchemaRegistry) NATSBackendOption
- func WithAvroAudit(writer *AvroSchema, registry *SchemaRegistry) NATSBackendOption
- func WithAvroEvents(registry *SchemaRegistry) NATSBackendOption
- func WithAvroScheduler(writer *AvroSchema, registry *SchemaRegistry) NATSBackendOption
- func WithAvroSession(writer *AvroSchema, registry *SchemaRegistry) NATSBackendOption
- type NodeEvent
- type RealJetStreamAdmin
- type SchedulerEvent
- type SchemaRegistry
- type SessionEvent
- type WireFormat
Constants ¶
const HelixMaxAge = 24 * time.Hour
HelixMaxAge is the retention window for all 5 Helix control-plane streams. Events older than this are evicted by JetStream. 24 h is a reasonable operational window for a single-node cluster; set to a larger value for multi-node / long-tail replay requirements.
const HelixReplicas = 1
HelixReplicas is the replication factor for all 5 streams. 1 is correct for single-node development and integration tests; production deployments should override via EnsureStreamsWithConfig.
Variables ¶
var AllHelixStreams = []HelixStream{
	StreamNodes,
	StreamSessions,
	StreamScheduler,
	StreamHealth,
	StreamAlerts,
}
AllHelixStreams is the ordered list of all five HELIX_* streams. Callers that must iterate them (e.g. stream admin setup) use this slice so they cannot accidentally miss one.
var HelixStreamConfigs = []HelixStreamDef{
	{
		Name:     StreamNodes,
		Subjects: []string{"helix.nodes.>"},
		MaxAge:   HelixMaxAge,
		Replicas: HelixReplicas,
	},
	{
		Name:     StreamSessions,
		Subjects: []string{"helix.sessions.>"},
		MaxAge:   HelixMaxAge,
		Replicas: HelixReplicas,
	},
	{
		Name:     StreamScheduler,
		Subjects: []string{"helix.scheduler.>"},
		MaxAge:   HelixMaxAge,
		Replicas: HelixReplicas,
	},
	{
		Name:     StreamHealth,
		Subjects: []string{"helix.health.>"},
		MaxAge:   HelixMaxAge,
		Replicas: HelixReplicas,
	},
	{
		Name:     StreamAlerts,
		Subjects: []string{"helix.alerts.>"},
		MaxAge:   HelixMaxAge,
		Replicas: HelixReplicas,
	},
}
HelixStreamConfigs is the ordered, authoritative config table for all 5 control-plane streams. This is the single source of truth.
Subject convention: "helix.<domain>.>" so all Helix events share the "helix." top-level namespace; the domain prefix matches helixStreamDomains in helix_events.go.
Functions ¶
func AvroDecode ¶
func AvroDecode(data []byte, registry *SchemaRegistry, readerSchema *AvroSchema) (map[string]any, error)
AvroDecode decodes a single-object-encoded payload. It reads the writer fingerprint from the header, resolves the writer schema via the registry (returning an error for an unknown/unregistered fingerprint), decodes the body under the writer schema, and then RESOLVES the result to readerSchema: reader-only fields are filled from their defaults, writer-only fields are discarded. The returned map contains exactly readerSchema's fields.
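The resolution step described above can be illustrated with a small sketch. It is not the package's implementation: the `field` type and the `resolve` helper are hypothetical stand-ins for AvroField and the internal resolution logic, and the default value "unknown" is an assumption made for the example.

```go
package main

import "fmt"

// field is a simplified, hypothetical stand-in for AvroField.
type field struct {
	Name       string
	HasDefault bool
	Default    any
}

// resolve projects a record decoded under the writer schema onto the
// reader's fields. The keys of decoded play the role of the writer fields.
func resolve(decoded map[string]any, reader []field) (map[string]any, error) {
	out := make(map[string]any, len(reader))
	for _, f := range reader {
		if v, ok := decoded[f.Name]; ok {
			out[f.Name] = v // field known to both writer and reader
			continue
		}
		if !f.HasDefault {
			return nil, fmt.Errorf("reader field %q has no default", f.Name)
		}
		out[f.Name] = f.Default // reader-only field: filled from its default
	}
	// Writer-only fields were never copied, so they are discarded.
	return out, nil
}

func main() {
	// A payload written without "action", read by a reader that declares it.
	decoded := map[string]any{"run_id": "r1", "node_id": "n1"}
	reader := []field{
		{Name: "run_id"},
		{Name: "node_id"},
		{Name: "action", HasDefault: true, Default: "unknown"}, // assumed default
	}
	rec, err := resolve(decoded, reader)
	fmt.Println(rec, err)
}
```

The result contains exactly the reader's fields, which is the property the AvroDecode description states for its returned map.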
func AvroEncode ¶
func AvroEncode(schema *AvroSchema, record map[string]any) ([]byte, error)
AvroEncode encodes record (a name->value map) under the writer schema into a single-object-encoded byte slice (magic + fingerprint + body). Every field declared by the schema MUST be present in record with a type-compatible Go value, otherwise an error is returned (encode-side validation).
func EncodeAuditEventAvro ¶
func EncodeAuditEventAvro(e *AuditEvent, writer *AvroSchema) ([]byte, error)
EncodeAuditEventAvro encodes an AuditEvent as Avro single-object bytes.
func EncodeNodeEventAvro ¶
func EncodeNodeEventAvro(ne *NodeEvent, writer *AvroSchema) ([]byte, error)
EncodeNodeEventAvro encodes a NodeEvent into Avro single-object bytes under the given writer schema. The returned bytes begin with the 0xC3 0x01 marker and the writer-schema fingerprint, so they are self-describing on the wire.
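For orientation, the self-describing header can be sketched as follows. The layout (two marker bytes, then the 8-byte fingerprint in little-endian order, then the body) follows the Avro single-object encoding; the helper names `wrapSingleObject` and `splitSingleObject` are hypothetical and are not part of this package.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// marker is the two-byte Avro single-object prefix.
var marker = [2]byte{0xC3, 0x01}

// wrapSingleObject prepends marker + little-endian fingerprint to body.
func wrapSingleObject(fp uint64, body []byte) []byte {
	out := make([]byte, 10, 10+len(body))
	out[0], out[1] = marker[0], marker[1]
	binary.LittleEndian.PutUint64(out[2:], fp)
	return append(out, body...)
}

// splitSingleObject validates the header and returns the writer
// fingerprint together with the remaining body bytes.
func splitSingleObject(data []byte) (uint64, []byte, error) {
	if len(data) < 10 {
		return 0, nil, errors.New("payload shorter than the 10-byte header")
	}
	if data[0] != marker[0] || data[1] != marker[1] {
		return 0, nil, errors.New("missing 0xC3 0x01 marker")
	}
	return binary.LittleEndian.Uint64(data[2:10]), data[10:], nil
}

func main() {
	data := wrapSingleObject(0x0102030405060708, []byte("body"))
	fp, body, err := splitSingleObject(data)
	fmt.Printf("%#x %q %v\n", fp, body, err)
}
```

A decoder would pass the extracted fingerprint to a registry lookup before touching the body.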
func EncodeSchedulerEventAvro ¶
func EncodeSchedulerEventAvro(e *SchedulerEvent, writer *AvroSchema) ([]byte, error)
EncodeSchedulerEventAvro encodes a SchedulerEvent as Avro single-object bytes.
func EncodeSessionEventAvro ¶
func EncodeSessionEventAvro(e *SessionEvent, writer *AvroSchema) ([]byte, error)
EncodeSessionEventAvro encodes a SessionEvent as Avro single-object bytes.
func EnsureStreams ¶
func EnsureStreams(ctx context.Context, js JetStreamAdmin) error
EnsureStreams creates or updates all 5 HELIX_* JetStream streams so their configuration matches HelixStreamConfigs. It is safe to call multiple times (idempotent): a second call with an already-correct broker state produces no error.
For each stream:
- If StreamInfo returns natsgo.ErrStreamNotFound → AddStream.
- If StreamInfo succeeds (stream exists) → UpdateStream to converge config.
- Any other StreamInfo error → return immediately with a wrapped error.
The ctx parameter is threaded through for future cancellation support. The current natsgo.JetStreamContext does not accept a ctx directly, so EnsureStreams checks ctx for cancellation before each operation.
func HelixSubject ¶
func HelixSubject(domain, entityID, action string) string
HelixSubject builds the canonical {domain}.{entity_id}.{action} NATS subject. All three components are sanitised: empty tokens are replaced with "_", and NATS-illegal characters (space, "*", ">") are replaced with "_". This prevents broken routing while preserving the three-token structure.
Examples:
HelixSubject("nodes", "n1", "heartbeat") → "nodes.n1.heartbeat"
HelixSubject("alerts", "zone-a", "fire") → "alerts.zone-a.fire"
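A minimal sketch of the sanitisation rules stated above, assuming they are applied per token and nothing else is rewritten. The names `sanitizeToken` and `helixSubject` are hypothetical re-implementations for illustration; how the real function treats other characters (for example a "." inside a token) is not stated here and is not modelled.

```go
package main

import (
	"fmt"
	"strings"
)

// illegal replaces the three characters the documentation names.
var illegal = strings.NewReplacer(" ", "_", "*", "_", ">", "_")

// sanitizeToken maps an empty token to "_" and replaces space, "*" and ">".
func sanitizeToken(tok string) string {
	if tok == "" {
		return "_"
	}
	return illegal.Replace(tok)
}

// helixSubject joins the three sanitised tokens with dots.
func helixSubject(domain, entityID, action string) string {
	return strings.Join([]string{
		sanitizeToken(domain),
		sanitizeToken(entityID),
		sanitizeToken(action),
	}, ".")
}

func main() {
	fmt.Println(helixSubject("nodes", "n1", "heartbeat")) // nodes.n1.heartbeat
	fmt.Println(helixSubject("alerts", "", "fire drill")) // alerts._.fire_drill
}
```

Replacing rather than dropping bad tokens keeps every subject at exactly three tokens, so subscriptions that filter by position keep working.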
Types ¶
type AuditEvent ¶
type AuditEvent struct {
RunID string `json:"run_id"`
Actor string `json:"actor"`
SubjectID string `json:"subject_id"`
Action string `json:"action"`
Timestamp time.Time `json:"timestamp"`
}
AuditEvent is an audit-log event (an actor performed an action on a subject). It carries the actor, the affected subject id, and the action.
func DecodeAuditEventAvro ¶
func DecodeAuditEventAvro(data []byte, registry *SchemaRegistry, readerSchema *AvroSchema) (*AuditEvent, error)
DecodeAuditEventAvro decodes Avro single-object bytes into an AuditEvent.
func UnmarshalAuditEvent ¶
func UnmarshalAuditEvent(payload []byte) (*AuditEvent, error)
UnmarshalAuditEvent deserialises an AuditEvent from raw JSON bytes.
func (*AuditEvent) MarshalPayload ¶
func (e *AuditEvent) MarshalPayload() ([]byte, error)
MarshalPayload serialises the AuditEvent to JSON bytes for Event.Payload.
func (*AuditEvent) Subject ¶
func (e *AuditEvent) Subject() string
Subject returns the canonical "audit.{SubjectID}.{Action}" subject.
type AvroField ¶
type AvroField struct {
Name string
Type AvroType
// HasDefault marks that Default is meaningful. A reader-only field MUST
// have a default for the writer/reader schema pair to be resolvable.
HasDefault bool
Default any
}
AvroField is one field of a record schema.
type AvroSchema ¶
AvroSchema is a flat record schema: an ordered list of primitive fields.
func AuditEventSchemaV1 ¶
func AuditEventSchemaV1() *AvroSchema
AuditEventSchemaV1 is the flat Avro projection of AuditEvent.
func NodeEventSchemaV1 ¶
func NodeEventSchemaV1() *AvroSchema
NodeEventSchemaV1 is the original Avro schema for NodeEvent: the fields that shipped before schema evolution. Timestamp is carried as a long (unix nanos) and Metadata is intentionally excluded from the v1 Avro projection (maps are out of this subset); the Avro path serializes the scalar identity fields.
func NodeEventSchemaV2 ¶
func NodeEventSchemaV2() *AvroSchema
NodeEventSchemaV2 evolves V1 by adding the "action" field WITH a default, so a V1 reader can read a V2 payload (skips action) and a V2 reader can read a V1 payload (synthesizes action from the default). This is the backward- AND forward-compatible evolution Avro is chosen for.
func SchedulerEventSchemaV1 ¶
func SchedulerEventSchemaV1() *AvroSchema
SchedulerEventSchemaV1 is the flat Avro projection of SchedulerEvent.
func SessionEventSchemaV1 ¶
func SessionEventSchemaV1() *AvroSchema
SessionEventSchemaV1 is the flat Avro projection of SessionEvent.
func (*AvroSchema) Fingerprint ¶
func (s *AvroSchema) Fingerprint() uint64
Fingerprint returns the CRC-64-AVRO (Rabin) fingerprint of the schema's canonical form. Two schemas with the same canonical form (same record name, same ordered field names+types) share a fingerprint; defaults do NOT affect the fingerprint, matching Avro's Parsing Canonical Form which strips them.
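The CRC-64-AVRO computation itself is short. The sketch below follows the table-driven algorithm given in the Avro specification; how this package builds the canonical form is not shown here, so the canonical string in main (record name and field names) is purely illustrative.

```go
package main

import "fmt"

// emptyFP is the CRC-64-AVRO seed, which is also the fingerprint of
// empty input, as defined in the Avro specification.
const emptyFP uint64 = 0xc15d213aa4d7a795

// fpTable is the 256-entry lookup table derived from emptyFP.
var fpTable = func() [256]uint64 {
	var t [256]uint64
	for i := range t {
		fp := uint64(i)
		for j := 0; j < 8; j++ {
			// Shift right by one bit; XOR in the seed if the low bit was set.
			fp = (fp >> 1) ^ (emptyFP & -(fp & 1))
		}
		t[i] = fp
	}
	return t
}()

// fingerprint64 returns the 64-bit Rabin fingerprint of the bytes of a
// schema's canonical form.
func fingerprint64(canonical []byte) uint64 {
	fp := emptyFP
	for _, b := range canonical {
		fp = (fp >> 8) ^ fpTable[byte(fp)^b]
	}
	return fp
}

func main() {
	// Illustrative canonical form; defaults are absent because the
	// Parsing Canonical Form strips them.
	canonical := `{"name":"NodeEvent","type":"record","fields":[{"name":"run_id","type":"string"}]}`
	fmt.Printf("%016x\n", fingerprint64([]byte(canonical)))
}
```

Because defaults never reach the canonical form, adding or changing a default leaves the fingerprint unchanged, while adding a field changes it.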
type AvroType ¶
type AvroType string
AvroType enumerates the primitive Avro types supported by this subset.
type Bus ¶
type Bus struct {
// contains filtered or unexported fields
}
Bus is an in-memory event bus.
type HelixBackend ¶
type HelixBackend struct {
// contains filtered or unexported fields
}
HelixBackend is a real NATS/JetStream-backed implementation of HelixPublisher and HelixSubscriber. It provisions one natsbus.Bus per HELIX_* stream so each stream has its own isolated JetStream stream and subject namespace:
HELIX_NODES     → stream name "HELIX_NODES",     prefix "nodes"
HELIX_SESSIONS  → stream name "HELIX_SESSIONS",  prefix "sessions"
HELIX_SCHEDULER → stream name "HELIX_SCHEDULER", prefix "scheduler"
HELIX_HEALTH    → stream name "HELIX_HEALTH",    prefix "health"
HELIX_ALERTS    → stream name "HELIX_ALERTS",    prefix "alerts"
Within each bus the wire subject is:
{prefix}.{entity_id}.{action} → e.g. "nodes.n1.heartbeat"
The event.Type passed to natsbus is "{entity_id}.{action}" (without the domain prefix, which is handled by natsbus internally).
Zero-value is not usable; construct via NewHelixBackend.
func NewHelixBackend ¶
func NewHelixBackend(ctx context.Context, cfg HelixBackendConfig) (*HelixBackend, error)
NewHelixBackend connects to NATS at cfg.URL and provisions all five HELIX_* JetStream streams (idempotent). Each stream gets its own natsbus.Bus instance sharing the same NATS URL.
func (*HelixBackend) Close ¶
func (b *HelixBackend) Close() error
Close closes all five underlying natsbus.Bus instances. It is idempotent.
func (*HelixBackend) PublishNodeEvent ¶
func (b *HelixBackend) PublishNodeEvent(ne *NodeEvent) error
PublishNodeEvent marshals ne to JSON, wraps it in an event.Event, and publishes it on the HELIX_NODES stream using the subject:
nodes.{ne.NodeID}.{ne.Action}
The natsbus prefix is "nodes" and the event.Type is "{ne.NodeID}.{ne.Action}", so the wire subject becomes "nodes.{ne.NodeID}.{ne.Action}" — exactly the {domain}.{entity_id}.{action} convention.
func (*HelixBackend) StreamInfo ¶
func (b *HelixBackend) StreamInfo(stream HelixStream) (*HelixStreamInfo, error)
StreamInfo returns metadata about the named HELIX_* stream. NumMessages reflects the count of events published via this HelixBackend instance on the given stream (tracked internally since natsbus.Bus does not expose JetStream StreamInfo directly). NumConsumers is zero — the integration tier captures broker-native consumer counts via the broker management API.
A non-error return proves the stream was provisioned (its bus was created during NewHelixBackend). This is §7.1 sink-side evidence of stream existence.
func (*HelixBackend) SubscribeNodeEvents ¶
func (b *HelixBackend) SubscribeNodeEvents(nodeID, action string) (<-chan *NodeEvent, func(), error)
SubscribeNodeEvents subscribes to NodeEvents on HELIX_NODES for the subject "nodes.{nodeID}.{action}". The returned channel delivers decoded NodeEvents. Calling the returned stop function closes the channel. (Unlike NATSBackend.SubscribeNodeEvents, this method takes no ctx parameter.)
The natsbus subscribes to event.Type "{nodeID}.{action}" with the "nodes" prefix, which resolves to the exact wire subject "nodes.{nodeID}.{action}".
type HelixBackendConfig ¶
type HelixBackendConfig struct {
// URL is the NATS server URL (e.g. "nats://127.0.0.1:4222"). Required.
URL string
// ConnectTimeout bounds the initial connection attempt. Defaults to 5 s.
ConnectTimeout time.Duration
}
HelixBackendConfig holds connection parameters for HelixBackend.
type HelixPublisher ¶
type HelixPublisher interface {
// PublishNodeEvent publishes a NodeEvent on the StreamNodes stream using
// the subject derived from ne.Subject().
PublishNodeEvent(ne *NodeEvent) error
}
HelixPublisher is the interface for publishing domain events onto the NATS JetStream-backed Helix event bus. The real implementation is HelixBackend; unit tests inject a fake that implements this interface.
type HelixStream ¶
type HelixStream string
HelixStream enumerates the five authoritative JetStream streams used by Helix Cluster OS. Each stream captures a broad family of events (identified by a subject wildcard) so a single JetStream stream carries all topics for that domain.
const (
	// StreamNodes captures node lifecycle events (heartbeats, joins, leaves,
	// failures). Subject wildcard: "nodes.>".
	StreamNodes HelixStream = "HELIX_NODES"
	// StreamSessions captures session lifecycle events (create, attach, detach,
	// terminate). Subject wildcard: "sessions.>".
	StreamSessions HelixStream = "HELIX_SESSIONS"
	// StreamScheduler captures scheduler events (assignment, preemption, queue
	// changes). Subject wildcard: "scheduler.>".
	StreamScheduler HelixStream = "HELIX_SCHEDULER"
	// StreamHealth captures health-check events (probe results, thresholds,
	// alerts). Subject wildcard: "health.>".
	StreamHealth HelixStream = "HELIX_HEALTH"
	// StreamAlerts captures alert events (fire, resolve, escalate). Subject
	// wildcard: "alerts.>".
	StreamAlerts HelixStream = "HELIX_ALERTS"
)
type HelixStreamDef ¶
type HelixStreamDef struct {
// Name is the HELIX_* stream name (e.g. "HELIX_NODES").
Name HelixStream
// Subjects is the ordered list of NATS subject filters captured by this
// stream. Always exactly one entry per stream ("helix.<domain>.>").
Subjects []string
// MaxAge is the maximum age of messages retained in the stream.
MaxAge time.Duration
// Replicas is the number of JetStream replicas.
Replicas int
}
HelixStreamDef holds the authoritative JetStream configuration for one HELIX_* stream. It is the input to EnsureStreams; callers that need to inspect the expected configuration (e.g. integration assertions) use HelixStreamConfigs.
type HelixStreamInfo ¶
type HelixStreamInfo struct {
// Name is the HELIX_* stream name.
Name HelixStream
// NumMessages is the count of messages currently stored in the stream.
NumMessages uint64
// NumConsumers is the count of active consumers on the stream.
NumConsumers int
}
HelixStreamInfo captures metadata about a JetStream stream, returned by HelixBackend.StreamInfo for observability and §7.1 evidence.
type HelixSubscriber ¶
type HelixSubscriber interface {
// SubscribeNodeEvents subscribes to NodeEvents for a specific nodeID and
// action pattern. Passing "*" for either field matches any value.
SubscribeNodeEvents(nodeID, action string) (<-chan *NodeEvent, func(), error)
}
HelixSubscriber is the interface for subscribing to domain events from the NATS JetStream-backed Helix event bus.
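To make the "*" pattern concrete, here is a sketch of NATS-style subject matching, where "*" matches exactly one token and ">" matches one or more trailing tokens. The function `subjectMatches` is a hypothetical illustration of those wildcard rules; in practice the broker performs the matching.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches pattern under NATS
// wildcard rules: "*" is one token, ">" is the rest (at least one token).
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs something to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) || (tok != "*" && tok != s[i]) {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	fmt.Println(subjectMatches("nodes.*.heartbeat", "nodes.n1.heartbeat")) // true
	fmt.Println(subjectMatches("nodes.n1.*", "nodes.n2.join"))             // false
	fmt.Println(subjectMatches("nodes.>", "nodes.n1.heartbeat"))           // true
}
```

Under these rules, a subscription built from nodeID "*" and action "heartbeat" would receive heartbeats from every node.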
type JetStreamAdmin ¶
type JetStreamAdmin interface {
// StreamInfo returns metadata for the named stream. Returns an error (whose
// Is-chain includes natsgo.ErrStreamNotFound) when the stream is absent.
StreamInfo(name string) (*natsgo.StreamInfo, error)
// AddStream creates a new stream with the provided config.
AddStream(cfg *natsgo.StreamConfig) (*natsgo.StreamInfo, error)
// UpdateStream updates an existing stream to match the provided config.
UpdateStream(cfg *natsgo.StreamConfig) (*natsgo.StreamInfo, error)
}
JetStreamAdmin is the narrow seam used by EnsureStreams. It exposes only the stream-administration primitives so unit tests can inject a deterministic fake without a real NATS broker.
The real implementation is a thin wrapper around natsgo.JetStreamContext (see RealJetStreamAdmin). Integration tests supply the real context directly via NewRealJetStreamAdmin.
type NATSBackend ¶
type NATSBackend struct {
// contains filtered or unexported fields
}
NATSBackend is a real NATS/JetStream-backed event bus. It delegates to the decoupled digital.vasic.eventbus/pkg/nats backend, mapping the root Event{ID,Type,Payload,Timestamp} type to/from eventbus's event.Event.
It is an additive alternative to the in-memory Bus: callers that want durable, distributed delivery construct a NATSBackend; the default in-memory Bus is untouched. The mapping carries ID, Type, and Timestamp verbatim and round-trips the raw Payload bytes losslessly through event.Event.Payload.
func NewNATSBackend ¶
func NewNATSBackend(ctx context.Context, url string, opts ...NATSBackendOption) (*NATSBackend, error)
NewNATSBackend connects to the NATS server at url and returns a real-backed event bus. The provided context bounds the connection setup. Close the returned backend to release the connection.
func (*NATSBackend) Close ¶
func (n *NATSBackend) Close() error
Close releases the underlying NATS connection and stops all subscriptions.
func (*NATSBackend) Publish ¶
func (n *NATSBackend) Publish(e Event) error
Publish maps the root Event onto an eventbus event.Event and publishes it. The raw Payload bytes are carried through event.Event.Payload; ID, Type, and Timestamp are preserved so a subscriber observes the same logical event.
func (*NATSBackend) PublishAuditEvent ¶
func (n *NATSBackend) PublishAuditEvent(e *AuditEvent) error
PublishAuditEvent publishes an AuditEvent on the "audit.>" subject family, honoring the backend's wire format. See PublishSessionEvent for semantics.
func (*NATSBackend) PublishNodeEvent ¶
func (n *NATSBackend) PublishNodeEvent(ne *NodeEvent) error
PublishNodeEvent publishes a NodeEvent over the backend, honoring the backend's configured wire format. In WireAvro mode the payload bytes are the Avro single-object encoding of ne (0xC3 0x01 marker + writer fingerprint + body); in the default WireJSON mode they are ne's JSON. The Event identity (ID=RunID, Type=subject) is identical across both formats so subscribers route the same.
func (*NATSBackend) PublishSchedulerEvent ¶
func (n *NATSBackend) PublishSchedulerEvent(e *SchedulerEvent) error
PublishSchedulerEvent publishes a SchedulerEvent on the StreamScheduler stream, honoring the backend's wire format. See PublishSessionEvent for semantics.
func (*NATSBackend) PublishSessionEvent ¶
func (n *NATSBackend) PublishSessionEvent(e *SessionEvent) error
PublishSessionEvent publishes a SessionEvent on the StreamSessions stream, honoring the backend's wire format. In WireAvro mode the payload is the Avro single-object encoding of e (0xC3 0x01 + writer fingerprint + body); in the default WireJSON mode it is e's JSON. Event identity (ID=RunID, Type=subject) is identical across both formats so subscribers route the same.
func (*NATSBackend) Subscribe ¶
func (n *NATSBackend) Subscribe(ctx context.Context, eventType string) (<-chan Event, func(), error)
Subscribe subscribes to events of eventType and returns a channel delivering decoded root Events, an unsubscribe function, and an error. The returned channel is closed when the unsubscribe func is called, ctx is cancelled, or the backend is closed.
func (*NATSBackend) SubscribeAuditEvents ¶
func (n *NATSBackend) SubscribeAuditEvents(ctx context.Context, subjectID, action string) (<-chan *AuditEvent, func(), error)
SubscribeAuditEvents subscribes to AuditEvents on the subject derived from subjectID/action. See SubscribeSessionEvents for semantics.
func (*NATSBackend) SubscribeNodeEvents ¶
func (n *NATSBackend) SubscribeNodeEvents(ctx context.Context, nodeID, action string) (<-chan *NodeEvent, func(), error)
SubscribeNodeEvents subscribes to NodeEvents on the subject derived from nodeID/action and decodes each delivered payload per the backend's wire format. In WireAvro mode the payload is decoded via the registry (writer resolution) to the configured reader schema; in WireJSON mode it is JSON-unmarshalled.
func (*NATSBackend) SubscribeSchedulerEvents ¶
func (n *NATSBackend) SubscribeSchedulerEvents(ctx context.Context, entityID, action string) (<-chan *SchedulerEvent, func(), error)
SubscribeSchedulerEvents subscribes to SchedulerEvents on the subject derived from entityID/action. See SubscribeSessionEvents for semantics.
func (*NATSBackend) SubscribeSessionEvents ¶
func (n *NATSBackend) SubscribeSessionEvents(ctx context.Context, sessionID, action string) (<-chan *SessionEvent, func(), error)
SubscribeSessionEvents subscribes to SessionEvents on the subject derived from sessionID/action and decodes each payload per the backend's wire format. In WireAvro mode the payload is decoded via the registry (writer resolution) and resolved against the configured writer schema, which also serves as the reader schema; in WireJSON mode it is JSON-unmarshalled.
type NATSBackendOption ¶
type NATSBackendOption func(*NATSBackend)
NATSBackendOption configures a NATSBackend at construction time.
func WithAvro ¶
func WithAvro(writer *AvroSchema, registry *SchemaRegistry) NATSBackendOption
WithAvro switches the backend's NodeEvent helpers to the Avro wire format, using writer as the publish-side schema and registry for subscribe-side writer-schema resolution. registry MUST have the writer schema (and any other acceptable writer schemas, e.g. both V1 and V2) registered. Passing this option does NOT change the generic Publish/Subscribe byte round-trip — it changes how PublishNodeEvent / SubscribeNodeEvents project a NodeEvent on/off the wire.
func WithAvroAudit ¶
func WithAvroAudit(writer *AvroSchema, registry *SchemaRegistry) NATSBackendOption
WithAvroAudit switches AuditEvent to the Avro wire format. See WithAvroSession for semantics.
func WithAvroEvents ¶
func WithAvroEvents(registry *SchemaRegistry) NATSBackendOption
WithAvroEvents switches the backend to the Avro wire format for ALL three non-Node event families at once, registering each family's V1 writer schema in registry so the subscribe side resolves the writer fingerprint regardless of which type produced a payload. It is the typed-event analogue of WithAvro for NodeEvent and composes with it (NodeEvent + the three families can all be Avro on the same backend). registry MUST be non-nil; it is shared as the resolution surface for every typed Subscribe* call.
func WithAvroScheduler ¶
func WithAvroScheduler(writer *AvroSchema, registry *SchemaRegistry) NATSBackendOption
WithAvroScheduler switches SchedulerEvent to the Avro wire format. See WithAvroSession for semantics.
func WithAvroSession ¶
func WithAvroSession(writer *AvroSchema, registry *SchemaRegistry) NATSBackendOption
WithAvroSession switches SessionEvent to the Avro wire format using writer as the publish-side schema and registry for subscribe-side resolution, registering writer in registry. JSON remains the default for any family not opted in.
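The per-family opt-in follows the functional-options pattern. The sketch below shows only that pattern: `backend`, `option`, `withAvroSession` and `withAvroEvents` are hypothetical, and the writer schema and registry arguments that the real options take are left out to keep the example small.

```go
package main

import "fmt"

type wireFormat int

const (
	wireJSON wireFormat = iota // zero value, so JSON is the default
	wireAvro
)

// backend is a hypothetical stand-in holding one wire format per family.
type backend struct {
	node, session, scheduler, audit wireFormat
}

// option mutates a backend during construction.
type option func(*backend)

// withAvroSession opts only the session family into Avro.
func withAvroSession() option {
	return func(b *backend) { b.session = wireAvro }
}

// withAvroEvents opts the three non-node families into Avro at once.
func withAvroEvents() option {
	return func(b *backend) {
		b.session, b.scheduler, b.audit = wireAvro, wireAvro, wireAvro
	}
}

// newBackend applies options in order over an all-JSON default.
func newBackend(opts ...option) *backend {
	b := &backend{}
	for _, opt := range opts {
		opt(b)
	}
	return b
}

func main() {
	b := newBackend(withAvroSession())
	fmt.Println(b.session == wireAvro, b.node == wireJSON) // true true
}
```

Because each option touches only its own family, options compose freely and any family not mentioned keeps the JSON default.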
type NodeEvent ¶
type NodeEvent struct {
// RunID is a per-test / per-invocation UUID embedded by the publisher so
// the subscriber can assert it received the IDENTICAL event (not a
// different one from a previous run). Required per §7.1 anti-bluff.
RunID string `json:"run_id"`
// NodeID is the unique identifier of the reporting node.
NodeID string `json:"node_id"`
// Action describes what happened (e.g. "heartbeat", "join", "leave",
// "failure"). It maps to the third token of the subject triple.
Action string `json:"action"`
// Timestamp is the wall-clock time at which the event was created.
Timestamp time.Time `json:"timestamp"`
// Metadata is a free-form key-value bag for additional context.
Metadata map[string]string `json:"metadata,omitempty"`
}
NodeEvent is a structured event emitted by a Helix cluster node. It is the canonical payload for messages published on the StreamNodes stream.
func DecodeNodeEventAvro ¶
func DecodeNodeEventAvro(data []byte, registry *SchemaRegistry, readerSchema *AvroSchema) (*NodeEvent, error)
DecodeNodeEventAvro decodes Avro single-object bytes into a NodeEvent. It reads the writer fingerprint from the header, resolves the writer schema via the registry (rejecting an unregistered fingerprint), and resolves the result to readerSchema — so a V1 reader can decode a V2 payload (skips action) and a V2 reader can decode a V1 payload (action defaulted).
func UnmarshalNodeEvent ¶
UnmarshalNodeEvent deserialises a NodeEvent from raw JSON bytes. It is the inverse of MarshalPayload and is the canonical way to decode Event.Payload on the subscriber side.
func (*NodeEvent) MarshalPayload ¶
MarshalPayload serialises the NodeEvent to JSON bytes suitable for embedding in an Event.Payload.
type RealJetStreamAdmin ¶
type RealJetStreamAdmin struct {
// contains filtered or unexported fields
}
RealJetStreamAdmin wraps natsgo.JetStreamContext to satisfy JetStreamAdmin. Construct via NewRealJetStreamAdmin.
func NewRealJetStreamAdmin ¶
func NewRealJetStreamAdmin(js natsgo.JetStreamContext) *RealJetStreamAdmin
NewRealJetStreamAdmin creates a JetStreamAdmin backed by js.
func (*RealJetStreamAdmin) AddStream ¶
func (r *RealJetStreamAdmin) AddStream(cfg *natsgo.StreamConfig) (*natsgo.StreamInfo, error)
AddStream delegates to natsgo.JetStreamContext.AddStream.
func (*RealJetStreamAdmin) StreamInfo ¶
func (r *RealJetStreamAdmin) StreamInfo(name string) (*natsgo.StreamInfo, error)
StreamInfo delegates to natsgo.JetStreamContext.StreamInfo.
func (*RealJetStreamAdmin) UpdateStream ¶
func (r *RealJetStreamAdmin) UpdateStream(cfg *natsgo.StreamConfig) (*natsgo.StreamInfo, error)
UpdateStream delegates to natsgo.JetStreamContext.UpdateStream.
type SchedulerEvent ¶
type SchedulerEvent struct {
RunID string `json:"run_id"`
EntityID string `json:"entity_id"`
Action string `json:"action"`
Timestamp time.Time `json:"timestamp"`
}
SchedulerEvent is a scheduler event (assignment / preemption / queue change) carried on the HELIX_SCHEDULER stream under "scheduler.>". EntityID is the scheduled unit (e.g. a task or run id).
func DecodeSchedulerEventAvro ¶
func DecodeSchedulerEventAvro(data []byte, registry *SchemaRegistry, readerSchema *AvroSchema) (*SchedulerEvent, error)
DecodeSchedulerEventAvro decodes Avro single-object bytes into a SchedulerEvent.
func UnmarshalSchedulerEvent ¶
func UnmarshalSchedulerEvent(payload []byte) (*SchedulerEvent, error)
UnmarshalSchedulerEvent deserialises a SchedulerEvent from raw JSON bytes.
func (*SchedulerEvent) MarshalPayload ¶
func (e *SchedulerEvent) MarshalPayload() ([]byte, error)
MarshalPayload serialises the SchedulerEvent to JSON bytes for Event.Payload.
func (*SchedulerEvent) Subject ¶
func (e *SchedulerEvent) Subject() string
Subject returns the canonical "scheduler.{EntityID}.{Action}" subject.
type SchemaRegistry ¶
type SchemaRegistry struct {
// contains filtered or unexported fields
}
SchemaRegistry maps writer-schema fingerprints to their schemas so a decoder can look up the schema a payload was written with (validated routing). It is the "schema-validated routing" surface: a payload whose fingerprint is not registered is rejected rather than mis-decoded.
func NewSchemaRegistry ¶
func NewSchemaRegistry() *SchemaRegistry
NewSchemaRegistry returns an empty registry.
func (*SchemaRegistry) Lookup ¶
func (r *SchemaRegistry) Lookup(fp uint64) (*AvroSchema, bool)
Lookup returns the registered schema for a fingerprint, or (nil,false).
func (*SchemaRegistry) Register ¶
func (r *SchemaRegistry) Register(s *AvroSchema)
Register adds a schema to the registry keyed by its fingerprint. Registering the same schema twice is idempotent.
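A registry with these semantics can be sketched as a fingerprint-keyed map. The types below are hypothetical: the fingerprint is stored as a plain field instead of being computed, and the mutex is an assumption about concurrent use that the documentation does not confirm.

```go
package main

import (
	"fmt"
	"sync"
)

// schema is a reduced stand-in for AvroSchema with a precomputed fingerprint.
type schema struct {
	Name        string
	fingerprint uint64
}

// registry maps writer fingerprints to schemas.
type registry struct {
	mu      sync.RWMutex
	schemas map[uint64]*schema
}

func newRegistry() *registry {
	return &registry{schemas: make(map[uint64]*schema)}
}

// register stores s under its fingerprint; repeating it is a no-op in effect.
func (r *registry) register(s *schema) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.schemas[s.fingerprint] = s
}

// lookup returns the schema for fp, or (nil, false) when it is unknown.
func (r *registry) lookup(fp uint64) (*schema, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	s, ok := r.schemas[fp]
	return s, ok
}

func main() {
	r := newRegistry()
	r.register(&schema{Name: "NodeEvent", fingerprint: 1})

	_, known := r.lookup(1)
	_, unknown := r.lookup(2)
	fmt.Println(known, unknown) // true false
}
```

A decoder that treats a failed lookup as an error rejects payloads from unregistered writers instead of mis-decoding them.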
type SessionEvent ¶
type SessionEvent struct {
RunID string `json:"run_id"`
SessionID string `json:"session_id"`
Action string `json:"action"`
Timestamp time.Time `json:"timestamp"`
}
SessionEvent is a session-lifecycle event (create / attach / detach / terminate) carried on the HELIX_SESSIONS stream under "sessions.>".
func DecodeSessionEventAvro ¶
func DecodeSessionEventAvro(data []byte, registry *SchemaRegistry, readerSchema *AvroSchema) (*SessionEvent, error)
DecodeSessionEventAvro decodes Avro single-object bytes into a SessionEvent.
func UnmarshalSessionEvent ¶
func UnmarshalSessionEvent(payload []byte) (*SessionEvent, error)
UnmarshalSessionEvent deserialises a SessionEvent from raw JSON bytes.
func (*SessionEvent) MarshalPayload ¶
func (e *SessionEvent) MarshalPayload() ([]byte, error)
MarshalPayload serialises the SessionEvent to JSON bytes for Event.Payload.
func (*SessionEvent) Subject ¶
func (e *SessionEvent) Subject() string
Subject returns the canonical "sessions.{SessionID}.{Action}" subject.
type WireFormat ¶
type WireFormat int
WireFormat selects the serialization used on the NATS wire path.
const (
	// WireJSON is the default, back-compatible JSON serialization.
	WireJSON WireFormat = iota
	// WireAvro carries the payload as an Avro single-object-encoded record
	// (0xC3 0x01 + writer fingerprint + body), decoded via a SchemaRegistry.
	WireAvro
)