Documentation ¶
Overview ¶
Package batch provides batch processing for reading multiple entries from a blob archive.
Index ¶
Constants ¶
const (
	CompressionNone = blobtype.CompressionNone
	CompressionZstd = blobtype.CompressionZstd
)
These compression constants are re-exported from blobtype for use with FileSink.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BufferedSink ¶
BufferedSink allows sinks to handle decoded content without copying.
Implementations should not mutate the content slice.
type Committer ¶
type Committer interface {
	io.Writer

	// Commit finalizes the write, making content available.
	// Must be called after successful hash verification.
	Commit() error

	// Discard aborts the write and cleans up any temporary resources.
	// Must be called if verification fails or an error occurs.
	Discard() error
}
Committer is a writer that can be committed or discarded.
Implementations should buffer or stage writes until Commit is called. For example, a file-based implementation might write to a temp file and rename it on Commit, or delete it on Discard.
type Compression ¶
type Compression = blobtype.Compression
Compression is an alias for blobtype.Compression.
type FileSink ¶
type FileSink struct {
	// contains filtered or unexported fields
}
FileSink writes entries to the filesystem.
By default, files are written to a temporary file in the same directory and renamed to the final path on Commit. This ensures that partially written files are never visible at the final path.
func NewFileSink ¶
func NewFileSink(destDir string, opts ...FileSinkOption) *FileSink
NewFileSink creates a FileSink that writes to destDir.
destDir must be an absolute path or relative to the current directory. Parent directories are created automatically as needed.
func (*FileSink) ShouldProcess ¶
func (s *FileSink) ShouldProcess(entry *Entry) bool
ShouldProcess returns false if the file already exists and overwrite is disabled.
type FileSinkOption ¶
type FileSinkOption func(*FileSink)
FileSinkOption configures a FileSink.
func WithDirectWrites ¶
func WithDirectWrites(enabled bool) FileSinkOption
WithDirectWrites disables temp files and writes directly to the final path.
func WithOverwrite ¶
func WithOverwrite(overwrite bool) FileSinkOption
WithOverwrite allows overwriting existing files. By default, existing files are skipped.
func WithPreserveMode ¶
func WithPreserveMode(preserve bool) FileSinkOption
WithPreserveMode preserves file permission modes from the archive. By default, modes are not preserved (files use umask defaults).
func WithPreserveTimes ¶
func WithPreserveTimes(preserve bool) FileSinkOption
WithPreserveTimes preserves file modification times from the archive. By default, times are not preserved (files use current time).
type ProcessStats ¶ added in v1.1.0
type ProcessStats struct {
	// Processed is the number of entries successfully written to the sink.
	Processed int

	// Skipped is the number of entries skipped (ShouldProcess returned false).
	Skipped int

	// TotalBytes is the sum of OriginalSize for all processed entries.
	TotalBytes uint64
}
ProcessStats contains statistics from a batch processing operation.
type Processor ¶
type Processor struct {
	// contains filtered or unexported fields
}
Processor handles batch reading and processing of entries from a blob archive.
It provides efficient reading by grouping adjacent entries and processing them together, minimizing the number of read operations on the underlying source.
func NewProcessor ¶
func NewProcessor(source file.ByteSource, pool *file.DecompressPool, maxFileSize uint64, opts ...ProcessorOption) *Processor
NewProcessor creates a new batch processor.
The source provides random access to the data blob. The pool provides reusable zstd decoders for compressed entries. maxFileSize limits the size of individual entries (0 for no limit).
func (*Processor) Process ¶
func (p *Processor) Process(entries []*Entry, sink Sink) (ProcessStats, error)
Process reads and processes entries, writing results to the sink.
Entries are filtered through sink.ShouldProcess, sorted by offset, grouped into contiguous ranges, and processed efficiently. For each entry, the content is decompressed (if needed), hash-verified, and written to the sink.
Processing stops on the first error encountered. The returned ProcessStats contains counts for processed and skipped entries, and total bytes written. On error, partial stats are returned reflecting work completed before the error.
type ProcessorOption ¶
type ProcessorOption func(*Processor)
ProcessorOption configures a Processor.
func WithProcessorLogger ¶
func WithProcessorLogger(logger *slog.Logger) ProcessorOption
WithProcessorLogger sets the logger for batch processing operations. If not set, logging is disabled.
func WithProcessorProgress ¶ added in v1.1.0
func WithProcessorProgress(fn blobtype.ProgressFunc) ProcessorOption
WithProcessorProgress sets a callback to receive progress updates during processing. The callback receives events for each file extracted.
func WithReadAheadBytes ¶
func WithReadAheadBytes(limit uint64) ProcessorOption
WithReadAheadBytes caps the total size of buffered group data. A value of 0 disables the byte budget.
func WithReadConcurrency ¶
func WithReadConcurrency(n int) ProcessorOption
WithReadConcurrency sets the number of concurrent range reads. Values < 1 force serial reads.
func WithWorkers ¶
func WithWorkers(n int) ProcessorOption
WithWorkers sets the number of workers for parallel processing. Values < 0 force serial processing. Zero uses automatic heuristics. Values > 0 force a specific worker count.
type Sink ¶
type Sink interface {
	// ShouldProcess returns false if this entry should be skipped.
	// This allows implementations to skip already-cached entries or existing files.
	ShouldProcess(entry *Entry) bool

	// Writer returns a writer for the entry's content.
	// The returned Committer must have Commit() called after successful
	// write and verification, or Discard() called on any error.
	//
	// The caller will:
	// 1. Write decompressed content to the Committer
	// 2. Verify the SHA256 hash matches entry.Hash
	// 3. Call Commit() if verification succeeds, Discard() otherwise
	Writer(entry *Entry) (Committer, error)
}
Sink receives decompressed and verified file content during batch processing.
Implementations determine where content is written (cache, filesystem, etc.) and can filter which entries to process.