cfsread

package module
v0.1.0
Published: Apr 26, 2026 License: MIT Imports: 13 Imported by: 0

README

cfsread


Cached file reader with transparent decompression and singleflight coalescing for Go.

What it does

cfsread reads files from any io/fs.FS source, transparently decompresses them based on magic-byte detection, and caches results in a bounded LRU cache. Concurrent reads of the same file are coalesced via singleflight so only one goroutine performs I/O.

Designed for high-throughput, read-mostly workloads — static asset serving, configuration loading, template rendering — where the same files are read repeatedly.

Installation

go get github.com/lbe/cfsread

Quick start

package main

import (
    "context"
    "embed"
    "fmt"
    "io/fs"

    "github.com/lbe/cfsread"
    "github.com/lbe/cfsread/decompress"
    "github.com/lbe/cfsread/decompress/lz4"
)

//go:embed static/*
var embedded embed.FS

func main() {
    sub, err := fs.Sub(embedded, "static")
    if err != nil {
        panic(err)
    }
    src := cfsread.Source{ID: "app", FS: sub}

    // Register decompressors (optional — plain files pass through).
    reg := decompress.NewRegistry()
    if err := reg.Register(lz4.New()); err != nil {
        panic(err)
    }

    r := cfsread.New(cfsread.Options{
        MaxEntries: 1000,
        Registry:   reg,
    })
    defer r.Close()

    data, err := r.Read(context.Background(), src, "index.html")
    if err != nil {
        panic(err)
    }
    fmt.Println(string(data))
}

Sources

A Source pairs an io/fs.FS with a stable string ID used for cache namespacing. The ID must be non-empty and unique per logical filesystem.

// embed.FS — construct directly.
src := cfsread.Source{ID: "embed", FS: myEmbedFS}

// os.Root — confined to a directory, rejects traversal and symlink escapes.
src, closer, err := cfsread.NewRootSource("disk", "/var/data")
if err != nil {
    // handle error
}
defer closer.Close()

Cache configuration

r := cfsread.New(cfsread.Options{
    MaxBytes:   1 << 20,            // entries > 1 MB bypass the cache
    MaxEntries: 500,                // cap at 500 cached items
    MaxIdleAge: 10 * time.Minute,   // evict entries idle > 10 min
})

All bounds are optional. Zero means unlimited.

Invalidation

// Remove one cached entry.
r.Invalidate("disk", "config.yaml")

// Remove all entries for a filesystem.
r.InvalidateFS("disk")

Observability

Implement the Metrics and/or Logger interfaces to observe cache behaviour:

type Metrics interface {
    IncCacheHit()
    IncCacheMiss()
    IncCacheBypass()
    IncEviction(reason EvictionReason)
    ObserveDecompress(name string, inBytes, outBytes int64, d time.Duration)
    ObserveRead(d time.Duration)
}

type Logger interface {
    Debugf(format string, args ...any)
    Infof(format string, args ...any)
}

Pass them via Options.Metrics and Options.Logger. No-op implementations (NopMetrics, NopLogger) are used by default.
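As an illustration, here is a minimal counting implementation of the same method set, using sync/atomic so every method is safe for concurrent use. This is a standalone sketch: EvictionReason is redeclared locally so the snippet compiles on its own, and in real use you would pass a *CountingMetrics via Options.Metrics.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// EvictionReason stands in for cfsread.EvictionReason in this standalone sketch.
type EvictionReason int

// CountingMetrics tallies cache events with atomic counters; every method is
// concurrency-safe, as the Metrics contract requires.
type CountingMetrics struct {
	hits, misses, bypasses, evictions atomic.Int64
}

func (m *CountingMetrics) IncCacheHit()                 { m.hits.Add(1) }
func (m *CountingMetrics) IncCacheMiss()                { m.misses.Add(1) }
func (m *CountingMetrics) IncCacheBypass()              { m.bypasses.Add(1) }
func (m *CountingMetrics) IncEviction(_ EvictionReason) { m.evictions.Add(1) }

func (m *CountingMetrics) ObserveDecompress(_ string, _, _ int64, _ time.Duration) {}
func (m *CountingMetrics) ObserveRead(_ time.Duration)                             {}

func main() {
	var m CountingMetrics
	m.IncCacheMiss()
	m.IncCacheHit()
	m.IncCacheHit()
	fmt.Printf("hits=%d misses=%d\n", m.hits.Load(), m.misses.Load())
}
```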

Plugin authoring

Implement the decompress.Decompressor interface and register it:

package zstd

type Decompressor struct{}

func (Decompressor) Name() string    { return "zstd" }
func (Decompressor) Magic() [][]byte { return [][]byte{{0x28, 0xB5, 0x2F, 0xFD}} }
func (Decompressor) Decompress(dst, src []byte) ([]byte, error) {
    // decompress src into dst, reusing dst's capacity if possible
    // ...
}

// Registration:
reg := decompress.NewRegistry()
if err := reg.Register(zstd.Decompressor{}); err != nil {
    // handle validation error (nil decompressor, empty/duplicate magic, etc.)
}

Requirements:

  • Implementations must be safe for concurrent use.
  • Magic must return at least one non-empty byte sequence.
  • If two decompressors share a magic prefix, Register returns an error.
  • Magic bytes are defensively copied at registration — mutation after Register has no effect.

Error handling

Read returns typed sentinel errors for validation failures, supporting errors.Is:

data, err := r.Read(ctx, src, name)
switch {
case errors.Is(err, cfsread.ErrReaderClosed):
    // reader was closed
case errors.Is(err, cfsread.ErrEmptySourceID):
    // Source.ID was empty
case errors.Is(err, cfsread.ErrNilSourceFS):
    // Source.FS was nil
}

The decompress package also exports sentinel errors for registration validation: ErrNilDecompressor, ErrNoMagic, ErrEmptyMagic, ErrDuplicateMagic.

Performance characteristics

  • Zero-alloc cache hits: Once a file is cached, Read returns the cached byte slice without allocation. The caller must not mutate the returned slice.
  • Singleflight coalescing: Concurrent reads of the same (source ID, path) pair share a single in-flight I/O + decompression call. Other goroutines await the result.
  • Lazy eviction: Idle-age eviction is checked on access rather than via a background timer, avoiding goroutine overhead.
  • Read buffer preallocation: Stat size is used to preallocate the read buffer, avoiding incremental growth on cache misses.
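Since cache hits hand back the cache-owned slice, a caller that needs to modify the bytes should copy first. A minimal sketch of the caller-side pattern (the cached slice here is a stand-in for a Read result):

```go
package main

import "fmt"

func main() {
	// Stand-in for a slice returned by Reader.Read: owned by the cache,
	// so it must not be mutated in place.
	cached := []byte("hello")

	// Copy before modifying; appending to a nil slice allocates exactly once.
	own := append([]byte(nil), cached...)
	own[0] = 'H'

	fmt.Printf("cache: %s, copy: %s\n", cached, own)
}
```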

API overview

  • Reader: Core cached reader (thread-safe)
  • Reader.Read(ctx, src, name): Read a file, using cache and decompression
  • Reader.Invalidate(fsName, path): Remove one cached entry
  • Reader.InvalidateFS(fsName): Remove all entries for a filesystem
  • Reader.Close(): Mark reader as closed
  • Source: io/fs.FS + string ID pair
  • NewRootSource(id, dir): Confined os.Root source
  • Options: Cache bounds, registry, metrics, logger
  • decompress.Registry: Magic-byte dispatch table
  • decompress.Decompressor: Plugin interface
  • lz4.Decompressor: Official LZ4 frame-format plugin
  • Metrics / Logger: Observation interfaces

Pre-compressing assets with cfsread-lz4

The cfsread-lz4 command recursively compresses files in a directory using LZ4 frame format, preserving file names. This is intended for use as a go:generate directive in consuming applications:

//go:generate go run github.com/lbe/cfsread/cmd/cfsread-lz4 ./assets

When the LZ4 decompressor is registered in the Reader, these files are transparently decompressed on read — no special handling needed in application code. Files already compressed in a known format (gzip, bzip2, xz, zstd, zip, etc.) are skipped automatically.

See cmd/cfsread-lz4/README.md for full documentation.

Architecture

See docs/ARCHITECTURE.md for package layout, data flow diagrams, and internal design.

Documentation

Overview

Package cfsread provides a cached file reader with transparent decompression and singleflight coalescing for concurrent access.

Purpose

cfsread reads files from filesystem sources (embed.FS, os.Root, fstest.MapFS, or any io/fs.FS), transparently decompresses them based on magic-byte detection, and caches the results in a bounded LRU cache. It is designed for high-throughput read-mostly workloads where the same files are read repeatedly — for example, serving static assets or loading configuration templates.

Concurrency Model

All Reader methods are safe for concurrent use. A singleflight.Group coalesces concurrent reads of the same (source ID, path) pair so that only one goroutine performs the underlying filesystem I/O and decompression; the rest wait for and share the result. This eliminates thundering-herd behaviour under load.
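The coalescing step can be sketched with a mutex-guarded map of in-flight calls where followers block on a channel. This is an illustrative stand-in, not the package's actual singleflight.Group usage:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type call struct {
	done chan struct{}
	val  []byte
	err  error
}

// group coalesces concurrent Do calls that share a key: the first caller
// (the leader) runs fn, everyone else waits for and shares its result.
type group struct {
	mu    sync.Mutex
	calls map[string]*call
}

func (g *group) Do(key string, fn func() ([]byte, error)) ([]byte, error) {
	g.mu.Lock()
	if c, ok := g.calls[key]; ok {
		g.mu.Unlock()
		<-c.done // follower: block until the leader finishes
		return c.val, c.err
	}
	c := &call{done: make(chan struct{})}
	if g.calls == nil {
		g.calls = map[string]*call{}
	}
	g.calls[key] = c
	g.mu.Unlock()

	c.val, c.err = fn() // leader: perform the I/O exactly once
	close(c.done)

	g.mu.Lock()
	delete(g.calls, key) // completed flights are forgotten, not cached
	g.mu.Unlock()
	return c.val, c.err
}

func main() {
	var g group
	var executions atomic.Int64
	fetch := func() ([]byte, error) {
		executions.Add(1)
		time.Sleep(20 * time.Millisecond) // simulate slow I/O + decompression
		return []byte("page"), nil
	}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.Do("embed/index.html", fetch)
		}()
	}
	wg.Wait()
	// Overlapping callers typically trigger a single execution of fetch.
	fmt.Println("executions out of 8 calls:", executions.Load())
}
```

Note the delete at the end: once a flight completes, later calls for the same key start fresh, which is why the Reader still needs the LRU cache in front of this layer.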

Cache and Eviction

The cache is an LRU with three optional bounds:

  • MaxBytes: per-entry size limit; entries exceeding this bypass the cache.
  • MaxEntries: maximum number of cached items.
  • MaxIdleAge: maximum idle time for an entry before it becomes eligible for eviction during the next access (lazy sweep).

Evictions are reported through the Metrics interface.

Decompression Plugins

Decompression is pluggable. A codec implements the decompress.Decompressor interface (Name, Magic, Decompress) and is registered via decompress.Registry. Matching is based on leading magic bytes; the longest match wins. An official LZ4 frame-format plugin is provided in the github.com/lbe/cfsread/decompress/lz4 sub-package.
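Longest-match dispatch can be sketched with bytes.HasPrefix. The codec and dispatch names below are illustrative, not the registry's API; the magic values are the standard LZ4 and zstd frame signatures:

```go
package main

import (
	"bytes"
	"fmt"
)

// codec pairs a name with the magic prefixes it claims.
type codec struct {
	name  string
	magic [][]byte
}

// dispatch returns the codec whose magic is the longest prefix of data,
// or "" when nothing matches (the file passes through uncompressed).
func dispatch(codecs []codec, data []byte) string {
	best, bestLen := "", 0
	for _, c := range codecs {
		for _, m := range c.magic {
			if len(m) > bestLen && bytes.HasPrefix(data, m) {
				best, bestLen = c.name, len(m)
			}
		}
	}
	return best
}

func main() {
	codecs := []codec{
		{name: "lz4", magic: [][]byte{{0x04, 0x22, 0x4D, 0x18}}},
		{name: "zstd", magic: [][]byte{{0x28, 0xB5, 0x2F, 0xFD}}},
	}
	fmt.Println(dispatch(codecs, []byte{0x28, 0xB5, 0x2F, 0xFD, 0x00})) // zstd frame header
	fmt.Println(dispatch(codecs, []byte("plain text")))                 // no match: empty string
}
```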

Context Support

The Read method accepts a context.Context. Cancellation is checked before work begins and again after the file has been read into memory (before decompression output is used). An already-cancelled context returns immediately. However, once I/O or decompression has started, the operation runs to completion even if the context is cancelled mid-flight.

Cache Invalidation

Two invalidation methods are provided:

  • Invalidate(fsName, path) removes a single cached entry.
  • InvalidateFS(fsName) removes all entries for a given filesystem.

Both are useful when the underlying files change externally and the cache must be selectively refreshed.

Package cfsread provides tools for reading and decompressing files from filesystem sources.

Example (Basic)
package main

import (
	"context"
	"embed"
	"fmt"
	"io/fs"
	"os"

	"github.com/lbe/cfsread"
)

//go:embed testdata/*
var exampleEmbedded embed.FS

func exampleEmbedSub() (fs.FS, error) {
	return fs.Sub(exampleEmbedded, "testdata")
}

func main() {
	sub, err := exampleEmbedSub()
	if err != nil {
		fmt.Fprintf(os.Stderr, "embed sub: %v\n", err)
		return
	}
	src := cfsread.Source{ID: "embed", FS: sub}
	r := cfsread.New(cfsread.Options{})

	data, err := r.Read(context.Background(), src, "hello.txt")
	if err != nil {
		fmt.Fprintf(os.Stderr, "read: %v\n", err)
		return
	}
	fmt.Printf("%s", data)

	// Subsequent reads are served from the LRU cache (zero alloc).
	if _, err := r.Read(context.Background(), src, "hello.txt"); err != nil {
		fmt.Fprintf(os.Stderr, "cached read: %v\n", err)
		return
	}

}
Output:
hello world
Example (Invalidation)
package main

import (
	"context"
	"fmt"
	"io"
	"io/fs"
	"time"

	"github.com/lbe/cfsread"
)

func main() {
	// Use an in-memory filesystem so we can mutate it.
	memFS := &mutableMapFS{files: map[string][]byte{
		"config.txt": []byte("v1"),
	}}
	src := cfsread.Source{ID: "mem", FS: memFS}
	r := cfsread.New(cfsread.Options{})

	data, _ := r.Read(context.Background(), src, "config.txt")
	fmt.Printf("first read: %s\n", data)

	// External change — update the underlying file.
	memFS.files["config.txt"] = []byte("v2")

	// Cache still holds the old data.
	data, _ = r.Read(context.Background(), src, "config.txt")
	fmt.Printf("cached: %s\n", data)

	// Invalidate the stale entry and re-read.
	r.Invalidate("mem", "config.txt")
	data, _ = r.Read(context.Background(), src, "config.txt")
	fmt.Printf("refreshed: %s\n", data)

}

// mutableMapFS is a minimal writable fs.FS for Example_invalidation.
type mutableMapFS struct {
	files map[string][]byte
}

func (m *mutableMapFS) Open(name string) (fs.File, error) {
	data, ok := m.files[name]
	if !ok {
		return nil, &fs.PathError{Op: "open", Path: name, Err: fs.ErrNotExist}
	}
	return &byteReader{name: name, data: data}, nil
}

// byteReader implements fs.File backed by a []byte slice.
type byteReader struct {
	name string
	data []byte
	off  int
}

func (f *byteReader) Stat() (fs.FileInfo, error) {
	return &memFileInfo{name: f.name, size: int64(len(f.data))}, nil
}

func (f *byteReader) Read(b []byte) (int, error) {
	if f.off >= len(f.data) {
		return 0, io.EOF
	}
	n := copy(b, f.data[f.off:])
	f.off += n
	return n, nil
}

func (f *byteReader) Close() error { return nil }

// memFileInfo implements fs.FileInfo for in-memory files.
type memFileInfo struct {
	name string
	size int64
}

func (i *memFileInfo) Name() string       { return i.name }
func (i *memFileInfo) Size() int64        { return i.size }
func (i *memFileInfo) Mode() fs.FileMode  { return 0o444 }
func (i *memFileInfo) ModTime() time.Time { return time.Time{} }
func (i *memFileInfo) IsDir() bool        { return false }
func (i *memFileInfo) Sys() any           { return nil }
Output:
first read: v1
cached: v1
refreshed: v2
Example (RootSource)
package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"

	"github.com/lbe/cfsread"
)

func main() {
	// Create a temporary directory with a file.
	dir, err := os.MkdirTemp("", "cfsread-example")
	if err != nil {
		fmt.Fprintf(os.Stderr, "temp dir: %v\n", err)
		return
	}
	defer func() { _ = os.RemoveAll(dir) }()

	if err := os.WriteFile(filepath.Join(dir, "msg.txt"), []byte("from root\n"), 0o600); err != nil {
		fmt.Fprintf(os.Stderr, "write: %v\n", err)
		return
	}

	// NewRootSource confines reads to dir — path traversal and symlink
	// escapes are rejected by the operating system.
	src, closer, err := cfsread.NewRootSource("app-data", dir)
	if err != nil {
		fmt.Fprintf(os.Stderr, "root source: %v\n", err)
		return
	}
	defer func() { _ = closer.Close() }()

	r := cfsread.New(cfsread.Options{})
	data, err := r.Read(context.Background(), src, "msg.txt")
	if err != nil {
		fmt.Fprintf(os.Stderr, "read: %v\n", err)
		return
	}
	fmt.Printf("%s", data)

}
Output:
from root
Example (WithLZ4)
package main

import (
	"bytes"
	"context"
	"fmt"
	"os"
	"path/filepath"

	lz4pkg "github.com/pierrec/lz4/v4"

	"github.com/lbe/cfsread"
	"github.com/lbe/cfsread/decompress"
	"github.com/lbe/cfsread/decompress/lz4"
)

func main() {
	// Create a temporary filesystem with an LZ4-compressed file.
	dir, err := os.MkdirTemp("", "cfsread-example")
	if err != nil {
		fmt.Fprintf(os.Stderr, "temp dir: %v\n", err)
		return
	}
	defer func() { _ = os.RemoveAll(dir) }()

	// Compress data using the LZ4 frame format.
	plain := []byte("decompressed content")
	var buf bytes.Buffer
	w := lz4pkg.NewWriter(&buf)
	if _, err := w.Write(plain); err != nil {
		fmt.Fprintf(os.Stderr, "lz4 write: %v\n", err)
		return
	}
	if err := w.Close(); err != nil {
		fmt.Fprintf(os.Stderr, "lz4 close: %v\n", err)
		return
	}
	compressed := buf.Bytes()

	if err := os.WriteFile(filepath.Join(dir, "data.lz4"), compressed, 0o600); err != nil {
		fmt.Fprintf(os.Stderr, "write: %v\n", err)
		return
	}

	// Register the LZ4 decompressor and read.
	reg := decompress.NewRegistry()
	if err := reg.Register(lz4.New()); err != nil {
		fmt.Fprintf(os.Stderr, "register: %v\n", err)
		return
	}

	src, closer, err := cfsread.NewRootSource("tmp", dir)
	if err != nil {
		fmt.Fprintf(os.Stderr, "root source: %v\n", err)
		return
	}
	defer func() { _ = closer.Close() }()

	r := cfsread.New(cfsread.Options{Registry: reg})
	data, err := r.Read(context.Background(), src, "data.lz4")
	if err != nil {
		fmt.Fprintf(os.Stderr, "read: %v\n", err)
		return
	}
	fmt.Printf("%s", data)

}
Output:
decompressed content

Constants

This section is empty.

Variables

var (
	// ErrReaderClosed is returned by Read after Close has been called.
	ErrReaderClosed = errors.New("cfsread: reader closed")
	// ErrInvalidSource is returned by Read when the Source is not usable.
	ErrInvalidSource = errors.New("cfsread: invalid source")
	// ErrEmptySourceID is returned by Read when Source.ID is empty.
	ErrEmptySourceID = errors.New("cfsread: empty source id")
	// ErrNilSourceFS is returned by Read when Source.FS is nil.
	ErrNilSourceFS = errors.New("cfsread: nil source fs")
)

Sentinel errors returned by Reader methods.

Functions

This section is empty.

Types

type EvictionReason

type EvictionReason int

EvictionReason describes why a cache entry was removed.

const (
	// EvictionSize indicates the entry exceeded the size limit.
	EvictionSize EvictionReason = iota
	// EvictionCount indicates the cache entry count limit was reached.
	EvictionCount
	// EvictionIdle indicates the entry was idle too long.
	EvictionIdle
	// EvictionManual indicates an explicit eviction.
	EvictionManual
)

type Logger

type Logger interface {
	// Debugf logs a debug-level message with printf-style formatting.
	Debugf(format string, args ...any)
	// Infof logs an info-level message with printf-style formatting.
	Infof(format string, args ...any)
}

Logger outputs informational messages. Implement this interface to capture debug and info messages from the Reader. Pass an instance via Options.Logger; if nil, NopLogger is used.

type Metrics

type Metrics interface {
	// IncCacheHit increments the counter for cache hits.
	IncCacheHit()
	// IncCacheMiss increments the counter for cache misses that trigger I/O.
	IncCacheMiss()
	// IncCacheBypass increments the counter for entries that skip the cache
	// because they exceed MaxBytes.
	IncCacheBypass()
	// IncEviction records an eviction with the given reason.
	IncEviction(reason EvictionReason)
	// ObserveDecompress records a completed decompression: the codec name,
	// compressed and decompressed sizes, and wall-clock duration.
	ObserveDecompress(name string, inBytes, outBytes int64, d time.Duration)
	// ObserveRead records the wall-clock duration of a full cache-miss read
	// (open + read + decompress + cache store).
	ObserveRead(d time.Duration)
}

Metrics records cache and decompression events. All methods must be safe for concurrent use. Implement this interface to observe cache behaviour for monitoring or alerting. Pass an instance via Options.Metrics; if nil, NopMetrics is used.

type NopLogger

type NopLogger struct{}

NopLogger is a no-op Logger implementation.

func (NopLogger) Debugf

func (NopLogger) Debugf(string, ...any)

Debugf logs a debug-level message.

func (NopLogger) Infof

func (NopLogger) Infof(string, ...any)

Infof logs an info-level message.

type NopMetrics

type NopMetrics struct{}

NopMetrics is a no-op Metrics implementation.

func (NopMetrics) IncCacheBypass

func (NopMetrics) IncCacheBypass()

IncCacheBypass increments the cache bypass counter.

func (NopMetrics) IncCacheHit

func (NopMetrics) IncCacheHit()

IncCacheHit increments the cache hit counter.

func (NopMetrics) IncCacheMiss

func (NopMetrics) IncCacheMiss()

IncCacheMiss increments the cache miss counter.

func (NopMetrics) IncEviction

func (NopMetrics) IncEviction(EvictionReason)

IncEviction increments the eviction counter for the given reason.

func (NopMetrics) ObserveDecompress

func (NopMetrics) ObserveDecompress(string, int64, int64, time.Duration)

ObserveDecompress records a decompression observation.

func (NopMetrics) ObserveRead

func (NopMetrics) ObserveRead(time.Duration)

ObserveRead records a read latency observation.

type Options

type Options struct {
	// MaxBytes sets the per-entry byte limit; entries larger than this bypass
	// the cache. Zero means unlimited.
	MaxBytes int64
	// MaxEntries caps the number of cached items. Zero means unlimited.
	MaxEntries int
	// MaxIdleAge evicts entries idle longer than this duration. Zero means no
	// idle eviction.
	MaxIdleAge time.Duration
	// Registry supplies decompressors. A new empty registry is used if nil.
	Registry *decompress.Registry
	// Metrics receives cache and decompression observations. NopMetrics is
	// used if nil.
	Metrics Metrics
	// Logger receives debug/info messages. NopLogger is used if nil.
	Logger Logger
}

Options configures a Reader.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads files from filesystem sources with magic-dispatch decompression, an LRU byte cache, and singleflight coalescing for concurrent reads.

func New

func New(opts Options) *Reader

New creates a Reader with the given options.

func (*Reader) Close

func (r *Reader) Close() error

Close marks the reader as closed. Subsequent Read calls return an error. It is safe to call Close more than once.

func (*Reader) Invalidate

func (r *Reader) Invalidate(fsName, path string)

Invalidate removes a single cached entry identified by fsName (the Source.ID) and path. Does nothing if the entry does not exist.

func (*Reader) InvalidateFS

func (r *Reader) InvalidateFS(fsName string)

InvalidateFS removes all cached entries whose Source.ID matches fsName. Use this when an entire filesystem's contents have been refreshed.

func (*Reader) Read

func (r *Reader) Read(ctx context.Context, src Source, name string) ([]byte, error)

Read returns the (possibly decompressed) contents of name within src. The returned byte slice is owned by the cache and must not be mutated.

On cache hit, the slice is returned directly with zero allocations. On cache miss, the file is opened, read, optionally decompressed via magic-byte dispatch, and stored in the cache before being returned. Concurrent reads of the same (src.ID, name) are coalesced via singleflight so only one goroutine performs I/O.

Returns an error if the Reader is closed, the context is cancelled, the file cannot be opened, or decompression fails.

type Source

type Source struct {
	// ID is the cache namespace for this source. Must be non-empty and unique
	// per logical filesystem if cache isolation is desired.
	ID string
	// FS is the underlying filesystem. Any io/fs.FS implementation is accepted.
	FS fs.FS
}

Source pairs an io/fs.FS with a stable identity used for cache namespacing. Two Sources with the same ID share cache entries; distinct IDs are isolated.

Use a Source directly with an embed.FS or fstest.MapFS:

src := cfsread.Source{ID: "assets", FS: myEmbedFS}

For filesystem-confined access, use NewRootSource.

func NewRootSource

func NewRootSource(id, dir string) (Source, io.Closer, error)

NewRootSource opens dir as an *os.Root and returns a Source whose FS is confined to dir. Path traversal ("../") and symlink escapes are rejected by the OS. The returned io.Closer must be called when the Source is no longer needed to release the underlying file descriptor.

Returns an error if id is empty or dir does not exist.

Directories

  • cmd/cfsread-lz4 (command): cfsread-lz4 recursively compresses files in a directory using LZ4 frame format.
  • decompress: Package decompress defines a pluggable decompression interface and a registry that dispatches on leading magic bytes.
  • decompress/lz4: Package lz4 provides an LZ4 frame-format decompressor plugin for the decompress registry.
  • internal/cache: Package cache provides an in-memory LRU bounded by total byte size, item count, and entry age.
  • internal/leakcheck: Package leakcheck installs a TestMain helper that fails the suite if any goroutine leaks past test completion, using runtime/pprof's goroutineleak profile (Go 1.26+).
