bucket

package
v0.1.0
Published: Dec 27, 2023 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

const (
	// IncludeNil goes over all buckets, including those that are nil (not loaded).
	IncludeNil = IterMode(iota)

	// LoadedOnly iterates over all buckets that were loaded already.
	LoadedOnly

	// Load loads all buckets, including those that were not loaded yet.
	Load
)
const (
	ReadOpPeek = 0
	ReadOpPop  = 1
	ReadOpMove = 2
)
const (
	// SyncNone does not sync on normal operation (only on close)
	SyncNone = 0
	// SyncData only synchronizes the data log
	SyncData = SyncMode(1 << iota)
	// SyncIndex only synchronizes the index log (does not make sense alone)
	SyncIndex
	// SyncFull syncs both the data and index log
	SyncFull = SyncData | SyncIndex
)

The available options are inspired by SQLite: https://www.sqlite.org/pragma.html#pragma_synchronous

const (
	// ErrorModeAbort will immediately abort the current
	// operation if an error is encountered that might lead to data loss.
	ErrorModeAbort = ErrorMode(iota)

	// ErrorModeContinue tries to progress further in case of errors
	// by jumping over a faulty bucket or entry in a bucket.
	// If the error was recoverable, no error is returned, but the
	// Logger in the Options will be called (if set) to log the error.
	ErrorModeContinue
)
const (
	DataLogName = "dat.log"
)

Variables

var DefaultBucketFunc = ShiftBucketFunc(37)
var IterStop = errors.New("iteration stopped")

IterStop can be returned from Iter's func when you want to stop the iteration. It does not count as an error.

Functions

func FixedSizeBucketFunc

func FixedSizeBucketFunc(n uint64) func(key item.Key) item.Key

FixedSizeBucketFunc returns a BucketFunc that divides keys into equal-sized buckets with `n` entries each. This can also be used to create time-based buckets: if you use nanosecond-based keys, passing time.Minute creates buckets with a size of one minute.

func ShiftBucketFunc

func ShiftBucketFunc(shift int) func(key item.Key) item.Key

ShiftBucketFunc creates a fast BucketFunc that divides data into buckets by masking the `shift` least significant bits of the key. With a shift of 37 you get buckets spanning roughly two minutes (if your key inputs are nanosecond timestamps). To calculate the bucket size for a given shift, use this formula: (2 ** shift) / (1e9 * 60) = minutes

Types

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

func Open

func Open(dir string, opts Options) (buck *Bucket, outErr error)

func (*Bucket) Close

func (b *Bucket) Close() error

func (*Bucket) DeleteLowerThan

func (b *Bucket) DeleteLowerThan(key item.Key) (ndeleted int, outErr error)

func (*Bucket) Empty

func (b *Bucket) Empty() bool

func (*Bucket) Key

func (b *Bucket) Key() item.Key

func (*Bucket) Len

func (b *Bucket) Len() int

func (*Bucket) Move

func (b *Bucket) Move(n int, dst item.Items, dstBuck *Bucket) (item.Items, int, error)

Move moves data between two buckets in a crash-safe way: in case of a crash, the data might already be present in the destination queue while not yet deleted from the source queue. Callers should be prepared to handle duplicates.

func (*Bucket) Peek

func (b *Bucket) Peek(n int, dst item.Items) (item.Items, int, error)

func (*Bucket) Pop

func (b *Bucket) Pop(n int, dst item.Items) (item.Items, int, error)

func (*Bucket) Push

func (b *Bucket) Push(items item.Items) (outErr error)

Push expects pre-sorted items!

func (*Bucket) Sync

func (b *Bucket) Sync(force bool) error

type Buckets

type Buckets struct {
	// contains filtered or unexported fields
}

func LoadAll

func LoadAll(dir string, maxParallelOpenBuckets int, opts Options) (*Buckets, error)

func (*Buckets) Clear

func (bs *Buckets) Clear() error

func (*Buckets) Close

func (bs *Buckets) Close() error

func (*Buckets) Delete

func (bs *Buckets) Delete(key item.Key) error

func (*Buckets) DeleteLowerThan

func (bs *Buckets) DeleteLowerThan(key item.Key) (int, error)

func (*Buckets) Len

func (bs *Buckets) Len() int

func (*Buckets) Push

func (bs *Buckets) Push(items item.Items) error

Push pushes a batch of `items` to the queue.

func (*Buckets) Read

func (bs *Buckets) Read(op ReadOp, n int, dst item.Items, fn ReadFn, dstBs *Buckets) error

Read handles all kinds of read operations. It is a low-level function that is not part of the official API.

func (*Buckets) Shovel

func (bs *Buckets) Shovel(dstBs *Buckets) (int, error)

func (*Buckets) Sync

func (bs *Buckets) Sync() error

func (*Buckets) ValidateBucketKeys

func (bs *Buckets) ValidateBucketKeys(bucketFn func(item.Key) item.Key) error

ValidateBucketKeys checks if the keys in the buckets correspond to the result of the key func. A failure here indicates that the key function changed. The absence of an error does not guarantee that the key func did not change, though (e.g. the identity func would pass this check).

type ErrorMode

type ErrorMode int

func (ErrorMode) IsValid

func (em ErrorMode) IsValid() bool

type IterMode

type IterMode int

type Logger

type Logger interface {
	Printf(fmt string, args ...any)
}

Logger is a small interface to redirect logs to. The default logger outputs to stderr.

func DefaultLogger

func DefaultLogger() Logger

DefaultLogger produces a logger that writes to stderr.

func NullLogger

func NullLogger() Logger

NullLogger produces a logger that discards all messages.

func WriterLogger

func WriterLogger(w io.Writer) Logger

type Options

type Options struct {
	// SyncMode controls how often we sync data to the disk. The more often we
	// sync, the more durable the queue is, at the cost of throughput.
	// Default is the safe SyncFull. Think twice before lowering this.
	SyncMode SyncMode

	// Logger is used to output non-critical warnings or errors that could
	// be recovered from. By default we print to stderr.
	// Only warnings and errors are logged, no debug or informational messages.
	Logger Logger

	// ErrorMode defines how non-critical errors are handled.
	// See the individual enum values for more info.
	ErrorMode ErrorMode

	// BucketFunc defines what key goes to what bucket.
	// The provided function should clamp the key value to
	// a common value. Each same value that was returned goes
	// into the same bucket. The returned value should be also
	// the minimum key of the bucket.
	//
	// Example: '(key / 10) * 10' would produce buckets covering 10 consecutive keys each.
	//
	// What bucket size to choose? Please refer to the FAQ in the README.
	//
	// NOTE: This may not be changed after you opened a queue with it!
	//       The only way to change it is to create a new queue and shovel the
	//       old data into it.
	BucketFunc func(item.Key) item.Key

	// MaxParallelOpenBuckets limits the number of buckets that can be opened
	// in parallel. Normally, operations like Push() will create more and more
	// buckets with time and old buckets do not get closed automatically, as
	// we don't know when they get accessed again. If there are more buckets
	// open than this number they get closed and will be re-opened if accessed
	// again. If this happens frequently, this comes with a performance penalty.
	// If you tend to access your data with rather random keys, you might want
	// to increase this number, depending on how much resources you have.
	//
	// This currently only applies to write operations (i.e. Push() and so on),
	// not for read operations like Pop() (as memory needs to stay intact when
	// reading from several buckets). If you have situations where you do read-only
	// operations for some time you should throw in a CloseUnused() call from time
	// to time to make sure memory gets cleaned up.
	//
	// If this number is <= 0, then this feature is disabled, which is not
	// recommended.
	MaxParallelOpenBuckets int
}

Options are fine-tuning knobs specific to individual buckets.

func DefaultOptions

func DefaultOptions() Options

func (*Options) Validate

func (o *Options) Validate() error

type ReadFn

type ReadFn func(items item.Items) error

type ReadOp

type ReadOp int

ReadOp define the kind of operation done by the Read() function.

type SyncMode

type SyncMode int

func (SyncMode) IsValid

func (sm SyncMode) IsValid() bool
