chat

package
v0.16.1
Warning

This package is not in the latest version of its module.

Published: May 10, 2026 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package chat is the substrate layer that backs coord's Post, Ask, and Subscribe on top of NATS JetStream. Send and Watch route through a per-project JetStream stream (chat-<proj> with subjects chat.<proj>.>); Request uses raw NATS request/reply on the ask subject family. See docs/adr/0047-chat-on-jetstream.md for the decision record covering why chat lives on a JetStream stream and not on EdgeSync notify + libfossil.

This package is internal: callers outside github.com/danmestas/bones must not depend on it. The Envelope type crosses the package boundary into coord, where eventFromEnvelope translates it into coord.ChatMessage per ADR 0003's substrate-hiding rule; no chat type appears on any public coord signature.

Constants

This section is empty.

Variables

var ErrClosed = errors.New("chat: manager is closed")

ErrClosed reports that a public method was called on a Manager whose Close has returned. Parallel to internal/tasks.ErrClosed and internal/holds.ErrClosed so every substrate manager surfaces the same close-race sentinel.
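
As a caller-side illustration of the sentinel pattern described above, the following minimal sketch matches the close-race error with errors.Is. Because the package is internal, errClosed is a local stand-in for chat.ErrClosed, and sendAfterClose and isClosed are hypothetical helpers, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errClosed is a local stand-in for chat.ErrClosed so the sketch compiles
// without importing the internal package.
var errClosed = errors.New("chat: manager is closed")

// sendAfterClose is a hypothetical helper that mimics a call made after
// Close has returned. It wraps the sentinel to show that errors.Is still
// matches through a wrapping layer.
func sendAfterClose() error {
	return fmt.Errorf("coord.Post: %w", errClosed)
}

// isClosed reports whether err is, or wraps, the close sentinel.
func isClosed(err error) bool {
	return errors.Is(err, errClosed)
}

func main() {
	err := sendAfterClose()
	if isClosed(err) {
		fmt.Println("manager closed; dropping message:", err)
	}
}
```

Matching with errors.Is rather than == keeps the check valid whether or not an intermediate layer wraps the sentinel.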

Functions

This section is empty.

Types

type Config

type Config struct {
	// AgentID identifies this chat instance across the substrate. It is
	// threaded through to outgoing Envelope as the From field so
	// receivers can attribute messages back to a sender.
	AgentID string

	// ProjectPrefix is the <proj> segment used to build chat subjects
	// (chat.<proj>.<short>) and ask subjects (<proj>.ask.<recipient>).
	// Derived at the coord layer from coord.Config.AgentID per ADR 0008;
	// the chat package takes it as pre-derived input.
	ProjectPrefix string

	// Nats is the pre-connected NATS handle from coord. The chat manager
	// does not dial its own connection — it shares the one coord opened
	// so reconnection policy, auth, and TLS remain a single-source
	// concern in coord.Config. The handle is used directly for Ask's
	// request/reply path and to construct the JetStream context for
	// Send/Watch.
	Nats *nats.Conn

	// MaxRetentionAge bounds the JetStream stream's MaxAge. Zero means
	// unbounded — chat history persists until disk is full or the
	// operator runs `nats stream purge`. Per ADR 0047 the default is
	// unbounded so coord.Prime preserves agent context across long
	// absences (ADR 0036).
	MaxRetentionAge time.Duration

	// MaxSubscribers caps the number of concurrent Watch callers coord
	// will hand out. Validated here so an obviously-broken value fails
	// at Open rather than at first subscribe.
	MaxSubscribers int
}

Config configures Open. The operator supplies the identity/routing fields; MaxRetentionAge is optional (0 = unbounded).

func (Config) Validate

func (c Config) Validate() error

Validate checks every Config field against its documented bounds and returns the first violation as an error. The error message follows the shape "chat.Config: <field>: <reason>". Validate is pure; it does not panic on bad operator input per invariant 9 — panics are reserved for programmer-error invariants inside Open and the wrappers.
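
The error shape described above can be sketched as follows. This is not the package's implementation: sketchConfig omits the Nats field so the example needs only the standard library, and the specific bounds (non-empty identifiers, non-negative retention, positive subscriber cap) are assumptions chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// sketchConfig is a hypothetical, trimmed copy of the documented Config.
type sketchConfig struct {
	AgentID         string
	ProjectPrefix   string
	MaxRetentionAge time.Duration
	MaxSubscribers  int
}

// validate returns the first violation in the documented
// "chat.Config: <field>: <reason>" shape. The bounds are assumed.
func (c sketchConfig) validate() error {
	switch {
	case c.AgentID == "":
		return errors.New("chat.Config: AgentID: must not be empty")
	case c.ProjectPrefix == "":
		return errors.New("chat.Config: ProjectPrefix: must not be empty")
	case c.MaxRetentionAge < 0:
		return fmt.Errorf("chat.Config: MaxRetentionAge: must be >= 0, got %s", c.MaxRetentionAge)
	case c.MaxSubscribers <= 0:
		return fmt.Errorf("chat.Config: MaxSubscribers: must be > 0, got %d", c.MaxSubscribers)
	}
	return nil
}

func main() {
	bad := sketchConfig{AgentID: "agent-a", MaxSubscribers: 8}
	fmt.Println(bad.validate())

	good := sketchConfig{AgentID: "agent-a", ProjectPrefix: "bones", MaxSubscribers: 8}
	fmt.Println(good.validate())
}
```

Returning an error instead of panicking mirrors the documented split: bad operator input is reported, while panics stay reserved for programmer errors.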

type Envelope added in v0.8.0

type Envelope struct {
	ID        string    `json:"id"`
	From      string    `json:"from"`
	Thread    string    `json:"thread"` // 8-hex SHA-256 short
	Body      string    `json:"body"`
	Timestamp time.Time `json:"timestamp"`
	ReplyTo   string    `json:"reply_to,omitempty"`
}

Envelope is the wire format for a chat message on the JetStream stream. JSON-serialized and persisted across the retention window; possibly read by a consumer running a different bones version. This type is deliberately distinct from coord.ChatMessage (which is the public Go API) so the two can evolve under different stability rules — see ADR 0047.
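
To make the wire format concrete, here is a small round-trip sketch using encoding/json. The struct is copied from the declaration above; encode, decode, and sample are hypothetical helpers, and the field values are invented. The last step relies on encoding/json's default behavior of ignoring unknown fields, which is one way a reader on an older version could tolerate additions; whether the package decodes this way is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Envelope is copied from the documented declaration.
type Envelope struct {
	ID        string    `json:"id"`
	From      string    `json:"from"`
	Thread    string    `json:"thread"`
	Body      string    `json:"body"`
	Timestamp time.Time `json:"timestamp"`
	ReplyTo   string    `json:"reply_to,omitempty"`
}

// sample returns an envelope with invented values.
func sample() Envelope {
	return Envelope{
		ID:        "m-1",
		From:      "agent-a",
		Thread:    "0a1b2c3d",
		Body:      "hello",
		Timestamp: time.Date(2026, 5, 10, 12, 0, 0, 0, time.UTC),
	}
}

func encode(e Envelope) (string, error) {
	b, err := json.Marshal(e)
	return string(b), err
}

func decode(s string) (Envelope, error) {
	var e Envelope
	err := json.Unmarshal([]byte(s), &e)
	return e, err
}

func main() {
	wire, err := encode(sample())
	if err != nil {
		panic(err)
	}
	fmt.Println(wire) // reply_to is omitted because ReplyTo is empty

	// A payload carrying a field this struct does not know about still decodes.
	newer := `{"id":"m-2","from":"agent-b","thread":"0a1b2c3d","body":"hi",` +
		`"timestamp":"2026-05-10T12:00:00Z","future_field":1}`
	e, err := decode(newer)
	fmt.Println(e.Body, err)
}
```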

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager owns the JetStream stream that backs chat for one project. Send writes envelopes via js.PublishMsg with W3C trace context in headers; Watch returns ordered consumers scoped to the appropriate subject filter. Request and Respond use the raw *nats.Conn for ADR 0008's Ask substrate (request/reply on <proj>.ask.<recipient>).

Every public method is safe to call concurrently. Close is idempotent via an atomic CAS on closed. Coord owns the *nats.Conn lifecycle; chat is a borrower.
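
The idempotent-Close behavior described above can be sketched with an atomic compare-and-swap. sketchManager and its releases counter are hypothetical; the real Manager's fields are unexported and may differ.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sketchManager is a hypothetical stand-in for the documented Manager.
type sketchManager struct {
	closed   atomic.Bool
	releases atomic.Int32 // counts how many times teardown actually ran
}

// Close runs teardown only for the caller that wins the CAS from false to
// true; every other caller, concurrent or later, returns nil without effect.
func (m *sketchManager) Close() error {
	if !m.closed.CompareAndSwap(false, true) {
		return nil
	}
	m.releases.Add(1) // placeholder for releasing real resources
	return nil
}

// closeConcurrently calls Close from n goroutines and reports how many
// times teardown ran.
func closeConcurrently(n int) int32 {
	var m sketchManager
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = m.Close()
		}()
	}
	wg.Wait()
	return m.releases.Load()
}

func main() {
	fmt.Println(closeConcurrently(16)) // prints 1
}
```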

func Open

func Open(ctx context.Context, cfg Config) (*Manager, error)

Open validates cfg, opens (or creates) the per-project chat stream on the supplied NATS connection, and returns a Manager. Constructing a Manager does not consume a goroutine; Watch spawns one per call. Callers must invoke Close to release resources.

The stream name is derived as "chat-<ProjectPrefix>" with subjects "chat.<ProjectPrefix>.>" (so per-thread subjects fall under it). If the stream exists, retention is updated to match cfg.MaxRetentionAge — JetStream merges the config in place. If absent, the stream is created with file storage. cfg.MaxRetentionAge of 0 means unbounded retention, which preserves coord.Prime semantics across long absences (ADR 0036, ADR 0047).
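
The naming scheme used across this package can be summarized in a few string helpers. The patterns come from the documentation above; the function names are hypothetical, and "bones" is an invented project prefix.

```go
package main

import "fmt"

// streamName returns the per-project stream name, "chat-<proj>".
func streamName(proj string) string { return "chat-" + proj }

// streamSubjects returns the subject filter bound to the stream,
// "chat.<proj>.>", which covers every per-thread subject.
func streamSubjects(proj string) string { return "chat." + proj + ".>" }

// threadSubject returns the subject for one thread, "chat.<proj>.<short>".
func threadSubject(proj, short string) string { return "chat." + proj + "." + short }

// askSubject returns the request/reply subject, "<proj>.ask.<recipient>".
func askSubject(proj, recipient string) string { return proj + ".ask." + recipient }

func main() {
	fmt.Println(streamName("bones"))
	fmt.Println(streamSubjects("bones"))
	fmt.Println(threadSubject("bones", "0a1b2c3d"))
	fmt.Println(askSubject("bones", "agent-b"))
}
```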

Open does not dial NATS: the connection comes pre-wired from coord so reconnection, auth, and TLS stay a single-source concern.

func (*Manager) Close

func (m *Manager) Close() error

Close releases resources held by the Manager. The stream itself is not deleted (it persists across coord lifecycles; deletion is an operator decision via `nats stream delete`). The shared *nats.Conn is NOT closed here — coord owns the connection lifecycle. Subsequent calls are no-ops; safe to call more than once.

func (*Manager) ListThreads

func (m *Manager) ListThreads(ctx context.Context) ([]ThreadSummary, error)

ListThreads returns all threads on this Manager's project, sorted by last activity (most recent first). Reads via a one-shot ordered consumer with DeliverAll — see scanStream for complexity notes.

func (*Manager) Request

func (m *Manager) Request(
	ctx context.Context, subject string, payload []byte,
) ([]byte, error)

Request sends payload to subject via NATS request/reply and returns the reply payload. ctx bounds the wait — a deadline-less ctx on an offline recipient never returns. coord.Ask builds the subject as <proj>.ask.<recipient> and hands it to this method; chat itself is subject-agnostic so the same wrapper serves any future request/reply caller.

Errors from the NATS RequestWithContext path are wrapped with the chat.Request prefix so substrate failures are distinguishable from caller-contract violations. A nil Manager or nil ctx panics; empty subject panics; an empty payload is permitted because NATS treats zero-length payloads as valid.
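
The sketch below illustrates why a deadline on ctx matters. The request function is a hypothetical stand-in that waits on a channel instead of a NATS reply, and the exact wrapping format ("chat.Request: %w") is an assumption; only the prefix is documented.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// request is a hypothetical stand-in for a request/reply round trip: it
// waits for a reply or for ctx to end, whichever comes first.
func request(ctx context.Context, reply <-chan []byte) ([]byte, error) {
	select {
	case b := <-reply:
		return b, nil
	case <-ctx.Done():
		// Wrapping with %w keeps the cause matchable via errors.Is.
		return nil, fmt.Errorf("chat.Request: %w", ctx.Err())
	}
}

// askOffline simulates a recipient that never answers. The timeout is the
// only thing that makes the call return.
func askOffline(timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	_, err := request(ctx, make(chan []byte)) // nothing ever sends on this channel
	return err
}

// timedOut reports whether err was caused by the ctx deadline.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	err := askOffline(50 * time.Millisecond)
	fmt.Println(err, timedOut(err))
}
```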

func (*Manager) Respond

func (m *Manager) Respond(
	subject string,
	handler func(payload []byte) ([]byte, error),
) (func() error, error)

Respond registers a NATS subscription on subject that drives handler for every incoming request. handler receives the request payload and returns either a reply payload or an error. On a nil-error return, Respond publishes the reply via msg.Respond. On a non-nil error return, no reply is published — by design: the chat substrate does not model error payloads, so handler failure is surfaced to the Ask caller as a no-responders timeout.

The returned closure is an idempotent unsubscribe: the first call tears down the subscription; subsequent calls are no-ops and return nil. sync.Once-guarded so concurrent callers cannot double-close the underlying subscription.
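
An idempotent unsubscribe of this kind can be sketched with sync.Once. The idempotent and demo functions are hypothetical, and the teardown callback stands in for the real subscription teardown.

```go
package main

import (
	"fmt"
	"sync"
)

// idempotent wraps teardown so it runs at most once. The first call returns
// teardown's result; later calls skip teardown and return nil.
func idempotent(teardown func() error) func() error {
	var once sync.Once
	return func() error {
		var err error
		once.Do(func() { err = teardown() })
		return err
	}
}

// demo calls the closure twice and reports how often teardown ran.
func demo() (calls int, first, second error) {
	unsub := idempotent(func() error {
		calls++
		return nil
	})
	first = unsub()
	second = unsub()
	return calls, first, second
}

func main() {
	calls, first, second := demo()
	fmt.Println(calls, first, second) // prints 1 <nil> <nil>
}
```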

Subject is forwarded to the NATS connection as-is; chat itself is subject-agnostic. coord.Answer supplies "<proj>.ask.<agentID>".

Returns ErrClosed if the Manager has been closed. A nil Manager or nil handler panics (programmer error); empty subject panics.

func (*Manager) Send

func (m *Manager) Send(
	ctx context.Context, thread, body string,
) error

Send publishes body to a chat thread. thread is a caller-supplied name that Manager maps to a deterministic 8-hex short via SHA-256. Two Managers on the same substrate that both post to "t1" compute the same short and therefore publish on the same NATS subject — cross-Manager and cross-restart thread identity falls out of the hash with no coordination substrate.
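
One plausible reading of the "8-hex SHA-256 short" is the first four bytes of the digest, hex-encoded. The sketch below assumes that truncation; the package may derive the short differently, and threadShort is a hypothetical name.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// threadShort maps a thread name to an 8-character hex string.
// ASSUMPTION: the short is the first four bytes of the SHA-256 digest.
func threadShort(thread string) string {
	sum := sha256.Sum256([]byte(thread))
	return hex.EncodeToString(sum[:4])
}

func main() {
	// The same name always yields the same short, so independent processes
	// agree on the subject without coordinating.
	a := threadShort("t1")
	b := threadShort("t1")
	fmt.Println(a, a == b, len(a))
}
```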

The publish is a single JetStream operation: durable in the stream AND visible to live subscribers in one round trip, with no fossil-then-NATS split. W3C trace context from ctx is injected into the NATS message headers via the standard `traceparent` header so subscribers can stitch spans across the publish/receive boundary.

Returns ErrClosed if the Manager has been closed. Any error from js.PublishMsg surfaces wrapped with the chat.Send prefix.

func (*Manager) ThreadsForAgent

func (m *Manager) ThreadsForAgent(
	ctx context.Context, agentID string, maxThreads int,
) ([]ThreadSummary, error)

ThreadsForAgent returns up to maxThreads recent threads where the given agentID has sent at least one message. Threads are sorted by last activity (most recent first). Used by coord.Prime per ADR 0036.

Implementation walks the stream once (O(N) in stream size), groups by Thread, and filters to threads where any envelope's From matches agentID. Same complexity class as the pre-ADR-0047 fossil checkin scan.
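
The single-pass scan described above can be sketched over an in-memory slice. The envelope and summary types are hypothetical stand-ins, the sample data is invented, and counting every message in a thread (not only the agent's) is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// envelope and summary are trimmed, hypothetical stand-ins.
type envelope struct {
	From, Thread, Body string
	Timestamp          time.Time
}

type summary struct {
	Thread       string
	Count        int
	LastBody     string
	LastActivity time.Time
	byAgent      bool // true once the agent has posted in this thread
}

// threadsForAgent groups envelopes by thread in one pass, keeps threads the
// agent has posted in, and returns the most recently active first.
// The sketch assumes maxThreads >= 0.
func threadsForAgent(envs []envelope, agentID string, maxThreads int) []summary {
	byThread := map[string]*summary{}
	for _, e := range envs {
		s, ok := byThread[e.Thread]
		if !ok {
			s = &summary{Thread: e.Thread}
			byThread[e.Thread] = s
		}
		s.Count++
		if !e.Timestamp.Before(s.LastActivity) {
			s.LastActivity, s.LastBody = e.Timestamp, e.Body
		}
		if e.From == agentID {
			s.byAgent = true
		}
	}
	out := make([]summary, 0, len(byThread))
	for _, s := range byThread {
		if s.byAgent {
			out = append(out, *s)
		}
	}
	sort.Slice(out, func(i, j int) bool {
		return out[i].LastActivity.After(out[j].LastActivity)
	})
	if len(out) > maxThreads {
		out = out[:maxThreads]
	}
	return out
}

// sampleStream returns invented data for the demonstration.
func sampleStream() []envelope {
	t0 := time.Date(2026, 5, 10, 12, 0, 0, 0, time.UTC)
	return []envelope{
		{From: "agent-a", Thread: "aaaa0001", Body: "first", Timestamp: t0},
		{From: "agent-b", Thread: "bbbb0002", Body: "other", Timestamp: t0.Add(time.Minute)},
		{From: "agent-b", Thread: "aaaa0001", Body: "reply", Timestamp: t0.Add(2 * time.Minute)},
		{From: "agent-a", Thread: "cccc0003", Body: "latest", Timestamp: t0.Add(3 * time.Minute)},
	}
}

func main() {
	for _, s := range threadsForAgent(sampleStream(), "agent-a", 10) {
		fmt.Println(s.Thread, s.Count, s.LastBody)
	}
}
```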

func (*Manager) Watch

func (m *Manager) Watch(
	ctx context.Context, thread string,
) <-chan Envelope

Watch returns a channel of Envelope values for the given thread. thread is a caller-supplied name — chat hashes it internally into the same deterministic 8-hex short that Send uses, so a Watch on "t1" receives every message any Manager has Sent to "t1" on this project/substrate. The channel closes when ctx is canceled.

The Envelope type crosses the package boundary because this is an INTERNAL package — the translation into coord.ChatMessage (ADR 0003, ADR 0047) lives in coord.

A nil Manager, nil ctx, or empty thread panics (programmer error). Use-after-close returns an already-closed channel rather than panicking so deferred consumer drain stays quiet.
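
The channel contract described above (closes on ctx cancellation; an already-closed channel after Close) can be sketched as follows. watch, closedChannel, and drain are hypothetical stand-ins that forward from an in-memory channel rather than a JetStream consumer.

```go
package main

import (
	"context"
	"fmt"
)

type envelope struct{ Body string }

// watch forwards values from src until ctx is canceled, then closes the
// returned channel so a range loop on the consumer side terminates.
func watch(ctx context.Context, src <-chan envelope) <-chan envelope {
	out := make(chan envelope)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case e := <-src:
				select {
				case out <- e:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// closedChannel mimics the documented use-after-close result: a channel
// that is already closed, so receivers fall through immediately.
func closedChannel() <-chan envelope {
	ch := make(chan envelope)
	close(ch)
	return ch
}

// drain consumes n messages (n >= 1), then cancels ctx to end the loop.
func drain(n int) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	src := make(chan envelope)
	go func() {
		for i := 0; i < n; i++ {
			select {
			case src <- envelope{Body: fmt.Sprintf("msg-%d", i)}:
			case <-ctx.Done():
				return
			}
		}
	}()

	var got []string
	for e := range watch(ctx, src) {
		got = append(got, e.Body)
		if len(got) == n {
			cancel() // the watch goroutine closes the channel in response
		}
	}
	return got
}

func main() {
	fmt.Println(drain(3))
	_, ok := <-closedChannel()
	fmt.Println(ok) // prints false
}
```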

func (*Manager) WatchAll

func (m *Manager) WatchAll(ctx context.Context) <-chan Envelope

WatchAll returns a channel of Envelope values for every thread in this Manager's project. The channel closes when ctx is canceled. This is the project-wide counterpart to Watch: coord.Subscribe routes through WatchAll when the caller passes an empty pattern.

A nil Manager or nil ctx panics (programmer error). Use-after-close returns an already-closed channel, same shape as Watch.

func (*Manager) WatchPattern

func (m *Manager) WatchPattern(
	ctx context.Context, pattern string,
) <-chan Envelope

WatchPattern returns a channel of Envelope values for every thread whose NATS subject segment matches pattern. Unlike Watch — which hashes its thread argument into a deterministic short — WatchPattern passes pattern through as the subject suffix, so callers can supply NATS subject wildcards ("*" for every thread, a literal short for a single known stream, or an already-hashed short).

An empty pattern trips an assertion (programmer error); use WatchAll for project-wide streams. Use-after-close returns an already-closed channel, same shape as Watch. A nil Manager or nil ctx panics (programmer error).
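
To show how a pattern suffix interacts with NATS subject wildcards, the sketch below builds a filter subject and checks it against concrete subjects. matchSubject is an illustrative re-implementation of the token rules ("*" matches exactly one token, ">" matches one or more trailing tokens), not the NATS server's matcher, and filterSubject is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// filterSubject appends pattern to the project prefix unchanged, mirroring
// the documented pass-through behavior.
func filterSubject(proj, pattern string) string {
	return "chat." + proj + "." + pattern
}

// matchSubject reports whether subject matches pattern under NATS token
// rules. Tokens are separated by "."; ">" is only valid as the last token.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	all := filterSubject("bones", "*")
	one := filterSubject("bones", "0a1b2c3d")

	fmt.Println(matchSubject(all, "chat.bones.0a1b2c3d")) // true
	fmt.Println(matchSubject(one, "chat.bones.0a1b2c3d")) // true
	fmt.Println(matchSubject(one, "chat.bones.ffffffff")) // false
	fmt.Println(matchSubject(all, "chat.other.0a1b2c3d")) // false
}
```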

type ThreadSummary

type ThreadSummary struct {
	// contains filtered or unexported fields
}

ThreadSummary is a read-only view of a chat thread for agent context recovery. Used by coord.Prime per ADR 0036.

func (ThreadSummary) LastActivity

func (t ThreadSummary) LastActivity() time.Time

func (ThreadSummary) LastBody

func (t ThreadSummary) LastBody() string

func (ThreadSummary) MessageCount

func (t ThreadSummary) MessageCount() int

func (ThreadSummary) ThreadShort

func (t ThreadSummary) ThreadShort() string
