kv

package
v0.1.44 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 22, 2024 License: MIT Imports: 30 Imported by: 0

Documentation

Overview

Package kv turns your S3 bucket into a diffable serverless key-value store.

Requirements

- go1.14+

- S3-compatible storage, like minio, Wasabi, or AWS

Example
package main

import (
	"context"
	"fmt"
	"net/http/httptest"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/johannesboyne/gofakes3"
	"github.com/johannesboyne/gofakes3/backend/s3mem"
	"github.com/jrhy/mast"
	"github.com/jrhy/s3db/kv"
)

// s3Config builds an AWS client configuration aimed at the S3-compatible
// server listening at endpoint. It uses fixed test credentials (no session
// token), the "ca-west-1" region, and S3ForcePathStyle enabled.
func s3Config(endpoint string) *aws.Config {
	testCreds := credentials.NewStaticCredentials("TEST-ACCESSKEYID", "TEST-SECRETACCESSKEY", "")
	return aws.NewConfig().
		WithCredentials(testCreds).
		WithRegion("ca-west-1").
		WithEndpoint(endpoint).
		WithS3ForcePathStyle(true)
}

// setupS3 starts an in-memory fake S3 server (gofakes3 backed by s3mem) and
// creates bucketName on it.
//
// It returns an AWS config pointing at that server, plus a function that shuts
// the server down. Any setup failure panics, as everywhere else in this
// example. Before panicking, it stops the test server so that the server is
// not leaked.
func setupS3(bucketName string) (*aws.Config, func()) {
	backend := s3mem.New()
	faker := gofakes3.New(backend)
	ts := httptest.NewServer(faker.Server())
	s3cfg := s3Config(ts.URL)

	// session.NewSession replaces the deprecated session.New. It surfaces
	// configuration errors here rather than on the first request.
	sess, err := session.NewSession(s3cfg)
	if err != nil {
		ts.Close()
		panic(err)
	}

	c := s3.New(sess)
	if _, err := c.CreateBucket(&s3.CreateBucketInput{
		Bucket: &bucketName,
	}); err != nil {
		ts.Close()
		panic(err)
	}

	return s3cfg, ts.Close
}

// main walks through the basic kv workflow against an in-memory fake S3
// server: Open a database, Set a value, Get it back, Commit, and report the
// number of entries. It prints:
//
//	hello=5
//	size 1
func main() {
	ctx := context.Background()

	// Named cleanup (not close) so the builtin close is not shadowed.
	s3cfg, cleanup := setupS3("bucket")
	defer cleanup()

	// session.NewSession replaces the deprecated session.New and reports
	// configuration errors up front.
	sess, err := session.NewSession(s3cfg)
	if err != nil {
		panic(err)
	}
	c := s3.New(sess)

	cfg := kv.Config{
		Storage: &kv.S3BucketInfo{
			// EndpointURL distinguishes servers for node caching; reuse the client's endpoint.
			EndpointURL: c.Endpoint,
			BucketName:  "bucket",
			Prefix:      "/my-awesome-database",
		},
		// Example key/value so kv knows which concrete types to unmarshal into.
		KeysLike:      "key",
		ValuesLike:    1234,
		NodeCache:     mast.NewNodeCache(1024),
		NodeEncryptor: kv.V1NodeEncryptor([]byte("This is a secret passphrase if ever there was one.")),
	}
	s, err := kv.Open(ctx, c, cfg, kv.OpenOptions{}, time.Now())
	if err != nil {
		panic(err)
	}

	// Set buffers the value in memory until Commit.
	err = s.Set(ctx, time.Now(), "hello", 5)
	if err != nil {
		panic(err)
	}

	// Get reports via ok whether the key was found.
	var v int
	ok, err := s.Get(ctx, "hello", &v)
	if err != nil {
		panic(err)
	}
	if !ok {
		panic("how is that not OK?")
	}
	fmt.Printf("hello=%d\n", v)

	// Commit persists the buffered entries so later Open()s can see them.
	_, err = s.Commit(ctx)
	if err != nil {
		panic(err)
	}

	fmt.Printf("size %d\n", s.Size())
}
Output:

hello=5
size 1

Index

Examples

Constants

View Source
// DefaultBranchFactor is how many entries are stored per S3 object when
// Config.BranchFactor does not override it.
const DefaultBranchFactor = 4096

Variables

View Source
var ErrMACVerificationFailure = errors.New("MAC verification failure")
View Source
var (
	// ErrReadOnly is the result of an attempt to Set or Commit on a tree that was not opened for writing.
	ErrReadOnly = errors.New("opened as read-only")
)

Functions

func DeleteHistoricVersions

func DeleteHistoricVersions(ctx context.Context, s *DB, before time.Time) error

DeleteHistoricVersions deletes storage associated with old versions, and any nodes from those versions that are no longer needed for reading later versions. Deletion reduces the number of objects in the bucket, but afterwards, attempts to Diff(), TraceHistory(), or merge versions from before the cutoff will fail.

Types

type Config

type Config struct {
	// S3 endpoint, bucket name, and database prefix
	Storage *S3BucketInfo
	// An optional label that will be added to committed versions' names for easier listing (e.g. "daily-report-")
	CustomRootPrefix string
	// An example of a key, to know what concrete type to make when unmarshaling
	KeysLike interface{}
	// An example of a value, to know what concrete type to make when unmarshaling
	ValuesLike interface{}
	// Sets the entries-per-S3 object for new trees (ignored unless tree is empty).
	// When not overridden, DefaultBranchFactor is used (presumably a zero value
	// means "not overridden" — confirm).
	BranchFactor uint
	// An optional S3-endpoint-scoped cache, instead of re-downloading recently-used tree nodes
	NodeCache mast.NodeCache
	// Encrypts nodes before they are stored, to keep your cloud provider out of
	// your nodes (e.g. V1NodeEncryptor(passphrase)). NOTE(review): behavior
	// when nil is not shown here — presumably nodes are stored unencrypted.
	NodeEncryptor Encryptor
	// OnConflictMerged is a callback that will be invoked whenever entries for a key have
	// different values in different trees that are being merged. It can be used to detect
	// when a uniqueness constraint has been broken and which keys need fixing.
	OnConflictMerged
	// LogFunc is a callback that will be invoked to provide details on potential corruption.
	LogFunc func(string)

	// CustomMerge, if set, receives a key and two conflicting crdtpub.Values
	// and returns the value to keep. NOTE(review): presumably it replaces the
	// default merge resolution when trees are merged — confirm.
	CustomMerge func(key interface{}, v1, v2 crdtpub.Value) crdtpub.Value

	// MastNodeFormat names the serialization format for persisted tree nodes.
	// NOTE(review): the accepted values and the default are not shown here —
	// check the mast package before setting it.
	MastNodeFormat string

	// CustomMarshal, if set, is presumably the encoder counterpart of
	// CustomUnmarshal, used when writing tree nodes — confirm.
	CustomMarshal func(interface{}) ([]byte, error)
	// CustomUnmarshal, if using registered types, will be invoked
	// to read tree nodes which are mast.PersistedNode with keys of
	// KeysLike, and values of crdtpub.Value of ValuesLike. All of
	// those should be registered before invoking kv.Open().
	// UnmarshalUsesRegisteredTypes presumably signals that CustomUnmarshal
	// relies on such registered types — confirm.
	CustomUnmarshal              func([]byte, interface{}) error
	UnmarshalUsesRegisteredTypes bool
}

Config defines how values are stored and (un)marshaled.

type Cursor

// Cursor is returned by DB.Cursor. It embeds *mast.Cursor and overrides Get,
// so that each value is exposed as a *crdtpub.Value.
type Cursor struct {
	*mast.Cursor
}

func (*Cursor) Get

func (c *Cursor) Get() (interface{}, *crdtpub.Value, bool)

Get overrides mast.Cursor.Get() to make the crdt.Value wrapping clear.

type DB

// DB is a database handle returned by Open.
//
// NOTE(review): Diff, Height, IsDirty and Size are declared on the value
// receiver DB, while every other method uses *DB. Go convention is to keep
// a type's receivers consistent, and value receivers copy the struct on each
// call — consider switching them to *DB.
type DB struct {
	// contains filtered or unexported fields
}

DB is an Open()ed database.

func Open

func Open(ctx context.Context, S3 S3Interface, cfg Config, opts OpenOptions, when time.Time) (*DB, error)

Open returns a database. 'when' marks the creation time of the new version.

func (*DB) BranchFactor

func (d *DB) BranchFactor() uint

func (*DB) Cancel

func (s *DB) Cancel()

Cancel indicates you don't want any of the previously-set entries persisted.

func (*DB) Clone

func (s *DB) Clone(ctx context.Context) (*DB, error)

Clone returns an independent database, with the same entries and uncommitted values as its source. Clones don't duplicate nodes that can be shared.

func (*DB) Commit

func (s *DB) Commit(ctx context.Context) (*string, error)

Commit ensures any Set() entries become accessible on subsequent Open()s.

func (*DB) Cursor

func (d *DB) Cursor(ctx context.Context) (*Cursor, error)

func (DB) Diff

func (s DB) Diff(
	ctx context.Context,
	from *DB,
	f func(key, myValue, fromValue interface{}) (keepGoing bool, err error),
) error

Diff shows the differences from the given tree to this one, invoking the given callback for each added, removed, or changed value.

func (*DB) Get

func (s *DB) Get(ctx context.Context, key interface{}, value interface{}) (bool, error)

Get retrieves the value for the given key. value must be either a pointer to which a Config.ValuesLike can be assigned, or a pointer to a crdt.Value, in which case the CRDT value metadata will be included.

func (DB) Height

func (s DB) Height() int

Height is the number of nodes between a leaf and the root—the number of nodes that need to be retrieved to do a Get() in the worst case.

func (DB) IsDirty

func (s DB) IsDirty() bool

IsDirty returns true if there are entries in memory that haven't been Commit()ted.

func (*DB) IsTombstoned

func (s *DB) IsTombstoned(ctx context.Context, key interface{}) (bool, error)

IsTombstoned returns true if Tombstone() was invoked and committed since the last RemoveTombstones().

func (*DB) RemoveTombstones

func (s *DB) RemoveTombstones(ctx context.Context, before time.Time) error

RemoveTombstones gets rid of the old tombstones, in the current version. The time should be chosen such that there are not going to be any merges for affected entries, otherwise merged sets will start new rounds of values for the affected entries. For example, if it would be valid for clients to retry hour-old requests, 'before' should be at least an hour ago.

func (*DB) Roots added in v0.1.33

func (s *DB) Roots() ([]string, error)

func (*DB) Set

func (s *DB) Set(ctx context.Context, when time.Time, key interface{}, value interface{}) error

Set puts a new value in memory. If the database already has a value later than "when", this does nothing. Any new values are buffered in memory until Commit()ted or Cancel()ed.

func (DB) Size

func (s DB) Size() uint64

Size returns the number of entries and tombstones in this tree.

func (*DB) StartDiff added in v0.1.33

func (d *DB) StartDiff(ctx context.Context, other *DB) (*DiffCursor, error)

func (*DB) Tombstone

func (s *DB) Tombstone(ctx context.Context, when time.Time, key interface{}) error

Tombstone sets a persistent record indicating an entry will no longer be accessible, until RemoveTombstones() is done. If multiple versions have different values or tombstones for a key, they'll eventually get merged into the earliest tombstone. You can set a tombstone even if the entry was never set to a value, just to ensure that any future sets are ineffective.

func (*DB) TraceHistory

func (s *DB) TraceHistory(
	ctx context.Context,
	key interface{},
	after time.Time,
	cb func(when time.Time, value interface{}) (keepGoing bool, err error),
) error

TraceHistory invokes the given callback for each historic value of 'key', up to the last RemoveTombstones().

type DiffCursor added in v0.1.33

// DiffCursor is returned by DB.StartDiff. It embeds *mast.DiffCursor and adds
// NextEntry, which yields one mast.Diff per call.
type DiffCursor struct {
	*mast.DiffCursor
}

func (*DiffCursor) NextEntry added in v0.1.33

func (dc *DiffCursor) NextEntry(ctx context.Context) (mast.Diff, error)

type Encryptor

// Encryptor encrypts and decrypts stored node bytes; implementations derive
// the nonce from path. V1NodeEncryptor returns one built from a passphrase.
type Encryptor interface {
	// Encrypt returns the encrypted form of value for the given path.
	Encrypt(path string, value []byte) ([]byte, error)
	// Decrypt is expected to invert Encrypt for the same path.
	Decrypt(path string, value []byte) ([]byte, error)
}

Encryptor encrypts bytes with a nonce derived from the given path.

func V1NodeEncryptor

func V1NodeEncryptor(passphrase []byte) Encryptor

type OnConflictMerged

// OnConflictMerged receives a key and the two differing values (v1, v2) found
// for it while merging trees. NOTE(review): how a non-nil returned error is
// handled (e.g. whether it aborts the merge) is not shown here — confirm.
type OnConflictMerged func(key, v1, v2 interface{}) error

OnConflictMerged is a callback that will be invoked whenever entries for a key have different values in different trees that are being merged. It can be used to detect when a uniqueness constraint has been broken and which keys need fixing.

type OpenOptions

// OpenOptions control how Open opens a database; the example passes the zero
// value, OpenOptions{}.
type OpenOptions struct {
	// ReadOnly means the database should not permit modifications;
	// Set and Commit then fail with ErrReadOnly.
	ReadOnly bool
	// OnlyVersions is used to see a tree as it looked for a set of historic versions
	OnlyVersions []string
	// ForceRebranch is used to change the branch factor of an existing tree, rewriting nodes and roots if necessary
	ForceRebranch bool
}

OpenOptions control how databases are opened.

type S3BucketInfo

// S3BucketInfo says where a DB's objects (nodes and roots) live.
type S3BucketInfo struct {
	// EndpointURL is used to distinguish servers for caching (it scopes
	// Config.NodeCache); the example passes the S3 client's Endpoint.
	EndpointURL string
	// BucketName where the node objects are stored
	BucketName string
	// Prefix is a key prefix that distinguishes objects for this tree
	// (e.g. "/my-awesome-database").
	Prefix string
}

S3BucketInfo describes where DB objects, including nodes and roots, are stored.

type S3Interface

// S3Interface lists the S3 operations kv needs. Each method mirrors the AWS SDK
// for Go method of the same name. In the package example, an *s3.S3 client is
// passed directly to Open, which takes an S3Interface.
type S3Interface interface {
	// à la AWS SDK for S3: deletes a single object.
	DeleteObjectWithContext(ctx aws.Context, input *s3.DeleteObjectInput, opts ...request.Option) (*s3.DeleteObjectOutput, error)
	// à la AWS SDK for S3: reads a single object.
	GetObjectWithContext(ctx aws.Context, input *s3.GetObjectInput, opts ...request.Option) (*s3.GetObjectOutput, error)
	// à la AWS SDK for S3: lists object keys (V2 listing API).
	ListObjectsV2WithContext(ctx aws.Context, input *s3.ListObjectsV2Input, opts ...request.Option) (*s3.ListObjectsV2Output, error)
	// à la AWS SDK for S3: writes a single object.
	PutObjectWithContext(ctx aws.Context, input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error)
}

S3Interface is the subset of the AWS SDK for S3 used to access compatible buckets.

Directories

Path Synopsis
cmd
kv
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL