kafka

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package kafka provides functionality for interacting with Apache Kafka.

The kafka package offers a simplified interface for working with Kafka message brokers, providing connection management, message publishing, and consuming capabilities with a focus on reliability and ease of use.

Architecture

The package follows the "accept interfaces, return structs" Go idiom:

  • Client interface: Defines the contract for Kafka operations
  • KafkaClient struct: Concrete implementation of the Client interface
  • Message interface: Defines the contract for consumed messages
  • Constructor returns *KafkaClient (concrete type)
  • FX module provides both *KafkaClient and Client interface

This design allows:

  • Direct usage: Use *KafkaClient for simple cases
  • Interface usage: Depend on Client interface for testability and flexibility
  • Zero adapters needed: Consumer code can use type aliases

Core Features:

  • Robust connection management with automatic reconnection
  • Simple publishing interface with error handling
  • Consumer interface with automatic commit handling
  • Consumer group support
  • Separate schema_registry package for Confluent Schema Registry
  • Integration with the Logger package for structured logging
  • Distributed tracing support via message headers

Basic Usage (Direct)

import (
	"github.com/Aleph-Alpha/std/v1/kafka"
	"context"
	"sync"
)

// Create a new Kafka client (returns concrete *KafkaClient)
client, err := kafka.NewClient(kafka.Config{
	Brokers:    []string{"localhost:9092"},
	Topic:      "events",
	GroupID:    "my-consumer-group",
	IsConsumer: true,
	EnableAutoCommit: false,  // Manual commit for safety
	DataType:   "json",       // Automatic JSON serializer (default)
})
if err != nil {
	log.Fatal("Failed to connect to Kafka", err)
}
defer client.GracefulShutdown()

// Publish a message (automatically serialized as JSON)
ctx := context.Background()
event := map[string]interface{}{"id": "123", "name": "John"}
err = client.Publish(ctx, "key", event)
if err != nil {
	log.Printf("Failed to publish message: %v", err)
}

FX Module Integration

The package provides an FX module that injects both concrete and interface types:

import (
	"github.com/Aleph-Alpha/std/v1/kafka"
	"go.uber.org/fx"
)

app := fx.New(
	kafka.FXModule,
	fx.Provide(
		func() kafka.Config {
			return kafka.Config{
				Brokers:    []string{"localhost:9092"},
				Topic:      "events",
				IsConsumer: false,
				DataType:   "json",
			}
		},
	),
	fx.Invoke(func(k kafka.Client) {
		// Use the Client interface
		ctx := context.Background()
		event := map[string]interface{}{"id": "123"}
		k.Publish(ctx, "key", event)
	}),
)
app.Run()

Consumer applications can use type aliases to avoid creating adapters:

// In your application's messaging package
package messaging

import stdKafka "github.com/Aleph-Alpha/std/v1/kafka"

// Type aliases reference std interfaces directly
type Client = stdKafka.Client
type Message = stdKafka.Message

Then use these aliases throughout your application:

func MyService(kafka messaging.Client) {
	kafka.Publish(ctx, "key", data)
}

Consuming Messages

wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

msgChan := client.Consume(ctx, wg)
for msg := range msgChan {
	// Process the message
	fmt.Printf("Received: %s\n", string(msg.Body()))

	// Commit the message
	if err := msg.CommitMsg(); err != nil {
		log.Printf("Failed to commit: %v", err)
	}
}

High-Throughput Consumption with Parallel Workers

For high-volume topics, use ConsumeParallel to process messages concurrently:

wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Use 5 concurrent workers for better throughput
msgChan := client.ConsumeParallel(ctx, wg, 5)
for msg := range msgChan {
	// Process messages concurrently
	processMessage(msg)

	// Commit the message
	if err := msg.CommitMsg(); err != nil {
		log.Printf("Failed to commit: %v", err)
	}
}

Distributed Tracing with Message Headers

This package supports distributed tracing by allowing you to propagate trace context through message headers, enabling end-to-end visibility across services.

Publisher Example (sending trace context):

import (
	"github.com/Aleph-Alpha/std/v1/tracer"
	// other imports...
)

// Create a tracer client
tracerClient := tracer.NewClient(tracerConfig, log)

// Create a span for the operation that includes publishing
ctx, span := tracerClient.StartSpan(ctx, "process-and-publish")
defer span.End()

// Process data...

// Extract trace context as headers before publishing
traceHeaders := tracerClient.GetCarrier(ctx)

// Publish with trace headers
err = kafkaClient.Publish(ctx, "key", message, traceHeaders)
if err != nil {
	span.RecordError(err)
	log.Error("Failed to publish message", err, nil)
}

Consumer Example (continuing the trace):

msgChan := kafkaClient.Consume(ctx, wg)
for msg := range msgChan {
	// Extract trace headers from the message
	headers := msg.Header()

	// Create a new context with the trace information
	ctx = tracerClient.SetCarrierOnContext(ctx, headers)

	// Create a span as a child of the incoming trace
	ctx, span := tracerClient.StartSpan(ctx, "process-message")
	defer span.End()

	// Add relevant attributes to the span
	span.SetAttributes(map[string]interface{}{
		"message.size": len(msg.Body()),
		"message.key":  msg.Key(),
	})

	// Process the message...

	if err := processMessage(msg.Body()) {
		// Record any errors in the span
		span.RecordError(err)
		continue
	}

	// Commit successful processing
	if err := msg.CommitMsg(); err != nil {
		span.RecordError(err)
		log.Error("Failed to commit message", err, nil)
	}
}

FX Module Integration:

This package provides a fx module for easy integration:

app := fx.New(
	logger.FXModule, // Optional: provides std logger
	kafka.FXModule,
	// ... other modules
)
app.Run()

The Kafka module will automatically use the logger if it's available in the dependency injection container.

Configuration:

The kafka client can be configured via environment variables or explicitly:

KAFKA_BROKERS=localhost:9092,localhost:9093
KAFKA_TOPIC=events
KAFKA_GROUP_ID=my-consumer-group

Custom Logger Integration:

You can integrate the std/v1/logger for better error logging:

import (
	"github.com/Aleph-Alpha/std/v1/logger"
	"github.com/Aleph-Alpha/std/v1/kafka"
)

// Create logger
log := logger.NewLoggerClient(logger.Config{
	Level:       logger.Info,
	ServiceName: "my-service",
})

// Create Kafka client with logger
client, err := kafka.NewClient(kafka.Config{
	Brokers:    []string{"localhost:9092"},
	Topic:      "events",
	Logger:     log, // Kafka internal errors will use this logger
	IsConsumer: false,
})

Commit Modes and Acknowledgment:

Consumer Commit Modes:

The module supports different commit modes for consumers:

Manual Commit (Recommended for safety):

client, err := kafka.NewClient(kafka.Config{
    Brokers:          []string{"localhost:9092"},
    Topic:            "events",
    GroupID:          "my-group",
    IsConsumer:       true,
    EnableAutoCommit: false,  // Manual commit (default)
})

msgChan := client.Consume(ctx, wg)
for msg := range msgChan {
    var event UserEvent
    if err := msg.BodyAs(&event); err != nil {
        continue  // Don't commit on error
    }

    if err := processEvent(event); err != nil {
        continue  // Don't commit on processing error
    }

    // Commit only after successful processing
    if err := msg.CommitMsg(); err != nil {
        log.Error("Failed to commit", err, nil)
    }
}

Auto-Commit (For high-throughput, at-least-once semantics):

client, err := kafka.NewClient(kafka.Config{
    Brokers:          []string{"localhost:9092"},
    Topic:            "events",
    GroupID:          "my-group",
    IsConsumer:       true,
    EnableAutoCommit: true,           // Enable auto-commit
    CommitInterval:   1 * time.Second,  // Commit every second
})

msgChan := client.Consume(ctx, wg)
for msg := range msgChan {
    // Process message - no need to call msg.CommitMsg()
    // Offsets are committed automatically every CommitInterval
    processEvent(msg)
}

Producer Acknowledgment Modes:

The module supports different producer acknowledgment modes:

Fire-and-Forget (Fastest, least safe):

client, err := kafka.NewClient(kafka.Config{
    Brokers:      []string{"localhost:9092"},
    Topic:        "events",
    IsConsumer:   false,
    RequiredAcks: kafka.RequireNone,  // No acknowledgment (0)
})

// Fast but no guarantee of delivery
err := client.Publish(ctx, "key", event)

Leader Acknowledgment (Balanced):

client, err := kafka.NewClient(kafka.Config{
    Brokers:      []string{"localhost:9092"},
    Topic:        "events",
    IsConsumer:   false,
    RequiredAcks: kafka.RequireOne,  // Leader only (1)
    WriteTimeout: 5 * time.Second,
})

// Balanced speed and durability
err := client.Publish(ctx, "key", event)

All Replicas Acknowledgment (Most durable, default):

client, err := kafka.NewClient(kafka.Config{
    Brokers:      []string{"localhost:9092"},
    Topic:        "events",
    IsConsumer:   false,
    RequiredAcks: kafka.RequireAll,  // All in-sync replicas (-1)
    WriteTimeout: 10 * time.Second,
})

// Slowest but most durable
err := client.Publish(ctx, "key", event)

Automatic Serializer Selection:

The Kafka client can automatically select serializers based on the DataType config. This eliminates the need to manually configure serializers for common use cases.

JSON (default):

client, err := kafka.NewClient(kafka.Config{
    Brokers:  []string{"localhost:9092"},
    Topic:    "events",
    DataType: "json",  // Auto-uses JSONSerializer
})

// Can now publish structs directly
event := UserEvent{Name: "John", Age: 30}
err = client.Publish(ctx, "key", event)  // Auto-serialized to JSON

String:

client, err := kafka.NewClient(kafka.Config{
    Brokers:  []string{"localhost:9092"},
    Topic:    "logs",
    DataType: "string",  // Auto-uses StringSerializer
})

err = client.Publish(ctx, "key", "log message")

Gob (Go binary encoding):

client, err := kafka.NewClient(kafka.Config{
    Brokers:  []string{"localhost:9092"},
    Topic:    "data",
    DataType: "gob",  // Auto-uses GobSerializer
})

data := MyStruct{Field1: "value", Field2: 123}
err = client.Publish(ctx, "key", data)

Raw bytes (no serialization):

client, err := kafka.NewClient(kafka.Config{
    Brokers:  []string{"localhost:9092"},
    Topic:    "binary",
    DataType: "bytes",  // No-op serializer
})

rawBytes := []byte{0x01, 0x02, 0x03}
err = client.Publish(ctx, "key", rawBytes)

Supported DataTypes:

  • "json" (default): JSONSerializer/Deserializer
  • "string": StringSerializer/Deserializer
  • "gob": GobSerializer/Deserializer
  • "bytes": NoOpSerializer/Deserializer
  • "protobuf": Requires custom serializer
  • "avro": Requires custom serializer

Serialization Support:

The kafka package provides optional serialization support for automatic encoding/decoding of messages. Serializers are injected via dependency injection (FX) for better testability.

Using JSON Serialization (with FX):

import (
    "go.uber.org/fx"
    "github.com/Aleph-Alpha/std/v1/kafka"
)

type UserEvent struct {
    Event  string `json:"event"`
    UserID int    `json:"user_id"`
    Email  string `json:"email"`
}

app := fx.New(
    kafka.FXModule,
    fx.Provide(
        func() kafka.Config {
            return kafka.Config{
                Brokers:    []string{"localhost:9092"},
                Topic:      "user-events",
                IsConsumer: false,
            }
        },
        // Inject serializers
        func() kafka.Serializer {
            return &kafka.JSONSerializer{}
        },
        func() kafka.Deserializer {
            return &kafka.JSONDeserializer{}
        },
    ),
)

// Publishing with automatic serialization
event := UserEvent{
    Event:  "signup",
    UserID: 123,
    Email:  "user@example.com",
}
err := client.Publish(ctx, "user-123", event)  // Automatically serialized

// Consuming with automatic deserialization
msgChan := client.Consume(ctx, wg)
for msg := range msgChan {
    var event UserEvent
    if err := msg.BodyAs(&event); err != nil {  // Automatic JSON fallback
        log.Error("Failed to deserialize", err, nil)
        continue
    }
    fmt.Printf("User %d signed up: %s\n", event.UserID, event.Email)
    msg.CommitMsg()
}

Using Protocol Buffers (with FX):

import (
    "google.golang.org/protobuf/proto"
    "github.com/Aleph-Alpha/std/v1/kafka"
)

app := fx.New(
    kafka.FXModule,
    fx.Provide(
        func() kafka.Config { /* ... */ },
        // Inject Protobuf serializers
        func() kafka.Serializer {
            return &kafka.ProtobufSerializer{
                MarshalFunc: proto.Marshal,
            }
        },
        func() kafka.Deserializer {
            return &kafka.ProtobufDeserializer{
                UnmarshalFunc: proto.Unmarshal,
            }
        },
    ),
)

// Publish protobuf message
protoMsg := &pb.UserEvent{
    Event:  "signup",
    UserId: 123,
    Email:  "user@example.com",
}
err = client.Publish(ctx, "user-123", protoMsg)  // Automatically serialized

// Consume protobuf messages
msgChan := consumer.Consume(ctx, wg)
for msg := range msgChan {
    var event pb.UserEvent
    if err := msg.BodyAs(&event); err != nil{
        log.Error("Failed to deserialize", err, nil)
        continue
    }
    fmt.Printf("User %d signed up: %s\n", event.UserId, event.Email)
    msg.CommitMsg()
}

Using Raw Bytes (no serialization):

// No serializer needed - []byte is always passed through
client, err := kafka.NewClient(kafka.Config{
    Brokers:    []string{"localhost:9092"},
    Topic:      "events",
    IsConsumer: false,
})

// Publish raw bytes
message := []byte(`{"event":"signup","user_id":123}`)
err = client.Publish(ctx, "user-123", message)  // Passed through directly

Custom Serializers:

You can implement your own serializer for custom formats:

Apache Avro Example:

import "github.com/linkedin/goavro/v2"

codec, _ := goavro.NewCodec(`{
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "event", "type": "string"},
        {"name": "user_id", "type": "int"}
    ]
}`)

client, err := kafka.NewClient(kafka.Config{
    Brokers: []string{"localhost:9092"},
    Topic:   "events",
    Serializer: &kafka.AvroSerializer{
        MarshalFunc: func(data interface{}) ([]byte, error) {
            return codec.BinaryFromNative(nil, data)
        },
    },
    Deserializer: &kafka.AvroDeserializer{
        UnmarshalFunc: func(data []byte, target interface{}) error {
            native, _, err := codec.NativeFromBinary(data)
            if err != nil {
                return err
            }
            if targetMap, ok := target.(*map[string]interface{}); ok {
                *targetMap = native.(map[string]interface{})
            }
            return nil
        },
    },
})

// Publish and consume work the same way
eventData := map[string]interface{}{"event": "signup", "user_id": 123}
err = client.Publish(ctx, "user-123", eventData)

Schema Registry Integration:

For production schema management with Confluent Schema Registry, use the separate schema_registry package:

import (
    "github.com/Aleph-Alpha/std/v1/kafka"
    "github.com/Aleph-Alpha/std/v1/schema_registry"
)

// Create schema registry client
registry, _ := schema_registry.NewClient(schema_registry.Config{
    URL: "http://localhost:8081",
})

// Create serializer with schema registry
serializer, _ := schema_registry.NewAvroSerializer(
    schema_registry.AvroSerializerConfig{
        Registry:    registry,
        Subject:     "users-value",
        Schema:      avroSchema,
        MarshalFunc: marshalFunc,
    },
)

// Inject via FX
fx.Provide(
    func() (schema_registry.Registry, error) {
        return schema_registry.NewClient(schema_registry.Config{
            URL: "http://localhost:8081",
        })
    },
    func(registry schema_registry.Registry) (kafka.Serializer, error) {
        return schema_registry.NewAvroSerializer(...)
    },
)

See the schema_registry package documentation for complete examples.

Multi-Format Support:

Use MultiFormatSerializer to support multiple formats:

multiSerializer := kafka.NewMultiFormatSerializer()
multiSerializer.RegisterSerializer("protobuf", &kafka.ProtobufSerializer{
    MarshalFunc: proto.Marshal,
})

client, err := kafka.NewClient(kafka.Config{
    Brokers:    []string{"localhost:9092"},
    Topic:      "events",
    Serializer: multiSerializer,
})

Built-in Serializers:

  • JSONSerializer: JSON format (default fallback)
  • ProtobufSerializer: Protocol Buffers
  • AvroSerializer: Apache Avro
  • StringSerializer: Plain text
  • GobSerializer: Go gob encoding (Go-to-Go only)
  • NoOpSerializer: Raw []byte passthrough
  • MultiFormatSerializer: Multiple formats with dynamic selection

Observability

The Kafka client supports optional observability through the Observer interface from std/v1/observability. This allows external code to track all Kafka operations for metrics, tracing, and logging without tight coupling.

Basic Observability Setup

To enable observability, provide an Observer implementation in the configuration:

import "github.com/Aleph-Alpha/std/v1/observability"

// Implement the Observer interface
type MyObserver struct {
    metrics *prometheus.Metrics
}

func (o *MyObserver) ObserveOperation(ctx observability.OperationContext) {
    if ctx.Component == "kafka" {
        switch ctx.Operation {
        case "produce":
            o.metrics.RecordKafkaPublish(
                ctx.Resource,  // topic name
                ctx.Size,      // message bytes
                ctx.Duration,  // operation duration
                ctx.Error,     // error if any
            )
        case "consume":
            o.metrics.RecordKafkaConsume(
                ctx.Resource,    // topic name
                ctx.SubResource, // partition
                ctx.Size,        // message bytes
                ctx.Duration,    // fetch duration
                ctx.Error,       // error if any
            )
        }
    }
}

// Configure Kafka client with observer
observer := &MyObserver{metrics: myMetrics}

client, err := kafka.NewClient(kafka.Config{
    Brokers:  []string{"localhost:9092"},
    Topic:    "events",
    Observer: observer, // ← Enable observability
})

Observed Operations

The observer is notified of the following operations:

Produce Operation:

  • Component: "kafka"
  • Operation: "produce"
  • Resource: Topic name (e.g., "user-events")
  • SubResource: "" (empty)
  • Duration: Time taken to publish message
  • Size: Message size in bytes
  • Error: Any error that occurred during publishing
  • Metadata: nil (reserved for future use)

Consume Operation:

  • Component: "kafka"
  • Operation: "consume"
  • Resource: Topic name (e.g., "user-events")
  • SubResource: Partition number (e.g., "3")
  • Duration: Time taken to fetch message from Kafka
  • Size: Message size in bytes
  • Error: Any error that occurred during consumption
  • Metadata: nil (reserved for future use)

Example OperationContext for Publish:

observability.OperationContext{
    Component:   "kafka",
    Operation:   "produce",
    Resource:    "user-events",
    SubResource: "",
    Duration:    12 * time.Millisecond,
    Size:        2048,
    Error:       nil,
}

Example OperationContext for Consume:

observability.OperationContext{
    Component:   "kafka",
    Operation:   "consume",
    Resource:    "user-events",
    SubResource: "3", // partition
    Duration:    8 * time.Millisecond,
    Size:        1024,
    Error:       nil,
}

Integration with Prometheus

Example: Track Kafka metrics with Prometheus:

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/Aleph-Alpha/std/v1/observability"
)

type KafkaObserver struct {
    publishTotal    *prometheus.CounterVec
    publishDuration *prometheus.HistogramVec
    publishBytes    *prometheus.CounterVec
    consumeTotal    *prometheus.CounterVec
    consumeDuration *prometheus.HistogramVec
    consumeBytes    *prometheus.CounterVec
}

func NewKafkaObserver() *KafkaObserver {
    return &KafkaObserver{
        publishTotal: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "kafka_messages_published_total",
                Help: "Total number of messages published to Kafka",
            },
            []string{"topic", "status"},
        ),
        publishDuration: prometheus.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "kafka_publish_duration_seconds",
                Help:    "Duration of Kafka publish operations",
                Buckets: prometheus.DefBuckets,
            },
            []string{"topic"},
        ),
        publishBytes: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "kafka_publish_bytes_total",
                Help: "Total bytes published to Kafka",
            },
            []string{"topic"},
        ),
        consumeTotal: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "kafka_messages_consumed_total",
                Help: "Total number of messages consumed from Kafka",
            },
            []string{"topic", "partition", "status"},
        ),
        consumeDuration: prometheus.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "kafka_consume_duration_seconds",
                Help:    "Duration of Kafka consume operations",
                Buckets: prometheus.DefBuckets,
            },
            []string{"topic"},
        ),
        consumeBytes: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "kafka_consume_bytes_total",
                Help: "Total bytes consumed from Kafka",
            },
            []string{"topic"},
        ),
    }
}

func (o *KafkaObserver) ObserveOperation(ctx observability.OperationContext) {
    if ctx.Component != "kafka" {
        return
    }

    status := "success"
    if ctx.Error != nil {
        status = "error"
    }

    switch ctx.Operation {
    case "produce":
        o.publishTotal.WithLabelValues(ctx.Resource, status).Inc()
        o.publishDuration.WithLabelValues(ctx.Resource).Observe(ctx.Duration.Seconds())
        if ctx.Error == nil {
            o.publishBytes.WithLabelValues(ctx.Resource).Add(float64(ctx.Size))
        }

    case "consume":
        o.consumeTotal.WithLabelValues(ctx.Resource, ctx.SubResource, status).Inc()
        o.consumeDuration.WithLabelValues(ctx.Resource).Observe(ctx.Duration.Seconds())
        if ctx.Error == nil {
            o.consumeBytes.WithLabelValues(ctx.Resource).Add(float64(ctx.Size))
        }
    }
}

FX Module with Observer

When using FX, you can inject an observer into the Kafka client:

import (
    "go.uber.org/fx"
    "github.com/Aleph-Alpha/std/v1/kafka"
    "github.com/Aleph-Alpha/std/v1/observability"
)

app := fx.New(
    kafka.FXModule,
    fx.Provide(
        // Provide your observer implementation
        func() observability.Observer {
            return NewKafkaObserver()
        },
        // Provide Kafka config with observer
        func(observer observability.Observer) kafka.Config {
            return kafka.Config{
                Brokers:  []string{"localhost:9092"},
                Topic:    "events",
                Observer: observer, // Inject observer
            }
        },
    ),
)
app.Run()

No-Op Observer

If you don't provide an observer, there's zero overhead:

client, err := kafka.NewClient(kafka.Config{
    Brokers:  []string{"localhost:9092"},
    Topic:    "events",
    Observer: nil, // ← No observer, no overhead
})

The client checks if Observer is nil before calling it, so there's no performance impact when observability is disabled.

Thread Safety

All methods on the Kafka type are safe for concurrent use by multiple goroutines, except for Close() which should only be called once.

Index

Constants

View Source
const (
	DefaultMinBytes         = 1
	DefaultMaxBytes         = 10e6 // 10MB
	DefaultMaxWait          = 10 * time.Second
	DefaultCommitInterval   = 1 * time.Second
	DefaultStartOffset      = -2 // FirstOffset
	DefaultPartition        = -1 // Automatic partition assignment
	DefaultRequiredAcks     = -1 // WaitForAll
	DefaultBatchSize        = 100
	DefaultBatchTimeout     = 1 * time.Second
	DefaultRebalanceTimeout = 30 * time.Second
	DefaultMaxAttempts      = 10
	DefaultWriteTimeout     = 10 * time.Second

	// Producer acknowledgment modes
	RequireNone = 0  // Fire-and-forget (no acknowledgment)
	RequireOne  = 1  // Wait for leader only
	RequireAll  = -1 // Wait for all in-sync replicas (most durable)

	// Consumer offset modes
	FirstOffset = -2 // Start from the beginning
	LastOffset  = -1 // Start from the end
)

Default values for configuration

Variables

View Source
var (
	// ErrConnectionFailed is returned when connection to Kafka cannot be established
	ErrConnectionFailed = errors.New("connection failed")

	// ErrConnectionLost is returned when connection to Kafka is lost
	ErrConnectionLost = errors.New("connection lost")

	// ErrBrokerNotAvailable is returned when broker is not available
	ErrBrokerNotAvailable = errors.New("broker not available")

	// ErrReplicaNotAvailable is returned when replica is not available
	ErrReplicaNotAvailable = errors.New("replica not available")

	// ErrAuthenticationFailed is returned when authentication fails
	ErrAuthenticationFailed = errors.New("authentication failed")

	// ErrAuthorizationFailed is returned when authorization fails
	ErrAuthorizationFailed = errors.New("authorization failed")

	// ErrInvalidCredentials is returned when credentials are invalid
	ErrInvalidCredentials = errors.New("invalid credentials")

	// ErrTopicNotFound is returned when topic doesn't exist
	ErrTopicNotFound = errors.New("topic not found")

	// ErrTopicAlreadyExists is returned when topic already exists
	ErrTopicAlreadyExists = errors.New("topic already exists")

	// ErrPartitionNotFound is returned when partition doesn't exist
	ErrPartitionNotFound = errors.New("partition not found")

	// ErrInvalidPartition is returned when partition is invalid
	ErrInvalidPartition = errors.New("invalid partition")

	// ErrGroupNotFound is returned when consumer group doesn't exist
	ErrGroupNotFound = errors.New("consumer group not found")

	// ErrGroupCoordinatorNotAvailable is returned when group coordinator is not available
	ErrGroupCoordinatorNotAvailable = errors.New("group coordinator not available")

	// ErrNotGroupCoordinator is returned when broker is not the group coordinator
	ErrNotGroupCoordinator = errors.New("not group coordinator")

	// ErrInvalidGroupID is returned when group ID is invalid
	ErrInvalidGroupID = errors.New("invalid group id")

	// ErrUnknownMemberID is returned when member ID is unknown
	ErrUnknownMemberID = errors.New("unknown member id")

	// ErrInvalidSessionTimeout is returned when session timeout is invalid
	ErrInvalidSessionTimeout = errors.New("invalid session timeout")

	// ErrRebalanceInProgress is returned when rebalance is in progress
	ErrRebalanceInProgress = errors.New("rebalance in progress")

	// ErrInvalidCommitOffset is returned when commit offset is invalid
	ErrInvalidCommitOffset = errors.New("invalid commit offset")

	// ErrMessageTooLarge is returned when message exceeds size limits
	ErrMessageTooLarge = errors.New("message too large")

	// ErrInvalidMessage is returned when message format is invalid
	ErrInvalidMessage = errors.New("invalid message")

	// ErrInvalidMessageSize is returned when message size is invalid
	ErrInvalidMessageSize = errors.New("invalid message size")

	// ErrOffsetOutOfRange is returned when offset is out of range
	ErrOffsetOutOfRange = errors.New("offset out of range")

	// ErrInvalidFetchSize is returned when fetch size is invalid
	ErrInvalidFetchSize = errors.New("invalid fetch size")

	// ErrLeaderNotAvailable is returned when leader is not available
	ErrLeaderNotAvailable = errors.New("leader not available")

	// ErrNotLeaderForPartition is returned when broker is not the leader for partition
	ErrNotLeaderForPartition = errors.New("not leader for partition")

	// ErrRequestTimedOut is returned when request times out
	ErrRequestTimedOut = errors.New("request timed out")

	// ErrNetworkError is returned for network-related errors
	ErrNetworkError = errors.New("network error")

	// ErrProducerFenced is returned when producer is fenced
	ErrProducerFenced = errors.New("producer fenced")

	// ErrTransactionCoordinatorFenced is returned when transaction coordinator is fenced
	ErrTransactionCoordinatorFenced = errors.New("transaction coordinator fenced")

	// ErrInvalidTransactionState is returned when transaction state is invalid
	ErrInvalidTransactionState = errors.New("invalid transaction state")

	// ErrInvalidProducerEpoch is returned when producer epoch is invalid
	ErrInvalidProducerEpoch = errors.New("invalid producer epoch")

	// ErrInvalidTxnState is returned when transaction state is invalid
	ErrInvalidTxnState = errors.New("invalid transaction state")

	// ErrInvalidProducerIDMapping is returned when producer ID mapping is invalid
	ErrInvalidProducerIDMapping = errors.New("invalid producer id mapping")

	// ErrInvalidTransaction is returned when transaction is invalid
	ErrInvalidTransaction = errors.New("invalid transaction")

	// ErrUnsupportedVersion is returned when version is not supported
	ErrUnsupportedVersion = errors.New("unsupported version")

	// ErrUnsupportedForMessageFormat is returned when message format is not supported
	ErrUnsupportedForMessageFormat = errors.New("unsupported for message format")

	// ErrInvalidRequest is returned when request is invalid
	ErrInvalidRequest = errors.New("invalid request")

	// ErrInvalidConfig is returned when configuration is invalid
	ErrInvalidConfig = errors.New("invalid config")

	// ErrNotController is returned when broker is not the controller
	ErrNotController = errors.New("not controller")

	// ErrInvalidReplicationFactor is returned when replication factor is invalid
	ErrInvalidReplicationFactor = errors.New("invalid replication factor")

	// ErrInvalidReplica is returned when replica is invalid
	ErrInvalidReplica = errors.New("invalid replica")

	// ErrReplicaNotAvailableError is returned when replica is not available
	ErrReplicaNotAvailableError = errors.New("replica not available")

	// ErrPolicyViolation is returned when policy is violated
	ErrPolicyViolation = errors.New("policy violation")

	// ErrOutOfOrderSequence is returned when sequence is out of order
	ErrOutOfOrderSequence = errors.New("out of order sequence")

	// ErrDuplicateSequence is returned when sequence is duplicated
	ErrDuplicateSequence = errors.New("duplicate sequence")

	// ErrTooManyInFlightRequests is returned when there are too many in-flight requests
	ErrTooManyInFlightRequests = errors.New("too many in-flight requests")

	// ErrWriterNotInitialized is returned when writer is not initialized
	ErrWriterNotInitialized = errors.New("writer not initialized")

	// ErrReaderNotInitialized is returned when reader is not initialized
	ErrReaderNotInitialized = errors.New("reader not initialized")

	// ErrContextCanceled is returned when context is canceled
	ErrContextCanceled = errors.New("context canceled")

	// ErrContextDeadlineExceeded is returned when context deadline is exceeded
	ErrContextDeadlineExceeded = errors.New("context deadline exceeded")

	// ErrUnknownError is returned for unknown/unhandled errors
	ErrUnknownError = errors.New("unknown error")
)

Common Kafka error types that can be used by consumers of this package. These provide a standardized set of errors that abstract away the underlying Kafka-specific error details.

View Source
var FXModule = fx.Module("kafka",
	fx.Provide(
		NewClientWithDI,

		fx.Annotate(
			func(k *KafkaClient) Client { return k },
			fx.As(new(Client)),
		),
	),
	fx.Invoke(RegisterKafkaLifecycle),
)

FXModule is an fx.Module that provides and configures the Kafka client. This module registers the Kafka client with the Fx dependency injection framework, making it available to other components in the application.

The module provides: 1. *KafkaClient (concrete type) for direct use 2. Client interface for dependency injection 3. Lifecycle management for graceful startup and shutdown

Usage:

app := fx.New(
    kafka.FXModule,
    // other modules...
)

Functions

func RegisterKafkaLifecycle

func RegisterKafkaLifecycle(params KafkaLifecycleParams)

RegisterKafkaLifecycle registers the Kafka client with the fx lifecycle system. This function sets up proper initialization and graceful shutdown of the Kafka client.

Parameters:

  • params: The lifecycle parameters containing the Kafka client

The function:

  1. On application start: Ensures the client is ready for use
  2. On application stop: Triggers a graceful shutdown of the Kafka client, closing producers and consumers cleanly.

This ensures that the Kafka client remains available throughout the application's lifetime and is properly cleaned up during shutdown.

func ValidateDataType

func ValidateDataType(dataType string, hasSerializer, hasDeserializer bool) error

ValidateDataType checks if the provided DataType is supported Returns error if DataType requires custom serializer but none is provided

Types

type AvroDeserializer

type AvroDeserializer struct {
	// UnmarshalFunc is the function used to unmarshal from Avro format.
	UnmarshalFunc func([]byte, interface{}) error
}

AvroDeserializer implements Deserializer for Apache Avro format.

func (*AvroDeserializer) Deserialize

func (a *AvroDeserializer) Deserialize(data []byte, target interface{}) error

Deserialize converts Avro bytes to the target structure.

type AvroSerializer

type AvroSerializer struct {
	// MarshalFunc is the function used to marshal to Avro format.
	MarshalFunc func(interface{}) ([]byte, error)
}

AvroSerializer implements Serializer for Apache Avro format.

This serializer requires an external Avro library. It works with any type that implements the AvroMarshaler interface or you can provide a custom MarshalFunc.

Example with linkedin/goavro:

codec, _ := goavro.NewCodec(schemaJSON)
serializer := &kafka.AvroSerializer{
    MarshalFunc: func(data interface{}) ([]byte, error) {
        return codec.BinaryFromNative(nil, data)
    },
}

func (*AvroSerializer) Serialize

func (a *AvroSerializer) Serialize(data interface{}) ([]byte, error)

Serialize converts data to Avro bytes.

type Client added in v0.12.0

type Client interface {

	// Publish sends a single message to Kafka with the specified key and body.
	// The body is serialized using the configured serializer.
	// Optional headers can be provided as the last parameter.
	Publish(ctx context.Context, key string, data interface{}, headers ...map[string]interface{}) error

	// Consume starts consuming messages from Kafka with a single worker.
	// Returns a channel that delivers consumed messages.
	Consume(ctx context.Context, wg *sync.WaitGroup) <-chan Message

	// ConsumeParallel starts consuming messages with multiple concurrent workers.
	// Provides better throughput for high-volume topics.
	ConsumeParallel(ctx context.Context, wg *sync.WaitGroup, numWorkers int) <-chan Message

	// Deserialize converts a Message to an object using the configured deserializer.
	Deserialize(msg Message, target interface{}) error

	// SetSerializer sets the serializer for outgoing messages.
	SetSerializer(s Serializer)

	// SetDeserializer sets the deserializer for incoming messages.
	SetDeserializer(d Deserializer)

	// SetDefaultSerializers configures default serializers based on the DataType config.
	SetDefaultSerializers()

	// TranslateError translates Kafka errors to more user-friendly messages.
	TranslateError(err error) error

	// IsRetryableError checks if an error can be retried.
	IsRetryableError(err error) bool

	// IsTemporaryError checks if an error is temporary.
	IsTemporaryError(err error) bool

	// IsPermanentError checks if an error is permanent.
	IsPermanentError(err error) bool

	// IsAuthenticationError checks if an error is authentication-related.
	IsAuthenticationError(err error) bool

	// GracefulShutdown closes all Kafka connections cleanly.
	GracefulShutdown()
}

Client provides a high-level interface for interacting with Apache Kafka. It abstracts producer and consumer operations with a simplified API.

This interface is implemented by the concrete *KafkaClient type.

type Config

type Config struct {
	// Brokers is a list of Kafka broker addresses
	Brokers []string

	// Topic is the Kafka topic to publish to or consume from
	Topic string

	// GroupID is the consumer group ID for coordinated consumption
	// Only used when IsConsumer is true
	GroupID string

	// IsConsumer determines whether this client will act as a consumer
	// Set to true for consumers, false for publishers
	IsConsumer bool

	// MinBytes is the minimum number of bytes to fetch in a single request
	// Default: 1 byte
	MinBytes int

	// MaxBytes is the maximum number of bytes to fetch in a single request
	// Default: 10MB
	MaxBytes int

	// MaxWait is the maximum amount of time to wait for MinBytes to become available
	// Default: 10s
	MaxWait time.Duration

	// CommitInterval is how often to commit offsets automatically
	// Only used when EnableAutoCommit is true
	// Default: 1s
	CommitInterval time.Duration

	// EnableAutoCommit determines whether offsets are committed automatically
	// When true, offsets are committed at CommitInterval
	// When false, you must call msg.CommitMsg() manually
	// Default: false (manual commit for safety)
	EnableAutoCommit bool

	// EnableAutoOffsetStore determines whether to automatically store offsets
	// When true with EnableAutoCommit false, offsets are stored but not committed
	// This allows you to control commit timing while still tracking progress
	// Default: true
	EnableAutoOffsetStore bool

	// StartOffset determines where to start consuming from when there's no committed offset
	// Options: FirstOffset (-2), LastOffset (-1)
	// Default: FirstOffset
	StartOffset int64

	// Partition is the partition to produce to or consume from
	// Set to -1 for automatic partition assignment
	// Default: -1
	Partition int

	// RequiredAcks determines how many replica acknowledgments to wait for
	// Options:
	//   RequireNone (0): Don't wait for acknowledgment (fire-and-forget, fastest but least safe)
	//   RequireOne (1): Wait for leader only (balance of speed and durability)
	//   RequireAll (-1): Wait for all in-sync replicas (slowest but most durable)
	// Default: RequireAll (-1)
	RequiredAcks int

	// WriteTimeout is the timeout for write operations
	// If RequiredAcks > 0, this is how long to wait for acknowledgment
	// Default: 10s
	WriteTimeout time.Duration

	// Async enables async write mode for producer
	// When true, writes are batched and sent asynchronously
	// Default: false
	Async bool

	// BatchSize is the maximum number of messages to batch together
	// Only used when Async is true
	// Default: 100
	BatchSize int

	// BatchTimeout is the maximum time to wait before sending a batch
	// Only used when Async is true
	// Default: 1s
	BatchTimeout time.Duration

	// CompressionCodec specifies the compression algorithm to use
	// Options: nil (no compression), gzip, snappy, lz4, zstd
	// Default: nil
	CompressionCodec string

	// MaxAttempts is the maximum number of attempts to deliver a message
	// Default: 10
	MaxAttempts int

	// AllowAutoTopicCreation determines whether to allow automatic topic creation
	// Default: false
	AllowAutoTopicCreation bool

	// TLS contains TLS/SSL configuration
	TLS TLSConfig

	// SASL contains SASL authentication configuration
	SASL SASLConfig

	// DataType specifies the default data type for automatic serializer selection
	// When no explicit serializer is provided, the client will use a default serializer
	// based on this type.
	// Options: "json" (default), "string", "protobuf", "avro", "gob", "bytes"
	// Default: "json"
	DataType string
}

Config defines the top-level configuration structure for the Kafka client. It contains all the necessary configuration sections for establishing connections, setting up producers and consumers.

type ConsumerMessage

type ConsumerMessage struct {
	// contains filtered or unexported fields
}

ConsumerMessage implements the Message interface and wraps a Kafka message.

func (*ConsumerMessage) Body

func (cm *ConsumerMessage) Body() []byte

Body returns the message payload as a byte slice.

func (*ConsumerMessage) BodyAs

func (cm *ConsumerMessage) BodyAs(target interface{}) error

BodyAs deserializes the message body into the target structure. It uses the configured Deserializer, or falls back to JSONDeserializer if none is configured.

This is a convenience method that makes consuming messages more intuitive:

msgChan := consumer.Consume(ctx, wg)
for msg := range msgChan {
    var event UserEvent
    if err := msg.BodyAs(&event); err != nil {
        log.Printf("Failed to deserialize: %v", err)
        continue
    }
    fmt.Printf("Event: %s, UserID: %d\n", event.Event, event.UserID)
    msg.CommitMsg()
}

func (*ConsumerMessage) CommitMsg

func (cm *ConsumerMessage) CommitMsg() error

CommitMsg commits the message, informing Kafka that the message has been successfully processed.

Returns an error if the commit fails.

func (*ConsumerMessage) Header

func (cm *ConsumerMessage) Header() map[string]interface{}

Header returns the headers associated with the message.

func (*ConsumerMessage) Key

func (cm *ConsumerMessage) Key() string

Key returns the message key as a string.

func (*ConsumerMessage) Offset

func (cm *ConsumerMessage) Offset() int64

Offset returns the offset of this message.

func (*ConsumerMessage) Partition

func (cm *ConsumerMessage) Partition() int

Partition returns the partition this message came from.

type Deserializer

type Deserializer interface {
	// Deserialize converts a byte slice into the target data structure
	Deserialize(data []byte, target interface{}) error
}

Deserializer defines the interface for deserializing data received from Kafka. Implementations can provide custom deserialization logic.

type GobDeserializer

type GobDeserializer struct{}

GobDeserializer implements Deserializer using Go's gob decoding.

func (*GobDeserializer) Deserialize

func (g *GobDeserializer) Deserialize(data []byte, target interface{}) error

Deserialize converts gob bytes to the target structure.

type GobSerializer

type GobSerializer struct{}

GobSerializer implements Serializer using Go's gob encoding. This is useful for Go-to-Go communication where both sides use the same types.

Note: Gob encoding is Go-specific and not interoperable with other languages.

func (*GobSerializer) Serialize

func (g *GobSerializer) Serialize(data interface{}) ([]byte, error)

Serialize converts data to gob bytes.

type JSONDeserializer

type JSONDeserializer struct{}

JSONDeserializer implements Deserializer using JSON decoding. This is the default deserializer provided by the Kafka module.

func (*JSONDeserializer) Deserialize

func (j *JSONDeserializer) Deserialize(data []byte, target interface{}) error

Deserialize converts JSON bytes to the target structure.

type JSONSerializer

type JSONSerializer struct{}

JSONSerializer implements Serializer using JSON encoding. This is the default serializer provided by the Kafka module.

Features:

  • Handles any Go type that can be marshaled to JSON
  • Automatically passes through []byte without modification
  • Thread-safe

func (*JSONSerializer) Serialize

func (j *JSONSerializer) Serialize(data interface{}) ([]byte, error)

Serialize converts data to JSON bytes.

type KafkaClient added in v0.12.0

type KafkaClient struct {
	// contains filtered or unexported fields
}

KafkaClient represents a client for interacting with Apache Kafka. It manages connections and provides methods for publishing and consuming messages.

KafkaClient implements the Client interface.

func NewClient

func NewClient(cfg Config) (*KafkaClient, error)

NewClient creates and initializes a new KafkaClient with the provided configuration. This function sets up the producer and/or consumer based on the configuration.

Parameters:

  • cfg: Configuration for connecting to Kafka

Returns a new KafkaClient instance that is ready to use.

Example:

client, err := kafka.NewClient(config)
if err != nil {
	log.Printf("ERROR: failed to create Kafka client: %v", err)
	return nil, err
}
defer client.GracefulShutdown()

func NewClientWithDI

func NewClientWithDI(params KafkaParams) (*KafkaClient, error)

NewClientWithDI creates a new Kafka client using dependency injection. This function is designed to be used with Uber's fx dependency injection framework where dependencies are automatically provided via the KafkaParams struct.

Parameters:

  • params: A KafkaParams struct that contains the Config instance and optionally a Logger, Serializer, Deserializer, and Observer instances required to initialize the Kafka client. This struct embeds fx.In to enable automatic injection of these dependencies.

Returns:

  • *KafkaClient: A fully initialized Kafka client ready for use.

Example usage with fx:

app := fx.New(
    kafka.FXModule,
    logger.FXModule,  // Optional: provides logger
    fx.Provide(
        func() kafka.Config {
            return loadKafkaConfig() // Your config loading function
        },
        func() kafka.Serializer {
            return &kafka.JSONSerializer{}  // Or ProtobufSerializer, etc.
        },
        func() kafka.Deserializer {
            return &kafka.JSONDeserializer{}
        },
        func(metrics *prometheus.Metrics) observability.Observer {
            return &MyObserver{metrics: metrics}  // Optional observer
        },
    ),
)

Under the hood, this function injects the optional logger, serializer, deserializer, and observer before delegating to the standard NewClient function.

func (*KafkaClient) Consume added in v0.12.0

func (k *KafkaClient) Consume(ctx context.Context, wg *sync.WaitGroup) <-chan Message

Consume starts consuming messages from the topic specified in the configuration. This method provides a channel where consumed messages will be delivered.

Parameters:

  • ctx: Context for cancellation control
  • wg: WaitGroup for coordinating shutdown

Returns a channel that delivers Message interfaces for each consumed message.

Example:

wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

msgChan := kafkaClient.Consume(ctx, wg)
for msg := range msgChan {
    // Option 1: Use BodyAs for automatic deserialization
    var event UserEvent
    if err := msg.BodyAs(&event); err != nil {
        log.Printf("Failed to deserialize: %v", err)
        continue
    }

    // Option 2: Use Body() for raw bytes
    rawBytes := msg.Body()
    fmt.Println("Received:", string(rawBytes))

    // Commit successful processing
    if err := msg.CommitMsg(); err != nil {
        log.Printf("Failed to commit message: %v", err)
    }
}

func (*KafkaClient) ConsumeParallel added in v0.12.0

func (k *KafkaClient) ConsumeParallel(ctx context.Context, wg *sync.WaitGroup, numWorkers int) <-chan Message

ConsumeParallel starts consuming messages from the topic with multiple concurrent goroutines. This method provides better throughput for high-volume topics by processing messages in parallel.

Parameters:

  • ctx: Context for cancellation control
  • wg: WaitGroup for coordinating shutdown
  • numWorkers: Number of concurrent goroutines to use for consuming (recommended: 1-10)

Returns a channel that delivers Message interfaces for each consumed message.

Example:

wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Use 5 concurrent workers for high throughput
msgChan := kafkaClient.ConsumeParallel(ctx, wg, 5)
for msg := range msgChan {
    // Process the message
    fmt.Println("Received:", string(msg.Body()))

    // Commit successful processing
    if err := msg.CommitMsg(); err != nil {
        log.Printf("Failed to commit message: %v", err)
    }
}

func (*KafkaClient) Deserialize added in v0.12.0

func (k *KafkaClient) Deserialize(msg Message, target interface{}) error

Deserialize is a helper method to deserialize a message body. It automatically uses the injected Deserializer or falls back to JSONDeserializer.

Parameters:

  • msg: The message to deserialize
  • target: Pointer to the target structure to deserialize into

Returns an error if deserialization fails.

Example with FX-injected deserializer:

// In your FX app:
fx.Provide(func() kafka.Deserializer {
    return &kafka.JSONDeserializer{}
})

// Usage:
msgChan := kafkaClient.Consume(ctx, wg)
for msg := range msgChan {
    var event UserEvent
    if err := kafkaClient.Deserialize(msg, &event); err != nil {
        log.Printf("Failed to deserialize: %v", err)
        continue
    }
    fmt.Printf("Event: %s, UserID: %d\n", event.Event, event.UserID)
    msg.CommitMsg()
}

func (*KafkaClient) GracefulShutdown added in v0.12.0

func (k *KafkaClient) GracefulShutdown()

GracefulShutdown closes the Kafka client's connections cleanly. This method ensures that all resources are properly released when the application is shutting down.

The shutdown process: 1. Signals all goroutines to stop by closing the shutdownSignal channel 2. Closes the writer if it exists 3. Closes the reader if it exists

Any errors during shutdown are logged but not propagated, as they typically cannot be handled at this stage of application shutdown.

func (*KafkaClient) IsAuthenticationError added in v0.12.0

func (k *KafkaClient) IsAuthenticationError(err error) bool

IsAuthenticationError returns true if the error is authentication-related

func (*KafkaClient) IsPermanentError added in v0.12.0

func (k *KafkaClient) IsPermanentError(err error) bool

IsPermanentError returns true if the error is permanent and should not be retried

func (*KafkaClient) IsRetryableError added in v0.12.0

func (k *KafkaClient) IsRetryableError(err error) bool

IsRetryableError returns true if the error is retryable

func (*KafkaClient) IsTemporaryError added in v0.12.0

func (k *KafkaClient) IsTemporaryError(err error) bool

IsTemporaryError returns true if the error is temporary

func (*KafkaClient) Publish added in v0.12.0

func (k *KafkaClient) Publish(ctx context.Context, key string, data interface{}, headers ...map[string]interface{}) error

Publish sends a message to the Kafka topic specified in the configuration. This method is thread-safe and respects context cancellation.

Parameters:

  • ctx: Context for cancellation control
  • key: Message key for partitioning
  • msg: Message payload as a byte slice
  • headers: Optional message headers as a map of key-value pairs; can be used for metadata and distributed tracing propagation

The headers parameter is particularly useful for distributed tracing, allowing trace context to be propagated across service boundaries through message queues. When using with the tracer package, you can extract trace headers and include them in the message:

traceHeaders := tracerClient.GetCarrier(ctx)
err := kafkaClient.Publish(ctx, "key", message, traceHeaders)

Returns an error if publishing fails or if the context is canceled.

Example:

ctx := context.Background()
message := []byte("Hello, Kafka!")

// Basic publishing without headers
err := kafkaClient.Publish(ctx, "my-key", message, nil)
if err != nil {
    log.Printf("Failed to publish message: %v", err)
}

Example with distributed tracing:

// Create a span for the publish operation
ctx, span := tracer.StartSpan(ctx, "publish-message")
defer span.End()

// Extract trace context to include in the message headers
traceHeaders := tracerClient.GetCarrier(ctx)

// Publish the message with trace headers
err := kafkaClient.Publish(ctx, "my-key", message, traceHeaders)
if err != nil {
    span.RecordError(err)
    log.Printf("Failed to publish message: %v", err)
}

func (*KafkaClient) SetDefaultSerializers added in v0.12.0

func (k *KafkaClient) SetDefaultSerializers()

SetDefaultSerializers sets default serializers on the Kafka client based on config DataType This is called automatically during client creation if no serializers are provided

func (*KafkaClient) SetDeserializer added in v0.12.0

func (k *KafkaClient) SetDeserializer(d Deserializer)

SetDeserializer sets the deserializer for the Kafka client. This is typically called by the FX module during initialization.

func (*KafkaClient) SetSerializer added in v0.12.0

func (k *KafkaClient) SetSerializer(s Serializer)

SetSerializer sets the serializer for the Kafka client. This is typically called by the FX module during initialization.

func (*KafkaClient) TranslateError added in v0.12.0

func (k *KafkaClient) TranslateError(err error) error

TranslateError converts Kafka-specific errors into standardized application errors. This function provides abstraction from the underlying Kafka implementation details, allowing application code to handle errors in a Kafka-agnostic way.

It maps common Kafka errors to the standardized error types defined above. If an error doesn't match any known type, it's returned unchanged.

func (*KafkaClient) WithDeserializer added in v0.13.0

func (k *KafkaClient) WithDeserializer(deserializer Deserializer) *KafkaClient

WithDeserializer attaches a deserializer to the Kafka client for decoding messages. This method uses the builder pattern and returns the client for method chaining.

The deserializer will be used to decode messages when consuming from Kafka. If not set, msg.BodyAs() will use JSONDeserializer as a fallback.

This is useful for non-FX usage where you want to set deserializers after creating the client. When using FX, deserializers can be injected via NewClientWithDI.

Example:

client, err := kafka.NewClient(config)
if err != nil {
    return err
}
client = client.WithDeserializer(&kafka.JSONDeserializer{})
defer client.GracefulShutdown()

func (*KafkaClient) WithLogger added in v0.13.0

func (k *KafkaClient) WithLogger(logger Logger) *KafkaClient

WithLogger attaches a logger to the Kafka client for internal logging. This method uses the builder pattern and returns the client for method chaining.

The logger will be used for lifecycle events, background worker logs, and cleanup errors. This is particularly useful for debugging and monitoring consumer worker behavior.

This is useful for non-FX usage where you want to enable logging after creating the client. When using FX, the logger is automatically injected via NewClientWithDI.

Example:

client, err := kafka.NewClient(config)
if err != nil {
    return err
}
client = client.WithLogger(myLogger)
defer client.GracefulShutdown()

func (*KafkaClient) WithObserver added in v0.13.0

func (k *KafkaClient) WithObserver(observer observability.Observer) *KafkaClient

WithObserver attaches an observer to the Kafka client for tracking operations. This method uses the builder pattern and returns the client for method chaining.

The observer will be notified of all produce and consume operations, allowing external code to collect metrics, create traces, or log operations.

This is useful for non-FX usage where you want to enable observability after creating the client. When using FX, use NewClientWithDI instead, which automatically injects the observer.

Example:

client, err := kafka.NewClient(config)
if err != nil {
    return err
}
client = client.WithObserver(myObserver)
defer client.GracefulShutdown()

func (*KafkaClient) WithSerializer added in v0.13.0

func (k *KafkaClient) WithSerializer(serializer Serializer) *KafkaClient

WithSerializer attaches a serializer to the Kafka client for encoding messages. This method uses the builder pattern and returns the client for method chaining.

The serializer will be used to encode messages before publishing to Kafka. If not set, you can only publish []byte data directly.

This is useful for non-FX usage where you want to set serializers after creating the client. When using FX, serializers can be injected via NewClientWithDI.

Example:

client, err := kafka.NewClient(config)
if err != nil {
    return err
}
client = client.WithSerializer(&kafka.JSONSerializer{})
defer client.GracefulShutdown()

type KafkaLifecycleParams

type KafkaLifecycleParams struct {
	fx.In

	Lifecycle fx.Lifecycle
	Client    *KafkaClient
}

KafkaLifecycleParams groups the dependencies needed for Kafka lifecycle management

type KafkaParams

type KafkaParams struct {
	fx.In

	Config       Config
	Logger       Logger                 `optional:"true"` // Optional logger from std/v1/logger
	Serializer   Serializer             `optional:"true"` // Optional serializer
	Deserializer Deserializer           `optional:"true"` // Optional deserializer
	Observer     observability.Observer `optional:"true"` // Optional observer for metrics/tracing
}

KafkaParams groups the dependencies needed to create a Kafka client

type Logger

type Logger interface {
	// InfoWithContext logs an informational message with trace context.
	InfoWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})

	// WarnWithContext logs a warning message with trace context.
	WarnWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})

	// ErrorWithContext logs an error message with trace context.
	ErrorWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
}

Logger is an interface that matches the std/v1/logger.Logger interface. It provides context-aware structured logging with optional error and field parameters.

type Message

type Message interface {
	// CommitMsg commits the message, informing Kafka that the message
	// has been successfully processed.
	CommitMsg() error

	// Body returns the message payload as a byte slice.
	Body() []byte

	// BodyAs deserializes the message body into the target structure.
	// It uses the configured Deserializer, or falls back to JSONDeserializer.
	// This is a convenience method equivalent to calling Deserialize(msg, target).
	BodyAs(target interface{}) error

	// Key returns the message key as a string.
	Key() string

	// Header returns the headers associated with the message.
	Header() map[string]interface{}

	// Partition returns the partition this message came from.
	Partition() int

	// Offset returns the offset of this message.
	Offset() int64
}

Message defines the interface for consumed messages from Kafka. This interface abstracts the underlying Kafka message structure and provides methods for committing messages.

type MultiFormatDeserializer

type MultiFormatDeserializer struct {
	// DefaultFormat is the format to use when not specified (default: "json")
	DefaultFormat string

	// Deserializers maps format names to their deserializer implementations
	Deserializers map[string]Deserializer
}

MultiFormatDeserializer supports multiple deserialization formats.

func NewMultiFormatDeserializer

func NewMultiFormatDeserializer() *MultiFormatDeserializer

NewMultiFormatDeserializer creates a new multi-format deserializer with default formats.

func (*MultiFormatDeserializer) Deserialize

func (m *MultiFormatDeserializer) Deserialize(data []byte, target interface{}) error

Deserialize converts data using the appropriate deserializer based on format.

func (*MultiFormatDeserializer) DeserializeWithFormat

func (m *MultiFormatDeserializer) DeserializeWithFormat(format string, data []byte, target interface{}) error

DeserializeWithFormat explicitly specifies the format to use.

func (*MultiFormatDeserializer) RegisterDeserializer

func (m *MultiFormatDeserializer) RegisterDeserializer(format string, deserializer Deserializer)

RegisterDeserializer adds a custom deserializer for a specific format.

type MultiFormatSerializer

type MultiFormatSerializer struct {
	// DefaultFormat is the format to use when not specified (default: "json")
	DefaultFormat string

	// Serializers maps format names to their serializer implementations
	Serializers map[string]Serializer
}

MultiFormatSerializer supports multiple serialization formats with automatic detection. It can handle JSON, Protobuf, Avro, and custom formats based on configuration or content type.

func NewMultiFormatSerializer

func NewMultiFormatSerializer() *MultiFormatSerializer

NewMultiFormatSerializer creates a new multi-format serializer with default formats.

func (*MultiFormatSerializer) RegisterSerializer

func (m *MultiFormatSerializer) RegisterSerializer(format string, serializer Serializer)

RegisterSerializer adds a custom serializer for a specific format.

func (*MultiFormatSerializer) Serialize

func (m *MultiFormatSerializer) Serialize(data interface{}) ([]byte, error)

Serialize converts data using the appropriate serializer based on format. The format can be specified as a prefix in the form "format:data" or uses DefaultFormat.

func (*MultiFormatSerializer) SerializeWithFormat

func (m *MultiFormatSerializer) SerializeWithFormat(format string, data interface{}) ([]byte, error)

SerializeWithFormat explicitly specifies the format to use.

type NoOpDeserializer

type NoOpDeserializer struct{}

NoOpDeserializer does not perform any deserialization. The target must be a *[]byte to receive the raw bytes.

func (*NoOpDeserializer) Deserialize

func (n *NoOpDeserializer) Deserialize(data []byte, target interface{}) error

Deserialize copies the raw bytes to the target if it's a *[]byte.

type NoOpSerializer

type NoOpSerializer struct{}

NoOpSerializer passes through byte slices without modification. Use this when you want to handle serialization yourself or work with raw bytes.

func (*NoOpSerializer) Serialize

func (n *NoOpSerializer) Serialize(data interface{}) ([]byte, error)

Serialize returns the data as-is if it's a byte slice, otherwise returns an error.

type ProtoMarshaler

type ProtoMarshaler interface {
	Marshal() ([]byte, error)
}

ProtoMarshaler is an interface for types that can marshal themselves to protobuf format. This is useful for custom protobuf implementations.

type ProtoMessage

type ProtoMessage interface {
	ProtoReflect() interface{} // This matches proto.Message
	Reset()
	String() string
}

ProtoMessage is an interface that matches the proto.Message interface from google.golang.org/protobuf. This allows the ProtobufSerializer to work with any protobuf message without requiring a direct dependency.

type ProtoUnmarshaler

type ProtoUnmarshaler interface {
	Unmarshal([]byte) error
}

ProtoUnmarshaler is an interface for types that can unmarshal themselves from protobuf format.

type ProtobufDeserializer

type ProtobufDeserializer struct {
	// UnmarshalFunc is the function used to unmarshal protobuf messages.
	// If nil, will attempt to use the ProtoUnmarshaler interface.
	// For google protobuf, use: proto.Unmarshal
	UnmarshalFunc func([]byte, interface{}) error
}

ProtobufDeserializer implements Deserializer for Protocol Buffer messages.

Example usage with google protobuf:

import "google.golang.org/protobuf/proto"

deserializer := &kafka.ProtobufDeserializer{
    UnmarshalFunc: proto.Unmarshal,
}

func (*ProtobufDeserializer) Deserialize

func (p *ProtobufDeserializer) Deserialize(data []byte, target interface{}) error

Deserialize converts protobuf bytes to the target structure.

type ProtobufSerializer

type ProtobufSerializer struct {
	// MarshalFunc is the function used to marshal protobuf messages.
	// If nil, will attempt to use the ProtoMarshaler interface.
	// For google protobuf, use: proto.Marshal
	MarshalFunc func(interface{}) ([]byte, error)
}

ProtobufSerializer implements Serializer for Protocol Buffer messages.

This serializer works with any type that implements:

  • ProtoMessage interface (google.golang.org/protobuf/proto.Message)
  • ProtoMarshaler interface (custom types with Marshal() method)

Example usage with google protobuf:

import "google.golang.org/protobuf/proto"

serializer := &kafka.ProtobufSerializer{
    MarshalFunc: proto.Marshal,
}

Example with custom marshal function:

serializer := &kafka.ProtobufSerializer{
    MarshalFunc: func(m interface{}) ([]byte, error) {
        return myCustomProtoMarshal(m)
    },
}

func (*ProtobufSerializer) Serialize

func (p *ProtobufSerializer) Serialize(data interface{}) ([]byte, error)

Serialize converts a protobuf message to bytes.

type SASLConfig

type SASLConfig struct {
	// Enabled determines whether to use SASL authentication
	Enabled bool

	// Mechanism specifies the SASL mechanism to use
	// Options: "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"
	Mechanism string

	// Username is the SASL username
	Username string

	// Password is the SASL password
	Password string
}

SASLConfig contains SASL authentication configuration parameters.

type Serializer

type Serializer interface {
	// Serialize converts the input data to a byte slice
	Serialize(data interface{}) ([]byte, error)
}

Serializer defines the interface for serializing data before publishing to Kafka. Implementations can provide custom serialization logic (e.g., JSON, Protobuf, Avro, etc.).

type StringDeserializer

type StringDeserializer struct{}

StringDeserializer implements Deserializer for string data.

func (*StringDeserializer) Deserialize

func (s *StringDeserializer) Deserialize(data []byte, target interface{}) error

Deserialize converts bytes to string.

type StringSerializer

type StringSerializer struct {
	// Encoding specifies the character encoding (default: UTF-8)
	Encoding string
}

StringSerializer implements Serializer for string data. This is useful for text-based messages.

func (*StringSerializer) Serialize

func (s *StringSerializer) Serialize(data interface{}) ([]byte, error)

Serialize converts data to bytes.

type TLSConfig

type TLSConfig struct {
	// Enabled determines whether to use TLS/SSL for the connection
	Enabled bool

	// CACertPath is the file path to the CA certificate for verifying the broker
	CACertPath string

	// ClientCertPath is the file path to the client certificate
	ClientCertPath string

	// ClientKeyPath is the file path to the client certificate's private key
	ClientKeyPath string

	// InsecureSkipVerify controls whether to skip verification of the server's certificate
	// WARNING: Setting this to true is insecure and should only be used in testing
	InsecureSkipVerify bool
}

TLSConfig contains TLS/SSL configuration parameters.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL