activity

package
v0.9.3
Published: May 27, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package activity pushes per-session agent.Event values to the platform's /api/sessions/<id>/activity endpoint asynchronously.

The poster is a fire-and-forget seam: the runner Send()s every event it observes, the poster maps it to the platform's activity wire shape (thought / action / response / error / context), and a background worker drains the queue with bounded retries. Send() never blocks — when the queue is full events are dropped with a warn log so the runner cannot stall on platform I/O.
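The non-blocking enqueue described above is commonly written as a select with a default branch. The following is a minimal sketch of that pattern, not the package's actual implementation: the queue type, newQueue, and trySend are hypothetical names, and events are reduced to strings for brevity.

```go
package main

import (
	"fmt"
	"log/slog"
)

// queue is a hypothetical bounded send queue. The real Poster keeps its
// internals unexported and may be structured differently.
type queue struct {
	jobs chan string
	log  *slog.Logger
}

// newQueue builds a queue with the given capacity.
func newQueue(size int) *queue {
	return &queue{jobs: make(chan string, size), log: slog.Default()}
}

// trySend enqueues ev without blocking. It reports false when the queue
// is full, in which case the event is dropped with a warn log.
func (q *queue) trySend(ev string) bool {
	select {
	case q.jobs <- ev:
		return true
	default:
		q.log.Warn("activity queue full; dropping event", "event", ev)
		return false
	}
}

func main() {
	q := newQueue(2)
	for _, ev := range []string{"a", "b", "c"} {
		fmt.Println(ev, q.trySend(ev))
	}
}
```

With a capacity of 2 and no consumer, the third send takes the default branch and is dropped, so the caller never waits on platform I/O.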

Endpoint contract: POST /api/sessions/<id>/activity. Body:

{
  "workerId": "wkr_xxx",
  "activity": {
    "type": "thought" | "action" | "response" | "error" | "context",
    "content": "string",
    "toolName": "?optional",
    "toolInput": {"...": "?optional object"},
    "toolCategory": "?optional",
    "toolOutput": "?optional",
    "timestamp": "?ISO 8601 — server defaults"
  }
}
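One way to model this body in Go is a pair of structs with omitempty on the optional fields. This is a sketch under assumptions: the type names activityRequest and activity and the encode helper are illustrative and are not part of the package's exported API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// activityRequest mirrors the documented request body. The type and
// field names on the Go side are hypothetical.
type activityRequest struct {
	WorkerID string   `json:"workerId"`
	Activity activity `json:"activity"`
}

// activity holds one event. Optional fields use omitempty so they are
// left out of the JSON when unset.
type activity struct {
	Type         string         `json:"type"` // thought, action, response, error, or context
	Content      string         `json:"content"`
	ToolName     string         `json:"toolName,omitempty"`
	ToolInput    map[string]any `json:"toolInput,omitempty"`
	ToolCategory string         `json:"toolCategory,omitempty"`
	ToolOutput   string         `json:"toolOutput,omitempty"`
	Timestamp    string         `json:"timestamp,omitempty"` // ISO 8601; server defaults when absent
}

// encode marshals req to its JSON wire form.
func encode(req activityRequest) (string, error) {
	b, err := json.Marshal(req)
	return string(b), err
}

func main() {
	body, err := encode(activityRequest{
		WorkerID: "wkr_xxx",
		Activity: activity{Type: "thought", Content: "planning next step"},
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(body)
}
```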

Auth: Bearer runtime token (refreshed via [Config.CredentialProvider] — same model as github.com/RenseiAI/donmai/runtime/heartbeat).

Side effect: on the first successful activity POST the poster also fires a single best-effort POST /api/sessions/<id>/status with {"status":"running","workerId":"..."} to transition the platform-side session row from "claimed" to "running". The transition is gated by an atomic.Bool so it fires at most once per Poster. Terminal status is owned by [result.Poster].
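A once-only gate of this kind can be built on atomic.Bool.CompareAndSwap. The sketch below shows the general technique only; statusGate, tryFire, and countFires are hypothetical names, and whether the real poster flips the flag before or after the status POST completes is not stated here.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// statusGate is a hypothetical wrapper around the once-only flag.
type statusGate struct {
	fired atomic.Bool
}

// tryFire reports true for exactly one caller, even under concurrency.
// Every later caller sees false and skips the status POST.
func (g *statusGate) tryFire() bool {
	return g.fired.CompareAndSwap(false, true)
}

// countFires races n goroutines against one gate and returns how many
// of them won.
func countFires(n int) int {
	var (
		g    statusGate
		wg   sync.WaitGroup
		wins atomic.Int64
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if g.tryFire() {
				wins.Add(1)
			}
		}()
	}
	wg.Wait()
	return int(wins.Load())
}

func main() {
	fmt.Println("status transitions fired:", countFires(100))
}
```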

Per-session lifecycle: build one Poster per [Runner.Run] (use the queued-work session id), call Start, defer Stop. Stop blocks for a short drain window before exiting.

Constants

const (
	// DefaultQueueSize is the bounded send-queue capacity. Sized so the
	// runner can buffer one tool-heavy minute (~256 events at typical
	// AssistantText + ToolUse cadence) before the queue starts dropping
	// on backpressure.
	DefaultQueueSize = 256

	// DefaultMaxRetries is the per-event HTTP retry budget. After the
	// limit the event is dropped with a warn log; we never re-queue —
	// the platform's activity table is best-effort observability.
	DefaultMaxRetries = 5

	// DefaultInitialBackoff is the first-retry sleep; subsequent
	// retries double up to MaxBackoff. Matches result.Poster's pattern
	// (1s base, exponential), but tighter on the floor because activity
	// events are higher-volume.
	DefaultInitialBackoff = 250 * time.Millisecond

	// DefaultMaxBackoff caps the exponential backoff for a single
	// event's retry loop.
	DefaultMaxBackoff = 5 * time.Second

	// DefaultHTTPTimeout is the per-request timeout used when the
	// caller does not inject an [http.Client]. Activity posts are tiny;
	// 5s is generous.
	DefaultHTTPTimeout = 5 * time.Second

	// DefaultStopDrainTimeout caps how long Stop waits for in-flight
	// jobs to drain before the worker goroutine returns.
	DefaultStopDrainTimeout = 2 * time.Second

	// MaxToolSummaryChars caps the activity content for ToolUseEvent /
	// ToolResultEvent so the platform's activity buffer doesn't grow
	// unbounded on noisy tool outputs.
	MaxToolSummaryChars = 200

	// MaxToolOutputChars caps ToolResultEvent.Content forwarded as
	// toolOutput. Generous enough to preserve the gh pr create URL the
	// platform-side parser scans for.
	MaxToolOutputChars = 500
)

Default values for Config. Exposed so callers can tune individual knobs without re-deriving the rest.
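To make the retry defaults concrete, the sketch below computes the sleep before each retry under one plausible reading of the comments: the first retry waits InitialBackoff, and each later wait doubles up to MaxBackoff. The backoffSchedule helper is hypothetical, and the real retry loop may differ in detail (for example, in whether MaxRetries counts attempts or retries).

```go
package main

import (
	"fmt"
	"time"
)

// backoffSchedule returns the sleep before each of the given number of
// retries: start at initial, double each time, and never exceed ceiling.
func backoffSchedule(initial, ceiling time.Duration, retries int) []time.Duration {
	out := make([]time.Duration, 0, retries)
	d := initial
	for i := 0; i < retries; i++ {
		if d > ceiling {
			d = ceiling
		}
		out = append(out, d)
		d *= 2
	}
	return out
}

func main() {
	// Values copied from DefaultInitialBackoff, DefaultMaxBackoff, and
	// DefaultMaxRetries.
	schedule := backoffSchedule(250*time.Millisecond, 5*time.Second, 5)

	var total time.Duration
	for _, d := range schedule {
		total += d
	}
	fmt.Println(schedule, "total:", total)
}
```

Under these assumptions the five waits are 250ms, 500ms, 1s, 2s, and 4s, so one event spends at most 7.75s sleeping before it is dropped, and the 5s cap only takes effect if MaxRetries is raised above 5.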

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// SessionID is the platform session UUID (path param of
	// /api/sessions/<id>/activity). Required.
	SessionID string
	// WorkerID is the daemon worker that owns the session. Sent in the
	// request body so the platform can verify ownership.
	WorkerID string
	// BaseURL is the platform API base, e.g. "https://app.rensei.ai".
	// Required.
	BaseURL string
	// AuthToken is sent as Bearer in the Authorization header. Empty
	// means no auth header — used by tests against unauthenticated
	// httptest.Server instances.
	AuthToken string
	// CredentialProvider returns the latest worker id + runtime token.
	// When set, every HTTP attempt calls it before posting so child
	// runners pick up daemon-side runtime-token refreshes mid-session.
	CredentialProvider CredentialProvider

	// HTTPClient overrides the default HTTP client. When nil, requests
	// use DefaultHTTPTimeout.
	HTTPClient *http.Client
	// Logger overrides slog.Default(). The poster logs at debug for
	// successful posts and warn for drops / unrecoverable failures.
	Logger *slog.Logger
	// Now overrides time.Now for deterministic tests.
	Now func() time.Time
	// Sleep overrides time.Sleep for retry backoff in tests.
	Sleep func(time.Duration)

	// ProviderName identifies the AgentRuntime provider that emitted the
	// events ("claude", "codex", "stub", …). The platform-side hook bridge
	// uses it to build a ProviderRef for Layer 6 hook events when
	// translating activities into pre-tool-use / post-tool-use payloads.
	// Empty is permitted; the bridge falls back to "unknown".
	ProviderName string

	// QueueSize overrides DefaultQueueSize.
	QueueSize int
	// MaxRetries overrides DefaultMaxRetries.
	MaxRetries int
	// InitialBackoff overrides DefaultInitialBackoff.
	InitialBackoff time.Duration
	// MaxBackoff overrides DefaultMaxBackoff.
	MaxBackoff time.Duration
	// StopDrainTimeout overrides DefaultStopDrainTimeout.
	StopDrainTimeout time.Duration
}

Config carries the inputs Poster needs. SessionID and BaseURL are required; the rest have sensible defaults.

type CredentialProvider

type CredentialProvider func(context.Context) (RuntimeCredentials, error)

CredentialProvider returns the freshest worker runtime credentials available to the caller. Implementations should be cheap and concurrency-safe: the poster invokes the provider before every HTTP attempt, including retries, so daemon-side runtime-token refreshes propagate without a restart.
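A provider that meets the "cheap and concurrency-safe" requirement can be as simple as a mutex-guarded struct. In the sketch below, RuntimeCredentials and CredentialProvider are local copies of the documented types so the example compiles on its own; credentialStore, Set, Provider, and fetch are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Local copies of the documented types, so the sketch is self-contained.
type RuntimeCredentials struct {
	WorkerID  string
	AuthToken string
}

type CredentialProvider func(context.Context) (RuntimeCredentials, error)

// credentialStore is a hypothetical holder that the daemon updates
// whenever it refreshes the runtime token.
type credentialStore struct {
	mu    sync.RWMutex
	creds RuntimeCredentials
}

// Set replaces the stored credentials.
func (s *credentialStore) Set(c RuntimeCredentials) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.creds = c
}

// Provider returns a CredentialProvider that reads the latest value
// under a read lock, which keeps each call cheap.
func (s *credentialStore) Provider() CredentialProvider {
	return func(ctx context.Context) (RuntimeCredentials, error) {
		if err := ctx.Err(); err != nil {
			return RuntimeCredentials{}, err
		}
		s.mu.RLock()
		defer s.mu.RUnlock()
		return s.creds, nil
	}
}

// fetch calls p with a background context.
func fetch(p CredentialProvider) (RuntimeCredentials, error) {
	return p(context.Background())
}

func main() {
	store := &credentialStore{}
	provider := store.Provider()

	store.Set(RuntimeCredentials{WorkerID: "wkr_1", AuthToken: "token-a"})
	first, _ := fetch(provider)

	// A refresh between attempts is visible on the next call.
	store.Set(RuntimeCredentials{WorkerID: "wkr_1", AuthToken: "token-b"})
	second, _ := fetch(provider)

	fmt.Println(first.AuthToken, second.AuthToken)
}
```

The resulting function value is what would be assigned to Config.CredentialProvider.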

type Poster

type Poster struct {
	// contains filtered or unexported fields
}

Poster pushes agent.Event values to the platform's /api/sessions/<id>/activity endpoint asynchronously, with a bounded retry queue. Construct via New, call Poster.Start to launch the worker, Poster.Send to enqueue events, and Poster.Stop to drain and shut down. All methods are safe for concurrent use.

func New

func New(cfg Config) (*Poster, error)

New validates cfg and returns an unstarted Poster. It returns an error when SessionID or BaseURL is missing.
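The validation step might look roughly like the sketch below. It is illustrative only: the config type is a three-field subset of Config, the error messages are invented, and treating a non-positive QueueSize as "use the default" is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// defaultQueueSize mirrors DefaultQueueSize.
const defaultQueueSize = 256

// config is a hypothetical subset of Config, enough to show the checks.
type config struct {
	SessionID string
	BaseURL   string
	QueueSize int
}

// validate rejects configs that lack a required field and fills in the
// default queue size when the caller left it unset.
func validate(cfg config) (config, error) {
	if cfg.SessionID == "" {
		return config{}, errors.New("activity: SessionID is required")
	}
	if cfg.BaseURL == "" {
		return config{}, errors.New("activity: BaseURL is required")
	}
	if cfg.QueueSize <= 0 {
		cfg.QueueSize = defaultQueueSize
	}
	return cfg, nil
}

func main() {
	_, err := validate(config{BaseURL: "https://app.rensei.ai"})
	fmt.Println("missing session id:", err)

	cfg, err := validate(config{SessionID: "sess-1", BaseURL: "https://app.rensei.ai"})
	fmt.Println(cfg.QueueSize, err)
}
```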

func (*Poster) Send

func (p *Poster) Send(_ context.Context, ev agent.Event)

Send enqueues ev for asynchronous delivery without blocking. If the queue is full, or the poster has been stopped or was never started, the event is dropped with a warn log. Events that map to no activity (Init / System / ToolProgress) are filtered up front so they never consume queue capacity. The context argument is ignored.

func (*Poster) Start

func (p *Poster) Start(ctx context.Context) error

Start launches the background worker goroutine. Idempotent: subsequent calls are no-ops. The supplied ctx scopes the worker's lifetime; the worker also exits when Poster.Stop closes the queue.

func (*Poster) Stop

func (p *Poster) Stop() error

Stop closes the queue, waits up to [Config.StopDrainTimeout] for in-flight jobs to drain, and then returns. Idempotent and safe to call from a deferred runner cleanup path.
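The close-then-wait behaviour can be sketched with a done channel and a timer. This shows the general pattern rather than the package's code: worker, newWorker, and stop are hypothetical, and a real poster also has to prevent sends on the closed channel, which this sketch leaves out.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical stand-in for the Poster's background loop.
type worker struct {
	jobs     chan string
	done     chan struct{}
	stopOnce sync.Once
}

// newWorker starts a goroutine that handles jobs until the queue is
// closed, then signals completion by closing done.
func newWorker(size int, handle func(string)) *worker {
	w := &worker{jobs: make(chan string, size), done: make(chan struct{})}
	go func() {
		defer close(w.done)
		for j := range w.jobs {
			handle(j)
		}
	}()
	return w
}

// stop closes the queue (only once, so repeated calls are safe) and
// waits up to timeout for the worker to drain. It reports whether the
// drain finished in time.
func (w *worker) stop(timeout time.Duration) bool {
	w.stopOnce.Do(func() { close(w.jobs) })
	select {
	case <-w.done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	processed := 0
	w := newWorker(4, func(string) { processed++ })
	w.jobs <- "a"
	w.jobs <- "b"

	drained := w.stop(2 * time.Second) // 2s mirrors DefaultStopDrainTimeout
	fmt.Println("drained:", drained, "processed:", processed)
}
```

Wrapping the close in sync.Once is what makes the call idempotent, which matters when Stop is reachable both from a defer and from an explicit cleanup path.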

type RuntimeCredentials

type RuntimeCredentials struct {
	WorkerID  string
	AuthToken string
}

RuntimeCredentials are the bearer-token credentials needed for an activity post. Empty fields fall back to the corresponding Config fields. The type mirrors github.com/RenseiAI/donmai/runtime/heartbeat.RuntimeCredentials so the runner can build one provider for both seams.
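The fallback rule can be expressed as a small merge step before each request. The sketch below is an assumption about how that merge could look: staticConfig, resolve, and authHeader are hypothetical names, and RuntimeCredentials is redeclared locally so the example compiles on its own.

```go
package main

import "fmt"

// RuntimeCredentials is a local copy of the documented type.
type RuntimeCredentials struct {
	WorkerID  string
	AuthToken string
}

// staticConfig is a hypothetical subset of Config holding the static
// fallback values.
type staticConfig struct {
	WorkerID  string
	AuthToken string
}

// resolve fills any empty field of rc from the matching config field.
func resolve(cfg staticConfig, rc RuntimeCredentials) RuntimeCredentials {
	if rc.WorkerID == "" {
		rc.WorkerID = cfg.WorkerID
	}
	if rc.AuthToken == "" {
		rc.AuthToken = cfg.AuthToken
	}
	return rc
}

// authHeader returns the Authorization header value, or "" when no
// token is configured and the header should be omitted.
func authHeader(token string) string {
	if token == "" {
		return ""
	}
	return "Bearer " + token
}

func main() {
	cfg := staticConfig{WorkerID: "wkr_static", AuthToken: "static-token"}

	// The provider returned only a refreshed token, so the worker id
	// falls back to the config value.
	creds := resolve(cfg, RuntimeCredentials{AuthToken: "fresh-token"})
	fmt.Println(creds.WorkerID, authHeader(creds.AuthToken))
}
```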
