klevdb

package module
v0.12.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2026 License: MIT Imports: 18 Imported by: 2

README

klevdb

CI Go Reference

klevdb is a fast message store, written in Go. Think single partition on Kafka, but stored locally.

In addition to basic consuming by offset, you can also configure klevdb to index times and keys. Time indexes allow you to quickly find a message by its time (or the first message after a certain time). Key indexes allow you to quickly find the last message with a given key.

Usage

To add klevdb to your package use:

go get github.com/klev-dev/klevdb

To use klevdb:

package main

import (
    "fmt"

    "github.com/klev-dev/klevdb"
)

func main() {
	db, _ := klevdb.Open("/tmp/kdb", klevdb.Options{
		CreateDirs: true,
		KeyIndex:   true,
	})
	defer db.Close()

	publishNext, _ := db.Publish([]klevdb.Message{
		{
			Key:   []byte("key1"),
			Value: []byte("val1"),
		},
		{
			Key:   []byte("key1"),
			Value: []byte("val2"),
		},
	})
	fmt.Println("published, next offset:", publishNext)

	consumeNext, msgs, _ := db.Consume(klevdb.OffsetOldest, 1)
	fmt.Println("consumed:", msgs, "value:", string(msgs[0].Value))
	fmt.Println("next consume offset:", consumeNext)

	msg, _ := db.GetByKey([]byte("key1"))
	fmt.Println("got:", msg, "value:", string(msg.Value))
}

Running the above program, outputs the following:

published, next offset: 2
consumed: [{0 2009-11-10 23:00:00 +0000 UTC [107 101 121 49] [118 97 108 49]}] value: val1
next consume offset: 1
got: {1 2009-11-10 23:00:00 +0000 UTC [107 101 121 49] [118 97 108 50]} value: val2

Further documentation is available at GoDoc

Performance

Benchmarks on framework gen1 i5:

goos: linux
goarch: amd64
pkg: github.com/klev-dev/klevdb
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
Publish
≻ make bench-publish 
...
BenchmarkSingle/Publish/V2/1/No-8               575226	      2473 ns/op	  76.82 MB/s	     119 B/op	       0 allocs/op
BenchmarkSingle/Publish/V2/1/Times-8         	  669390	      2649 ns/op	  74.75 MB/s	     119 B/op	       0 allocs/op
BenchmarkSingle/Publish/V2/1/Keys-8          	  538255	      2918 ns/op	  67.85 MB/s	     262 B/op	       6 allocs/op
BenchmarkSingle/Publish/V2/1/All-8           	  515534	      2898 ns/op	  71.08 MB/s	     262 B/op	       6 allocs/op
BenchmarkSingle/Publish/V2/8/No-8            	  641661	      2531 ns/op	 600.64 MB/s	     150 B/op	       0 allocs/op
BenchmarkSingle/Publish/V2/8/Times-8         	  674102	      2566 ns/op	 617.35 MB/s	     150 B/op	       0 allocs/op
BenchmarkSingle/Publish/V2/8/Keys-8          	  583078	      2780 ns/op	 569.76 MB/s	     287 B/op	       5 allocs/op
BenchmarkSingle/Publish/V2/8/All-8           	  576492	      2738 ns/op	 601.84 MB/s	     287 B/op	       5 allocs/op
...

With default rollover of 1MB, for messages with keys 10B and values 128B:

  • ~400,000 messages/sec, no indexes
  • ~350,000 messages/sec, with all indexes enabled
  • scales linearly with the batch size
Consume
≻ make bench-consume 
...
BenchmarkSingle/Consume/V2/W/No/1-8         	 3940876	       291.1 ns/op	 652.67 MB/s	     256 B/op	       2 allocs/op
BenchmarkSingle/Consume/V2/W/No/8-8         	10298083	       124.1 ns/op	12248.89 MB/s	     264 B/op	       1 allocs/op
...
BenchmarkSingle/Consume/V2/W/All/1-8         	 4214562	       300.9 ns/op	 684.58 MB/s	     256 B/op	       2 allocs/op
BenchmarkSingle/Consume/V2/W/All/8-8         	 7985094	       163.2 ns/op	10100.82 MB/s	     264 B/op	       1 allocs/op
...

With default rollover of 1MB, for messages with keys 10B and values 128B:

  • ~3,500,000 messages/sec, single message consume
  • ~8,000,000 messages/sec, in 8 message batch consume
Get
≻ make bench-get
...
BenchmarkSingle/Get/V2/ByOffset-8         	 3820048	       295.7 ns/op	 642.63 MB/s	     256 B/op	       2 allocs/op
BenchmarkSingle/Get/V2/ByKey/W-8          	 1000000	      3363 ns/op	  58.88 MB/s	     264 B/op	       3 allocs/op
BenchmarkSingle/Get/V2/ByKey/A-8          	 1000000	      3634 ns/op	  54.49 MB/s	     456 B/op	       8 allocs/op
BenchmarkSingle/Get/V2/ByTime/W-8         	 1000000	      2499 ns/op	  79.24 MB/s	     256 B/op	       2 allocs/op
BenchmarkSingle/Get/V2/ByTime/A-8         	 1000000	      2374 ns/op	  83.42 MB/s	     313 B/op	       2 allocs/op
...

With default rollover of 1MB, for messages with keys 10B and values 128B:

  • ~3,300,000 gets/sec, across all offsets
  • ~290,000 key reads/sec, across all keys
  • ~420,000 time reads/sec, across all times
Multi
≻ make bench-multi
BenchmarkMulti/Base/V2-8      	  442836	      3155 ns/op	                   570 B/op	       6 allocs/op
BenchmarkMulti/Publish/V2-8   	   45343	     26558 ns/op	  31.03 MB/s	    2807 B/op	      56 allocs/op
BenchmarkMulti/Consume/V2-8   	 1707186	     681.4 ns/op	1162.23 MB/s	    2559 B/op	      12 allocs/op
BenchmarkMulti/GetKey/V2-8    	  288034	      4286 ns/op	  48.06 MB/s	    2640 B/op	      30 allocs/op
...

Future ideas

  • cmd to interact with logs
  • unified key index - an index across all segments
  • unified time index - across all segments
  • lightweight logs - delay opening the writer
  • multi index store
  • remove interfaces

Documentation

Index

Constants

View Source
const (
	// OffsetOldest represents the smallest offset still available
	// Use it to consume all messages, starting at the first available
	OffsetOldest = message.OffsetOldest
	// OffsetNewest represents the offset that will be used for the next publish
	// Use it to consume only new messages
	OffsetNewest = message.OffsetNewest
	// OffsetInvalid is the offset returned when error is detected
	OffsetInvalid = message.OffsetInvalid
)

Variables

View Source
var (
	V1    = Version{message.V1, index.V1}
	V2    = Version{message.V2, index.V2}
	VLast = V2
)
View Source
var ErrInvalidOffset = message.ErrInvalidOffset

ErrInvalidOffset error is returned when the offset attribute is invalid or out of bounds

View Source
var ErrNoIndex = errors.New("no index")

ErrNoIndex error is returned when we try to use key or timestamp, but the log doesn't include index on them

View Source
var ErrNotFound = message.ErrNotFound

ErrNotFound error is returned when the offset, key or timestamp is not found

View Source
var ErrReadonly = errors.New("log opened in readonly mode")

ErrReadonly error is returned when attempting to modify (e.g. publish or delete) from a log that is open as a readonly

View Source
var InvalidMessage = message.Invalid

InvalidMessage returned when an error has occurred

View Source
var StringCodec = stringCodec{}

StringCodec supports coding a string

View Source
var StringOptCodec = stringOptCodec{}

StringOptCodec supports coding an optional string, e.g. differentiates between "" and nil strings

View Source
var VarintCodec = varintCodec{}

VarintCodec supports coding integers as varint

Functions

func Backup

func Backup(src, dst string) error

Backup backups a store directory to another location, without opening the store

func Check

func Check(dir string, opts Options) error

Check runs an integrity check, without opening the store

func Compact added in v0.12.0

func Compact(ctx context.Context, l Log, age time.Duration, boff DeleteMultiBackoff) error

func CompactDeletesMultiOffsets added in v0.12.0

func CompactDeletesMultiOffsets(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error)

CompactDeletesMultiOffsets is similar to CompactDeletesMulti, but only returns the deleted offsets

func CompactUpdatesMultiOffsets added in v0.12.0

func CompactUpdatesMultiOffsets(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error)

CompactUpdatesMultiOffsets is similar to CompactUpdatesMulti, but only returns the deleted offsets

func DeleteMultiOffsets added in v0.12.0

func DeleteMultiOffsets(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error)

DeleteMultiOffsets is similar to DeleteMulti but will only collect offsets instead of whole messages

func FindByAge added in v0.12.0

func FindByAge(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, error)

FindByAge returns a set of offsets for messages that are at the start of the log and before given time.

func FindByCount added in v0.12.0

func FindByCount(ctx context.Context, l Log, max int) (map[int64]struct{}, error)

FindByCount returns a set of offsets for messages that when removed will keep the number of messages in the log under max

func FindByOffset added in v0.12.0

func FindByOffset(ctx context.Context, l Log, before int64) (map[int64]struct{}, error)

FindByOffset returns a set of offsets for messages whose offset is before a given offset

func FindBySize added in v0.12.0

func FindBySize(ctx context.Context, l Log, sz int64) (map[int64]struct{}, error)

FindBySize returns a set of offsets for messages that if deleted will decrease the log size to sz

func FindDeletes added in v0.12.0

func FindDeletes(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, error)

FindDeletes returns a set of offsets for messages with nil value for a given key, before a given time.

Messages that have a nil value are considered deletes for this key, and therefore eligible for deletion.

func FindUpdates added in v0.12.0

func FindUpdates(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, error)

FindUpdates returns a set of offsets for messages that have the same key further in the log, before a given time.

Messages before the last one for a given key are considered updates that are no longer relevant, and therefore are eligible for deletion.

func Migrate added in v0.12.0

func Migrate(dir string, opts Options, version Version) error

Migrate rewrites all segments with a concrete options and version

func Recover

func Recover(dir string, opts Options) error

Recover rewrites the storage to include all messages prior the first that fails an integrity check

func TrimByAgeMultiOffsets added in v0.12.0

func TrimByAgeMultiOffsets(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error)

TrimByAgeMultiOffsets is similar to TrimByAgeMulti, but only returns the deleted offsets

func TrimByCountMultiOffsets added in v0.12.0

func TrimByCountMultiOffsets(ctx context.Context, l Log, max int, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error)

TrimByCountMultiOffsets is similar to TrimByCountMulti, but only returns the deleted offsets

func TrimByOffsetMultiOffsets added in v0.12.0

func TrimByOffsetMultiOffsets(ctx context.Context, l Log, before int64, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error)

TrimByOffsetMultiOffsets is similar to TrimByOffsetMulti, but only returns the deleted offsets

Types

type BlockingLog added in v0.6.0

type BlockingLog interface {
	Log

	// ConsumeBlocking is similar to [Consume], but if offset is equal to the next offset it will block until next message is published
	ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)

	// ConsumeByKeyBlocking is similar to [ConsumeBlocking], but only returns messages matching the key
	ConsumeByKeyBlocking(ctx context.Context, key []byte, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)
}

BlockingLog enhances Log adding blocking consume

func OpenBlocking added in v0.6.0

func OpenBlocking(dir string, opts Options) (BlockingLog, error)

OpenBlocking opens a Log and wraps it with support for blocking consume

func WrapBlocking added in v0.6.0

func WrapBlocking(l Log) (BlockingLog, error)

WrapBlocking wraps a Log with support for blocking consume

type Codec added in v0.5.0

type Codec[T any] interface {
	Encode(t T, empty bool) (b []byte, err error)
	Decode(b []byte) (t T, empty bool, err error)
}

Codec is interface satisfied by all codecs

type DeleteMultiBackoff added in v0.3.0

type DeleteMultiBackoff func(context.Context) error

DeleteMultiBackoff is called on each iteration of DeleteMulti to give applications opportunity to not overload the target log with deletes

func DeleteMultiWithWait added in v0.3.0

func DeleteMultiWithWait(d time.Duration) DeleteMultiBackoff

DeleteMultiWithWait returns a backoff func that sleeps/waits for a certain duration. If context is canceled while executing it returns the associated error

type JsonCodec added in v0.5.0

type JsonCodec[T any] struct{}

JsonCodec supports coding values as JSON

func (JsonCodec[T]) Decode added in v0.5.0

func (c JsonCodec[T]) Decode(b []byte) (T, bool, error)

func (JsonCodec[T]) Encode added in v0.5.0

func (c JsonCodec[T]) Encode(t T, empty bool) ([]byte, error)

type Log

type Log interface {
	// Publish appends messages to the log.
	// It returns the offset of the next message to be appended.
	// The offset of the message is ignored, set to the actual offset.
	// If the time of the message is 0, it is set to the current UTC time.
	Publish(messages []Message) (nextOffset int64, err error)

	// NextOffset returns the offset of the next message to be published.
	NextOffset() (nextOffset int64, err error)

	// Consume retrieves messages from the log, starting at the offset.
	// It returns offset, which can be used to retrieve for the next consume.
	// If offset == OffsetOldest, the first message will be the oldest
	//   message still available on the log. If the log is empty,
	//   it will return no error, nextOffset will be 0
	// If offset == OffsetNewest, no actual messages will be returned,
	//   but nextOffset will be set to the offset that will be used
	//   for the next Publish call
	// If offset is before the first available on the log, or is after
	//   NextOffset, it returns ErrInvalidOffset
	// If the exact offset is already deleted, it will start consuming
	//   from the next available offset.
	// Consume is allowed to return no messages, but with increasing nextOffset
	//   in case messages between offset and nextOffset have been deleted.
	// NextOffset is always bigger than offset, unless we are caught up
	//   to the head of the log in which case they are equal.
	Consume(offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)

	// ConsumeByKey is similar to Consume, but only returns messages matching the key
	ConsumeByKey(key []byte, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)

	// Get retrieves a single message, by its offset
	// If offset == OffsetOldest, it returns the first message on the log
	// If offset == OffsetNewest, it returns the last message on the log
	// If offset is before the first available on the log, or is after
	//   NextOffset, it returns ErrInvalidOffset
	// If log is empty, it returns ErrInvalidOffset
	// If the exact offset has been deleted, it returns ErrNotFound
	Get(offset int64) (message Message, err error)

	// GetByKey retrieves the last message in the log for this key
	// If no such message is found, it returns ErrNotFound
	GetByKey(key []byte) (message Message, err error)
	// OffsetByKey retrieves the last message offset in the log for this key
	// If no such message is found, it returns ErrNotFound
	OffsetByKey(key []byte) (offset int64, err error)

	// GetByTime retrieves the first message after start time
	// If start time is after all messages in the log, it returns ErrNotFound
	GetByTime(start time.Time) (message Message, err error)
	// OffsetByTime retrieves the first message offset and its time after start time
	// If start time is after all messages in the log, it returns ErrNotFound
	OffsetByTime(start time.Time) (offset int64, messageTime time.Time, err error)

	// Delete tries to delete a set of messages by their offset
	//   from the log and returns the amount of storage deleted
	// It does not guarantee that it will delete all messages,
	//   it returns list of actually deleted messages.
	Delete(offsets map[int64]struct{}) (deletedMessages []Message, deletedSize int64, err error)

	// Size returns the amount of storage a message occupies in the
	// NewSegmentsVersion format (see VersionOptions), plus the index overhead.
	// For logs with mixed V1/V2 segments this may differ from the actual
	// on-disk size of messages stored in older segments.
	Size(m Message) int64

	// Stat returns log stats like disk space, number of messages
	Stat() (Stats, error)

	// Backup takes a backup snapshot of this log to another location
	Backup(dir string) error

	// Sync forces persisting data to the disk. It returns the nextOffset
	// at the time of the Sync, so clients can determine what portion
	// of the log is now durable.
	Sync() (nextOffset int64, err error)

	// GC releases any unused resources associated with this log
	GC(unusedFor time.Duration) error

	// Close closes the log
	Close() error
}

func Open

func Open(dir string, opts Options) (result Log, err error)

Open opens or creates a Log based on a dir and set of options

type Message

type Message = message.Message

func CompactDeletes added in v0.12.0

func CompactDeletes(ctx context.Context, l Log, before time.Time) ([]Message, int64, error)

CompactDeletes tries to remove messages with nil value before given time. It will not remove messages for keys it sees before that offset.

This is similar to removing keys, which were deleted (e.g. value set to nil) and are therefore no longer relevant/active.

returns the messages it deleted and the amount of storage freed

func CompactDeletesMulti added in v0.12.0

func CompactDeletesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) ([]Message, int64, error)

CompactDeletesMulti is similar to CompactDeletes, but will try to remove messages from multiple segments

func CompactUpdates added in v0.12.0

func CompactUpdates(ctx context.Context, l Log, before time.Time) ([]Message, int64, error)

CompactUpdates tries to remove messages before given time that are repeated further in the log leaving only the last message for a given key.

This is similar to removing the old value updates, leaving only the current value (last update) for a key.

returns the messages it deleted and the amount of storage freed

func CompactUpdatesMulti added in v0.12.0

func CompactUpdatesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) ([]Message, int64, error)

CompactUpdatesMulti is similar to CompactUpdates, but will try to remove messages from multiple segments

func DeleteMulti added in v0.3.0

func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) ([]Message, int64, error)

DeleteMulti tries to delete all messages with offsets from the log and returns the amount of storage deleted

If error is encountered, it will return the deleted offsets and size, together with the error

DeleteMultiBackoff is called on each iteration to give others a chance to work with the log, while being deleted

func TrimByAge added in v0.12.0

func TrimByAge(ctx context.Context, l Log, before time.Time) ([]Message, int64, error)

TrimByAge tries to remove the messages at the start of the log before given time.

returns the messages it deleted and the amount of storage freed

func TrimByAgeMulti added in v0.12.0

func TrimByAgeMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) ([]Message, int64, error)

TrimByAgeMulti is similar to TrimByAge, but will try to remove messages from multiple segments

func TrimByCount added in v0.12.0

func TrimByCount(ctx context.Context, l Log, max int) ([]Message, int64, error)

TrimByCount tries to remove messages to keep the number of messages in the log under max count.

returns the messages it deleted and the amount of storage freed

func TrimByCountMulti added in v0.12.0

func TrimByCountMulti(ctx context.Context, l Log, max int, backoff DeleteMultiBackoff) ([]Message, int64, error)

TrimByCountMulti is similar to TrimByCount, but will try to remove messages from multiple segments

func TrimByOffset added in v0.12.0

func TrimByOffset(ctx context.Context, l Log, before int64) ([]Message, int64, error)

TrimByOffset tries to remove the messages at the start of the log before offset

returns the messages it deleted and the amount of storage freed

func TrimByOffsetMulti added in v0.12.0

func TrimByOffsetMulti(ctx context.Context, l Log, before int64, backoff DeleteMultiBackoff) ([]Message, int64, error)

TrimByOffsetMulti is similar to TrimByOffset, but will try to remove messages from multiple segments

func TrimBySize added in v0.12.0

func TrimBySize(ctx context.Context, l Log, sz int64) ([]Message, int64, error)

TrimBySize tries to remove messages until log size is less than sz

returns the messages it deleted and the amount of storage freed

func TrimBySizeMulti added in v0.12.0

func TrimBySizeMulti(ctx context.Context, l Log, sz int64, backoff DeleteMultiBackoff) ([]Message, int64, error)

TrimBySizeMulti is similar to TrimBySize, but will try to remove messages from multiple segments

func TrimBySizeMultiOffsets added in v0.12.0

func TrimBySizeMultiOffsets(ctx context.Context, l Log, sz int64, backoff DeleteMultiBackoff) ([]Message, int64, error)

TrimBySizeMultiOffsets is similar to TrimBySizeMulti, but only returns the deleted offsets

type Options

type Options struct {
	// When set will try to create all directories
	CreateDirs bool
	// Open the store in readonly mode
	Readonly bool
	// Index message keys, enabling GetByKey and OffsetByKey.
	// This setting must not change after the store is first created; changing it will return ErrCorrupted.
	KeyIndex bool
	// Index message times, enabling GetByTime and OffsetByTime.
	// This setting must not change after the store is first created; changing it will return ErrCorrupted.
	TimeIndex bool
	// Force filesystem sync after each Publish
	AutoSync bool
	// At what segment size it will rollover to a new segment. Defaults to 1MB.
	Rollover int64
	// Check the head segment for integrity, before opening it for reading/writing.
	Check bool
	// Recover any good prefix from the head segment, before opening it for reading/writing. If the whole
	// segment is readable and passing the integrity checks, Recover is a noop. If both Check and Recover are set,
	// Open will directly try to recover the segment in read-write mode.
	Recover bool
	// Upgrade specifies how to upgrade the versions
	Version VersionOptions
}

type Stats

type Stats = segment.Stats

func Stat

func Stat(dir string, opts Options) (Stats, error)

Stat stats a store directory, without opening the store

type TBlockingLog added in v0.6.0

type TBlockingLog[K any, V any] interface {
	TLog[K, V]

	// ConsumeBlocking see [BlockingLog.ConsumeBlocking]
	ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error)

	// ConsumeByKeyBlocking see [BlockingLog.ConsumeByKeyBlocking]
	ConsumeByKeyBlocking(ctx context.Context, key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error)
}

TBlockingLog enhances TLog adding blocking consume

func OpenTBlocking added in v0.6.0

func OpenTBlocking[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec Codec[V]) (TBlockingLog[K, V], error)

OpenTBlocking opens a TLog and wraps it with support for blocking consume

func WrapTBlocking added in v0.6.0

func WrapTBlocking[K any, V any](l TLog[K, V]) (TBlockingLog[K, V], error)

WrapTBlocking wraps a TLog with support for blocking consume

type TLog added in v0.6.0

type TLog[K any, V any] interface {
	// Publish see [Log.Publish]
	Publish(messages []TMessage[K, V]) (nextOffset int64, err error)

	// NextOffset see [Log.NextOffset]
	NextOffset() (nextOffset int64, err error)

	// Consume see [Log.Consume]
	Consume(offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error)

	// ConsumeByKey see [Log.ConsumeByKey]
	ConsumeByKey(key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error)

	// Get see [Log.Get]
	Get(offset int64) (message TMessage[K, V], err error)

	// GetByKey see [Log.GetByKey]
	GetByKey(key K, empty bool) (message TMessage[K, V], err error)

	// OffsetByKey see [Log.OffsetByKey]
	OffsetByKey(key K, empty bool) (int64, error)

	// GetByTime see [Log.GetByTime]
	GetByTime(start time.Time) (message TMessage[K, V], err error)

	// OffsetByTime see [Log.OffsetByTime]
	OffsetByTime(start time.Time) (offset int64, messageTime time.Time, err error)

	// Delete see [Log.Delete]
	Delete(offsets map[int64]struct{}) (deletedMessages []TMessage[K, V], deletedSize int64, err error)

	// Size see [Log.Size]
	Size(m Message) int64

	// Stat see [Log.Stat]
	Stat() (Stats, error)

	// Backup see [Log.Backup]
	Backup(dir string) error

	// Sync see [Log.Sync]
	Sync() (nextOffset int64, err error)

	// GC see [Log.GC]
	GC(unusedFor time.Duration) error

	// Close see [Log.Close]
	Close() error

	// Raw returns the underlying log
	Raw() Log
}

TLog is a typed Log which encodes/decodes keys and values to bytes.

func OpenT added in v0.6.0

func OpenT[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec Codec[V]) (TLog[K, V], error)

OpenT opens a typed log with specified key/value codecs

func WrapT added in v0.6.0

func WrapT[K any, V any](l Log, keyCodec Codec[K], valueCodec Codec[V]) (TLog[K, V], error)

WrapT wraps a log with specified key/value codecs

type TMessage added in v0.6.0

type TMessage[K any, V any] struct {
	Offset     int64
	Time       time.Time
	Key        K
	KeyEmpty   bool
	Value      V
	ValueEmpty bool
}

TMessage represents a typed Message

type Version added in v0.12.0

type Version struct {
	// contains filtered or unexported fields
}

type VersionOptions added in v0.12.0

type VersionOptions struct {
	// NewSegmentsVersion indicates what version will new segments use
	NewSegmentsVersion Version

	// KeepRewriteVersion rewriting segments (delete) will keep the original segment version
	KeepRewriteVersion bool

	// EagerVersionMigrate when true, open will rewrite all segments with NewSegmentsVersion
	EagerVersionMigrate bool
}

Directories

Path Synopsis
pkg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL