distlock

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2026 License: MIT Imports: 10 Imported by: 0

README

# distlock

go-utils/distlock 提供统一的分布式锁抽象,屏蔽底层实现差异(Redis/Etcd),让业务层只依赖一套接口。

本包已从原有业务项目独立,现为通用分布式锁中间件,适用于多项目/多环境。


1. 设计目标

  • 统一接口:业务只依赖 distlock.Locker / distlock.Lock
  • 可切换后端:Redis 与 Etcd 可按环境切换
  • 生命周期安全:提供 StartRefresh + 幂等 stop(),避免 refresh-after-release 竞态
  • 一致错误语义:锁抢占失败统一映射为 distlock.ErrNotObtained
  • 轻依赖:仅依赖 redis/etcd 官方库,无额外侵入
  • 适合云原生/微服务/多实例场景

2. 安装

go get github.com/tx7do/go-utils/distlock

3. 核心接口

type Locker interface {
    Obtain(ctx context.Context, key string) (Lock, error)
    Close() error
}

type Lock interface {
    Key() string
    Release(ctx context.Context) error
    Refresh(ctx context.Context) error
    StartRefresh(ctx context.Context, onError func(err error)) (stop func())
}

var ErrNotObtained = errors.New("distlock: lock not obtained")

4. Redis 后端

依赖 github.com/bsm/redislock

locker := distlock.New(rdb, distlock.Options{
    TTL:             30 * time.Second,
    MaxRetries:      10,
    RetryDelay:      100 * time.Millisecond,
    RefreshInterval: 10 * time.Second, // 默认 TTL/3
})

5. Etcd 后端

依赖 go.etcd.io/etcd/client/v3/concurrency

locker, err := distlock.NewEtcd([]string{"127.0.0.1:2379"}, distlock.EtcdOptions{})
if err != nil {
    return err
}
defer locker.Close()

6. 推荐用法

lock, err := locker.Obtain(ctx, key)
if err != nil {
    if errors.Is(err, distlock.ErrNotObtained) {
        return nil // 其他节点已持有
    }
    return err
}

stopRefresh := lock.StartRefresh(ctx, func(err error) {
    // 记录日志/告警
})

// ... 业务逻辑 ...

// 顺序建议:先停续期,再释放锁
stopRefresh()
_ = lock.Release(context.Background())

7. 迁移说明

本包原为某业务项目的分布式锁实现,现已独立为 go-utils/distlock 公共库。

  • 代码结构更清晰,接口更通用
  • 推荐所有新项目直接依赖本库
  • 老项目可平滑迁移,接口兼容

8. 注意事项

  • Release 不是幂等保证接口:重复释放可能返回后端错误(按需忽略或记录)
  • StartRefresh 返回的 stop() 是幂等的,可以安全多次调用
  • 长任务务必持有并调用 stop(),避免 goroutine 泄漏
  • 生产环境建议:
    • 合理设置 TTLRefreshInterval
    • onError 接告警(锁丢失通常意味着实例异常或网络分区)

9. License

MIT

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNotObtained = errors.New("distlock: lock not obtained")

ErrNotObtained 当锁已被其他节点持有、无法获取时返回。 调用方用 errors.Is(err, distlock.ErrNotObtained) 判断。

Functions

This section is empty.

Types

type EtcdLocker

type EtcdLocker struct {
	// contains filtered or unexported fields
}

EtcdLocker 基于 etcd mutex 的分布式锁实现。

func (*EtcdLocker) Close

func (l *EtcdLocker) Close() error

func (*EtcdLocker) IsLocked added in v0.0.2

func (l *EtcdLocker) IsLocked(ctx context.Context, key string) (bool, error)

IsLocked 检查 key 是否已被锁定;仅供监控/调试使用,不能替代 Obtain 的原子性保证。

func (*EtcdLocker) Obtain

func (l *EtcdLocker) Obtain(ctx context.Context, key string, opts ...LockOption) (Lock, error)

type EtcdOptions

type EtcdOptions struct {
	// DialTimeout 是 etcd 连接超时;默认 5s。
	DialTimeout time.Duration
	// SessionTTL 是 session 续约租约秒数;默认 5s。
	SessionTTL int
	// SessionTimeout 是创建会话超时;默认 5s。
	SessionTimeout time.Duration
	// 锁超时,超过这个时间未释放锁会被自动释放;默认 0(不自动释放)
	LockTimeout time.Duration
}

EtcdOptions 配置 etcd 分布式锁行为。

type Lock

type Lock interface {
	// Key 返回锁的键名。
	Key() string

	// Release 立即释放锁。
	// 若锁已过期或已被释放,返回底层错误。
	Release(ctx context.Context) error

	// Refresh 以原始 TTL 续期一次。
	Refresh(ctx context.Context) error

	// StartRefresh 启动后台续期协程,每隔配置的 RefreshInterval 调用一次 Refresh。
	//
	//   - onError 在每次续期失败时调用(可为 nil);无论 onError 返回什么,失败后协程自动退出。
	//   - 返回的 stop 函数取消续期协程并阻塞等待其退出;可安全多次调用(幂等)。
	//
	// 典型用法:
	//
	//	stop := lock.StartRefresh(ctx, func(err error) { log.Warn(err) })
	//	defer stop()
	StartRefresh(ctx context.Context, onError func(err error)) (stop func())
}

Lock 代表一把已持有的分布式锁。

type LockOption added in v0.0.2

type LockOption func(*lockConfig)

LockOption 控制锁获取行为的函数选项

func WithBlockWait added in v0.0.2

func WithBlockWait(maxWait time.Duration) LockOption

WithBlockWait 启用阻塞等待模式

func WithRetryDelay added in v0.0.2

func WithRetryDelay(delay time.Duration) LockOption

WithRetryDelay 设置重试退避间隔(仅阻塞模式生效)

type Locker

type Locker interface {
	// Obtain 尝试获取 key 对应的锁。
	//   - 成功:返回 Lock,调用方必须在完成后调用 Release。
	//   - 锁已被持有:返回 ErrNotObtained(可用 errors.Is 判断)。
	//   - 其他错误:返回底层错误。
	Obtain(ctx context.Context, key string, opts ...LockOption) (Lock, error)

	// Close 关闭后端资源;不再使用时调用。
	Close() error

	// IsLocked 检查 key 是否已被锁定;仅供监控/调试使用,不能替代 Obtain 的原子性保证。
	IsLocked(ctx context.Context, key string) (bool, error)
}

Locker 抽象分布式锁后端(Redis/Etcd 等),供上层业务统一依赖。

func NewEtcd

func NewEtcd(endpoints []string, opts EtcdOptions) (Locker, error)

NewEtcd 创建基于 etcd concurrency.Session + Mutex 的锁实现。

func NewEtcdWithClient

func NewEtcdWithClient(cli *clientv3.Client, opts EtcdOptions) Locker

NewEtcdWithClient 使用已有 etcd client 创建 locker(便于复用连接)。

func NewRedisLocker

func NewRedisLocker(rdb *redis.Client, opts Options) Locker

NewRedisLocker 用给定的 Redis 客户端创建 Locker。 opts 零值字段自动补全默认值。

type Options

type Options struct {
	// TTL 是锁的有效期;默认 30s。
	TTL time.Duration

	// MaxRetries 是争抢锁时的最大重试次数;默认 10 次。
	MaxRetries int

	// RetryDelay 是线性退避的步长;默认 100ms。
	RetryDelay time.Duration

	// RefreshInterval 是 StartRefresh 的续期间隔;默认 TTL/3。
	RefreshInterval time.Duration
}

Options 配置锁的获取与续期行为。零值字段自动使用内置默认值。

type RedisLocker

type RedisLocker struct {
	// contains filtered or unexported fields
}

RedisLocker 是基于 redislock 的分布式锁实现。

func (*RedisLocker) Close

func (l *RedisLocker) Close() error

Close 对 redislock 为 no-op,保留接口一致性。

func (*RedisLocker) IsLocked added in v0.0.2

func (l *RedisLocker) IsLocked(ctx context.Context, key string) (bool, error)

IsLocked 检查 key 是否已被锁定;仅供监控/调试使用,不能替代 Obtain 的原子性保证。

func (*RedisLocker) Obtain

func (l *RedisLocker) Obtain(ctx context.Context, key string, opts ...LockOption) (Lock, error)

Obtain 尝试获取 key 对应的锁。

  • 成功:返回 Lock,调用方必须在完成后调用 Release。
  • 锁已被持有:返回 ErrNotObtained(可用 errors.Is 判断)。
  • 其他错误:返回底层 Redis 错误。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL