plz4

package module
v0.0.3
This package is not in the latest version of its module.
Published: Feb 18, 2025 License: BSD-2-Clause Imports: 11 Imported by: 0

README

Parallel LZ4


The plz4 package provides a fast and simple Go library to encode and decode the LZ4 Frame Format in parallel.

In addition, it provides the plz4 command line tool to generate and decode LZ4.

Features

The primary goal of the plz4 project is performance, speed in particular. Multi-core machines are now commonplace, and LZ4's independent block mode is well suited to fully take advantage of multiple cores with some caveats.

This project attempts to support all of the features enumerated in the LZ4 Frame Format specification, including baseline features such as checksums and variable compression levels.

Design

The library is written in Go, which provides a fantastic framework for parallel execution. For maximum feature compatibility, the underlying engine leverages the canonical lz4 library via CGO. As an alternative, there is an excellent pure Go implementation by Pierre Curto.

The library runs in one of two modes: synchronous or asynchronous. The asynchronous mode parallelizes by executing the encoding/decoding work in one or more goroutines. In both modes, a memory pool is employed to minimize heap allocations; data blocks are reused from the pool whenever possible. Because of these minimal allocations, plz4 puts little pressure on the garbage collector and performs well as a compression engine for long-running processes such as servers.

There is an inherent tradeoff between speed and memory in the LZ4 design. LZ4 compresses best with large blocks, so the 4 MiB block size is the default. However, the more work done in parallel, the more instantaneous RAM is used. For example, a compression job using 8 cores and 4 MiB blocks could use upwards of 32 MiB at one time (more than that when you consider both read and write blocks). A compression job using 8 cores and 64 KiB blocks would use much less, upwards of 512 KiB.

To manage this tradeoff, there are a few knobs:

  • When compressing, tune the block size given the environment.
  • For each job, the maximum number of goroutines may be specified. This, coupled with the block size, will limit overall RAM usage.
  • There is additionally an option to provide a user-specified WorkerPool. The advantage here is that the overall number of cores is limited without having to manage the maximum parallel count on each job.

Caveats

Linked Blocks

While LZ4 frames using independent blocks parallelize well, the linked blocks feature does not, because each block depends on data from the previous block. plz4 can compress linked frames in parallel, but it cannot decompress them in parallel due to this dependency.

Content Checksum

There is another LZ4 Frame feature that is problematic at scale. By default, plz4 enables the content checksum feature, as recommended in the spec. This feature uses a 32-bit checksum to validate that the content stream produced during decompression has the same checksum as the original source. Because the checksum must be calculated in serial, a decompression job running with high parallelism may fall behind during this calculation. To improve parallel throughput, disable the content checksum feature on decompress.

Random read access

Another advantage of independent blocks is the potential to support random read access. This is possible because each block can be independently decompressed. To support this, plz4 provides an optional progress callback that emits both the source offset and corresponding block offset during compression. An implementation can use this information to build lookup tables that can later be used to skip ahead during decompression to a known block offset. plz4 provides the 'WithReadOffset' option on the NewReader API to skip ahead and start decompression at a known block offset.

Install

To install the 'plz4' command line tool use the following command:

go install github.com/prequel-dev/plz4/cmd/plz4@latest

Use the 'bakeoff' command to determine whether your particular payload performs better using plz4 or the native go implementation. The two implementations differ on the relation between compression level and output size.

Documentation

Index

Examples

Constants

const (
	ErrClosed            = zerr.ErrClosed
	ErrCorrupted         = zerr.ErrCorrupted
	ErrMagic             = zerr.ErrMagic
	ErrVersion           = zerr.ErrVersion
	ErrHeaderHash        = zerr.ErrHeaderHash
	ErrBlockHash         = zerr.ErrBlockHash
	ErrContentHash       = zerr.ErrContentHash
	ErrHeaderRead        = zerr.ErrHeaderRead
	ErrHeaderWrite       = zerr.ErrHeaderWrite
	ErrDescriptorRead    = zerr.ErrDescriptorRead
	ErrBlockSizeRead     = zerr.ErrBlockSizeRead
	ErrBlockRead         = zerr.ErrBlockRead
	ErrBlockSizeOverflow = zerr.ErrBlockSizeOverflow
	ErrCompress          = zerr.ErrCompress
	ErrDecompress        = zerr.ErrDecompress
	ErrReserveBitSet     = zerr.ErrReserveBitSet
	ErrBlockDescriptor   = zerr.ErrBlockDescriptor
	ErrContentHashRead   = zerr.ErrContentHashRead
	ErrContentSize       = zerr.ErrContentSize
	ErrReadOffset        = zerr.ErrReadOffset
	ErrReadOffsetLinked  = zerr.ErrReadOffsetLinked
	ErrSkip              = zerr.ErrSkip
	ErrNibble            = zerr.ErrNibble
	ErrUnsupported       = zerr.ErrUnsupported
)
const (
	// 64 KiB block size
	BlockIdx64KB = descriptor.BlockIdx64KB

	// 256 KiB block size
	BlockIdx256KB = descriptor.BlockIdx256KB

	// 1 MiB block size
	BlockIdx1MB = descriptor.BlockIdx1MB

	// 4 MiB block size
	BlockIdx4MB = descriptor.BlockIdx4MB
)

Variables

This section is empty.

Functions

func Lz4Corrupted

func Lz4Corrupted(err error) bool

Returns true if 'err' indicates that the read input is corrupted.

Note that a short read is not considered corrupted. In that case the returned error will be a join of the error context and the underlying error, either io.EOF or io.ErrUnexpectedEOF.

func WriteSkipFrameHeader

func WriteSkipFrameHeader(wr io.Writer, nibble uint8, sz uint32) (int, error)

Write a skip frame header to 'wr'. A skip frame of exactly size 'sz' must follow the header.

'sz' 		32-bit unsigned size of the frame that follows.
'nibble' 	4-bit value shifted into the skip frame magic field.

Types

type BlockIdxT

type BlockIdxT = descriptor.BlockIdxT

BlockIdxT is a type for block size index.

type CbDictT

type CbDictT = opts.DictCallbackT

Dictionary callback function type.

type CbProgressT

type CbProgressT = opts.ProgressFuncT

Progress callback function type.

type CbSkipT

type CbSkipT = opts.SkipCallbackT

Skip callback function type.

type LevelT

type LevelT = compress.LevelT

LevelT is a type for compression level.

const (
	Level1 LevelT = iota + 1
	Level2
	Level3
	Level4
	Level5
	Level6
	Level7
	Level8
	Level9
	Level10
	Level11
	Level12
)

type OptT

type OptT func(*opts.OptsT)

OptT is a function that sets an option on the processor.

func WithBlockChecksum

func WithBlockChecksum(enable bool) OptT

Enable block checksums on write. Defaults to disabled.

func WithBlockLinked

func WithBlockLinked(enable bool) OptT

Enable linked blocks on write. Defaults to disabled.

func WithBlockSize

func WithBlockSize(idx BlockIdxT) OptT

Specify write block size. Defaults to BlockIdx4MB.

func WithContentChecksum

func WithContentChecksum(enable bool) OptT

Enable full content checksum. Defaults to enabled.

WriteMode: 	Calculate and append the content checksum if enabled.
ReadMode: 	Validate the content checksum if present; ignore if disabled.

func WithContentSize

func WithContentSize(sz uint64) OptT

Specify write content size to embed in header.

func WithContentSizeCheck

func WithContentSizeCheck(enabled bool) OptT

Enable content size check. Defaults to enabled.

According to the spec, the content size is informational, so in some cases it may be desirable to skip the check.

func WithDictCallback

func WithDictCallback(cb CbDictT) OptT

Specify optional dictionary callback.

The engine will invoke the callback when a dictionary identifier is read in the frame header. An optional dictionary may be returned from the callback; it will override any dictionary previously specified with the WithDictionary() option.

func WithDictionary

func WithDictionary(data []byte) OptT

Provide a dictionary for compress or decompress mode. Only the last 64 KiB is used.

func WithDictionaryId

func WithDictionaryId(id uint32) OptT

Specify the dictionary identifier to embed in the header on write.

func WithLevel

func WithLevel(lvl LevelT) OptT

Specify write compression level [1-12]. Defaults to Level1.

func WithParallel

func WithParallel(n int) OptT

Specify the number of goroutines to run in parallel. Defaults to 1.

0   Process synchronously
1+  Process asynchronously
<0  Process asynchronously with the number of goroutines up to the CPU count

func WithPendingSize

func WithPendingSize(n int) OptT

Specify the maximum pending buffer size. Defaults to nParallel * blockSz.

Larger maximum pending size improves parallel processing throughput at the expense of RAM. The default is the minimal allowed size. This option only applies to the asynchronous case. It is ignored in the synchronous case.

Setting the pending size to -1 enables auto mode. In auto mode, the processor will automatically scale the pending size for maximum speed based on the block size and nParallel.

func WithProgress

func WithProgress(cb CbProgressT) OptT

The processor will emit the tuple (src_block_offset, dst_block_offset) on each block boundary. Applies to both compress and decompress modes.

Offsets are relative to the start of the frame.

Note: The callback may be called from a secondary goroutine. However, offsets are emitted in order from only that goroutine.

func WithReadOffset

func WithReadOffset(offset int64) OptT

Read block starting at byte 'offset'.

The offset is the first byte of the data block relative to the start of the frame.

func WithSkipCallback

func WithSkipCallback(cb CbSkipT) OptT

Specify skip block callback function.

The callback is invoked on a skip frame and must consume exactly 'sz' bytes from the reader.

func WithWorkerPool

func WithWorkerPool(wp WorkerPool) OptT

Optional worker pool for both compress and decompress modes.

type Reader

type Reader interface {
	// Read decompressed data into 'dst'.  Return the number of bytes read.
	Read(dst []byte) (n int, err error)

	// Decompress to 'wr'.  Return the number of bytes written.
	WriteTo(wr io.Writer) (int64, error)

	// Close the Reader to release underlying resources.
	// Close() *MUST* be called on completion whether or not
	//   the Reader is in an error state.
	Close() error
}

Reader is an interface for reading LZ4 compressed data.

It implements the io.ReadCloser and the io.WriterTo interfaces.

func NewReader

func NewReader(rd io.Reader, opts ...OptT) Reader

Construct a Reader to decompress the LZ4 frame from 'rd'.

Specify optional parameters in 'opts'.

Example
// LZ4 compressed frame containing the payload "hello"
lz4Data := []byte{0x04, 0x22, 0x4d, 0x18, 0x60, 0x70, 0x73, 0x06, 0x00, 0x00, 0x00, 0x50, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x00}

// Create the Reader with an option
rd := NewReader(
	bytes.NewReader(lz4Data), // Wrap the src in a reader
	WithParallel(0),          // Run lz4 in synchronous mode
)

// Always close; double close is ok.
defer rd.Close()

// Use the io.WriterTo interface to decompress the src
var dst bytes.Buffer
if _, err := rd.WriteTo(&dst); err != nil {
	panic(err)
}

// Close the Reader and check for error.
// Reader must always be closed to release resources.
// It is ok to Close more than once, as is done in 'defer Close()' above.
if err := rd.Close(); err != nil {
	panic(err)
}

fmt.Println(dst.String())
Output:

hello

type WorkerPool

type WorkerPool = opts.WorkerPool

WorkerPool is an interface for a worker pool implementation.

type Writer

type Writer interface {
	// Compress 'src' data; return the number of bytes written.
	// May be used in sequence with ReadFrom.
	Write(src []byte) (n int, err error)

	// Compress data from 'rd'; return the number of bytes read.
	// May be used in sequence with Write.
	ReadFrom(rd io.Reader) (n int64, err error)

	// Flush pending data immediately to 'wr', generating
	//   a new LZ4 Frame block.  If no data is pending, no
	//   block is generated.
	//
	// This is a synchronous call; it will completely flush an
	//   asynchronous pipeline.
	Flush() error

	// Close the Writer to release underlying resources.
	// Close() *MUST* be called on completion whether or not
	//   the Writer is in an error state.
	Close() error
}

Writer is an interface for compressing data into an LZ4 frame.

It implements the io.WriteCloser and the io.ReaderFrom interfaces.

func NewWriter

func NewWriter(wr io.Writer, opts ...OptT) Writer

Construct a Writer to compress data into an LZ4 frame written to 'wr'.

Specify optional parameters in 'opts'.

Example
var dst bytes.Buffer

// Create the Writer with two options
wr := NewWriter(
	&dst,
	WithParallel(1),            // Run in asynchronous mode
	WithContentChecksum(false), // Disable content checksum
)

// Always close; double close is ok.
defer wr.Close()

// Write source data to be compressed
if _, err := wr.Write([]byte("hello")); err != nil {
	panic(err)
}

// Flush is not required but is shown here as an example
if err := wr.Flush(); err != nil {
	panic(err)
}

// Terminate the LZ4 frame with Close() and check for error
// Writer must always be closed to release resources.
// It is ok to Close more than once, as is done in 'defer Close()' above.
if err := wr.Close(); err != nil {
	panic(err)
}

fmt.Println(hex.EncodeToString(dst.Bytes()))
Output:

04224d18607073060000005068656c6c6f00000000

Directories

Path Synopsis
cmd
docs
internal
pkg/xxh32
Package xxh32 implements the very fast XXH hashing algorithm (32 bits version).
pkg
