registry

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package registry provides agent registration and discovery for swarm coordination.

Overview

The Registry interface enables agents to self-register with capabilities, status, and load. Other agents discover and route to appropriate handlers based on capabilities and availability.

Available Implementations

  • MemoryRegistry: In-memory implementation for testing and single-node use
  • NATSRegistry: Distributed registry using NATS JetStream KV store

Basic Usage

Register an agent:

reg := registry.NewMemoryRegistry(registry.MemoryConfig{})
err := reg.Register(registry.AgentInfo{
    ID:           "agent-1",
    Name:         "Code Review Agent",
    Capabilities: []string{"code-review", "testing"},
    Status:       registry.StatusIdle,
    Load:         0.3,
})

Discover agents by capability:

agents, _ := reg.FindByCapability("code-review")
// Returns agents sorted by load (lowest first)
if len(agents) > 0 {
    target := agents[0] // Pick the least loaded agent
}

Watch for changes:

events, _ := reg.Watch()
for event := range events {
    switch event.Type {
    case registry.EventAdded:
        fmt.Printf("New agent: %s\n", event.Agent.ID)
    case registry.EventUpdated:
        fmt.Printf("Agent updated: %s (load=%.2f)\n", event.Agent.ID, event.Agent.Load)
    case registry.EventRemoved:
        fmt.Printf("Agent removed: %s\n", event.Agent.ID)
    }
}

NATS Registry

For distributed deployments, use NATSRegistry with a shared NATS cluster:

import "github.com/vinayprograms/agentkit/bus"

// Reuse bus connection
natsBus, _ := bus.NewNATSBus(bus.NATSConfig{URL: "nats://localhost:4222"})
reg, _ := registry.NewNATSRegistry(natsBus.Conn(), registry.NATSRegistryConfig{
    BucketName: "my-swarm-registry",
    TTL:        30 * time.Second,
})

Multiple agents across different nodes share the same registry, enabling discovery and load balancing across the swarm.

TTL and Stale Entries

Both implementations support TTL-based expiry. Agents should periodically re-register (heartbeat) to prevent being marked stale:

// Heartbeat every 10 seconds
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
    reg.Register(AgentInfo{
        ID:     myID,
        Status: currentStatus,
        Load:   currentLoad,
    })
}

Load Balancing

FindByCapability returns agents sorted by load (lowest first), enabling simple load-aware routing:

agents, _ := reg.FindByCapability("code-review")
agents, _ = reg.List(&registry.Filter{
    Capability: "code-review",
    Status:     registry.StatusIdle,
    MaxLoad:    0.8,  // Only agents with load <= 80%
})

Package registry provides agent registration and discovery for swarm coordination.

Agents self-register with capabilities, status, and load. Other agents discover and route to appropriate handlers based on capabilities and availability.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotFound    = errors.New("agent not found")
	ErrClosed      = errors.New("registry closed")
	ErrInvalidID   = errors.New("invalid agent ID")
	ErrDuplicateID = errors.New("duplicate agent ID")
)

Common errors.

Functions

func HasCapability

func HasCapability(info AgentInfo, capability string) bool

HasCapability checks if an agent has a specific capability. Supports exact match and wildcard prefix (e.g., "code.*" matches "code.golang").

func MatchesFilter

func MatchesFilter(info AgentInfo, filter *Filter) bool

MatchesFilter checks if an agent matches the filter criteria.

func ValidateAgentInfo

func ValidateAgentInfo(info AgentInfo) error

ValidateAgentInfo checks if agent info is valid.

Types

type AgentInfo

type AgentInfo struct {
	// ID uniquely identifies the agent.
	ID string

	// Name is a human-readable name for the agent.
	Name string

	// Capabilities lists what the agent can do (e.g., "code-review", "testing").
	Capabilities []string

	// Status is the agent's current operational state.
	Status Status

	// Load is the agent's current load (0.0-1.0).
	Load float64

	// Metadata contains additional key-value pairs.
	Metadata map[string]string

	// Embedding is the vector representation for semantic matching.
	Embedding []float64 `json:"embedding,omitempty"`

	// LastSeen is when the agent last updated its registration.
	LastSeen time.Time
}

AgentInfo contains registration information for an agent.

type CapabilitySchema added in v0.2.0

type CapabilitySchema struct {
	// Name is the capability identifier (usually Agentfile NAME)
	Name string `json:"name"`

	// Version for compatibility checking (semver recommended)
	Version string `json:"version,omitempty"`

	// Description is human-readable explanation
	Description string `json:"description,omitempty"`

	// Inputs describes required and optional input parameters
	Inputs []FieldSchema `json:"inputs,omitempty"`

	// Outputs describes what the capability produces
	Outputs []FieldSchema `json:"outputs,omitempty"`
}

CapabilitySchema describes what an agent can do and its input/output contract.

func UnmarshalCapabilitySchema added in v0.2.0

func UnmarshalCapabilitySchema(data []byte) (*CapabilitySchema, error)

UnmarshalCapabilitySchema deserializes a capability schema from JSON.

func (*CapabilitySchema) GetInputDefault added in v0.2.0

func (c *CapabilitySchema) GetInputDefault(name string) (string, bool)

GetInputDefault returns the default value for an input field.

func (*CapabilitySchema) HasRequiredInput added in v0.2.0

func (c *CapabilitySchema) HasRequiredInput(name string) bool

HasRequiredInput checks if a capability has a specific required input.

func (*CapabilitySchema) Marshal added in v0.2.0

func (c *CapabilitySchema) Marshal() ([]byte, error)

Marshal serializes the capability schema to JSON.

func (*CapabilitySchema) Validate added in v0.2.0

func (c *CapabilitySchema) Validate() error

Validate checks if the capability schema is valid.

type Event

type Event struct {
	// Type indicates what happened.
	Type EventType

	// Agent contains the agent information.
	// For removal events, this contains the last known state.
	Agent AgentInfo
}

Event represents a change in the registry.

type EventType

type EventType string

EventType represents the type of registry event.

const (
	EventAdded   EventType = "added"
	EventUpdated EventType = "updated"
	EventRemoved EventType = "removed"
)

type FieldSchema added in v0.2.0

type FieldSchema struct {
	// Name is the field identifier
	Name string `json:"name"`

	// Required indicates if the field must be provided
	Required bool `json:"required"`

	// Default value if not provided (empty string if no default)
	Default string `json:"default,omitempty"`

	// Type hint: "string", "number", "boolean", "json"
	Type string `json:"type,omitempty"`

	// Description is human-readable explanation
	Description string `json:"description,omitempty"`
}

FieldSchema describes an input or output field.

type Filter

type Filter struct {
	// Status filters by operational state. Empty means all.
	Status Status

	// Capability filters to agents with this capability.
	Capability string

	// MaxLoad filters to agents with load at or below this value.
	// Zero means no filter.
	MaxLoad float64
}

Filter specifies criteria for listing agents.

type MemoryConfig

type MemoryConfig struct {
	// TTL specifies how long before an agent is considered stale.
	// Zero means entries never expire.
	TTL time.Duration
}

MemoryConfig configures the in-memory registry.

type MemoryRegistry

type MemoryRegistry struct {
	// contains filtered or unexported fields
}

MemoryRegistry is an in-memory implementation of Registry. Suitable for testing and single-node deployments.

func NewMemoryRegistry

func NewMemoryRegistry(cfg MemoryConfig) *MemoryRegistry

NewMemoryRegistry creates a new in-memory registry.

func (*MemoryRegistry) Close

func (r *MemoryRegistry) Close() error

Close shuts down the registry.

func (*MemoryRegistry) Deregister

func (r *MemoryRegistry) Deregister(id string) error

Deregister removes an agent from the registry.

func (*MemoryRegistry) FindByCapability

func (r *MemoryRegistry) FindByCapability(capability string) ([]AgentInfo, error)

FindByCapability returns agents with a specific capability.

func (*MemoryRegistry) FindByEmbedding added in v0.2.0

func (r *MemoryRegistry) FindByEmbedding(queryVec []float64, maxResults int) ([]AgentInfo, error)

FindByEmbedding returns agents ranked by cosine similarity to a query vector.

func (*MemoryRegistry) Get

func (r *MemoryRegistry) Get(id string) (*AgentInfo, error)

Get retrieves a specific agent by ID.

func (*MemoryRegistry) List

func (r *MemoryRegistry) List(filter *Filter) ([]AgentInfo, error)

List returns all agents matching the filter.

func (*MemoryRegistry) Register

func (r *MemoryRegistry) Register(info AgentInfo) error

Register adds or updates an agent in the registry.

func (*MemoryRegistry) Touch added in v0.2.0

func (r *MemoryRegistry) Touch(id string) error

Touch refreshes an agent's LastSeen timestamp.

func (*MemoryRegistry) Watch

func (r *MemoryRegistry) Watch() (<-chan Event, error)

Watch returns a channel of registry events.

type NATSRegistry

type NATSRegistry struct {
	// contains filtered or unexported fields
}

NATSRegistry implements Registry using NATS JetStream KV store. Suitable for distributed deployments across multiple nodes.

func NewNATSRegistry

func NewNATSRegistry(conn *nats.Conn, cfg NATSRegistryConfig) (*NATSRegistry, error)

NewNATSRegistry creates a new NATS registry from an existing connection.

func (*NATSRegistry) Close

func (r *NATSRegistry) Close() error

Close shuts down the registry.

func (*NATSRegistry) Conn

func (r *NATSRegistry) Conn() *nats.Conn

Conn returns the underlying NATS connection.

func (*NATSRegistry) Deregister

func (r *NATSRegistry) Deregister(id string) error

Deregister removes an agent from the registry.

func (*NATSRegistry) FindByCapability

func (r *NATSRegistry) FindByCapability(capability string) ([]AgentInfo, error)

FindByCapability returns agents with a specific capability.

func (*NATSRegistry) FindByEmbedding added in v0.2.0

func (r *NATSRegistry) FindByEmbedding(queryVec []float64, maxResults int) ([]AgentInfo, error)

FindByEmbedding returns agents ranked by cosine similarity to a query vector.

func (*NATSRegistry) Get

func (r *NATSRegistry) Get(id string) (*AgentInfo, error)

Get retrieves a specific agent by ID.

func (*NATSRegistry) List

func (r *NATSRegistry) List(filter *Filter) ([]AgentInfo, error)

List returns all agents matching the filter.

func (*NATSRegistry) Register

func (r *NATSRegistry) Register(info AgentInfo) error

Register adds or updates an agent in the registry.

func (*NATSRegistry) Touch added in v0.2.0

func (r *NATSRegistry) Touch(id string) error

Touch refreshes an agent's TTL by re-putting its entry. Updates LastSeen timestamp. This is lighter than a full Register since the caller doesn't need to rebuild the AgentInfo.

func (*NATSRegistry) Watch

func (r *NATSRegistry) Watch() (<-chan Event, error)

Watch returns a channel of registry events.

type NATSRegistryConfig

type NATSRegistryConfig struct {
	// BucketName is the KV bucket name. Default: "agent-registry"
	BucketName string

	// TTL for agent entries. Zero means no expiry.
	// Note: NATS KV has its own TTL handling.
	TTL time.Duration

	// Replicas for the KV store (1-5). Default: 1
	Replicas int
}

NATSRegistryConfig configures the NATS registry.

func DefaultNATSRegistryConfig

func DefaultNATSRegistryConfig() NATSRegistryConfig

DefaultNATSRegistryConfig returns configuration with sensible defaults.

type Registry

type Registry interface {
	// Register adds or updates an agent in the registry.
	// If an agent with the same ID exists, it updates the entry.
	Register(info AgentInfo) error

	// Touch refreshes an agent's TTL without rewriting the full entry.
	// Updates LastSeen and resets the KV entry age for TTL purposes.
	Touch(id string) error

	// Deregister removes an agent from the registry.
	// Returns ErrNotFound if the agent doesn't exist.
	Deregister(id string) error

	// Get retrieves a specific agent by ID.
	// Returns nil, ErrNotFound if not found.
	Get(id string) (*AgentInfo, error)

	// List returns all agents matching the optional filter.
	// Pass nil for no filtering.
	List(filter *Filter) ([]AgentInfo, error)

	// FindByCapability returns agents with a specific capability.
	// Supports exact match and wildcard (e.g., "code.*").
	// Results are sorted by load (lowest first).
	FindByCapability(capability string) ([]AgentInfo, error)

	// FindByEmbedding returns agents ranked by cosine similarity to a query vector.
	// Agents must have an Embedding field set. Returns up to maxResults matches.
	FindByEmbedding(queryVec []float64, maxResults int) ([]AgentInfo, error)

	// Watch returns a channel of registry events.
	// The channel is closed when the registry is closed.
	// Multiple watchers are supported.
	Watch() (<-chan Event, error)

	// Close shuts down the registry client.
	Close() error
}

Registry provides agent registration and discovery.

type ServiceAgentInfo added in v0.2.0

type ServiceAgentInfo struct {
	AgentInfo

	// Capability this service agent provides (one per agent)
	Capability CapabilitySchema `json:"capability"`

	// NodeID identifies the machine/container running this agent
	NodeID string `json:"node_id,omitempty"`

	// Version of the agent software
	AgentVersion string `json:"agent_version,omitempty"`
}

ServiceAgentInfo extends AgentInfo with capability schema for service agents.

type Status

type Status string

Status represents an agent's operational state.

const (
	StatusIdle     Status = "idle"
	StatusBusy     Status = "busy"
	StatusRunning  Status = "running"
	StatusStopping Status = "stopping"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL