events

package
v0.9.2 Latest
Warning: This package is not in the latest version of its module.
Published: May 1, 2026 | License: Apache-2.0 | Imports: 2 | Imported by: 0

Documentation

Overview

Package events provides a publish/subscribe event bus for operational observability. Events flow from components (agent loop, signal bridge, scheduler, etc.) to subscribers (WebSocket handler, future metrics collector). The bus is nil-safe: calling Publish on a nil *Bus is a no-op, so components do not need guard checks.
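
The nil-safety described above relies on Go permitting method calls on a nil pointer receiver. The following is a minimal sketch of that idea, not the package's actual source: the `Bus` here is a stand-in with a hypothetical `delivered` counter, and `scheduler` is a hypothetical component that holds an optional bus.

```go
package main

import "fmt"

// Event is a trimmed stand-in for events.Event.
type Event struct{ Source, Kind string }

// Bus is a stand-in; the real Bus has unexported fields not shown in the docs.
type Bus struct{ delivered int }

// Publish returns immediately when the receiver is nil, so callers
// never need an "if bus != nil" guard.
func (b *Bus) Publish(e Event) {
	if b == nil {
		return
	}
	b.delivered++ // hypothetical bookkeeping in place of real fan-out
}

// scheduler is a hypothetical component; its bus field may be left nil.
type scheduler struct{ bus *Bus }

func (s *scheduler) fire() {
	s.bus.Publish(Event{Source: "scheduler", Kind: "task_fired"})
}

func main() {
	off := &scheduler{} // observability disabled: bus is nil
	off.fire()          // does not panic

	on := &scheduler{bus: &Bus{}}
	on.fire()
	fmt.Println(on.bus.delivered)
}
```

With this pattern a component can be constructed without a bus in tests or minimal deployments and still call Publish unconditionally.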

Constants

const (
	// SourceAgent identifies events from the core agent loop.
	SourceAgent = "agent"
	// SourceSignal identifies events from the Signal bridge.
	SourceSignal = "signal"
	// SourceDelegate identifies events from delegate task execution.
	SourceDelegate = "delegate"
	// SourceEmail identifies events from the email poller.
	SourceEmail = "email"
	// SourceMetacog identifies events from the metacognitive loop.
	SourceMetacog = "metacog"
	// SourceScheduler identifies events from the task scheduler.
	SourceScheduler = "scheduler"
	// SourceLoop identifies events from persistent loop goroutines.
	SourceLoop = "loop"
)

Source constants identify which component published an event.

const (
	// KindRequestStart signals the beginning of an agent request.
	// Data: request_id, conversation_id, channel.
	KindRequestStart = "request_start"
	// KindLLMCall signals the start of an LLM API call.
	// Data: request_id, iter, model.
	KindLLMCall = "llm_call"
	// KindLLMResponse signals completion of an LLM API call.
	// Data: request_id, iter, model, tokens_in, tokens_out,
	// cost_usd, tool_calls.
	KindLLMResponse = "llm_response"
	// KindToolCall signals the start of a tool execution.
	// Data: request_id, tool.
	KindToolCall = "tool_call"
	// KindToolDone signals completion of a tool execution.
	// Data: request_id, tool, ok, duration_ms.
	KindToolDone = "tool_done"
	// KindRequestComplete signals the end of an agent request.
	// Data: request_id, model, iterations, total_tokens_in,
	// total_tokens_out, total_cost_usd, elapsed_ms.
	KindRequestComplete = "request_complete"

	// KindMessageReceived signals an incoming Signal message.
	// Data: sender, conversation_id, message_len.
	KindMessageReceived = "message_received"
	// KindReactionReceived signals an incoming Signal reaction.
	// Data: sender, emoji.
	KindReactionReceived = "reaction_received"
	// KindSessionRotated signals a session was rotated due to inactivity.
	// Data: conversation_id, sender.
	KindSessionRotated = "session_rotated"

	// KindSpawn signals a delegate task was spawned.
	// Data: delegate_id, profile, task_len.
	KindSpawn = "spawn"
	// KindComplete signals a delegate task completed.
	// Data: delegate_id, iterations, total_tokens_in,
	// total_tokens_out, total_cost_usd, exhausted.
	KindComplete = "complete"

	// KindPollStart signals the start of an email poll cycle.
	// Data: accounts.
	KindPollStart = "poll_start"
	// KindPollComplete signals the end of an email poll cycle.
	// Data: new_messages, accounts.
	KindPollComplete = "poll_complete"

	// KindIterationStart signals the start of a metacognitive iteration.
	// Data: conversation_id, supervisor.
	KindIterationStart = "iteration_start"
	// KindSleepAdjust signals a metacognitive sleep duration change.
	// Data: sleep_seconds.
	KindSleepAdjust = "sleep_adjust"

	// KindTaskFired signals a scheduled task has begun executing.
	// Data: task_id, task_name.
	KindTaskFired = "task_fired"
	// KindTaskComplete signals a scheduled task has finished executing.
	// Data: task_id, task_name, ok, duration_ms.
	KindTaskComplete = "task_complete"

	// KindLoopStarted signals a loop goroutine has started.
	// Data: loop_id, loop_name, parent_id.
	KindLoopStarted = "loop_started"
	// KindLoopStopped signals a loop goroutine has stopped.
	// Data: loop_id, loop_name, iterations, attempts.
	KindLoopStopped = "loop_stopped"
	// KindLoopIterationStart signals the beginning of a loop iteration.
	// Data: loop_id, loop_name, conversation_id, supervisor, attempt.
	KindLoopIterationStart = "loop_iteration_start"
	// KindLoopIterationComplete signals the end of a loop iteration.
	// Data: loop_id, loop_name, model, input_tokens, output_tokens, elapsed_ms.
	KindLoopIterationComplete = "loop_iteration_complete"
	// KindLoopSleepStart signals a loop has entered its sleep phase.
	// Data: loop_id, loop_name, sleep_duration, initial (bool, true
	// only for the pre-first-iteration startup sleep).
	KindLoopSleepStart = "loop_sleep_start"
	// KindLoopWaitStart signals a loop has entered its wait phase,
	// blocking on a WaitFunc for an external event.
	// Data: loop_id, loop_name.
	KindLoopWaitStart = "loop_wait_start"
	// KindLoopStateChange signals a loop has changed lifecycle state.
	// Data: loop_id, loop_name, from, to.
	KindLoopStateChange = "loop_state_change"
	// KindLoopError signals a loop iteration encountered an error.
	// Data: loop_id, loop_name, error.
	KindLoopError = "loop_error"
	// KindLoopToolStart signals a tool execution has begun within a
	// loop iteration. Data: loop_id, loop_name, tool.
	KindLoopToolStart = "loop_tool_start"
	// KindLoopToolDone signals a tool execution has completed within a
	// loop iteration. Data: loop_id, loop_name, tool, error (if failed).
	KindLoopToolDone = "loop_tool_done"
	// KindLoopLLMStart signals an LLM API call is about to begin during
	// a loop iteration. Data: loop_id, loop_name, model.
	KindLoopLLMStart = "loop_llm_start"
	// KindLoopLLMResponse signals an LLM response was received during a
	// loop iteration. Data: loop_id, loop_name, model, input_tokens,
	// output_tokens.
	KindLoopLLMResponse = "loop_llm_response"
)

Kind constants describe the type of event within a source.
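
A subscriber will typically switch on Kind and read the documented Data keys. The sketch below shows one way that might look. The `usage` aggregate and its `apply` method are hypothetical, the two constants are copied locally so the example compiles on its own, and the Go types of the Data values (`float64` for `cost_usd`, `bool` for `ok`, `string` for `tool`) are assumptions, since the documentation lists only key names.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of two documented Kind constants.
const (
	KindLLMResponse = "llm_response"
	KindToolDone    = "tool_done"
)

// Event mirrors the documented fields of events.Event.
type Event struct {
	Timestamp time.Time
	Source    string
	Kind      string
	Data      map[string]any
}

// usage is a hypothetical aggregate maintained by a subscriber.
type usage struct {
	llmCalls    int
	costUSD     float64
	failedTools []string
}

// apply folds one event into the aggregate. Type assertions use the
// comma-ok form so a missing key or unexpected type is skipped.
func (u *usage) apply(e Event) {
	switch e.Kind {
	case KindLLMResponse:
		u.llmCalls++
		if c, ok := e.Data["cost_usd"].(float64); ok { // assumed float64
			u.costUSD += c
		}
	case KindToolDone:
		if ok, isBool := e.Data["ok"].(bool); isBool && !ok { // assumed bool
			name, _ := e.Data["tool"].(string) // assumed string
			u.failedTools = append(u.failedTools, name)
		}
	}
}

func main() {
	evs := []Event{
		{Source: "agent", Kind: KindLLMResponse, Data: map[string]any{"cost_usd": 0.25}},
		{Source: "agent", Kind: KindToolDone, Data: map[string]any{"tool": "search", "ok": false}},
		{Source: "agent", Kind: KindLLMResponse, Data: map[string]any{"cost_usd": 0.5}},
	}
	var u usage
	for _, e := range evs {
		u.apply(e)
	}
	fmt.Println(u.llmCalls, u.costUSD, u.failedTools)
}
```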

Variables

This section is empty.

Functions

This section is empty.

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is a non-blocking broadcast event bus. Subscribers receive events on buffered channels; slow subscribers miss events rather than blocking publishers.

func New

func New() *Bus

New creates a new event bus ready for use.

func (*Bus) Publish

func (b *Bus) Publish(e Event)

Publish sends an event to all subscribers. Non-blocking: if a subscriber's channel is full, the event is dropped for that subscriber. Safe to call on a nil receiver (no-op).
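
The drop-on-full behaviour can be sketched with a `select` that has a `default` branch. This is an illustration under assumptions, not the package's implementation: the stand-in `Bus` below keeps subscribers in a slice guarded by a `sync.RWMutex`, which the real type may or may not do.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a trimmed stand-in for events.Event.
type Event struct{ Source, Kind string }

// Bus is a stand-in; the slice and mutex are assumed internals.
type Bus struct {
	mu   sync.RWMutex
	subs []chan Event
}

func (b *Bus) Subscribe(bufSize int) <-chan Event {
	ch := make(chan Event, bufSize)
	b.mu.Lock()
	b.subs = append(b.subs, ch)
	b.mu.Unlock()
	return ch
}

// Publish attempts a send to every subscriber and never blocks.
func (b *Bus) Publish(e Event) {
	if b == nil {
		return
	}
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, ch := range b.subs {
		select {
		case ch <- e: // buffer has room: delivered
		default: // buffer full: dropped for this subscriber only
		}
	}
}

func main() {
	b := &Bus{}
	slow := b.Subscribe(1) // never drained in this example
	fast := b.Subscribe(8)
	for _, k := range []string{"llm_call", "llm_response", "tool_call"} {
		b.Publish(Event{Source: "agent", Kind: k})
	}
	// Number of events buffered for each subscriber.
	fmt.Println(len(slow), len(fast))
}
```

In this sketch the subscriber with a one-slot buffer keeps only the first event, while the larger buffer holds all three. A drop affects only the subscriber whose buffer is full.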

func (*Bus) Subscribe

func (b *Bus) Subscribe(bufSize int) <-chan Event

Subscribe returns a channel that receives published events. The caller must eventually call Unsubscribe to avoid resource leaks. bufSize controls the channel buffer; 64 is a reasonable default for WebSocket consumers.
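
A typical consumer subscribes, ranges over the channel in a goroutine, and unsubscribes when finished so the channel is closed and the goroutine exits. The sketch below illustrates that lifecycle against a stand-in `Bus` whose map-based internals are assumed. `collect` is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a trimmed stand-in for events.Event.
type Event struct{ Source, Kind string }

// Bus is a stand-in; the map keyed by receive-only channel is assumed.
type Bus struct {
	mu   sync.RWMutex
	subs map[<-chan Event]chan Event
}

func New() *Bus { return &Bus{subs: make(map[<-chan Event]chan Event)} }

func (b *Bus) Subscribe(bufSize int) <-chan Event {
	ch := make(chan Event, bufSize)
	b.mu.Lock()
	b.subs[ch] = ch
	b.mu.Unlock()
	return ch
}

func (b *Bus) Unsubscribe(ch <-chan Event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if full, ok := b.subs[ch]; ok {
		delete(b.subs, ch)
		close(full) // ends any "for range" over the channel
	}
}

func (b *Bus) Publish(e Event) {
	if b == nil {
		return
	}
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, ch := range b.subs {
		select {
		case ch <- e:
		default:
		}
	}
}

// collect subscribes, runs publish, then unsubscribes and returns the
// kinds seen. Buffered events are still drained after the close.
func collect(b *Bus, publish func()) []string {
	ch := b.Subscribe(64)
	var kinds []string
	done := make(chan struct{})
	go func() {
		defer close(done)
		for e := range ch {
			kinds = append(kinds, e.Kind)
		}
	}()
	publish()
	b.Unsubscribe(ch)
	<-done // wait for the consumer goroutine to finish
	return kinds
}

func main() {
	b := New()
	got := collect(b, func() {
		b.Publish(Event{Source: "loop", Kind: "loop_started"})
		b.Publish(Event{Source: "loop", Kind: "loop_stopped"})
	})
	fmt.Println(got)
}
```

In a long-lived handler such as a WebSocket connection, the same shape usually appears as `ch := bus.Subscribe(64)` followed by `defer bus.Unsubscribe(ch)`.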

func (*Bus) SubscriberCount

func (b *Bus) SubscriberCount() int

SubscriberCount returns the number of active subscribers.

func (*Bus) Unsubscribe

func (b *Bus) Unsubscribe(ch <-chan Event)

Unsubscribe removes a subscription and closes the channel. Safe to call with a channel that is already unsubscribed (no-op).
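
One way to make a repeated Unsubscribe a no-op is to close the channel only when it is still registered. The sketch below assumes a map from the receive-only channel handed to the caller to the underlying bidirectional channel. The real Bus has unexported fields and may be organised differently.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a trimmed stand-in for events.Event.
type Event struct{ Source, Kind string }

// Bus is a stand-in with assumed internals.
type Bus struct {
	mu   sync.Mutex
	subs map[<-chan Event]chan Event
}

func New() *Bus { return &Bus{subs: make(map[<-chan Event]chan Event)} }

func (b *Bus) Subscribe(bufSize int) <-chan Event {
	ch := make(chan Event, bufSize)
	b.mu.Lock()
	b.subs[ch] = ch
	b.mu.Unlock()
	return ch
}

// Unsubscribe closes the channel only on the first call; later calls
// find no map entry and return, avoiding a double-close panic.
func (b *Bus) Unsubscribe(ch <-chan Event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if full, ok := b.subs[ch]; ok {
		delete(b.subs, ch)
		close(full)
	}
}

func (b *Bus) SubscriberCount() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.subs)
}

func main() {
	b := New()
	ch := b.Subscribe(4)
	fmt.Println(b.SubscriberCount())

	b.Unsubscribe(ch)
	b.Unsubscribe(ch) // second call is a no-op

	_, open := <-ch // a closed channel yields the zero value and false
	fmt.Println(b.SubscriberCount(), open)
}
```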

type Event

type Event struct {
	// Timestamp is when the event occurred.
	Timestamp time.Time `json:"ts"`
	// Source identifies the component that published the event.
	Source string `json:"source"`
	// Kind describes the type of event within the source.
	Kind string `json:"kind"`
	// Data holds event-specific key/value pairs.
	Data map[string]any `json:"data,omitempty"`
}

Event represents a single operational event published by a component.
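
The struct tags determine the JSON shape a subscriber such as the WebSocket handler would emit if it encodes events with `encoding/json`. The sketch below copies the documented struct locally and marshals it. The `encode` helper and the sample values (`r1`, `search`, `42`) are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Event is a local copy of the documented struct, including its tags.
type Event struct {
	Timestamp time.Time      `json:"ts"`
	Source    string         `json:"source"`
	Kind      string         `json:"kind"`
	Data      map[string]any `json:"data,omitempty"`
}

// encode is a hypothetical helper that returns the JSON form of e.
func encode(e Event) (string, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	ts := time.Date(2026, 5, 1, 12, 0, 0, 0, time.UTC)

	// An event carrying the keys documented for tool_done.
	full, err := encode(Event{
		Timestamp: ts,
		Source:    "agent",
		Kind:      "tool_done",
		Data: map[string]any{
			"request_id": "r1", "tool": "search", "ok": true, "duration_ms": 42,
		},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(full)

	// With nil Data, omitempty leaves the "data" key out entirely.
	bare, err := encode(Event{Timestamp: ts, Source: "email", Kind: "poll_start"})
	if err != nil {
		panic(err)
	}
	fmt.Println(bare)
}
```

`encoding/json` writes map keys in sorted order, so the keys inside `data` appear alphabetically regardless of how the map was populated.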
