immuta

package module
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2026 License: MIT Imports: 9 Imported by: 2

README ΒΆ

β–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ•—β–‘β–‘β–‘β–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ•—β–‘β–‘β–‘β–ˆβ–ˆβ–ˆβ•—β–ˆβ–ˆβ•—β–‘β–‘β–‘β–ˆβ–ˆβ•—β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–‘β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•—β–‘
β–ˆβ–ˆβ•‘β–ˆβ–ˆβ–ˆβ–ˆβ•—β–‘β–ˆβ–ˆβ–ˆβ–ˆβ•‘β–ˆβ–ˆβ–ˆβ–ˆβ•—β–‘β–ˆβ–ˆβ–ˆβ–ˆβ•‘β–ˆβ–ˆβ•‘β–‘β–‘β–‘β–ˆβ–ˆβ•‘β•šβ•β•β–ˆβ–ˆβ•”β•β•β•β–ˆβ–ˆβ•”β•β•β–ˆβ–ˆβ•—
β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•”β–ˆβ–ˆβ–ˆβ–ˆβ•”β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•”β–ˆβ–ˆβ–ˆβ–ˆβ•”β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•‘β–‘β–‘β–‘β–ˆβ–ˆβ•‘β–‘β–‘β–‘β–ˆβ–ˆβ•‘β–‘β–‘β–‘β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•‘
β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•‘β•šβ–ˆβ–ˆβ•”β•β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•‘β•šβ–ˆβ–ˆβ•”β•β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•‘β–‘β–‘β–‘β–ˆβ–ˆβ•‘β–‘β–‘β–‘β–ˆβ–ˆβ•‘β–‘β–‘β–‘β–ˆβ–ˆβ•”β•β•β–ˆβ–ˆβ•‘
β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•‘β–‘β•šβ•β•β–‘β–ˆβ–ˆβ•‘β–ˆβ–ˆβ•‘β–‘β•šβ•β•β–‘β–ˆβ–ˆβ•‘β•šβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ•”β•β–‘β–‘β–‘β–ˆβ–ˆβ•‘β–‘β–‘β–‘β–ˆβ–ˆβ•‘β–‘β–‘β–ˆβ–ˆβ•‘
β•šβ•β•β•šβ•β•β–‘β–‘β–‘β–‘β–‘β•šβ•β•β•šβ•β•β–‘β–‘β–‘β–‘β–‘β•šβ•β•β–‘β•šβ•β•β•β•β•β•β–‘β–‘β–‘β–‘β•šβ•β•β–‘β–‘β–‘β•šβ•β•β–‘β–‘β•šβ•β•

Go Reference Go Report Card

Immuta is a high-performance append-only log implementation based on a single writer, multiple readers architecture. It uses the filesystem as its storage backend and solid for signaling.

Features

  • Single Writer, Multiple Readers - Optimized for append-only workloads with concurrent read access
  • Namespace Isolation - Data is isolated in separate files per namespace
  • Pluggable Transformers - Support for compression, encryption, or any custom data transformation
  • Chainable Transformers - Multiple transformations can be chained together

File Format

+----------+----------+----------+---------------+----------+---------------+
|          |          |          |               |          |               |
| MESSAGES |  LAST    |  PAYLOAD |    PAYLOAD    |  PAYLOAD |    PAYLOAD    | ...
|   COUNT  |  INDEX   |   SIZE   |               |   SIZE   |               |
+----------+----------+----------+---------------+----------+---------------+
   8 bytes   8 bytes    8 bytes                    8 bytes
            (Header)             (Record 1)                  (Record 2)

Installation

go get ella.to/immuta@v0.2.3

Quick Start

package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"ella.to/immuta"
)

func main() {
	// Create storage with a namespace
	log, err := immuta.New(
		immuta.WithLogsDirPath("./log-data"),
		immuta.WithReaderCount(10),      // Pool of file descriptors for readers
		immuta.WithFastWrite(true),      // Use buffered writes (faster but less durable)
		immuta.WithNamespaces("events"), // Create a namespace called "events"
	)
	if err != nil {
		panic(err)
	}
	defer log.Close()

	// Write data
	content := []byte("hello world")
	index, size, err := log.Append(context.Background(), "events", bytes.NewReader(content))
	if err != nil {
		panic(err)
	}
	fmt.Printf("Written at index %d, size %d bytes\n", index, size)

	// Read data using a stream
	stream := log.Stream(context.Background(), "events", 0) // 0 = start from beginning
	defer stream.Done()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	r, size, err := stream.Next(ctx)
	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			fmt.Println("No more messages")
			return
		}
		panic(err)
	}
	defer r.Done() // Important: release the file descriptor

	var buf bytes.Buffer
	io.Copy(&buf, r)
	fmt.Printf("Read: %s (size: %d)\n", buf.String(), size)
}

Transactional Appends (Save) πŸ”’

Immuta provides a transactional pattern for appends using the Save method. Each append is staged until committed β€” callers should defer a call to Save with a pointer to the returning err so the storage can either commit (on success) or roll back (on error).

Example:

func write(ctx context.Context, log *immuta.Storage) (err error) {
    // Defer the Save call immediately so it can commit or rollback based on the named return error.
    defer log.Save("events", &err)

    // Perform the append; Save will commit when this function returns with err == nil
    _, _, err = log.Append(ctx, "events", bytes.NewReader([]byte("some data")))
    if err != nil {
        return err
    }
    return nil
}

Notes:

  • Call defer log.Save(namespace, &err) immediately after you start the operation that performs appends.
  • On error, Save will truncate the log file back to the previous state.
  • On success, Save updates the log header and notifies readers of the new messages.

Configuration Options

Option Description Default
WithLogsDirPath(path) Directory for log files "./logs"
WithReaderCount(n) Number of pooled file descriptors for concurrent reads 5
WithFastWrite(bool) Use buffered writes (faster) or sync writes (more durable) true
WithNamespaces(names...) Create one or more namespaces Required
WithWriteTransform(t) Transform data before writing nil
WithReadTransform(t) Transform data after reading nil

Stream Positioning

When creating a stream, the startPos parameter controls where reading begins:

// Start from the beginning (read all messages)
stream := log.Stream(ctx, "events", 0)

// Start from the latest (only new messages)
stream := log.Stream(ctx, "events", -1)

// Skip the first N messages
stream := log.Stream(ctx, "events", 10) // Skip first 10 messages

Data Transformers

Transformers allow you to modify data as it's written or read. Common use cases include compression and encryption.

Transformer Type

type Transformer func(r io.Reader) (io.Reader, error)

Compression Example

import (
	"bytes"
	"compress/flate"
	"io"
	"ella.to/immuta"
)

// Compress transforms data by compressing it
func Compress(level int) immuta.Transformer {
	return func(r io.Reader) (io.Reader, error) {
		var buf bytes.Buffer
		w, err := flate.NewWriter(&buf, level)
		if err != nil {
			return nil, err
		}
		if _, err := io.Copy(w, r); err != nil {
			w.Close()
			return nil, err
		}
		if err := w.Close(); err != nil {
			return nil, err
		}
		return &buf, nil
	}
}

// Decompress transforms data by decompressing it
func Decompress() immuta.Transformer {
	return func(r io.Reader) (io.Reader, error) {
		return flate.NewReader(r), nil
	}
}

func main() {
	log, _ := immuta.New(
		immuta.WithLogsDirPath("./logs"),
		immuta.WithNamespaces("compressed"),
		immuta.WithWriteTransform(Compress(flate.BestSpeed)),
		immuta.WithReadTransform(Decompress()),
	)
	defer log.Close()
	
	// Data is automatically compressed on write and decompressed on read
}

Chaining Transformers

Multiple transformers can be chained together. They are applied in order:

// Chain compression and then encryption on write
writeChain := immuta.ChainTransformers(
	Compress(flate.BestSpeed),
	Encrypt(key),
)

// Chain decryption and then decompression on read (reverse order)
readChain := immuta.ChainTransformers(
	Decrypt(key),
	Decompress(),
)

log, _ := immuta.New(
	immuta.WithLogsDirPath("./logs"),
	immuta.WithNamespaces("secure"),
	immuta.WithWriteTransform(writeChain),
	immuta.WithReadTransform(readChain),
)

Using Third-Party Libraries

You can use any compression or encryption library by wrapping it in a Transformer:

import "github.com/klauspost/compress/s2"

func S2Compress() immuta.Transformer {
	return func(r io.Reader) (io.Reader, error) {
		var buf bytes.Buffer
		w := s2.NewWriter(&buf)
		if _, err := io.Copy(w, r); err != nil {
			w.Close()
			return nil, err
		}
		if err := w.Close(); err != nil {
			return nil, err
		}
		return &buf, nil
	}
}

func S2Decompress() immuta.Transformer {
	return func(r io.Reader) (io.Reader, error) {
		return s2.NewReader(r), nil
	}
}

Error Handling

Immuta provides specific error types:

var (
	ErrNamespaceRequired = errors.New("namespace is required")
	ErrNamesapceNotFound = errors.New("namespace not found")
	ErrStorageClosed     = errors.New("storage is closed")
)

When storage is closed, any blocked stream.Next() calls will unblock and return ErrStorageClosed.

Complete Example

package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"time"

	"ella.to/immuta"
)

func main() {
	log, err := immuta.New(
		immuta.WithLogsDirPath("./log-data"),
		immuta.WithReaderCount(10),
		immuta.WithFastWrite(true),
		immuta.WithNamespaces("events"),
	)
	if err != nil {
		panic(err)
	}
	defer log.Close()

	var wg sync.WaitGroup

	// Writer goroutine
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 100; i++ {
			msg := fmt.Sprintf("message %d", i)
			_, _, err := log.Append(context.Background(), "events", bytes.NewReader([]byte(msg)))
			if err != nil {
				fmt.Printf("Write error: %v\n", err)
				return
			}
		}
	}()

	// Reader goroutine
	wg.Add(1)
	go func() {
		defer wg.Done()
		stream := log.Stream(context.Background(), "events", 0)
		defer stream.Done()

		count := 0
		for count < 100 {
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			r, _, err := stream.Next(ctx)
			cancel()

			if errors.Is(err, context.DeadlineExceeded) {
				break
			}
			if err != nil {
				fmt.Printf("Read error: %v\n", err)
				return
			}

			var buf bytes.Buffer
			io.Copy(&buf, r)
			r.Done()

			count++
		}
		fmt.Printf("Read %d messages\n", count)
	}()

	wg.Wait()
}

Performance

Benchmarks on Apple M2 Pro:

go test -bench=. -benchmem

Write Performance

Message Size Throughput Allocations
100 bytes 18.30 MB/s 2 allocs/op
1 KB 162.09 MB/s 2 allocs/op
4 KB 421.89 MB/s 2 allocs/op
64 KB 922.17 MB/s 2 allocs/op

Read Performance

Message Size Throughput Allocations
1 KB 800.07 MB/s 5 allocs/op

Bulk Operations

  • Writing 100k records of 1KB: ~1.2 seconds
  • Reading 100k records of 1KB: ~335ms
# Run benchmarks
go test -bench=. -benchmem

# Run with race detector
go test -race ./...

Thread Safety

  • Append: Should be called from a single goroutine (not safe for concurrent writes)
  • Stream: Safe for concurrent use; multiple streams can read simultaneously
  • Close: Safe to call multiple times; properly unblocks waiting streams

License

MIT License - see LICENSE for details.

Documentation ΒΆ

Index ΒΆ

Constants ΒΆ

View Source
const (
	FileHeaderSize   = 8 + 8 // total number of messages + index of the last message
	RecordHeaderSize = 8     // size of the content of the message
)

Variables ΒΆ

View Source
var (
	ErrNamespaceRequired = errors.New("namespace is required")
	ErrNamesapceNotFound = errors.New("namespace not found")
	ErrStorageClosed     = errors.New("storage is closed")
	ErrReaderCountIsZero = errors.New("reader count must be greater than zero")
)

Functions ΒΆ

This section is empty.

Types ΒΆ

type Appender ΒΆ

type Appender interface {
	// Append writes the content of the reader to the storage medium.
	// and returns the index and size of the content written.
	Append(ctx context.Context, r io.Reader) (index int64, size int64, err error)
	// Save implements the transactional commit/rollback for the most recent Append.
	// Should be deferred by the caller as: defer storage.Save(&err)
	Save(err *error)
}

Appender is a single method interface that writes the content of the reader to the storage medium.

type OptionFunc ΒΆ added in v0.0.4

type OptionFunc func(*options) error

func WithFastWrite ΒΆ

func WithFastWrite(fast bool) OptionFunc

func WithLogsDirPath ΒΆ added in v0.0.4

func WithLogsDirPath(path string) OptionFunc

func WithNamespaces ΒΆ added in v0.0.4

func WithNamespaces(namespaces ...string) OptionFunc

func WithReadTransform ΒΆ added in v0.1.0

func WithReadTransform(t Transformer) OptionFunc

WithReadTransform sets a transformer to be applied when reading data. This can be used for decompression, decryption, etc.

func WithReaderCount ΒΆ added in v0.0.4

func WithReaderCount(count int) OptionFunc

func WithWriteTransform ΒΆ added in v0.1.0

func WithWriteTransform(t Transformer) OptionFunc

WithWriteTransform sets a transformer to be applied when writing data. This can be used for compression, encryption, etc.

type Reader ΒΆ added in v0.0.3

type Reader struct {
	// contains filtered or unexported fields
}

func (*Reader) Done ΒΆ added in v0.0.3

func (r *Reader) Done()

func (*Reader) Read ΒΆ added in v0.0.3

func (r *Reader) Read(p []byte) (n int, err error)

type Storage ΒΆ

type Storage struct {
	// contains filtered or unexported fields
}

func New ΒΆ

func New(optFns ...OptionFunc) (*Storage, error)

func (*Storage) Append ΒΆ

func (s *Storage) Append(ctx context.Context, namespace string, r io.Reader) (index int64, size int64, err error)

func (*Storage) Close ΒΆ

func (s *Storage) Close() error

func (*Storage) Details ΒΆ added in v0.0.3

func (s *Storage) Details(ctx context.Context, namespace string) (string, error)

func (*Storage) Save ΒΆ added in v0.2.0

func (s *Storage) Save(namespace string, err *error)

Save performs a transactional commit/rollback for the most recent Append on the specified namespace. Intended to be used as: defer s.Save(namespace, &err)

func (*Storage) Stream ΒΆ

func (s *Storage) Stream(ctx context.Context, namespace string, startPos int64) Stream

func (*Storage) Verify ΒΆ added in v0.0.3

func (s *Storage) Verify(ctx context.Context, namespace string) error

type Stream ΒΆ

type Stream interface {
	// Creates a io.Reader and provide the size of the content ahead of time.
	// If there is no more content to read, it will blocked until there is more content or the context is done.
	Next(ctx context.Context) (r *Reader, size int64, err error)
	// Done should be called to release the reader.
	// the best practice is once an stream is created successfully, call Done in defer.
	Done()
}

Stream is an interface that deals with reading from the storage medium.

type Transformer ΒΆ added in v0.1.0

type Transformer func(r io.Reader) (io.Reader, error)

Transformer transforms a reader into another reader. This can be used for compression, encryption, or any other data transformation. Multiple transformers can be chained together.

func ChainTransformers ΒΆ added in v0.1.0

func ChainTransformers(transformers ...Transformer) Transformer

ChainTransformers chains multiple transformers together. Transformers are applied in order.

Directories ΒΆ

Path Synopsis
examples
basic command
compression command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL