distributed

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package distributed defines Harbor's V1 distributed-edge contracts:

  • `MessageBus` — the at-least-once cross-worker fan-out edge. `Publish` is the canonical publication primitive. V1 ships an in-process loopback driver (`internal/distributed/drivers/loopback`); durable backends (NATS / Redis Streams / Postgres-as-queue) land in post-V1 phase 86.

  • `RemoteTransport` — the cross-process / cross-host call surface, designed end-to-end against the full A2A v1 spec (vendored at `docs/specifications/a2a.proto`). Every A2A RPC maps to a Go method here; every A2A message type has a counterpart in `internal/distributed/a2a`. V1 ships an in-process loopback driver; the actual A2A wire driver lands in Phase 29 (southbound).

Both surfaces follow the §4.4 driver-registry seam: an interface here, concrete drivers under `drivers/<name>/`, a factory + registry that dispatches by name, drivers self-registering from `init()`.

Identity contract. `MessageBus.Publish` rejects envelopes whose `Identity` quadruple is missing any of (tenant, user, session) with `ErrIdentityRequired`. `RemoteTransport` methods rely on `identity.Quadruple` being present in `ctx`; missing identity surfaces as `ErrIdentityRequired` at the driver boundary (caller-side check before reaching the wire). The contracts NEVER carry an identity- downgrading knob — identity is mandatory at every distributed edge (per AGENTS.md §6 + §13).

At-least-once contract. `MessageBus.Publish` is documented at-least- once. Consumers MUST be idempotent on `(TaskID, Edge, EventID)`. The in-process loopback driver delivers exactly-once today; future durable drivers WILL produce duplicates under partition + retry. Pinning the contract at at-least-once from t=0 means consumers that work against loopback also work against post-V1 drivers — this is intentional API hardening per D-031.

Index

Constants

View Source
const DefaultDriver = "loopback"

DefaultDriver is the Phase 22 production driver name for BOTH the MessageBus and the RemoteTransport registries. Post-V1 durable bus drivers (NATS / Postgres-as-queue at phase 86) and the A2A wire driver (phase 29) register additional names; Open switches on `cfg.BusDriver` / `cfg.RemoteDriver`.

Variables

View Source
var (
	// ErrBusClosed — Publish was called after Close.
	ErrBusClosed = errors.New("distributed: bus is closed")
	// ErrTransportClosed — a RemoteTransport method was called after Close.
	ErrTransportClosed = errors.New("distributed: remote transport is closed")
	// ErrIdentityRequired — the call lacks one or more identity components.
	ErrIdentityRequired = errors.New("distributed: identity required (tenant/user/session)")
	// ErrUnknownDriver — the configured driver name is not registered.
	ErrUnknownDriver = errors.New("distributed: unknown driver")
	// ErrAgentNotFound — the RemoteTransport could not resolve the agent URL.
	ErrAgentNotFound = errors.New("distributed: A2A agent not registered with this transport")
	// ErrTaskNotFound — the requested A2A task does not exist on the target.
	ErrTaskNotFound = errors.New("distributed: A2A task not found")
	// ErrInvalidPart — an A2A Part oneof was empty or unrecognised.
	ErrInvalidPart = errors.New("distributed: invalid A2A Part (oneof empty)")
)

Sentinel errors. Callers compare via errors.Is.

Functions

func RegisterBus

func RegisterBus(name string, factory BusFactory)

RegisterBus installs a BusFactory under name. Drivers self-register from their package init(); cmd/harbor blank-imports the production driver to trigger registration. Per AGENTS.md §4.4.

Re-registering the same name panics — the registration model is write-once-at-init and a duplicate signals a build mis-configuration.

func RegisterRemoteTransport

func RegisterRemoteTransport(name string, factory RemoteFactory)

RegisterRemoteTransport installs a RemoteFactory under name. Same contract as RegisterBus.

func RegisteredBusDrivers

func RegisteredBusDrivers() []string

RegisteredBusDrivers returns a sorted list of bus driver names.

func RegisteredRemoteTransportDrivers

func RegisteredRemoteTransportDrivers() []string

RegisteredRemoteTransportDrivers returns a sorted list of remote driver names.

Types

type BusEnvelope

type BusEnvelope struct {
	// Edge labels the conceptual fan-out edge ("planner.next",
	// "tool.dispatch.completed", ...). Free-form; consumers use it
	// for routing + idempotency keying.
	Edge string
	// Source labels the originating subsystem ("planner", "tools", ...).
	Source string
	// Target labels the destination class ("memory", "audit", ...).
	// Empty when the envelope is a broadcast.
	Target string
	// Identity is the load-bearing isolation key. The triple
	// (tenant, user, session) is mandatory.
	Identity identity.Quadruple
	// TaskID associates the envelope with a Harbor `tasks.Task`. Empty
	// when the envelope is not task-scoped (rare; most distributed
	// edges fire inside a task).
	TaskID tasks.TaskID
	// EventID is the caller-supplied idempotency key. Consumers dedupe
	// on `(TaskID, Edge, EventID)`. ULID-shaped; callers SHOULD use
	// `events.NewEventID` (or a state-store equivalent) to generate.
	EventID events.EventID
	// Payload is the redacted bytes the consumer will see. Phase 22's
	// loopback driver passes the bytes through verbatim; durable
	// drivers may compress / encrypt. Caller-side redaction (D-020)
	// is mandatory BEFORE Publish.
	Payload json.RawMessage
	// Headers carry transport-level key/value pairs (trace context,
	// tenant overrides for fan-out scopes, ...). Free-form.
	Headers map[string]string
	// Meta carries free-form metadata (mostly for tests / debugging).
	// Drivers SHOULD NOT depend on Meta for correctness.
	Meta map[string]any
	// Timestamp records when the publisher considered the envelope
	// ready. Set by the caller; drivers do NOT rewrite this field.
	Timestamp time.Time
}

BusEnvelope is the unit `MessageBus.Publish` accepts. It carries identity quadruple, task ID, edge / source / target labels, an opaque (caller-redacted) payload, free-form headers + metadata, a caller-supplied event ID for idempotency, and a wall-clock timestamp.

Per AGENTS.md §6, the `Identity` triple (tenant / user / session) is mandatory; `Publish` rejects empty triples with `ErrIdentityRequired`. `RunID` (the fourth component) is optional — set when the envelope originates inside a run scope.

func (BusEnvelope) Validate

func (e BusEnvelope) Validate() error

Validate reports whether the envelope is structurally valid. Returns wrapped sentinels:

  • ErrIdentityRequired when any of (tenant, user, session) is empty.

Drivers SHOULD call Validate before publishing; consumers MAY rely on the bus having already validated.

type BusFactory

type BusFactory func(deps Dependencies) (MessageBus, error)

BusFactory builds a MessageBus from a Dependencies struct. Drivers expose one BusFactory via init() → RegisterBus.

type Dependencies

type Dependencies struct {
	// EventBus is the typed event bus the loopback MessageBus projects
	// envelopes through. Optional for drivers that do not project;
	// the loopback bus REQUIRES it.
	EventBus events.EventBus
	// Cfg carries Phase 22's DistributedConfig (driver names today).
	Cfg config.DistributedConfig
	// Tools carries the unified tool-catalog config. The A2A wire
	// driver (Phase 29) reads `Tools.A2APeers` here. Other drivers
	// MAY ignore.
	Tools config.ToolsConfig
}

Dependencies bundles the wiring inputs every distributed driver receives. EventBus is the optional projection target for the bus loopback (drivers free to ignore when not in-process); Cfg carries the driver names + any future per-driver tuning fields. Tools is the tool-catalog config — the A2A wire RemoteTransport driver (Phase 29) reads `Tools.A2APeers` to seed its route registry.

type MessageBus

type MessageBus interface {
	// Publish delivers env to its subscribers. At-least-once delivery;
	// consumers MUST be idempotent on `(TaskID, Edge, EventID)`.
	//
	// Returns ErrBusClosed when called after Close.
	// Returns ErrIdentityRequired when env's identity triple is empty.
	Publish(ctx context.Context, env BusEnvelope) error
	// Close shuts down the bus, joining any driver-owned goroutines.
	// Idempotent: a second Close is a no-op + returns nil.
	Close(ctx context.Context) error
}

MessageBus is Harbor's at-least-once cross-worker fan-out edge. Implementations MUST be safe for concurrent use by N goroutines against a single shared instance (D-025).

Subscribe is intentionally NOT on this interface in V1: the loopback driver projects the bus through the typed `events.EventBus` so subscribers wired to the event bus see the envelopes as typed events. A `Subscribe`-shaped method lands when a durable driver does (post-V1 phase 86); the contract is purposely narrow at V1.

func OpenBus

func OpenBus(_ context.Context, deps Dependencies) (MessageBus, error)

OpenBus returns the MessageBus built by the factory whose name matches deps.Cfg.BusDriver (defaults to DefaultDriver when empty).

func OpenBusDriver

func OpenBusDriver(name string, deps Dependencies) (MessageBus, error)

OpenBusDriver opens a specific bus driver by name; useful for tests.

type RemoteCallKind

type RemoteCallKind string

RemoteCallKind discriminates the three SendMessage-shaped A2A dispatch modes a `RemoteTransport.Stream` call can take. `Send` is unary (mapped to A2A SendMessage); `Stream` is the streaming send (A2A SendStreamingMessage); `Subscribe` is the streaming-subscribe shape (A2A SubscribeToTask, when the caller wants live updates for a task it didn't initiate).

Drivers route by RemoteCallRequest.Kind. Empty Kind defaults to Send.

const (
	// RemoteCallKindSend — A2A SendMessage (unary request/reply).
	RemoteCallKindSend RemoteCallKind = "send"
	// RemoteCallKindStream — A2A SendStreamingMessage (streaming reply).
	RemoteCallKindStream RemoteCallKind = "stream"
	// RemoteCallKindSubscribe — A2A SubscribeToTask (live updates for an existing task).
	RemoteCallKindSubscribe RemoteCallKind = "subscribe"
)

RemoteCallKind values.

type RemoteCallRequest

type RemoteCallRequest struct {
	// AgentURL is the target A2A agent's interface URL (matches an
	// AgentInterface.URL value from a discovered AgentCard).
	AgentURL string
	// Kind selects the dispatch shape. Empty defaults to Send.
	Kind RemoteCallKind
	// ContextID is the A2A context_id (the conversation grouping).
	// Empty for new contexts; the target may assign one in the reply.
	ContextID string
	// TaskID is the A2A task_id. Empty for new tasks; set when the
	// caller is continuing an existing task (Send with a follow-up
	// message, Stream with Kind=Subscribe for an existing task).
	TaskID string
	// Message is the A2A message payload. Caller-side audit redaction
	// (D-020) MUST have already run.
	Message a2a.Message
	// Config carries A2A send configuration (accepted output modes,
	// push-notification config, history length, return-immediately).
	Config a2a.SendMessageConfiguration
	// Timeout is the per-call deadline. Zero means no timeout; drivers
	// SHOULD treat zero as "honour the parent ctx deadline."
	Timeout time.Duration
}

RemoteCallRequest carries the inputs to a `RemoteTransport.Send` or `RemoteTransport.Stream` invocation. The shape is the V1 wire-neutral envelope; Phase 29's A2A driver translates this into the on-the-wire gRPC / JSON-RPC / HTTP+JSON request as configured.

type RemoteCallResult

type RemoteCallResult struct {
	// Task is the A2A Task returned by the agent.
	Task a2a.Task
	// HTTPStatus is populated by transports that carry HTTP semantics
	// (the JSON-RPC over HTTP and HTTP+JSON bindings of A2A). Zero
	// when the transport does not carry HTTP status.
	HTTPStatus int
}

RemoteCallResult is the unary-reply form returned by `RemoteTransport.Send`. It carries the resulting A2A Task plus an optional HTTP status (populated by HTTP-shaped transports; zero-valued for gRPC / loopback).

type RemoteEventStream

type RemoteEventStream interface {
	// Recv returns the next StreamResponse or an error. Returns an
	// error wrapping io.EOF when the stream completes normally (the
	// "done" condition for A2A's final event).
	Recv(ctx context.Context) (a2a.StreamResponse, error)
	// Close releases the stream. Idempotent.
	Close() error
}

RemoteEventStream is the streaming-reply interface returned by `RemoteTransport.Stream`. Each Recv yields the next event in the stream; the stream terminates with a `RemoteStreamDone` error (typically wrapping `io.EOF` or `ctx.Err()`).

Callers MUST call Close to release the stream's resources (channel + goroutine). The conformance suite's `Stream_RespectsClose` covers the contract.

type RemoteFactory

type RemoteFactory func(deps Dependencies) (RemoteTransport, error)

RemoteFactory builds a RemoteTransport from a Dependencies struct. Drivers expose one RemoteFactory via init() → RegisterRemoteTransport.

type RemoteTaskEventStream

type RemoteTaskEventStream = RemoteEventStream

RemoteTaskEventStream is the streaming-reply interface returned by `RemoteTransport.Subscribe`. Identical surface to RemoteEventStream — the named alias makes the call-site intent (Subscribe vs Stream) readable without a documentation hop.

type RemoteTaskFilter

type RemoteTaskFilter struct {
	// Tenant filters by the agent-side tenant scope (matches the
	// proto's `string tenant = 1` path parameter).
	Tenant string
	// ContextID filters by conversation grouping.
	ContextID string
	// Status filters by task status; TaskStateUnspecified means "any".
	Status a2a.TaskState
	// PageSize bounds the response size; drivers SHOULD cap at 100
	// per the proto contract.
	PageSize int32
	// PageToken carries the continuation token from a prior call's
	// NextPageToken. Empty for the first call.
	PageToken string
	// HistoryLength bounds per-task history depth in the response;
	// zero means "no limit imposed by the caller."
	HistoryLength int32
	// StatusTimestampAfter filters to tasks whose status was updated
	// at or after this instant. Zero means "no time filter."
	StatusTimestampAfter time.Time
	// IncludeArtifacts asks the driver to populate Artifacts on each
	// returned Task. False by default to reduce payload size.
	IncludeArtifacts bool
}

RemoteTaskFilter narrows `RemoteTransport.ListTasks`. Mirrors the proto ListTasksRequest filter shape — Tenant, ContextID, Status (zero-value = "any"), pagination, history length, status timestamp filter, and the artifact-inclusion flag.

type RemoteTaskSnapshot

type RemoteTaskSnapshot a2a.Task

RemoteTaskSnapshot is the value returned by `RemoteTransport.GetTask` and the element type of `ListTasks`. The underlying type is an A2A Task; the named alias exists so the distributed contract can evolve independently of the wire shape without breaking callers.

type RemoteTransport

type RemoteTransport interface {
	// Send maps to A2A `SendMessage`. Unary request/reply.
	Send(ctx context.Context, req RemoteCallRequest) (RemoteCallResult, error)
	// Stream maps to A2A `SendStreamingMessage` (when req.Kind ==
	// RemoteCallKindStream) or `SubscribeToTask` (when req.Kind ==
	// RemoteCallKindSubscribe AND req.TaskID is set). Drivers route
	// by req.Kind.
	Stream(ctx context.Context, req RemoteCallRequest) (RemoteEventStream, error)
	// GetTask maps to A2A `GetTask`. Returns ErrTaskNotFound when the
	// task is not registered with the target agent.
	GetTask(ctx context.Context, taskID, contextID string) (*RemoteTaskSnapshot, error)
	// ListTasks maps to A2A `ListTasks`. Returns a snapshot per the
	// filter; pagination handled via filter.PageToken.
	ListTasks(ctx context.Context, filter RemoteTaskFilter) ([]RemoteTaskSnapshot, error)
	// Cancel maps to A2A `CancelTask`. Returns nil when the task
	// reached a terminal state (Canceled).
	Cancel(ctx context.Context, taskID, contextID string) error
	// Subscribe maps to A2A `SubscribeToTask` for the caller wanting
	// live updates on a task it did NOT initiate. Symmetric with
	// Stream(Kind=Subscribe); exposed as a separate method so the
	// call-site intent reads cleanly.
	Subscribe(ctx context.Context, taskID, contextID string) (RemoteTaskEventStream, error)
	// CreateTaskPushNotificationConfig maps to A2A
	// `CreateTaskPushNotificationConfig`. Stores a push-notification
	// config for a task; V1 drivers store in memory.
	CreateTaskPushNotificationConfig(ctx context.Context, cfg a2a.TaskPushNotificationConfig) (a2a.TaskPushNotificationConfig, error)
	// GetTaskPushNotificationConfig maps to A2A
	// `GetTaskPushNotificationConfig`. Returns the stored config.
	GetTaskPushNotificationConfig(ctx context.Context, taskID, configID string) (a2a.TaskPushNotificationConfig, error)
	// ListTaskPushNotificationConfigs maps to A2A
	// `ListTaskPushNotificationConfigs`. Lists configs for a task.
	ListTaskPushNotificationConfigs(ctx context.Context, taskID string) ([]a2a.TaskPushNotificationConfig, error)
	// DeleteTaskPushNotificationConfig maps to A2A
	// `DeleteTaskPushNotificationConfig`. Deletes a stored config.
	DeleteTaskPushNotificationConfig(ctx context.Context, taskID, configID string) error
	// GetExtendedAgentCard maps to A2A `GetExtendedAgentCard`. Returns
	// the agent's self-describing manifest.
	GetExtendedAgentCard(ctx context.Context) (*a2a.AgentCard, error)
	// Close releases driver-owned resources. Idempotent.
	Close(ctx context.Context) error
}

RemoteTransport is Harbor's cross-process / cross-host call surface. Every method maps 1:1 to an A2A v1 RPC from the vendored `docs/specifications/a2a.proto`. The mapping is verbatim so Phase 29 (the southbound A2A driver) consumes the surface without churn.

Implementations MUST be safe for concurrent use by N goroutines against a single shared instance (D-025).

All methods receive identity via `ctx`. Calls without a complete identity triple are rejected with `ErrIdentityRequired` at the caller-side boundary (drivers SHOULD NOT need to re-validate when the runtime owns the ctx).

func OpenRemoteTransport

func OpenRemoteTransport(_ context.Context, deps Dependencies) (RemoteTransport, error)

OpenRemoteTransport returns the RemoteTransport built by the factory whose name matches deps.Cfg.RemoteDriver (defaults to DefaultDriver when empty).

func OpenRemoteTransportDriver

func OpenRemoteTransportDriver(name string, deps Dependencies) (RemoteTransport, error)

OpenRemoteTransportDriver opens a specific remote driver by name; useful for tests.

Directories

Path Synopsis
Package a2a provides hand-transcribed Go shapes for every type defined in the vendored A2A v1 proto specification at `docs/specifications/a2a.proto` (commit `ae6a562d5d972f2c4b184f748bb32e1fa9aa7bf2`, 2026-04-23).
Package a2a provides hand-transcribed Go shapes for every type defined in the vendored A2A v1 proto specification at `docs/specifications/a2a.proto` (commit `ae6a562d5d972f2c4b184f748bb32e1fa9aa7bf2`, 2026-04-23).
Package conformancetest exposes the canonical correctness suites every distributed driver must pass.
Package conformancetest exposes the canonical correctness suites every distributed driver must pass.
drivers
a2a
Package a2a is Harbor's southbound A2A wire driver.
Package a2a is Harbor's southbound A2A wire driver.
loopback
Package loopback ships Harbor's V1 in-process drivers for both `distributed.MessageBus` and `distributed.RemoteTransport`.
Package loopback ships Harbor's V1 in-process drivers for both `distributed.MessageBus` and `distributed.RemoteTransport`.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL