pantheon

package module
v0.1.6
Published: Mar 31, 2025 License: MIT Imports: 10 Imported by: 0

README

Pantheon

Pantheon is a toolkit for distributing workloads across a cluster of nodes using a consistent hash ring.

Features

  • Consistent hashing algorithm for even workload distribution
  • Fault-tolerant node management with health monitoring
  • Automatic node failover when nodes become unavailable
  • HTTP-based heartbeat mechanism with Redis-based persistence
  • Event-driven architecture with callbacks for node status changes

Requirements

  • Go 1.24 or higher
  • Redis server

Installation

go get github.com/fleetcontrolsio/pantheon

Usage

Creating a Pantheon Cluster
// Create a context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a custom HTTP client with timeout
httpClient := &http.Client{
    Timeout: 5 * time.Second,
}

// Create options
options := pantheon.NewOptions().
    WithName("my-cluster").
    WithHeartbeatInterval(10 * time.Second).
    WithHeartbeatTimeout(5 * time.Second).
    WithHTTPClient(httpClient)

// Create Pantheon instance
p, err := pantheon.New(ctx, options)
if err != nil {
    // Handle error
}

// Start pantheon
if err := p.Start(); err != nil {
    // Handle error
}
Adding Nodes to the Cluster
// Add a node to the cluster
err = p.Join(&pantheon.JoinOp{
    ID:      "node-1",
    Address: "http://localhost",
    Port:    8080,
    Path:    "health",
})
if err != nil {
    // Handle error
}
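
Each node that joins has to serve the health path that Pantheon will probe. The following is a minimal sketch of such an endpoint using only the standard library. It assumes that Address, Port, and Path are combined into a URL of the form http://localhost:8080/health and that a 200 response counts as healthy; the README confirms neither detail. The names healthHandler, probe, and checkLocalHealth are hypothetical and not part of Pantheon.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// healthHandler is a hypothetical handler for the path given in JoinOp.Path.
// It answers 200 OK, which this sketch assumes the heartbeat check accepts.
func healthHandler(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	fmt.Fprint(w, "ok")
}

// probe performs one GET against url and returns the HTTP status code.
func probe(client *http.Client, url string) (int, error) {
	resp, err := client.Get(url)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

// checkLocalHealth starts a throwaway test server that stands in for a node
// and probes its /health path once.
func checkLocalHealth() (int, error) {
	mux := http.NewServeMux()
	mux.HandleFunc("/health", healthHandler)

	srv := httptest.NewServer(mux)
	defer srv.Close()

	return probe(srv.Client(), srv.URL+"/health")
}

func main() {
	status, err := checkLocalHealth()
	if err != nil {
		fmt.Println("probe failed:", err)
		return
	}
	fmt.Println("health status:", status)
}
```

On a real node the same mux would be passed to http.ListenAndServe(":8080", mux) instead of a test server.
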
Node Health Monitoring

Pantheon automatically monitors the health of nodes by sending HTTP requests to the specified health endpoint. When a node fails to respond, it is marked as suspect. After multiple failures, it is marked as dead and removed from the available nodes list.

// Listen for node events
go func() {
    for event := range p.EventsCh {
        switch event.Event {
        case "died":
            fmt.Printf("Node %s is considered dead\n", event.NodeID)
        case "revived":
            fmt.Printf("Node %s has recovered\n", event.NodeID)
        }
    }
}()
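
The alive, suspect, and dead transitions described above can be pictured as a small state machine. The sketch below is an illustration only: it assumes that a node becomes dead once its failure count reaches a threshold (such as the heartbeatMaxFailures option) and that a single successful heartbeat restores it to alive. The tracker type and the observe and simulate functions are hypothetical; Pantheon's actual logic may differ.

```go
package main

import "fmt"

// state mirrors the three member states named in the README.
type state string

const (
	alive   state = "alive"
	suspect state = "suspect"
	dead    state = "dead"
)

// tracker is a hypothetical per-node failure counter.
type tracker struct {
	failures    int
	maxFailures int
	state       state
}

// observe records one heartbeat result and returns the event to emit
// ("died" or "revived"), or "" when no event applies.
func (t *tracker) observe(ok bool) string {
	if ok {
		wasDead := t.state == dead
		t.failures = 0
		t.state = alive
		if wasDead {
			return "revived"
		}
		return ""
	}
	t.failures++
	if t.failures >= t.maxFailures {
		if t.state != dead {
			t.state = dead
			return "died"
		}
		return ""
	}
	t.state = suspect
	return ""
}

// simulate feeds a sequence of heartbeat results to a fresh tracker and
// collects the emitted events in order.
func simulate(results []bool, maxFailures int) []string {
	t := &tracker{maxFailures: maxFailures, state: alive}
	var events []string
	for _, ok := range results {
		if ev := t.observe(ok); ev != "" {
			events = append(events, ev)
		}
	}
	return events
}

func main() {
	// One success, four failures, then a recovery.
	fmt.Println(simulate([]bool{true, false, false, false, false, true}, 3)) // prints [died revived]
}
```
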
Distributing Keys with Consistent Hashing
// Distribute some keys to available nodes using consistent hashing
keys := []string{"user:1", "user:2", "user:3", "product:1", "product:2"}
err = p.Distribute(keys)
if err != nil {
    // Handle error
}

// Get keys assigned to a specific node
nodeKeys, err := p.GetNodeKeys("node-1")
if err != nil {
    // Handle error
}
fmt.Printf("Node 1 is responsible for %d keys: %v\n", len(nodeKeys), nodeKeys)

// Find which node is responsible for a specific key
nodeID, err := p.GetKeyNode("user:1")
if err != nil {
    // Handle error
}
fmt.Printf("Key 'user:1' is assigned to node: %s\n", nodeID)

When nodes join or leave the cluster, keys are automatically redistributed using the consistent hashing algorithm, minimizing the number of keys that need to be remapped.
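
To illustrate why consistent hashing keeps remapping small, here is a minimal, self-contained hash ring with virtual nodes. It is a sketch of the general technique, not Pantheon's hashring implementation: the ring type, the FNV-1a hash, and the "#i" virtual-node naming are all assumptions made for the example. It places 100 keys on a two-node ring, adds a third node, and checks that every key that moved now belongs to the new node.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strconv"
)

// ring is a hypothetical, minimal consistent hash ring with virtual nodes.
type ring struct {
	replicas int               // virtual nodes per physical node
	points   []uint32          // sorted virtual-node positions
	owner    map[uint32]string // position -> physical node ID
}

func newRing(replicas int) *ring {
	return &ring{replicas: replicas, owner: make(map[uint32]string)}
}

// hashOf maps a string to a position on the ring using FNV-1a.
func hashOf(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// add places the node's virtual nodes on the ring.
func (r *ring) add(node string) {
	for i := 0; i < r.replicas; i++ {
		p := hashOf(node + "#" + strconv.Itoa(i))
		if _, taken := r.owner[p]; taken {
			continue // rare position collision: the earlier owner keeps it
		}
		r.owner[p] = node
		r.points = append(r.points, p)
	}
	sort.Slice(r.points, func(a, b int) bool { return r.points[a] < r.points[b] })
}

// lookup returns the node owning the first position at or after the key's
// hash, wrapping around to the start of the ring.
func (r *ring) lookup(key string) string {
	if len(r.points) == 0 {
		return ""
	}
	h := hashOf(key)
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0
	}
	return r.owner[r.points[i]]
}

// demoRemap reports how many of 100 keys change owner when node-3 joins,
// and whether every moved key went to node-3.
func demoRemap() (moved int, onlyToNew bool) {
	r := newRing(10)
	r.add("node-1")
	r.add("node-2")

	before := make(map[string]string)
	for i := 0; i < 100; i++ {
		k := "user:" + strconv.Itoa(i)
		before[k] = r.lookup(k)
	}

	r.add("node-3")
	onlyToNew = true
	for k, old := range before {
		if now := r.lookup(k); now != old {
			moved++
			if now != "node-3" {
				onlyToNew = false
			}
		}
	}
	return moved, onlyToNew
}

func main() {
	moved, onlyToNew := demoRemap()
	fmt.Printf("%d of 100 keys moved; all moved keys went to the new node: %v\n", moved, onlyToNew)
}
```

Keys never move between the two existing nodes, because adding virtual nodes can only insert a new owner in front of a key, never swap the old owners.
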

Graceful Shutdown
// Remove a node from the cluster (graceful shutdown)
err = p.Leave("node-1")
if err != nil {
    // Handle error
}

// Destroy the Pantheon instance
if err := p.Destroy(); err != nil {
    // Handle error
}

Architecture

Pantheon uses Redis for persistence, HTTP for communication, and consistent hashing for workload distribution:

  1. Node Management:

    • Nodes register with the Pantheon cluster with a unique ID and health endpoint
    • Pantheon periodically sends HTTP requests to each node's health endpoint
    • Nodes that fail to respond are marked as suspect and eventually dead
    • When node status changes, events are sent through the EventsCh channel
  2. Consistent Hashing:

    • Keys are distributed across available nodes using a consistent hash ring
    • Each physical node gets multiple virtual nodes on the ring for better distribution
    • When nodes are added/removed, only a minimal fraction of keys need to be remapped
    • The hash ring automatically routes around failed nodes
  3. Persistence:

    • Redis stores node information, heartbeat data, and key-to-node mappings
    • Key mappings are stored both as direct lookups and as sets per node
    • Node state (alive, suspect, dead) is tracked for proper failover handling
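
The "direct lookups and sets per node" layout in the persistence item can be pictured as a two-way index. The sketch below keeps both views in memory; it assumes the Redis equivalents are a plain key per mapping (Set/Get) and a set per node (SAdd/SMembers), which matches the methods listed on the RedisClient interface but is not confirmed by the README. The index type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// index keeps two views of the same assignment:
// key -> node (direct lookup) and node -> set of keys.
type index struct {
	keyToNode map[string]string
	nodeKeys  map[string]map[string]struct{}
}

func newIndex() *index {
	return &index{
		keyToNode: make(map[string]string),
		nodeKeys:  make(map[string]map[string]struct{}),
	}
}

// assign maps key to node and removes it from the previous owner's set,
// so the two views never disagree.
func (ix *index) assign(key, node string) {
	if prev, ok := ix.keyToNode[key]; ok && prev != node {
		delete(ix.nodeKeys[prev], key)
	}
	ix.keyToNode[key] = node
	if ix.nodeKeys[node] == nil {
		ix.nodeKeys[node] = make(map[string]struct{})
	}
	ix.nodeKeys[node][key] = struct{}{}
}

// nodeOf answers "which node owns this key?" in one lookup.
func (ix *index) nodeOf(key string) (string, bool) {
	node, ok := ix.keyToNode[key]
	return node, ok
}

// keysOf answers "which keys does this node own?" in sorted order.
func (ix *index) keysOf(node string) []string {
	keys := make([]string, 0, len(ix.nodeKeys[node]))
	for k := range ix.nodeKeys[node] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	ix := newIndex()
	ix.assign("user:1", "node-1")
	ix.assign("user:2", "node-1")
	ix.assign("user:1", "node-2") // reassignment after a rebalance

	owner, _ := ix.nodeOf("user:1")
	fmt.Println("user:1 is owned by", owner)
	fmt.Println("node-1 keys:", ix.keysOf("node-1"))
}
```

Keeping both views makes GetKeyNode-style and GetNodeKeys-style queries cheap, at the cost of two writes per assignment.
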

License

MIT. See the license for details.

Documentation

Constants

This section is empty.

Variables

var ErrInvalidHTTPClient = errors.New("http client is required")
var ErrInvalidHashRing = errors.New("hash ring is required")
var ErrInvalidHeartbeatConcurrency = errors.New("heartbeat concurrency must be greater than 0")
var ErrInvalidHeartbeatInterval = errors.New("heartbeat interval must be greater than 0")
var ErrInvalidHeartbeatMaxFailures = errors.New("heartbeat max failures must be greater than 0")
var ErrInvalidHeartbeatTimeout = errors.New("heartbeat timeout must be greater than 0")
var ErrInvalidName = errors.New("name is required")
var ErrInvalidPrefix = errors.New("prefix is required")

Option validation errors

var ErrInvalidRedisDB = errors.New("redis db must be greater than or equal to 0")
var ErrInvalidRedisHost = errors.New("redis host is required")
var ErrInvalidRedisMaxRetries = errors.New("redis max retries must be greater than or equal to 0")
var ErrInvalidRedisPort = errors.New("redis port is required")
var ErrInvalidRedisRetryBackoff = errors.New("redis retry backoff must be greater than 0")

Functions

This section is empty.

Types

type ErrNodePropertyNotFound

type ErrNodePropertyNotFound struct {
	// contains filtered or unexported fields
}

ErrNodePropertyNotFound is returned when a node property is not found in the storage

func NewErrNodePropertyNotFound

func NewErrNodePropertyNotFound(property string) *ErrNodePropertyNotFound

NewErrNodePropertyNotFound creates a new ErrNodePropertyNotFound

func (*ErrNodePropertyNotFound) Error

func (e *ErrNodePropertyNotFound) Error() string

type HearbeatEvent

type HearbeatEvent struct {
	NodeID string
	// Event; the event that occurred
	Event string
	// Error; the error that occurred
	Error error
}

type JoinOp

type JoinOp struct {
	// ID; the name of the node joining the cluster
	ID string
	// Address; the address of the node joining the cluster
	Address string
	// Port; the port of the node joining the cluster
	Port int
	// Path: the path on the node to make the heartbeat request to
	Path string
}

type Member

type Member struct {
	// ID; the unique identifier for the node
	ID string
	// Address; the address of the node
	Address string
	// Path; the path on the node to make the heartbeat request to
	Path string
	// JoinedAt; the time the node joined the cluster
	JoinedAt string
	// LastHeartbeat; the last time a heartbeat was received from the node
	LastHeartbeat string
	// HeartbeatCount; the number of heartbeat requests sent to the node
	HeartbeatCount string
	// HeartbeatFailures; the number of failed heartbeat requests
	HeartbeatFailures string
	// State; the state of the node: alive, dead, or suspect
	State MemberState
}

Member represents a node in the cluster

type MemberState

type MemberState string
const (
	MemberAlive   MemberState = "alive"
	MemberDead    MemberState = "dead"
	MemberSuspect MemberState = "suspect"
)

func (MemberState) MarshalBinary added in v0.1.3

func (m MemberState) MarshalBinary() (data []byte, err error)

MarshalBinary implements the encoding.BinaryMarshaler interface

func (*MemberState) UnmarshalBinary added in v0.1.3

func (m *MemberState) UnmarshalBinary(data []byte) error

UnmarshalBinary implements the encoding.BinaryUnmarshaler interface
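
For readers unfamiliar with these interfaces, the sketch below shows one way a string-based state type can satisfy encoding.BinaryMarshaler and encoding.BinaryUnmarshaler, which lets a value be written to and read from a byte-oriented store such as Redis. It is an illustration under assumptions, not Pantheon's source: the lowercase memberState type is a hypothetical copy, and rejecting unknown values in UnmarshalBinary is a choice made for this example.

```go
package main

import (
	"encoding"
	"fmt"
)

// memberState is a hypothetical stand-in for pantheon.MemberState.
type memberState string

const (
	stateAlive   memberState = "alive"
	stateDead    memberState = "dead"
	stateSuspect memberState = "suspect"
)

// MarshalBinary encodes the state as its raw string bytes.
func (m memberState) MarshalBinary() ([]byte, error) {
	return []byte(m), nil
}

// UnmarshalBinary decodes raw bytes back into a state, rejecting values
// that are not one of the three known states (an assumption of this sketch).
func (m *memberState) UnmarshalBinary(data []byte) error {
	switch s := memberState(data); s {
	case stateAlive, stateDead, stateSuspect:
		*m = s
		return nil
	default:
		return fmt.Errorf("unknown member state %q", data)
	}
}

// Compile-time checks that both interfaces are satisfied.
var (
	_ encoding.BinaryMarshaler   = memberState("")
	_ encoding.BinaryUnmarshaler = (*memberState)(nil)
)

// roundTrip marshals a state and unmarshals it into a fresh value.
func roundTrip(s memberState) (memberState, error) {
	data, err := s.MarshalBinary()
	if err != nil {
		return "", err
	}
	var out memberState
	if err := out.UnmarshalBinary(data); err != nil {
		return "", err
	}
	return out, nil
}

func main() {
	got, err := roundTrip(stateSuspect)
	fmt.Println(got, err)
}
```
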

type Options

type Options struct {
	// contains filtered or unexported fields
}

func NewOptions

func NewOptions() *Options

NewOptions creates a new Options instance with default values. The default values are:

  • prefix: "pantheon"
  • name: "my-cluster"
  • heartbeatInterval: 30 seconds
  • heartbeatTimeout: 30 seconds
  • heartbeatConcurrency: 2
  • heartbeatMaxFailures: 3
  • redisHost: "localhost"
  • redisPort: 6379
  • redisDB: 0
  • redisMaxRetries: 5
  • redisRetryBackoff: 20 seconds
  • hashringReplicaCount: 10
  • httpClient: nil
  • hashRing: nil
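
The heartbeat timeout and concurrency options suggest that health checks run in a bounded pool, each with its own deadline. The sketch below shows that general pattern with a counting semaphore and context.WithTimeout. It is an assumption about how such options are typically used, not a description of Pantheon's internals; checkFunc, probeAll, and demoProbe are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// checkFunc is a hypothetical stand-in for one HTTP heartbeat request.
type checkFunc func(ctx context.Context, node string) error

// probeAll checks every node with at most `concurrency` checks in flight,
// giving each check its own timeout. concurrency must be greater than 0.
// It returns the error for each node (nil on success).
func probeAll(nodes []string, concurrency int, timeout time.Duration, check checkFunc) map[string]error {
	results := make(map[string]error, len(nodes))
	var mu sync.Mutex
	var wg sync.WaitGroup
	sem := make(chan struct{}, concurrency) // counting semaphore

	for _, node := range nodes {
		wg.Add(1)
		sem <- struct{}{} // blocks while `concurrency` checks are running
		go func(node string) {
			defer wg.Done()
			defer func() { <-sem }()
			ctx, cancel := context.WithTimeout(context.Background(), timeout)
			defer cancel()
			err := check(ctx, node)
			mu.Lock()
			results[node] = err
			mu.Unlock()
		}(node)
	}
	wg.Wait()
	return results
}

// demoProbe runs probeAll over four fake nodes with concurrency 2. It
// returns the highest number of checks seen in flight and the failure count.
func demoProbe() (maxInFlight int32, failed int) {
	var inFlight, peak int32
	check := func(ctx context.Context, node string) error {
		n := atomic.AddInt32(&inFlight, 1)
		defer atomic.AddInt32(&inFlight, -1)
		for { // record the peak without a lock
			p := atomic.LoadInt32(&peak)
			if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
				break
			}
		}
		select {
		case <-time.After(10 * time.Millisecond): // simulated network latency
		case <-ctx.Done():
			return ctx.Err()
		}
		if node == "node-3" {
			return errors.New("connection refused")
		}
		return nil
	}
	results := probeAll([]string{"node-1", "node-2", "node-3", "node-4"}, 2, time.Second, check)
	for _, err := range results {
		if err != nil {
			failed++
		}
	}
	return atomic.LoadInt32(&peak), failed
}

func main() {
	peak, failed := demoProbe()
	fmt.Printf("peak concurrent checks: %d, failed nodes: %d\n", peak, failed)
}
```
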

func (*Options) Validate

func (o *Options) Validate() error

func (*Options) WithHTTPClient

func (o *Options) WithHTTPClient(client *http.Client) *Options

func (*Options) WithHashRing

func (o *Options) WithHashRing(ring hashring.Ring) *Options

func (*Options) WithHashRingReplicaCount

func (o *Options) WithHashRingReplicaCount(count int) *Options

func (*Options) WithHearbeatConcurrency

func (o *Options) WithHearbeatConcurrency(concurrency int) *Options

func (*Options) WithHeartbeatInterval

func (o *Options) WithHeartbeatInterval(interval time.Duration) *Options

func (*Options) WithHeartbeatMaxFailures added in v0.1.1

func (o *Options) WithHeartbeatMaxFailures(maxFailures int) *Options

func (*Options) WithHeartbeatTimeout

func (o *Options) WithHeartbeatTimeout(timeout time.Duration) *Options

func (*Options) WithName

func (o *Options) WithName(name string) *Options

func (*Options) WithPrefix

func (o *Options) WithPrefix(prefix string) *Options

func (*Options) WithRedisDB

func (o *Options) WithRedisDB(db int) *Options

func (*Options) WithRedisHost

func (o *Options) WithRedisHost(host string) *Options

func (*Options) WithRedisMaxRetries

func (o *Options) WithRedisMaxRetries(retries int) *Options

func (*Options) WithRedisPassword

func (o *Options) WithRedisPassword(password string) *Options

func (*Options) WithRedisPort

func (o *Options) WithRedisPort(port int) *Options

func (*Options) WithRedisRetryBackoff

func (o *Options) WithRedisRetryBackoff(interval time.Duration) *Options

type Pantheon

type Pantheon struct {

	// EventsCh; a channel on which cluster events are sent
	EventsCh chan PantheonEvent
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, options *Options) (*Pantheon, error)

New creates a new Pantheon instance

func (*Pantheon) Destroy

func (c *Pantheon) Destroy() error

Destroy stops the cluster. This should be called when the cluster is no longer needed.

func (*Pantheon) Distribute

func (c *Pantheon) Distribute(keys []string) error

Distribute distributes keys to the nodes in the cluster. This should be called after a node has joined or left the cluster to rebalance the keys across the available nodes.

func (*Pantheon) GetKeyNode

func (c *Pantheon) GetKeyNode(key string) (string, error)

GetKeyNode returns the node responsible for a specific key

func (*Pantheon) GetNodeHealth

func (c *Pantheon) GetNodeHealth(nodeID string) (MemberState, error)

GetNodeHealth returns the health status of a node

func (*Pantheon) GetNodeKeys

func (c *Pantheon) GetNodeKeys(nodeID string) ([]string, error)

GetNodeKeys retrieves the keys assigned to a node. This is used to determine which keys a node in the cluster is responsible for.

func (*Pantheon) Join

func (c *Pantheon) Join(op *JoinOp) error

Join adds a node to the cluster. This should be called when a new node is starting up.

func (*Pantheon) Leave

func (c *Pantheon) Leave(id string) error

Leave removes a node from the cluster. This should be called when a node is shutting down.

func (*Pantheon) PingNode

func (c *Pantheon) PingNode(nodeID string) error

PingNode forces an immediate heartbeat check for a node

func (*Pantheon) ResetNodeFailures

func (c *Pantheon) ResetNodeFailures(nodeID string) error

ResetNodeFailures resets the heartbeat failure count for a node

func (*Pantheon) Start

func (c *Pantheon) Start() error

Start starts the cluster. This should be called before adding nodes to the cluster.

type PantheonEvent

type PantheonEvent struct {
	// Event; the name of the event
	// "started" - when the cluster is started
	// "joined" - when a node joins the cluster
	// "left" - when a node leaves the cluster
	// "died" - when a node is considered dead (no heartbeat received/timeout)
	Event string
	// NodeID; the identifier of the node
	NodeID string
}

type RedisClient

type RedisClient interface {
	Ping(ctx context.Context) *redis.StatusCmd
	Del(ctx context.Context, keys ...string) *redis.IntCmd
	HSet(ctx context.Context, key string, fields ...interface{}) *redis.IntCmd
	HGet(ctx context.Context, key, field string) *redis.StringCmd
	HGetAll(ctx context.Context, key string) *redis.MapStringStringCmd
	HIncrBy(ctx context.Context, key, field string, incr int64) *redis.IntCmd
	Keys(ctx context.Context, pattern string) *redis.StringSliceCmd
	// Added for key distribution
	Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd
	Get(ctx context.Context, key string) *redis.StringCmd
	SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
	SMembers(ctx context.Context, key string) *redis.StringSliceCmd
}

func NewRedisClient

func NewRedisClient(ctx context.Context, opts *RedisClientOptions) (RedisClient, error)

NewRedisClient creates a new redis client

type RedisClientOptions

type RedisClientOptions struct {
	// The hostname of the redis server/cluster
	Host string
	// The port of the redis server/cluster
	Port int
	// The password for the redis server/cluster
	Password string
	// The database to use in the redis server/cluster
	DB int
	// The maximum number of retries before giving up
	MaxRetries int
	// The maximum time to wait before giving up
	RetryBackOffLimit time.Duration
}

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

func NewStorage

func NewStorage(prefix string, namespace string, client RedisClient) *Storage

func (*Storage) AddNode

func (s *Storage) AddNode(ctx context.Context, nodeID, address, path string, port int) error

AddNode upserts a node into the cluster. The node is identified by its ID. The address and port are used to communicate with the node. The path is the path on the node to which the heartbeat request is made. The node is added with the state "alive" and with the current time as both its joined_at and last_heartbeat times.

func (*Storage) GetNode

func (s *Storage) GetNode(ctx context.Context, nodeID string) (*Member, error)

GetNode retrieves a node from the cluster

func (*Storage) GetNodes

func (s *Storage) GetNodes(ctx context.Context) ([]Member, error)

GetNodes retrieves all nodes from the cluster

func (*Storage) IncrementHeartbeatFailures

func (s *Storage) IncrementHeartbeatFailures(ctx context.Context, nodeID string) error

func (*Storage) IncrementHeartbeats

func (s *Storage) IncrementHeartbeats(ctx context.Context, nodeID string) error

func (*Storage) RemoveNode

func (s *Storage) RemoveNode(ctx context.Context, nodeID string) error

RemoveNode removes a node from the cluster

func (Storage) ResetHeartbeatFailures

func (s Storage) ResetHeartbeatFailures(ctx context.Context, nodeID string) error

func (*Storage) UpdateNode added in v0.1.6

func (s *Storage) UpdateNode(ctx context.Context, nodeID, address, path string, port int) error

UpdateNode updates the address and path of a node

func (*Storage) UpdateNodeHeartbeat

func (s *Storage) UpdateNodeHeartbeat(ctx context.Context, nodeID string) error

UpdateNodeHeartbeat updates the last heartbeat time for a node

func (*Storage) UpdateNodeState

func (s *Storage) UpdateNodeState(ctx context.Context, nodeID string, state MemberState) error

UpdateNodeState updates the state of a node

Directories

Path Synopsis
pkg
