Documentation ¶
Index ¶
- Variables
- func DeleteResult(resp *clientv3.DeleteResponse, err error) (int64, error)
- func GetResult(resp *clientv3.GetResponse, err error) (map[string]string, error)
- func GetResultCount(resp *clientv3.GetResponse, err error) (int64, error)
- func GetResultInt(resp *clientv3.GetResponse, err error) (int, error)
- func GetResultInts(resp *clientv3.GetResponse, err error) ([]int, error)
- func GetResultString(resp *clientv3.GetResponse, err error) (string, error)
- func GetResultStrings(resp *clientv3.GetResponse, err error) ([]string, error)
- func IsErrLockNotAcquired(err error) bool
- func LeaseGrantResult(resp *clientv3.LeaseGrantResponse, err error) (clientv3.LeaseID, error)
- func LeaseRevokeResult(_ *clientv3.LeaseRevokeResponse, err error) error
- func LeaseTimeToLiveResult(resp *clientv3.LeaseTimeToLiveResponse, err error) (int64, error)
- func PutResult(_ *clientv3.PutResponse, err error) error
- func TxnResult(_ *clientv3.TxnResponse, err error) error
- type ClientOption
- func WithAuth(username, password string) ClientOption
- func WithCACertPath(caCertPath string) ClientOption
- func WithEndpoints(endpoints []string) ClientOption
- func WithTLSCertPath(tlsCertPath string) ClientOption
- func WithTLSKeyPath(tlsKeyPath string) ClientOption
- func WithTimeout(timeout int) ClientOption
- type ClientProxy
- type LeaseProxy
- type LockerProxy
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrLockNotAcquired is the sentinel error reported when a lock
	// could not be acquired.
	ErrLockNotAcquired = errors.New("lock not acquired")
)
var (
	// ErrNil is the sentinel error reported when a result is nil/empty.
	ErrNil = errors.New("result nil")
)
Functions ¶
func DeleteResult ¶
func DeleteResult(resp *clientv3.DeleteResponse, err error) (int64, error)
DeleteResult is a helper that returns the number of keys deleted by an etcd delete operation.
func GetResult ¶
GetResult is a helper that converts the response of an etcd get operation to a map[string]string.
func GetResultCount ¶
func GetResultCount(resp *clientv3.GetResponse, err error) (int64, error)
GetResultCount is a helper that returns the count in the response of an etcd get operation.
func GetResultInt ¶
func GetResultInt(resp *clientv3.GetResponse, err error) (int, error)
GetResultInt is a helper that converts the first value in the response of an etcd get operation to an int.
Returns ErrNil when the response count equals 0.
func GetResultInts ¶
func GetResultInts(resp *clientv3.GetResponse, err error) ([]int, error)
GetResultInts is a helper that converts the first value in the response of an etcd get operation to a []int
Returns ErrNil when the response count equals 0.
func GetResultString ¶
func GetResultString(resp *clientv3.GetResponse, err error) (string, error)
GetResultString is a helper that converts the first value in the response of an etcd get operation to a string
Returns ErrNil when the response count equals 0.
func GetResultStrings ¶
func GetResultStrings(resp *clientv3.GetResponse, err error) ([]string, error)
GetResultStrings is a helper that converts the first value in the response of an etcd get operation to a []string
Returns ErrNil when the response count equals 0.
func IsErrLockNotAcquired ¶
IsErrLockNotAcquired reports whether err is the lock-not-acquired error (ErrLockNotAcquired).
func LeaseGrantResult ¶
LeaseGrantResult is a helper that returns the lease ID in the response of an etcd lease grant operation.
func LeaseRevokeResult ¶
func LeaseRevokeResult(_ *clientv3.LeaseRevokeResponse, err error) error
LeaseRevokeResult is a helper that converts etcd lease revoke operation response to an error
func LeaseTimeToLiveResult ¶
func LeaseTimeToLiveResult(resp *clientv3.LeaseTimeToLiveResponse, err error) (int64, error)
LeaseTimeToLiveResult is a helper that returns the lease TTL in the response of an etcd lease time-to-live operation.
Types ¶
type ClientOption ¶
// ClientOption is a function that receives a *clientConfig; the With*
// constructors in this package return values of this type.
type ClientOption func(*clientConfig)
ClientOption is an option for the etcd client proxy.
func WithAuth ¶
func WithAuth(username, password string) ClientOption
WithAuth sets the username and password used for authentication.
func WithCACertPath ¶
func WithCACertPath(caCertPath string) ClientOption
WithCACertPath sets the CA cert file path.
func WithEndpoints ¶
func WithEndpoints(endpoints []string) ClientOption
WithEndpoints sets the endpoints.
func WithTLSCertPath ¶
func WithTLSCertPath(tlsCertPath string) ClientOption
WithTLSCertPath sets the TLS cert file path.
func WithTLSKeyPath ¶
func WithTLSKeyPath(tlsKeyPath string) ClientOption
WithTLSKeyPath sets the TLS key file path.
func WithTimeout ¶
func WithTimeout(timeout int) ClientOption
WithTimeout sets the timeout, in milliseconds.
The default is 1000 milliseconds.
type ClientProxy ¶
type ClientProxy interface { // Put puts a key-value pair into etcd. // Note that key,value can be plain bytes array and string is // an immutable representation of that bytes array. // To get a string of bytes, do string([]byte{0x10, 0x20}). Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) // Get retrieves keys. // By default, Get will return the value for "key", if any. // When passed WithRange(end), Get will return the keys in the range [key, end). // When passed WithFromKey(), Get returns keys greater than or equal to key. // When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision; // if the required revision is compacted, the request will fail with ErrCompacted . // When passed WithLimit(limit), the number of returned keys is bounded by limit. // When passed WithSort(), the keys will be sorted. Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) // Delete deletes a key, or optionally using WithRange(end), [key, end). Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) // Watch watches on a key or prefix. The watched events will be returned // through the returned channel. If revisions waiting to be sent over the // watch are compacted, then the watch will be canceled by the server, the // client will post a compacted error watch response, and the channel will close. // If the requested revision is 0 or unspecified, the returned channel will // return watch events that happen after the server receives the watch request. // If the context "ctx" is canceled or timed out, returned "WatchChan" is closed, // and "WatchResponse" from this closed channel has zero events and nil "Err()". // The context "ctx" MUST be canceled, as soon as watcher is no longer being used, // to release the associated resources. 
// // If the context is "context.Background/TODO", returned "WatchChan" will // not be closed and block until event is triggered, except when server // returns a non-recoverable error (e.g. ErrCompacted). // For example, when context passed with "WithRequireLeader" and the // connected server has no leader (e.g. due to network partition), // error "etcdserver: no leader" (ErrNoLeader) will be returned, // and then "WatchChan" is closed with non-nil "Err()". // In order to prevent a watch stream being stuck in a partitioned node, // make sure to wrap context with "WithRequireLeader". // // Otherwise, as long as the context has not been canceled or timed out, // watch will retry on other recoverable errors forever until reconnected. Watch(ctx context.Context, key string, opts ...clientv3.OpOption) (clientv3.WatchChan, error) // Txn creates a transaction. // // Step 1: // If takes a list of comparison. If all comparisons passed in succeed, // the operations passed into Then() will be executed. Or the operations // passed into Else() will be executed. // // Step 2: // Then takes a list of operations. The Ops list will be executed, if the // comparisons passed in If() succeed. // // Else takes a list of operations. The Ops list will be executed, if the // comparisons passed in If() fail. // // Step 3: // Commit tries to commit the transaction. Txn(ctx context.Context, cmps []clientv3.Cmp, thenOps []clientv3.Op, elseOps []clientv3.Op) (*clientv3.TxnResponse, error) // Lease get etcd lease proxy Lease() LeaseProxy // Locker gets the lock operation proxy for the key prefix. // // It while create an leased session and keep the lease alive until client error // or invork close function. Locker(prefix string, ttl int) (LockerProxy, error) }
ClientProxy is the etcd client proxy.
Example ¶
package main import ( "context" "fmt" clientv3 "go.etcd.io/etcd/client/v3" "github.com/wwwangxc/gopkg/etcd" ) func main() { // new client proxy cli := etcd.NewClientProxy("etcd1", etcd.WithEndpoints([]string{"127.0.0.1:2379", "127.0.0.1:2380"}), // set endpoints etcd.WithTimeout(3000), // set timeout, unit millisecond, default 1000. etcd.WithAuth("username", "password"), // set username and password for authentication etcd.WithTLSKeyPath("/usr/local/etcd_conf/key.pem"), // set tls key file path. etcd.WithTLSCertPath("/usr/local/etcd_conf/cert.pem"), // set tls cert file path. etcd.WithCACertPath("/usr/local/etcd_conf/cacert.pem"), // set ca cert file path. ) // do etcd put operation and convert result to an error if err := etcd.PutResult(cli.Put(context.Background(), "key", "val")); err != nil { fmt.Printf("put operation fail. error:%v", err) return } // create a lease id, err := etcd.LeaseGrantResult(cli.Lease().Grant(context.Background(), 10)) if err != nil { fmt.Printf("lease grant fail. error:%v", err) return } // put with lease err = etcd.PutResult(cli.Put(context.Background(), "key", "val", clientv3.WithLease(id))) if err != nil { fmt.Printf("put operation fail. error:%v", err) } // do etcd get operation and convert result to map[string]string and an error m, err := etcd.GetResult(cli.Get(context.Background(), "key")) if err != nil { fmt.Printf("get operation fail. error:%v", err) return } for k, v := range m { fmt.Printf("key: %s\n", k) fmt.Printf("val: %s\n", v) } // do etcd delte operation and convert result to delete number and an error delNum, err := etcd.DeleteResult(cli.Delete(context.Background(), "key")) if err != nil { fmt.Printf("delete operation fail. 
error:%v", err) return } fmt.Printf("delete number: %d\n", delNum) // transaction err = etcd.TxnResult(cli.Txn(context.Background(), []clientv3.Cmp{clientv3.Compare(clientv3.Value("key"), "=", "val")}, // if key's value == val []clientv3.Op{clientv3.OpPut("key", "val1")}, // then put key's value = val1 []clientv3.Op{clientv3.OpPut("key", "val")})) // else put key's value = val if err != nil { fmt.Printf("txn fail. error:%v", err) return } }
Output:
func NewClientProxy ¶
func NewClientProxy(name string, opts ...ClientOption) ClientProxy
NewClientProxy creates a new etcd client proxy.
type LeaseProxy ¶
type LeaseProxy interface { // Grant creates a new lease. Grant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) // Revoke revokes the given lease. Revoke(ctx context.Context, id clientv3.LeaseID) (*clientv3.LeaseRevokeResponse, error) // TimeToLive retrieves the lease information of the given lease ID. TimeToLive(ctx context.Context, id clientv3.LeaseID, opts ...clientv3.LeaseOption) (*clientv3.LeaseTimeToLiveResponse, error) // Leases retrieves all leases. Leases(ctx context.Context) (*clientv3.LeaseLeasesResponse, error) // KeepAlive attempts to keep the given lease alive forever. If the keepalive responses posted // to the channel are not consumed promptly the channel may become full. When full, the lease // client will continue sending keep alive requests to the etcd server, but will drop responses // until there is capacity on the channel to send more responses. // // If client keep alive loop halts with an unexpected error (e.g. "etcdserver: no leader") or // canceled by the caller (e.g. context.Canceled), KeepAlive returns a ErrKeepAliveHalted error // containing the error reason. // // The returned "LeaseKeepAliveResponse" channel closes if underlying keep // alive stream is interrupted in some way the client cannot handle itself; // given context "ctx" is canceled or timed out. KeepAlive(ctx context.Context, id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) }
LeaseProxy is the etcd lease proxy.
Example ¶
package main import ( "context" "fmt" "time" clientv3 "go.etcd.io/etcd/client/v3" "github.com/wwwangxc/gopkg/etcd" ) func main() { // new lease proxy lease := etcd.NewClientProxy("etcd1").Lease() // create a lease id, err := etcd.LeaseGrantResult(lease.Grant(context.Background(), 10)) if err != nil { fmt.Printf("lease grant fail. error:%v", err) return } fmt.Printf("lease:0x%x\n", id) // revoke a lease err = etcd.LeaseRevokeResult(lease.Revoke(context.Background(), id)) if err != nil { fmt.Printf("lease revoke fail. error:%v", err) return } go func(id clientv3.LeaseID) { for { // get lease ttl ttl, err := etcd.LeaseTimeToLiveResult(lease.TimeToLive(context.Background(), id)) if err != nil { fmt.Printf("get lease ttl fail. error:%v", err) return } if ttl == -1 { break } time.Sleep(time.Second) } fmt.Printf("lease:0x%x expired\n", id) }(id) // keep alive ch, err := lease.KeepAlive(context.Background(), id) if err != nil { fmt.Printf("lease keep alive fail. error:%v", err) return } for { ka := <-ch if ka == nil { fmt.Println("lease timeout") return } fmt.Println("ttl:", ka.TTL) } }
Output:
type LockerProxy ¶
// LockerProxy is the etcd locker proxy. It exposes blocking and non-blocking
// lock acquisition, unlock, a lock-and-call helper, and Close.
type LockerProxy interface {
	// LockAndCall tries to get the lock first and calls f() once the lock is
	// acquired. Unlock will be performed regardless of whether f reports an
	// error or not.
	//
	// Will block the current goroutine while the lock is not acquired.
	// Will reentrant lock when UUID option not empty.
	// If Heartbeat option not empty and not a reentrant lock, will automatically
	// renew until unlocked.
	//
	// NOTE(review): no UUID or Heartbeat option appears in this package's
	// documented API — confirm the two sentences above still apply.
	LockAndCall(ctx context.Context, f func() error) error

	// Lock locks the mutex with a cancelable context. If the context is canceled
	// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
	Lock(ctx context.Context) error

	// TryLock locks the mutex if not already locked by another session.
	// If the lock is held by another session, it returns immediately after
	// attempting necessary cleanup.
	// The ctx argument is used for the sending/receiving Txn RPC.
	// Returns 'ErrLockNotAcquired' when the lock is not acquired.
	TryLock(ctx context.Context) error

	// Unlock
	// Returns an error if deleting the lock key fails.
	Unlock(ctx context.Context) error

	// Close orphans the session and revokes the session lease.
	Close() error
}
LockerProxy is the etcd locker proxy.
Example ¶
package main import ( "context" "fmt" "github.com/wwwangxc/gopkg/etcd" ) func main() { lockKeyPrefix := "lock/example/lock" // Gets the lock operation proxy for the key prefix. // It while create an leased session and keep the lease alive until client error // or invork close function. locker, err := etcd.NewClientProxy("etcd1").Locker(lockKeyPrefix, 3) if err != nil { fmt.Printf("get locker proxy fail:%v\n", err) return } defer func() { // Close orphans the session and revokes the session lease. if err := locker.Close(); err != nil { fmt.Printf("locker close fail:%v", err) return } }() // Will block the current goroutine until locked. // If the context is canceled while trying to acquire the lock, the mutex tries to clean its stale lock entry. if err := locker.Lock(context.Background()); err != nil { fmt.Printf("lock fail:%v\n", err) return } // Unlock if err := locker.Unlock(context.Background()); err != nil { fmt.Printf("unlock fail:%v", err) return } // Try lock // Locks the mutex if not already locked by another session. if err = locker.TryLock(context.Background()); err != nil { // return 'ErrLockNotAcquired' when lock not acquired. if etcd.IsErrLockNotAcquired(err) { fmt.Printf("lock not acquired\n") return } fmt.Printf("try lock fail:%v\n", err) return } // Unlock if err := locker.Unlock(context.Background()); err != nil { fmt.Printf("unlock fail:%v", err) return } f := func() error { // do something... return nil } // Try get lock first and call f() when lock acquired. Unlock will be performed // regardless of whether the f reports an error or not. if err := locker.LockAndCall(context.Background(), f); err != nil { fmt.Printf("lock and call fail. error: %v\n", err) return } }
Output: