events

package
v0.0.0-...-54ed9d2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 17, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package events provides event handling for Helix Cluster OS.

This file defines the domain event types, the 5 HELIX_* JetStream streams, the {domain}.{entity_id}.{action} subject convention, and the HelixPublisher / HelixSubscriber interfaces that the real NATS backend implements.

Package events provides event handling for Helix Cluster OS.

Index

Constants

View Source
const HelixMaxAge = 24 * time.Hour

HelixMaxAge is the retention window for all 5 Helix control-plane streams. Events older than this are evicted by JetStream. 24 h is a reasonable operational window for a single-node cluster; set to a larger value for multi-node / long-tail replay requirements.

View Source
const HelixReplicas = 1

HelixReplicas is the replication factor for all 5 streams. 1 is correct for single-node development and integration tests; production deployments should override via EnsureStreamsWithConfig.

Variables

AllHelixStreams is the ordered list of all five HELIX_* streams. Callers that must iterate them (e.g. stream admin setup) use this slice so they cannot accidentally miss one.

View Source
var HelixStreamConfigs = []HelixStreamDef{
	{
		Name:     StreamNodes,
		Subjects: []string{"helix.nodes.>"},
		MaxAge:   HelixMaxAge,
		Replicas: HelixReplicas,
	},
	{
		Name:     StreamSessions,
		Subjects: []string{"helix.sessions.>"},
		MaxAge:   HelixMaxAge,
		Replicas: HelixReplicas,
	},
	{
		Name:     StreamScheduler,
		Subjects: []string{"helix.scheduler.>"},
		MaxAge:   HelixMaxAge,
		Replicas: HelixReplicas,
	},
	{
		Name:     StreamHealth,
		Subjects: []string{"helix.health.>"},
		MaxAge:   HelixMaxAge,
		Replicas: HelixReplicas,
	},
	{
		Name:     StreamAlerts,
		Subjects: []string{"helix.alerts.>"},
		MaxAge:   HelixMaxAge,
		Replicas: HelixReplicas,
	},
}

HelixStreamConfigs is the ordered, authoritative config table for all 5 control-plane streams. This is the single source of truth.

Subject convention: "helix.<domain>.>" so all Helix events share the "helix." top-level namespace; the domain prefix matches helixStreamDomains in helix_events.go.

Functions

func AvroDecode

func AvroDecode(data []byte, registry *SchemaRegistry, readerSchema *AvroSchema) (map[string]any, error)

AvroDecode decodes a single-object-encoded payload. It reads the writer fingerprint from the header, resolves the writer schema via the registry (returning an error for an unknown/unregistered fingerprint), decodes the body under the writer schema, and then RESOLVES the result to readerSchema: reader-only fields are filled from their defaults, writer-only fields are discarded. The returned map contains exactly readerSchema's fields.

func AvroEncode

func AvroEncode(schema *AvroSchema, record map[string]any) ([]byte, error)

AvroEncode encodes record (a name->value map) under the writer schema into a single-object-encoded byte slice (magic + fingerprint + body). Every field declared by the schema MUST be present in record with a type-compatible Go value, otherwise an error is returned (encode-side validation).

func EncodeAuditEventAvro

func EncodeAuditEventAvro(e *AuditEvent, writer *AvroSchema) ([]byte, error)

EncodeAuditEventAvro encodes an AuditEvent as Avro single-object bytes.

func EncodeNodeEventAvro

func EncodeNodeEventAvro(ne *NodeEvent, writer *AvroSchema) ([]byte, error)

EncodeNodeEventAvro encodes a NodeEvent into Avro single-object bytes under the given writer schema. The returned bytes begin with the 0xC3 0x01 marker and the writer-schema fingerprint, so they are self-describing on the wire.

func EncodeSchedulerEventAvro

func EncodeSchedulerEventAvro(e *SchedulerEvent, writer *AvroSchema) ([]byte, error)

EncodeSchedulerEventAvro encodes a SchedulerEvent as Avro single-object bytes.

func EncodeSessionEventAvro

func EncodeSessionEventAvro(e *SessionEvent, writer *AvroSchema) ([]byte, error)

EncodeSessionEventAvro encodes a SessionEvent as Avro single-object bytes.

func EnsureStreams

func EnsureStreams(ctx context.Context, js JetStreamAdmin) error

EnsureStreams creates or updates all 5 HELIX_* JetStream streams so their configuration matches HelixStreamConfigs. It is safe to call multiple times (idempotent): a second call with an already-correct broker state produces no error.

For each stream:

  • If StreamInfo returns natsgo.ErrStreamNotFound → AddStream.
  • If StreamInfo succeeds (stream exists) → UpdateStream to converge config.
  • Any other StreamInfo error → return immediately with a wrapped error.

The ctx parameter is threaded through for future cancellation support; the current natsgo.JetStreamContext does not accept a ctx directly, so it is checked for cancellation before each operation.

func HelixSubject

func HelixSubject(domain, entityID, action string) string

HelixSubject builds the canonical {domain}.{entity_id}.{action} NATS subject. All three components are sanitised: empty tokens are replaced with "_", and NATS-illegal characters (space, "*", ">") are replaced with "_". This prevents broken routing while preserving the three-token structure.

Examples:

HelixSubject("nodes", "n1", "heartbeat") → "nodes.n1.heartbeat"
HelixSubject("alerts", "zone-a", "fire")  → "alerts.zone-a.fire"

Types

type AuditEvent

type AuditEvent struct {
	RunID     string    `json:"run_id"`
	Actor     string    `json:"actor"`
	SubjectID string    `json:"subject_id"`
	Action    string    `json:"action"`
	Timestamp time.Time `json:"timestamp"`
}

AuditEvent is an audit-log event (an actor performed an action on a subject). It carries the actor, the affected subject id, and the action.

func DecodeAuditEventAvro

func DecodeAuditEventAvro(data []byte, registry *SchemaRegistry, readerSchema *AvroSchema) (*AuditEvent, error)

DecodeAuditEventAvro decodes Avro single-object bytes into an AuditEvent.

func UnmarshalAuditEvent

func UnmarshalAuditEvent(payload []byte) (*AuditEvent, error)

UnmarshalAuditEvent deserialises an AuditEvent from raw JSON bytes.

func (*AuditEvent) MarshalPayload

func (e *AuditEvent) MarshalPayload() ([]byte, error)

MarshalPayload serialises the AuditEvent to JSON bytes for Event.Payload.

func (*AuditEvent) Subject

func (e *AuditEvent) Subject() string

Subject returns the canonical "audit.{SubjectID}.{Action}" subject.

type AvroField

type AvroField struct {
	Name string
	Type AvroType
	// HasDefault marks that Default is meaningful. A reader-only field MUST
	// have a default for the writer/reader schema pair to be resolvable.
	HasDefault bool
	Default    any
}

AvroField is one field of a record schema.

type AvroSchema

type AvroSchema struct {
	Name   string
	Fields []AvroField
}

AvroSchema is a flat record schema: an ordered list of primitive fields.

func AuditEventSchemaV1

func AuditEventSchemaV1() *AvroSchema

AuditEventSchemaV1 is the flat Avro projection of AuditEvent.

func NodeEventSchemaV1

func NodeEventSchemaV1() *AvroSchema

NodeEventSchemaV1 is the original Avro schema for NodeEvent: the fields that shipped before schema evolution. Timestamp is carried as a long (unix nanos) and Metadata is intentionally excluded from the v1 Avro projection (maps are out of this subset); the Avro path serializes the scalar identity fields.

func NodeEventSchemaV2

func NodeEventSchemaV2() *AvroSchema

NodeEventSchemaV2 evolves V1 by adding the "action" field WITH a default, so a V1 reader can read a V2 payload (skips action) and a V2 reader can read a V1 payload (synthesizes action from the default). This is the backward- AND forward-compatible evolution Avro is chosen for.

func SchedulerEventSchemaV1

func SchedulerEventSchemaV1() *AvroSchema

SchedulerEventSchemaV1 is the flat Avro projection of SchedulerEvent.

func SessionEventSchemaV1

func SessionEventSchemaV1() *AvroSchema

SessionEventSchemaV1 is the flat Avro projection of SessionEvent.

func (*AvroSchema) Fingerprint

func (s *AvroSchema) Fingerprint() uint64

Fingerprint returns the CRC-64-AVRO (Rabin) fingerprint of the schema's canonical form. Two schemas with the same canonical form (same record name, same ordered field names+types) share a fingerprint; defaults do NOT affect the fingerprint, matching Avro's Parsing Canonical Form which strips them.

type AvroType

type AvroType string

AvroType enumerates the primitive Avro types supported by this subset.

const (
	AvroString  AvroType = "string"
	AvroLong    AvroType = "long"
	AvroInt     AvroType = "int"
	AvroBoolean AvroType = "boolean"
	AvroBytes   AvroType = "bytes"
	AvroDouble  AvroType = "double"
)

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is an in-memory event bus.

func NewBus

func NewBus() *Bus

NewBus creates a new Bus.

func (*Bus) Publish

func (b *Bus) Publish(e Event)

Publish publishes an event.

func (*Bus) Subscribe

func (b *Bus) Subscribe(eventType string, handler func(Event))

Subscribe registers a handler for an event type.

type Event

type Event struct {
	ID        string
	Type      string
	Payload   []byte
	Timestamp time.Time
}

Event is a domain event.

type HelixBackend

type HelixBackend struct {
	// contains filtered or unexported fields
}

HelixBackend is a real NATS/JetStream-backed implementation of HelixPublisher and HelixSubscriber. It provisions one natsbus.Bus per HELIX_* stream so each stream has its own isolated JetStream stream and subject namespace:

HELIX_NODES     → stream name "HELIX_NODES",     prefix "nodes"
HELIX_SESSIONS  → stream name "HELIX_SESSIONS",  prefix "sessions"
HELIX_SCHEDULER → stream name "HELIX_SCHEDULER", prefix "scheduler"
HELIX_HEALTH    → stream name "HELIX_HEALTH",    prefix "health"
HELIX_ALERTS    → stream name "HELIX_ALERTS",    prefix "alerts"

Within each bus the wire subject is:

{prefix}.{entity_id}.{action}   →   e.g. "nodes.n1.heartbeat"

The event.Type passed to natsbus is "{entity_id}.{action}" (without the domain prefix, which is handled by natsbus internally).

Zero-value is not usable; construct via NewHelixBackend.

func NewHelixBackend

func NewHelixBackend(ctx context.Context, cfg HelixBackendConfig) (*HelixBackend, error)

NewHelixBackend connects to NATS at cfg.URL and provisions all five HELIX_* JetStream streams (idempotent). Each stream gets its own natsbus.Bus instance sharing the same NATS URL.

func (*HelixBackend) Close

func (b *HelixBackend) Close() error

Close closes all five underlying natsbus.Bus instances. It is idempotent.

func (*HelixBackend) PublishNodeEvent

func (b *HelixBackend) PublishNodeEvent(ne *NodeEvent) error

PublishNodeEvent marshals ne to JSON, wraps it in an event.Event, and publishes it on the HELIX_NODES stream using the subject:

nodes.{ne.NodeID}.{ne.Action}

The natsbus prefix is "nodes" and the event.Type is "{ne.NodeID}.{ne.Action}", so the wire subject becomes "nodes.{ne.NodeID}.{ne.Action}" — exactly the {domain}.{entity_id}.{action} convention.

func (*HelixBackend) StreamInfo

func (b *HelixBackend) StreamInfo(stream HelixStream) (*HelixStreamInfo, error)

StreamInfo returns metadata about the named HELIX_* stream. NumMessages reflects the count of events published via this HelixBackend instance on the given stream (tracked internally since natsbus.Bus does not expose JetStream StreamInfo directly). NumConsumers is zero — the integration tier captures broker-native consumer counts via the broker management API.

A non-error return proves the stream was provisioned (its bus was created during NewHelixBackend). This is §7.1 sink-side evidence of stream existence.

func (*HelixBackend) SubscribeNodeEvents

func (b *HelixBackend) SubscribeNodeEvents(nodeID, action string) (<-chan *NodeEvent, func(), error)

SubscribeNodeEvents subscribes to NodeEvents on HELIX_NODES for the subject "nodes.{nodeID}.{action}". The returned channel delivers decoded NodeEvents. Calling stop or cancelling ctx closes the channel.

The natsbus subscribes to event.Type "{nodeID}.{action}" with the "nodes" prefix, which resolves to the exact wire subject "nodes.{nodeID}.{action}".

type HelixBackendConfig

type HelixBackendConfig struct {
	// URL is the NATS server URL (e.g. "nats://127.0.0.1:4222"). Required.
	URL string
	// ConnectTimeout bounds the initial connection attempt. Defaults to 5 s.
	ConnectTimeout time.Duration
}

HelixBackendConfig holds connection parameters for HelixBackend.

type HelixPublisher

type HelixPublisher interface {
	// PublishNodeEvent publishes a NodeEvent on the StreamNodes stream using
	// the subject derived from ne.Subject().
	PublishNodeEvent(ne *NodeEvent) error
}

HelixPublisher is the interface for publishing domain events onto the NATS JetStream-backed Helix event bus. The real implementation is HelixBackend; unit tests inject a fake that implements this interface.

type HelixStream

type HelixStream string

HelixStream enumerates the five authoritative JetStream streams used by Helix Cluster OS. Each stream captures a broad family of events (identified by a subject wildcard) so a single JetStream stream carries all topics for that domain.

const (
	// StreamNodes captures node lifecycle events (heartbeats, joins, leaves,
	// failures). Subject wildcard: "nodes.>".
	StreamNodes HelixStream = "HELIX_NODES"

	// StreamSessions captures session lifecycle events (create, attach, detach,
	// terminate). Subject wildcard: "sessions.>".
	StreamSessions HelixStream = "HELIX_SESSIONS"

	// StreamScheduler captures scheduler events (assignment, preemption, queue
	// changes). Subject wildcard: "scheduler.>".
	StreamScheduler HelixStream = "HELIX_SCHEDULER"

	// StreamHealth captures health-check events (probe results, thresholds,
	// alerts). Subject wildcard: "health.>".
	StreamHealth HelixStream = "HELIX_HEALTH"

	// StreamAlerts captures alert events (fire, resolve, escalate). Subject
	// wildcard: "alerts.>".
	StreamAlerts HelixStream = "HELIX_ALERTS"
)

type HelixStreamDef

type HelixStreamDef struct {
	// Name is the HELIX_* stream name (e.g. "HELIX_NODES").
	Name HelixStream
	// Subjects is the ordered list of NATS subject filters captured by this
	// stream.  Always exactly one entry per stream ("helix.<domain>.>").
	Subjects []string
	// MaxAge is the maximum age of messages retained in the stream.
	MaxAge time.Duration
	// Replicas is the number of JetStream replicas.
	Replicas int
}

HelixStreamDef holds the authoritative JetStream configuration for one HELIX_* stream. It is the input to EnsureStreams; callers that need to inspect the expected configuration (e.g. integration assertions) use HelixStreamConfigs.

type HelixStreamInfo

type HelixStreamInfo struct {
	// Name is the HELIX_* stream name.
	Name HelixStream
	// NumMessages is the count of messages currently stored in the stream.
	NumMessages uint64
	// NumConsumers is the count of active consumers on the stream.
	NumConsumers int
}

HelixStreamInfo captures metadata about a JetStream stream, returned by HelixBackend.StreamInfo for observability and §7.1 evidence.

type HelixSubscriber

type HelixSubscriber interface {
	// SubscribeNodeEvents subscribes to NodeEvents for a specific nodeID and
	// action pattern. Passing "*" for either field matches any value.
	SubscribeNodeEvents(nodeID, action string) (<-chan *NodeEvent, func(), error)
}

HelixSubscriber is the interface for subscribing to domain events from the NATS JetStream-backed Helix event bus.

type JetStreamAdmin

type JetStreamAdmin interface {
	// StreamInfo returns metadata for the named stream. Returns an error (whose
	// Is-chain includes natsgo.ErrStreamNotFound) when the stream is absent.
	StreamInfo(name string) (*natsgo.StreamInfo, error)
	// AddStream creates a new stream with the provided config.
	AddStream(cfg *natsgo.StreamConfig) (*natsgo.StreamInfo, error)
	// UpdateStream updates an existing stream to match the provided config.
	UpdateStream(cfg *natsgo.StreamConfig) (*natsgo.StreamInfo, error)
}

JetStreamAdmin is the narrow seam used by EnsureStreams. It exposes only the stream-administration primitives so unit tests can inject a deterministic fake without a real NATS broker.

The real implementation is a thin wrapper around natsgo.JetStreamContext (see RealJetStreamAdmin). Integration tests supply the real context directly via NewRealJetStreamAdmin.

type NATSBackend

type NATSBackend struct {
	// contains filtered or unexported fields
}

NATSBackend is a real NATS/JetStream-backed event bus. It delegates to the decoupled digital.vasic.eventbus/pkg/nats backend, mapping the root Event{ID,Type,Payload,Timestamp} type to/from eventbus's event.Event.

It is an additive alternative to the in-memory Bus: callers that want durable, distributed delivery construct a NATSBackend; the default in-memory Bus is untouched. The mapping carries ID, Type, and Timestamp verbatim and round- trips the raw Payload bytes losslessly through event.Event.Payload.

func NewNATSBackend

func NewNATSBackend(ctx context.Context, url string, opts ...NATSBackendOption) (*NATSBackend, error)

NewNATSBackend connects to the NATS server at url and returns a real-backed event bus. The provided context bounds the connection setup. Close the returned backend to release the connection.

func (*NATSBackend) Close

func (n *NATSBackend) Close() error

Close releases the underlying NATS connection and stops all subscriptions.

func (*NATSBackend) Publish

func (n *NATSBackend) Publish(e Event) error

Publish maps the root Event onto an eventbus event.Event and publishes it. The raw Payload bytes are carried through event.Event.Payload; ID, Type, and Timestamp are preserved so a subscriber observes the same logical event.

func (*NATSBackend) PublishAuditEvent

func (n *NATSBackend) PublishAuditEvent(e *AuditEvent) error

PublishAuditEvent publishes an AuditEvent on the "audit.>" subject family, honoring the backend's wire format. See PublishSessionEvent for semantics.

func (*NATSBackend) PublishNodeEvent

func (n *NATSBackend) PublishNodeEvent(ne *NodeEvent) error

PublishNodeEvent publishes a NodeEvent over the backend, honoring the backend's configured wire format. In WireAvro mode the payload bytes are the Avro single-object encoding of ne (0xC3 0x01 marker + writer fingerprint + body); in the default WireJSON mode they are ne's JSON. The Event identity (ID=RunID, Type=subject) is identical across both formats so subscribers route the same.

func (*NATSBackend) PublishSchedulerEvent

func (n *NATSBackend) PublishSchedulerEvent(e *SchedulerEvent) error

PublishSchedulerEvent publishes a SchedulerEvent on the StreamScheduler stream, honoring the backend's wire format. See PublishSessionEvent for semantics.

func (*NATSBackend) PublishSessionEvent

func (n *NATSBackend) PublishSessionEvent(e *SessionEvent) error

PublishSessionEvent publishes a SessionEvent on the StreamSessions stream, honoring the backend's wire format. In WireAvro mode the payload is the Avro single-object encoding of e (0xC3 0x01 + writer fingerprint + body); in the default WireJSON mode it is e's JSON. Event identity (ID=RunID, Type=subject) is identical across both formats so subscribers route the same.

func (*NATSBackend) Subscribe

func (n *NATSBackend) Subscribe(
	ctx context.Context, eventType string,
) (<-chan Event, func(), error)

Subscribe subscribes to events of eventType and returns a channel delivering decoded root Events, an unsubscribe function, and an error. The returned channel is closed when the unsubscribe func is called, ctx is cancelled, or the backend is closed.

func (*NATSBackend) SubscribeAuditEvents

func (n *NATSBackend) SubscribeAuditEvents(
	ctx context.Context, subjectID, action string,
) (<-chan *AuditEvent, func(), error)

SubscribeAuditEvents subscribes to AuditEvents on the subject derived from subjectID/action. See SubscribeSessionEvents for semantics.

func (*NATSBackend) SubscribeNodeEvents

func (n *NATSBackend) SubscribeNodeEvents(
	ctx context.Context, nodeID, action string,
) (<-chan *NodeEvent, func(), error)

SubscribeNodeEvents subscribes to NodeEvents on the subject derived from nodeID/action and decodes each delivered payload per the backend's wire format. In WireAvro mode the payload is decoded via the registry (writer resolution) to the configured reader schema; in WireJSON mode it is JSON-unmarshalled.

func (*NATSBackend) SubscribeSchedulerEvents

func (n *NATSBackend) SubscribeSchedulerEvents(
	ctx context.Context, entityID, action string,
) (<-chan *SchedulerEvent, func(), error)

SubscribeSchedulerEvents subscribes to SchedulerEvents on the subject derived from entityID/action. See SubscribeSessionEvents for semantics.

func (*NATSBackend) SubscribeSessionEvents

func (n *NATSBackend) SubscribeSessionEvents(
	ctx context.Context, sessionID, action string,
) (<-chan *SessionEvent, func(), error)

SubscribeSessionEvents subscribes to SessionEvents on the subject derived from sessionID/action and decodes each payload per the backend's wire format. In WireAvro mode the payload is decoded via the registry (writer resolution) to the configured writer schema as reader; in WireJSON mode it is JSON-unmarshalled.

type NATSBackendOption

type NATSBackendOption func(*NATSBackend)

NATSBackendOption configures a NATSBackend at construction time.

func WithAvro

func WithAvro(writer *AvroSchema, registry *SchemaRegistry) NATSBackendOption

WithAvro switches the backend's NodeEvent helpers to the Avro wire format, using writer as the publish-side schema and registry for subscribe-side writer-schema resolution. registry MUST have the writer schema (and any other acceptable writer schemas, e.g. both V1 and V2) registered. Passing this option does NOT change the generic Publish/Subscribe byte round-trip — it changes how PublishNodeEvent / SubscribeNodeEvents project a NodeEvent on/off the wire.

func WithAvroAudit

func WithAvroAudit(writer *AvroSchema, registry *SchemaRegistry) NATSBackendOption

WithAvroAudit switches AuditEvent to the Avro wire format. See WithAvroSession for semantics.

func WithAvroEvents

func WithAvroEvents(registry *SchemaRegistry) NATSBackendOption

WithAvroEvents switches the backend to the Avro wire format for ALL three non-Node event families at once, registering each family's V1 writer schema in registry so the subscribe side resolves the writer fingerprint regardless of which type produced a payload. It is the typed-event analogue of WithAvro for NodeEvent and composes with it (NodeEvent + the three families can all be Avro on the same backend). registry MUST be non-nil; it is shared as the resolution surface for every typed Subscribe* call.

func WithAvroScheduler

func WithAvroScheduler(writer *AvroSchema, registry *SchemaRegistry) NATSBackendOption

WithAvroScheduler switches SchedulerEvent to the Avro wire format. See WithAvroSession for semantics.

func WithAvroSession

func WithAvroSession(writer *AvroSchema, registry *SchemaRegistry) NATSBackendOption

WithAvroSession switches SessionEvent to the Avro wire format using writer as the publish-side schema and registry for subscribe-side resolution, registering writer in registry. JSON remains the default for any family not opted in.

type NodeEvent

type NodeEvent struct {
	// RunID is a per-test / per-invocation UUID embedded by the publisher so
	// the subscriber can assert it received the IDENTICAL event (not a
	// different one from a previous run). Required per §7.1 anti-bluff.
	RunID string `json:"run_id"`

	// NodeID is the unique identifier of the reporting node.
	NodeID string `json:"node_id"`

	// Action describes what happened (e.g. "heartbeat", "join", "leave",
	// "failure"). It maps to the third token of the subject triple.
	Action string `json:"action"`

	// Timestamp is the wall-clock time at which the event was created.
	Timestamp time.Time `json:"timestamp"`

	// Metadata is a free-form key-value bag for additional context.
	Metadata map[string]string `json:"metadata,omitempty"`
}

NodeEvent is a structured event emitted by a Helix cluster node. It is the canonical payload for messages published on the StreamNodes stream.

func DecodeNodeEventAvro

func DecodeNodeEventAvro(data []byte, registry *SchemaRegistry, readerSchema *AvroSchema) (*NodeEvent, error)

DecodeNodeEventAvro decodes Avro single-object bytes into a NodeEvent. It reads the writer fingerprint from the header, resolves the writer schema via the registry (rejecting an unregistered fingerprint), and resolves the result to readerSchema — so a V1 reader can decode a V2 payload (skips action) and a V2 reader can decode a V1 payload (action defaulted).

func UnmarshalNodeEvent

func UnmarshalNodeEvent(payload []byte) (*NodeEvent, error)

UnmarshalNodeEvent deserialises a NodeEvent from raw JSON bytes. It is the inverse of MarshalPayload and is the canonical way to decode Event.Payload on the subscriber side.

func (*NodeEvent) MarshalPayload

func (e *NodeEvent) MarshalPayload() ([]byte, error)

MarshalPayload serialises the NodeEvent to JSON bytes suitable for embedding in an Event.Payload.

func (*NodeEvent) Subject

func (e *NodeEvent) Subject() string

Subject returns the canonical NATS subject for this NodeEvent. It follows the {domain}.{entity_id}.{action} convention.

type RealJetStreamAdmin

type RealJetStreamAdmin struct {
	// contains filtered or unexported fields
}

RealJetStreamAdmin wraps natsgo.JetStreamContext to satisfy JetStreamAdmin. Construct via NewRealJetStreamAdmin.

func NewRealJetStreamAdmin

func NewRealJetStreamAdmin(js natsgo.JetStreamContext) *RealJetStreamAdmin

NewRealJetStreamAdmin creates a JetStreamAdmin backed by js.

func (*RealJetStreamAdmin) AddStream

AddStream delegates to natsgo.JetStreamContext.AddStream.

func (*RealJetStreamAdmin) StreamInfo

func (r *RealJetStreamAdmin) StreamInfo(name string) (*natsgo.StreamInfo, error)

StreamInfo delegates to natsgo.JetStreamContext.StreamInfo.

func (*RealJetStreamAdmin) UpdateStream

func (r *RealJetStreamAdmin) UpdateStream(cfg *natsgo.StreamConfig) (*natsgo.StreamInfo, error)

UpdateStream delegates to natsgo.JetStreamContext.UpdateStream.

type SchedulerEvent

type SchedulerEvent struct {
	RunID     string    `json:"run_id"`
	EntityID  string    `json:"entity_id"`
	Action    string    `json:"action"`
	Timestamp time.Time `json:"timestamp"`
}

SchedulerEvent is a scheduler event (assignment / preemption / queue change) carried on the HELIX_SCHEDULER stream under "scheduler.>". EntityID is the scheduled unit (e.g. a task or run id).

func DecodeSchedulerEventAvro

func DecodeSchedulerEventAvro(data []byte, registry *SchemaRegistry, readerSchema *AvroSchema) (*SchedulerEvent, error)

DecodeSchedulerEventAvro decodes Avro single-object bytes into a SchedulerEvent.

func UnmarshalSchedulerEvent

func UnmarshalSchedulerEvent(payload []byte) (*SchedulerEvent, error)

UnmarshalSchedulerEvent deserialises a SchedulerEvent from raw JSON bytes.

func (*SchedulerEvent) MarshalPayload

func (e *SchedulerEvent) MarshalPayload() ([]byte, error)

MarshalPayload serialises the SchedulerEvent to JSON bytes for Event.Payload.

func (*SchedulerEvent) Subject

func (e *SchedulerEvent) Subject() string

Subject returns the canonical "scheduler.{EntityID}.{Action}" subject.

type SchemaRegistry

type SchemaRegistry struct {
	// contains filtered or unexported fields
}

SchemaRegistry maps writer-schema fingerprints to their schemas so a decoder can look up the schema a payload was written with (validated routing). It is the "schema-validated routing" surface: a payload whose fingerprint is not registered is rejected rather than mis-decoded.

func NewSchemaRegistry

func NewSchemaRegistry() *SchemaRegistry

NewSchemaRegistry returns an empty registry.

func (*SchemaRegistry) Lookup

func (r *SchemaRegistry) Lookup(fp uint64) (*AvroSchema, bool)

Lookup returns the registered schema for a fingerprint, or (nil,false).

func (*SchemaRegistry) Register

func (r *SchemaRegistry) Register(s *AvroSchema)

Register adds a schema to the registry keyed by its fingerprint. Registering the same schema twice is idempotent.

type SessionEvent

type SessionEvent struct {
	RunID     string    `json:"run_id"`
	SessionID string    `json:"session_id"`
	Action    string    `json:"action"`
	Timestamp time.Time `json:"timestamp"`
}

SessionEvent is a session-lifecycle event (create / attach / detach / terminate) carried on the HELIX_SESSIONS stream under "sessions.>".

func DecodeSessionEventAvro

func DecodeSessionEventAvro(data []byte, registry *SchemaRegistry, readerSchema *AvroSchema) (*SessionEvent, error)

DecodeSessionEventAvro decodes Avro single-object bytes into a SessionEvent.

func UnmarshalSessionEvent

func UnmarshalSessionEvent(payload []byte) (*SessionEvent, error)

UnmarshalSessionEvent deserialises a SessionEvent from raw JSON bytes.

func (*SessionEvent) MarshalPayload

func (e *SessionEvent) MarshalPayload() ([]byte, error)

MarshalPayload serialises the SessionEvent to JSON bytes for Event.Payload.

func (*SessionEvent) Subject

func (e *SessionEvent) Subject() string

Subject returns the canonical "sessions.{SessionID}.{Action}" subject.

type WireFormat

type WireFormat int

WireFormat selects the serialization used on the NATS wire path.

const (
	// WireJSON is the default, back-compatible JSON serialization.
	WireJSON WireFormat = iota
	// WireAvro carries the payload as an Avro single-object-encoded record
	// (0xC3 0x01 + writer fingerprint + body), decoded via a SchemaRegistry.
	WireAvro
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL