Documentation
¶
Overview ¶
Package lfs provides Large File Support (LFS) for Kafka messages.
LFS enables storing large payloads (up to 5GB) in S3 while keeping small envelope pointers in Kafka topics. This implements the "Claim Check" pattern.
Overview ¶
When a Kafka producer sends a message with the LFS_BLOB header, the LFS proxy:
- Uploads the payload to S3
- Computes SHA256 checksum
- Creates a JSON envelope with metadata
- Forwards the envelope (not the payload) to Kafka
Consumers receive the envelope and can use this package to transparently fetch the original payload from S3.
Envelope Format ¶
The LFS envelope is a JSON object stored as the Kafka message value:
{
"kfs_lfs": 1,
"bucket": "my-bucket",
"key": "default/topic/lfs/2026/02/01/obj-uuid",
"size": 10485760,
"sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
"content_type": "application/octet-stream",
"created_at": "2026-02-01T12:00:00Z",
"proxy_id": "lfs-proxy-0"
}
Consumer Usage ¶
Basic usage with franz-go:
// Create S3 client
s3Client, err := lfs.NewS3Client(ctx, lfs.S3Config{
Bucket: "my-bucket",
Region: "us-east-1",
Endpoint: "http://minio:9000", // optional
})
if err != nil {
log.Fatal(err)
}
// Create LFS consumer
consumer := lfs.NewConsumer(s3Client)
// Process Kafka records
for _, record := range kafkaRecords {
// Unwrap automatically fetches LFS blobs from S3.
// First return is the envelope (nil for non-LFS records).
_, data, err := consumer.Unwrap(ctx, record.Value)
if err != nil {
log.Error("failed to unwrap", "error", err)
continue
}
// data contains the original payload (or the unchanged record value if it is not an LFS envelope)
processData(data)
}
Record Wrapper ¶
For lazy resolution with caching, use the Record wrapper:
s3Client, _ := lfs.NewS3Client(ctx, config)
consumer := lfs.NewConsumer(s3Client)
for _, kafkaRecord := range records {
rec := lfs.NewRecord(kafkaRecord.Value, consumer,
lfs.WithStreamFetcher(s3Client), // enables ValueStream()
)
// Check if this is an LFS record
if rec.IsLFS() {
// Get size without fetching
size, _ := rec.Size()
fmt.Printf("LFS blob size: %d\n", size)
}
// Lazy fetch with caching (second call uses cache)
data, err := rec.Value(ctx)
if err != nil {
log.Error("resolve failed", "error", err)
continue
}
processData(data)
}
Streaming Large Files ¶
For memory-efficient processing of large files:
rec := lfs.NewRecord(value, nil,
lfs.WithStreamFetcher(s3Client),
)
reader, size, err := rec.ValueStream(ctx)
if err != nil {
log.Fatal(err)
}
defer reader.Close()
// Stream directly to output
io.Copy(outputFile, reader)
// Close validates checksum
if err := reader.Close(); err != nil {
log.Error("checksum validation failed", "error", err)
}
Checksum Validation ¶
By default, fetched blobs are validated against the SHA256 checksum stored in the envelope. This can be disabled for performance:
consumer := lfs.NewConsumer(s3Client,
lfs.WithChecksumValidation(false),
)
Error Handling ¶
The package defines specific error types for common failures:
_, data, err := consumer.Unwrap(ctx, value)
if err != nil {
var checksumErr *lfs.ChecksumError
if errors.As(err, &checksumErr) {
log.Error("data corruption detected",
"expected", checksumErr.Expected,
"actual", checksumErr.Actual,
)
}
var lfsErr *lfs.LfsError
if errors.As(err, &lfsErr) {
log.Error("LFS operation failed",
"operation", lfsErr.Op,
"error", lfsErr.Err,
)
}
}
Detection ¶
Use IsLfsEnvelope for fast detection without parsing:
if lfs.IsLfsEnvelope(value) {
// This is an LFS envelope
env, _ := lfs.DecodeEnvelope(value)
fmt.Printf("Blob stored at: s3://%s/%s\n", env.Bucket, env.Key)
}
Producer Usage ¶
For producing large payloads via the LFS proxy HTTP endpoint:
// Create producer pointing to LFS proxy
producer := lfs.NewProducer("http://lfs-proxy:8080",
lfs.WithContentType("video/mp4"),
lfs.WithRetry(3, time.Second),
)
// Stream a file to the proxy
file, _ := os.Open("large-video.mp4")
defer file.Close()
result, err := producer.Produce(ctx, "video-uploads", "video-001", file)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Uploaded %d bytes to s3://%s/%s\n",
result.BytesSent, result.Envelope.Bucket, result.Envelope.Key)
Producer with Progress Tracking ¶
Monitor upload progress for large files:
producer := lfs.NewProducer("http://lfs-proxy:8080",
lfs.WithProgress(func(bytesSent int64) error {
fmt.Printf("Uploaded: %d bytes\n", bytesSent)
return nil // return error to cancel upload
}),
)
result, err := producer.Produce(ctx, "media", "file.dat", reader)
Producer with Checksum Validation ¶
Validate the server-computed checksum against a pre-computed value:
// Pre-compute checksum
hasher := sha256.New()
io.Copy(hasher, file)
expectedSHA := hex.EncodeToString(hasher.Sum(nil))
file.Seek(0, 0)
// Upload with checksum validation
result, err := producer.ProduceWithChecksum(ctx, "topic", "key", file, expectedSHA)
if err != nil {
var checksumErr *lfs.ChecksumError
if errors.As(err, &checksumErr) {
log.Error("upload corrupted", "expected", checksumErr.Expected)
}
}
Producer Retry Behavior ¶
The producer automatically retries on transient failures (5xx errors, 429 rate limits, connection errors). Non-retryable errors (other 4xx client errors, checksum mismatches) fail immediately. The retry delay grows linearly with the attempt number.
producer := lfs.NewProducer("http://lfs-proxy:8080",
lfs.WithRetry(5, 2*time.Second), // retry with linear backoff
)
Index ¶
- Variables
- func ComputeChecksum(alg ChecksumAlg, data []byte) (string, error)
- func EncodeEnvelope(env Envelope) ([]byte, error)
- func IsLfsEnvelope(value []byte) bool
- func NewChecksumHasher(alg ChecksumAlg) (hash.Hash, error)
- type BlobFetcher
- type ChecksumAlg
- type ChecksumError
- type Consumer
- type ConsumerOption
- type Envelope
- type LfsError
- type ProduceResult
- type Producer
- func (p *Producer) Produce(ctx context.Context, topic, key string, body io.Reader) (*ProduceResult, error)
- func (p *Producer) ProducePartitioned(ctx context.Context, topic string, partition int32, key string, body io.Reader) (*ProduceResult, error)
- func (p *Producer) ProduceWithChecksum(ctx context.Context, topic, key string, body io.Reader, expectedSHA256 string) (*ProduceResult, error)
- type ProducerOption
- type ProgressFunc
- type Record
- func (r *Record) ContentType() string
- func (r *Record) Envelope() (*Envelope, error)
- func (r *Record) IsLFS() bool
- func (r *Record) Raw() []byte
- func (r *Record) Size() (int64, error)
- func (r *Record) Value(ctx context.Context) ([]byte, error)
- func (r *Record) ValueStream(ctx context.Context) (io.ReadCloser, int64, error)
- type RecordOption
- type ResolvedRecord
- type Resolver
- type ResolverConfig
- type S3Client
- type S3Config
- type S3Reader
- type StreamFetcher
Constants ¶
This section is empty.
Variables ¶
var (
ErrNoConsumer = errors.New("no consumer configured for LFS resolution")
ErrNoStreamFetcher = errors.New("no stream fetcher configured for streaming access")
)
Sentinel errors for LFS operations.
Functions ¶
func ComputeChecksum ¶
func ComputeChecksum(alg ChecksumAlg, data []byte) (string, error)
ComputeChecksum computes a checksum for the given data and algorithm.
func EncodeEnvelope ¶
EncodeEnvelope serializes an envelope to JSON.
func IsLfsEnvelope ¶
IsLfsEnvelope detects an LFS envelope via a quick JSON marker check.
func NewChecksumHasher ¶
func NewChecksumHasher(alg ChecksumAlg) (hash.Hash, error)
NewChecksumHasher returns a hash.Hash for the requested algorithm.
Types ¶
type BlobFetcher ¶
BlobFetcher downloads LFS blobs from storage.
type ChecksumAlg ¶
type ChecksumAlg string
ChecksumAlg describes the checksum algorithm used for LFS validation.
const (
ChecksumSHA256 ChecksumAlg = "sha256"
ChecksumMD5 ChecksumAlg = "md5"
ChecksumCRC32 ChecksumAlg = "crc32"
ChecksumNone ChecksumAlg = "none"
)
func EnvelopeChecksum ¶
func EnvelopeChecksum(env Envelope) (ChecksumAlg, string, bool, error)
EnvelopeChecksum returns the checksum algorithm and the expected checksum for an envelope. If the algorithm is none, the boolean result is false (no validation).
func NormalizeChecksumAlg ¶
func NormalizeChecksumAlg(raw string) (ChecksumAlg, error)
NormalizeChecksumAlg normalizes an algorithm name; empty defaults to sha256.
type ChecksumError ¶
ChecksumError indicates a SHA256 mismatch.
func (*ChecksumError) Error ¶
func (e *ChecksumError) Error() string
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer unwraps LFS envelope records by fetching the blob from storage.
func NewConsumer ¶
func NewConsumer(fetcher BlobFetcher, opts ...ConsumerOption) *Consumer
NewConsumer creates a Consumer that fetches LFS blobs.
func (*Consumer) Unwrap ¶
Unwrap checks if value is an LFS envelope and fetches the blob from storage. Returns (nil, original value, nil) for non-envelope values. Returns (envelope, blob, nil) for successfully resolved envelopes. Callers that don't need the envelope can ignore the first return value.
type ConsumerOption ¶
type ConsumerOption func(*Consumer)
ConsumerOption configures the Consumer.
func WithChecksumValidation ¶
func WithChecksumValidation(enabled bool) ConsumerOption
WithChecksumValidation enables or disables SHA256 validation on fetched blobs. Validation is enabled by default.
type Envelope ¶
type Envelope struct {
Version int `json:"kfs_lfs"`
Bucket string `json:"bucket"`
Key string `json:"key"`
Size int64 `json:"size"`
SHA256 string `json:"sha256"`
Checksum string `json:"checksum,omitempty"`
ChecksumAlg string `json:"checksum_alg,omitempty"`
ContentType string `json:"content_type,omitempty"`
OriginalHeaders map[string]string `json:"original_headers,omitempty"`
CreatedAt string `json:"created_at,omitempty"`
ProxyID string `json:"proxy_id,omitempty"`
}
Envelope describes the pointer metadata for an LFS payload stored in S3.
func DecodeEnvelope ¶
DecodeEnvelope parses JSON bytes into an Envelope.
type ProduceResult ¶
type ProduceResult struct {
Envelope Envelope // The LFS envelope with S3 location and checksum
Duration time.Duration // Time taken for the upload
BytesSent int64 // Total bytes uploaded
}
ProduceResult contains the result of a successful LFS produce operation.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer sends large payloads to the LFS proxy via HTTP streaming.
func NewProducer ¶
func NewProducer(endpoint string, opts ...ProducerOption) *Producer
NewProducer creates a Producer that sends blobs to the LFS proxy.
The endpoint should be the LFS proxy HTTP URL, e.g., "http://lfs-proxy:8080".
func (*Producer) Produce ¶
func (p *Producer) Produce(ctx context.Context, topic, key string, body io.Reader) (*ProduceResult, error)
Produce streams a payload to the LFS proxy for the given topic.
The reader is streamed directly to the proxy without buffering the entire payload in memory. The proxy uploads to S3 and returns an LFS envelope that is stored in Kafka.
Example:
producer := lfs.NewProducer("http://lfs-proxy:8080")
file, _ := os.Open("large-video.mp4")
defer file.Close()
result, err := producer.Produce(ctx, "video-uploads", "video-001", file)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Uploaded %d bytes, S3 key: %s\n", result.BytesSent, result.Envelope.Key)
func (*Producer) ProducePartitioned ¶
func (p *Producer) ProducePartitioned(ctx context.Context, topic string, partition int32, key string, body io.Reader) (*ProduceResult, error)
ProducePartitioned sends to a specific partition.
func (*Producer) ProduceWithChecksum ¶
func (p *Producer) ProduceWithChecksum(ctx context.Context, topic, key string, body io.Reader, expectedSHA256 string) (*ProduceResult, error)
ProduceWithChecksum streams a payload and validates the server-computed checksum.
If the server's SHA256 doesn't match the expected checksum, an error is returned. This is useful when the client has pre-computed the checksum.
type ProducerOption ¶
type ProducerOption func(*Producer)
ProducerOption configures the Producer.
func WithAPIKey ¶
func WithAPIKey(key string) ProducerOption
WithAPIKey sets the API key for authenticated requests.
func WithContentType ¶
func WithContentType(ct string) ProducerOption
WithContentType sets the Content-Type header for uploads.
func WithHTTPClient ¶
func WithHTTPClient(client *http.Client) ProducerOption
WithHTTPClient sets a custom HTTP client.
func WithProgress ¶
func WithProgress(fn ProgressFunc) ProducerOption
WithProgress sets a callback for upload progress.
type ProgressFunc ¶
ProgressFunc is called during upload with the number of bytes sent so far. Returning an error cancels the upload.
type Record ¶
type Record struct {
// contains filtered or unexported fields
}
Record wraps a Kafka record value with lazy LFS resolution. If the value contains an LFS envelope, the actual blob is fetched from S3 on first access to Value() or ValueStream().
Example usage:
consumer := lfs.NewConsumer(s3Client)
for _, record := range kafkaRecords {
rec := lfs.NewRecord(record.Value, consumer)
data, err := rec.Value(ctx)
if err != nil {
log.Error("failed to resolve LFS", "error", err)
continue
}
// data contains the resolved blob (or original value if not LFS)
}
func NewRecord ¶
func NewRecord(raw []byte, consumer *Consumer, opts ...RecordOption) *Record
NewRecord creates a Record that wraps a raw Kafka message value. If the value is an LFS envelope, it will be resolved lazily on first access.
func (*Record) ContentType ¶
ContentType returns the content type from the LFS envelope. Returns empty string for non-LFS records.
func (*Record) Envelope ¶
Envelope returns the LFS envelope if present, nil otherwise. Does not fetch the blob, just parses the envelope metadata.
func (*Record) Size ¶
Size returns the size of the blob. For LFS records, returns the size from the envelope without fetching. For non-LFS records, returns the length of the raw value.
func (*Record) Value ¶
Value returns the resolved blob content. If the record is an LFS envelope, fetches the blob from S3. If not an LFS envelope, returns the original value. Results are cached after first resolution.
func (*Record) ValueStream ¶
ValueStream returns a streaming reader for the blob content. This is more memory-efficient for large blobs. Note: The caller must close the returned reader. If not an LFS envelope, returns a reader over the raw value.
type RecordOption ¶
type RecordOption func(*Record)
RecordOption configures a Record.
func WithRecordChecksumValidation ¶
func WithRecordChecksumValidation(enabled bool) RecordOption
WithRecordChecksumValidation enables/disables checksum validation.
func WithStreamFetcher ¶
func WithStreamFetcher(fetcher StreamFetcher) RecordOption
WithStreamFetcher sets a stream fetcher for ValueStream() support.
type ResolvedRecord ¶
type ResolvedRecord struct {
Envelope Envelope
Payload []byte
ContentType string
BlobSize int64
Checksum string
ChecksumAlg string
}
ResolvedRecord holds the resolved payload and metadata.
type Resolver ¶
type Resolver struct {
// contains filtered or unexported fields
}
Resolver fetches LFS payloads and validates integrity.
func NewResolver ¶
func NewResolver(cfg ResolverConfig, s3 S3Reader) *Resolver
NewResolver creates a resolver with the provided S3 reader.
type ResolverConfig ¶
ResolverConfig controls LFS resolution behavior.
type S3Client ¶
type S3Client struct {
// contains filtered or unexported fields
}
S3Client fetches LFS blobs from S3-compatible storage.
type S3Reader ¶
type S3Reader interface {
Fetch(ctx context.Context, key string) ([]byte, error)
Stream(ctx context.Context, key string) (io.ReadCloser, int64, error)
}
S3Reader is the interface for reading LFS blobs from S3-compatible storage, either fully into memory (Fetch) or as a stream (Stream).
type StreamFetcher ¶
type StreamFetcher interface {
Stream(ctx context.Context, key string) (io.ReadCloser, int64, error)
}
StreamFetcher downloads LFS blobs as streams from storage.