Documentation ¶
Overview ¶
Example ¶
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/embano1/memlog"
)

func main() {
	ctx := context.Background()
	l, err := memlog.New(ctx)
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	offset, err := l.Write(ctx, []byte("Hello World"))
	if err != nil {
		fmt.Printf("write: %v", err)
		os.Exit(1)
	}

	fmt.Printf("reading record at offset %d\n", offset)
	record, err := l.Read(ctx, offset)
	if err != nil {
		fmt.Printf("read: %v", err)
		os.Exit(1)
	}

	fmt.Printf("Data: %s", record.Data)
}
Output:

reading record at offset 0
Data: Hello World
Example (Batch) ¶
package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/embano1/memlog"
)

func main() {
	const batchSize = 10

	ctx := context.Background()
	l, err := memlog.New(ctx)
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	// seed log with data
	for i := 0; i < 15; i++ {
		d := fmt.Sprintf(`{"id":%d,"message","hello world"}`, i)
		_, err = l.Write(ctx, []byte(d))
		if err != nil {
			fmt.Printf("write: %v", err)
			os.Exit(1)
		}
	}

	startOffset := memlog.Offset(0)
	batch := make([]memlog.Record, batchSize)

	fmt.Printf("reading batch starting at offset %d\n", startOffset)
	count, err := l.ReadBatch(ctx, startOffset, batch)
	if err != nil {
		fmt.Printf("read batch: %v", err)
		os.Exit(1)
	}
	fmt.Printf("records received in batch: %d\n", count)

	// print valid batch entries up to "count"
	for i := 0; i < count; i++ {
		r := batch[i]
		fmt.Printf("batch item: %d\toffset:%d\tdata: %s\n", i, r.Metadata.Offset, r.Data)
	}

	// read next batch and check if end of log reached
	startOffset += memlog.Offset(count)

	fmt.Printf("reading batch starting at offset %d\n", startOffset)
	count, err = l.ReadBatch(ctx, startOffset, batch)
	if err != nil {
		if errors.Is(err, memlog.ErrFutureOffset) {
			fmt.Println("reached end of log")
		} else {
			fmt.Printf("read batch: %v", err)
			os.Exit(1)
		}
	}
	fmt.Printf("records received in batch: %d\n", count)

	// print valid batch entries up to "count"
	for i := 0; i < count; i++ {
		r := batch[i]
		fmt.Printf("batch item: %d\toffset:%d\tdata: %s\n", i, r.Metadata.Offset, r.Data)
	}
}
Output:

reading batch starting at offset 0
records received in batch: 10
batch item: 0	offset:0	data: {"id":0,"message","hello world"}
batch item: 1	offset:1	data: {"id":1,"message","hello world"}
batch item: 2	offset:2	data: {"id":2,"message","hello world"}
batch item: 3	offset:3	data: {"id":3,"message","hello world"}
batch item: 4	offset:4	data: {"id":4,"message","hello world"}
batch item: 5	offset:5	data: {"id":5,"message","hello world"}
batch item: 6	offset:6	data: {"id":6,"message","hello world"}
batch item: 7	offset:7	data: {"id":7,"message","hello world"}
batch item: 8	offset:8	data: {"id":8,"message","hello world"}
batch item: 9	offset:9	data: {"id":9,"message","hello world"}
reading batch starting at offset 10
reached end of log
records received in batch: 5
batch item: 0	offset:10	data: {"id":10,"message","hello world"}
batch item: 1	offset:11	data: {"id":11,"message","hello world"}
batch item: 2	offset:12	data: {"id":12,"message","hello world"}
batch item: 3	offset:13	data: {"id":13,"message","hello world"}
batch item: 4	offset:14	data: {"id":14,"message","hello world"}
Example (Stream) ¶
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"time"

	"golang.org/x/sync/errgroup"

	"github.com/embano1/memlog"
)

func main() {
	// showing some custom options in action
	const (
		logStart     = 10
		logSize      = 100
		writeRecords = 10
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	opts := []memlog.Option{
		memlog.WithStartOffset(logStart),
		memlog.WithMaxSegmentSize(logSize),
	}
	l, err := memlog.New(ctx, opts...)
	if err != nil {
		fmt.Printf("create log: %v", err)
		os.Exit(1)
	}

	// write some records (offsets 10-14)
	for i := 0; i < writeRecords/2; i++ {
		d := fmt.Sprintf(`{"id":%d,"message","hello world"}`, i+logStart)
		_, err = l.Write(ctx, []byte(d))
		if err != nil {
			fmt.Printf("write: %v", err)
			os.Exit(1)
		}
	}

	eg, egCtx := errgroup.WithContext(ctx)
	_, latest := l.Range(egCtx)

	// stream records
	eg.Go(func() error {
		// start stream from latest (offset 14)
		stream := l.Stream(egCtx, latest)

		for {
			if r, ok := stream.Next(); ok {
				fmt.Printf("Record at offset %d says %q\n", r.Metadata.Offset, r.Data)
				continue
			}
			break
		}
		return stream.Err()
	})

	// continue writing while streaming
	eg.Go(func() error {
		for i := writeRecords / 2; i < writeRecords; i++ {
			d := fmt.Sprintf(`{"id":%d,"message","hello world"}`, i+logStart)
			_, err := l.Write(ctx, []byte(d))
			if err != nil && !errors.Is(err, context.Canceled) {
				return err
			}
		}
		return nil
	})

	// simulate SIGTERM after 2s
	eg.Go(func() error {
		time.Sleep(time.Second * 2)
		cancel()
		return nil
	})

	if err = eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		fmt.Printf("run example: %v", err)
		os.Exit(1)
	}
}
Output:

Record at offset 14 says "{\"id\":14,\"message\",\"hello world\"}"
Record at offset 15 says "{\"id\":15,\"message\",\"hello world\"}"
Record at offset 16 says "{\"id\":16,\"message\",\"hello world\"}"
Record at offset 17 says "{\"id\":17,\"message\",\"hello world\"}"
Record at offset 18 says "{\"id\":18,\"message\",\"hello world\"}"
Record at offset 19 says "{\"id\":19,\"message\",\"hello world\"}"
Index ¶
- Constants
- Variables
- type Header
- type Log
- func (l *Log) Range(_ context.Context) (earliest, latest Offset)
- func (l *Log) Read(ctx context.Context, offset Offset) (Record, error)
- func (l *Log) ReadBatch(ctx context.Context, offset Offset, batch []Record) (int, error)
- func (l *Log) Stream(ctx context.Context, start Offset) Stream
- func (l *Log) Write(ctx context.Context, data []byte) (Offset, error)
- type Offset
- type Option
- type Record
- type Stream
Examples ¶

- Package
- Package (Batch)
- Package (Stream)
Constants ¶
const (
	// DefaultStartOffset is the start offset of the log
	DefaultStartOffset = Offset(0)

	// DefaultSegmentSize is the segment size, i.e. number of offsets, in the log
	DefaultSegmentSize = 1024

	// DefaultMaxRecordDataBytes is the maximum data (payload) size of a record
	DefaultMaxRecordDataBytes = 1024 << 10 // 1MiB
)
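A minimal sketch (not one of the package's example tests) showing the default record size limit in action: on a log created without options, a payload larger than DefaultMaxRecordDataBytes is rejected with ErrRecordTooLarge.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/embano1/memlog"
)

func main() {
	ctx := context.Background()

	// log with all defaults: start offset 0, segment size 1024, 1MiB record limit
	l, err := memlog.New(ctx)
	if err != nil {
		panic(err)
	}

	// one byte over the default limit
	tooLarge := make([]byte, memlog.DefaultMaxRecordDataBytes+1)
	if _, err := l.Write(ctx, tooLarge); errors.Is(err, memlog.ErrRecordTooLarge) {
		fmt.Println("write rejected: record data too large")
	}
}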
Variables ¶
var (
	// ErrRecordTooLarge is returned when the record data is larger than the
	// configured maximum record size
	ErrRecordTooLarge = errors.New("record data too large")

	// ErrFutureOffset is returned on reads when the specified offset is in the
	// future and not written yet
	ErrFutureOffset = errors.New("future offset")

	// ErrOutOfRange is returned when the specified offset is invalid for the log
	// configuration or already purged from history
	ErrOutOfRange = errors.New("offset out of range")
)
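A short sketch of distinguishing the read errors with errors.Is. That a negative offset falls under ErrOutOfRange is an assumption derived from the error description above ("invalid for the log configuration"), not a documented guarantee.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/embano1/memlog"
)

func main() {
	ctx := context.Background()
	l, err := memlog.New(ctx)
	if err != nil {
		panic(err)
	}

	if _, err := l.Write(ctx, []byte("hello")); err != nil {
		panic(err)
	}

	// offset 1 has not been written yet
	if _, err := l.Read(ctx, memlog.Offset(1)); errors.Is(err, memlog.ErrFutureOffset) {
		fmt.Println("offset 1 is in the future")
	}

	// assumption: a negative offset is invalid for any log configuration
	if _, err := l.Read(ctx, memlog.Offset(-1)); errors.Is(err, memlog.ErrOutOfRange) {
		fmt.Println("offset -1 is out of range")
	}
}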
Functions ¶
This section is empty.
Types ¶
type Header ¶
type Header struct {
	// Offset is the record offset relative to the log start
	Offset Offset `json:"offset,omitempty"`

	// Created is the UTC timestamp when a record was successfully written to the
	// log
	Created time.Time `json:"created"` // UTC
}
Header is metadata associated with a record
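Per the examples on this page, a record exposes this header through its Metadata field. A minimal sketch reading it back after a write:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/embano1/memlog"
)

func main() {
	ctx := context.Background()
	l, err := memlog.New(ctx)
	if err != nil {
		panic(err)
	}

	offset, err := l.Write(ctx, []byte("hello"))
	if err != nil {
		panic(err)
	}

	record, err := l.Read(ctx, offset)
	if err != nil {
		panic(err)
	}

	// Offset is relative to the log start, Created is a UTC timestamp
	fmt.Println("offset: ", record.Metadata.Offset)
	fmt.Println("created:", record.Metadata.Created.Format(time.RFC3339))
}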
type Log ¶
type Log struct {
// contains filtered or unexported fields
}
Log is an append-only in-memory data structure storing records. Records are stored and retrieved using unique offsets. The log can be customized during initialization with New() to define a custom start offset, and size limits for the log and individual records.
The log is divided into an active and history segment. When the active segment is full (MaxSegmentSize), it becomes the read-only history segment and a new empty active segment with the same size is created.
The maximum number of records in a log is twice the configured segment size (active + history). When this limit is reached, the history segment is purged, replaced with the current active segment and a new empty active segment is created.
Safe for concurrent use.
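The sketch below uses an artificially small segment size to make the purge behavior observable: after five writes into a log with segment size 2 (at most four retained records), the earliest offset reported by Range has moved past the start offset, and reading a purged offset fails with ErrOutOfRange.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/embano1/memlog"
)

func main() {
	ctx := context.Background()

	// segment size 2: the log retains at most 4 records (active + history)
	l, err := memlog.New(ctx, memlog.WithMaxSegmentSize(2))
	if err != nil {
		panic(err)
	}

	// writing 5 records forces at least one purge of the history segment
	for i := 0; i < 5; i++ {
		if _, err := l.Write(ctx, []byte(fmt.Sprintf("record-%d", i))); err != nil {
			panic(err)
		}
	}

	earliest, latest := l.Range(ctx)
	fmt.Printf("earliest=%d latest=%d\n", earliest, latest) // earliest is no longer 0

	// offsets below earliest have been purged from history
	if _, err := l.Read(ctx, memlog.Offset(0)); errors.Is(err, memlog.ErrOutOfRange) {
		fmt.Println("offset 0 was purged")
	}
}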
func (*Log) Range ¶
Range returns the earliest and latest available record offset in the log. If the log is empty, an invalid offset (-1) is returned for both values. If the log has been purged one or more times, earliest points to the oldest available record offset in the log, i.e. not the configured start offset.
Note that these values might have changed after retrieval, e.g. due to concurrent writes.
Safe for concurrent use.
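A minimal sketch using Range to drain everything currently readable. Because latest may advance under concurrent writes, a long-running consumer would re-check Range or use Stream instead:

package main

import (
	"context"
	"fmt"

	"github.com/embano1/memlog"
)

func main() {
	ctx := context.Background()
	l, err := memlog.New(ctx)
	if err != nil {
		panic(err)
	}

	for i := 0; i < 3; i++ {
		if _, err := l.Write(ctx, []byte(fmt.Sprintf("record-%d", i))); err != nil {
			panic(err)
		}
	}

	// read every record between the earliest and latest available offset
	earliest, latest := l.Range(ctx)
	for offset := earliest; offset <= latest; offset++ {
		r, err := l.Read(ctx, offset)
		if err != nil {
			panic(err)
		}
		fmt.Printf("offset %d: %s\n", r.Metadata.Offset, r.Data)
	}
}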
func (*Log) Read ¶
Read reads a record from the log at the specified offset. If an error occurs, an invalid (empty) record and the error are returned.
Safe for concurrent use.
func (*Log) ReadBatch ¶ added in v0.3.0
ReadBatch reads multiple records into batch, starting at the specified offset. The number of records read into batch and the error, if any, are returned.
ReadBatch will read at most len(batch) records, always starting at batch index 0. ReadBatch stops reading at the end of the log, indicated by ErrFutureOffset.
The caller must expect partial batch results and must not read more records from batch than indicated by the returned number of records. See the example for how to use this API.
Safe for concurrent use.
func (*Log) Stream ¶
Stream returns a stream iterator to stream records, starting at the given start offset. If the start offset is in the future, stream will continuously poll until this offset is written.
Use Stream.Next() to read from the stream. See the example for how to use this API.
The returned stream iterator must only be used within the same goroutine.
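A sketch of the future-offset behavior: the consumer subscribes before any record exists, and the stream polls until the first write arrives. The one-second context timeout exists only to end the sketch; the timeout and sleep durations are arbitrary.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/embano1/memlog"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	l, err := memlog.New(ctx)
	if err != nil {
		panic(err)
	}

	// a writer that shows up after the consumer subscribed
	go func() {
		time.Sleep(100 * time.Millisecond)
		_, _ = l.Write(ctx, []byte("late arrival"))
	}()

	// offset 0 is in the future here, so the stream polls until it is written
	stream := l.Stream(ctx, memlog.Offset(0))
	for {
		r, ok := stream.Next()
		if !ok {
			break
		}
		fmt.Printf("received %q at offset %d\n", r.Data, r.Metadata.Offset)
	}

	// Err reports why the stream stopped, here the expired context
	if err := stream.Err(); err != nil {
		fmt.Println("stream stopped:", err)
	}
}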
type Option ¶
Option customizes a log
func WithMaxRecordDataSize ¶
WithMaxRecordDataSize sets the maximum record data (payload) size in bytes
func WithMaxSegmentSize ¶
WithMaxSegmentSize sets the maximum size, i.e. number of offsets, in a log segment. Must be greater than 0.
func WithStartOffset ¶
WithStartOffset sets the start offset of the log. Must be equal to or greater than 0.
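A sketch combining the three options in New. The numeric values are illustrative and passed as untyped constants, since the exact parameter types of the option constructors are not shown on this page:

package main

import (
	"context"
	"fmt"

	"github.com/embano1/memlog"
)

func main() {
	ctx := context.Background()

	opts := []memlog.Option{
		memlog.WithStartOffset(100),            // first record is written at offset 100
		memlog.WithMaxSegmentSize(512),         // 512 offsets per segment
		memlog.WithMaxRecordDataSize(64 << 10), // 64KiB per record payload
	}

	l, err := memlog.New(ctx, opts...)
	if err != nil {
		panic(err)
	}

	offset, err := l.Write(ctx, []byte("first"))
	if err != nil {
		panic(err)
	}
	fmt.Println("first offset:", offset) // 100
}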
type Stream ¶ added in v0.2.0
type Stream struct {
// contains filtered or unexported fields
}
Stream is an iterator to stream records in order from a log. It must only be used within the same goroutine.
func (*Stream) Err ¶ added in v0.2.0
Err returns the first error that occurred during streaming. This method should be called to inspect the error that caused the iterator to stop.