client

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// Configuration errors
	ErrNoBrokers             = errors.New("no brokers specified")
	ErrInvalidTimeout        = errors.New("invalid timeout value")
	ErrInvalidMaxConnections = errors.New("invalid max connections value")
	ErrInvalidRetries        = errors.New("invalid retry count")

	// Connection errors
	ErrNoConnection         = errors.New("no connection available")
	ErrConnectionClosed     = errors.New("connection closed")
	ErrAllBrokersFailed     = errors.New("all brokers failed")
	ErrConnectionPoolClosed = errors.New("connection pool closed")

	// Request errors
	ErrRequestTimeout  = errors.New("request timeout")
	ErrInvalidResponse = errors.New("invalid response")
	ErrRequestFailed   = errors.New("request failed")

	// Client errors
	ErrClientClosed     = errors.New("client is closed")
	ErrInvalidTopic     = errors.New("invalid topic name")
	ErrInvalidPartition = errors.New("invalid partition")
	ErrInvalidOffset    = errors.New("invalid offset")

	// Producer errors
	ErrProducerClosed  = errors.New("producer is closed")
	ErrMessageTooLarge = errors.New("message too large")
	ErrBatchFull       = errors.New("batch is full")

	// Consumer errors
	ErrConsumerClosed = errors.New("consumer is closed")
	ErrNoMessages     = errors.New("no messages available")

	// Security errors
	ErrInvalidTLSConfig     = errors.New("invalid TLS configuration")
	ErrInvalidSASLConfig    = errors.New("invalid SASL configuration")
	ErrTLSHandshakeFailed   = errors.New("TLS handshake failed")
	ErrAuthenticationFailed = errors.New("authentication failed")
)

Functions

This section is empty.

Types

type BrokerStats

type BrokerStats struct {
	Total  int
	Active int
	Idle   int
}

BrokerStats holds per-broker statistics

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a StreamBus client

func New

func New(config *Config) (*Client, error)

New creates a new StreamBus client

func (*Client) Close

func (c *Client) Close() error

Close closes the client and releases resources

func (*Client) CreateTopic

func (c *Client) CreateTopic(topic string, numPartitions uint32, replicationFactor uint16) error

CreateTopic creates a new topic

func (*Client) DeleteTopic

func (c *Client) DeleteTopic(topic string) error

DeleteTopic deletes a topic

func (*Client) Fetch

func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error)

Fetch fetches messages from a topic partition

func (*Client) HealthCheck

func (c *Client) HealthCheck(broker string) error

HealthCheck sends a health check request to a broker

func (*Client) ListTopics

func (c *Client) ListTopics() ([]string, error)

ListTopics lists all topics

func (*Client) Stats

func (c *Client) Stats() ClientStats

Stats returns client statistics

type ClientStats

type ClientStats struct {
	RequestsSent   int64
	RequestsFailed int64
	BytesWritten   int64
	BytesRead      int64
	Uptime         time.Duration
	PoolStats      PoolStats
}

ClientStats holds client statistics

type Config

type Config struct {
	// Broker addresses (e.g., []string{"localhost:9092", "localhost:9093"})
	Brokers []string

	// Connection timeout
	ConnectTimeout time.Duration

	// Read timeout for requests
	ReadTimeout time.Duration

	// Write timeout for requests
	WriteTimeout time.Duration

	// Maximum number of connections per broker
	MaxConnectionsPerBroker int

	// Retry configuration
	MaxRetries    int
	RetryBackoff  time.Duration
	RetryMaxDelay time.Duration

	// Request timeout
	RequestTimeout time.Duration

	// Keep-alive configuration
	KeepAlive       bool
	KeepAlivePeriod time.Duration

	// Producer configuration
	ProducerConfig ProducerConfig

	// Consumer configuration
	ConsumerConfig ConsumerConfig

	// Security configuration
	Security *SecurityConfig
}

Config holds client configuration

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns default client configuration

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration

type ConnectionPool

type ConnectionPool struct {
	// contains filtered or unexported fields
}

ConnectionPool manages connections to brokers

func NewConnectionPool

func NewConnectionPool(config *Config) *ConnectionPool

NewConnectionPool creates a new connection pool

func (*ConnectionPool) Close

func (p *ConnectionPool) Close() error

Close closes all connections in the pool

func (*ConnectionPool) Get

func (p *ConnectionPool) Get(broker string) (*connection, error)

Get gets a connection to the specified broker

func (*ConnectionPool) Put

func (p *ConnectionPool) Put(conn *connection)

Put returns a connection to the pool

func (*ConnectionPool) Remove

func (p *ConnectionPool) Remove(conn *connection) error

Remove removes a connection from the pool

func (*ConnectionPool) Stats

func (p *ConnectionPool) Stats() PoolStats

Stats returns pool statistics

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer reads messages from StreamBus topics

func NewConsumer

func NewConsumer(client *Client, topic string, partitionID uint32) *Consumer

NewConsumer creates a new consumer

func NewConsumerWithConfig

func NewConsumerWithConfig(client *Client, topic string, partitionID uint32, config ConsumerConfig) *Consumer

NewConsumerWithConfig creates a new consumer with custom config

func (*Consumer) Close

func (c *Consumer) Close() error

Close closes the consumer

func (*Consumer) CurrentOffset

func (c *Consumer) CurrentOffset() int64

CurrentOffset returns the current offset

func (*Consumer) Fetch

func (c *Consumer) Fetch() ([]protocol.Message, error)

Fetch fetches messages from the current offset

func (*Consumer) FetchN

func (c *Consumer) FetchN(maxMessages int) ([]protocol.Message, error)

FetchN fetches up to N messages

func (*Consumer) FetchOne

func (c *Consumer) FetchOne() (*protocol.Message, error)

FetchOne fetches a single message

func (*Consumer) GetEndOffset

func (c *Consumer) GetEndOffset() (int64, error)

GetEndOffset gets the end offset for the partition

func (*Consumer) Partition

func (c *Consumer) Partition() uint32

Partition returns the partition being consumed

func (*Consumer) Poll

func (c *Consumer) Poll(interval time.Duration, handler func([]protocol.Message) error) error

Poll continuously polls for messages with a callback

func (*Consumer) Seek

func (c *Consumer) Seek(offset int64) error

Seek seeks to a specific offset

func (*Consumer) SeekToBeginning

func (c *Consumer) SeekToBeginning() error

SeekToBeginning seeks to the beginning of the partition

func (*Consumer) SeekToEnd

func (c *Consumer) SeekToEnd() error

SeekToEnd seeks to the end of the partition

func (*Consumer) Stats

func (c *Consumer) Stats() ConsumerStats

Stats returns consumer statistics

func (*Consumer) Topic

func (c *Consumer) Topic() string

Topic returns the topic being consumed

type ConsumerConfig

type ConsumerConfig struct {
	// Consumer group ID
	GroupID string

	// Offset to start consuming from (earliest, latest, or specific offset)
	StartOffset int64

	// Maximum bytes to fetch per request
	MaxFetchBytes uint32

	// Minimum bytes before server responds
	MinFetchBytes uint32

	// Maximum wait time for server to accumulate min bytes
	MaxWaitTime time.Duration

	// Auto-commit offset interval
	AutoCommitInterval time.Duration
}

ConsumerConfig holds consumer-specific configuration

type ConsumerMetrics

type ConsumerMetrics struct {
	MessagesRead int64
	BytesRead    int64
	FetchCount   int64
}

ConsumerMetrics contains consumer metrics

type ConsumerRecord

type ConsumerRecord struct {
	Topic     string
	Partition int32
	Offset    int64
	Message   protocol.Message
}

ConsumerRecord represents a consumed message with metadata

type ConsumerState

type ConsumerState int

ConsumerState represents the state of a group consumer

const (
	StateUnjoined ConsumerState = iota
	StateJoining
	StateRebalancing
	StateStable
)

type ConsumerStats

type ConsumerStats struct {
	Topic         string
	PartitionID   uint32
	CurrentOffset int64
	MessagesRead  int64
	BytesRead     int64
	FetchCount    int64
}

ConsumerStats holds consumer statistics

type DefaultRebalanceListener

type DefaultRebalanceListener struct{}

DefaultRebalanceListener is a no-op rebalance listener

func (*DefaultRebalanceListener) OnPartitionsAssigned

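func (l *DefaultRebalanceListener) OnPartitionsAssigned(partitions map[string][]int32)

OnPartitionsAssigned is the RebalanceListener callback invoked after new partitions are assigned; on DefaultRebalanceListener it is a no-op.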

func (*DefaultRebalanceListener) OnPartitionsRevoked

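func (l *DefaultRebalanceListener) OnPartitionsRevoked(partitions map[string][]int32)

OnPartitionsRevoked is the RebalanceListener callback invoked before partitions are revoked; on DefaultRebalanceListener it is a no-op.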

type FetchRequest

type FetchRequest struct {
	Topic     string
	Partition int32
	Offset    int64
	MaxBytes  int32
}

FetchRequest represents a fetch request

type FetchResponse

type FetchResponse struct {
	Topic         string
	Partition     int32
	Messages      []protocol.Message
	HighWaterMark int64
}

FetchResponse represents a fetch response

type GroupConsumer

type GroupConsumer struct {
	// contains filtered or unexported fields
}

GroupConsumer is a consumer that coordinates with other consumers in a group

func NewGroupConsumer

func NewGroupConsumer(client *Client, config GroupConsumerConfig) (*GroupConsumer, error)

NewGroupConsumer creates a new group consumer

func (*GroupConsumer) Assignment

func (gc *GroupConsumer) Assignment() map[string][]int32

Assignment returns the current partition assignment

func (*GroupConsumer) Close

func (gc *GroupConsumer) Close() error

Close closes the consumer and leaves the group

func (*GroupConsumer) CommitSync

func (gc *GroupConsumer) CommitSync(ctx context.Context, offsets map[string]map[int32]int64) error

CommitSync commits offsets synchronously

func (*GroupConsumer) Poll

func (gc *GroupConsumer) Poll(ctx context.Context) (map[string]map[int32][]protocol.Message, error)

Poll polls for messages from assigned partitions

func (*GroupConsumer) SetRebalanceListener

func (gc *GroupConsumer) SetRebalanceListener(listener RebalanceListener)

SetRebalanceListener sets a custom rebalance listener

func (*GroupConsumer) Stats

func (gc *GroupConsumer) Stats() GroupConsumerStats

Stats returns consumer statistics

func (*GroupConsumer) Subscribe

func (gc *GroupConsumer) Subscribe(ctx context.Context) error

Subscribe subscribes to the group and starts consuming

type GroupConsumerConfig

type GroupConsumerConfig struct {
	// Consumer group ID
	GroupID string

	// Topics to subscribe to
	Topics []string

	// Session timeout (how long coordinator waits before removing member)
	SessionTimeoutMs int32

	// Rebalance timeout (max time for rebalance)
	RebalanceTimeoutMs int32

	// Heartbeat interval (how often to send heartbeats)
	HeartbeatIntervalMs int32

	// Assignment strategy
	AssignmentStrategy string // "range", "roundrobin", "sticky"

	// Auto commit offsets
	AutoCommit bool

	// Auto commit interval
	AutoCommitIntervalMs int32

	// Client ID for identification
	ClientID string
}

GroupConsumerConfig holds group consumer configuration

func DefaultGroupConsumerConfig

func DefaultGroupConsumerConfig() GroupConsumerConfig

DefaultGroupConsumerConfig returns default configuration

type GroupConsumerStats

type GroupConsumerStats struct {
	GroupID          string
	MemberID         string
	State            ConsumerState
	RebalanceCount   int64
	MessagesRead     int64
	OffsetsCommitted int64
}

GroupConsumerStats holds statistics for a group consumer

type IsolationLevel

type IsolationLevel int

IsolationLevel represents the transaction isolation level for consumers

const (
	// ReadUncommitted reads all messages including those in open transactions
	ReadUncommitted IsolationLevel = iota
	// ReadCommitted only reads messages from committed transactions
	ReadCommitted
)

type PartitionConsumer

type PartitionConsumer struct {
	// contains filtered or unexported fields
}

PartitionConsumer is a consumer that is aware of partitions and can consume from multiple partitions

func NewPartitionConsumer

func NewPartitionConsumer(client *Client, topic string, partitions []uint32) *PartitionConsumer

NewPartitionConsumer creates a new partition-aware consumer

func NewPartitionConsumerWithConfig

func NewPartitionConsumerWithConfig(client *Client, topic string, partitions []uint32, config ConsumerConfig) *PartitionConsumer

NewPartitionConsumerWithConfig creates a new partition consumer with custom config

func (*PartitionConsumer) Close

func (pc *PartitionConsumer) Close() error

Close closes the partition consumer

func (*PartitionConsumer) FetchAll

func (pc *PartitionConsumer) FetchAll() (map[uint32][]protocol.Message, error)

FetchAll fetches from all assigned partitions

func (*PartitionConsumer) FetchFromPartition

func (pc *PartitionConsumer) FetchFromPartition(partitionID uint32) ([]protocol.Message, error)

FetchFromPartition fetches messages from a specific partition

func (*PartitionConsumer) FetchRoundRobin

func (pc *PartitionConsumer) FetchRoundRobin() ([]protocol.Message, error)

FetchRoundRobin fetches from partitions in round-robin fashion

func (*PartitionConsumer) GetOffsets

func (pc *PartitionConsumer) GetOffsets() map[uint32]int64

GetOffsets returns current offsets for all partitions

func (*PartitionConsumer) GetPartitionInfo

func (pc *PartitionConsumer) GetPartitionInfo(partitionID uint32) (PartitionInfo, error)

GetPartitionInfo returns information about a specific partition

func (*PartitionConsumer) Metrics

func (pc *PartitionConsumer) Metrics() ConsumerMetrics

Metrics returns consumer metrics

func (*PartitionConsumer) PollPartitions

func (pc *PartitionConsumer) PollPartitions(ctx context.Context, interval time.Duration, handler func(uint32, []protocol.Message) error) error

PollPartitions polls all partitions for new messages

func (*PartitionConsumer) SeekAll

func (pc *PartitionConsumer) SeekAll(offset int64) error

SeekAll seeks all partitions to the same offset

func (*PartitionConsumer) SeekPartition

func (pc *PartitionConsumer) SeekPartition(partitionID uint32, offset int64) error

SeekPartition seeks to a specific offset in a partition

type PartitionInfo

type PartitionInfo struct {
	PartitionID uint32
	Offset      int64
	HighWater   int64
	Lag         int64
	LastFetch   time.Time
}

PartitionInfo contains information about a partition

type PendingMessage

type PendingMessage struct {
	Topic     string
	Partition int32
	Message   protocol.Message
}

PendingMessage represents a message pending in a transaction

type PoolStats

type PoolStats struct {
	TotalConnections  int
	ActiveConnections int
	IdleConnections   int
	BrokerStats       map[string]BrokerStats
}

PoolStats holds pool statistics

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer sends messages to StreamBus topics

func NewProducer

func NewProducer(client *Client) *Producer

NewProducer creates a new producer

func NewProducerWithConfig

func NewProducerWithConfig(client *Client, config ProducerConfig) *Producer

NewProducerWithConfig creates a new producer with custom config

func (*Producer) Close

func (p *Producer) Close() error

Close closes the producer

func (*Producer) Flush

func (p *Producer) Flush(topic string) error

Flush flushes all pending batches for a topic

func (*Producer) FlushAll

func (p *Producer) FlushAll() error

FlushAll flushes all pending batches

func (*Producer) Send

func (p *Producer) Send(topic string, key, value []byte) error

Send sends a single message

func (*Producer) SendMessages

func (p *Producer) SendMessages(topic string, messages []protocol.Message) error

SendMessages sends multiple messages

func (*Producer) SendMessagesToPartition

func (p *Producer) SendMessagesToPartition(topic string, partitionID uint32, messages []protocol.Message) error

SendMessagesToPartition sends multiple messages to a specific partition

func (*Producer) SendToPartition

func (p *Producer) SendToPartition(topic string, partitionID uint32, key, value []byte) error

SendToPartition sends a message to a specific partition

func (*Producer) Stats

func (p *Producer) Stats() ProducerStats

Stats returns producer statistics

type ProducerConfig

type ProducerConfig struct {
	// Whether to require acknowledgment from server
	RequireAck bool

	// Batch size for batching messages (0 = no batching)
	BatchSize int

	// Maximum time to wait before flushing batch
	BatchTimeout time.Duration

	// Compression type (none, gzip, snappy, lz4)
	Compression string

	// Maximum number of in-flight requests
	MaxInFlightRequests int
}

ProducerConfig holds producer-specific configuration

type ProducerState

type ProducerState int

ProducerState represents the state of a transactional producer

const (
	ProducerStateUninitialized ProducerState = iota
	ProducerStateReady
	ProducerStateInTransaction
	ProducerStateCommitting
	ProducerStateAborting
	ProducerStateFenced
	ProducerStateClosed
)

type ProducerStats

type ProducerStats struct {
	MessagesSent   int64
	MessagesFailed int64
	BatchesSent    int64
	BytesSent      int64
}

ProducerStats holds producer statistics

type RebalanceListener

type RebalanceListener interface {
	// OnPartitionsRevoked is called before partitions are revoked
	OnPartitionsRevoked(partitions map[string][]int32)

	// OnPartitionsAssigned is called after new partitions are assigned
	OnPartitionsAssigned(partitions map[string][]int32)
}

RebalanceListener is called during rebalancing events

type SASLConfig

type SASLConfig struct {
	// Enable SASL
	Enabled bool

	// SASL mechanism (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512)
	Mechanism string

	// Username
	Username string

	// Password
	Password string
}

SASLConfig holds SASL authentication configuration

type SecurityConfig

type SecurityConfig struct {
	// TLS configuration
	TLS *TLSConfig

	// SASL configuration
	SASL *SASLConfig

	// API Key
	APIKey string
}

SecurityConfig holds client security configuration

func (*SecurityConfig) Validate

func (s *SecurityConfig) Validate() error

Validate validates security configuration

type TLSConfig

type TLSConfig struct {
	// Enable TLS
	Enabled bool

	// Certificate file (for client certificate authentication)
	CertFile string

	// Key file (for client certificate authentication)
	KeyFile string

	// CA certificate file (to verify server certificate)
	CAFile string

	// Skip server certificate verification (not recommended for production)
	InsecureSkipVerify bool

	// Server name for certificate verification
	ServerName string
	// contains filtered or unexported fields
}

TLSConfig holds TLS configuration for the client

type Transaction

type Transaction struct {
	ID         transaction.TransactionID
	StartTime  time.Time
	Partitions map[string][]int32 // topic -> partitions
	Messages   []PendingMessage
}

Transaction represents an active transaction

type TransactionalConsumer

type TransactionalConsumer struct {
	// contains filtered or unexported fields
}

TransactionalConsumer provides read-committed isolation for consuming messages

func NewTransactionalConsumer

func NewTransactionalConsumer(config *TransactionalConsumerConfig) (*TransactionalConsumer, error)

NewTransactionalConsumer creates a new transactional consumer

func (*TransactionalConsumer) Close

func (tc *TransactionalConsumer) Close() error

Close closes the consumer

func (*TransactionalConsumer) Commit

func (tc *TransactionalConsumer) Commit(ctx context.Context) error

Commit commits the current position

func (*TransactionalConsumer) CommitSync

func (tc *TransactionalConsumer) CommitSync(ctx context.Context, offsets map[string]map[int32]int64) error

CommitSync commits offsets synchronously

func (*TransactionalConsumer) Committed

func (tc *TransactionalConsumer) Committed(topic string, partition int32) (int64, error)

Committed returns the last committed offset for a partition

func (*TransactionalConsumer) MarkTransactionAborted

func (tc *TransactionalConsumer) MarkTransactionAborted(txnID transaction.TransactionID)

MarkTransactionAborted marks a transaction as aborted. Messages from aborted transactions will be filtered in read-committed mode.

func (*TransactionalConsumer) Poll

Poll fetches messages from subscribed topics

func (*TransactionalConsumer) Position

func (tc *TransactionalConsumer) Position(topic string, partition int32) (int64, error)

Position returns the current position for a partition

func (*TransactionalConsumer) Seek

func (tc *TransactionalConsumer) Seek(topic string, partition int32, offset int64) error

Seek moves the consumer position to a specific offset

func (*TransactionalConsumer) Stats

Stats returns consumer statistics

func (*TransactionalConsumer) UpdateLastStableOffset

func (tc *TransactionalConsumer) UpdateLastStableOffset(topic string, partition int32, lso int64)

UpdateLastStableOffset updates the last stable offset for a partition. This is called internally when transaction markers are read.

type TransactionalConsumerConfig

type TransactionalConsumerConfig struct {
	// Client is the underlying StreamBus client
	Client *Client

	// Topics to consume from
	Topics []string

	// GroupID for consumer group membership (optional)
	GroupID string

	// IsolationLevel determines which messages are visible
	IsolationLevel IsolationLevel

	// MaxPollRecords limits records returned per fetch
	MaxPollRecords int

	// AutoCommitEnabled enables automatic offset commits
	AutoCommitEnabled bool
}

TransactionalConsumerConfig holds configuration for transactional consumer

func DefaultTransactionalConsumerConfig

func DefaultTransactionalConsumerConfig() *TransactionalConsumerConfig

DefaultTransactionalConsumerConfig returns default configuration

type TransactionalConsumerStats

type TransactionalConsumerStats struct {
	MessagesConsumed    int64
	MessagesFiltered    int64          // Messages filtered due to aborted transactions
	AbortedTransactions int            // Number of known aborted transactions
	IsolationLevel      IsolationLevel // Current isolation level
}

TransactionalConsumerStats holds consumer statistics

type TransactionalProducer

type TransactionalProducer struct {
	// contains filtered or unexported fields
}

TransactionalProducer provides exactly-once semantics for message production

func NewTransactionalProducer

func NewTransactionalProducer(client *Client, config TransactionalProducerConfig) (*TransactionalProducer, error)

NewTransactionalProducer creates a new transactional producer

func (*TransactionalProducer) AbortTransaction

func (tp *TransactionalProducer) AbortTransaction(ctx context.Context) error

AbortTransaction aborts the current transaction

func (*TransactionalProducer) BeginTransaction

func (tp *TransactionalProducer) BeginTransaction(ctx context.Context) error

BeginTransaction starts a new transaction

func (*TransactionalProducer) Close

func (tp *TransactionalProducer) Close() error

Close closes the transactional producer

func (*TransactionalProducer) CommitTransaction

func (tp *TransactionalProducer) CommitTransaction(ctx context.Context) error

CommitTransaction commits the current transaction

func (*TransactionalProducer) Send

func (tp *TransactionalProducer) Send(ctx context.Context, topic string, partition int32, message protocol.Message) error

Send sends a message within the current transaction

func (*TransactionalProducer) SendOffsetsToTransaction

func (tp *TransactionalProducer) SendOffsetsToTransaction(ctx context.Context, groupID string, offsets map[string]map[int32]int64) error

SendOffsetsToTransaction adds consumer group offsets to the transaction

func (*TransactionalProducer) Stats

Stats returns producer statistics

type TransactionalProducerConfig

type TransactionalProducerConfig struct {
	// Unique transaction ID for this producer
	TransactionID string

	// Transaction timeout
	TransactionTimeout time.Duration

	// Maximum time to wait for coordinator response
	RequestTimeout time.Duration
}

TransactionalProducerConfig holds configuration for transactional producer

func DefaultTransactionalProducerConfig

func DefaultTransactionalProducerConfig() TransactionalProducerConfig

DefaultTransactionalProducerConfig returns default configuration

type TransactionalProducerStats

type TransactionalProducerStats struct {
	ProducerID            transaction.ProducerID
	ProducerEpoch         transaction.ProducerEpoch
	State                 ProducerState
	TransactionsCommitted int64
	TransactionsAborted   int64
	MessagesProduced      int64
}

TransactionalProducerStats holds producer statistics

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL