a2a

package
v0.2.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2025 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package a2a provides utilities for integrating gains with the A2A (Agent-to-Agent) protocol.

A2A is an open protocol enabling communication and interoperability between AI agent systems. It uses JSON-RPC 2.0 over HTTP(S) with support for streaming via Server-Sent Events (SSE). This package provides type definitions and message conversion utilities for building A2A-compatible agents with gains.

Overview

This package provides:

The package does NOT provide HTTP handlers or transport implementations. Users are responsible for implementing their own JSON-RPC server using their preferred framework.

Message Conversion

Use ToGainsMessages to convert A2A messages to gains messages for processing:

gainsMessages := a2a.ToGainsMessages(a2aMessages)
result := agent.Run(ctx, gainsMessages)

Use FromGainsMessages to convert gains messages back to A2A format:

history := a2a.FromGainsMessages(gainsMessages)
task.History = history

Task Lifecycle

A2A tasks progress through defined states:

  • TaskStateSubmitted: Task received, not yet started
  • TaskStateWorking: Task is being processed
  • TaskStateInputRequired: Agent needs additional input
  • TaskStateCompleted: Task finished successfully
  • TaskStateFailed: Task failed with an error
  • TaskStateCanceled: Task was canceled
  • TaskStateRejected: Task was rejected by the agent

Event Mapping

Use Mapper to convert gains events to A2A task status updates during streaming:

mapper := a2a.NewMapper(taskID, contextID)

for event := range agent.RunStream(ctx, messages) {
    if update := mapper.MapEvent(event); update != nil {
        // Send task status update to client
        sendSSE(update)
    }
}

// Finalize with completed or failed status
finalTask := mapper.Complete(artifacts)

Protocol Compliance

This package implements types compatible with A2A Protocol version 0.3. For full protocol details, see: https://a2a-protocol.org

Thread Safety

The Mapper is NOT safe for concurrent use. Each goroutine should have its own Mapper instance. Message conversion functions are stateless and safe for concurrent use.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MarshalPart

func MarshalPart(p Part) ([]byte, error)

MarshalPart marshals a Part to JSON with the correct type.

func ToGainsMessage

func ToGainsMessage(msg Message) ai.Message

ToGainsMessage converts a single A2A message to a gains message.

func ToGainsMessages

func ToGainsMessages(msgs []Message) []ai.Message

ToGainsMessages converts A2A messages to gains messages.

Types

type AgentExecutor

type AgentExecutor struct {
	// contains filtered or unexported fields
}

AgentExecutor wraps a gains Agent to implement the A2A Executor interface.

func NewAgentExecutor

func NewAgentExecutor(a AgentRunner, opts ...agent.Option) *AgentExecutor

NewAgentExecutor creates a new AgentExecutor wrapping the given agent.

func (*AgentExecutor) Execute

func (e *AgentExecutor) Execute(ctx context.Context, req SendMessageRequest) (*Task, error)

Execute runs the agent synchronously and returns the final task.

func (*AgentExecutor) ExecuteStream

func (e *AgentExecutor) ExecuteStream(ctx context.Context, req SendMessageRequest) <-chan Event

ExecuteStream runs the agent and streams status updates.

type AgentRunner

type AgentRunner interface {
	Run(ctx context.Context, messages []ai.Message, opts ...agent.Option) (*agent.Result, error)
	RunStream(ctx context.Context, messages []ai.Message, opts ...agent.Option) <-chan event.Event
}

AgentRunner is the interface required from gains agents. This allows for easier testing and decoupling.

type Artifact

type Artifact struct {
	ArtifactID  string         `json:"artifactId"`
	Name        string         `json:"name,omitempty"`
	Description string         `json:"description,omitempty"`
	Parts       []Part         `json:"parts"`
	Metadata    map[string]any `json:"metadata,omitempty"`
	Extensions  []string       `json:"extensions,omitempty"`
}

Artifact represents an output generated by a task.

func NewArtifact

func NewArtifact(name, description string, parts ...Part) Artifact

NewArtifact creates a new artifact with the given parts.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is an A2A protocol client for calling remote agents.

func NewClient

func NewClient(endpoint string, opts ...ClientOption) *Client

NewClient creates a new A2A client for the given endpoint.

func (*Client) SendMessage

func (c *Client) SendMessage(ctx context.Context, req SendMessageRequest) (*Task, error)

SendMessage sends a message to the remote agent and returns the task.

func (*Client) SendText

func (c *Client) SendText(ctx context.Context, text string) (*Task, error)

SendText is a convenience method that sends a text message.

type ClientOption

type ClientOption func(*Client)

ClientOption configures a Client.

func WithHTTPClient

func WithHTTPClient(c *http.Client) ClientOption

WithHTTPClient sets a custom HTTP client.

type DataPart

type DataPart struct {
	Kind     string         `json:"kind"`
	Data     any            `json:"data"`
	Metadata map[string]any `json:"metadata,omitempty"`
}

DataPart represents arbitrary structured data (JSON) within a message.

func NewDataPart

func NewDataPart(data any) DataPart

NewDataPart creates a new DataPart with the given data.

func (DataPart) GetKind

func (p DataPart) GetKind() string

type Event

type Event interface {
	// contains filtered or unexported methods
}

Event represents an A2A streaming event (status update or artifact update).

type Executor

type Executor interface {
	// Execute runs a task synchronously and returns the final task.
	Execute(ctx context.Context, req SendMessageRequest) (*Task, error)

	// ExecuteStream runs a task and streams status updates.
	// The channel closes when execution completes.
	ExecuteStream(ctx context.Context, req SendMessageRequest) <-chan Event
}

Executor handles A2A task execution. Implementations convert A2A messages to gains format, execute the underlying agent or workflow, and convert results back to A2A format.

type FileContent

type FileContent struct {
	Name     string `json:"name,omitempty"`
	MimeType string `json:"mimeType,omitempty"`
	Bytes    string `json:"bytes,omitempty"` // Base64 encoded
	URI      string `json:"uri,omitempty"`
}

FileContent represents file content, either inline bytes or a URI reference.

type FilePart

type FilePart struct {
	Kind     string         `json:"kind"`
	File     FileContent    `json:"file"`
	Metadata map[string]any `json:"metadata,omitempty"`
}

FilePart represents a file included in a message.

func NewFilePartWithBytes

func NewFilePartWithBytes(name, mimeType, bytes string) FilePart

NewFilePartWithBytes creates a FilePart with inline base64-encoded content.

func NewFilePartWithURI

func NewFilePartWithURI(name, mimeType, uri string) FilePart

NewFilePartWithURI creates a FilePart with a URI reference.

func (FilePart) GetKind

func (p FilePart) GetKind() string

type Mapper

type Mapper struct {
	// contains filtered or unexported fields
}

Mapper converts gains events to A2A task status updates.

A2A uses a task-centric model where all updates are framed as task status changes or artifact additions. This mapper accumulates message content and emits task status updates as the agent progresses.

Create a new Mapper for each task using NewMapper. The Mapper is not safe for concurrent use - each goroutine should have its own Mapper.

func NewMapper

func NewMapper(taskID, contextID string) *Mapper

NewMapper creates a new Mapper for a single task.

func (*Mapper) ArtifactUpdate

func (m *Mapper) ArtifactUpdate(artifact Artifact) TaskArtifactUpdateEvent

ArtifactUpdate creates a task artifact update event.

func (*Mapper) Canceled

func (m *Mapper) Canceled() TaskStatusUpdateEvent

Canceled returns a final status update for cancellation.

func (*Mapper) Completed

func (m *Mapper) Completed(msg *Message) TaskStatusUpdateEvent

Completed returns a final status update for successful completion.

func (*Mapper) ContextID

func (m *Mapper) ContextID() string

ContextID returns the context ID for this mapper.

func (*Mapper) CreateTask

func (m *Mapper) CreateTask() *Task

CreateTask creates a Task object from the current mapper state.

func (*Mapper) CreateTaskWithHistory

func (m *Mapper) CreateTaskWithHistory(history []Message) *Task

CreateTaskWithHistory creates a Task with the given message history.

func (*Mapper) Failed

func (m *Mapper) Failed(errMsg string) TaskStatusUpdateEvent

Failed returns a final status update for failure.

func (*Mapper) InputRequired

func (m *Mapper) InputRequired(prompt string) TaskStatusUpdateEvent

InputRequired returns a status update requesting additional input.

func (*Mapper) MapEvent

func (m *Mapper) MapEvent(e event.Event) Event

MapEvent converts a gains event to an A2A event. Returns nil for events that don't require an A2A update.

For nested runs (e.g., workflows containing agents), only the outermost run lifecycle events trigger task state changes.

func (*Mapper) MapStream

func (m *Mapper) MapStream(input <-chan event.Event) <-chan Event

MapStream wraps a gains event channel and yields A2A events. The returned channel closes when the input channel closes.

func (*Mapper) State

func (m *Mapper) State() TaskState

State returns the current task state.

func (*Mapper) StatusUpdate

func (m *Mapper) StatusUpdate(state TaskState, msg *Message, final bool) TaskStatusUpdateEvent

StatusUpdate creates a task status update event.

func (*Mapper) Submitted

func (m *Mapper) Submitted() TaskStatusUpdateEvent

Submitted returns a status update for the submitted state.

func (*Mapper) TaskID

func (m *Mapper) TaskID() string

TaskID returns the task ID for this mapper.

func (*Mapper) Working

func (m *Mapper) Working() TaskStatusUpdateEvent

Working returns a status update for the working state.

type Message

type Message struct {
	Kind             string         `json:"kind"`
	MessageID        string         `json:"messageId"`
	Role             MessageRole    `json:"role"`
	Parts            []Part         `json:"parts"`
	ContextID        *string        `json:"contextId,omitempty"`
	TaskID           *string        `json:"taskId,omitempty"`
	ReferenceTaskIDs []string       `json:"referenceTaskIds,omitempty"`
	Metadata         map[string]any `json:"metadata,omitempty"`
	Extensions       []string       `json:"extensions,omitempty"`
}

Message represents a single exchange between a user and an agent.

func FromGainsMessage

func FromGainsMessage(msg ai.Message) Message

FromGainsMessage converts a single gains message to an A2A message.

func FromGainsMessages

func FromGainsMessages(msgs []ai.Message) []Message

FromGainsMessages converts gains messages to A2A messages.

func NewMessage

func NewMessage(role MessageRole, parts ...Part) Message

NewMessage creates a new message with the given role and parts.

func NewMessageWithContext

func NewMessageWithContext(role MessageRole, contextID string, taskID *string, parts ...Part) Message

NewMessageWithContext creates a new message with context and optional task ID.

func (Message) TextContent

func (m Message) TextContent() string

TextContent returns the concatenated text from all TextParts in the message.

func (*Message) UnmarshalJSON

func (m *Message) UnmarshalJSON(data []byte) error

UnmarshalJSON implements custom JSON unmarshaling for Message. This is needed because Parts is a []Part interface which can't be directly unmarshaled.

type MessageRole

type MessageRole string

MessageRole indicates the originator of a message.

const (
	// MessageRoleUser is the role for messages from the user/client.
	MessageRoleUser MessageRole = "user"
	// MessageRoleAgent is the role for messages from the agent/server.
	MessageRoleAgent MessageRole = "agent"
)

type Part

type Part interface {
	GetKind() string
	// contains filtered or unexported methods
}

Part represents a segment of a message (text, file, or data).

func UnmarshalPart

func UnmarshalPart(data []byte) (Part, error)

UnmarshalPart unmarshals a Part from JSON.

type RemoteTool

type RemoteTool struct {
	// contains filtered or unexported fields
}

RemoteTool wraps an A2A agent as a gains Tool. This allows a gains agent to call remote A2A agents as tools.

func NewRemoteTool

func NewRemoteTool(client *Client, opts ...RemoteToolOption) *RemoteTool

NewRemoteTool creates a new RemoteTool that calls a remote A2A agent.

func (*RemoteTool) Handler

func (t *RemoteTool) Handler() func(ctx context.Context, input json.RawMessage) (string, error)

Handler returns a tool handler that calls the remote agent.

func (*RemoteTool) Register

func (t *RemoteTool) Register(registry ToolRegistry)

Register registers the remote tool with a tool registry.

func (*RemoteTool) Tool

func (t *RemoteTool) Tool() ai.Tool

Tool returns a gains Tool that calls the remote agent.

type RemoteToolOption

type RemoteToolOption func(*RemoteTool)

RemoteToolOption configures a RemoteTool.

func WithToolDescription

func WithToolDescription(desc string) RemoteToolOption

WithToolDescription sets the tool description.

func WithToolName

func WithToolName(name string) RemoteToolOption

WithToolName sets the tool name (default: "remote_agent").

func WithToolSchema

func WithToolSchema(schema json.RawMessage) RemoteToolOption

WithToolSchema sets a custom schema for the tool arguments.

type SendMessageConfiguration

type SendMessageConfiguration struct {
	// AcceptedOutputModes specifies the output formats the client can handle.
	AcceptedOutputModes []string `json:"acceptedOutputModes,omitempty"`

	// HistoryLength controls how much conversation context to include.
	HistoryLength *int `json:"historyLength,omitempty"`

	// Blocking waits for task completion before returning.
	Blocking bool `json:"blocking,omitempty"`

	// PushNotificationConfig for async updates (not implemented yet).
	PushNotificationConfig map[string]any `json:"pushNotificationConfig,omitempty"`
}

SendMessageConfiguration contains options for the send request.

type SendMessageRequest

type SendMessageRequest struct {
	Message       Message                   `json:"message"`
	Configuration *SendMessageConfiguration `json:"configuration,omitempty"`
	Metadata      map[string]any            `json:"metadata,omitempty"`
}

SendMessageRequest represents an A2A message/send request.

type Task

type Task struct {
	Kind      string         `json:"kind"`
	ID        string         `json:"id"`
	ContextID string         `json:"contextId"`
	Status    TaskStatus     `json:"status"`
	Artifacts []Artifact     `json:"artifacts,omitempty"`
	History   []Message      `json:"history,omitempty"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}

Task represents a unit of work being processed by the agent.

func NewTask

func NewTask(id, contextID string) *Task

NewTask creates a new task with the given ID and context ID.

type TaskArtifactUpdateEvent

type TaskArtifactUpdateEvent struct {
	Kind      string   `json:"kind"`
	TaskID    string   `json:"taskId"`
	ContextID string   `json:"contextId"`
	Artifact  Artifact `json:"artifact"`
}

TaskArtifactUpdateEvent represents a streaming artifact update.

func NewTaskArtifactUpdateEvent

func NewTaskArtifactUpdateEvent(taskID, contextID string, artifact Artifact) TaskArtifactUpdateEvent

NewTaskArtifactUpdateEvent creates a new task artifact update event.

type TaskState

type TaskState string

TaskState represents the lifecycle state of a task.

const (
	TaskStateSubmitted     TaskState = "submitted"
	TaskStateWorking       TaskState = "working"
	TaskStateInputRequired TaskState = "input-required"
	TaskStateCompleted     TaskState = "completed"
	TaskStateCanceled      TaskState = "canceled"
	TaskStateFailed        TaskState = "failed"
	TaskStateRejected      TaskState = "rejected"
	TaskStateAuthRequired  TaskState = "auth-required"
)

func (TaskState) IsTerminal

func (s TaskState) IsTerminal() bool

IsTerminal returns true if the state is a terminal state.

type TaskStatus

type TaskStatus struct {
	State     TaskState `json:"state"`
	Message   *Message  `json:"message,omitempty"`
	Timestamp string    `json:"timestamp,omitempty"`
}

TaskStatus represents the current status of a task.

func NewTaskStatus

func NewTaskStatus(state TaskState) TaskStatus

NewTaskStatus creates a new TaskStatus with the given state.

func NewTaskStatusWithMessage

func NewTaskStatusWithMessage(state TaskState, msg *Message) TaskStatus

NewTaskStatusWithMessage creates a new TaskStatus with a message.

type TaskStatusUpdateEvent

type TaskStatusUpdateEvent struct {
	Kind      string     `json:"kind"`
	TaskID    string     `json:"taskId"`
	ContextID string     `json:"contextId"`
	Status    TaskStatus `json:"status"`
	Final     bool       `json:"final"`
}

TaskStatusUpdateEvent represents a streaming task status update.

func NewTaskStatusUpdateEvent

func NewTaskStatusUpdateEvent(taskID, contextID string, status TaskStatus, final bool) TaskStatusUpdateEvent

NewTaskStatusUpdateEvent creates a new task status update event.

type TextPart

type TextPart struct {
	Kind     string         `json:"kind"`
	Text     string         `json:"text"`
	Metadata map[string]any `json:"metadata,omitempty"`
}

TextPart represents a text segment within a message.

func NewTextPart

func NewTextPart(text string) TextPart

NewTextPart creates a new TextPart with the given text.

func (TextPart) GetKind

func (p TextPart) GetKind() string

type ToolRegistry

type ToolRegistry interface {
	RegisterFunc(name, description string, schema json.RawMessage, handler func(ctx context.Context, input json.RawMessage) (string, error))
}

ToolRegistry is the interface for registering tools.

type WorkflowExecutor

type WorkflowExecutor struct {
	// contains filtered or unexported fields
}

WorkflowExecutor wraps a gains workflow Runner to implement the A2A Executor interface.

func NewWorkflowExecutor

func NewWorkflowExecutor(runner WorkflowRunner) *WorkflowExecutor

NewWorkflowExecutor creates a new WorkflowExecutor wrapping the given runner.

func (*WorkflowExecutor) Execute

func (e *WorkflowExecutor) Execute(ctx context.Context, req SendMessageRequest) (*Task, error)

Execute runs the workflow synchronously and returns the final task.

func (*WorkflowExecutor) ExecuteStream

func (e *WorkflowExecutor) ExecuteStream(ctx context.Context, req SendMessageRequest) <-chan Event

ExecuteStream runs the workflow and streams status updates.

type WorkflowRunner

type WorkflowRunner interface {
	RunStream(ctx context.Context, state any, opts ...interface{}) <-chan event.Event
}

WorkflowRunner is the interface required from gains workflow runners.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL