Documentation ¶
Overview ¶
Package pubsub wraps Google Cloud Pub/Sub for AILANG messaging. Pub/Sub serves as a notification/transport layer on top of Firestore storage. Messages are stored durably in Firestore; Pub/Sub provides instant push notification that new work is available (replacing SQLite polling).
Index ¶
- Constants
- func DecodePushEnvelope(body io.Reader) (data []byte, attrs map[string]string, messageID string, err error)
- type CascadeEnvelopeFields
- type CascadeMessageData
- type Client
- func (c *Client) Close() error
- func (c *Client) Prefix() string
- func (c *Client) ProjectID() string
- func (c *Client) Subscription(baseName string) *pubsub.Subscription
- func (c *Client) SubscriptionName(baseName string) string
- func (c *Client) Topic(baseName string) *pubsub.Topic
- func (c *Client) TopicName(baseName string) string
- type MessageAttributes
- type MessageHandler
- type MessageNotification
- type Publisher
- func (p *Publisher) PublishCascade(ctx context.Context, messageID string, attrs MessageAttributes, ...) error
- func (p *Publisher) PublishCascadeWithEnvelope(ctx context.Context, messageID string, attrs MessageAttributes, ...) error
- func (p *Publisher) PublishCompletion(ctx context.Context, completion TaskCompletion, workspace string) error
- func (p *Publisher) PublishEvent(ctx context.Context, eventJSON []byte, eventType, taskID, workspace string) error
- func (p *Publisher) PublishMessage(ctx context.Context, messageID string, attrs MessageAttributes) error
- func (p *Publisher) PublishTask(ctx context.Context, taskID, agentID, workspace, provider string) error
- func (p *Publisher) Stop()
- type PushEnvelope
- type PushMessage
- type Subscriber
- type TaskCompletion
- type TaskDispatch
Constants ¶
const (
TopicMessages = "messages" // New message notifications (attribute-routed by inbox)
TopicTasks = "tasks" // Task dispatch to Cloud Run Jobs
TopicCompletions = "completions" // Task completion notifications
TopicEvents = "events" // Real-time dashboard/laptop event streaming
TopicDeadLetter = "dead-letter" // Failed message sink
TopicCascade = "cascade" // Authoritative package cascade triggers (M-PKG-AUTONOMOUS-CASCADE-SAFE M2)
)
Topic base names. The full topic name is "{prefix}-{base}".
const (
SubMessagesCoordinator = "messages-coordinator" // Cloud Run coordinator
SubMessagesLaptop = "messages-laptop" // Developer laptop (pull)
SubTasksExecutor = "tasks-executor" // Eventarc → Cloud Run Job
SubCompletionsCoordinator = "completions-coordinator" // Coordinator receives results
SubEventsDashboard = "events-dashboard" // Dashboard server
SubEventsLaptop = "events-laptop" // Laptop real-time updates
)
Subscription base names. The full name is "{prefix}-{base}".
const DefaultTopicPrefix = "ailang"
DefaultTopicPrefix is the default prefix for all AILANG Pub/Sub topics.
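As a rough illustration of the "{prefix}-{base}" convention described for topics and subscriptions, the sketch below joins a prefix and a base name. The helper fullName is hypothetical and is not part of this package; the package's own TopicName and SubscriptionName methods may be implemented differently.

```go
package main

import "fmt"

// Values copied from the constants documented in this package.
const (
	DefaultTopicPrefix     = "ailang"
	TopicMessages          = "messages"
	SubMessagesCoordinator = "messages-coordinator"
)

// fullName is a hypothetical helper (not part of the package) that applies
// the documented "{prefix}-{base}" naming convention.
func fullName(prefix, base string) string {
	return prefix + "-" + base
}

func main() {
	fmt.Println(fullName(DefaultTopicPrefix, TopicMessages))          // ailang-messages
	fmt.Println(fullName(DefaultTopicPrefix, SubMessagesCoordinator)) // ailang-messages-coordinator
}
```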
const SourceCascade = "cascade"
SourceCascade is the value for the "source" message attribute on cascade-topic publishes. Agents check this against the message they receive to decide whether to act on a bump request.
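A receiving agent's check could look roughly like the sketch below. It assumes the attributes arrive as a plain map[string]string keyed by "source"; the function name isCascadeSourced is hypothetical and not part of this package.

```go
package main

import "fmt"

// SourceCascade mirrors the constant documented in this package.
const SourceCascade = "cascade"

// isCascadeSourced is a hypothetical guard: it reports whether the message
// attributes carry source=cascade. A missing or different value is treated
// as non-authoritative.
func isCascadeSourced(attrs map[string]string) bool {
	return attrs["source"] == SourceCascade
}

func main() {
	fmt.Println(isCascadeSourced(map[string]string{"source": "cascade"}))  // true
	fmt.Println(isCascadeSourced(map[string]string{"source": "messages"})) // false
	fmt.Println(isCascadeSourced(nil))                                     // false
}
```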
Variables ¶
This section is empty.
Functions ¶
func DecodePushEnvelope ¶
func DecodePushEnvelope(body io.Reader) (data []byte, attrs map[string]string, messageID string, err error)
DecodePushEnvelope reads a Pub/Sub push HTTP body, unmarshals the envelope, and base64-decodes the Data field. Returns the decoded data bytes, attributes map, and message ID.
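The steps described for DecodePushEnvelope (read the body, unmarshal the envelope, base64-decode the data) can be sketched as follows. This is an illustration under stated assumptions, not the package's actual implementation: the function names decodePushEnvelopeSketch and samplePushBody, the error wording, and the sample values are all made up for the example. The two struct types are copied from this documentation.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// PushMessage and PushEnvelope are copied from the types documented in this package.
type PushMessage struct {
	Data        string            `json:"data"` // base64-encoded payload
	MessageID   string            `json:"messageId"`
	Attributes  map[string]string `json:"attributes"`
	PublishTime string            `json:"publishTime"`
}

type PushEnvelope struct {
	Message      PushMessage `json:"message"`
	Subscription string      `json:"subscription"`
}

// decodePushEnvelopeSketch reads the body, unmarshals the envelope, and
// base64-decodes the Data field (standard encoding).
func decodePushEnvelopeSketch(body io.Reader) ([]byte, map[string]string, string, error) {
	raw, err := io.ReadAll(body)
	if err != nil {
		return nil, nil, "", fmt.Errorf("read push body: %w", err)
	}
	var env PushEnvelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return nil, nil, "", fmt.Errorf("unmarshal push envelope: %w", err)
	}
	data, err := base64.StdEncoding.DecodeString(env.Message.Data)
	if err != nil {
		return nil, nil, "", fmt.Errorf("decode message data: %w", err)
	}
	return data, env.Message.Attributes, env.Message.MessageID, nil
}

// samplePushBody builds a hypothetical push body whose payload is
// {"message_id":"m1"}; all values here are illustrative.
func samplePushBody() io.Reader {
	payload := base64.StdEncoding.EncodeToString([]byte(`{"message_id":"m1"}`))
	body := fmt.Sprintf(
		`{"message":{"data":%q,"messageId":"42","attributes":{"inbox":"demo"}},"subscription":"demo-sub"}`,
		payload,
	)
	return strings.NewReader(body)
}

func main() {
	data, attrs, id, err := decodePushEnvelopeSketch(samplePushBody())
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(data), attrs["inbox"], id)
}
```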
Types ¶
type CascadeEnvelopeFields ¶ added in v0.14.3
type CascadeEnvelopeFields struct {
RootPackage string `json:"root_package"` // vendor/name@version
ChangeClass string `json:"change_class"` // "A" (content-only), "B" (additive), "C" (interface change)
FromVersion string `json:"from_version"`
ToVersion string `json:"to_version"`
FromInterfaceHash string `json:"from_interface_hash"`
ToInterfaceHash string `json:"to_interface_hash"`
FromContentHash string `json:"from_content_hash"`
ToContentHash string `json:"to_content_hash"`
EffectsWidened bool `json:"effects_widened"`
PrevEffectCeiling []string `json:"prev_effect_ceiling,omitempty"`
NewEffectCeiling []string `json:"new_effect_ceiling,omitempty"`
}
CascadeEnvelopeFields are the cascade-specific routing/classification fields the coordinator extracts at dispatch time. They are set as attributes on the outgoing Pub/Sub message and are also embedded as JSON in the message data field (see PublishCascadeWithEnvelope), so the cloud coordinator can decide whether a deterministic bump is sufficient or AI escalation is needed without fetching the full envelope from a separate store.
M-PKG-CASCADE-DETERMINISTIC-FIRST.
type CascadeMessageData ¶ added in v0.14.3
type CascadeMessageData struct {
MessageID string `json:"message_id"`
Envelope *CascadeEnvelopeFields `json:"envelope,omitempty"`
}
CascadeMessageData is the JSON shape of the Pub/Sub message data field for cascade messages. M-PKG-CASCADE-DETERMINISTIC-FIRST: previously this was just `{"message_id": "..."}` (a pointer); now it is self-contained so the cloud coordinator can dispatch deterministically without a separate fetch from an inbox store.
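To make the two shapes concrete, the sketch below marshals a CascadeMessageData with and without an envelope using encoding/json. The struct definitions are copied from this documentation; the helpers encodeCascade and decodeCascade and all sample values (package name, versions) are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Types copied from the definitions documented in this package.
type CascadeEnvelopeFields struct {
	RootPackage       string   `json:"root_package"`
	ChangeClass       string   `json:"change_class"`
	FromVersion       string   `json:"from_version"`
	ToVersion         string   `json:"to_version"`
	FromInterfaceHash string   `json:"from_interface_hash"`
	ToInterfaceHash   string   `json:"to_interface_hash"`
	FromContentHash   string   `json:"from_content_hash"`
	ToContentHash     string   `json:"to_content_hash"`
	EffectsWidened    bool     `json:"effects_widened"`
	PrevEffectCeiling []string `json:"prev_effect_ceiling,omitempty"`
	NewEffectCeiling  []string `json:"new_effect_ceiling,omitempty"`
}

type CascadeMessageData struct {
	MessageID string                 `json:"message_id"`
	Envelope  *CascadeEnvelopeFields `json:"envelope,omitempty"`
}

// encodeCascade is a hypothetical helper that renders the data field as JSON.
func encodeCascade(d CascadeMessageData) (string, error) {
	b, err := json.Marshal(d)
	return string(b), err
}

// decodeCascade is a hypothetical helper that parses the data field.
func decodeCascade(s string) (CascadeMessageData, error) {
	var d CascadeMessageData
	err := json.Unmarshal([]byte(s), &d)
	return d, err
}

func main() {
	// Without an envelope, omitempty drops the nil pointer and the old
	// pointer-only shape is produced.
	pointerOnly, _ := encodeCascade(CascadeMessageData{MessageID: "msg-1"})
	fmt.Println(pointerOnly) // {"message_id":"msg-1"}

	// With an envelope, the message is self-contained (sample values only).
	full, _ := encodeCascade(CascadeMessageData{
		MessageID: "msg-1",
		Envelope: &CascadeEnvelopeFields{
			RootPackage: "acme/util@1.2.0",
			ChangeClass: "A",
			FromVersion: "1.1.0",
			ToVersion:   "1.2.0",
		},
	})
	fmt.Println(full)
}
```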
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client wraps the Google Cloud Pub/Sub client with AILANG topic naming conventions.
func (*Client) Subscription ¶
func (c *Client) Subscription(baseName string) *pubsub.Subscription
Subscription returns a handle to an existing subscription.
func (*Client) SubscriptionName ¶
func (c *Client) SubscriptionName(baseName string) string
SubscriptionName returns the full subscription name with prefix.
type MessageAttributes ¶
type MessageAttributes struct {
Inbox string // Target agent inbox (e.g., "design-doc-creator")
Workspace string // Project workspace (e.g., "sunholo-data/ailang")
FromAgent string // Sender agent ID (e.g., "user", "sprint-planner")
Category string // Message category (e.g., "bug", "feature", "general")
MessageType string // Message type (e.g., "request", "notification")
// Source identifies which Pub/Sub topic the message arrived on.
// (M-PKG-AUTONOMOUS-CASCADE-SAFE M1) Used by the agent to distinguish
// authoritative cascade-driven bumps ("cascade") from public-routed
// feedback ("messages" or empty). Set by the publisher; the receiving
// adapter copies it through to Message.Source for downstream guards.
Source string
}
MessageAttributes carries routing metadata as Pub/Sub message attributes. Used for subscription filtering (e.g., filter by inbox or workspace).
func AttributesFromMap ¶
func AttributesFromMap(m map[string]string) MessageAttributes
AttributesFromMap creates MessageAttributes from a Pub/Sub message attributes map.
func (MessageAttributes) ToMap ¶
func (a MessageAttributes) ToMap() map[string]string
ToMap converts attributes to map[string]string for Pub/Sub message publishing. Only non-empty values are included.
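The "only non-empty values" rule can be sketched as below. Note the assumptions: this documentation only confirms the "source" attribute key, so the other key names used here ("inbox", "workspace", "from_agent", "category", "message_type") are guesses for illustration, and toMapSketch is a hypothetical stand-in rather than the package's ToMap.

```go
package main

import "fmt"

// MessageAttributes is copied from the type documented in this package.
type MessageAttributes struct {
	Inbox       string
	Workspace   string
	FromAgent   string
	Category    string
	MessageType string
	Source      string
}

// toMapSketch copies non-empty fields into a map. Every key except "source"
// is an assumed name, not confirmed by the documentation.
func toMapSketch(a MessageAttributes) map[string]string {
	m := make(map[string]string)
	put := func(key, value string) {
		if value != "" {
			m[key] = value
		}
	}
	put("inbox", a.Inbox)
	put("workspace", a.Workspace)
	put("from_agent", a.FromAgent)
	put("category", a.Category)
	put("message_type", a.MessageType)
	put("source", a.Source)
	return m
}

func main() {
	m := toMapSketch(MessageAttributes{Inbox: "design-doc-creator", Source: "cascade"})
	fmt.Println(len(m)) // 2: empty fields are left out
}
```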
type MessageHandler ¶
MessageHandler is called for each received Pub/Sub message. Returning nil acknowledges the message; returning an error causes a nack (retry).
type MessageNotification ¶
type MessageNotification struct {
MessageID string `json:"message_id"`
}
MessageNotification is published to the messages topic. Intentionally minimal — full message content lives in Firestore.
func DecodeMessageNotification ¶
func DecodeMessageNotification(data []byte) (MessageNotification, error)
DecodeMessageNotification decodes a MessageNotification from raw Pub/Sub data.
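Decoding the notification payload is likely a thin wrapper over JSON unmarshaling. A minimal sketch, assuming encoding/json and an illustrative error message (decodeNotificationSketch is a hypothetical name, not the package function):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MessageNotification is copied from the type documented in this package.
type MessageNotification struct {
	MessageID string `json:"message_id"`
}

// decodeNotificationSketch parses the raw Pub/Sub data into a notification.
func decodeNotificationSketch(data []byte) (MessageNotification, error) {
	var n MessageNotification
	if err := json.Unmarshal(data, &n); err != nil {
		return MessageNotification{}, fmt.Errorf("decode message notification: %w", err)
	}
	return n, nil
}

func main() {
	n, err := decodeNotificationSketch([]byte(`{"message_id":"msg-123"}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// The ID is only a pointer; the full message content lives in Firestore.
	fmt.Println(n.MessageID) // msg-123
}
```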
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher provides high-level publish functions for AILANG Pub/Sub topics.
func NewPublisher ¶
NewPublisher creates a publisher from a client.
func (*Publisher) PublishCascade ¶ added in v0.14.3
func (p *Publisher) PublishCascade(ctx context.Context, messageID string, attrs MessageAttributes, rootPackage string) error
PublishCascade publishes a cascade-trigger notification to the cascade topic. (M-PKG-AUTONOMOUS-CASCADE-SAFE M2) Publish permission on the cascade topic is restricted by IAM to the coordinator service account at the GCP layer, so a successful publish implicitly proves the caller is authorized: a stranger coming in through the public MCP cannot reach this topic.
Stamps `source=cascade` and `root_package=<vendor>/<name>@<version>` as message attributes alongside the standard inbox routing fields. The receiving pkg-* agent's pkg-update.md template uses {{.Source}} to distinguish authoritative bumps from public-routed feedback.
For backwards compatibility, this method accepts only the basic root_package signal. New code should use PublishCascadeWithEnvelope, which carries the full hash and change_class context needed for deterministic dispatch.
func (*Publisher) PublishCascadeWithEnvelope ¶ added in v0.14.3
func (p *Publisher) PublishCascadeWithEnvelope(ctx context.Context, messageID string, attrs MessageAttributes, env *CascadeEnvelopeFields) error
PublishCascadeWithEnvelope publishes a cascade-trigger notification with the full envelope (hashes, change_class, effect deltas) embedded in the message data field AND surfaced as Pub/Sub attributes. The cloud coordinator can then make deterministic dispatch decisions (deterministic toml bump vs AI escalation) without fetching from a separate store.
M-PKG-CASCADE-DETERMINISTIC-FIRST.
func (*Publisher) PublishCompletion ¶
func (p *Publisher) PublishCompletion(ctx context.Context, completion TaskCompletion, workspace string) error
PublishCompletion publishes a task completion to the completions topic.
func (*Publisher) PublishEvent ¶
func (p *Publisher) PublishEvent(ctx context.Context, eventJSON []byte, eventType, taskID, workspace string) error
PublishEvent publishes a real-time event to the events topic. Events are consumed by the dashboard and laptop for live streaming.
func (*Publisher) PublishMessage ¶
func (p *Publisher) PublishMessage(ctx context.Context, messageID string, attrs MessageAttributes) error
PublishMessage publishes a message notification to the messages topic. The actual message content is already stored in Firestore — this notification tells the coordinator (or laptop) that a new message is available.
type PushEnvelope ¶
type PushEnvelope struct {
Message PushMessage `json:"message"`
Subscription string `json:"subscription"`
}
PushEnvelope is the JSON body POSTed by Pub/Sub to push subscription endpoints. See: https://cloud.google.com/pubsub/docs/push#receive_push
type PushMessage ¶
type PushMessage struct {
Data string `json:"data"` // base64-encoded payload
MessageID string `json:"messageId"`
Attributes map[string]string `json:"attributes"`
PublishTime string `json:"publishTime"`
}
PushMessage represents the message within a Pub/Sub push envelope.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber provides pull-based subscription for AILANG Pub/Sub topics.
func NewSubscriber ¶
func NewSubscriber(client *Client) *Subscriber
NewSubscriber creates a subscriber from a client.
func (*Subscriber) ReceiveOne ¶
func (s *Subscriber) ReceiveOne(ctx context.Context, subName string) ([]byte, map[string]string, error)
ReceiveOne pulls a single message from the subscription and returns it. Useful for CLI commands like `ailang messages watch --pubsub`. Returns the raw data, attributes, and any error.
func (*Subscriber) Subscribe ¶
func (s *Subscriber) Subscribe(ctx context.Context, subName string, handler MessageHandler) error
Subscribe starts a blocking pull subscription. It calls handler for each message received on the named subscription. Subscribe blocks until ctx is cancelled or an unrecoverable error occurs.
Messages are automatically acked when handler returns nil, or nacked on error.
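The ack-on-nil, nack-on-error contract can be illustrated without a real Pub/Sub connection. In the sketch below everything is hypothetical: handlerSketch is a guessed handler signature (the real MessageHandler definition is not shown in this documentation), fakeMessage stands in for a received message, and dispatch imitates what a receive loop might do with the handler's return value.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handlerSketch is a hypothetical stand-in for MessageHandler.
type handlerSketch func(ctx context.Context, data []byte, attrs map[string]string) error

// fakeMessage records whether the loop acked or nacked it.
type fakeMessage struct {
	data   []byte
	attrs  map[string]string
	acked  bool
	nacked bool
}

// dispatch calls the handler for each message until ctx is cancelled.
// A nil return acks the message; an error nacks it so it can be retried.
func dispatch(ctx context.Context, msgs []*fakeMessage, h handlerSketch) {
	for _, m := range msgs {
		if ctx.Err() != nil {
			return
		}
		if err := h(ctx, m.data, m.attrs); err != nil {
			m.nacked = true
			continue
		}
		m.acked = true
	}
}

// runDemo feeds one good and one bad message through a sample handler.
func runDemo() []*fakeMessage {
	msgs := []*fakeMessage{{data: []byte("ok")}, {data: []byte("bad")}}
	dispatch(context.Background(), msgs, func(ctx context.Context, data []byte, _ map[string]string) error {
		if string(data) == "bad" {
			return errors.New("cannot process message")
		}
		return nil
	})
	return msgs
}

func main() {
	for _, m := range runDemo() {
		fmt.Printf("%s acked=%v nacked=%v\n", m.data, m.acked, m.nacked)
	}
	// ok acked=true nacked=false
	// bad acked=false nacked=true
}
```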
type TaskCompletion ¶
type TaskCompletion struct {
TaskID string `json:"task_id"`
AgentID string `json:"agent_id"`
Status string `json:"status"` // "completed" or "failed"
BranchName string `json:"branch_name,omitempty"` // Git branch with changes
ErrorMsg string `json:"error_msg,omitempty"`
// Changed files discovered via git diff after execution.
// Used by external clients (portal, sidecar) to know which files were created/modified.
ChangedFiles []string `json:"changed_files,omitempty"`
// Executor metrics (populated when using full executor infrastructure)
SessionID string `json:"session_id,omitempty"`
NumTurns int `json:"num_turns,omitempty"`
ToolCallCount int `json:"tool_call_count,omitempty"`
InputTokens int `json:"input_tokens,omitempty"`
OutputTokens int `json:"output_tokens,omitempty"`
CostUSD float64 `json:"cost_usd,omitempty"`
DurationMS int `json:"duration_ms,omitempty"`
// Cache token breakdown (omitted from InputTokens total — additive cost context)
CacheReadTokens int `json:"cache_read_tokens,omitempty"`
CacheCreationTokens int `json:"cache_creation_tokens,omitempty"`
// GCS path prefix for raw artifacts: transcript.txt, session.jsonl, metrics.json
// Format: "tasks/{taskID}" (relative to the per-environment artifact bucket)
ArtifactGCSPath string `json:"artifact_gcs_path,omitempty"`
}
TaskCompletion is published to the completions topic when a job finishes.
func DecodeTaskCompletion ¶
func DecodeTaskCompletion(data []byte) (TaskCompletion, error)
DecodeTaskCompletion decodes a TaskCompletion from raw Pub/Sub data.
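A consumer of the completions topic might decode the payload and branch on Status roughly as sketched below. This is an assumption-laden illustration: completionSketch reproduces only a subset of the TaskCompletion fields documented here, and the helper names and sample values are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// completionSketch is a trimmed copy of TaskCompletion (subset of fields).
type completionSketch struct {
	TaskID       string   `json:"task_id"`
	AgentID      string   `json:"agent_id"`
	Status       string   `json:"status"` // "completed" or "failed"
	BranchName   string   `json:"branch_name,omitempty"`
	ErrorMsg     string   `json:"error_msg,omitempty"`
	ChangedFiles []string `json:"changed_files,omitempty"`
}

// decodeCompletionSketch parses raw Pub/Sub data into the trimmed struct.
func decodeCompletionSketch(data []byte) (completionSketch, error) {
	var c completionSketch
	if err := json.Unmarshal(data, &c); err != nil {
		return completionSketch{}, fmt.Errorf("decode task completion: %w", err)
	}
	return c, nil
}

// summarize is a hypothetical helper that branches on the documented statuses.
func summarize(c completionSketch) string {
	if c.Status == "failed" {
		return "task " + c.TaskID + " failed: " + c.ErrorMsg
	}
	return "task " + c.TaskID + " " + c.Status
}

func main() {
	// Optional fields are tagged omitempty, so a minimal payload stays small.
	raw, _ := json.Marshal(completionSketch{TaskID: "t1", AgentID: "a1", Status: "completed"})
	fmt.Println(string(raw)) // {"task_id":"t1","agent_id":"a1","status":"completed"}

	c, err := decodeCompletionSketch([]byte(`{"task_id":"t2","agent_id":"a1","status":"failed","error_msg":"timeout"}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(summarize(c)) // task t2 failed: timeout
}
```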
type TaskDispatch ¶
TaskDispatch is published to the tasks topic to trigger a Cloud Run Job.
func DecodeTaskDispatch ¶
func DecodeTaskDispatch(data []byte) (TaskDispatch, error)
DecodeTaskDispatch decodes a TaskDispatch from raw Pub/Sub data.