events

package
v0.1.0-alpha.9 Latest
Published: May 26, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package events provides a publish-subscribe event bus for process lifecycle notifications within the Kahi supervisor.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExpandWebhookEnv

func ExpandWebhookEnv(s string) (string, error)

ExpandWebhookEnv resolves ${VAR} references in s using the process environment.
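The substitution described above can be sketched as follows. This is a minimal illustration, not the package's implementation: `expandEnv` and `envRef` are hypothetical names, the lookup function is injected only to make the sketch easy to exercise, and treating an unset variable as an error is an assumption about what the `error` return is for.

```go
package main

import (
	"fmt"
	"os"
	"regexp"
)

// envRef matches ${VAR} references with shell-style variable names.
var envRef = regexp.MustCompile(`\$\{([A-Za-z_][A-Za-z0-9_]*)\}`)

// expandEnv is a hypothetical stand-in for ExpandWebhookEnv.
// Assumption: a reference to an unset variable is reported as an error.
func expandEnv(s string, lookup func(string) (string, bool)) (string, error) {
	var missing []string
	out := envRef.ReplaceAllStringFunc(s, func(ref string) string {
		name := ref[2 : len(ref)-1] // strip the leading "${" and trailing "}"
		val, ok := lookup(name)
		if !ok {
			missing = append(missing, name)
			return ref
		}
		return val
	})
	if len(missing) > 0 {
		return "", fmt.Errorf("undefined environment variables: %v", missing)
	}
	return out, nil
}

func main() {
	os.Setenv("HOOK_TOKEN", "s3cret")
	got, err := expandEnv("Bearer ${HOOK_TOKEN}", os.LookupEnv)
	fmt.Println(got, err)
}
```

Keeping secrets such as webhook tokens in the environment and expanding them at load time avoids writing them into configuration files.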

func ValidateWebhookURL

func ValidateWebhookURL(rawURL string, allowInsecure bool) error

ValidateWebhookURL checks that rawURL is a valid URL and uses HTTPS. Plain HTTP is accepted only when allowInsecure is true or the URL points at localhost.
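A check along these lines could be written with net/url. The sketch below is an assumption-laden illustration rather than the package's code: `validateURL` and `isLocalhost` are hypothetical names, and treating every loopback IP as "localhost" is a guess at the intended rule.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/url"
)

// validateURL is a hypothetical stand-in for ValidateWebhookURL.
func validateURL(rawURL string, allowInsecure bool) error {
	u, err := url.Parse(rawURL)
	if err != nil {
		return fmt.Errorf("parse webhook URL: %w", err)
	}
	if u.Host == "" {
		return errors.New("webhook URL has no host")
	}
	switch u.Scheme {
	case "https":
		return nil
	case "http":
		// Plain HTTP is tolerated only when explicitly allowed or local.
		if allowInsecure || isLocalhost(u.Hostname()) {
			return nil
		}
		return errors.New("webhook URL must use https")
	default:
		return fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
}

// isLocalhost reports whether host is "localhost" or a loopback IP.
// Assumption: the real package may define localhost more narrowly.
func isLocalhost(host string) bool {
	if host == "localhost" {
		return true
	}
	ip := net.ParseIP(host)
	return ip != nil && ip.IsLoopback()
}

func main() {
	fmt.Println(validateURL("https://example.com/hook", false))
	fmt.Println(validateURL("http://example.com/hook", false))
	fmt.Println(validateURL("http://localhost:8080/hook", false))
}
```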

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is the central event dispatcher. It is safe for concurrent use. When no subscribers exist, Publish is a no-op with zero allocations.

func NewBus

func NewBus(logger *slog.Logger) *Bus

NewBus creates a new event bus.

func (*Bus) Publish

func (b *Bus) Publish(event Event)

Publish dispatches an event to all subscribers of the event type. Handlers are called synchronously in registration order. A panicking handler is recovered and logged; remaining handlers still execute.
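The dispatch semantics described above (synchronous calls, registration order, panic isolation) can be illustrated with a small stand-in. `miniBus`, `call`, and `runDemo` are hypothetical names, subscription IDs are omitted for brevity, and nothing here should be read as the actual Bus implementation.

```go
package main

import (
	"fmt"
	"log/slog"
	"sync"
)

type EventType string

type Event struct {
	Type EventType
	Data map[string]string
}

type HandlerFunc func(Event)

// miniBus is a simplified, hypothetical stand-in for Bus.
type miniBus struct {
	mu       sync.RWMutex
	handlers map[EventType][]HandlerFunc
}

func (b *miniBus) Subscribe(t EventType, h HandlerFunc) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.handlers == nil {
		b.handlers = make(map[EventType][]HandlerFunc)
	}
	b.handlers[t] = append(b.handlers[t], h) // preserves registration order
}

func (b *miniBus) Publish(e Event) {
	// Copy the handler list under the lock, then call handlers without
	// holding it so a handler may itself subscribe or publish.
	b.mu.RLock()
	hs := append([]HandlerFunc(nil), b.handlers[e.Type]...)
	b.mu.RUnlock()
	for _, h := range hs {
		call(h, e)
	}
}

// call runs one handler and converts a panic into a log entry so the
// remaining handlers still run.
func call(h HandlerFunc, e Event) {
	defer func() {
		if r := recover(); r != nil {
			slog.Error("event handler panicked", "event", string(e.Type), "panic", r)
		}
	}()
	h(e)
}

// runDemo registers three handlers, the second of which panics, and
// returns the names of the handlers that completed.
func runDemo() []string {
	const running EventType = "PROCESS_STATE_RUNNING"
	var order []string
	b := &miniBus{}
	b.Subscribe(running, func(Event) { order = append(order, "first") })
	b.Subscribe(running, func(Event) { panic("boom") })
	b.Subscribe(running, func(Event) { order = append(order, "third") })
	b.Publish(Event{Type: running})
	return order
}

func main() {
	fmt.Println(runDemo())
}
```

Because dispatch is synchronous, a slow handler delays every handler registered after it as well as the publisher, so long-running work is usually better handed off to a goroutine or queue inside the handler.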

func (*Bus) Subscribe

func (b *Bus) Subscribe(eventType EventType, handler HandlerFunc) uint64

Subscribe registers a handler for the given event type. It returns a subscription ID that can be passed to Unsubscribe.

func (*Bus) SubscriberCount

func (b *Bus) SubscriberCount(eventType EventType) int

SubscriberCount returns the number of subscribers for an event type.

func (*Bus) Unsubscribe

func (b *Bus) Unsubscribe(id uint64)

Unsubscribe removes a subscription by ID.

type Event

type Event struct {
	Type      EventType
	Timestamp time.Time
	Data      map[string]string
}

Event is the value passed to handlers: the event type, a timestamp, and a string-keyed data map.

type EventType

type EventType string

EventType identifies a specific event category.

const (
	ProcessStateStopped  EventType = "PROCESS_STATE_STOPPED"
	ProcessStateStarting EventType = "PROCESS_STATE_STARTING"
	ProcessStateRunning  EventType = "PROCESS_STATE_RUNNING"
	ProcessStateBackoff  EventType = "PROCESS_STATE_BACKOFF"
	ProcessStateStopping EventType = "PROCESS_STATE_STOPPING"
	ProcessStateExited   EventType = "PROCESS_STATE_EXITED"
	ProcessStateFatal    EventType = "PROCESS_STATE_FATAL"
)

Process state events.

const (
	ProcessLogStdout EventType = "PROCESS_LOG_STDOUT"
	ProcessLogStderr EventType = "PROCESS_LOG_STDERR"
)

Process log events.

const (
	SupervisorStateRunning  EventType = "SUPERVISOR_STATE_RUNNING"
	SupervisorStateStopping EventType = "SUPERVISOR_STATE_STOPPING"
)

Supervisor state events.

const (
	ProcessGroupAdded   EventType = "PROCESS_GROUP_ADDED"
	ProcessGroupRemoved EventType = "PROCESS_GROUP_REMOVED"
)

Process group events.

const (
	Tick5    EventType = "TICK_5"
	Tick60   EventType = "TICK_60"
	Tick3600 EventType = "TICK_3600"
)

Periodic tick events.

type HandlerFunc

type HandlerFunc func(Event)

HandlerFunc processes an event.

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener represents a single event listener process.

type ListenerPool

type ListenerPool struct {
	// contains filtered or unexported fields
}

ListenerPool manages a pool of event listener processes that follow the READY/RESULT handshake protocol.
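The documentation does not spell out the wire format of the handshake. The sketch below shows the listener's side of one round trip under the assumption that it mirrors Supervisor's event listener protocol: the listener writes `READY`, reads a header line of `key:value` tokens that includes `len`, reads that many payload bytes, and answers with `RESULT 2\nOK` or `RESULT 4\nFAIL`. The names `handleOne` and `roundTrip`, and every detail of the framing, are assumptions for illustration.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// handleOne performs one READY/RESULT exchange from the listener's side.
// Assumption: framing follows Supervisor's event listener protocol.
func handleOne(in *bufio.Reader, out io.Writer, handle func(hdr map[string]string, payload []byte) bool) error {
	// Signal that the listener can accept an event.
	if _, err := io.WriteString(out, "READY\n"); err != nil {
		return err
	}
	// Header line: space-separated key:value tokens.
	line, err := in.ReadString('\n')
	if err != nil {
		return err
	}
	hdr := make(map[string]string)
	for _, tok := range strings.Fields(line) {
		if k, v, ok := strings.Cut(tok, ":"); ok {
			hdr[k] = v
		}
	}
	n, err := strconv.Atoi(hdr["len"])
	if err != nil || n < 0 {
		return fmt.Errorf("bad len in header %q", line)
	}
	// Payload: exactly len bytes following the header line.
	payload := make([]byte, n)
	if _, err := io.ReadFull(in, payload); err != nil {
		return err
	}
	result := "FAIL"
	if handle(hdr, payload) {
		result = "OK"
	}
	_, err = fmt.Fprintf(out, "RESULT %d\n%s", len(result), result)
	return err
}

// roundTrip feeds input to handleOne and returns what the listener wrote.
func roundTrip(input string) (string, error) {
	var out bytes.Buffer
	err := handleOne(bufio.NewReader(strings.NewReader(input)), &out,
		func(hdr map[string]string, payload []byte) bool {
			return hdr["eventname"] != "" // accept any named event
		})
	return out.String(), err
}

func main() {
	got, err := roundTrip("eventname:TICK_5 len:4\nwhen")
	fmt.Printf("%q %v\n", got, err)
}
```

In a real listener process, `in` would wrap os.Stdin and `out` would be os.Stdout, with `handleOne` called in a loop.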

func NewListenerPool

func NewListenerPool(name string, bus *Bus, logger *slog.Logger, eventTypes []EventType) *ListenerPool

NewListenerPool creates a pool that dispatches events to listener processes.

func (*ListenerPool) AddListener

func (lp *ListenerPool) AddListener(name string, stdin io.Writer, stdout io.Reader)

AddListener registers a listener process with its stdin/stdout pipes.

func (*ListenerPool) Stop

func (lp *ListenerPool) Stop()

Stop shuts down the listener pool.

type ListenerState

type ListenerState int

ListenerState tracks event listener process state.

const (
	ListenerAcknowledged ListenerState = iota
	ListenerReady
	ListenerBusy
)

type Ticker

type Ticker struct {
	// contains filtered or unexported fields
}

Ticker emits periodic TICK events. Call Stop to shut it down.

func NewTicker

func NewTicker(bus *Bus) *Ticker

NewTicker starts emitting TICK_5, TICK_60, and TICK_3600 events.
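One way to drive three cadences from a single once-per-second loop is to ask, for each second, which ticks are due. The sketch below assumes the numeric suffix is the period in seconds and that ticks align to multiples of that period on the Unix clock; both are assumptions, and `dueTicks` is a hypothetical helper rather than part of the package.

```go
package main

import "fmt"

type EventType string

const (
	Tick5    EventType = "TICK_5"
	Tick60   EventType = "TICK_60"
	Tick3600 EventType = "TICK_3600"
)

// dueTicks reports which tick events would fire at the given Unix second.
// Assumption: each tick fires when the second is a multiple of its period.
func dueTicks(unixSec int64) []EventType {
	var due []EventType
	if unixSec%5 == 0 {
		due = append(due, Tick5)
	}
	if unixSec%60 == 0 {
		due = append(due, Tick60)
	}
	if unixSec%3600 == 0 {
		due = append(due, Tick3600)
	}
	return due
}

func main() {
	for _, sec := range []int64{61, 65, 120, 3600} {
		fmt.Println(sec, dueTicks(sec))
	}
}
```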

func (*Ticker) Stop

func (t *Ticker) Stop()

Stop terminates the ticker goroutine and waits for it to finish.

type WebhookConfig

type WebhookConfig struct {
	Name          string
	URL           string
	Events        []EventType
	Headers       map[string]string
	Timeout       time.Duration
	MaxRetries    int
	Template      string // "generic", "slack", "pagerduty"
	AllowInsecure bool
}

WebhookConfig describes a single webhook destination.

type WebhookManager

type WebhookManager struct {
	// contains filtered or unexported fields
}

WebhookManager subscribes to events and delivers HTTP POST notifications.
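Delivery with a retry budget might look like the sketch below. It is not the manager's implementation: `postWithRetry` and `demo` are hypothetical names, the JSON body is made up, backoff between attempts is omitted, and reading MaxRetries as "retries in addition to the first attempt" is an assumption.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
	"time"
)

// postWithRetry POSTs body to url, retrying on transport errors and
// non-2xx responses. It returns the number of attempts made.
// Assumption: maxRetries counts retries after the first attempt.
func postWithRetry(ctx context.Context, client *http.Client, url string, headers map[string]string, body []byte, maxRetries int) (int, error) {
	var lastErr error
	for attempt := 1; attempt <= maxRetries+1; attempt++ {
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
		if err != nil {
			return attempt, err // a malformed request will not improve on retry
		}
		req.Header.Set("Content-Type", "application/json")
		for k, v := range headers {
			req.Header.Set(k, v)
		}
		resp, err := client.Do(req)
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode >= 200 && resp.StatusCode < 300 {
				return attempt, nil
			}
			err = fmt.Errorf("unexpected status %s", resp.Status)
		}
		lastErr = err
	}
	return maxRetries + 1, fmt.Errorf("webhook delivery failed: %w", lastErr)
}

// demo runs postWithRetry against a local test server that fails twice
// before accepting the request.
func demo() (int, error) {
	var calls atomic.Int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if calls.Add(1) < 3 {
			w.WriteHeader(http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	}))
	defer srv.Close()

	client := &http.Client{Timeout: 2 * time.Second}
	body := []byte(`{"event":"PROCESS_STATE_FATAL"}`) // illustrative payload only
	return postWithRetry(context.Background(), client, srv.URL, nil, body, 3)
}

func main() {
	attempts, err := demo()
	fmt.Println(attempts, err)
}
```

A production sender would normally wait between attempts, for example with exponential backoff, and stop early when the context is cancelled.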

func NewWebhookManager

func NewWebhookManager(bus *Bus, configs []WebhookConfig, logger *slog.Logger) *WebhookManager

NewWebhookManager creates a webhook manager and subscribes to events.

func (*WebhookManager) Stop

func (wm *WebhookManager) Stop()

Stop unsubscribes from all events.
