dynalock

Version: v1.3.1 (this package is not in the latest version of its module)
Published: Jul 28, 2021 | License: Apache-2.0 | Imports: 12 | Imported by: 1

README

dynalock

This is a small K/V library written in Go, which uses AWS DynamoDB as the data store.

It supports create, read, update and delete (CRUD) for key/value pairs, and provides locks modelled on Go's sync.Locker interface.


What is the problem?

The main problems I am trying to solve with this package are:

  1. Enable users of the API to store state and coordinate work across resources, using multiple Lambda functions and containers running in a range of services.
  2. Do this locking and coordination without needing to spin up a cluster running etcd or Consul.
  3. Provide a solid and simple locking / storage API which can be used no matter how small your project is.
  4. Keep this API simple, while also reducing the operational overhead of the service by relying on AWS services.

What sorts of things can this help with?

Some examples of uses for a library like this are:

  1. When using scheduled Lambda functions, this library enables you to lock a resource before performing actions with it. This could be a payment API or an ECS cluster; either way, it is important to ensure only ONE service is performing that task at any one time.
  2. When you start using Step Functions, it lets you ensure only one workflow is active and performing some task, like provisioning, without having to worry about parallel executions.

So the key here is storing state, and coordinating changes across workers, or resources.
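As a rough illustration of that coordination, the sketch below has several workers compete for the same entries, with each entry claimed by exactly one of them. It is an in-memory stand-in only: the `claims` type, `tryClaim` and `runWorkers` are hypothetical names invented for this example and are not part of dynalock, which keeps this state in DynamoDB instead.

```go
package main

import (
	"fmt"
	"sync"
)

// claims is a hypothetical in-memory stand-in for the lock table.
type claims struct {
	mu     sync.Mutex
	owners map[string]string
}

// tryClaim records owner for key only if nobody holds it yet (a put-if-absent).
func (c *claims) tryClaim(key, owner string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, held := c.owners[key]; held {
		return false
	}
	c.owners[key] = owner
	return true
}

// runWorkers starts n workers that all attempt every entry and returns
// how many times each entry was actually processed.
func runWorkers(n int, entries []string) map[string]int {
	c := &claims{owners: map[string]string{}}
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		processed = map[string]int{}
	)
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			owner := fmt.Sprintf("worker-%d", id)
			for _, e := range entries {
				if !c.tryClaim(e, owner) {
					continue // another worker already holds this entry
				}
				mu.Lock()
				processed[e]++ // the "work" for this entry
				mu.Unlock()
			}
		}(w)
	}
	wg.Wait()
	return processed
}

func main() {
	counts := runWorkers(8, []string{"agents/1", "agents/2", "agents/3"})
	for entry, n := range counts {
		fmt.Printf("%s processed %d time(s)\n", entry, n)
	}
}
```

Claims are never released in this sketch, which is what guarantees each entry is handled once; a real worker would unlock the entry when its work is finished.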

Why DynamoDB?

DynamoDB is used for locking in a range of Amazon-provided APIs and libraries, so I am not the first to do this; see References. This service also satisfies the requirement to be easy to start with, as it is just a service.

Cost?

I am currently working on some testing around this, but with a bit of tuning you can keep the read/write load very low. But this is specifically designed as a starting point, while ensuring there is a clear abstraction between the underlying services and your code.

To manage this I would recommend you set alarms for read / write metrics. Start with on-demand capacity, but you will probably want to switch to specific read/write limits for production.

I will be posting some graphs, and analysis of my work as I go to help flesh this out better.

Usage

The main interfaces are as follows; for something more complete, see the competing consumers example.

go get -u -v github.com/wolfeidau/dynalock

v1.x Go Documentation

Looking for AWS SDK v2?

I have added a new v2 module which supports https://github.com/aws/aws-sdk-go-v2.

go get -u -v github.com/wolfeidau/dynalock/v2

v2.x Go Documentation

References

Prior work in this space:

This borrows a lot of ideas, tests and a subset of the API from https://github.com/abronan/valkeyrie.

Updates to the original API are based on a great blog post by @davecheney https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis

License

This code is released under the Apache 2.0 license.

Documentation

Overview

Package dynalock provides a locking store powered by AWS DynamoDB.

Constants

const (
	// DefaultLockTTL default duration for locks
	DefaultLockTTL = 20 * time.Second
)

Variables

var (
	// ErrKeyNotFound record not found in the table
	ErrKeyNotFound = errors.New("key not found in table")

	// ErrKeyExists record already exists in table
	ErrKeyExists = errors.New("key already exists in table")

	// ErrKeyModified record has been modified, this probably means someone beat you to the change/lock
	ErrKeyModified = errors.New("key has been modified")

	// ErrLockAcquireCancelled lock acquire was cancelled
	ErrLockAcquireCancelled = errors.New("lock acquire was cancelled")
)

var (
	// DefaultLockBackOff if locking is unsuccessful then this backoff will be used
	DefaultLockBackOff = 3 * time.Second
)
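
One way to act on these sentinel errors is a switch built on `errors.Is`, sketched below. To keep the example free of AWS dependencies it declares local stand-ins with the same messages; in real code you would compare against `dynalock.ErrKeyNotFound` and friends. The `nextStep` and `wrapModified` helpers are hypothetical, and `errors.Is` is used because it also matches errors that a caller has wrapped with extra context.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the dynalock sentinel errors documented above.
var (
	ErrKeyNotFound = errors.New("key not found in table")
	ErrKeyExists   = errors.New("key already exists in table")
	ErrKeyModified = errors.New("key has been modified")
)

// nextStep is a hypothetical helper mapping a store error to a caller decision.
func nextStep(err error) string {
	switch {
	case err == nil:
		return "continue"
	case errors.Is(err, ErrKeyNotFound):
		return "create"
	case errors.Is(err, ErrKeyExists), errors.Is(err, ErrKeyModified):
		return "retry" // someone else changed the record first
	default:
		return "fail"
	}
}

// wrapModified simulates a caller that adds context to a sentinel error.
func wrapModified() error {
	return fmt.Errorf("updating agents/123: %w", ErrKeyModified)
}

func main() {
	fmt.Println(nextStep(nil))
	fmt.Println(nextStep(ErrKeyNotFound))
	fmt.Println(nextStep(wrapModified()))
}
```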

Functions

func MarshalStruct

func MarshalStruct(in interface{}) (*dynamodb.AttributeValue, error)

MarshalStruct is a helper which marshals a struct into a *dynamodb.AttributeValue containing a map, in the format required by WriteWithAttributeValue.

func UnmarshalStruct

func UnmarshalStruct(val *dynamodb.AttributeValue, out interface{}) error

UnmarshalStruct is a helper which unmarshals a struct from the *dynamodb.AttributeValue returned by KVPair.AttributeValue.

Types

type Dynalock

type Dynalock struct {
	// contains filtered or unexported fields
}

Dynalock lock store which is backed by AWS DynamoDB

func (*Dynalock) AtomicDelete

func (ddb *Dynalock) AtomicDelete(key string, previous *KVPair) (bool, error)

AtomicDelete delete of a single value

This supports two different operations:

  * if previous is supplied, assert it exists with the version supplied
  * if previous is nil, assert that the key doesn't exist

FIXME: should the second case just return false, nil?

func (*Dynalock) AtomicPut

func (ddb *Dynalock) AtomicPut(key string, options ...WriteOption) (bool, *KVPair, error)

AtomicPut Atomic CAS operation on a single value.
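
To make the compare-and-swap idea concrete, here is an in-memory sketch of a version-checked write. It is not the library's implementation: `memStore`, `record` and `atomicPut` are hypothetical names, and using version 0 to mean "the key must not exist yet" is a convention chosen for this example only.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errKeyExists   = errors.New("key already exists")
	errKeyModified = errors.New("key has been modified")
)

type record struct {
	value   string
	version int64
}

// memStore is a hypothetical in-memory stand-in for the table.
type memStore struct {
	mu   sync.Mutex
	data map[string]record
}

func newMemStore() *memStore { return &memStore{data: map[string]record{}} }

// atomicPut writes value only if the stored version matches prevVersion.
// prevVersion == 0 means "the key must not exist yet" (sketch convention).
func (s *memStore) atomicPut(key, value string, prevVersion int64) (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	cur, ok := s.data[key]
	switch {
	case prevVersion == 0 && ok:
		return 0, errKeyExists
	case prevVersion != 0 && (!ok || cur.version != prevVersion):
		return 0, errKeyModified
	}

	next := record{value: value, version: prevVersion + 1}
	s.data[key] = next
	return next.version, nil
}

func main() {
	s := newMemStore()

	v1, err := s.atomicPut("agents/123", "first", 0)
	fmt.Println(v1, err)

	// A second writer holding a stale view loses the race.
	_, err = s.atomicPut("agents/123", "stale", 0)
	fmt.Println(err)

	v2, err := s.atomicPut("agents/123", "second", v1)
	fmt.Println(v2, err)
}
```

The point of the pattern is that the check and the write happen as one step, so two writers can never both succeed against the same version.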

func (*Dynalock) Delete

func (ddb *Dynalock) Delete(key string) error

Delete the value at the specified key

func (*Dynalock) Exists

func (ddb *Dynalock) Exists(key string, options ...ReadOption) (bool, error)

Exists if a Key exists in the store

func (*Dynalock) Get

func (ddb *Dynalock) Get(key string, options ...ReadOption) (*KVPair, error)

Get a value given its key

Example
package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/wolfeidau/dynalock"
)

func main() {

	type message struct{ Message string }

	sess := session.Must(session.NewSession())

	dbSvc := dynamodb.New(sess)

	dl := dynalock.New(dbSvc, "testing-locks", "agent")

	// Check the error before using kv, which is not usable if the lookup failed.
	kv, err := dl.Get("agents/123")
	if err != nil {
		log.Fatalf("failed to get key: %v", err)
	}

	msg := &message{}

	if err := dynalock.UnmarshalStruct(kv.AttributeValue(), msg); err != nil {
		log.Fatalf("failed to unmarshal value: %v", err)
	}

	fmt.Println("Message:", msg.Message)
}

func (*Dynalock) List

func (ddb *Dynalock) List(prefix string, options ...ReadOption) ([]*KVPair, error)

List the content of a given prefix

func (*Dynalock) NewLock

func (ddb *Dynalock) NewLock(key string, options ...LockOption) (Locker, error)

NewLock has to be implemented at the library level since it's not supported by DynamoDB

Example
package main

import (
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/wolfeidau/dynalock"
)

func main() {

	sess := session.Must(session.NewSession())

	dbSvc := dynamodb.New(sess)

	dl := dynalock.New(dbSvc, "testing-locks", "agent")

	lock, err := dl.NewLock(
		"agents/123",
		dynalock.LockWithTTL(2*time.Second),
		dynalock.LockWithBytes([]byte(`{"agent": "testing"}`)),
	)
	if err != nil {
		log.Fatalf("failed to create lock: %v", err)
	}

	// Lock blocks until the lock is acquired; no stop channel is supplied here.
	if _, err := lock.Lock(nil); err != nil {
		log.Fatalf("failed to acquire lock: %v", err)
	}
	defer lock.Unlock()
}

func (*Dynalock) Put

func (ddb *Dynalock) Put(key string, options ...WriteOption) error

Put a value at the specified key

Example
package main

import (
	"log"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/wolfeidau/dynalock"
)

func main() {

	sess := session.Must(session.NewSession())

	dbSvc := dynamodb.New(sess)

	dl := dynalock.New(dbSvc, "testing-locks", "agent")

	message := struct{ Message string }{Message: "hello"}

	attrVal, err := dynalock.MarshalStruct(&message)
	if err != nil {
		log.Fatalf("failed to marshal value: %v", err)
	}

	err = dl.Put(
		"agents/123",
		dynalock.WriteWithAttributeValue(attrVal),
	)
	if err != nil {
		log.Fatalf("failed to put key: %v", err)
	}

}

type KVPair

type KVPair struct {
	Partition string `dynamodbav:"id"`
	Key       string `dynamodbav:"name"`
	Version   int64  `dynamodbav:"version"`
	Expires   int64  `dynamodbav:"expires"`
	// contains filtered or unexported fields
}

KVPair represents {Key, Value, Version} tuple, internally this uses a *dynamodb.AttributeValue which can be used to store strings, slices or structs
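
The Expires field suggests a simple client-side expiry check, sketched below. This assumes Expires holds a Unix timestamp in seconds, which is the format DynamoDB's TTL feature expects; the page does not state this explicitly. The helpers `expiresAt`, `isExpired` and `demo` are hypothetical, and treating 0 as "never expires" is a convention of this sketch. A check like this can matter because DynamoDB removes expired items asynchronously, so a reader may still see a record whose TTL has lapsed; whether the library already filters such records is not stated here.

```go
package main

import (
	"fmt"
	"time"
)

// expiresAt returns the Unix timestamp (seconds) at which a record written
// at now with the given TTL lapses.
func expiresAt(now time.Time, ttl time.Duration) int64 {
	return now.Add(ttl).Unix()
}

// isExpired reports whether a record with the given expires value has lapsed.
// An expires value of 0 is treated as "never expires" (sketch convention).
func isExpired(expires int64, now time.Time) bool {
	return expires != 0 && now.Unix() >= expires
}

// demo evaluates the helpers against a fixed clock so the result is deterministic.
func demo() (exp int64, early, late, never bool) {
	written := time.Unix(1600000000, 0)
	exp = expiresAt(written, 20*time.Second)
	early = isExpired(exp, written.Add(10*time.Second))
	late = isExpired(exp, written.Add(20*time.Second))
	never = isExpired(0, written.Add(time.Hour))
	return exp, early, late, never
}

func main() {
	exp, early, late, never := demo()
	fmt.Println(exp, early, late, never)
}
```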

func (*KVPair) AttributeValue

func (kv *KVPair) AttributeValue() *dynamodb.AttributeValue

AttributeValue return the current dynamodb attribute value, may be nil

func (*KVPair) BytesValue

func (kv *KVPair) BytesValue() []byte

BytesValue use the attribute to return a slice of bytes, a nil will be returned if it is empty or nil

type LockOption

type LockOption func(opts *LockOptions)

LockOption assign various settings to the lock options

func LockWithBytes

func LockWithBytes(val []byte) LockOption

LockWithBytes sets the byte slice which is written to the key when the lock is acquired

func LockWithRenewLock

func LockWithRenewLock(renewLockChan chan struct{}) LockOption

LockWithRenewLock sets the renewal channel for the lock

func LockWithTTL

func LockWithTTL(ttl time.Duration) LockOption

LockWithTTL sets the time to live (TTL) of the key which is written when the lock is acquired

type LockOptions

type LockOptions struct {
	// contains filtered or unexported fields
}

LockOptions contains optional request parameters

func NewLockOptions

func NewLockOptions(opts ...LockOption) *LockOptions

NewLockOptions create lock options, assign defaults then accept overrides
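
The "assign defaults then accept overrides" flow is the functional options pattern referenced in the README. The sketch below shows the general shape of that pattern using hypothetical lowercase names (`lockOptions`, `withTTL`, `withBytes`, `newLockOptions`, `demoTTLs`); it mirrors the documented signatures but is not copied from the library's source. The 20 second default matches DefaultLockTTL above.

```go
package main

import (
	"fmt"
	"time"
)

// lockOptions holds the settings that options can override.
type lockOptions struct {
	ttl   time.Duration
	value []byte
}

// lockOption mutates a lockOptions value.
type lockOption func(opts *lockOptions)

// withTTL overrides the default time to live.
func withTTL(ttl time.Duration) lockOption {
	return func(opts *lockOptions) { opts.ttl = ttl }
}

// withBytes sets the value stored alongside the lock.
func withBytes(val []byte) lockOption {
	return func(opts *lockOptions) { opts.value = val }
}

// newLockOptions assigns defaults first, then applies overrides in order.
func newLockOptions(opts ...lockOption) *lockOptions {
	o := &lockOptions{ttl: 20 * time.Second}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

// demoTTLs returns the default TTL and an overridden TTL, both in seconds.
func demoTTLs() (int, int) {
	def := newLockOptions()
	custom := newLockOptions(withTTL(2*time.Second), withBytes([]byte("agent")))
	return int(def.ttl / time.Second), int(custom.ttl / time.Second)
}

func main() {
	def, custom := demoTTLs()
	fmt.Println("default TTL seconds:", def)
	fmt.Println("custom TTL seconds:", custom)
}
```

The benefit for callers is that new settings can be added later without changing the signature of the function that accepts the options.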

type Locker

type Locker interface {
	// Lock attempt to lock the store record, this will BLOCK and retry at a rate of once every 3 seconds
	Lock(stopChan chan struct{}) (<-chan struct{}, error)

	// LockWithContext attempt to lock the store record, this will BLOCK and retry at a rate of once every 3 seconds
	LockWithContext(ctx context.Context) (<-chan struct{}, error)

	// Unlock this will unlock and perform a DELETE to remove the store record
	Unlock() error
}

Locker provides a locking mechanism on top of the store. Similar to `sync.Locker` except it may return errors.
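
The blocking, retry-with-backoff behaviour described in the interface comments can be sketched with an in-memory lock, as below. This is an illustration under stated assumptions rather than the library's code: `memLock`, `tryLock`, `lockWithContext`, `unlock` and `acquireWithin` are hypothetical names, and the TTL and backoff are shortened to milliseconds so the example runs quickly.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

var errLockAcquireCancelled = errors.New("lock acquire was cancelled")

// memLock is a hypothetical in-memory lock record with an owner and an expiry.
type memLock struct {
	mu      sync.Mutex
	owner   string
	expires time.Time
	ttl     time.Duration
	backoff time.Duration
}

func newMemLock(ttlMillis, backoffMillis int) *memLock {
	return &memLock{
		ttl:     time.Duration(ttlMillis) * time.Millisecond,
		backoff: time.Duration(backoffMillis) * time.Millisecond,
	}
}

// tryLock takes the lock if it is free or its TTL has lapsed.
func (l *memLock) tryLock(owner string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	now := time.Now()
	if l.owner != "" && now.Before(l.expires) {
		return false
	}
	l.owner, l.expires = owner, now.Add(l.ttl)
	return true
}

// lockWithContext blocks, retrying once per backoff interval, until the lock
// is taken or ctx is done.
func (l *memLock) lockWithContext(ctx context.Context, owner string) error {
	for {
		if l.tryLock(owner) {
			return nil
		}
		select {
		case <-ctx.Done():
			return errLockAcquireCancelled
		case <-time.After(l.backoff):
		}
	}
}

// unlock releases the lock if owner currently holds it.
func (l *memLock) unlock(owner string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.owner == owner {
		l.owner = ""
	}
}

// acquireWithin is a convenience wrapper that gives up after timeoutMillis.
func acquireWithin(l *memLock, owner string, timeoutMillis int) error {
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(timeoutMillis)*time.Millisecond)
	defer cancel()
	return l.lockWithContext(ctx, owner)
}

func main() {
	l := newMemLock(1000, 5)
	fmt.Println("worker-a:", acquireWithin(l, "worker-a", 100))
	fmt.Println("worker-b:", acquireWithin(l, "worker-b", 20))
	l.unlock("worker-a")
	fmt.Println("worker-b after unlock:", acquireWithin(l, "worker-b", 100))
}
```

With the real interface, the same cancellation would be driven by the context passed to LockWithContext or by closing the stop channel passed to Lock.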

type ReadOption

type ReadOption func(opts *ReadOptions)

ReadOption assign various settings to the read options

func ReadConsistentDisable

func ReadConsistentDisable() ReadOption

ReadConsistentDisable disable consistent reads

type ReadOptions

type ReadOptions struct {
	// contains filtered or unexported fields
}

ReadOptions contains optional request parameters

func NewReadOptions

func NewReadOptions(opts ...ReadOption) *ReadOptions

NewReadOptions create read options, assign defaults then accept overrides. The read consistent flag is enabled by default.

type Store

type Store interface {
	// Put a value at the specified key
	Put(key string, options ...WriteOption) error

	// Get a value given its key
	Get(key string, options ...ReadOption) (*KVPair, error)

	// List the content of a given prefix
	List(prefix string, options ...ReadOption) ([]*KVPair, error)

	// Delete the value at the specified key
	Delete(key string) error

	// Verify if a Key exists in the store
	Exists(key string, options ...ReadOption) (bool, error)

	// NewLock creates a lock for a given key.
	// The returned Locker is not held and must be acquired
	// with `.Lock`. The Value is optional.
	NewLock(key string, options ...LockOption) (Locker, error)

	// Atomic CAS operation on a single value.
	// Pass previous = nil to create a new key.
	// Pass previous = kv to update an existing value.
	AtomicPut(key string, options ...WriteOption) (bool, *KVPair, error)

	// Atomic delete of a single value
	AtomicDelete(key string, previous *KVPair) (bool, error)
}

Store represents the backend K/V storage

func New

func New(dynamoSvc dynamodbiface.DynamoDBAPI, tableName, partition string) Store

New construct a DynamoDB backed locking store

func NewWithDefaults added in v1.2.0

func NewWithDefaults(tableName, partition string) Store

NewWithDefaults construct a DynamoDB backed locking store with default session / service

type WriteOption

type WriteOption func(opts *WriteOptions)

WriteOption assign various settings to the write options

func WriteWithAttributeValue

func WriteWithAttributeValue(av *dynamodb.AttributeValue) WriteOption

WriteWithAttributeValue sets the dynamodb attribute value which is written

func WriteWithBytes

func WriteWithBytes(val []byte) WriteOption

WriteWithBytes sets the byte slice which is written to the key

func WriteWithNoExpires added in v1.1.0

func WriteWithNoExpires() WriteOption

WriteWithNoExpires time to live (TTL) is not set, so the key never expires

func WriteWithPreviousKV

func WriteWithPreviousKV(previous *KVPair) WriteOption

WriteWithPreviousKV sets the previous KV which will be checked prior to update

func WriteWithTTL

func WriteWithTTL(ttl time.Duration) WriteOption

WriteWithTTL sets the time to live (TTL) of the key which is written

type WriteOptions

type WriteOptions struct {
	// contains filtered or unexported fields
}

WriteOptions contains optional request parameters

func NewWriteOptions

func NewWriteOptions(opts ...WriteOption) *WriteOptions

NewWriteOptions create write options, assign defaults then accept overrides

Directories

Path | Synopsis
examples/competing-consumers | We have a table containing some entries, for each one we need to lock, do some work, then unlock the entry.
