package relay

v1.4.0
Published: May 13, 2026 License: MIT

Documentation

Constants

const (
	RelayMetricPrefix = "kafka_relay_"

	// Metric names. Target{} implementations should use these to emit metrics.
	MetricSourceErrors      = "source_errors_total"
	MetricSourceUnhealthy   = "source_unhealthy_total"
	MetricSourceKafkaErrors = "source_kafka_errors_total"
	MetricSourceHighwater   = "source_highwatermark"
	MetricTargetErrors      = "target_errors_total"
	MetricTargetKafkaErrors = "target_kafka_errors_total"
	MetricMsgsRelayed       = "messages_relayed_total"

	MetricFilteredMsgs         = "messages_filtered_total"
	MetricInletBlocks          = "target_inlet_blocks_total"
	MetricFlushBatchSize       = "target_flush_batch_size"
	MetricFlushDuration        = "target_flush_duration_seconds"
	MetricFlushRetries         = "target_flush_retries_total"
	MetricCandidateSwitches    = "source_candidate_switches_total"
	MetricSourceConnections    = "source_connections_total"
	MetricLagThresholdExceeded = "source_lag_threshold_exceeded_total"

	// Canonical error labels for metrics.
	ErrLabelConnectionFailed   = "connection_failed"
	ErrLabelClientClosed       = "client_closed"
	ErrLabelFetch              = "fetch_error"
	ErrLabelTLSConfig          = "tls_config_error"
	ErrLabelClientCreation     = "client_creation_failed"
	ErrLabelProducerConnection = "connection_failed"
	ErrLabelProduce            = "produce_failed"
	ErrLabelProduceRetries     = "produce_retries_exhausted"

	// SASL mechanisms.
	SASLMechanismPlain       = "PLAIN"
	SASLMechanismScramSHA256 = "SCRAM-SHA-256"
	SASLMechanismScramSHA512 = "SCRAM-SHA-512"

	// Relay modes.
	ModeFailover    = "failover"
	ModeSingle      = "single"
	IndefiniteRetry = -1
)

const (
	StateDisconnected = iota
	StateConnecting
)

Connection states (iota-based, must stay in a separate block).

Variables

var (
	ErrLaggingBehind = errors.New("topic end offset is lagging behind")
)

var (
	ErrorNoHealthy = errors.New("no healthy node")
)

Functions

func AddSASLConfig

func AddSASLConfig(opts []kgo.Opt, cfg KafkaCfg) []kgo.Opt

AddSASLConfig appends SASL authentication options derived from cfg to the given franz-go client options and returns the updated slice.

func CheckTCP

func CheckTCP(ctx context.Context, addrs []string, timeout time.Duration) bool

CheckTCP dials the given TCP addresses and reports whether they are reachable within the timeout.
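
A minimal usage sketch, assuming this package is imported as relay; the broker addresses and timeout are placeholders:

func probeBrokers(ctx context.Context) bool {
	addrs := []string{"10.0.0.1:9092", "10.0.0.2:9092"} // placeholder broker addresses
	return relay.CheckTCP(ctx, addrs, 3*time.Second)    // false if the brokers are unreachable
}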

func GetAckPolicy

func GetAckPolicy(ack string) kgo.Acks

GetAckPolicy returns the franz-go commit ack (kgo.Acks) for the given ack policy string.
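
The returned value can be passed straight to franz-go, e.g. in this sketch ("cluster" is one of the documented commit_ack_type values; the broker address is a placeholder):

func producerOpts() []kgo.Opt {
	acks := relay.GetAckPolicy("cluster")
	return []kgo.Opt{
		kgo.SeedBrokers("10.0.0.1:9092"), // placeholder broker
		kgo.RequiredAcks(acks),
	}
}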

func GetBackoffFn

func GetBackoffFn(enabled bool, min, max time.Duration) func(int) time.Duration

GetBackoffFn returns a backoff function based on the config. Even when backoff is disabled, a minimum 1-second delay is used to prevent CPU spinning.

func GetCompressionCodec

func GetCompressionCodec(codec string) kgo.CompressionCodec

GetCompressionCodec returns the franz-go compression codec for the given codec name (gzip|snappy|lz4|zstd|none).

func GetHighWatermark

func GetHighWatermark(ctx context.Context, client *kgo.Client, topics []string, timeout time.Duration) (kadm.ListedOffsets, error)

GetHighWatermark returns the high watermark offsets for the given topics at the time of the request.
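
For example, to log the current high watermarks (a sketch; the topic name and timeout are placeholders, and kadm.ListedOffsets is iterated as its underlying topic -> partition map):

func logHighWatermarks(ctx context.Context, cl *kgo.Client) error {
	offsets, err := relay.GetHighWatermark(ctx, cl, []string{"events"}, 5*time.Second)
	if err != nil {
		return err
	}
	for topic, partitions := range offsets {
		for partition, lo := range partitions {
			log.Printf("%s[%d] high watermark = %d", topic, partition, lo.Offset)
		}
	}
	return nil
}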

func GetTLSConfig

func GetTLSConfig(ca, cl, key string) (kgo.Opt, error)

GetTLSConfig builds a franz-go TLS option from the given CA certificate, client certificate, and client key paths.

func MetricName

func MetricName(base string, labels ...Label) string

MetricName builds a Prometheus metric name with optional labels. E.g. MetricName("errors_total", Label{"node_id", "1"}) → `kafka_relay_errors_total{node_id="1"}`
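
The metric name constants above are meant to be used as the base, as in this sketch (the node ID and the exact label rendering are illustrative):

func sourceErrMetric(nodeID string) string {
	// Builds something like: kafka_relay_source_errors_total{node_id="1", error="fetch_error"}
	return relay.MetricName(relay.MetricSourceErrors,
		relay.Label{Key: "node_id", Value: nodeID},
		relay.Label{Key: "error", Value: relay.ErrLabelFetch},
	)
}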

func ValidateConn

func ValidateConn(client *kgo.Client, timeout time.Duration, topics []string, partitions map[string]uint) error

ValidateConn tests if the connection is active and confirms the existence of topics (and optionally, specific partitions).

func WaitTries

func WaitTries(ctx context.Context, waitDuration time.Duration)

WaitTries waits for the given backoff duration, respecting context cancellation.
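
GetBackoffFn and WaitTries are typically combined into a retry loop like this sketch (connect() is a hypothetical operation being retried; the min/max durations are placeholders):

func connectWithRetry(ctx context.Context) error {
	backoff := relay.GetBackoffFn(true, 500*time.Millisecond, 10*time.Second)
	for try := 0; ; try++ {
		if err := connect(); err == nil { // connect() is hypothetical
			return nil
		}
		relay.WaitTries(ctx, backoff(try)) // waits, but returns early if ctx is cancelled
		if err := ctx.Err(); err != nil {
			return err
		}
	}
}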

Types

type ConsumerCfg

type ConsumerCfg struct {
	KafkaCfg `koanf:",squash"`
}

ConsumerCfg is the direct consumer config.

type Header

type Header struct {
	Key   string
	Value []byte
}

Header is a key-value pair attached to a Message.

type KafkaCfg

type KafkaCfg struct {
	// Namespace
	Name string `koanf:"name"`

	// Broker.
	BootstrapBrokers []string      `koanf:"servers"`
	SessionTimeout   time.Duration `koanf:"session_timeout"`

	// Auth.
	EnableAuth bool `koanf:"enable_auth"`
	// PLAIN/SCRAM-SHA-256/SCRAM-SHA-512
	SASLMechanism string `koanf:"sasl_mechanism"`
	Username      string `koanf:"username"`
	Password      string `koanf:"password"`

	// If enabled and the three files are passed, will
	// use the relevant certs and keys. If enabled but all three
	// file paths are empty, it will default to using DialTLS()
	EnableTLS      bool   `koanf:"enable_tls"`
	ClientKeyPath  string `koanf:"client_key_path"`
	ClientCertPath string `koanf:"client_cert_path"`
	CACertPath     string `koanf:"ca_cert_path"`

	EnableLog bool `koanf:"enable_log"`
}

KafkaCfg is the message broker's client config.
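
An illustrative config literal (all values are placeholders; with EnableTLS set and the cert paths left empty, the client falls back to DialTLS() as described above):

var cfg = relay.KafkaCfg{
	Name:             "source-1",
	BootstrapBrokers: []string{"10.0.0.1:9092"},
	SessionTimeout:   10 * time.Second,

	EnableAuth:    true,
	SASLMechanism: relay.SASLMechanismScramSHA256,
	Username:      "relay",
	Password:      "secret",

	EnableTLS: true, // no cert paths set: defaults to DialTLS()
}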

type Label

type Label struct {
	Key, Value string
}

Label is a key-value pair for Prometheus metric labels.

type Message

type Message struct {
	Key             []byte
	Value           []byte
	Headers         []Header
	Topic           string // Target topic
	Partition       int32  // Target partition (-1 for auto)
	Offset          int64  // Source message offset.
	SourcePartition int32  // Source partition (for offset tracking by targets).
}

Message is a single Kafka message flowing through the relay pipeline. The relay converts source Kafka records into Message{}s before passing them to a Target.

type Offsets

type Offsets map[string]map[int32]int64

Offsets maps topic -> partition -> offset. This represents the progress of message offsets at a target.
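
For example (the topic name and offset values are illustrative):

var progress = relay.Offsets{
	"events": {
		0: 1042, // partition 0 -> offset 1042
		1: 980,
	},
}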

type ProducerCfg

type ProducerCfg struct {
	KafkaCfg `koanf:",squash"`

	EnableIdempotency bool          `koanf:"enable_idempotency"`
	CommitAck         string        `koanf:"commit_ack_type"` // tcp|leader|cluster|default
	MaxRetries        int           `koanf:"max_retries"`
	FlushFrequency    time.Duration `koanf:"flush_frequency"`
	MaxMessageBytes   int           `koanf:"max_message_bytes"`
	BatchSize         int           `koanf:"batch_size"`
	BufferSize        int           `koanf:"buffer_size"`
	FlushBatchSize    int           `koanf:"flush_batch_size"`
	Compression       string        `koanf:"compression"` // gzip|snappy|lz4|zstd|none
}

ProducerCfg is the Kafka producer config.
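
An illustrative literal showing the embedded KafkaCfg and the IndefiniteRetry constant (all values are placeholders):

var producer = relay.ProducerCfg{
	KafkaCfg: relay.KafkaCfg{
		Name:             "target",
		BootstrapBrokers: []string{"10.0.1.1:9092"},
	},
	EnableIdempotency: true,
	CommitAck:         "cluster",
	MaxRetries:        relay.IndefiniteRetry, // retry forever
	FlushFrequency:    time.Second,
	FlushBatchSize:    1000,
	Compression:       "snappy",
}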

type Relay

type Relay struct {
	// contains filtered or unexported fields
}

Relay orchestrates consumption from a SourcePool and writing to a Target.

func NewRelay

func NewRelay(cfg RelayCfg, src *SourcePool, target Target, topic Topic, filters map[string]filter.Provider, m *metrics.Set, log *slog.Logger) (*Relay, error)

func (*Relay) Start

func (re *Relay) Start(globalCtx context.Context) error

Start starts the consumer loop on the source Kafka (A), fetching messages and asynchronously relaying them to the target (B).
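
A rough end-to-end wiring sketch, assuming this package is imported as relay and that metrics.Set refers to github.com/VictoriaMetrics/metrics (consistent with the signatures above); broker addresses, topics, and tuning values are placeholders, and nil is passed where no target offsets or filters are needed (an assumption):

func run(ctx context.Context, target relay.Target) error {
	log := slog.New(slog.NewTextHandler(os.Stderr, nil))
	m := metrics.NewSet()

	topic := relay.Topic{SourceTopic: "events", TargetTopic: "events_copy"}

	sources := []relay.ConsumerCfg{
		{KafkaCfg: relay.KafkaCfg{Name: "src-1", BootstrapBrokers: []string{"10.0.0.1:9092"}}},
		{KafkaCfg: relay.KafkaCfg{Name: "src-2", BootstrapBrokers: []string{"10.0.0.2:9092"}}},
	}

	pool, err := relay.NewSourcePool(relay.SourcePoolCfg{
		HealthCheckInterval: 3 * time.Second,
		ReqTimeout:          5 * time.Second,
		LagThreshold:        100,
		MaxRetries:          relay.IndefiniteRetry,
		EnableBackoff:       true,
		BackoffMin:          time.Second,
		BackoffMax:          30 * time.Second,
	}, sources, topic, nil, m, log) // nil: no previously recorded target offsets
	if err != nil {
		return err
	}

	rl, err := relay.NewRelay(relay.RelayCfg{StopAtEnd: false}, pool, target, topic, nil, m, log) // nil: no filters
	if err != nil {
		return err
	}
	return rl.Start(ctx) // runs the relay loop
}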

type RelayCfg

type RelayCfg struct {
	StopAtEnd bool
}

type Server

type Server struct {
	Config ConsumerCfg
	ID     int

	// Weight is the cumulative high watermark (offset) of every single topic
	// on a source. This is used for comparing lags between different sources
	// based on a threshold. If a server is unhealthy, the weight is marked as -1.
	Weight int64

	Healthy bool

	// This is only set when a new live Kafka consumer connection is established
	// on demand via Get(), where a server{} is returned. Internally, no connections
	// are maintained on SourcePool.[]servers and only the config, weight etc.
	// params are used to keep track of healthy servers.
	Client *kgo.Client
}

Server represents a source server's config along with the health and weight parameters used to track its status.

type SourcePool

type SourcePool struct {
	sync.Mutex
	// contains filtered or unexported fields
}

SourcePool manages the source Kafka instances and consumption.

func NewSourcePool

func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerCfg, topic Topic, targetOffsets TopicOffsets, m *metrics.Set, log *slog.Logger) (*SourcePool, error)

NewSourcePool returns a controller instance that manages the lifecycle of a pool of N source (consumer) servers. The pool always attempts to find one healthy node for the relay to consume from.

func (*SourcePool) Get

func (sp *SourcePool) Get(globalCtx context.Context) (*Server, error)

Get attempts to return a healthy source Kafka client connection. It internally applies backoff/retries between connection attempts and, depending on the config, may block indefinitely before returning.

func (*SourcePool) GetFetches

func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error)

GetFetches returns a Kafka fetch iterator from which individual messages can be read.

func (*SourcePool) GetHighWatermark

func (sp *SourcePool) GetHighWatermark(ctx context.Context, cl *kgo.Client) (kadm.ListedOffsets, error)

func (*SourcePool) RecordOffsets

func (sp *SourcePool) RecordOffsets(rec *kgo.Record) error

RecordOffsets records the offsets of the latest fetched records per topic. This is used to resume consumption on new connections/reconnections from the source during runtime.
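
The Relay drives this internally, but the intended flow is roughly as in this simplified sketch (error handling is reduced to re-acquiring a healthy server):

func consume(ctx context.Context, sp *relay.SourcePool) error {
	srv, err := sp.Get(ctx) // blocks until a healthy source is available
	if err != nil {
		return err
	}
	for ctx.Err() == nil {
		fetches, err := sp.GetFetches(srv)
		if err != nil {
			// The connection went bad; ask the pool for the next healthy server.
			if srv, err = sp.Get(ctx); err != nil {
				return err
			}
			continue
		}
		fetches.EachRecord(func(rec *kgo.Record) {
			// ... hand the record to the target here ...
			_ = sp.RecordOffsets(rec) // remember progress for reconnections
		})
	}
	return ctx.Err()
}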

type SourcePoolCfg

type SourcePoolCfg struct {
	HealthCheckInterval time.Duration
	ReqTimeout          time.Duration
	LagThreshold        int64
	MaxRetries          int
	EnableBackoff       bool
	BackoffMin          time.Duration
	BackoffMax          time.Duration
}

type Target

type Target interface {
	// GetHighWatermark returns the target's current offsets per topic-partition, which is then
	// used by the relay to resume consumption from the source.
	// Targets that don't/can't track offsets should return empty Offsets and nil, NOT an error.
	GetHighWatermark(ctx context.Context) (Offsets, error)

	// Start starts the target's background worker that batches and flushes source messages
	// to the target.
	Start() error

	// Write is a blocking function that queues a message for writing to target. Blocking is necessary
	// to wait on source consumption so that the target can catch up. It returns an error if the target is closed.
	Write(ctx context.Context, msg Message) error

	// Close closes the target and waits until all pending messages are flushed.
	Close() error
}

Target is the interface for a relay target/destination. The bundled `kafkatarget` package implements this for Kafka. This interface can be implemented to relay messages to other systems (Redis, HTTP, etc.).
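
A hypothetical minimal implementation (not part of this package) that writes to stdout and tracks no offsets:

// stdoutTarget prints relayed messages and does not track offsets.
type stdoutTarget struct{}

func (t *stdoutTarget) GetHighWatermark(ctx context.Context) (relay.Offsets, error) {
	return relay.Offsets{}, nil // no offset tracking: empty Offsets, nil error
}

func (t *stdoutTarget) Start() error { return nil } // no background worker needed

func (t *stdoutTarget) Write(ctx context.Context, msg relay.Message) error {
	_, err := fmt.Printf("%s[%d] offset=%d key=%q value=%q\n",
		msg.Topic, msg.SourcePartition, msg.Offset, msg.Key, msg.Value)
	return err
}

func (t *stdoutTarget) Close() error { return nil } // nothing to flush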

type TargetCfg

type TargetCfg struct {
	ReqTimeout    time.Duration `koanf:"request_timeout"`
	EnableBackoff bool          `koanf:"enable_backoff"`
	BackoffMin    time.Duration `koanf:"backoff_min"`
	BackoffMax    time.Duration `koanf:"backoff_max"`
}

TargetCfg is the target/writer config (backoff, timeouts).

type Topic

type Topic struct {
	SourceTopic         string `koanf:"source_topic"`
	TargetTopic         string `koanf:"target_topic"`
	TargetPartition     uint   `koanf:"target_partition"`
	AutoTargetPartition bool   `koanf:"auto_target_partition"`
}

Topic represents a source->target topic configuration.

type TopicOffsets

type TopicOffsets map[int32]int64

TopicOffsets defines a partition -> offset map for a single source/target Kafka topic. Values are plain int64 offsets, keeping the public API free of kgo types.

type Topics

type Topics map[string]Topic

Topics is an abstraction over the source->target topic map.
