Documentation ¶
Index ¶
- Constants
- Variables
- func FixedSizeBucketFunc(n uint64) func(key item.Key) item.Key
- func ShiftBucketFunc(shift int) func(key item.Key) item.Key
- type Bucket
- func (b *Bucket) Close() error
- func (b *Bucket) DeleteLowerThan(key item.Key) (ndeleted int, outErr error)
- func (b *Bucket) Empty() bool
- func (b *Bucket) Key() item.Key
- func (b *Bucket) Len() int
- func (b *Bucket) Move(n int, dst item.Items, dstBuck *Bucket) (item.Items, int, error)
- func (b *Bucket) Peek(n int, dst item.Items) (item.Items, int, error)
- func (b *Bucket) Pop(n int, dst item.Items) (item.Items, int, error)
- func (b *Bucket) Push(items item.Items) (outErr error)
- func (b *Bucket) Sync(force bool) error
- type Buckets
- func (bs *Buckets) Clear() error
- func (bs *Buckets) Close() error
- func (bs *Buckets) Delete(key item.Key) error
- func (bs *Buckets) DeleteLowerThan(key item.Key) (int, error)
- func (bs *Buckets) Len() int
- func (bs *Buckets) Push(items item.Items) error
- func (bs *Buckets) Read(op ReadOp, n int, dst item.Items, fn ReadFn, dstBs *Buckets) error
- func (bs *Buckets) Shovel(dstBs *Buckets) (int, error)
- func (bs *Buckets) Sync() error
- func (bs *Buckets) ValidateBucketKeys(bucketFn func(item.Key) item.Key) error
- type ErrorMode
- type IterMode
- type Logger
- type Options
- type ReadFn
- type ReadOp
- type SyncMode
Constants ¶
const (
	// IncludeNil goes over all buckets, including those that are nil (not loaded).
	IncludeNil = IterMode(iota)

	// LoadedOnly iterates over all buckets that were loaded already.
	LoadedOnly

	// Load loads all buckets, including those that were not loaded yet.
	Load
)
const (
	ReadOpPeek = 0
	ReadOpPop  = 1
	ReadOpMove = 2
)
const (
	// SyncNone does not sync on normal operation (only on close).
	SyncNone = 0

	// SyncData only synchronizes the data log.
	SyncData = SyncMode(1 << iota)

	// SyncIndex only synchronizes the index log (does not make sense alone).
	SyncIndex

	// SyncFull syncs both the data and index log.
	SyncFull = SyncData | SyncIndex
)
The available options are inspired by SQLite: https://www.sqlite.org/pragma.html#pragma_synchronous
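How these flags compose can be sketched by re-declaring them locally (a self-contained sketch mirroring the constant block above; the real declarations live in the package):

```go
package main

import "fmt"

// SyncMode mirrors the constant block above: a bitmask where the
// data and index flags can be checked and combined independently.
type SyncMode int

const (
	SyncNone SyncMode = 0
	SyncData SyncMode = 1 << iota // iota is 1 here, so SyncData == 2
	SyncIndex                     // inherits 1 << iota with iota == 2, so 4
	SyncFull = SyncData | SyncIndex // 6: sync both logs
)

func main() {
	fmt.Println(SyncFull&SyncData != 0)  // SyncFull includes the data log: true
	fmt.Println(SyncFull&SyncIndex != 0) // ...and the index log: true
	fmt.Println(SyncNone&SyncData != 0)  // SyncNone syncs nothing: false
}
```

Because the values are bit flags, callers can test individual aspects of a mode with a bitwise AND rather than comparing against each enum value.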
const (
	// ErrorModeAbort will immediately abort the current
	// operation if an error is encountered that might lead to data loss.
	ErrorModeAbort = ErrorMode(iota)

	// ErrorModeContinue tries to progress further in case of errors
	// by jumping over a faulty bucket or entry in a bucket.
	// If the error was recoverable, nil is returned, but the
	// Logger in the Options will be called (if set) to log the error.
	ErrorModeContinue
)
const (
DataLogName = "dat.log"
)
Variables ¶
var DefaultBucketFunc = ShiftBucketFunc(37)
var IterStop = errors.New("iteration stopped")
IterStop can be returned from Iter's callback when you want to stop the iteration. It does not count as an error.
Functions ¶
func FixedSizeBucketFunc ¶
FixedSizeBucketFunc returns a BucketFunc that divides keys into equal-sized buckets spanning `n` keys each. This can also be used to create time-based buckets: if your keys are nanosecond timestamps, passing time.Minute produces buckets that each span one minute.
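The clamping this performs can be sketched with plain integer math (a simplified stand-in using bare uint64 keys instead of item.Key; the real implementation lives in the package):

```go
package main

import (
	"fmt"
	"time"
)

// fixedSizeBucketFunc is a simplified stand-in for FixedSizeBucketFunc:
// it clamps every key down to the start of its n-sized bucket.
func fixedSizeBucketFunc(n uint64) func(uint64) uint64 {
	return func(key uint64) uint64 {
		return (key / n) * n
	}
}

func main() {
	// Bucket nanosecond timestamps into one-minute buckets:
	byMinute := fixedSizeBucketFunc(uint64(time.Minute))

	t := uint64(time.Date(2024, 1, 1, 12, 34, 56, 0, time.UTC).UnixNano())
	fmt.Println(byMinute(t) == byMinute(t+uint64(time.Second))) // same minute, same bucket: true
	fmt.Println(byMinute(t) == byMinute(t+uint64(time.Hour)))   // different minute, different bucket: false
}
```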
func ShiftBucketFunc ¶
ShiftBucketFunc creates a fast BucketFunc that divides data into buckets by masking the `shift` least significant bits of the key. With a shift of 37 you get buckets spanning roughly 2 minutes (if your keys are nanosecond timestamps). If you want to calculate the time span of a shift, use this formula: (2 ** shift) / (60 * 1e9) = minutes
Types ¶
type Bucket ¶
type Bucket struct {
// contains filtered or unexported fields
}
func (*Bucket) DeleteLowerThan ¶
func (*Bucket) Move ¶
Move moves data between two buckets in a crash-safe manner. In case of a crash, the data might already be present in the destination queue but not yet deleted from the source queue. Callers should be prepared to handle duplicates.
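One way for callers to handle such duplicates is to deduplicate by key on read. The sketch below uses a hypothetical stand-in `item` struct (not the package's actual item type) to illustrate the pattern:

```go
package main

import "fmt"

// item is a hypothetical stand-in for the package's item type,
// just enough to illustrate caller-side deduplication.
type item struct {
	Key  uint64
	Blob string
}

// dedupe drops items whose key was already seen: the kind of caller-side
// handling Move's documentation asks for, since a crash during Move can
// leave an item in both the source and the destination.
func dedupe(items []item) []item {
	seen := make(map[uint64]bool, len(items))
	out := items[:0]
	for _, it := range items {
		if seen[it.Key] {
			continue // duplicate from an interrupted Move
		}
		seen[it.Key] = true
		out = append(out, it)
	}
	return out
}

func main() {
	got := dedupe([]item{{1, "a"}, {2, "b"}, {1, "a"}})
	fmt.Println(len(got)) // 2
}
```

This only works if keys are unique per logical entry; queues that allow distinct entries with equal keys need a different strategy, such as idempotent consumers.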
type Buckets ¶
type Buckets struct {
// contains filtered or unexported fields
}
func (*Buckets) Read ¶
Read handles all kinds of read operations. It is a low-level function that is not part of the official API.
func (*Buckets) ValidateBucketKeys ¶
ValidateBucketKeys checks whether the keys in the buckets correspond to the result of the key func. A failure here indicates that the key function changed. The absence of an error does not guarantee that the key func did not change, though (e.g. the identity func would produce no error in this check).
type Logger ¶
Logger is a small interface to redirect logs to. The default logger outputs to stderr.
func DefaultLogger ¶
func DefaultLogger() Logger
DefaultLogger produces a logger that writes to stderr.
func WriterLogger ¶
type Options ¶
type Options struct {
// SyncMode controls how often we sync data to disk. The more often we sync,
// the more durable the queue is, at the cost of throughput.
// The default is the safe SyncFull. Think twice before lowering this.
SyncMode SyncMode
// Logger is used to output some non-critical warnings or errors that could
// have been recovered from. By default we print to stderr.
// Only warnings and errors are logged, no debug or informational messages.
Logger Logger
// ErrorMode defines how non-critical errors are handled.
// See the individual enum values for more info.
ErrorMode ErrorMode
// BucketFunc defines which key goes into which bucket.
// The provided function should clamp the key value to
// a common value: every key for which it returns the same
// value goes into the same bucket. The returned value should
// also be the minimum key of the bucket.
//
// Example: '(key / 10) * 10' would produce buckets spanning 10 keys each.
//
// What bucket size to choose? Please refer to the FAQ in the README.
//
// NOTE: This must not be changed after you have opened a queue with it!
// The only way to change it is to create a new queue and shovel the
// old data into it.
BucketFunc func(item.Key) item.Key
// MaxParallelOpenBuckets limits the number of buckets that can be opened
// in parallel. Normally, operations like Push() will create more and more
// buckets with time and old buckets do not get closed automatically, as
// we don't know when they get accessed again. If there are more buckets
// open than this number they get closed and will be re-opened if accessed
// again. If this happens frequently, this comes with a performance penalty.
// If you tend to access your data with rather random keys, you might want
// to increase this number, depending on how many resources you have.
//
// This currently applies only to write operations (i.e. Push() and the like),
// not to read operations like Pop(), as memory needs to stay intact when
// reading from several buckets. If you have phases of read-only
// operations, you should call CloseUnused() from time to time
// to make sure memory gets cleaned up.
//
// If this number is <= 0, then this feature is disabled, which is not
// recommended.
MaxParallelOpenBuckets int
}
Options are fine-tuning knobs specific to individual buckets.
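The BucketFunc contract described above (equal return values map to the same bucket, and the returned value is the bucket's minimum key) can be verified with the '(key / 10) * 10' example from the field comment. This is a self-contained sketch using plain uint64 in place of item.Key:

```go
package main

import "fmt"

// bucketFunc follows the example from the Options comment:
// '(key / 10) * 10' clamps each key to the start of a 10-wide bucket.
func bucketFunc(key uint64) uint64 {
	return (key / 10) * 10
}

func main() {
	// Keys 20..29 share one bucket key:
	fmt.Println(bucketFunc(23), bucketFunc(29)) // 20 20

	// The bucket key is the bucket's minimum key, so it maps to itself:
	fmt.Println(bucketFunc(20) == 20) // true

	// A key in the next range gets a different bucket key:
	fmt.Println(bucketFunc(30)) // 30
}
```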
func DefaultOptions ¶
func DefaultOptions() Options