dynalock

package module
v2.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2020 License: Apache-2.0 Imports: 11 Imported by: 1

Documentation

Overview

Package dynalock provides a locking store powered by AWS DynamoDB.

Index

Examples

Constants

View Source
const (
	// DefaultLockTTL default duration for locks
	DefaultLockTTL = 20 * time.Second
)

Variables

View Source
var (
	// ErrKeyNotFound record not found in the table
	ErrKeyNotFound = errors.New("key not found in table")

	// ErrKeyExists record already exists in table
	ErrKeyExists = errors.New("key already exists in table")

	// ErrKeyModified record has been modified, this probably means someone beat you to the change/lock
	ErrKeyModified = errors.New("key has been modified")

	// ErrLockAcquireCancelled lock acquire was cancelled
	ErrLockAcquireCancelled = errors.New("lock acquire was cancelled")
)
View Source
var (
	// DefaultLockBackOff if locking is unsuccessful then this backoff will be used
	DefaultLockBackOff = 3 * time.Second
)

Functions

func MarshalStruct

func MarshalStruct(in interface{}) (*dynamodb.AttributeValue, error)

MarshalStruct is a helper that marshals a struct into a *dynamodb.AttributeValue containing a map in the format required by WriteWithAttributeValue.

func UnmarshalStruct

func UnmarshalStruct(val *dynamodb.AttributeValue, out interface{}) error

UnmarshalStruct is a helper that un-marshals a struct from a *dynamodb.AttributeValue returned by KVPair.AttributeValue.

Types

type Dynalock

type Dynalock struct {
	// contains filtered or unexported fields
}

Dynalock lock store which is backed by AWS DynamoDB

func (*Dynalock) AtomicDelete

func (ddb *Dynalock) AtomicDelete(ctx context.Context, key string, previous *KVPair) (bool, error)

AtomicDelete delete of a single value

This supports two different operations:

* if previous is supplied, assert that the key exists with the version supplied
* if previous is nil, assert that the key doesn't exist

FIXME: should the second case just return false, nil?

func (*Dynalock) AtomicPut

func (ddb *Dynalock) AtomicPut(ctx context.Context, key string, options ...WriteOption) (bool, *KVPair, error)

AtomicPut Atomic CAS operation on a single value.

func (*Dynalock) Delete

func (ddb *Dynalock) Delete(ctx context.Context, key string) error

Delete the value at the specified key

func (*Dynalock) Exists

func (ddb *Dynalock) Exists(ctx context.Context, key string, options ...ReadOption) (bool, error)

Exists if a Key exists in the store

func (*Dynalock) Get

func (ddb *Dynalock) Get(ctx context.Context, key string, options ...ReadOption) (*KVPair, error)

Get a value given its key

Example
type message struct{ Message string }

cfg, err := external.LoadDefaultAWSConfig()
if err != nil {
	panic("unable to load SDK config, " + err.Error())
}

dbSvc := dynamodb.New(cfg)
dl := dynalock.New(dbSvc, "testing-locks", "agent")

kv, _ := dl.Get(context.Background(), "agents/123")

msg := &message{}

av := kv.AttributeValue()

dynalock.UnmarshalStruct(av, msg)

fmt.Println("Message:", msg.Message)
Output:

func (*Dynalock) List

func (ddb *Dynalock) List(ctx context.Context, prefix string, options ...ReadOption) ([]*KVPair, error)

List the content of a given prefix

func (*Dynalock) NewLock

func (ddb *Dynalock) NewLock(ctx context.Context, key string, options ...LockOption) (Locker, error)

NewLock has to be implemented at the library level since it's not supported by DynamoDB

Example
cfg, err := external.LoadDefaultAWSConfig()
if err != nil {
	panic("unable to load SDK config, " + err.Error())
}

dbSvc := dynamodb.New(cfg)

dl := dynalock.New(dbSvc, "testing-locks", "agent")

lock, _ := dl.NewLock(
	context.Background(),
	"agents/123",
	dynalock.LockWithTTL(2*time.Second),
	dynalock.LockWithBytes([]byte(`{"agent": "testing"}`)),
)
lock.Lock(context.Background(), nil)
defer lock.Unlock(context.Background())
Output:

func (*Dynalock) Put

func (ddb *Dynalock) Put(ctx context.Context, key string, options ...WriteOption) error

Put a value at the specified key

Example
cfg, err := external.LoadDefaultAWSConfig()
if err != nil {
	panic("unable to load SDK config, " + err.Error())
}

dbSvc := dynamodb.New(cfg)

dl := dynalock.New(dbSvc, "testing-locks", "agent")

message := struct{ Message string }{Message: "hello"}

attrVal, _ := dynalock.MarshalStruct(&message)

dl.Put(
	context.Background(),
	"agents/123",
	dynalock.WriteWithAttributeValue(attrVal),
)
Output:

type DynamodbLock

type DynamodbLock struct {
	// contains filtered or unexported fields
}

func (*DynamodbLock) Lock

func (l *DynamodbLock) Lock(ctx context.Context, stopChan chan struct{}) (<-chan struct{}, error)

Lock attempt to lock the DynamoDB record, this will BLOCK and retry at a rate of once every 3 seconds

func (*DynamodbLock) Unlock

func (l *DynamodbLock) Unlock(ctx context.Context) error

Unlock this will unlock and perform a DELETE to remove the DynamoDB record

type KVPair

type KVPair struct {
	Partition string `dynamodbav:"id"`
	Key       string `dynamodbav:"name"`
	Version   int64  `dynamodbav:"version"`
	Expires   int64  `dynamodbav:"expires"`
	// contains filtered or unexported fields
}

KVPair represents {Key, Value, Version} tuple, internally this uses a *dynamodb.AttributeValue which can be used to store strings, slices or structs

func (*KVPair) AttributeValue

func (kv *KVPair) AttributeValue() *dynamodb.AttributeValue

AttributeValue return the current dynamodb attribute value, may be nil

func (*KVPair) BytesValue

func (kv *KVPair) BytesValue() []byte

BytesValue use the attribute to return a slice of bytes, a nil will be returned if it is empty or nil

type LockOption

type LockOption func(opts *LockOptions)

LockOption assign various settings to the lock options

func LockWithBytes

func LockWithBytes(val []byte) LockOption

LockWithBytes byte slice to the key which is written when the lock is acquired

func LockWithRenewLock

func LockWithRenewLock(renewLockChan chan struct{}) LockOption

LockWithRenewLock renewal channel to the lock

func LockWithTTL

func LockWithTTL(ttl time.Duration) LockOption

LockWithTTL time to live (TTL) to the key which is written when the lock is acquired

type LockOptions

type LockOptions struct {
	// contains filtered or unexported fields
}

LockOptions contains optional request parameters

func NewLockOptions

func NewLockOptions(opts ...LockOption) *LockOptions

NewLockOptions create lock options, assign defaults then accept overrides

type Locker

type Locker interface {
	// Lock attempt to lock the store record, this will BLOCK and retry at a rate of once every 3 seconds
	Lock(ctx context.Context, stopChan chan struct{}) (<-chan struct{}, error)

	// Unlock this will unlock and perform a DELETE to remove the store record
	Unlock(ctx context.Context) error
}

Locker provides a locking mechanism on top of the store. Similar to `sync.Locker` except it may return errors.

type ReadOption

type ReadOption func(opts *ReadOptions)

ReadOption assign various settings to the read options

func ReadConsistentDisable

func ReadConsistentDisable() ReadOption

ReadConsistentDisable disable consistent reads

type ReadOptions

type ReadOptions struct {
	// contains filtered or unexported fields
}

ReadOptions contains optional request parameters

func NewReadOptions

func NewReadOptions(opts ...ReadOption) *ReadOptions

NewReadOptions create read options, assign defaults then accept overrides; the read consistent flag is enabled by default

type Store

type Store interface {
	// Put a value at the specified key
	Put(ctx context.Context, key string, options ...WriteOption) error

	// Get a value given its key
	Get(ctx context.Context, key string, options ...ReadOption) (*KVPair, error)

	// List the content of a given prefix
	List(ctx context.Context, prefix string, options ...ReadOption) ([]*KVPair, error)

	// Delete the value at the specified key
	Delete(ctx context.Context, key string) error

	// Verify if a Key exists in the store
	Exists(ctx context.Context, key string, options ...ReadOption) (bool, error)

	// NewLock creates a lock for a given key.
	// The returned Locker is not held and must be acquired
	// with `.Lock`. The Value is optional.
	NewLock(ctx context.Context, key string, options ...LockOption) (Locker, error)

	// Atomic CAS operation on a single value.
	// Pass no previous KV to create a new key.
	// Pass WriteWithPreviousKV(kv) to update an existing value.
	AtomicPut(ctx context.Context, key string, options ...WriteOption) (bool, *KVPair, error)

	// Atomic delete of a single value
	AtomicDelete(ctx context.Context, key string, previous *KVPair) (bool, error)
}

Store represents the backend K/V storage

func New

func New(dynamoSvc dynamodbiface.ClientAPI, tableName, partition string) Store

New construct a DynamoDB backed locking store

type WriteOption

type WriteOption func(opts *WriteOptions)

WriteOption assign various settings to the write options

func WriteWithAttributeValue

func WriteWithAttributeValue(av *dynamodb.AttributeValue) WriteOption

WriteWithAttributeValue dynamodb attribute value which is written

func WriteWithBytes

func WriteWithBytes(val []byte) WriteOption

WriteWithBytes byte slice to the key which is written

func WriteWithNoExpires

func WriteWithNoExpires() WriteOption

WriteWithNoExpires time to live (TTL) is not set so it never expires

func WriteWithPreviousKV

func WriteWithPreviousKV(previous *KVPair) WriteOption

WriteWithPreviousKV previous KV which will be checked prior to update

func WriteWithTTL

func WriteWithTTL(ttl time.Duration) WriteOption

WriteWithTTL time to live (TTL) to the key which is written

type WriteOptions

type WriteOptions struct {
	// contains filtered or unexported fields
}

WriteOptions contains optional request parameters

func NewWriteOptions

func NewWriteOptions(opts ...WriteOption) *WriteOptions

NewWriteOptions create write options, assign defaults then accept overrides

Directories

Path Synopsis
examples
competing-consumers
We have a table containing some entries, for each one we need to lock, do some work, then unlock the entry.
We have a table containing some entries, for each one we need to lock, do some work, then unlock the entry.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL