Documentation ¶
Overview ¶
Package events provides a publish/subscribe event bus for operational observability. Events flow from publishing components (agent loop, Signal bridge, scheduler, etc.) to subscribers (WebSocket handler, future metrics collector). The bus is nil-safe: calling Publish on a nil *Bus is a no-op, so components need no guard checks before publishing.
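The nil-safety described above relies on Go allowing method calls on nil pointer receivers. The sketch below illustrates the pattern with stand-in types: the `Bus` body, the `published` counter, the `Publish(e Event)` signature, and the `Scheduler` component are all assumptions made for illustration, not the package's actual source.

```go
package main

import "fmt"

// Event is a pared-down stand-in for the package's Event type.
type Event struct {
	Source string
	Kind   string
}

// Bus is a hypothetical stand-in; the real Bus has unexported fields.
type Bus struct {
	published int // illustration only: counts accepted events
}

// Publish tolerates a nil receiver, mirroring the documented behaviour.
func (b *Bus) Publish(e Event) {
	if b == nil {
		return // no bus configured: the event is silently discarded
	}
	b.published++
}

// Scheduler is a hypothetical component whose bus is optional.
type Scheduler struct {
	bus *Bus // may be nil when observability is disabled
}

// Fire publishes without first checking whether s.bus is nil.
func (s *Scheduler) Fire() {
	s.bus.Publish(Event{Source: "scheduler", Kind: "task_fired"})
}

func main() {
	quiet := &Scheduler{} // bus is nil
	quiet.Fire()          // does not panic

	observed := &Scheduler{bus: &Bus{}}
	observed.Fire()
	fmt.Println(observed.bus.published) // prints 1
}
```

Because the nil check lives inside Publish, a component can hold a `*Bus` field that is simply left unset in tests or in deployments without observability.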
Index ¶
Constants ¶
const (
	// SourceAgent identifies events from the core agent loop.
	SourceAgent = "agent"

	// SourceSignal identifies events from the Signal bridge.
	SourceSignal = "signal"

	// SourceDelegate identifies events from delegate task execution.
	SourceDelegate = "delegate"

	// SourceEmail identifies events from the email poller.
	SourceEmail = "email"

	// SourceMetacog identifies events from the metacognitive loop.
	SourceMetacog = "metacog"

	// SourceScheduler identifies events from the task scheduler.
	SourceScheduler = "scheduler"

	// SourceLoop identifies events from persistent loop goroutines.
	SourceLoop = "loop"
)
Source constants identify which component published an event.
const (
	// KindRequestStart signals the beginning of an agent request.
	// Data: request_id, conversation_id, channel.
	KindRequestStart = "request_start"

	// KindLLMCall signals the start of an LLM API call.
	// Data: request_id, iter, model.
	KindLLMCall = "llm_call"

	// KindLLMResponse signals completion of an LLM API call.
	// Data: request_id, iter, model, tokens_in, tokens_out,
	// cost_usd, tool_calls.
	KindLLMResponse = "llm_response"

	// KindToolCall signals the start of a tool execution.
	// Data: request_id, tool.
	KindToolCall = "tool_call"

	// KindToolDone signals completion of a tool execution.
	// Data: request_id, tool, ok, duration_ms.
	KindToolDone = "tool_done"

	// KindRequestComplete signals the end of an agent request.
	// Data: request_id, model, iterations, total_tokens_in,
	// total_tokens_out, total_cost_usd, elapsed_ms.
	KindRequestComplete = "request_complete"

	// KindMessageReceived signals an incoming Signal message.
	// Data: sender, conversation_id, message_len.
	KindMessageReceived = "message_received"

	// KindReactionReceived signals an incoming Signal reaction.
	// Data: sender, emoji.
	KindReactionReceived = "reaction_received"

	// KindSessionRotated signals a session was rotated due to inactivity.
	// Data: conversation_id, sender.
	KindSessionRotated = "session_rotated"

	// KindSpawn signals a delegate task was spawned.
	// Data: delegate_id, profile, task_len.
	KindSpawn = "spawn"

	// KindComplete signals a delegate task completed.
	// Data: delegate_id, iterations, total_tokens_in,
	// total_tokens_out, total_cost_usd, exhausted.
	KindComplete = "complete"

	// KindPollStart signals the start of an email poll cycle.
	// Data: accounts.
	KindPollStart = "poll_start"

	// KindPollComplete signals the end of an email poll cycle.
	// Data: new_messages, accounts.
	KindPollComplete = "poll_complete"

	// KindIterationStart signals the start of a metacognitive iteration.
	// Data: conversation_id, supervisor.
	KindIterationStart = "iteration_start"

	// KindSleepAdjust signals a metacognitive sleep duration change.
	// Data: sleep_seconds.
	KindSleepAdjust = "sleep_adjust"

	// KindTaskFired signals a scheduled task has begun executing.
	// Data: task_id, task_name.
	KindTaskFired = "task_fired"

	// KindTaskComplete signals a scheduled task has finished executing.
	// Data: task_id, task_name, ok, duration_ms.
	KindTaskComplete = "task_complete"

	// KindLoopStarted signals a loop goroutine has started.
	// Data: loop_id, loop_name, parent_id.
	KindLoopStarted = "loop_started"

	// KindLoopStopped signals a loop goroutine has stopped.
	// Data: loop_id, loop_name, iterations, attempts.
	KindLoopStopped = "loop_stopped"

	// KindLoopIterationStart signals the beginning of a loop iteration.
	// Data: loop_id, loop_name, conversation_id, supervisor, attempt.
	KindLoopIterationStart = "loop_iteration_start"

	// KindLoopIterationComplete signals the end of a loop iteration.
	// Data: loop_id, loop_name, model, input_tokens, output_tokens, elapsed_ms.
	KindLoopIterationComplete = "loop_iteration_complete"

	// KindLoopSleepStart signals a loop has entered its sleep phase.
	// Data: loop_id, loop_name, sleep_duration, initial (bool, true
	// only for the pre-first-iteration startup sleep).
	KindLoopSleepStart = "loop_sleep_start"

	// KindLoopWaitStart signals a loop has entered its wait phase,
	// blocking on a WaitFunc for an external event.
	// Data: loop_id, loop_name.
	KindLoopWaitStart = "loop_wait_start"

	// KindLoopStateChange signals a loop has changed lifecycle state.
	// Data: loop_id, loop_name, from, to.
	KindLoopStateChange = "loop_state_change"

	// KindLoopError signals a loop iteration encountered an error.
	// Data: loop_id, loop_name, error.
	KindLoopError = "loop_error"

	// KindLoopToolStart signals a tool execution has begun within a
	// loop iteration. Data: loop_id, loop_name, tool.
	KindLoopToolStart = "loop_tool_start"

	// KindLoopToolDone signals a tool execution has completed within a
	// loop iteration. Data: loop_id, loop_name, tool, error (if failed).
	KindLoopToolDone = "loop_tool_done"

	// KindLoopLLMStart signals an LLM API call is about to begin during
	// a loop iteration. Data: loop_id, loop_name, model.
	KindLoopLLMStart = "loop_llm_start"

	// KindLoopLLMResponse signals an LLM response was received during a
	// loop iteration. Data: loop_id, loop_name, model, input_tokens,
	// output_tokens.
	KindLoopLLMResponse = "loop_llm_response"
)
Kind constants describe the type of event within a source.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Bus ¶
type Bus struct {
	// contains filtered or unexported fields
}
Bus is a non-blocking broadcast event bus. Subscribers receive events on buffered channels; slow subscribers miss events rather than blocking publishers.
func (*Bus) Publish ¶
Publish sends an event to all subscribers. Non-blocking: if a subscriber's channel is full, the event is dropped for that subscriber. Safe to call on a nil receiver (no-op).
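Drop-on-full delivery of this kind is usually written in Go as a `select` with a `default` branch. The sketch below shows that idiom on a bare channel; `trySend` is a hypothetical helper, and whether the package implements Publish exactly this way is an assumption.

```go
package main

import "fmt"

// Event is a trimmed stand-in for the package's Event type.
type Event struct {
	Kind string
}

// trySend attempts a non-blocking send and reports whether the event
// was delivered. When the buffer is full the event is dropped.
func trySend(ch chan<- Event, e Event) bool {
	select {
	case ch <- e:
		return true
	default:
		return false // buffer full: drop rather than block the publisher
	}
}

func main() {
	ch := make(chan Event, 2) // a slow subscriber with a small buffer
	delivered, dropped := 0, 0
	for i := 0; i < 5; i++ {
		if trySend(ch, Event{Kind: "tool_call"}) {
			delivered++
		} else {
			dropped++
		}
	}
	fmt.Println(delivered, dropped) // prints 2 3
}
```

The trade-off is that subscribers cannot assume they see every event, so consumers that need complete totals should prefer summary events such as request_complete over summing per-call events.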
func (*Bus) Subscribe ¶
Subscribe returns a channel that receives published events. The caller must eventually call Unsubscribe to avoid resource leaks. bufSize sets the capacity of the channel's buffer; 64 is a reasonable default for WebSocket consumers.
func (*Bus) SubscriberCount ¶
SubscriberCount returns the number of active subscribers.
func (*Bus) Unsubscribe ¶
Unsubscribe removes a subscription and closes the channel. Safe to call with a channel that is already unsubscribed (no-op).
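The subscription lifecycle documented above can be sketched end to end with a small illustrative bus. This is a re-implementation under assumptions, not the package's source: the method signatures, the `NewBus` constructor (the documentation lists no constructor), and the map-plus-mutex layout are all hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a trimmed stand-in for the package's Event type.
type Event struct {
	Source string
	Kind   string
}

// Bus is an illustrative bus with the documented semantics.
type Bus struct {
	mu   sync.RWMutex
	subs map[<-chan Event]chan Event // receive side -> send side
}

// NewBus is a hypothetical constructor used only by this sketch.
func NewBus() *Bus {
	return &Bus{subs: make(map[<-chan Event]chan Event)}
}

// Subscribe registers a buffered channel and returns its receive side.
func (b *Bus) Subscribe(bufSize int) <-chan Event {
	ch := make(chan Event, bufSize)
	b.mu.Lock()
	b.subs[ch] = ch
	b.mu.Unlock()
	return ch
}

// Unsubscribe removes and closes the channel; unknown channels are ignored,
// which makes a repeated call a no-op.
func (b *Bus) Unsubscribe(ch <-chan Event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if send, ok := b.subs[ch]; ok {
		delete(b.subs, ch)
		close(send)
	}
}

// SubscriberCount returns the number of active subscribers.
func (b *Bus) SubscriberCount() int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return len(b.subs)
}

// Publish delivers to every subscriber without blocking.
func (b *Bus) Publish(e Event) {
	if b == nil {
		return
	}
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, send := range b.subs {
		select {
		case send <- e:
		default: // subscriber is full: drop for this subscriber only
		}
	}
}

func main() {
	bus := NewBus()
	ch := bus.Subscribe(64)
	fmt.Println(bus.SubscriberCount()) // prints 1

	bus.Publish(Event{Source: "agent", Kind: "request_start"})
	fmt.Println((<-ch).Kind) // prints request_start

	bus.Unsubscribe(ch)
	bus.Unsubscribe(ch) // second call is a no-op
	_, open := <-ch
	fmt.Println(open, bus.SubscriberCount()) // prints false 0
}
```

In this sketch the channel is closed while holding the write lock and Publish sends while holding the read lock, so a send can never race with a close. Consumers can then use `for e := range ch` and rely on Unsubscribe to end the loop.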
type Event ¶
type Event struct {
	// Timestamp is when the event occurred.
	Timestamp time.Time `json:"ts"`
	// Source identifies the component that published the event.
	Source string `json:"source"`
	// Kind describes the type of event within the source.
	Kind string `json:"kind"`
	// Data holds event-specific key/value pairs.
	Data map[string]any `json:"data,omitempty"`
}
Event represents a single operational event published by a component.