Documentation
Overview ¶
Package activity pushes per-session agent.Event values to the platform's /api/sessions/<id>/activity endpoint asynchronously.
The poster is a fire-and-forget seam: the runner calls Send for every event it observes, the poster maps each event to the platform's activity wire shape (thought / action / response / error / context), and a background worker drains the queue with bounded retries. Send never blocks: when the queue is full, events are dropped with a warn log so the runner cannot stall on platform I/O.
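The non-blocking drop-on-full behavior described above is commonly built from a buffered channel and a select with a default case. The following is a minimal sketch of that pattern, not the package's actual implementation; boundedQueue and trySend are hypothetical names, and events are reduced to plain strings.

```go
package main

import (
	"fmt"
	"log/slog"
)

// boundedQueue is a hypothetical stand-in for the poster's send queue.
type boundedQueue struct {
	ch chan string
}

func newBoundedQueue(size int) *boundedQueue {
	return &boundedQueue{ch: make(chan string, size)}
}

// trySend enqueues ev without blocking and reports whether it was accepted.
// When the buffer is full the event is dropped and a warning is logged.
func (q *boundedQueue) trySend(ev string) bool {
	select {
	case q.ch <- ev:
		return true
	default:
		slog.Warn("activity queue full, dropping event", "event", ev)
		return false
	}
}

func main() {
	q := newBoundedQueue(2)
	for _, ev := range []string{"thought", "action", "response"} {
		// The third send finds the buffer full and is dropped.
		fmt.Println(ev, q.trySend(ev))
	}
}
```

The default case is what keeps the caller from stalling: the cost of a full queue is a lost event rather than a blocked runner.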
Endpoint contract: POST /api/sessions/<id>/activity. Body:
{
"workerId": "wkr_xxx",
"activity": {
"type": "thought" | "action" | "response" | "error" | "context",
"content": "string",
"toolName": "?optional",
"toolInput": {"...": "?optional object"},
"toolCategory": "?optional",
"toolOutput": "?optional",
"timestamp": "?ISO 8601 — server defaults"
}
}
Auth: Bearer runtime token (refreshed via [Config.CredentialProvider] — same model as github.com/RenseiAI/donmai/runtime/heartbeat).
Side effect: on the first successful activity POST the poster also fires a single best-effort POST /api/sessions/<id>/status with {"status":"running","workerId":"..."} to transition the platform-side session row from "claimed" to "running". This transition is gated by an atomic.Bool; terminal status is owned by [result.Poster].
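A gate of this kind can be sketched with atomic.Bool.CompareAndSwap, which lets exactly one caller observe the false-to-true flip. The runningGate type below is hypothetical, and whether the real poster re-arms the gate after a failed status POST is not stated in this documentation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runningGate is a hypothetical one-shot gate: only the first caller of
// tryFire wins the right to post the "running" status transition.
type runningGate struct {
	fired atomic.Bool
}

// tryFire returns true for the first caller only, even under concurrency.
func (g *runningGate) tryFire() bool {
	return g.fired.CompareAndSwap(false, true)
}

func main() {
	var g runningGate
	var wins atomic.Int32
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if g.tryFire() {
				// A real poster would send the status POST here.
				wins.Add(1)
			}
		}()
	}
	wg.Wait()
	fmt.Println("status posts fired:", wins.Load())
}
```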
Per-session lifecycle: build one Poster per [Runner.Run] (use the queued-work session id), call Start, defer Stop. Stop blocks for a short drain window before exiting.
Constants ¶
const (
	// DefaultQueueSize is the bounded send-queue capacity. Sized so the
	// runner can buffer one tool-heavy minute (~256 events at typical
	// AssistantText + ToolUse cadence) before the queue starts dropping
	// on backpressure.
	DefaultQueueSize = 256

	// DefaultMaxRetries is the per-event HTTP retry budget. After the
	// limit the event is dropped with a warn log; we never re-queue:
	// the platform's activity table is best-effort observability.
	DefaultMaxRetries = 5

	// DefaultInitialBackoff is the first-retry sleep; subsequent
	// retries double up to MaxBackoff. Matches result.Poster's pattern
	// (1s base, exponential), but tighter on the floor because activity
	// events are higher-volume.
	DefaultInitialBackoff = 250 * time.Millisecond

	// DefaultMaxBackoff caps the exponential backoff for a single
	// event's retry loop.
	DefaultMaxBackoff = 5 * time.Second

	// DefaultHTTPTimeout is the per-request timeout used when the
	// caller does not inject an [http.Client]. Activity posts are tiny;
	// 5s is generous.
	DefaultHTTPTimeout = 5 * time.Second

	// DefaultStopDrainTimeout caps how long Stop waits for in-flight
	// jobs to drain before the worker goroutine returns.
	DefaultStopDrainTimeout = 2 * time.Second

	// MaxToolSummaryChars caps the activity content for ToolUseEvent /
	// ToolResultEvent so the platform's activity buffer doesn't grow
	// unbounded on noisy tool outputs.
	MaxToolSummaryChars = 200

	// MaxToolOutputChars caps ToolResultEvent.Content forwarded as
	// toolOutput. Generous enough to preserve the gh pr create URL the
	// platform-side parser scans for.
	MaxToolOutputChars = 500
)
Default values for Config. Exposed so callers can tune individual knobs without re-deriving the rest.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// SessionID is the platform session UUID (path param of
// /api/sessions/<id>/activity). Required.
SessionID string
// WorkerID is the daemon worker that owns the session. Sent in the
// request body so the platform can verify ownership.
WorkerID string
// BaseURL is the platform API base, e.g. "https://app.rensei.ai".
// Required.
BaseURL string
// AuthToken is sent as Bearer in the Authorization header. Empty
// means no auth header — used by tests against unauthenticated
// httptest.Server instances.
AuthToken string
// CredentialProvider returns the latest worker id + runtime token.
// When set, every HTTP attempt calls it before posting so child
// runners pick up daemon-side runtime-token refreshes mid-session.
CredentialProvider CredentialProvider
// HTTPClient overrides http.DefaultClient.
HTTPClient *http.Client
// Logger overrides slog.Default(). The poster logs at debug for
// successful posts and warn for drops / unrecoverable failures.
Logger *slog.Logger
// Now overrides time.Now for deterministic tests.
Now func() time.Time
// Sleep overrides time.Sleep for retry backoff in tests.
Sleep func(time.Duration)
// ProviderName identifies the AgentRuntime provider that emitted the
// events ("claude", "codex", "stub", …). The platform-side hook bridge
// uses it to build a ProviderRef for Layer 6 hook events when
// translating activities into pre-tool-use / post-tool-use payloads.
// Empty is permitted; the bridge falls back to "unknown".
ProviderName string
// QueueSize overrides DefaultQueueSize.
QueueSize int
// MaxRetries overrides DefaultMaxRetries.
MaxRetries int
// InitialBackoff overrides DefaultInitialBackoff.
InitialBackoff time.Duration
// MaxBackoff overrides DefaultMaxBackoff.
MaxBackoff time.Duration
// StopDrainTimeout overrides DefaultStopDrainTimeout.
StopDrainTimeout time.Duration
}
Config carries the inputs Poster needs. SessionID and BaseURL are required; the rest have sensible defaults.
type CredentialProvider ¶
type CredentialProvider func(context.Context) (RuntimeCredentials, error)
CredentialProvider returns the freshest worker runtime credentials available to the caller. Implementations should be cheap and concurrency-safe; the poster invokes it before every HTTP attempt, retries included, so daemon-side runtime-token refreshes propagate without restart.
type Poster ¶
type Poster struct {
// contains filtered or unexported fields
}
Poster pushes agent.Event values to the platform's /api/sessions/<id>/activity endpoint asynchronously, with a bounded retry queue. Construct via New, call Poster.Start to launch the worker, Poster.Send to enqueue events, and Poster.Stop to drain + shut down. All methods are safe for concurrent use.
func New ¶
New validates cfg and returns a non-started Poster. Returns an error when SessionID or BaseURL is missing.
func (*Poster) Send ¶
Send enqueues ev for async delivery. Non-blocking — if the queue is full or the poster is stopped/never-started, the event is dropped with a warn log. Events that map to nothing (Init / System / ToolProgress) are filtered up-front so they never consume queue capacity.
func (*Poster) Start ¶
Start launches the background worker goroutine. Idempotent: subsequent calls are no-ops. The supplied ctx scopes the worker's lifetime; the worker also exits when Poster.Stop closes the queue.
type RuntimeCredentials ¶
RuntimeCredentials are the bearer-token credentials needed for an activity post. Empty fields fall back to the corresponding Config fields. Mirrored on github.com/RenseiAI/donmai/runtime/heartbeat.RuntimeCredentials for symmetry — the runner builds one provider for both seams.