client

package
v1.16.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package client provides a Go client for the dflockd distributed lock server.

Index

Constants

View Source
const DefaultDialTimeout = 10 * time.Second

DefaultDialTimeout is the default timeout for establishing a TCP connection.

View Source
const DefaultHeartbeatInterval = 15 * time.Second

DefaultHeartbeatInterval is the default interval between ping commands sent by SignalConn to keep the server from timing out idle connections.

Variables

View Source
var (
	ErrTimeout       = errors.New("dflockd: timeout")
	ErrMaxLocks      = errors.New("dflockd: max locks reached")
	ErrMaxWaiters    = errors.New("dflockd: max waiters reached")
	ErrServer        = errors.New("dflockd: server error")
	ErrNotQueued     = errors.New("dflockd: not enqueued")
	ErrAlreadyQueued = errors.New("dflockd: already enqueued")
	ErrLimitMismatch = errors.New("dflockd: limit mismatch")
	ErrLeaseExpired  = errors.New("dflockd: lease expired")
	ErrAuth          = errors.New("dflockd: authentication failed")
	ErrDraining      = errors.New("dflockd: server draining")
)

Sentinel errors returned by protocol operations.

Functions

func Acquire

func Acquire(c *Conn, key string, acquireTimeout time.Duration, opts ...Option) (token string, leaseTTL int, err error)

Acquire sends a lock ("l") command. It blocks on the server side until the lock is acquired or acquireTimeout expires. Returns the token, lease TTL in seconds, and any error. Returns ErrTimeout if the server reports a timeout.

func Authenticate added in v1.6.0

func Authenticate(c *Conn, token string) error

Authenticate sends an auth command with the given token. Returns nil on success, ErrAuth if the server rejects the token.

func CRC32Shard

func CRC32Shard(key string, numServers int) int

CRC32Shard returns a shard index using CRC-32 (IEEE). This matches the Python client's zlib.crc32-based stable_hash_shard. Returns 0 if numServers <= 0.

func Emit added in v1.11.0

func Emit(c *Conn, channel, payload string) (int, error)

Emit sends a signal on a channel using a regular (non-SignalConn) connection. Returns the number of listeners that received the signal.

func Enqueue

func Enqueue(c *Conn, key string, opts ...Option) (status, token string, leaseTTL int, err error)

Enqueue sends an enqueue ("e") command. Returns the status ("acquired" or "queued"), and if acquired, the token and lease TTL.

func Release

func Release(c *Conn, key, token string) error

Release sends a release ("r") command for the given key and token.

func Renew

func Renew(c *Conn, key, token string, opts ...Option) (remaining int, err error)

Renew sends a renew ("n") command and returns the remaining lease seconds.

func SemAcquire added in v1.2.0

func SemAcquire(c *Conn, key string, acquireTimeout time.Duration, limit int, opts ...Option) (token string, leaseTTL int, err error)

SemAcquire sends a semaphore acquire ("sl") command. Returns the token, lease TTL in seconds, and any error. Returns ErrTimeout on timeout, ErrLimitMismatch if the limit doesn't match the existing semaphore.

func SemEnqueue added in v1.2.0

func SemEnqueue(c *Conn, key string, limit int, opts ...Option) (status, token string, leaseTTL int, err error)

SemEnqueue sends a semaphore enqueue ("se") command. Returns the status ("acquired" or "queued"), and if acquired, the token and lease TTL.

func SemRelease added in v1.2.0

func SemRelease(c *Conn, key, token string) error

SemRelease sends a semaphore release ("sr") command for the given key and token.

func SemRenew added in v1.2.0

func SemRenew(c *Conn, key, token string, opts ...Option) (remaining int, err error)

SemRenew sends a semaphore renew ("sn") command and returns the remaining lease seconds.

func SemWait added in v1.2.0

func SemWait(c *Conn, key string, waitTimeout time.Duration) (token string, leaseTTL int, err error)

SemWait sends a semaphore wait ("sw") command after a prior SemEnqueue.

func Wait

func Wait(c *Conn, key string, waitTimeout time.Duration) (token string, leaseTTL int, err error)

Wait sends a wait ("w") command after a prior Enqueue. It blocks until the lock is granted or waitTimeout expires. Returns the token, lease TTL, and any error. Returns ErrTimeout on timeout, ErrNotQueued if not enqueued.

Types

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn wraps a TCP connection to a dflockd server, providing a buffered reader for line-oriented protocol communication. Conn is safe for concurrent use; a mutex serializes request/response pairs to prevent interleaved I/O.

func Dial

func Dial(addr string) (*Conn, error)

Dial connects to a dflockd server at the given address (host:port). Uses DefaultDialTimeout and enables TCP keepalive.

func DialTLS added in v1.4.0

func DialTLS(addr string, cfg *tls.Config) (*Conn, error)

DialTLS connects to a dflockd server at the given address using TLS. Uses DefaultDialTimeout and enables TCP keepalive.

func (*Conn) Close

func (c *Conn) Close() error

Close closes the underlying TCP connection. It is safe to call concurrently with sendRecv; the underlying net.Conn.Close is goroutine-safe and will unblock any pending I/O.

type ListenOption added in v1.11.0

type ListenOption func(*listenOptions)

ListenOption configures optional parameters for Listen/Unlisten.

func WithGroup added in v1.11.0

func WithGroup(group string) ListenOption

WithGroup sets the queue group for a Listen or Unlisten call. Within a group, only one member receives each signal via round-robin.

type Lock

type Lock struct {
	Key            string
	AcquireTimeout time.Duration   // default 10s
	LeaseTTL       int             // custom lease TTL in seconds; 0 = server default
	Servers        []string        // e.g. ["127.0.0.1:6388"]
	ShardFunc      ShardFunc       // defaults to CRC32Shard
	RenewRatio     float64         // fraction of lease at which to renew; default 0.5
	RenewJitter    float64         // early-only jitter fraction for renewals; default 0.10
	TLSConfig      *tls.Config     // if non-nil, connect using TLS
	AuthToken      string          // if non-empty, authenticate after connecting
	OnRenewError   func(err error) // optional; called when background lease renewal fails
	// contains filtered or unexported fields
}

Lock provides a high-level interface for acquiring, holding, and releasing a distributed lock, including automatic lease renewal in the background.

func (*Lock) Acquire

func (l *Lock) Acquire(ctx context.Context) (bool, error)

Acquire connects to the server, acquires the lock, and starts a background goroutine to renew the lease. Returns false (with nil error) on timeout. The provided context controls cancellation; if it is cancelled, the connection is closed which unblocks the server-side wait.

func (*Lock) Close

func (r *Lock) Close() error

Close ends the renewal goroutine and closes the connection without sending a release. The server will auto-release if configured to do so. Promoted onto Lock and Semaphore via embedding.

func (*Lock) Enqueue

func (l *Lock) Enqueue(ctx context.Context) (string, error)

Enqueue performs the first phase of two-phase locking. Returns "acquired" or "queued". If acquired, a renewal goroutine is started automatically. The provided context controls cancellation; if cancelled, the connection is closed which unblocks any in-progress server I/O.

func (*Lock) Release

func (l *Lock) Release(ctx context.Context) error

stopRenew cancels the renewal goroutine and waits for it to exit. Must be called with l.mu held; temporarily releases the mutex so the renewal goroutine can complete its tick (which grabs l.mu).

If the goroutine is stuck inside a Renew network call (server hung or network slow), ctx cancellation alone can't unblock it. After a grace period we force-close the underlying conn, which interrupts the Renew I/O with an error; the goroutine then exits normally. Release stops the renewal goroutine, releases the lock on the server, and closes the connection. Cancelling ctx closes the connection to unblock a release round trip that is stuck in network I/O.

func (*Lock) Token

func (r *Lock) Token() string

Token returns the current lock/semaphore token, or "" if not held.

func (*Lock) Wait

func (l *Lock) Wait(ctx context.Context, timeout time.Duration) (bool, error)

Wait performs the second phase of two-phase locking. Must be called after Enqueue returned "queued". Returns false (with nil error) on timeout. On timeout the connection is closed; the caller must call Enqueue again to re-enter the queue.

type Option

type Option func(*options)

Option configures optional parameters for protocol commands.

func WithLeaseTTL

func WithLeaseTTL(seconds int) Option

WithLeaseTTL sets a custom lease TTL (in seconds) for an Acquire or Enqueue call.

type Semaphore added in v1.2.0

type Semaphore struct {
	Key            string
	Limit          int
	AcquireTimeout time.Duration   // default 10s
	LeaseTTL       int             // custom lease TTL in seconds; 0 = server default
	Servers        []string        // e.g. ["127.0.0.1:6388"]
	ShardFunc      ShardFunc       // defaults to CRC32Shard
	RenewRatio     float64         // fraction of lease at which to renew; default 0.5
	RenewJitter    float64         // early-only jitter fraction for renewals; default 0.10
	TLSConfig      *tls.Config     // if non-nil, connect using TLS
	AuthToken      string          // if non-empty, authenticate after connecting
	OnRenewError   func(err error) // optional; called when background lease renewal fails
	// contains filtered or unexported fields
}

Semaphore provides a high-level interface for acquiring, holding, and releasing a distributed semaphore slot, including automatic lease renewal.

func (*Semaphore) Acquire added in v1.2.0

func (s *Semaphore) Acquire(ctx context.Context) (bool, error)

Acquire connects to the server, acquires a semaphore slot, and starts background lease renewal. Returns false (with nil error) on timeout.

func (*Semaphore) Close added in v1.2.0

func (r *Semaphore) Close() error

Close ends the renewal goroutine and closes the connection without sending a release. The server will auto-release if configured to do so. Promoted onto Lock and Semaphore via embedding.

func (*Semaphore) Enqueue added in v1.2.0

func (s *Semaphore) Enqueue(ctx context.Context) (string, error)

Enqueue performs the first phase of two-phase semaphore acquire. The provided context controls cancellation; if cancelled, the connection is closed which unblocks any in-progress server I/O.

func (*Semaphore) Release added in v1.2.0

func (s *Semaphore) Release(ctx context.Context) error

Release stops renewal, releases the semaphore slot, and closes the connection. Cancelling ctx closes the connection to unblock a release round trip that is stuck in network I/O.

func (*Semaphore) Token added in v1.2.0

func (r *Semaphore) Token() string

Token returns the current lock/semaphore token, or "" if not held.

func (*Semaphore) Wait added in v1.2.0

func (s *Semaphore) Wait(ctx context.Context, timeout time.Duration) (bool, error)

Wait performs the second phase of two-phase semaphore acquire. Returns false (with nil error) on timeout. On timeout the connection is closed; the caller must call Enqueue again to re-enter the queue.

type ShardFunc

type ShardFunc func(key string, numServers int) int

ShardFunc maps a key to a server index given the number of servers.

type Signal added in v1.11.0

type Signal struct {
	Channel string
	Payload string
}

Signal represents a received signal from a channel.

type SignalConn added in v1.11.0

type SignalConn struct {
	// contains filtered or unexported fields
}

SignalConn wraps a Conn for signal operations, providing a background reader that separates push signals from command responses.

The sigCh buffer (64) is a soft cap: if the consumer can't keep up and the channel fills, incoming signals are dropped silently and the drop count is exposed via DroppedSignals(). Consumers that must not miss signals should monitor that counter and either scale up consumers or drop the connection (the server will also evict slow consumers via CancelConn when its own WriteCh buffer overflows).

func NewSignalConn added in v1.11.0

func NewSignalConn(c *Conn, opts ...SignalConnOption) *SignalConn

NewSignalConn creates a SignalConn from an existing Conn. It starts a background goroutine that reads lines from the connection and routes "sig ..." push messages to sigCh and command responses to respCh. A heartbeat goroutine sends periodic ping commands to prevent the server from timing out the connection (default: every 15s). Use WithHeartbeatInterval(0) to disable.

func (*SignalConn) Close added in v1.11.0

func (sc *SignalConn) Close() error

Close closes the underlying connection and waits for the read loop to exit.

func (*SignalConn) DroppedSignals added in v1.15.0

func (sc *SignalConn) DroppedSignals() uint64

DroppedSignals returns the total number of signals that were dropped because the Signals() channel was full when they arrived. Monotonically increasing; use it to detect slow-consumer conditions. Zero in the common case.

func (*SignalConn) Emit added in v1.11.0

func (sc *SignalConn) Emit(channel, payload string) (int, error)

Emit sends a signal on a channel (must be literal, no wildcards). Returns the number of listeners that received the signal.

func (*SignalConn) Listen added in v1.11.0

func (sc *SignalConn) Listen(pattern string, opts ...ListenOption) error

Listen subscribes to signals matching the given pattern.

func (*SignalConn) Signals added in v1.11.0

func (sc *SignalConn) Signals() <-chan Signal

Signals returns a read-only channel that receives signals pushed by the server.

func (*SignalConn) Unlisten added in v1.11.0

func (sc *SignalConn) Unlisten(pattern string, opts ...ListenOption) error

Unlisten unsubscribes from signals matching the given pattern.

type SignalConnOption added in v1.14.0

type SignalConnOption func(*SignalConn)

SignalConnOption configures optional parameters for NewSignalConn.

func WithHeartbeatInterval added in v1.14.0

func WithHeartbeatInterval(d time.Duration) SignalConnOption

WithHeartbeatInterval sets the interval between heartbeat ping commands. Set to 0 to disable heartbeats.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL