Documentation
¶
Overview ¶
Package registry provides agent registration and discovery for swarm coordination.
Overview ¶
The Registry interface enables agents to self-register with capabilities, status, and load. Other agents discover and route to appropriate handlers based on capabilities and availability.
Available Implementations ¶
- MemoryRegistry: In-memory implementation for testing and single-node use
- NATSRegistry: Distributed registry using NATS JetStream KV store
Basic Usage ¶
Register an agent:
reg := registry.NewMemoryRegistry(registry.MemoryConfig{})
err := reg.Register(registry.AgentInfo{
ID: "agent-1",
Name: "Code Review Agent",
Capabilities: []string{"code-review", "testing"},
Status: registry.StatusIdle,
Load: 0.3,
})
Discover agents by capability:
agents, _ := reg.FindByCapability("code-review")
// Returns agents sorted by load (lowest first)
if len(agents) > 0 {
target := agents[0] // Pick the least loaded agent
}
Watch for changes:
events, _ := reg.Watch()
for event := range events {
switch event.Type {
case registry.EventAdded:
fmt.Printf("New agent: %s\n", event.Agent.ID)
case registry.EventUpdated:
fmt.Printf("Agent updated: %s (load=%.2f)\n", event.Agent.ID, event.Agent.Load)
case registry.EventRemoved:
fmt.Printf("Agent removed: %s\n", event.Agent.ID)
}
}
NATS Registry ¶
For distributed deployments, use NATSRegistry with a shared NATS cluster:
import "github.com/vinayprograms/agentkit/bus"
// Reuse bus connection
natsBus, _ := bus.NewNATSBus(bus.NATSConfig{URL: "nats://localhost:4222"})
reg, _ := registry.NewNATSRegistry(natsBus.Conn(), registry.NATSRegistryConfig{
BucketName: "my-swarm-registry",
TTL: 30 * time.Second,
})
Multiple agents across different nodes share the same registry, enabling discovery and load balancing across the swarm.
TTL and Stale Entries ¶
Both implementations support TTL-based expiry. Agents should periodically re-register (heartbeat) to prevent being marked stale:
// Heartbeat every 10 seconds
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
reg.Register(registry.AgentInfo{
ID: myID,
Status: currentStatus,
Load: currentLoad,
})
}
Load Balancing ¶
FindByCapability returns agents sorted by load (lowest first), enabling simple load-aware routing:
agents, _ := reg.FindByCapability("code-review")
agents, _ = reg.List(&registry.Filter{
Capability: "code-review",
Status: registry.StatusIdle,
MaxLoad: 0.8, // Only agents with load <= 80%
})
Package registry provides agent registration and discovery for swarm coordination.
Agents self-register with capabilities, status, and load. Other agents discover and route to appropriate handlers based on capabilities and availability.
Index ¶
- Variables
- func HasCapability(info AgentInfo, capability string) bool
- func MatchesFilter(info AgentInfo, filter *Filter) bool
- func ValidateAgentInfo(info AgentInfo) error
- type AgentInfo
- type CapabilitySchema
- type Event
- type EventType
- type FieldSchema
- type Filter
- type MemoryConfig
- type MemoryRegistry
- func (r *MemoryRegistry) Close() error
- func (r *MemoryRegistry) Deregister(id string) error
- func (r *MemoryRegistry) FindByCapability(capability string) ([]AgentInfo, error)
- func (r *MemoryRegistry) FindByEmbedding(queryVec []float64, maxResults int) ([]AgentInfo, error)
- func (r *MemoryRegistry) Get(id string) (*AgentInfo, error)
- func (r *MemoryRegistry) List(filter *Filter) ([]AgentInfo, error)
- func (r *MemoryRegistry) Register(info AgentInfo) error
- func (r *MemoryRegistry) Touch(id string) error
- func (r *MemoryRegistry) Watch() (<-chan Event, error)
- type NATSRegistry
- func (r *NATSRegistry) Close() error
- func (r *NATSRegistry) Conn() *nats.Conn
- func (r *NATSRegistry) Deregister(id string) error
- func (r *NATSRegistry) FindByCapability(capability string) ([]AgentInfo, error)
- func (r *NATSRegistry) FindByEmbedding(queryVec []float64, maxResults int) ([]AgentInfo, error)
- func (r *NATSRegistry) Get(id string) (*AgentInfo, error)
- func (r *NATSRegistry) List(filter *Filter) ([]AgentInfo, error)
- func (r *NATSRegistry) Register(info AgentInfo) error
- func (r *NATSRegistry) Touch(id string) error
- func (r *NATSRegistry) Watch() (<-chan Event, error)
- type NATSRegistryConfig
- type Registry
- type ServiceAgentInfo
- type Status
Constants ¶
This section is empty.
Variables ¶
var (
	ErrNotFound    = errors.New("agent not found")
	ErrClosed      = errors.New("registry closed")
	ErrInvalidID   = errors.New("invalid agent ID")
	ErrDuplicateID = errors.New("duplicate agent ID")
)
Common errors.
Functions ¶
func HasCapability ¶
HasCapability checks if an agent has a specific capability. Supports exact match and wildcard prefix (e.g., "code.*" matches "code.golang").
func MatchesFilter ¶
MatchesFilter checks if an agent matches the filter criteria.
func ValidateAgentInfo ¶
ValidateAgentInfo checks if agent info is valid.
Types ¶
type AgentInfo ¶
type AgentInfo struct {
// ID uniquely identifies the agent.
ID string
// Name is a human-readable name for the agent.
Name string
// Capabilities lists what the agent can do (e.g., "code-review", "testing").
Capabilities []string
// Status is the agent's current operational state.
Status Status
// Load is the agent's current load (0.0-1.0).
Load float64
// Metadata contains additional key-value pairs.
Metadata map[string]string
// Embedding is the vector representation for semantic matching.
Embedding []float64 `json:"embedding,omitempty"`
// LastSeen is when the agent last updated its registration.
LastSeen time.Time
}
AgentInfo contains registration information for an agent.
type CapabilitySchema ¶ added in v0.2.0
type CapabilitySchema struct {
// Name is the capability identifier (usually Agentfile NAME)
Name string `json:"name"`
// Version for compatibility checking (semver recommended)
Version string `json:"version,omitempty"`
// Description is human-readable explanation
Description string `json:"description,omitempty"`
// Inputs describes required and optional input parameters
Inputs []FieldSchema `json:"inputs,omitempty"`
// Outputs describes what the capability produces
Outputs []FieldSchema `json:"outputs,omitempty"`
}
CapabilitySchema describes what an agent can do and its input/output contract.
func UnmarshalCapabilitySchema ¶ added in v0.2.0
func UnmarshalCapabilitySchema(data []byte) (*CapabilitySchema, error)
UnmarshalCapabilitySchema deserializes a capability schema from JSON.
func (*CapabilitySchema) GetInputDefault ¶ added in v0.2.0
func (c *CapabilitySchema) GetInputDefault(name string) (string, bool)
GetInputDefault returns the default value for an input field.
func (*CapabilitySchema) HasRequiredInput ¶ added in v0.2.0
func (c *CapabilitySchema) HasRequiredInput(name string) bool
HasRequiredInput checks if a capability has a specific required input.
func (*CapabilitySchema) Marshal ¶ added in v0.2.0
func (c *CapabilitySchema) Marshal() ([]byte, error)
Marshal serializes the capability schema to JSON.
func (*CapabilitySchema) Validate ¶ added in v0.2.0
func (c *CapabilitySchema) Validate() error
Validate checks if the capability schema is valid.
type Event ¶
type Event struct {
// Type indicates what happened.
Type EventType
// Agent contains the agent information.
// For removal events, this contains the last known state.
Agent AgentInfo
}
Event represents a change in the registry.
type FieldSchema ¶ added in v0.2.0
type FieldSchema struct {
// Name is the field identifier
Name string `json:"name"`
// Required indicates if the field must be provided
Required bool `json:"required"`
// Default value if not provided (empty string if no default)
Default string `json:"default,omitempty"`
// Type hint: "string", "number", "boolean", "json"
Type string `json:"type,omitempty"`
// Description is human-readable explanation
Description string `json:"description,omitempty"`
}
FieldSchema describes an input or output field.
type Filter ¶
type Filter struct {
// Status filters by operational state. Empty means all.
Status Status
// Capability filters to agents with this capability.
Capability string
// MaxLoad filters to agents with load at or below this value.
// Zero means no filter.
MaxLoad float64
}
Filter specifies criteria for listing agents.
type MemoryConfig ¶
type MemoryConfig struct {
// TTL specifies how long before an agent is considered stale.
// Zero means entries never expire.
TTL time.Duration
}
MemoryConfig configures the in-memory registry.
type MemoryRegistry ¶
type MemoryRegistry struct {
// contains filtered or unexported fields
}
MemoryRegistry is an in-memory implementation of Registry. Suitable for testing and single-node deployments.
func NewMemoryRegistry ¶
func NewMemoryRegistry(cfg MemoryConfig) *MemoryRegistry
NewMemoryRegistry creates a new in-memory registry.
func (*MemoryRegistry) Close ¶
func (r *MemoryRegistry) Close() error
Close shuts down the registry.
func (*MemoryRegistry) Deregister ¶
func (r *MemoryRegistry) Deregister(id string) error
Deregister removes an agent from the registry.
func (*MemoryRegistry) FindByCapability ¶
func (r *MemoryRegistry) FindByCapability(capability string) ([]AgentInfo, error)
FindByCapability returns agents with a specific capability.
func (*MemoryRegistry) FindByEmbedding ¶ added in v0.2.0
func (r *MemoryRegistry) FindByEmbedding(queryVec []float64, maxResults int) ([]AgentInfo, error)
FindByEmbedding returns agents ranked by cosine similarity to a query vector.
func (*MemoryRegistry) Get ¶
func (r *MemoryRegistry) Get(id string) (*AgentInfo, error)
Get retrieves a specific agent by ID.
func (*MemoryRegistry) List ¶
func (r *MemoryRegistry) List(filter *Filter) ([]AgentInfo, error)
List returns all agents matching the filter.
func (*MemoryRegistry) Register ¶
func (r *MemoryRegistry) Register(info AgentInfo) error
Register adds or updates an agent in the registry.
func (*MemoryRegistry) Touch ¶ added in v0.2.0
func (r *MemoryRegistry) Touch(id string) error
Touch refreshes an agent's LastSeen timestamp.
func (*MemoryRegistry) Watch ¶
func (r *MemoryRegistry) Watch() (<-chan Event, error)
Watch returns a channel of registry events.
type NATSRegistry ¶
type NATSRegistry struct {
// contains filtered or unexported fields
}
NATSRegistry implements Registry using NATS JetStream KV store. Suitable for distributed deployments across multiple nodes.
func NewNATSRegistry ¶
func NewNATSRegistry(conn *nats.Conn, cfg NATSRegistryConfig) (*NATSRegistry, error)
NewNATSRegistry creates a new NATS registry from an existing connection.
func (*NATSRegistry) Conn ¶
func (r *NATSRegistry) Conn() *nats.Conn
Conn returns the underlying NATS connection.
func (*NATSRegistry) Deregister ¶
func (r *NATSRegistry) Deregister(id string) error
Deregister removes an agent from the registry.
func (*NATSRegistry) FindByCapability ¶
func (r *NATSRegistry) FindByCapability(capability string) ([]AgentInfo, error)
FindByCapability returns agents with a specific capability.
func (*NATSRegistry) FindByEmbedding ¶ added in v0.2.0
func (r *NATSRegistry) FindByEmbedding(queryVec []float64, maxResults int) ([]AgentInfo, error)
FindByEmbedding returns agents ranked by cosine similarity to a query vector.
func (*NATSRegistry) Get ¶
func (r *NATSRegistry) Get(id string) (*AgentInfo, error)
Get retrieves a specific agent by ID.
func (*NATSRegistry) List ¶
func (r *NATSRegistry) List(filter *Filter) ([]AgentInfo, error)
List returns all agents matching the filter.
func (*NATSRegistry) Register ¶
func (r *NATSRegistry) Register(info AgentInfo) error
Register adds or updates an agent in the registry.
func (*NATSRegistry) Touch ¶ added in v0.2.0
func (r *NATSRegistry) Touch(id string) error
Touch refreshes an agent's TTL by re-putting its entry. Updates LastSeen timestamp. This is lighter than a full Register since the caller doesn't need to rebuild the AgentInfo.
func (*NATSRegistry) Watch ¶
func (r *NATSRegistry) Watch() (<-chan Event, error)
Watch returns a channel of registry events.
type NATSRegistryConfig ¶
type NATSRegistryConfig struct {
// BucketName is the KV bucket name. Default: "agent-registry"
BucketName string
// TTL for agent entries. Zero means no expiry.
// Note: NATS KV has its own TTL handling.
TTL time.Duration
// Replicas for the KV store (1-5). Default: 1
Replicas int
}
NATSRegistryConfig configures the NATS registry.
func DefaultNATSRegistryConfig ¶
func DefaultNATSRegistryConfig() NATSRegistryConfig
DefaultNATSRegistryConfig returns configuration with sensible defaults.
type Registry ¶
type Registry interface {
// Register adds or updates an agent in the registry.
// If an agent with the same ID exists, it updates the entry.
Register(info AgentInfo) error
// Touch refreshes an agent's TTL without rewriting the full entry.
// Updates LastSeen and resets the KV entry age for TTL purposes.
Touch(id string) error
// Deregister removes an agent from the registry.
// Returns ErrNotFound if the agent doesn't exist.
Deregister(id string) error
// Get retrieves a specific agent by ID.
// Returns nil, ErrNotFound if not found.
Get(id string) (*AgentInfo, error)
// List returns all agents matching the optional filter.
// Pass nil for no filtering.
List(filter *Filter) ([]AgentInfo, error)
// FindByCapability returns agents with a specific capability.
// Supports exact match and wildcard (e.g., "code.*").
// Results are sorted by load (lowest first).
FindByCapability(capability string) ([]AgentInfo, error)
// FindByEmbedding returns agents ranked by cosine similarity to a query vector.
// Agents must have an Embedding field set. Returns up to maxResults matches.
FindByEmbedding(queryVec []float64, maxResults int) ([]AgentInfo, error)
// Watch returns a channel of registry events.
// The channel is closed when the registry is closed.
// Multiple watchers are supported.
Watch() (<-chan Event, error)
// Close shuts down the registry client.
Close() error
}
Registry provides agent registration and discovery.
type ServiceAgentInfo ¶ added in v0.2.0
type ServiceAgentInfo struct {
AgentInfo
// Capability this service agent provides (one per agent)
Capability CapabilitySchema `json:"capability"`
// NodeID identifies the machine/container running this agent
NodeID string `json:"node_id,omitempty"`
// Version of the agent software
AgentVersion string `json:"agent_version,omitempty"`
}
ServiceAgentInfo extends AgentInfo with capability schema for service agents.