Documentation
Overview ¶
Package client provides a Go client for the dflockd distributed lock server.
Index ¶
- Constants
- Variables
- func Acquire(c *Conn, key string, acquireTimeout time.Duration, opts ...Option) (token string, leaseTTL int, err error)
- func Authenticate(c *Conn, token string) error
- func CRC32Shard(key string, numServers int) int
- func Emit(c *Conn, channel, payload string) (int, error)
- func Enqueue(c *Conn, key string, opts ...Option) (status, token string, leaseTTL int, err error)
- func Release(c *Conn, key, token string) error
- func Renew(c *Conn, key, token string, opts ...Option) (remaining int, err error)
- func SemAcquire(c *Conn, key string, acquireTimeout time.Duration, limit int, opts ...Option) (token string, leaseTTL int, err error)
- func SemEnqueue(c *Conn, key string, limit int, opts ...Option) (status, token string, leaseTTL int, err error)
- func SemRelease(c *Conn, key, token string) error
- func SemRenew(c *Conn, key, token string, opts ...Option) (remaining int, err error)
- func SemWait(c *Conn, key string, waitTimeout time.Duration) (token string, leaseTTL int, err error)
- func Wait(c *Conn, key string, waitTimeout time.Duration) (token string, leaseTTL int, err error)
- type Conn
- type ListenOption
- type Lock
- func (l *Lock) Acquire(ctx context.Context) (bool, error)
- func (r *Lock) Close() error
- func (l *Lock) Enqueue(ctx context.Context) (string, error)
- func (l *Lock) Release(ctx context.Context) error
- func (r *Lock) Token() string
- func (l *Lock) Wait(ctx context.Context, timeout time.Duration) (bool, error)
- type Option
- type Semaphore
- func (s *Semaphore) Acquire(ctx context.Context) (bool, error)
- func (r *Semaphore) Close() error
- func (s *Semaphore) Enqueue(ctx context.Context) (string, error)
- func (s *Semaphore) Release(ctx context.Context) error
- func (r *Semaphore) Token() string
- func (s *Semaphore) Wait(ctx context.Context, timeout time.Duration) (bool, error)
- type ShardFunc
- type Signal
- type SignalConn
- func (sc *SignalConn) Close() error
- func (sc *SignalConn) DroppedSignals() uint64
- func (sc *SignalConn) Emit(channel, payload string) (int, error)
- func (sc *SignalConn) Listen(pattern string, opts ...ListenOption) error
- func (sc *SignalConn) Signals() <-chan Signal
- func (sc *SignalConn) Unlisten(pattern string, opts ...ListenOption) error
- type SignalConnOption
Constants ¶
const DefaultDialTimeout = 10 * time.Second
DefaultDialTimeout is the default timeout for establishing a TCP connection.
const DefaultHeartbeatInterval = 15 * time.Second
DefaultHeartbeatInterval is the default interval between ping commands sent by SignalConn to keep the server from timing out idle connections.
Variables ¶
var (
	ErrTimeout       = errors.New("dflockd: timeout")
	ErrMaxLocks      = errors.New("dflockd: max locks reached")
	ErrMaxWaiters    = errors.New("dflockd: max waiters reached")
	ErrServer        = errors.New("dflockd: server error")
	ErrNotQueued     = errors.New("dflockd: not enqueued")
	ErrAlreadyQueued = errors.New("dflockd: already enqueued")
	ErrLimitMismatch = errors.New("dflockd: limit mismatch")
	ErrLeaseExpired  = errors.New("dflockd: lease expired")
	ErrAuth          = errors.New("dflockd: authentication failed")
	ErrDraining      = errors.New("dflockd: server draining")
)
Sentinel errors returned by protocol operations.
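Sentinel errors are usually matched with errors.Is rather than by comparing message strings. The sketch below redeclares two of the sentinels locally so it compiles on its own; classify and wrapKey are hypothetical helpers, and whether the package wraps its sentinels before returning them is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for two of the package's sentinel errors, redeclared here
// only so the sketch is self-contained.
var (
	ErrTimeout      = errors.New("dflockd: timeout")
	ErrLeaseExpired = errors.New("dflockd: lease expired")
)

// wrapKey (hypothetical) adds context to an error while keeping the
// sentinel reachable through %w.
func wrapKey(key string, err error) error {
	return fmt.Errorf("key %q: %w", key, err)
}

// classify (hypothetical) maps an error to a coarse action. errors.Is also
// matches sentinels that have been wrapped with %w.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrTimeout):
		return "retry"
	case errors.Is(err, ErrLeaseExpired):
		return "reacquire"
	default:
		return "fail"
	}
}

func main() {
	fmt.Println(classify(wrapKey("jobs", ErrTimeout)))
}
```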
Functions ¶
func Acquire ¶
func Acquire(c *Conn, key string, acquireTimeout time.Duration, opts ...Option) (token string, leaseTTL int, err error)
Acquire sends a lock ("l") command. It blocks on the server side until the lock is acquired or acquireTimeout expires. Returns the token, lease TTL in seconds, and any error. Returns ErrTimeout if the server reports a timeout.
func Authenticate ¶ added in v1.6.0
func Authenticate(c *Conn, token string) error
Authenticate sends an auth command with the given token. It returns nil on success and ErrAuth if the server rejects the token.
func CRC32Shard ¶
func CRC32Shard(key string, numServers int) int
CRC32Shard returns a shard index using CRC-32 (IEEE). This matches the Python client's zlib.crc32-based stable_hash_shard. Returns 0 if numServers <= 0.
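A minimal sketch of CRC-32 based sharding, assuming the index is the IEEE checksum of the key bytes reduced modulo numServers. The modulo step and the name crc32Shard are assumptions for illustration; only the use of CRC-32 (IEEE) and the zero result for numServers <= 0 come from the description above.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// crc32Shard is a hypothetical reimplementation: the IEEE CRC-32 of the key
// bytes, reduced modulo numServers. The modulo reduction is an assumption.
func crc32Shard(key string, numServers int) int {
	if numServers <= 0 {
		return 0
	}
	return int(crc32.ChecksumIEEE([]byte(key)) % uint32(numServers))
}

func main() {
	// Example addresses only; any server list works the same way.
	servers := []string{"10.0.0.1:6388", "10.0.0.2:6388", "10.0.0.3:6388"}
	for _, key := range []string{"jobs", "reports", "billing"} {
		fmt.Printf("%s -> %s\n", key, servers[crc32Shard(key, len(servers))])
	}
}
```

Because the mapping depends only on the key and the server count, every client configured with the same server list routes a given key to the same server.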
func Emit ¶ added in v1.11.0
func Emit(c *Conn, channel, payload string) (int, error)
Emit sends a signal on a channel using a regular (non-SignalConn) connection. Returns the number of listeners that received the signal.
func Enqueue ¶
func Enqueue(c *Conn, key string, opts ...Option) (status, token string, leaseTTL int, err error)
Enqueue sends an enqueue ("e") command. Returns the status ("acquired" or "queued") and, if acquired, the token and lease TTL.
func SemAcquire ¶ added in v1.2.0
func SemAcquire(c *Conn, key string, acquireTimeout time.Duration, limit int, opts ...Option) (token string, leaseTTL int, err error)
SemAcquire sends a semaphore acquire ("sl") command. Returns the token, lease TTL in seconds, and any error. Returns ErrTimeout on timeout, ErrLimitMismatch if the limit doesn't match the existing semaphore.
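The limit check can be pictured with a toy in-memory model. This is only an illustration of the rule described above, not the server's implementation: semTable, tryAcquire, errLimitMismatch and errFull are hypothetical names, and the assumption is that the first caller fixes the limit for a key.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical local errors; the real client exposes ErrLimitMismatch.
var (
	errLimitMismatch = errors.New("limit mismatch")
	errFull          = errors.New("no free slot")
)

// semTable is a toy model: it remembers the limit first used for each key
// and counts current holders.
type semTable struct {
	limits  map[string]int
	holders map[string]int
}

func newSemTable() *semTable {
	return &semTable{limits: map[string]int{}, holders: map[string]int{}}
}

// tryAcquire takes one slot without waiting. A caller whose limit differs
// from the one already recorded for the key is rejected.
func (s *semTable) tryAcquire(key string, limit int) error {
	if existing, ok := s.limits[key]; ok && existing != limit {
		return errLimitMismatch
	}
	s.limits[key] = limit
	if s.holders[key] >= limit {
		return errFull
	}
	s.holders[key]++
	return nil
}

func main() {
	tbl := newSemTable()
	fmt.Println(tbl.tryAcquire("workers", 2)) // first caller sets limit 2
	fmt.Println(tbl.tryAcquire("workers", 3)) // different limit is rejected
}
```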
func SemEnqueue ¶ added in v1.2.0
func SemEnqueue(c *Conn, key string, limit int, opts ...Option) (status, token string, leaseTTL int, err error)
SemEnqueue sends a semaphore enqueue ("se") command. Returns the status ("acquired" or "queued"), and if acquired, the token and lease TTL.
func SemRelease ¶ added in v1.2.0
func SemRelease(c *Conn, key, token string) error
SemRelease sends a semaphore release ("sr") command for the given key and token.
func SemRenew ¶ added in v1.2.0
func SemRenew(c *Conn, key, token string, opts ...Option) (remaining int, err error)
SemRenew sends a semaphore renew ("sn") command and returns the remaining lease seconds.
func SemWait ¶ added in v1.2.0
func SemWait(c *Conn, key string, waitTimeout time.Duration) (token string, leaseTTL int, err error)
SemWait sends a semaphore wait ("sw") command after a prior SemEnqueue.
Types ¶
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn wraps a TCP connection to a dflockd server, providing a buffered reader for line-oriented protocol communication. Conn is safe for concurrent use; a mutex serializes request/response pairs to prevent interleaved I/O.
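The serialization described above can be sketched with a mutex held across each write-then-read pair. lineConn, roundTrip and echoServer are hypothetical names, and the "ok <line>" reply format is invented for the demo; the sketch runs over an in-memory net.Pipe rather than a real dflockd server.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strings"
	"sync"
)

// lineConn is a hypothetical stand-in for Conn: one mutex guards each
// write-then-read pair so a reply cannot be read by the wrong caller.
type lineConn struct {
	mu sync.Mutex
	c  net.Conn
	r  *bufio.Reader
}

func newLineConn(c net.Conn) *lineConn {
	return &lineConn{c: c, r: bufio.NewReader(c)}
}

// roundTrip writes one line and reads one reply line while holding the mutex.
func (lc *lineConn) roundTrip(line string) (string, error) {
	lc.mu.Lock()
	defer lc.mu.Unlock()
	if _, err := fmt.Fprintf(lc.c, "%s\n", line); err != nil {
		return "", err
	}
	reply, err := lc.r.ReadString('\n')
	if err != nil {
		return "", err
	}
	return strings.TrimSuffix(reply, "\n"), nil
}

// echoServer answers every line with "ok <line>"; it stands in for a server.
func echoServer(c net.Conn) {
	sc := bufio.NewScanner(c)
	for sc.Scan() {
		if _, err := fmt.Fprintf(c, "ok %s\n", sc.Text()); err != nil {
			return
		}
	}
}

// runConcurrent issues n concurrent round trips on one connection and
// returns how many replies did not match their own request.
func runConcurrent(n int) int {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()
	go echoServer(server)

	lc := newLineConn(client)
	var wg sync.WaitGroup
	var mu sync.Mutex
	mismatches := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			req := fmt.Sprintf("req-%d", i)
			reply, err := lc.roundTrip(req)
			if err != nil || reply != "ok "+req {
				mu.Lock()
				mismatches++
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	return mismatches
}

func main() {
	fmt.Println("mismatched replies:", runConcurrent(20))
}
```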
func Dial ¶
Dial connects to a dflockd server at the given address (host:port). Uses DefaultDialTimeout and enables TCP keepalive.
type ListenOption ¶ added in v1.11.0
type ListenOption func(*listenOptions)
ListenOption configures optional parameters for Listen/Unlisten.
func WithGroup ¶ added in v1.11.0
func WithGroup(group string) ListenOption
WithGroup sets the queue group for a Listen or Unlisten call. Within a group, only one member receives each signal via round-robin.
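Round-robin delivery within a queue group can be modelled as a rotating cursor over the group's members. This is a toy model of the behaviour described above, not the server's code; the group type and pick method are hypothetical.

```go
package main

import "fmt"

// group is a toy model of a queue group: each signal goes to exactly one
// member, chosen in rotation.
type group struct {
	members []string
	next    int
}

// pick returns the member that receives the next signal and advances the
// cursor. It reports false when the group has no members.
func (g *group) pick() (string, bool) {
	if len(g.members) == 0 {
		return "", false
	}
	m := g.members[g.next]
	g.next = (g.next + 1) % len(g.members)
	return m, true
}

func main() {
	g := &group{members: []string{"worker-a", "worker-b", "worker-c"}}
	for i := 1; i <= 5; i++ {
		m, _ := g.pick()
		fmt.Printf("signal %d -> %s\n", i, m)
	}
}
```

Listeners that subscribe without a group are not part of this rotation; how the server delivers to them is outside what this sketch models.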
type Lock ¶
type Lock struct {
Key string
AcquireTimeout time.Duration // default 10s
LeaseTTL int // custom lease TTL in seconds; 0 = server default
Servers []string // e.g. ["127.0.0.1:6388"]
ShardFunc ShardFunc // defaults to CRC32Shard
RenewRatio float64 // fraction of lease at which to renew; default 0.5
RenewJitter float64 // early-only jitter fraction for renewals; default 0.10
TLSConfig *tls.Config // if non-nil, connect using TLS
AuthToken string // if non-empty, authenticate after connecting
OnRenewError func(err error) // optional; called when background lease renewal fails
// contains filtered or unexported fields
}
Lock provides a high-level interface for acquiring, holding, and releasing a distributed lock, including automatic lease renewal in the background.
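How RenewRatio and RenewJitter might combine into a renewal delay is sketched below. The formula is an assumption based on the field comments (a fraction of the lease, shortened by "early-only" jitter); renewDelay is a hypothetical helper, and the package's actual computation may differ.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// renewDelay is a hypothetical helper. leaseTTL is in seconds, and u is a
// random value in [0, 1) supplied by the caller so the function stays
// deterministic for a given u.
func renewDelay(leaseTTL int, ratio, jitter, u float64) time.Duration {
	base := float64(leaseTTL) * ratio // nominal renewal point in seconds
	early := base * jitter * u        // jitter only ever shortens the wait
	return time.Duration((base - early) * float64(time.Second))
}

func main() {
	// With a 30s lease and the documented defaults (0.5 and 0.10), every
	// delay falls between 13.5s and 15s.
	for i := 0; i < 3; i++ {
		fmt.Println(renewDelay(30, 0.5, 0.10, rand.Float64()))
	}
}
```

Renewing early rather than late keeps the jitter from ever pushing a renewal closer to lease expiry, while still spreading renewals from many clients over time.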
func (*Lock) Acquire ¶
func (l *Lock) Acquire(ctx context.Context) (bool, error)
Acquire connects to the server, acquires the lock, and starts a background goroutine to renew the lease. Returns false (with nil error) on timeout. The provided context controls cancellation; if it is cancelled, the connection is closed, which unblocks the server-side wait.
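Closing a connection to unblock a pending read is a common Go pattern for making blocking network I/O respect a context. The sketch below shows it over an in-memory net.Pipe with a peer that never replies; readWithContext and demo are hypothetical names, and the package's own implementation is not shown here.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"time"
)

// readWithContext blocks in Read until data arrives or ctx is cancelled.
// On cancellation a watcher goroutine closes conn, which makes Read return.
func readWithContext(ctx context.Context, conn net.Conn) (int, error) {
	done := make(chan struct{})
	defer close(done) // stops the watcher once Read has returned
	go func() {
		select {
		case <-ctx.Done():
			conn.Close() // unblocks the pending Read
		case <-done:
		}
	}()

	buf := make([]byte, 64)
	n, err := conn.Read(buf)
	if err != nil && ctx.Err() != nil {
		return n, ctx.Err() // report the cancellation, not the closed-pipe error
	}
	return n, err
}

// demo simulates a peer that never replies and returns the resulting error
// after the given timeout in milliseconds.
func demo(timeoutMillis int) error {
	client, server := net.Pipe()
	defer server.Close()
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(timeoutMillis)*time.Millisecond)
	defer cancel()

	_, err := readWithContext(ctx, client)
	return err
}

func main() {
	fmt.Println(demo(50))
}
```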
func (*Lock) Close ¶
func (r *Lock) Close() error
Close ends the renewal goroutine and closes the connection without sending a release. The server will auto-release if configured to do so. Promoted onto Lock and Semaphore via embedding.
func (*Lock) Enqueue ¶
func (l *Lock) Enqueue(ctx context.Context) (string, error)
Enqueue performs the first phase of two-phase locking. Returns "acquired" or "queued". If acquired, a renewal goroutine is started automatically. The provided context controls cancellation; if cancelled, the connection is closed, which unblocks any in-progress server I/O.
func (*Lock) Release ¶
func (l *Lock) Release(ctx context.Context) error
Release stops the renewal goroutine, releases the lock on the server, and closes the connection. Cancelling ctx closes the connection to unblock a release round trip that is stuck in network I/O.
Implementation note: the internal stopRenew helper cancels the renewal goroutine and waits for it to exit. It must be called with l.mu held, and it temporarily releases the mutex so the renewal goroutine can complete its tick (which grabs l.mu).
If the goroutine is stuck inside a Renew network call (server hung or network slow), ctx cancellation alone cannot unblock it. After a grace period the underlying connection is force-closed, which interrupts the Renew I/O with an error; the goroutine then exits normally.
func (*Lock) Token ¶
func (r *Lock) Token() string
Token returns the current lock/semaphore token, or "" if not held.
type Option ¶
type Option func(*options)
Option configures optional parameters for protocol commands.
func WithLeaseTTL ¶
WithLeaseTTL sets a custom lease TTL (in seconds) for an Acquire or Enqueue call.
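Option follows Go's functional options pattern. A minimal sketch of how such options are typically declared and applied is below; the leaseTTL field name, the int parameter of withLeaseTTL and the buildOptions helper are assumptions, since only the Option type itself is shown in this documentation.

```go
package main

import "fmt"

// options mirrors the shape implied by "type Option func(*options)".
// The field name is a guess for illustration.
type options struct {
	leaseTTL int // seconds; 0 is taken to mean "use the server default"
}

// option is a local stand-in for the package's Option type.
type option func(*options)

// withLeaseTTL is a hypothetical equivalent of WithLeaseTTL.
func withLeaseTTL(seconds int) option {
	return func(o *options) { o.leaseTTL = seconds }
}

// buildOptions applies each option in order to a zero-valued options struct.
func buildOptions(opts ...option) options {
	var o options
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	fmt.Println(buildOptions().leaseTTL)                 // no option given
	fmt.Println(buildOptions(withLeaseTTL(30)).leaseTTL) // custom TTL
}
```

The pattern lets a variadic opts parameter grow new settings without changing the function signature that existing callers use.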
type Semaphore ¶ added in v1.2.0
type Semaphore struct {
Key string
Limit int
AcquireTimeout time.Duration // default 10s
LeaseTTL int // custom lease TTL in seconds; 0 = server default
Servers []string // e.g. ["127.0.0.1:6388"]
ShardFunc ShardFunc // defaults to CRC32Shard
RenewRatio float64 // fraction of lease at which to renew; default 0.5
RenewJitter float64 // early-only jitter fraction for renewals; default 0.10
TLSConfig *tls.Config // if non-nil, connect using TLS
AuthToken string // if non-empty, authenticate after connecting
OnRenewError func(err error) // optional; called when background lease renewal fails
// contains filtered or unexported fields
}
Semaphore provides a high-level interface for acquiring, holding, and releasing a distributed semaphore slot, including automatic lease renewal.
func (*Semaphore) Acquire ¶ added in v1.2.0
func (s *Semaphore) Acquire(ctx context.Context) (bool, error)
Acquire connects to the server, acquires a semaphore slot, and starts background lease renewal. Returns false (with nil error) on timeout.
func (*Semaphore) Close ¶ added in v1.2.0
func (r *Semaphore) Close() error
Close ends the renewal goroutine and closes the connection without sending a release. The server will auto-release if configured to do so. Promoted onto Lock and Semaphore via embedding.
func (*Semaphore) Enqueue ¶ added in v1.2.0
func (s *Semaphore) Enqueue(ctx context.Context) (string, error)
Enqueue performs the first phase of a two-phase semaphore acquire. The provided context controls cancellation; if cancelled, the connection is closed, which unblocks any in-progress server I/O.
func (*Semaphore) Release ¶ added in v1.2.0
func (s *Semaphore) Release(ctx context.Context) error
Release stops renewal, releases the semaphore slot, and closes the connection. Cancelling ctx closes the connection to unblock a release round trip that is stuck in network I/O.
func (*Semaphore) Token ¶ added in v1.2.0
func (r *Semaphore) Token() string
Token returns the current lock/semaphore token, or "" if not held.
type SignalConn ¶ added in v1.11.0
type SignalConn struct {
// contains filtered or unexported fields
}
SignalConn wraps a Conn for signal operations, providing a background reader that separates push signals from command responses.
The sigCh buffer (64) is a soft cap: if the consumer can't keep up and the channel fills, incoming signals are dropped silently and the drop count is exposed via DroppedSignals(). Consumers that must not miss signals should monitor that counter and either scale up consumers or drop the connection (the server will also evict slow consumers via CancelConn when its own WriteCh buffer overflows).
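The drop-on-full behaviour can be sketched with a non-blocking channel send and an atomic counter. mailbox and deliver are hypothetical names; the sketch illustrates the technique and is not the package's code.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// mailbox is a hypothetical stand-in for the buffered signal channel plus
// its drop counter.
type mailbox struct {
	ch      chan string
	dropped atomic.Uint64
}

func newMailbox(size int) *mailbox {
	return &mailbox{ch: make(chan string, size)}
}

// deliver never blocks: if the buffer is full the signal is discarded and
// the drop counter is incremented instead.
func (m *mailbox) deliver(sig string) {
	select {
	case m.ch <- sig:
	default:
		m.dropped.Add(1)
	}
}

func main() {
	m := newMailbox(2) // tiny buffer so the demo overflows quickly
	for i := 0; i < 5; i++ {
		m.deliver(fmt.Sprintf("sig-%d", i))
	}
	fmt.Println("buffered:", len(m.ch), "dropped:", m.dropped.Load())
}
```

A consumer can poll the counter periodically and treat any increase as a sign that it is falling behind.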
func NewSignalConn ¶ added in v1.11.0
func NewSignalConn(c *Conn, opts ...SignalConnOption) *SignalConn
NewSignalConn creates a SignalConn from an existing Conn. It starts a background goroutine that reads lines from the connection and routes "sig ..." push messages to sigCh and command responses to respCh. A heartbeat goroutine sends periodic ping commands to prevent the server from timing out the connection (default: every 15s). Use WithHeartbeatInterval(0) to disable.
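The routing step can be sketched as a prefix check on each line read from the connection. Only the "sig " prefix comes from the description above; the "sig <channel> <payload>" field layout, the example response strings, and the names signal, route and demux are assumptions for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// signal is a hypothetical parsed push message.
type signal struct {
	channel string
	payload string
}

// route classifies one line. Lines starting with "sig " are treated as push
// signals; the "<channel> <payload>" layout after the prefix is assumed.
func route(line string) (signal, bool) {
	if !strings.HasPrefix(line, "sig ") {
		return signal{}, false
	}
	parts := strings.SplitN(strings.TrimPrefix(line, "sig "), " ", 2)
	s := signal{channel: parts[0]}
	if len(parts) == 2 {
		s.payload = parts[1]
	}
	return s, true
}

// demux splits a stream of lines into push signals and command responses,
// mirroring the sigCh/respCh separation with plain slices.
func demux(lines []string) (sigs []signal, resps []string) {
	for _, line := range lines {
		if s, ok := route(line); ok {
			sigs = append(sigs, s)
		} else {
			resps = append(resps, line)
		}
	}
	return sigs, resps
}

func main() {
	// The non-signal lines here are made-up placeholders for responses.
	sigs, resps := demux([]string{"ok", "sig jobs.done 42", "pong"})
	fmt.Println(len(sigs), "signal(s),", len(resps), "response(s)")
}
```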
func (*SignalConn) Close ¶ added in v1.11.0
func (sc *SignalConn) Close() error
Close closes the underlying connection and waits for the read loop to exit.
func (*SignalConn) DroppedSignals ¶ added in v1.15.0
func (sc *SignalConn) DroppedSignals() uint64
DroppedSignals returns the total number of signals that were dropped because the Signals() channel was full when they arrived. Monotonically increasing; use it to detect slow-consumer conditions. Zero in the common case.
func (*SignalConn) Emit ¶ added in v1.11.0
func (sc *SignalConn) Emit(channel, payload string) (int, error)
Emit sends a signal on a channel (must be literal, no wildcards). Returns the number of listeners that received the signal.
func (*SignalConn) Listen ¶ added in v1.11.0
func (sc *SignalConn) Listen(pattern string, opts ...ListenOption) error
Listen subscribes to signals matching the given pattern.
func (*SignalConn) Signals ¶ added in v1.11.0
func (sc *SignalConn) Signals() <-chan Signal
Signals returns a read-only channel that receives signals pushed by the server.
func (*SignalConn) Unlisten ¶ added in v1.11.0
func (sc *SignalConn) Unlisten(pattern string, opts ...ListenOption) error
Unlisten unsubscribes from signals matching the given pattern.
type SignalConnOption ¶ added in v1.14.0
type SignalConnOption func(*SignalConn)
SignalConnOption configures optional parameters for NewSignalConn.
func WithHeartbeatInterval ¶ added in v1.14.0
func WithHeartbeatInterval(d time.Duration) SignalConnOption
WithHeartbeatInterval sets the interval between heartbeat ping commands. Set to 0 to disable heartbeats.