alan

Version: v0.1.3
Published: Feb 8, 2026 | License: MIT | Imports: 11 | Imported by: 0

UDP peer discovery and communication library for Go with optional ChaCha20-Poly1305 encryption.

Features

  • DNS-based peer discovery - Resolve a DNS name to discover cluster members
  • Automatic membership - JOIN/LEAVE/HEARTBEAT protocol for peer tracking
  • Encrypted communication - Optional ChaCha20-Poly1305 authenticated encryption
  • Request-Reply pattern - Send requests and wait for responses from peers
  • Distributed locking - Named locks with automatic release on peer disconnect
  • Quorum support - Configurable quorum requirement for distributed operations
  • Simple API - Start(), Send() and Stop() cover the basic workflow
  • Callbacks - Get notified when peers join or leave
  • Auto-refresh - Optionally re-resolve DNS to discover new peers

Installation

go get github.com/rakunlabs/alan

Quick Start

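package main

import (
    "context"
    "fmt"
    "net"

    "github.com/rakunlabs/alan"
)

func main() {
    // Create Alan instance
    a, err := alan.New(alan.Config{
        DNSAddr: "my-cluster.local", // DNS name for peer discovery
        Port:    5000,
    })
    if err != nil {
        panic(err)
    }

    // Optional: get notified when peers join/leave
    a.OnPeerJoin(func(addr *net.UDPAddr) {
        fmt.Printf("Peer joined: %s\n", addr)
    })
    a.OnPeerLeave(func(addr *net.UDPAddr) {
        fmt.Printf("Peer left: %s\n", addr)
    })

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Start blocks, so run it in the background and capture its error
    startErr := make(chan error, 1)
    go func() {
        startErr <- a.Start(ctx, func(ctx context.Context, msg alan.Message) {
            fmt.Printf("Received from %s: %s\n", msg.Addr, msg.Data)
        })
    }()

    // Wait until Start has finished initializing before sending
    select {
    case <-a.Ready():
    case err := <-startErr:
        panic(err)
    }

    // Send to all peers (waits for quorum if configured)
    for _, res := range a.Send(ctx, []byte("Hello everyone!")) {
        if res.Error != nil {
            fmt.Printf("Send to %s failed: %v\n", res.Addr, res.Error)
        }
    }

    // Send to a specific peer (direct send, no quorum check)
    if peers := a.Peers(); len(peers) > 0 {
        if _, err := a.SendTo(peers[0], []byte("Hello you!")); err != nil {
            fmt.Printf("SendTo %s failed: %v\n", peers[0], err)
        }
    }

    // Graceful shutdown (sends LEAVE to peers)
    if err := a.Stop(); err != nil {
        fmt.Println("Stop failed:", err)
    }
}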

With Encryption

a, err := alan.New(alan.Config{
    DNSAddr: "my-cluster.local",
    Port:    5000,
    Security: &alan.SecurityConfig{
        Key:     []byte("12345678901234567890123456789012"), // 32 bytes
        Enabled: true,
    },
})

All messages (including membership protocol) are automatically encrypted.

Request-Reply Pattern

Alan supports a request-reply pattern for scenarios where you need responses from peers:

Send to All Peers and Collect Responses
// Broadcast request to all peers and wait for their responses
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

replies, err := a.SendAndWaitReply(ctx, []byte("status-request"))
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
    log.Fatal(err)
}

for _, reply := range replies {
    fmt.Printf("Response from %s: %s\n", reply.Addr, reply.Data)
}
Send to Specific Peer and Wait for Response
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

reply, err := a.SendToAndWaitReply(ctx, peerAddr, []byte("ping"))
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Got response: %s\n", reply.Data)
Handling Requests (Responder Side)
a.Start(ctx, func(ctx context.Context, msg alan.Message) {
    if msg.IsRequest() {
        // This is a request expecting a reply
        response := processRequest(msg.Data)
        a.Reply(msg, response)
    } else {
        // Regular fire-and-forget message
        handleMessage(msg.Data)
    }
})
Notes on Request-Reply
  • Smart peer tracking: The library tracks which peers you're waiting for responses from
  • Early return on disconnect: If a peer disconnects (gracefully or via heartbeat timeout) while waiting, the library automatically adjusts:
    • SendAndWaitReply: Removes the disconnected peer from expected responses and returns when all remaining peers have responded
    • SendToAndWaitReply: Returns immediately with ErrPeerDisconnected if the target peer disconnects
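  • No infinite waits on dead peers: Peers that stop sending heartbeats are removed after HeartbeatTimeout, so requests don't wait forever for them. A peer that is still alive but never calls Reply is waited on until the context is done, so always pass a context with a deadline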
  • The request ID correlation is handled automatically by the library

Distributed Locking

Alan provides distributed named locks for coordinating work across peers:

Basic Lock Usage
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Acquire a named lock (blocks until acquired or context cancelled)
err := a.Lock(ctx, "my-job")
if err != nil {
    log.Fatalf("failed to acquire lock: %v", err)
}

// Do protected work...
doExclusiveWork()

// Release the lock
err = a.Unlock("my-job")
if err != nil {
    log.Fatalf("failed to release lock: %v", err)
}
TryLock (Non-blocking)
// Try to acquire lock without blocking
if a.TryLock("my-job") {
    // Got the lock
    defer a.Unlock("my-job")
    doWork()
} else {
    // Lock is held by another peer
    log.Println("could not acquire lock")
}
Lock Features
  • Named locks: Multiple independent locks identified by key
  • Auto-release: Locks are automatically released when the holder disconnects
  • Quorum-aware: When quorum is enabled, Lock() waits for quorum before acquiring
  • Context support: Lock() respects context cancellation and deadlines
Lock Limitations

The distributed lock provides best-effort coordination, not strong consistency:

  • Split-brain possible: During network partitions, multiple peers might acquire the same lock
  • No fencing tokens: There's no mechanism to prove lock ownership to external systems
  • Startup race: If peers start simultaneously before discovering each other, both might acquire a lock

Use quorum configuration to mitigate the startup race condition.

Quorum

Quorum ensures operations only proceed when enough peers are present in the cluster:

Configuration
a, err := alan.New(alan.Config{
    DNSAddr: "my-cluster.local",
    Port:    5000,
    Quorum:  3, // Expected cluster size
})

With Quorum: 3, operations require (3/2)+1 = 2 peers to be present (integer division).

Quorum setting    Required peers
0 (default)       Disabled
1                 1
2                 2
3                 2
4                 3
5                 3
Quorum-Aware Operations
Operation                      Quorum behavior
Lock(ctx, key)                 Waits for quorum, then acquires lock
TryLock(key)                   Returns false if quorum not met
Send(ctx, data)                Waits for quorum, then broadcasts
SendAndWaitReply(ctx, data)    Waits for quorum, then broadcasts
SendTo(addr, data)             No quorum check (direct send)
Checking Quorum Status
// Check if quorum is currently met
if a.HasQuorum() {
    // Safe to proceed
}

// Get required peer count
required := a.QuorumSize() // (Quorum/2)+1, or 0 if quorum is disabled
fmt.Println("peers required for quorum:", required)

// Wait for quorum before starting work
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := a.WaitForQuorum(ctx); err != nil {
    log.Fatalf("cluster not ready: %v", err)
}

Configuration

type Config struct {
    // DNSAddr is the DNS name to resolve for discovering peers (optional; if empty or
    // unresolvable, peers can still be discovered through incoming messages)
    DNSAddr string
    
    // BindAddr is the local IP address to bind to (default: "0.0.0.0" for all interfaces)
    // Useful when running multiple instances on the same machine with different IPs
    BindAddr string
    
    // Port is the UDP port to use (default: 5000)
    // IMPORTANT: All peers in the cluster MUST use the same port
    Port int
    
    // Timeout is the read/write timeout (default: 5s)
    Timeout time.Duration
    
    // BufferSize for receiving messages (default: 4096)
    BufferSize int
    
    // HeartbeatInterval - how often to send heartbeats (default: 5s)
    HeartbeatInterval time.Duration
    
    // HeartbeatTimeout - when a peer is considered dead (default: 15s)
    HeartbeatTimeout time.Duration
    
    // RefreshInterval - how often to re-resolve DNS (default: 30s, set to -1 to disable)
    // Note: Refresh only adds new peers; stale peers are removed via heartbeat timeout
    RefreshInterval time.Duration
    
    // MessageQueueSize - per-peer message buffer size (default: 256)
    // Messages from the same peer are processed in order.
    // When the queue is full, the listener blocks until space is available.
    MessageQueueSize int
    
    // Quorum - expected cluster size for distributed operations (default: 0 = disabled)
    // When set, operations like Lock() and Send() wait until (Quorum/2)+1 peers are present
    Quorum int
    
    // Security for encryption (optional)
    Security *SecurityConfig
}

type SecurityConfig struct {
    // Key must be exactly 32 bytes for ChaCha20-Poly1305
    Key     []byte
    Enabled bool
}

Note: All peers in the cluster must use the same port. DNS only provides IP addresses, so the library assumes all peers listen on the configured port.

How It Works

Peer Discovery
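  1. On Start(), the library resolves DNSAddr to get initial peer IPs (skipped if DNSAddr is empty or does not resolve)
  2. It starts the UDP server and sends a JOIN message to all discovered peers
  3. Other peers add the new member to their peer list
  4. A heartbeat goroutine keeps membership up to date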
Membership Protocol

The library uses a simple internal protocol:

Message        Purpose
JOIN           Announce joining the cluster
LEAVE          Announce graceful departure
HEARTBEAT      Periodic keepalive
DATA           User data message
REQUEST        Request message expecting a response
RESPONSE       Response to a request message
LOCK_REQUEST   Request to acquire a distributed lock
LOCK_GRANT     Grant lock to requester
LOCK_DENY      Deny lock (already held)
LOCK_RELEASE   Notify lock has been released

  • JOIN: Sent on startup to all known peers
  • HEARTBEAT: Sent every HeartbeatInterval to all peers
  • LEAVE: Sent on Stop() to notify peers of graceful shutdown
  • Timeout: Peers not seen within HeartbeatTimeout are removed
Message Ordering

Messages from the same peer are guaranteed to be processed in order:

  • Each peer has a dedicated message queue (per-peer channel)
  • A worker goroutine processes messages from each queue sequentially
  • This ensures DATA and REQUEST messages from the same peer are handled in the order received
  • Queue size is configurable via MessageQueueSize (default: 256)
  • When a queue is full, the listener blocks (backpressure)
  • Queues are automatically cleaned up when peers leave or timeout
Peer Event Ordering

Peer join/leave events are also processed in order:

  • A single event queue handles all OnPeerJoin and OnPeerLeave callbacks
  • Events are processed sequentially by a dedicated worker
  • When the queue is full, the listener blocks (backpressure)
  • This ensures handlers see events in the order they occurred
Security

When encryption is enabled:

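  • All messages (JOIN/LEAVE/HEARTBEAT, DATA, REQUEST/RESPONSE and the LOCK_* messages) are encrypted
  • Uses XChaCha20-Poly1305 (AEAD), the extended-nonce variant of ChaCha20-Poly1305, keyed with the shared 32-byte key
  • Random 24-byte nonce per message
  • Wire format: [nonce:24][ciphertext+tag], where the Poly1305 tag is 16 bytes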

API Reference

Alan
Method                                 Description
New(Config)                            Create a new Alan instance
OnPeerJoin(handler)                    Set callback for peer join events
OnPeerLeave(handler)                   Set callback for peer leave events
Start(ctx, handler)                    Start the peer discovery system (blocking)
Stop()                                 Gracefully stop and notify peers
Send(ctx, data)                        Send data to all peers (waits for quorum if enabled)
SendTo(addr, data)                     Send data to a specific peer (no quorum check)
SendAndWaitReply(ctx, data)            Send a request to all peers and wait for responses
SendToAndWaitReply(ctx, addr, data)    Send a request to a specific peer and wait for its response
Reply(msg, data)                       Send a response to a request message
Lock(ctx, key)                         Acquire a distributed lock (blocking)
TryLock(key)                           Try to acquire a lock (non-blocking)
Unlock(key)                            Release a distributed lock
HasQuorum()                            Check whether quorum is currently met
WaitForQuorum(ctx)                     Block until quorum is reached
QuorumSize()                           Get the required peer count for quorum
Peers()                                Get the list of current peer addresses
PeerCount()                            Get the number of connected peers
Refresh()                              Manually re-resolve DNS
Ready()                                Return a channel that is closed when ready to send/receive
LocalAddr()                            Get the local listening address
IsSecure()                             Check whether encryption is enabled
Config()                               Get a copy of the current configuration
Types
// Message received from a peer
type Message struct {
    Data []byte       // Decrypted payload
    Addr *net.UDPAddr // Sender's address
}

// Check if message is a request expecting a reply
func (m Message) IsRequest() bool

// Reply received from a peer (for request-reply pattern)
type Reply struct {
    Data []byte       // Response payload
    Addr *net.UDPAddr // Responder's address
}

// Result of sending to a peer
type SendResult struct {
    Addr  *net.UDPAddr
    Sent  int
    Error error
}

// Callbacks
type PeerHandler func(addr *net.UDPAddr)
type MessageHandler func(ctx context.Context, msg Message)

// Errors
var ErrPeerDisconnected = errors.New("peer disconnected before responding")
var ErrNoQuorum = errors.New("quorum not reached")
var ErrLockNotHeld = errors.New("lock not held by this instance")

License

MIT License - see LICENSE for details.

Documentation

Constants

const (
	// MsgTypeJoin is sent when a peer joins the cluster
	MsgTypeJoin byte = 0x01
	// MsgTypeLeave is sent when a peer leaves the cluster gracefully
	MsgTypeLeave byte = 0x02
	// MsgTypeHeartbeat is sent periodically to maintain membership
	MsgTypeHeartbeat byte = 0x03
	// MsgTypeData is a user data message
	MsgTypeData byte = 0x10
	// MsgTypeRequest is a request message expecting a response
	MsgTypeRequest byte = 0x20
	// MsgTypeResponse is a response to a request message
	MsgTypeResponse byte = 0x21
	// MsgTypeLockRequest is sent to request a distributed lock
	MsgTypeLockRequest byte = 0x30
	// MsgTypeLockGrant is sent to grant a lock to the requester
	MsgTypeLockGrant byte = 0x31
	// MsgTypeLockDeny is sent to deny a lock request (already held)
	MsgTypeLockDeny byte = 0x32
	// MsgTypeLockRelease is sent to notify that a lock has been released
	MsgTypeLockRelease byte = 0x33
)

Message types for internal protocol

const RequestIDSize = 16

RequestIDSize is the size of request IDs in bytes

Variables

var (
	// ErrInvalidKeySize is returned when the key is not exactly 32 bytes
	ErrInvalidKeySize = errors.New("key must be exactly 32 bytes")
	// ErrSecurityNotEnabled is returned when trying to use encryption without enabling it
	ErrSecurityNotEnabled = errors.New("security is not enabled")
	// ErrMessageTooShort is returned when encrypted message is shorter than nonce size
	ErrMessageTooShort = errors.New("encrypted message too short")
	// ErrDecryptionFailed is returned when message authentication fails
	ErrDecryptionFailed = errors.New("decryption failed: message authentication failed")

	// ErrAlreadyStarted is returned when Start is called on an already running instance
	ErrAlreadyStarted = errors.New("alan is already started")
	// ErrNotStarted is returned when operations are attempted before Start
	ErrNotStarted = errors.New("alan is not started")
	// ErrPeerDisconnected is returned when the target peer disconnects before responding
	ErrPeerDisconnected = errors.New("peer disconnected before responding")
	// ErrNoQuorum is returned when quorum is not reached
	ErrNoQuorum = errors.New("quorum not reached")
	// ErrLockNotHeld is returned when trying to unlock a lock not held by this instance
	ErrLockNotHeld = errors.New("lock not held by this instance")
)

Types

type Alan

type Alan struct {
	// contains filtered or unexported fields
}

Alan is the main entry point for the UDP peer discovery library.

func New

func New(config Config) (*Alan, error)

New creates a new Alan instance with the given configuration.

func (*Alan) Config

func (a *Alan) Config() Config

Config returns a copy of the current configuration

func (*Alan) HasQuorum added in v0.1.3

func (a *Alan) HasQuorum() bool

HasQuorum returns true if the current number of peers meets the quorum requirement. Always returns true if quorum is disabled (Quorum == 0).

func (*Alan) IsSecure

func (a *Alan) IsSecure() bool

IsSecure returns true if encryption is enabled

func (*Alan) LocalAddr

func (a *Alan) LocalAddr() net.Addr

LocalAddr returns the local address the server is listening on

func (*Alan) Lock added in v0.1.3

func (a *Alan) Lock(ctx context.Context, key string) error

Lock acquires a named distributed lock, blocking until acquired or context cancelled. If quorum is enabled, it waits for quorum before attempting to acquire the lock. Returns nil on success, ctx.Err() if context is cancelled.

func (*Alan) OnPeerJoin

func (a *Alan) OnPeerJoin(handler PeerHandler)

OnPeerJoin sets the callback for when a peer joins the cluster

func (*Alan) OnPeerLeave

func (a *Alan) OnPeerLeave(handler PeerHandler)

OnPeerLeave sets the callback for when a peer leaves the cluster

func (*Alan) PeerCount

func (a *Alan) PeerCount() int

PeerCount returns the number of connected peers

func (*Alan) Peers

func (a *Alan) Peers() []*net.UDPAddr

Peers returns the list of current peer addresses

func (*Alan) QuorumSize added in v0.1.3

func (a *Alan) QuorumSize() int

QuorumSize returns the number of peers required for quorum. Returns (Quorum/2)+1 if Quorum is set, or 0 if quorum is disabled.

func (*Alan) Ready

func (a *Alan) Ready() <-chan struct{}

Ready returns a channel that is closed when the instance is ready to send/receive. Use this to wait for Start() to complete initialization before calling Send/SendTo.

func (*Alan) Refresh

func (a *Alan) Refresh() error

Refresh re-resolves DNS and discovers new peers. If DNSAddr is empty or DNS resolution fails, it returns nil rather than an error.

func (*Alan) Reply

func (a *Alan) Reply(msg Message, data []byte) (int, error)

Reply sends a response to a request message. This should be called from the message handler when processing a request. Returns an error if the message is not a request.

func (*Alan) Send

func (a *Alan) Send(ctx context.Context, data []byte) []SendResult

Send broadcasts data to all peers. If quorum is enabled, it waits for quorum before sending. If the context is cancelled before quorum is reached, each result's Error field is set to ctx.Err().

func (*Alan) SendAndWaitReply

func (a *Alan) SendAndWaitReply(ctx context.Context, data []byte) ([]Reply, error)

SendAndWaitReply broadcasts a request to all peers and waits for their responses. It waits for quorum before sending if quorum is enabled. It returns all replies received before the context is cancelled or its deadline is exceeded. If peers disconnect while waiting, they are removed from the expected responses. The method returns when all remaining peers have responded or the context is done.

func (*Alan) SendTo

func (a *Alan) SendTo(addr *net.UDPAddr, data []byte) (int, error)

SendTo sends data to a specific peer. This method does NOT wait for quorum - it's a direct send to a known peer.

func (*Alan) SendToAndWaitReply

func (a *Alan) SendToAndWaitReply(ctx context.Context, addr *net.UDPAddr, data []byte) (*Reply, error)

SendToAndWaitReply sends a request to a specific peer and waits for its response. Returns ErrPeerDisconnected if the target peer disconnects before responding.

func (*Alan) Start

func (a *Alan) Start(ctx context.Context, handler MessageHandler) error

Start initializes the peer discovery system:
  • Resolves DNSAddr to discover initial peers (if configured and resolvable)
  • Starts UDP server
  • Sends JOIN to all peers
  • Starts heartbeat goroutine

This method blocks until the context is cancelled or Stop() is called.

func (*Alan) Stop

func (a *Alan) Stop() error

Stop gracefully stops the peer discovery system and sends LEAVE to notify peers of the shutdown.

func (*Alan) TryLock added in v0.1.3

func (a *Alan) TryLock(key string) bool

TryLock attempts to acquire a named distributed lock without blocking. Returns true if the lock was acquired, false otherwise. If quorum is enabled and not met, returns false.

func (*Alan) Unlock added in v0.1.3

func (a *Alan) Unlock(key string) error

Unlock releases a named distributed lock. Returns ErrLockNotHeld if this instance does not hold the lock.

func (*Alan) WaitForQuorum added in v0.1.3

func (a *Alan) WaitForQuorum(ctx context.Context) error

WaitForQuorum blocks until quorum is reached or the context is cancelled. Returns nil immediately if quorum is disabled (Quorum == 0). Returns ctx.Err() if the context is cancelled before quorum is reached.

type Config

type Config struct {
	// DNSAddr is the DNS name to resolve for discovering peers (optional).
	// If empty or DNS resolution fails, the library will still start and
	// can discover peers through incoming messages or later DNS resolution.
	DNSAddr string `cfg:"dns_addr" json:"dns_addr"`
	// BindAddr is the local address to bind to (default: "0.0.0.0" for all interfaces)
	BindAddr string `cfg:"bind_addr" json:"bind_addr"`
	// Port is the UDP port to use (default: 5000)
	// IMPORTANT: All peers in the cluster MUST use the same port
	Port int `cfg:"port" json:"port"`
	// Timeout is the read/write timeout duration (default: 5s)
	Timeout time.Duration `cfg:"timeout" json:"timeout"`
	// BufferSize is the buffer size for receiving messages (default: 4096)
	BufferSize int `cfg:"buffer_size" json:"buffer_size"`
	// Security holds optional encryption configuration
	Security *SecurityConfig `cfg:"security" json:"security"`
	// HeartbeatInterval is how often to send heartbeats (default: 5s)
	HeartbeatInterval time.Duration `cfg:"heartbeat_interval" json:"heartbeat_interval"`
	// HeartbeatTimeout is when a peer is considered dead (default: 15s)
	HeartbeatTimeout time.Duration `cfg:"heartbeat_timeout" json:"heartbeat_timeout"`
	// RefreshInterval is how often to re-resolve DNS (default: 30s, set to -1 to disable)
	RefreshInterval time.Duration `cfg:"refresh_interval" json:"refresh_interval"`
	// MessageQueueSize is the per-peer message buffer size (default: 256)
	// Messages from the same peer are processed in order.
	// When the queue is full, the listener blocks until space is available.
	MessageQueueSize int `cfg:"message_queue_size" json:"message_queue_size"`
	// Quorum is the expected cluster size for distributed operations.
	// When set, operations like Lock() and Send() will wait until
	// (Quorum/2)+1 peers are present before proceeding.
	// Set to 0 to disable quorum checks (default).
	Quorum int `cfg:"quorum" json:"quorum"`
}

Config holds all configuration for Alan

type Message

type Message struct {
	// Data contains the decrypted message payload
	Data []byte
	// Addr is the sender's address
	Addr *net.UDPAddr
	// contains filtered or unexported fields
}

Message represents an incoming data message from a peer

func (Message) IsRequest

func (m Message) IsRequest() bool

IsRequest returns true if this message is a request expecting a reply

type MessageHandler

type MessageHandler func(ctx context.Context, msg Message)

MessageHandler is a callback for receiving data messages

type Peer

type Peer struct {
	Addr     *net.UDPAddr
	LastSeen time.Time
}

Peer represents a remote peer in the cluster

type PeerHandler

type PeerHandler func(addr *net.UDPAddr)

PeerHandler is a callback for peer membership events

type Reply

type Reply struct {
	// Data contains the response payload
	Data []byte
	// Addr is the responder's address
	Addr *net.UDPAddr
}

Reply represents a response from a peer to a request

type SecurityConfig

type SecurityConfig struct {
	// Key is the pre-shared key for ChaCha20-Poly1305 encryption.
	// Must be exactly 32 bytes.
	Key []byte `cfg:"key" json:"key"`
	// Enabled determines whether encryption is active
	Enabled bool `cfg:"enabled" json:"enabled"`
}

SecurityConfig holds encryption settings

type SendResult

type SendResult struct {
	// Addr is the peer address
	Addr *net.UDPAddr
	// Sent is the number of bytes sent
	Sent int
	// Error is any error that occurred
	Error error
}

SendResult contains the result of sending to a single peer
