lfs

package
v1.6.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 2, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package lfs provides Large File Support (LFS) for Kafka messages.

LFS stores large payloads (up to 5 GB) in S3 and keeps only small envelope pointers in Kafka topics. This implements the "Claim Check" pattern.

Overview

When a Kafka producer sends a message with the LFS_BLOB header, the LFS proxy:

  1. Uploads the payload to S3
  2. Computes a SHA256 checksum of the payload
  3. Creates a JSON envelope with metadata
  4. Forwards the envelope (not the payload) to Kafka

Consumers receive the envelope and can use this package to transparently fetch the original payload from S3.
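
The four proxy steps above can be sketched with the standard library alone. This is a minimal illustration of the Claim Check flow, not the proxy's actual code: the blobStore map is a hypothetical stand-in for S3, checkIn is an invented name, and the envelope struct copies only a subset of the documented fields.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// envelope mirrors a subset of the documented envelope fields.
type envelope struct {
	Version int    `json:"kfs_lfs"`
	Bucket  string `json:"bucket"`
	Key     string `json:"key"`
	Size    int64  `json:"size"`
	SHA256  string `json:"sha256"`
}

// blobStore is a hypothetical in-memory stand-in for S3.
type blobStore map[string][]byte

// checkIn stores the payload and returns the JSON envelope that would be
// forwarded to Kafka in its place.
func checkIn(store blobStore, bucket, key string, payload []byte) ([]byte, error) {
	// Step 1: "upload" a copy of the payload to the stand-in store.
	store[key] = append([]byte(nil), payload...)
	// Step 2: compute the SHA256 checksum.
	sum := sha256.Sum256(payload)
	// Step 3: build the envelope with the pointer metadata.
	env := envelope{
		Version: 1,
		Bucket:  bucket,
		Key:     key,
		Size:    int64(len(payload)),
		SHA256:  hex.EncodeToString(sum[:]),
	}
	// Step 4: the marshalled envelope is what would go to Kafka.
	return json.Marshal(env)
}

func main() {
	store := blobStore{}
	value, err := checkIn(store, "my-bucket", "default/topic/lfs/obj-1", []byte("hello"))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(value))
}
```

The Kafka record stays a few hundred bytes regardless of payload size, which is the point of the pattern.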

Envelope Format

The LFS envelope is a JSON object stored as the Kafka message value:

{
  "kfs_lfs": 1,
  "bucket": "my-bucket",
  "key": "default/topic/lfs/2026/02/01/obj-uuid",
  "size": 10485760,
  "sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
  "content_type": "application/octet-stream",
  "created_at": "2026-02-01T12:00:00Z",
  "proxy_id": "lfs-proxy-0"
}
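
As a rough illustration of reading this format without the package, the sample envelope can be decoded with encoding/json. The local envelope struct and the parseEnvelope helper are assumptions made for this sketch; in real code DecodeEnvelope and the Envelope type serve this purpose.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// envelope is a local copy of the fields shown in the sample above.
type envelope struct {
	Version     int    `json:"kfs_lfs"`
	Bucket      string `json:"bucket"`
	Key         string `json:"key"`
	Size        int64  `json:"size"`
	SHA256      string `json:"sha256"`
	ContentType string `json:"content_type,omitempty"`
	CreatedAt   string `json:"created_at,omitempty"`
	ProxyID     string `json:"proxy_id,omitempty"`
}

const sample = `{
  "kfs_lfs": 1,
  "bucket": "my-bucket",
  "key": "default/topic/lfs/2026/02/01/obj-uuid",
  "size": 10485760,
  "sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
  "content_type": "application/octet-stream",
  "created_at": "2026-02-01T12:00:00Z",
  "proxy_id": "lfs-proxy-0"
}`

// parseEnvelope is a hypothetical helper: it decodes the JSON and rejects
// values that lack the kfs_lfs version marker.
func parseEnvelope(value []byte) (envelope, error) {
	var env envelope
	if err := json.Unmarshal(value, &env); err != nil {
		return envelope{}, fmt.Errorf("decode envelope: %w", err)
	}
	if env.Version == 0 {
		return envelope{}, errors.New("missing kfs_lfs version marker")
	}
	return env, nil
}

func main() {
	env, err := parseEnvelope([]byte(sample))
	if err != nil {
		panic(err)
	}
	fmt.Printf("s3://%s/%s (%d bytes)\n", env.Bucket, env.Key, env.Size)
}
```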

Consumer Usage

Basic usage with franz-go:

// Create S3 client
s3Client, err := lfs.NewS3Client(ctx, lfs.S3Config{
    Bucket:   "my-bucket",
    Region:   "us-east-1",
    Endpoint: "http://minio:9000",  // optional
})
if err != nil {
    log.Fatal(err)
}

// Create LFS consumer
consumer := lfs.NewConsumer(s3Client)

// Process Kafka records
for _, record := range kafkaRecords {
    // Unwrap automatically fetches LFS blobs from S3.
    // First return is the envelope (nil for non-LFS records).
    _, data, err := consumer.Unwrap(ctx, record.Value)
    if err != nil {
        log.Error("failed to unwrap", "error", err)
        continue
    }
    // data contains the original payload (or unchanged if not LFS)
    processData(data)
}

Record Wrapper

For lazy resolution with caching, use the Record wrapper:

s3Client, err := lfs.NewS3Client(ctx, config)
if err != nil {
    log.Fatal(err)
}
consumer := lfs.NewConsumer(s3Client)

for _, kafkaRecord := range records {
    rec := lfs.NewRecord(kafkaRecord.Value, consumer,
        lfs.WithStreamFetcher(s3Client),  // enables ValueStream()
    )

    // Check if this is an LFS record
    if rec.IsLFS() {
        // Get size without fetching
        size, _ := rec.Size()
        fmt.Printf("LFS blob size: %d\n", size)
    }

    // Lazy fetch with caching (second call uses cache)
    data, err := rec.Value(ctx)
    if err != nil {
        log.Error("resolve failed", "error", err)
        continue
    }
    processData(data)
}

Streaming Large Files

For memory-efficient processing of large files:

rec := lfs.NewRecord(value, nil,
    lfs.WithStreamFetcher(s3Client),
)

reader, size, err := rec.ValueStream(ctx)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("streaming %d bytes\n", size)

// Stream directly to output
if _, err := io.Copy(outputFile, reader); err != nil {
    reader.Close()
    log.Fatal(err)
}

// Close validates checksum, so call it once and check the result
if err := reader.Close(); err != nil {
    log.Error("checksum validation failed", "error", err)
}
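
To make the "Close validates checksum" behavior concrete, here is one way such a reader could be built: it hashes bytes as they pass through Read and compares the digest in Close. This is a sketch under assumptions, not the package's implementation; checksumReader and streamAndVerify are invented names.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
	"strings"
)

// checksumReader hashes everything that is read through it and compares
// the digest with the expected value when it is closed.
type checksumReader struct {
	src      io.ReadCloser
	hasher   hash.Hash
	expected string
}

func newChecksumReader(src io.ReadCloser, expectedHex string) *checksumReader {
	return &checksumReader{src: src, hasher: sha256.New(), expected: expectedHex}
}

func (r *checksumReader) Read(p []byte) (int, error) {
	n, err := r.src.Read(p)
	r.hasher.Write(p[:n]) // hash.Hash.Write never returns an error
	return n, err
}

// Close closes the source, then reports a mismatch as an error. A reader
// closed before it is fully drained will also report a mismatch.
func (r *checksumReader) Close() error {
	if err := r.src.Close(); err != nil {
		return err
	}
	actual := hex.EncodeToString(r.hasher.Sum(nil))
	if actual != r.expected {
		return fmt.Errorf("checksum mismatch: expected %s, got %s", r.expected, actual)
	}
	return nil
}

// streamAndVerify drains the body through the reader and returns the
// result of Close.
func streamAndVerify(body, expectedHex string) error {
	r := newChecksumReader(io.NopCloser(strings.NewReader(body)), expectedHex)
	if _, err := io.Copy(io.Discard, r); err != nil {
		r.Close()
		return err
	}
	return r.Close()
}

func main() {
	// SHA256 of the empty input.
	const emptySHA = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
	fmt.Println(streamAndVerify("", emptySHA) == nil)
	fmt.Println(streamAndVerify("tampered", emptySHA) != nil)
}
```

With a design like this, the consumer has already written the bytes by the time a mismatch surfaces, so the output should be treated as untrusted until Close returns nil.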

Checksum Validation

By default, fetched blobs are validated against the SHA256 checksum stored in the envelope. This can be disabled for performance:

consumer := lfs.NewConsumer(s3Client,
    lfs.WithChecksumValidation(false),
)
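
For a sense of what is skipped when validation is off, the check amounts to hashing the whole blob and comparing hex digests. The helper below is a hypothetical stand-alone sketch using only the standard library; the case-insensitive comparison is a choice made here, not documented behavior.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// verifySHA256 reports whether blob hashes to expectedHex and returns the
// digest that was actually computed.
func verifySHA256(blob []byte, expectedHex string) (actual string, ok bool) {
	sum := sha256.Sum256(blob)
	actual = hex.EncodeToString(sum[:])
	// Hex digests are compared case-insensitively in this sketch.
	return actual, strings.EqualFold(actual, strings.TrimSpace(expectedHex))
}

func main() {
	// SHA256 of the empty input.
	const emptySHA = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"

	_, ok := verifySHA256(nil, emptySHA)
	fmt.Println("empty blob valid:", ok)

	actual, ok := verifySHA256([]byte("corrupted"), emptySHA)
	fmt.Println("corrupted blob valid:", ok, "actual:", actual)
}
```

The cost is one full pass over the payload, which is why disabling it can matter for multi-gigabyte blobs.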

Error Handling

The package defines specific error types for common failures:

_, data, err := consumer.Unwrap(ctx, value)
if err != nil {
    var checksumErr *lfs.ChecksumError
    if errors.As(err, &checksumErr) {
        log.Error("data corruption detected",
            "expected", checksumErr.Expected,
            "actual", checksumErr.Actual,
        )
    }

    var lfsErr *lfs.LfsError
    if errors.As(err, &lfsErr) {
        log.Error("LFS operation failed",
            "operation", lfsErr.Op,
            "error", lfsErr.Err,
        )
    }
}
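
The errors.As calls above walk the whole error chain, so a checksum error wrapped inside an operation error matches both checks. The sketch below shows that mechanic with local stand-in types (checksumError and opError are invented here and only copy the documented field names); whether the package actually nests its errors this way is not stated in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// checksumError and opError are local stand-ins for the documented types.
type checksumError struct {
	Expected string
	Actual   string
}

func (e *checksumError) Error() string {
	return fmt.Sprintf("checksum mismatch: expected %s, got %s", e.Expected, e.Actual)
}

type opError struct {
	Op  string
	Err error
}

func (e *opError) Error() string { return e.Op + ": " + e.Err.Error() }

// Unwrap lets errors.As and errors.Is reach the wrapped error.
func (e *opError) Unwrap() error { return e.Err }

var errTimeout = errors.New("timeout")

// classify checks the most specific error first so that a wrapped
// checksum failure is not reported as a generic operation failure.
func classify(err error) string {
	if err == nil {
		return "ok"
	}
	var ce *checksumError
	if errors.As(err, &ce) {
		return "checksum"
	}
	var oe *opError
	if errors.As(err, &oe) {
		return "operation:" + oe.Op
	}
	return "other"
}

func main() {
	wrapped := &opError{Op: "fetch", Err: &checksumError{Expected: "aa", Actual: "bb"}}
	fmt.Println(classify(wrapped))
	fmt.Println(classify(&opError{Op: "fetch", Err: errTimeout}))
	fmt.Println(classify(errTimeout))
}
```

Ordering the checks from most to least specific, or using else branches, avoids logging the same failure twice.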

Detection

Use IsLfsEnvelope for fast detection without parsing:

if lfs.IsLfsEnvelope(value) {
    // The marker check passed; decode to read the metadata
    env, err := lfs.DecodeEnvelope(value)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Blob stored at: s3://%s/%s\n", env.Bucket, env.Key)
}
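
A "quick JSON marker check" could look roughly like the following. This is a guess at the idea rather than the package's code: looksLikeEnvelope is a hypothetical name, and the heuristic (object start plus the "kfs_lfs" key somewhere in the bytes) is an assumption.

```go
package main

import (
	"bytes"
	"fmt"
)

// marker is the JSON key that identifies an envelope in the sample format.
var marker = []byte(`"kfs_lfs"`)

// looksLikeEnvelope is a cheap pre-filter: it never allocates a decoded
// value, but it can return true for JSON that merely contains the marker,
// so a full decode should follow before trusting the result.
func looksLikeEnvelope(value []byte) bool {
	trimmed := bytes.TrimSpace(value)
	if len(trimmed) == 0 || trimmed[0] != '{' {
		return false
	}
	return bytes.Contains(trimmed, marker)
}

func main() {
	fmt.Println(looksLikeEnvelope([]byte(`{"kfs_lfs": 1, "bucket": "my-bucket"}`)))
	fmt.Println(looksLikeEnvelope([]byte(`{"order_id": 42}`)))
	fmt.Println(looksLikeEnvelope([]byte("raw binary payload")))
}
```

A check of this kind keeps the common non-LFS path cheap, since most records can be rejected after inspecting the first byte.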

Producer Usage

For producing large payloads via the LFS proxy HTTP endpoint:

// Create producer pointing to LFS proxy
producer := lfs.NewProducer("http://lfs-proxy:8080",
    lfs.WithContentType("video/mp4"),
    lfs.WithRetry(3, time.Second),
)

// Stream a file to the proxy
file, err := os.Open("large-video.mp4")
if err != nil {
    log.Fatal(err)
}
defer file.Close()

result, err := producer.Produce(ctx, "video-uploads", "video-001", file)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Uploaded %d bytes to s3://%s/%s\n",
    result.BytesSent, result.Envelope.Bucket, result.Envelope.Key)

Producer with Progress Tracking

Monitor upload progress for large files:

producer := lfs.NewProducer("http://lfs-proxy:8080",
    lfs.WithProgress(func(bytesSent int64) error {
        fmt.Printf("Uploaded: %d bytes\n", bytesSent)
        return nil  // return error to cancel upload
    }),
)

result, err := producer.Produce(ctx, "media", "file.dat", reader)
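
One plausible way to drive a callback of this shape is to wrap the upload body in an io.Reader that counts bytes and aborts when the callback returns an error. The sketch below illustrates that idea with the standard library only; progressReader, uploadWithLimit and errCancelled are hypothetical names, and io.Discard stands in for the HTTP request body.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

var errCancelled = errors.New("upload cancelled by progress callback")

// progressReader reports the cumulative byte count after every read and
// stops the transfer if the callback returns an error.
type progressReader struct {
	src        io.Reader
	sent       int64
	onProgress func(bytesSent int64) error
}

func (r *progressReader) Read(p []byte) (int, error) {
	n, err := r.src.Read(p)
	if n > 0 {
		r.sent += int64(n)
		if cbErr := r.onProgress(r.sent); cbErr != nil {
			return n, cbErr
		}
	}
	return n, err
}

// uploadWithLimit copies body through a progressReader whose callback
// cancels once more than limit bytes have been sent. It returns the last
// byte count the callback saw.
func uploadWithLimit(body string, limit int64) (int64, error) {
	var last int64
	r := &progressReader{
		src: strings.NewReader(body),
		onProgress: func(bytesSent int64) error {
			last = bytesSent
			if bytesSent > limit {
				return errCancelled
			}
			return nil
		},
	}
	_, err := io.Copy(io.Discard, r)
	return last, err
}

func main() {
	fmt.Println(uploadWithLimit("hello world", 100))
	fmt.Println(uploadWithLimit("hello world", 5))
}
```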

Producer with Checksum Validation

Validate server-computed checksum against a pre-computed value:

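// Pre-compute checksum
hasher := sha256.New()
if _, err := io.Copy(hasher, file); err != nil {
    log.Fatal(err)
}
expectedSHA := hex.EncodeToString(hasher.Sum(nil))

// Rewind so the upload starts from the first byte
if _, err := file.Seek(0, io.SeekStart); err != nil {
    log.Fatal(err)
}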

// Upload with checksum validation
result, err := producer.ProduceWithChecksum(ctx, "topic", "key", file, expectedSHA)
if err != nil {
    var checksumErr *lfs.ChecksumError
    if errors.As(err, &checksumErr) {
        log.Error("upload corrupted", "expected", checksumErr.Expected)
    }
}

Producer Retry Behavior

The producer automatically retries on transient failures (5xx errors, 429 rate limits, connection errors). Non-retryable errors (other 4xx client errors, checksum mismatches) fail immediately. The delay between retries grows linearly with the attempt number.

producer := lfs.NewProducer("http://lfs-proxy:8080",
    lfs.WithRetry(5, 2*time.Second),  // retry with linear backoff
)
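
The retry rules can be pictured with a small sketch. The exact delay formula is not given in this documentation, so the code assumes the simplest linear form, attempt number times the base delay; retryable, backoff and schedule are hypothetical helpers, not part of the package.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// retryable mirrors the documented split: 5xx and 429 are transient,
// every other status is treated as final.
func retryable(status int) bool {
	if status == http.StatusTooManyRequests {
		return true
	}
	return status >= 500 && status <= 599
}

// backoff assumes a linear schedule: the n-th retry waits n * base.
func backoff(attempt int, base time.Duration) time.Duration {
	return time.Duration(attempt) * base
}

// schedule lists the wait before each retry for the given settings.
func schedule(maxRetries int, base time.Duration) []time.Duration {
	delays := make([]time.Duration, 0, maxRetries)
	for attempt := 1; attempt <= maxRetries; attempt++ {
		delays = append(delays, backoff(attempt, base))
	}
	return delays
}

func main() {
	fmt.Println(retryable(503), retryable(429), retryable(404))
	fmt.Println(schedule(5, 2*time.Second))
}
```

Under this assumption, five retries with a 2-second base would wait 2s, 4s, 6s, 8s and 10s, or 30 seconds in total before giving up.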

Index

Constants

This section is empty.

Variables

var (
	ErrNoConsumer      = errors.New("no consumer configured for LFS resolution")
	ErrNoStreamFetcher = errors.New("no stream fetcher configured for streaming access")
)

Sentinel errors for LFS operations.

Functions

func ComputeChecksum

func ComputeChecksum(alg ChecksumAlg, data []byte) (string, error)

ComputeChecksum computes a checksum for the given data and algorithm.

func EncodeEnvelope

func EncodeEnvelope(env Envelope) ([]byte, error)

EncodeEnvelope serializes an envelope to JSON.

func IsLfsEnvelope

func IsLfsEnvelope(value []byte) bool

IsLfsEnvelope detects an LFS envelope via a quick JSON marker check.

func NewChecksumHasher

func NewChecksumHasher(alg ChecksumAlg) (hash.Hash, error)

NewChecksumHasher returns a hash.Hash for the requested algorithm.
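
As an illustration of what selecting a hasher by algorithm name can look like, the sketch below maps the documented algorithm names onto standard library hashes. Several details are assumptions: newHasher and checksum are local names, digests are hex-encoded, crc32 uses the IEEE polynomial, and "none" is treated as an unsupported name.

```go
package main

import (
	"crypto/md5"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"
	"hash/crc32"
	"strings"
)

// newHasher picks a hash by name. An empty name falls back to sha256,
// matching the default described for NormalizeChecksumAlg.
func newHasher(alg string) (hash.Hash, error) {
	switch strings.ToLower(strings.TrimSpace(alg)) {
	case "", "sha256":
		return sha256.New(), nil
	case "md5":
		return md5.New(), nil
	case "crc32":
		return crc32.NewIEEE(), nil
	default:
		return nil, fmt.Errorf("unsupported checksum algorithm %q", alg)
	}
}

// checksum returns the hex-encoded digest of data for the given algorithm.
func checksum(alg string, data []byte) (string, error) {
	h, err := newHasher(alg)
	if err != nil {
		return "", err
	}
	h.Write(data) // hash.Hash.Write never returns an error
	return hex.EncodeToString(h.Sum(nil)), nil
}

func main() {
	for _, alg := range []string{"sha256", "md5", "crc32", "blake3"} {
		sum, err := checksum(alg, []byte("hello"))
		fmt.Println(alg, sum, err)
	}
}
```

Returning a hash.Hash rather than a digest lets callers feed data incrementally, which suits streamed blobs.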

Types

type BlobFetcher

type BlobFetcher interface {
	Fetch(ctx context.Context, key string) ([]byte, error)
}

BlobFetcher downloads LFS blobs from storage.

type ChecksumAlg

type ChecksumAlg string

ChecksumAlg describes the checksum algorithm used for LFS validation.

const (
	ChecksumSHA256 ChecksumAlg = "sha256"
	ChecksumMD5    ChecksumAlg = "md5"
	ChecksumCRC32  ChecksumAlg = "crc32"
	ChecksumNone   ChecksumAlg = "none"
)

func EnvelopeChecksum

func EnvelopeChecksum(env Envelope) (ChecksumAlg, string, bool, error)

EnvelopeChecksum returns the algorithm and the expected checksum for an envelope. If the algorithm is none, ok is false and no validation is performed.

func NormalizeChecksumAlg

func NormalizeChecksumAlg(raw string) (ChecksumAlg, error)

NormalizeChecksumAlg normalizes an algorithm name; empty defaults to sha256.

type ChecksumError

type ChecksumError struct {
	Expected string
	Actual   string
}

ChecksumError indicates a SHA256 mismatch.

func (*ChecksumError) Error

func (e *ChecksumError) Error() string

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer unwraps LFS envelope records by fetching the blob from storage.

func NewConsumer

func NewConsumer(fetcher BlobFetcher, opts ...ConsumerOption) *Consumer

NewConsumer creates a Consumer that fetches LFS blobs.

func (*Consumer) Unwrap

func (c *Consumer) Unwrap(ctx context.Context, value []byte) (*Envelope, []byte, error)

Unwrap checks if value is an LFS envelope and fetches the blob from storage. Returns (nil, original value, nil) for non-envelope values. Returns (envelope, blob, nil) for successfully resolved envelopes. Callers that don't need the envelope can ignore the first return value.

type ConsumerOption

type ConsumerOption func(*Consumer)

ConsumerOption configures the Consumer.

func WithChecksumValidation

func WithChecksumValidation(enabled bool) ConsumerOption

WithChecksumValidation enables or disables SHA256 validation on fetched blobs.

type Envelope

type Envelope struct {
	Version         int               `json:"kfs_lfs"`
	Bucket          string            `json:"bucket"`
	Key             string            `json:"key"`
	Size            int64             `json:"size"`
	SHA256          string            `json:"sha256"`
	Checksum        string            `json:"checksum,omitempty"`
	ChecksumAlg     string            `json:"checksum_alg,omitempty"`
	ContentType     string            `json:"content_type,omitempty"`
	OriginalHeaders map[string]string `json:"original_headers,omitempty"`
	CreatedAt       string            `json:"created_at,omitempty"`
	ProxyID         string            `json:"proxy_id,omitempty"`
}

Envelope describes the pointer metadata for an LFS payload stored in S3.

func DecodeEnvelope

func DecodeEnvelope(data []byte) (Envelope, error)

DecodeEnvelope parses JSON bytes into an Envelope.

type LfsError

type LfsError struct {
	Op  string
	Err error
}

LfsError wraps lower-level LFS errors with context.

func (*LfsError) Error

func (e *LfsError) Error() string

func (*LfsError) Unwrap

func (e *LfsError) Unwrap() error

type ProduceResult

type ProduceResult struct {
	Envelope  Envelope      // The LFS envelope with S3 location and checksum
	Duration  time.Duration // Time taken for the upload
	BytesSent int64         // Total bytes uploaded
}

ProduceResult contains the result of a successful LFS produce operation.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer sends large payloads to the LFS proxy via HTTP streaming.

func NewProducer

func NewProducer(endpoint string, opts ...ProducerOption) *Producer

NewProducer creates a Producer that sends blobs to the LFS proxy.

The endpoint should be the LFS proxy HTTP URL, e.g., "http://lfs-proxy:8080".

func (*Producer) Produce

func (p *Producer) Produce(ctx context.Context, topic, key string, body io.Reader) (*ProduceResult, error)

Produce streams a payload to the LFS proxy for the given topic.

The reader is streamed directly to the proxy without buffering the entire payload in memory. The proxy uploads to S3 and returns an LFS envelope that is stored in Kafka.

Example:

producer := lfs.NewProducer("http://lfs-proxy:8080")
file, err := os.Open("large-video.mp4")
if err != nil {
    log.Fatal(err)
}
defer file.Close()

result, err := producer.Produce(ctx, "video-uploads", "video-001", file)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Uploaded %d bytes, S3 key: %s\n", result.BytesSent, result.Envelope.Key)

func (*Producer) ProducePartitioned

func (p *Producer) ProducePartitioned(ctx context.Context, topic string, partition int32, key string, body io.Reader) (*ProduceResult, error)

ProducePartitioned sends to a specific partition.

func (*Producer) ProduceWithChecksum

func (p *Producer) ProduceWithChecksum(ctx context.Context, topic, key string, body io.Reader, expectedSHA256 string) (*ProduceResult, error)

ProduceWithChecksum streams a payload and validates the server-computed checksum.

If the server's SHA256 doesn't match the expected checksum, an error is returned. This is useful when the client has pre-computed the checksum.

type ProducerOption

type ProducerOption func(*Producer)

ProducerOption configures the Producer.

func WithAPIKey

func WithAPIKey(key string) ProducerOption

WithAPIKey sets the API key for authenticated requests.

func WithContentType

func WithContentType(ct string) ProducerOption

WithContentType sets the Content-Type header for uploads.

func WithHTTPClient

func WithHTTPClient(client *http.Client) ProducerOption

WithHTTPClient sets a custom HTTP client.

func WithProgress

func WithProgress(fn ProgressFunc) ProducerOption

WithProgress sets a callback for upload progress.

func WithRetry

func WithRetry(maxRetries int, delay time.Duration) ProducerOption

WithRetry configures retry behavior for transient failures.

type ProgressFunc

type ProgressFunc func(bytesSent int64) error

ProgressFunc is called during upload with bytes sent so far. Returning an error cancels the upload.

type Record

type Record struct {
	// contains filtered or unexported fields
}

Record wraps a Kafka record value with lazy LFS resolution. If the value contains an LFS envelope, the actual blob is fetched from S3 on first access to Value() or ValueStream().

Example usage:

consumer := lfs.NewConsumer(s3Client)
for _, record := range kafkaRecords {
    rec := lfs.NewRecord(record.Value, consumer)
    data, err := rec.Value(ctx)
    if err != nil {
        log.Error("failed to resolve LFS", "error", err)
        continue
    }
    // data contains the resolved blob (or original value if not LFS)
}

func NewRecord

func NewRecord(raw []byte, consumer *Consumer, opts ...RecordOption) *Record

NewRecord creates a Record that wraps a raw Kafka message value. If the value is an LFS envelope, it will be resolved lazily on first access.

func (*Record) ContentType

func (r *Record) ContentType() string

ContentType returns the content type from the LFS envelope. Returns empty string for non-LFS records.

func (*Record) Envelope

func (r *Record) Envelope() (*Envelope, error)

Envelope returns the LFS envelope if present, nil otherwise. Does not fetch the blob, just parses the envelope metadata.

func (*Record) IsLFS

func (r *Record) IsLFS() bool

IsLFS returns true if this record contains an LFS envelope.

func (*Record) Raw

func (r *Record) Raw() []byte

Raw returns the original record value without resolution.

func (*Record) Size

func (r *Record) Size() (int64, error)

Size returns the size of the blob. For LFS records, returns the size from the envelope without fetching. For non-LFS records, returns the length of the raw value.

func (*Record) Value

func (r *Record) Value(ctx context.Context) ([]byte, error)

Value returns the resolved blob content. If the record is an LFS envelope, fetches the blob from S3. If not an LFS envelope, returns the original value. Results are cached after first resolution.
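
Fetch-once caching of this kind is commonly built on sync.Once. The sketch below shows the pattern in isolation; lazyBlob and countFetches are hypothetical names, and details such as whether errors are cached too are assumptions rather than documented behavior of Record.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// lazyBlob resolves its content on first access and serves the cached
// result afterwards. In this sketch a failed fetch is cached as well, and
// only the first caller's context is used.
type lazyBlob struct {
	fetch func(ctx context.Context) ([]byte, error)
	once  sync.Once
	data  []byte
	err   error
}

func (l *lazyBlob) Value(ctx context.Context) ([]byte, error) {
	l.once.Do(func() {
		l.data, l.err = l.fetch(ctx)
	})
	return l.data, l.err
}

// countFetches calls Value the given number of times and reports how often
// the underlying fetch function actually ran.
func countFetches(calls int) int {
	fetches := 0
	blob := &lazyBlob{fetch: func(context.Context) ([]byte, error) {
		fetches++
		return []byte("payload"), nil
	}}
	for i := 0; i < calls; i++ {
		if _, err := blob.Value(context.Background()); err != nil {
			return -1
		}
	}
	return fetches
}

func main() {
	fmt.Println(countFetches(3)) // prints 1
}
```

sync.Once also makes the first resolution safe when several goroutines ask for the value at the same time.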

func (*Record) ValueStream

func (r *Record) ValueStream(ctx context.Context) (io.ReadCloser, int64, error)

ValueStream returns a streaming reader for the blob content. This is more memory-efficient for large blobs. Note: The caller must close the returned reader. If not an LFS envelope, returns a reader over the raw value.

type RecordOption

type RecordOption func(*Record)

RecordOption configures a Record.

func WithRecordChecksumValidation

func WithRecordChecksumValidation(enabled bool) RecordOption

WithRecordChecksumValidation enables/disables checksum validation.

func WithStreamFetcher

func WithStreamFetcher(fetcher StreamFetcher) RecordOption

WithStreamFetcher sets a stream fetcher for ValueStream() support.

type ResolvedRecord

type ResolvedRecord struct {
	Envelope    Envelope
	Payload     []byte
	ContentType string
	BlobSize    int64
	Checksum    string
	ChecksumAlg string
}

ResolvedRecord holds the resolved payload and metadata.

type Resolver

type Resolver struct {
	// contains filtered or unexported fields
}

Resolver fetches LFS payloads and validates integrity.

func NewResolver

func NewResolver(cfg ResolverConfig, s3 S3Reader) *Resolver

NewResolver creates a resolver with the provided S3 reader.

func (*Resolver) Resolve

func (r *Resolver) Resolve(ctx context.Context, value []byte) (ResolvedRecord, bool, error)

Resolve resolves a record value. It returns ok=false if the value is not an LFS envelope.

type ResolverConfig

type ResolverConfig struct {
	MaxSize          int64
	ValidateChecksum bool
}

ResolverConfig controls LFS resolution behavior.

type S3Client

type S3Client struct {
	// contains filtered or unexported fields
}

S3Client fetches LFS blobs from S3-compatible storage.

func NewS3Client

func NewS3Client(ctx context.Context, cfg S3Config) (*S3Client, error)

func (*S3Client) Fetch

func (c *S3Client) Fetch(ctx context.Context, key string) ([]byte, error)

Fetch downloads the object contents into memory.

func (*S3Client) Stream

func (c *S3Client) Stream(ctx context.Context, key string) (io.ReadCloser, int64, error)

Stream returns the object body for streaming callers.

type S3Config

type S3Config struct {
	Bucket          string
	Region          string
	Endpoint        string
	AccessKeyID     string
	SecretAccessKey string
	SessionToken    string
	ForcePathStyle  bool
}

type S3Reader

type S3Reader interface {
	Fetch(ctx context.Context, key string) ([]byte, error)
	Stream(ctx context.Context, key string) (io.ReadCloser, int64, error)
}

S3Reader fetches LFS blobs from S3-compatible storage.

type StreamFetcher

type StreamFetcher interface {
	Stream(ctx context.Context, key string) (io.ReadCloser, int64, error)
}

StreamFetcher downloads LFS blobs as streams from storage.
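
Because BlobFetcher and StreamFetcher are small interfaces, a test double is easy to write. The sketch below redeclares both interfaces locally with the signatures shown above so that it compiles on its own; memStore and readAll are hypothetical names. A type with the same two methods would satisfy the package's interfaces as well, since Go interfaces are matched structurally.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
)

// Local copies of the documented interface signatures.
type blobFetcher interface {
	Fetch(ctx context.Context, key string) ([]byte, error)
}

type streamFetcher interface {
	Stream(ctx context.Context, key string) (io.ReadCloser, int64, error)
}

// memStore is an in-memory blob store for tests.
type memStore map[string][]byte

func (m memStore) Fetch(ctx context.Context, key string) ([]byte, error) {
	blob, ok := m[key]
	if !ok {
		return nil, fmt.Errorf("blob %q not found", key)
	}
	// Return a copy so callers cannot mutate the stored blob.
	return append([]byte(nil), blob...), nil
}

func (m memStore) Stream(ctx context.Context, key string) (io.ReadCloser, int64, error) {
	blob, err := m.Fetch(ctx, key)
	if err != nil {
		return nil, 0, err
	}
	return io.NopCloser(bytes.NewReader(blob)), int64(len(blob)), nil
}

// Compile-time checks that memStore implements both interfaces.
var (
	_ blobFetcher   = memStore(nil)
	_ streamFetcher = memStore(nil)
)

// readAll drains a stream and returns its content and reported size.
func readAll(f streamFetcher, key string) (string, int64, error) {
	rc, size, err := f.Stream(context.Background(), key)
	if err != nil {
		return "", 0, err
	}
	defer rc.Close()
	data, err := io.ReadAll(rc)
	return string(data), size, err
}

func main() {
	store := memStore{"default/topic/lfs/obj-1": []byte("hello")}
	fmt.Println(readAll(store, "default/topic/lfs/obj-1"))
	fmt.Println(readAll(store, "missing"))
}
```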
