Documentation ¶
Overview ¶
Package distributed defines Harbor's V1 distributed-edge contracts:
`MessageBus` — the at-least-once cross-worker fan-out edge. `Publish` is the canonical publication primitive. V1 ships an in-process loopback driver (`internal/distributed/drivers/loopback`); durable backends (NATS / Redis Streams / Postgres-as-queue) land in post-V1 phase 86.
`RemoteTransport` — the cross-process / cross-host call surface, designed end-to-end against the full A2A v1 spec (vendored at `docs/specifications/a2a.proto`). Every A2A RPC maps to a Go method here; every A2A message type has a counterpart in `internal/distributed/a2a`. V1 ships an in-process loopback driver; the actual A2A wire driver lands in Phase 29 (southbound).
Both surfaces follow the §4.4 driver-registry seam: an interface here, concrete drivers under `drivers/<name>/`, a factory + registry that dispatches by name, drivers self-registering from `init()`.
Identity contract. `MessageBus.Publish` rejects envelopes whose `Identity` quadruple is missing any of (tenant, user, session) with `ErrIdentityRequired`. `RemoteTransport` methods rely on `identity.Quadruple` being present in `ctx`; missing identity surfaces as `ErrIdentityRequired` at the driver boundary (a caller-side check before the call reaches the wire). The contracts NEVER carry an identity-downgrading knob: identity is mandatory at every distributed edge (per AGENTS.md §6 + §13).
At-least-once contract. `MessageBus.Publish` is documented as at-least-once. Consumers MUST be idempotent on `(TaskID, Edge, EventID)`. The in-process loopback driver delivers exactly-once today; future durable drivers WILL produce duplicates under partition + retry. Pinning the contract at at-least-once from t=0 means consumers that work against loopback also work against post-V1 drivers; this is intentional API hardening per D-031.
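As a minimal sketch of what consumer-side idempotency on `(TaskID, Edge, EventID)` could look like, the example below keeps an in-memory set of already-handled keys. `dedupeKey`, `Deduper`, and `FirstDelivery` are hypothetical names invented for illustration; they are not part of this package, and a real consumer would likely need a bounded or durable store.

```go
package main

import (
	"fmt"
	"sync"
)

// dedupeKey mirrors the (TaskID, Edge, EventID) idempotency tuple.
// Hypothetical type: plain strings stand in for the package's ID types.
type dedupeKey struct {
	TaskID, Edge, EventID string
}

// Deduper remembers which keys were already handled (hypothetical helper).
type Deduper struct {
	mu   sync.Mutex
	seen map[dedupeKey]struct{}
}

func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[dedupeKey]struct{})}
}

// FirstDelivery reports true the first time a key is seen and false on
// every redelivery of the same key.
func (d *Deduper) FirstDelivery(taskID, edge, eventID string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	k := dedupeKey{TaskID: taskID, Edge: edge, EventID: eventID}
	if _, dup := d.seen[k]; dup {
		return false
	}
	d.seen[k] = struct{}{}
	return true
}

func main() {
	d := NewDeduper()
	applied := 0
	// "evt-1" arrives twice, as a durable driver might redeliver it.
	for _, id := range []string{"evt-1", "evt-1", "evt-2"} {
		if d.FirstDelivery("task-1", "planner.next", id) {
			applied++
		}
	}
	fmt.Println(applied) // prints 2
}
```

The set grows without bound here; the sketch only shows the keying, not a retention policy.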
Index ¶
- Constants
- Variables
- func RegisterBus(name string, factory BusFactory)
- func RegisterRemoteTransport(name string, factory RemoteFactory)
- func RegisteredBusDrivers() []string
- func RegisteredRemoteTransportDrivers() []string
- type BusEnvelope
- type BusFactory
- type Dependencies
- type MessageBus
- type RemoteCallKind
- type RemoteCallRequest
- type RemoteCallResult
- type RemoteEventStream
- type RemoteFactory
- type RemoteTaskEventStream
- type RemoteTaskFilter
- type RemoteTaskSnapshot
- type RemoteTransport
Constants ¶
const DefaultDriver = "loopback"
DefaultDriver is the Phase 22 production driver name for BOTH the MessageBus and the RemoteTransport registries. Post-V1 durable bus drivers (NATS / Postgres-as-queue at phase 86) and the A2A wire driver (phase 29) register additional names; Open switches on `cfg.BusDriver` / `cfg.RemoteDriver`.
Variables ¶
var (
	// ErrBusClosed — Publish was called after Close.
	ErrBusClosed = errors.New("distributed: bus is closed")
	// ErrTransportClosed — a RemoteTransport method was called after Close.
	ErrTransportClosed = errors.New("distributed: remote transport is closed")
	// ErrIdentityRequired — the call lacks one or more identity components.
	ErrIdentityRequired = errors.New("distributed: identity required (tenant/user/session)")
	// ErrUnknownDriver — the configured driver name is not registered.
	ErrUnknownDriver = errors.New("distributed: unknown driver")
	// ErrAgentNotFound — the RemoteTransport could not resolve the agent URL.
	ErrAgentNotFound = errors.New("distributed: A2A agent not registered with this transport")
	// ErrTaskNotFound — the requested A2A task does not exist on the target.
	ErrTaskNotFound = errors.New("distributed: A2A task not found")
	// ErrInvalidPart — an A2A Part oneof was empty or unrecognised.
	ErrInvalidPart = errors.New("distributed: invalid A2A Part (oneof empty)")
)
Sentinel errors. Callers compare via errors.Is.
Functions ¶
func RegisterBus ¶
func RegisterBus(name string, factory BusFactory)
RegisterBus installs a BusFactory under name. Drivers self-register from their package init(); cmd/harbor blank-imports the production driver to trigger registration. Per AGENTS.md §4.4.
Re-registering the same name panics — the registration model is write-once-at-init and a duplicate signals a build mis-configuration.
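The write-once registration model can be sketched roughly as follows. This is an illustration under assumptions, not the package's actual implementation: `Bus`, `Factory`, `Register`, `Registered`, and `loopbackBus` are hypothetical stand-ins for `MessageBus`, `BusFactory`, `RegisterBus`, `RegisteredBusDrivers`, and a driver type.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Bus and Factory are hypothetical stand-ins for MessageBus and BusFactory.
type Bus interface{ Name() string }
type Factory func() (Bus, error)

var (
	mu        sync.RWMutex
	factories = map[string]Factory{}
)

// Register installs factory under name and panics on a duplicate,
// mirroring the write-once-at-init model described above.
func Register(name string, factory Factory) {
	mu.Lock()
	defer mu.Unlock()
	if _, dup := factories[name]; dup {
		panic(fmt.Sprintf("registry: driver %q registered twice", name))
	}
	factories[name] = factory
}

// Registered returns the registered driver names in sorted order.
func Registered() []string {
	mu.RLock()
	defer mu.RUnlock()
	names := make([]string, 0, len(factories))
	for n := range factories {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

// loopbackBus is a hypothetical driver type.
type loopbackBus struct{}

func (loopbackBus) Name() string { return "loopback" }

// A driver package would self-register from its own init().
func init() {
	Register("loopback", func() (Bus, error) { return loopbackBus{}, nil })
}

func main() {
	fmt.Println(Registered()) // prints [loopback]
}
```

Panicking instead of returning an error fits registration that happens only in `init()`: there is no caller positioned to handle the failure, and a duplicate is a build-time mistake.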
func RegisterRemoteTransport ¶
func RegisterRemoteTransport(name string, factory RemoteFactory)
RegisterRemoteTransport installs a RemoteFactory under name. Same contract as RegisterBus.
func RegisteredBusDrivers ¶
func RegisteredBusDrivers() []string
RegisteredBusDrivers returns a sorted list of bus driver names.
func RegisteredRemoteTransportDrivers ¶
func RegisteredRemoteTransportDrivers() []string
RegisteredRemoteTransportDrivers returns a sorted list of remote driver names.
Types ¶
type BusEnvelope ¶
type BusEnvelope struct {
// Edge labels the conceptual fan-out edge ("planner.next",
// "tool.dispatch.completed", ...). Free-form; consumers use it
// for routing + idempotency keying.
Edge string
// Source labels the originating subsystem ("planner", "tools", ...).
Source string
// Target labels the destination class ("memory", "audit", ...).
// Empty when the envelope is a broadcast.
Target string
// Identity is the load-bearing isolation key. The triple
// (tenant, user, session) is mandatory.
Identity identity.Quadruple
// TaskID associates the envelope with a Harbor `tasks.Task`. Empty
// when the envelope is not task-scoped (rare; most distributed
// edges fire inside a task).
TaskID tasks.TaskID
// EventID is the caller-supplied idempotency key. Consumers dedupe
// on `(TaskID, Edge, EventID)`. ULID-shaped; callers SHOULD use
// `events.NewEventID` (or a state-store equivalent) to generate.
EventID events.EventID
// Payload is the redacted bytes the consumer will see. Phase 22's
// loopback driver passes the bytes through verbatim; durable
// drivers may compress / encrypt. Caller-side redaction (D-020)
// is mandatory BEFORE Publish.
Payload json.RawMessage
// Headers carry transport-level key/value pairs (trace context,
// tenant overrides for fan-out scopes, ...). Free-form.
Headers map[string]string
// Meta carries free-form metadata (mostly for tests / debugging).
// Drivers SHOULD NOT depend on Meta for correctness.
Meta map[string]any
// Timestamp records when the publisher considered the envelope
// ready. Set by the caller; drivers do NOT rewrite this field.
Timestamp time.Time
}
BusEnvelope is the unit `MessageBus.Publish` accepts. It carries the identity quadruple, a task ID, edge / source / target labels, an opaque (caller-redacted) payload, free-form headers + metadata, a caller-supplied event ID for idempotency, and a wall-clock timestamp.
Per AGENTS.md §6, the `Identity` triple (tenant / user / session) is mandatory; `Publish` rejects empty triples with `ErrIdentityRequired`. `RunID` (the fourth component) is optional — set when the envelope originates inside a run scope.
func (BusEnvelope) Validate ¶
func (e BusEnvelope) Validate() error
Validate reports whether the envelope is structurally valid. Returns wrapped sentinels:
- ErrIdentityRequired when any of (tenant, user, session) is empty.
Drivers SHOULD call Validate before publishing; consumers MAY rely on the bus having already validated.
type BusFactory ¶
type BusFactory func(deps Dependencies) (MessageBus, error)
BusFactory builds a MessageBus from a Dependencies struct. Drivers expose one BusFactory via init() → RegisterBus.
type Dependencies ¶
type Dependencies struct {
// EventBus is the typed event bus the loopback MessageBus projects
// envelopes through. Optional for drivers that do not project;
// the loopback bus REQUIRES it.
EventBus events.EventBus
// Cfg carries Phase 22's DistributedConfig (driver names today).
Cfg config.DistributedConfig
// Tools carries the unified tool-catalog config. The A2A wire
// driver (Phase 29) reads `Tools.A2APeers` here. Other drivers
// MAY ignore.
Tools config.ToolsConfig
}
Dependencies bundles the wiring inputs every distributed driver receives. EventBus is the optional projection target for the bus loopback (drivers free to ignore when not in-process); Cfg carries the driver names + any future per-driver tuning fields. Tools is the tool-catalog config — the A2A wire RemoteTransport driver (Phase 29) reads `Tools.A2APeers` to seed its route registry.
type MessageBus ¶
type MessageBus interface {
// Publish delivers env to its subscribers. At-least-once delivery;
// consumers MUST be idempotent on `(TaskID, Edge, EventID)`.
//
// Returns ErrBusClosed when called after Close.
// Returns ErrIdentityRequired when env's identity triple is empty.
Publish(ctx context.Context, env BusEnvelope) error
// Close shuts down the bus, joining any driver-owned goroutines.
// Idempotent: a second Close is a no-op + returns nil.
Close(ctx context.Context) error
}
MessageBus is Harbor's at-least-once cross-worker fan-out edge. Implementations MUST be safe for concurrent use by N goroutines against a single shared instance (D-025).
Subscribe is intentionally NOT on this interface in V1: the loopback driver projects the bus through the typed `events.EventBus` so subscribers wired to the event bus see the envelopes as typed events. A `Subscribe`-shaped method lands when a durable driver does (post-V1 phase 86); the contract is purposely narrow at V1.
func OpenBus ¶
func OpenBus(_ context.Context, deps Dependencies) (MessageBus, error)
OpenBus returns the MessageBus built by the factory whose name matches deps.Cfg.BusDriver (defaults to DefaultDriver when empty).
func OpenBusDriver ¶
func OpenBusDriver(name string, deps Dependencies) (MessageBus, error)
OpenBusDriver opens a specific bus driver by name; useful for tests.
type RemoteCallKind ¶
type RemoteCallKind string
RemoteCallKind discriminates the three SendMessage-shaped A2A dispatch modes a `RemoteCallRequest` can select. `Send` is unary (mapped to A2A SendMessage); `Stream` is the streaming send (A2A SendStreamingMessage); `Subscribe` is the streaming-subscribe shape (A2A SubscribeToTask, when the caller wants live updates for a task it didn't initiate).
Drivers route by RemoteCallRequest.Kind. Empty Kind defaults to Send.
const (
	// RemoteCallKindSend — A2A SendMessage (unary request/reply).
	RemoteCallKindSend RemoteCallKind = "send"
	// RemoteCallKindStream — A2A SendStreamingMessage (streaming reply).
	RemoteCallKindStream RemoteCallKind = "stream"
	// RemoteCallKindSubscribe — A2A SubscribeToTask (live updates for an existing task).
	RemoteCallKindSubscribe RemoteCallKind = "subscribe"
)
RemoteCallKind values.
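One way a driver might route on the kind, including the empty-defaults-to-Send rule, is sketched below. The `route` function and its error behaviour (rejecting Subscribe without a task ID, rejecting unknown kinds) are assumptions made for illustration; the text does not specify how drivers handle those cases.

```go
package main

import "fmt"

// callKind and its constants are local copies of the RemoteCallKind values.
type callKind string

const (
	kindSend      callKind = "send"
	kindStream    callKind = "stream"
	kindSubscribe callKind = "subscribe"
)

// route returns the A2A RPC name a request of the given kind would map to.
// Hypothetical helper: error handling here is an assumption.
func route(kind callKind, taskID string) (string, error) {
	switch kind {
	case "", kindSend: // empty Kind defaults to Send
		return "SendMessage", nil
	case kindStream:
		return "SendStreamingMessage", nil
	case kindSubscribe:
		if taskID == "" {
			return "", fmt.Errorf("subscribe requires a task ID")
		}
		return "SubscribeToTask", nil
	default:
		return "", fmt.Errorf("unknown call kind %q", kind)
	}
}

func main() {
	rpc, _ := route("", "")
	fmt.Println(rpc) // prints SendMessage

	rpc, _ = route(kindSubscribe, "task-1")
	fmt.Println(rpc) // prints SubscribeToTask

	_, err := route(kindSubscribe, "")
	fmt.Println(err != nil) // prints true
}
```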
type RemoteCallRequest ¶
type RemoteCallRequest struct {
// AgentURL is the target A2A agent's interface URL (matches an
// AgentInterface.URL value from a discovered AgentCard).
AgentURL string
// Kind selects the dispatch shape. Empty defaults to Send.
Kind RemoteCallKind
// ContextID is the A2A context_id (the conversation grouping).
// Empty for new contexts; the target may assign one in the reply.
ContextID string
// TaskID is the A2A task_id. Empty for new tasks; set when the
// caller is continuing an existing task (Send with a follow-up
// message, Stream with Kind=Subscribe for an existing task).
TaskID string
// Message is the A2A message payload. Caller-side audit redaction
// (D-020) MUST have already run.
Message a2a.Message
// Config carries A2A send configuration (accepted output modes,
// push-notification config, history length, return-immediately).
Config a2a.SendMessageConfiguration
// Timeout is the per-call deadline. Zero means no timeout; drivers
// SHOULD treat zero as "honour the parent ctx deadline."
Timeout time.Duration
}
RemoteCallRequest carries the inputs to a `RemoteTransport.Send` or `RemoteTransport.Stream` invocation. The shape is the V1 wire-neutral envelope; Phase 29's A2A driver translates this into the on-the-wire gRPC / JSON-RPC / HTTP+JSON request as configured.
type RemoteCallResult ¶
type RemoteCallResult struct {
// Task is the A2A Task returned by the agent.
Task a2a.Task
// HTTPStatus is populated by transports that carry HTTP semantics
// (the JSON-RPC over HTTP and HTTP+JSON bindings of A2A). Zero
// when the transport does not carry HTTP status.
HTTPStatus int
}
RemoteCallResult is the unary-reply form returned by `RemoteTransport.Send`. It carries the resulting A2A Task plus an optional HTTP status (populated by HTTP-shaped transports; zero-valued for gRPC / loopback).
type RemoteEventStream ¶
type RemoteEventStream interface {
// Recv returns the next StreamResponse or an error. Returns an
// error wrapping io.EOF when the stream completes normally (the
// "done" condition for A2A's final event).
Recv(ctx context.Context) (a2a.StreamResponse, error)
// Close releases the stream. Idempotent.
Close() error
}
RemoteEventStream is the streaming-reply interface returned by `RemoteTransport.Stream`. Each Recv yields the next event in the stream; the stream terminates with a `RemoteStreamDone` error (typically wrapping `io.EOF` or `ctx.Err()`).
Callers MUST call Close to release the stream's resources (channel + goroutine). The conformance suite's `Stream_RespectsClose` covers the contract.
type RemoteFactory ¶
type RemoteFactory func(deps Dependencies) (RemoteTransport, error)
RemoteFactory builds a RemoteTransport from a Dependencies struct. Drivers expose one RemoteFactory via init() → RegisterRemoteTransport.
type RemoteTaskEventStream ¶
type RemoteTaskEventStream = RemoteEventStream
RemoteTaskEventStream is the streaming-reply interface returned by `RemoteTransport.Subscribe`. Identical surface to RemoteEventStream — the named alias makes the call-site intent (Subscribe vs Stream) readable without a documentation hop.
type RemoteTaskFilter ¶
type RemoteTaskFilter struct {
// Tenant filters by the agent-side tenant scope (matches the
// proto's `string tenant = 1` path parameter).
Tenant string
// ContextID filters by conversation grouping.
ContextID string
// Status filters by task status; TaskStateUnspecified means "any".
Status a2a.TaskState
// PageSize bounds the response size; drivers SHOULD cap at 100
// per the proto contract.
PageSize int32
// PageToken carries the continuation token from a prior call's
// NextPageToken. Empty for the first call.
PageToken string
// HistoryLength bounds per-task history depth in the response;
// zero means "no limit imposed by the caller."
HistoryLength int32
// StatusTimestampAfter filters to tasks whose status was updated
// at or after this instant. Zero means "no time filter."
StatusTimestampAfter time.Time
// IncludeArtifacts asks the driver to populate Artifacts on each
// returned Task. False by default to reduce payload size.
IncludeArtifacts bool
}
RemoteTaskFilter narrows `RemoteTransport.ListTasks`. Mirrors the proto ListTasksRequest filter shape — Tenant, ContextID, Status (zero-value = "any"), pagination, history length, status timestamp filter, and the artifact-inclusion flag.
type RemoteTaskSnapshot ¶
RemoteTaskSnapshot is the value returned by `RemoteTransport.GetTask` and the element type of `ListTasks`. The underlying type is an A2A Task; the named alias exists so the distributed contract can evolve independently of the wire shape without breaking callers.
type RemoteTransport ¶
type RemoteTransport interface {
// Send maps to A2A `SendMessage`. Unary request/reply.
Send(ctx context.Context, req RemoteCallRequest) (RemoteCallResult, error)
// Stream maps to A2A `SendStreamingMessage` (when req.Kind ==
// RemoteCallKindStream) or `SubscribeToTask` (when req.Kind ==
// RemoteCallKindSubscribe AND req.TaskID is set). Drivers route
// by req.Kind.
Stream(ctx context.Context, req RemoteCallRequest) (RemoteEventStream, error)
// GetTask maps to A2A `GetTask`. Returns ErrTaskNotFound when the
// task is not registered with the target agent.
GetTask(ctx context.Context, taskID, contextID string) (*RemoteTaskSnapshot, error)
// ListTasks maps to A2A `ListTasks`. Returns a snapshot per the
// filter; pagination handled via filter.PageToken.
ListTasks(ctx context.Context, filter RemoteTaskFilter) ([]RemoteTaskSnapshot, error)
// Cancel maps to A2A `CancelTask`. Returns nil when the task
// reached a terminal state (Canceled).
Cancel(ctx context.Context, taskID, contextID string) error
// Subscribe maps to A2A `SubscribeToTask` for the caller wanting
// live updates on a task it did NOT initiate. Symmetric with
// Stream(Kind=Subscribe); exposed as a separate method so the
// call-site intent reads cleanly.
Subscribe(ctx context.Context, taskID, contextID string) (RemoteTaskEventStream, error)
// CreateTaskPushNotificationConfig maps to A2A
// `CreateTaskPushNotificationConfig`. Stores a push-notification
// config for a task; V1 drivers store in memory.
CreateTaskPushNotificationConfig(ctx context.Context, cfg a2a.TaskPushNotificationConfig) (a2a.TaskPushNotificationConfig, error)
// GetTaskPushNotificationConfig maps to A2A
// `GetTaskPushNotificationConfig`. Returns the stored config.
GetTaskPushNotificationConfig(ctx context.Context, taskID, configID string) (a2a.TaskPushNotificationConfig, error)
// ListTaskPushNotificationConfigs maps to A2A
// `ListTaskPushNotificationConfigs`. Lists configs for a task.
ListTaskPushNotificationConfigs(ctx context.Context, taskID string) ([]a2a.TaskPushNotificationConfig, error)
// DeleteTaskPushNotificationConfig maps to A2A
// `DeleteTaskPushNotificationConfig`. Deletes a stored config.
DeleteTaskPushNotificationConfig(ctx context.Context, taskID, configID string) error
// GetExtendedAgentCard maps to A2A `GetExtendedAgentCard`. Returns
// the agent's self-describing manifest.
GetExtendedAgentCard(ctx context.Context) (*a2a.AgentCard, error)
// Close releases driver-owned resources. Idempotent.
Close(ctx context.Context) error
}
RemoteTransport is Harbor's cross-process / cross-host call surface. Every method maps 1:1 to an A2A v1 RPC from the vendored `docs/specifications/a2a.proto`. The mapping is verbatim so Phase 29 (the southbound A2A driver) consumes the surface without churn.
Implementations MUST be safe for concurrent use by N goroutines against a single shared instance (D-025).
All methods receive identity via `ctx`. Calls without a complete identity triple are rejected with `ErrIdentityRequired` at the caller-side boundary (drivers SHOULD NOT need to re-validate when the runtime owns the ctx).
func OpenRemoteTransport ¶
func OpenRemoteTransport(_ context.Context, deps Dependencies) (RemoteTransport, error)
OpenRemoteTransport returns the RemoteTransport built by the factory whose name matches deps.Cfg.RemoteDriver (defaults to DefaultDriver when empty).
func OpenRemoteTransportDriver ¶
func OpenRemoteTransportDriver(name string, deps Dependencies) (RemoteTransport, error)
OpenRemoteTransportDriver opens a specific remote driver by name; useful for tests.
Directories ¶
| Path | Synopsis |
|---|---|
| a2a | Package a2a provides hand-transcribed Go shapes for every type defined in the vendored A2A v1 proto specification at `docs/specifications/a2a.proto` (commit `ae6a562d5d972f2c4b184f748bb32e1fa9aa7bf2`, 2026-04-23). |
| conformancetest | Package conformancetest exposes the canonical correctness suites every distributed driver must pass. |
| drivers | |
| drivers/a2a | Package a2a is Harbor's southbound A2A wire driver. |
| drivers/loopback | Package loopback ships Harbor's V1 in-process drivers for both `distributed.MessageBus` and `distributed.RemoteTransport`. |