etcd

package module
v0.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2024 License: MIT Imports: 14 Imported by: 0

README

etcd client

包:"github.com/farseer-go/etcd"

模块:etcd.Module

go-version Build

概述

etcd是比较流行的分布式组件之一,另外还有Zookeeper、Eureka、Nacos、Consul

常用于在分布式平台中的服务注册与发现的场景。

它有部署简单、使用方便、轻量等特性

在GO阵营里面,一般会优先选择etcd,因为它是采用GO语言编写的。

farseer-go/etcd可以让应用系统更加优雅的使用etcd:

  • 配置自动化:通过配置,可以很方便的快速接入ETCD服务
  • 容器化操作:使用IOC,我们可以很方便就能取到client。
  • 优雅的使用:将常用的操作,整合到几个方法中,更加简单使用。

配置

./farseer.yaml

Etcd:
  default1: "Server=127.0.0.1:2379|127.0.0.1:2379,DialTimeout=5000,Username=test,Password=test"
  default2: "Server=127.0.0.1:2379|127.0.0.1:2379,DialTimeout=5000,Username=test,Password=test"

配置文件支持同时配置多个不同服务端、不同的交换器设置。default1、default2是自定义的名称,同时也是IOC的别名

配置说明:

type etcdConfig struct {
	Server               string // 服务端地址
	DialTimeout          int    // 连接超时时间(ms)
	DialKeepAliveTime    int    // 对服务器进行ping的时间(ms)
	DialKeepAliveTimeout int    // 客户端等待响应的超时时间(ms)
	MaxCallSendMsgSize   int    // 客户端的请求发送限制,单位是字节。0,则默认为2.0 MiB(2 * 1024 * 1024)。
	MaxCallRecvMsgSize   int    // 客户端的响应接收限制,单位是字节。0,则默认不限制
	Username             string // 用户名
	Password             string // 密码
	RejectOldCluster     bool   // 拒绝过时的集群创建客户端。
	PermitWithoutStream  bool   // 允许客户端在没有任何活动流(RPC)的情况下向服务器发送keepalive pings。
}

配置的属性之间用,隔开组合成一个字符串,将被解析成etcdConfig对象。

Put

保存KV

client := container.Resolve[etcd.IClient]("default1") // 取出default1的配置服务端
putRsp, err := client.Put("/test/a1", "1")

通过container.Resolve容器取出etcd.IClient接口的实现。

参数值default1,是在./farseer.yaml中配置节点,意味着使用default1的配置服务端

Get

可以支持按KEY完整匹配,或者按KEY的前缀匹配。

// 根据KEY完整切尔西
client := container.Resolve[etcd.IClient]("default1")
result, err := client.Get("/test/a1")
flog.Info(result.Value) // print:1

// 根据KEY前缀匹配
results, err := client.GetPrefixKey("/test")
flog.Info(results["/test/a1"].Value)    // print:1

Exists

判断KEY是否存在

client := container.Resolve[etcd.IClient]("default1")
client.Exists("/test/a1")

Delete

删除KEY

client := container.Resolve[etcd.IClient]("default1")
_, _ = client.Delete("/test/a1")

Watch

监控指定的KEY(即使KEY还没有创建也可以先监控起来)

当这个KEY有任何的变化时,我们都可以拿到最新的数据状态

client := container.Resolve[etcd.IClient]("default1")
ctx, cancelFunc := context.WithCancel(context.Background())
// 指定KEY
client.Watch(ctx, "/test/a1", func(event etcd.WatchEvent) {
    flog.Info(event.Kv.Value)   // value
    flog.Info(event.IsModify())
    flog.Info(event.IsCreate())
})

// 指定KEY前缀
client.WatchPrefixKey(ctx, "/test/", func(event etcd.WatchEvent) {
    flog.Info(event.Kv.Value)   // value
    flog.Info(event.IsModify())
    flog.Info(event.IsCreate())
})

Lock

分布式锁

client := container.Resolve[etcd.IClient]("default1")

unLock1, _ := client.Lock("/lock/1", 3)
flog.Info("上锁:unLock1")

go func() {
    time.Sleep(1000 * time.Millisecond)
    flog.Info("解锁:unLock1")
    unLock1()
}()

unLock2, _ := client.Lock("/lock/1", 3)
flog.Info("上锁:unLock2")

unLock2()
flog.Info("解锁:unLock2")

打印结果:

2023-01-22 01:44:33 [Info] 上锁:unLock1
2023-01-22 01:44:34 [Info] 解锁:unLock1
2023-01-22 01:44:34 [Info] 上锁:unLock2
2023-01-22 01:44:34 [Info] 解锁:unLock2

同一时刻同一个KEY,只能有一个客户端能上锁成功。使用完后,需调用unLock()解锁

租约

client := container.Resolve[etcd.IClient]("default1")

// 先创建一个保持10秒的租约,拿到租约ID
leaseID, _ := client.LeaseGrant(10)

// 在给KV保存的时候,带上这个leaseID
_, _ = client.PutLease("/test/lease1", "1", leaseID)

此后这个租约将在11秒后过期。(TTL + 1 秒)

自动持续续约:

ctx, cancelFunc := context.WithCancel(context.Background())
_ = client.LeaseKeepAlive(ctx, leaseID)

此时,KEY:/test/lease1,将会自动续约,不再需要我们手动去执行续约操作。

只续约一次:

_ = client.LeaseKeepAliveOnce(leaseID)

此时,只会续约一次,每次续约为10秒(创建租约时传了10)

使用原生客户端

有时候我们需要原生的client执行更多操作时,可以使用Original方法

client := container.Resolve[etcd.IClient]("default")
client.Original()

将返回:etcdV3.Client对象

最后

以上是常用的方法。完整的方法列表,可以查看:etcd.IClient接口

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Header struct {
	// 集群ID
	ClusterId uint64
	// 处理本次请求的节点ID
	MemberId uint64
	// 集群的版本号(该集群的任何KEY有变化时,版本号都会增加)
	Revision int64
	// raft_term is the raft term when the request was applied.
	RaftTerm uint64
}

func (*Header) String

func (receiver *Header) String() string

type IClient

type IClient interface {
	// Close 关闭客户端
	Close()
	// Put 保存KV
	Put(key, value string) (*Header, error)
	// PutLease 保存KV,同时赋加租约
	PutLease(key, value string, leaseId LeaseID) (*Header, error)
	// PutJson 保存KV(data转成json)
	PutJson(key string, data any) (*Header, error)
	// PutJsonLease 保存KV(data转成json),同时赋加租约
	PutJsonLease(key string, data any, leaseId LeaseID) (*Header, error)
	// Get 获取Value值
	Get(key string) (*KeyValue, error)
	// GetPrefixKey 根据KEY前缀获取Value值
	GetPrefixKey(prefixKey string) (map[string]*KeyValue, error)
	// Delete 删除KEY
	Delete(key string) (*Header, error)
	// DeletePrefixKey 根据KEY前缀来删除
	DeletePrefixKey(prefixKey string) (*Header, error)
	// Exists 判断是否存在KEY
	Exists(key string) bool
	// Watch 监听KEY
	Watch(ctx context.Context, key string, watchFunc func(event WatchEvent))
	// WatchPrefixKey 根据KEY前缀来监听
	WatchPrefixKey(ctx context.Context, prefixKey string, watchFunc func(event WatchEvent))
	// LeaseGrant 创建租约,ttl:租约的时间(单位s),keys:要赋加租约的KEY
	LeaseGrant(ttl int64, keys ...string) (LeaseID, error)
	// LeaseKeepAlive 续租(持续)
	LeaseKeepAlive(ctx context.Context, leaseId LeaseID) error
	// LeaseKeepAliveOnce 续租一次
	LeaseKeepAliveOnce(leaseId LeaseID) error
	// LeaseRevoke 删除租约(会使当前租约的所关联的key-value失效)
	LeaseRevoke(leaseId LeaseID) (*Header, error)
	// LeaseInfo 查询租约信息
	LeaseInfo(leaseId LeaseID) (*LeaseInfo, error)
	// Lock 添加锁
	Lock(lockKey string, lockTTL int) (UnLock, error)
	// Original 原客户端对象
	Original() *etcdClient
}

type KeyValue

type KeyValue struct {
	Header *Header
	// key 保存到etcd的KEY
	Key string
	// 创建KEY时的集群Revision(此后这个版本号不会再变)
	CreateRevision int64
	// 修改改这个 key 时的集群Revision(每次修改都会重新获取最新的集群Revision)
	ModRevision int64
	// 对KEY的任何修改,都会导致版本号增加(默认为1)
	Version int64
	// value
	Value string
	// 租约ID
	Lease int64
}

KeyValue KV信息

func (*KeyValue) Exists

func (receiver *KeyValue) Exists() bool

Exists 是否有值

type LeaseID

type LeaseID int64

LeaseID 租约ID

type LeaseInfo

type LeaseInfo struct {
	Header     *Header
	ID         LeaseID  // 租约ID
	TTL        int64    // TTL是租约的剩余TTL(秒);租约将在TTL+1秒下过期。过期的租约将返回-1。
	GrantedTTL int64    // 创建租约时设定的时间(单位s)
	Keys       []string // 租约关联的KEY
}

type Module

type Module struct {
}

func (Module) DependsModule

func (module Module) DependsModule() []modules.FarseerModule

func (Module) Initialize

func (module Module) Initialize()

type UnLock

type UnLock func()

UnLock 解锁

type WatchEvent

type WatchEvent struct {
	Type string    // PUT 或者 DELETE
	Kv   *KeyValue // 最新的KV信息
}

WatchEvent 监听事件

func (*WatchEvent) IsCreate

func (e *WatchEvent) IsCreate() bool

IsCreate 是否为创建事件

func (*WatchEvent) IsModify

func (e *WatchEvent) IsModify() bool

IsModify 是否为修改事件

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL