Documentation
¶
Index ¶
- Constants
- Variables
- func AddSASLConfig(opts []kgo.Opt, cfg KafkaCfg) []kgo.Opt
- func CheckTCP(ctx context.Context, addrs []string, timeout time.Duration) bool
- func GetAckPolicy(ack string) kgo.Acks
- func GetBackoffFn(enabled bool, min, max time.Duration) func(int) time.Duration
- func GetCompressionCodec(codec string) kgo.CompressionCodec
- func GetHighWatermark(ctx context.Context, client *kgo.Client, topics []string, ...) (kadm.ListedOffsets, error)
- func GetTLSConfig(ca, cl, key string) (kgo.Opt, error)
- func MetricName(base string, labels ...Label) string
- func ValidateConn(client *kgo.Client, timeout time.Duration, topics []string, ...) error
- func WaitTries(ctx context.Context, waitDuration time.Duration)
- type ConsumerCfg
- type Header
- type KafkaCfg
- type Label
- type Message
- type Offsets
- type ProducerCfg
- type Relay
- type RelayCfg
- type Server
- type SourcePool
- type SourcePoolCfg
- type Target
- type TargetCfg
- type Topic
- type TopicOffsets
- type Topics
Constants ¶
const ( RelayMetricPrefix = "kafka_relay_" // Metric names. Target{} implementations should use these to emit metrics. MetricSourceErrors = "source_errors_total" MetricSourceUnhealthy = "source_unhealthy_total" MetricSourceKafkaErrors = "source_kafka_errors_total" MetricSourceHighwater = "source_highwatermark" MetricTargetErrors = "target_errors_total" MetricTargetKafkaErrors = "target_kafka_errors_total" MetricMsgsRelayed = "messages_relayed_total" MetricFilteredMsgs = "messages_filtered_total" MetricInletBlocks = "target_inlet_blocks_total" MetricFlushBatchSize = "target_flush_batch_size" MetricFlushDuration = "target_flush_duration_seconds" MetricFlushRetries = "target_flush_retries_total" MetricCandidateSwitches = "source_candidate_switches_total" MetricSourceConnections = "source_connections_total" MetricLagThresholdExceeded = "source_lag_threshold_exceeded_total" // Canonical error labels for metrics. ErrLabelConnectionFailed = "connection_failed" ErrLabelClientClosed = "client_closed" ErrLabelFetch = "fetch_error" ErrLabelTLSConfig = "tls_config_error" ErrLabelClientCreation = "client_creation_failed" ErrLabelProducerConnection = "connection_failed" ErrLabelProduce = "produce_failed" ErrLabelProduceRetries = "produce_retries_exhausted" // SASL mechanisms. SASLMechanismPlain = "PLAIN" SASLMechanismScramSHA256 = "SCRAM-SHA-256" SASLMechanismScramSHA512 = "SCRAM-SHA-512" // Relay modes. ModeFailover = "failover" ModeSingle = "single" IndefiniteRetry = -1 )
const ( StateDisconnected = iota StateConnecting )
Connection states (iota-based, must stay in a separate block).
Variables ¶
var (
ErrLaggingBehind = errors.New("topic end offset is lagging behind")
)
var (
ErrorNoHealthy = errors.New("no healthy node")
)
Functions ¶
func GetAckPolicy ¶
GetAckPolicy returns franz-go's ack policy (kgo.Acks) for the given ack string.
func GetBackoffFn ¶
GetBackoffFn returns a backoff function based on the config. Even when backoff is disabled, a minimum 1-second delay is used to prevent CPU spinning.
func GetCompressionCodec ¶
func GetCompressionCodec(codec string) kgo.CompressionCodec
func GetHighWatermark ¶
func GetHighWatermark(ctx context.Context, client *kgo.Client, topics []string, timeout time.Duration) (kadm.ListedOffsets, error)
GetHighWatermark returns the high watermark offsets for the given topics as of the time of the request.
func MetricName ¶
MetricName builds a Prometheus metric name with optional labels. E.g. MetricName("errors_total", Label{"node_id", "1"}) → `kafka_relay_errors_total{node_id="1"}`
Types ¶
type ConsumerCfg ¶
type ConsumerCfg struct {
KafkaCfg `koanf:",squash"`
}
ConsumerCfg is the direct consumer config.
type KafkaCfg ¶
type KafkaCfg struct {
// Namespace
Name string `koanf:"name"`
// Broker.
BootstrapBrokers []string `koanf:"servers"`
SessionTimeout time.Duration `koanf:"session_timeout"`
// Auth.
EnableAuth bool `koanf:"enable_auth"`
// PLAIN/SCRAM-SHA-256/SCRAM-SHA-512
SASLMechanism string `koanf:"sasl_mechanism"`
Username string `koanf:"username"`
Password string `koanf:"password"`
// If enabled and the three files are passed, will
// use the relevant certs and keys. If enabled but all three
// file paths are empty, it will default to using DialTLS()
EnableTLS bool `koanf:"enable_tls"`
ClientKeyPath string `koanf:"client_key_path"`
ClientCertPath string `koanf:"client_cert_path"`
CACertPath string `koanf:"ca_cert_path"`
EnableLog bool `koanf:"enable_log"`
}
KafkaCfg is the message broker's client config.
type Label ¶
type Label struct {
Key, Value string
}
Label is a key-value pair for Prometheus metric labels.
type Message ¶
type Message struct {
Key []byte
Value []byte
Headers []Header
Topic string // Target topic
Partition int32 // Target partition (-1 for auto)
Offset int64 // Source message offset.
SourcePartition int32 // Source partition (for offset tracking by targets).
}
Message is a single Kafka message flowing through the relay pipeline. The relay converts source Kafka records into Message{}s before passing them to a Target.
type Offsets ¶
Offsets maps topic -> partition -> offset. This represents the progress of message offsets at a target.
type ProducerCfg ¶
type ProducerCfg struct {
KafkaCfg `koanf:",squash"`
EnableIdempotency bool `koanf:"enable_idempotency"`
CommitAck string `koanf:"commit_ack_type"` // tcp|leader|cluster|default
MaxRetries int `koanf:"max_retries"`
FlushFrequency time.Duration `koanf:"flush_frequency"`
MaxMessageBytes int `koanf:"max_message_bytes"`
BatchSize int `koanf:"batch_size"`
BufferSize int `koanf:"buffer_size"`
FlushBatchSize int `koanf:"flush_batch_size"`
Compression string `koanf:"compression"` // gzip|snappy|lz4|zstd|none
}
ProducerCfg is the Kafka producer config.
type Relay ¶
type Relay struct {
// contains filtered or unexported fields
}
Relay orchestrates consumption from a SourcePool and writing to a Target.
type Server ¶
type Server struct {
Config ConsumerCfg
ID int
// Weight is the cumulative high watermark (offset) of every single topic
// on a source. This is used for comparing lags between different sources
// based on a threshold. If a server is unhealthy, the weight is marked as -1.
Weight int64
Healthy bool
// This is only set when a new live Kafka consumer connection is established
// on demand via Get(), where a server{} is returned. Internally, no connections
// are maintained on SourcePool.[]servers and only the config, weight etc.
// params are used to keep track of healthy servers.
Client *kgo.Client
}
Server represents a source Server's config with health and weight parameters which are used for tracking health status.
type SourcePool ¶
SourcePool manages the source Kafka instances and consumption.
func NewSourcePool ¶
func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerCfg, topic Topic, targetOffsets TopicOffsets, m *metrics.Set, log *slog.Logger) (*SourcePool, error)
NewSourcePool returns a controller instance that manages the lifecycle of a pool of N source (consumer) servers. The pool always attempts to find one healthy node for the relay to consume from.
func (*SourcePool) Get ¶
func (sp *SourcePool) Get(globalCtx context.Context) (*Server, error)
Get attempts to return a healthy source Kafka client connection. It internally applies backoff/retries between connection attempts and can therefore take arbitrarily long to return, depending on the config.
func (*SourcePool) GetFetches ¶
func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error)
GetFetches retrieves a Kafka fetch iterator from which individual messages can be read.
func (*SourcePool) GetHighWatermark ¶
func (sp *SourcePool) GetHighWatermark(ctx context.Context, cl *kgo.Client) (kadm.ListedOffsets, error)
func (*SourcePool) RecordOffsets ¶
func (sp *SourcePool) RecordOffsets(rec *kgo.Record) error
RecordOffsets records the offsets of the latest fetched records per topic. This is used to resume consumption on new connections/reconnections from the source during runtime.
type SourcePoolCfg ¶
type Target ¶
type Target interface {
// GetHighWatermark returns the target's current offsets per topic-partition, which is then
// used by the relay to resume consumption from the source.
// Targets that don't/can't track offsets should return empty Offsets and nil, NOT an error.
GetHighWatermark(ctx context.Context) (Offsets, error)
// Start starts the target's background worker that batches and flushes source messages
// to the target.
Start() error
// Write is a blocking function that queues a message for writing to target. Blocking is necessary
// to wait on source consumption so that the target can catch up. It returns an error if the target is closed.
Write(ctx context.Context, msg Message) error
// Close closes the target and waits until all pending messages are flushed.
Close() error
}
Target is the interface for a relay target/destination. The bundled `kafkatarget` package implements this for Kafka. This interface can be implemented to relay messages to other systems (Redis, HTTP, etc.).
type TargetCfg ¶
type TargetCfg struct {
ReqTimeout time.Duration `koanf:"request_timeout"`
EnableBackoff bool `koanf:"enable_backoff"`
BackoffMin time.Duration `koanf:"backoff_min"`
BackoffMax time.Duration `koanf:"backoff_max"`
}
TargetCfg is the target/writer config (backoff, timeouts).
type Topic ¶
type Topic struct {
SourceTopic string `koanf:"source_topic"`
TargetTopic string `koanf:"target_topic"`
TargetPartition uint `koanf:"target_partition"`
AutoTargetPartition bool `koanf:"auto_target_partition"`
}
Topic represents a source->target topic configuration.
type TopicOffsets ¶
TopicOffsets defines a partition->offset map for a single source/target Kafka topic. Values are plain int64 offsets, keeping the public API free of kgo types.