Documentation
¶
Index ¶
- Variables
- type BrokerStats
- type Client
- func (c *Client) Close() error
- func (c *Client) CreateTopic(topic string, numPartitions uint32, replicationFactor uint16) error
- func (c *Client) DeleteTopic(topic string) error
- func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error)
- func (c *Client) HealthCheck(broker string) error
- func (c *Client) ListTopics() ([]string, error)
- func (c *Client) Stats() ClientStats
- type ClientStats
- type Config
- type ConnectionPool
- type Consumer
- func (c *Consumer) Close() error
- func (c *Consumer) CurrentOffset() int64
- func (c *Consumer) Fetch() ([]protocol.Message, error)
- func (c *Consumer) FetchN(maxMessages int) ([]protocol.Message, error)
- func (c *Consumer) FetchOne() (*protocol.Message, error)
- func (c *Consumer) GetEndOffset() (int64, error)
- func (c *Consumer) Partition() uint32
- func (c *Consumer) Poll(interval time.Duration, handler func([]protocol.Message) error) error
- func (c *Consumer) Seek(offset int64) error
- func (c *Consumer) SeekToBeginning() error
- func (c *Consumer) SeekToEnd() error
- func (c *Consumer) Stats() ConsumerStats
- func (c *Consumer) Topic() string
- type ConsumerConfig
- type ConsumerMetrics
- type ConsumerRecord
- type ConsumerState
- type ConsumerStats
- type DefaultRebalanceListener
- type FetchRequest
- type FetchResponse
- type GroupConsumer
- func (gc *GroupConsumer) Assignment() map[string][]int32
- func (gc *GroupConsumer) Close() error
- func (gc *GroupConsumer) CommitSync(ctx context.Context, offsets map[string]map[int32]int64) error
- func (gc *GroupConsumer) Poll(ctx context.Context) (map[string]map[int32][]protocol.Message, error)
- func (gc *GroupConsumer) SetRebalanceListener(listener RebalanceListener)
- func (gc *GroupConsumer) Stats() GroupConsumerStats
- func (gc *GroupConsumer) Subscribe(ctx context.Context) error
- type GroupConsumerConfig
- type GroupConsumerStats
- type IsolationLevel
- type PartitionConsumer
- func (pc *PartitionConsumer) Close() error
- func (pc *PartitionConsumer) FetchAll() (map[uint32][]protocol.Message, error)
- func (pc *PartitionConsumer) FetchFromPartition(partitionID uint32) ([]protocol.Message, error)
- func (pc *PartitionConsumer) FetchRoundRobin() ([]protocol.Message, error)
- func (pc *PartitionConsumer) GetOffsets() map[uint32]int64
- func (pc *PartitionConsumer) GetPartitionInfo(partitionID uint32) (PartitionInfo, error)
- func (pc *PartitionConsumer) Metrics() ConsumerMetrics
- func (pc *PartitionConsumer) PollPartitions(ctx context.Context, interval time.Duration, handler func(uint32, []protocol.Message) error) error
- func (pc *PartitionConsumer) SeekAll(offset int64) error
- func (pc *PartitionConsumer) SeekPartition(partitionID uint32, offset int64) error
- type PartitionInfo
- type PendingMessage
- type PoolStats
- type Producer
- func (p *Producer) Close() error
- func (p *Producer) Flush(topic string) error
- func (p *Producer) FlushAll() error
- func (p *Producer) Send(topic string, key, value []byte) error
- func (p *Producer) SendMessages(topic string, messages []protocol.Message) error
- func (p *Producer) SendMessagesToPartition(topic string, partitionID uint32, messages []protocol.Message) error
- func (p *Producer) SendToPartition(topic string, partitionID uint32, key, value []byte) error
- func (p *Producer) Stats() ProducerStats
- type ProducerConfig
- type ProducerState
- type ProducerStats
- type RebalanceListener
- type SASLConfig
- type SecurityConfig
- type TLSConfig
- type Transaction
- type TransactionalConsumer
- func (tc *TransactionalConsumer) Close() error
- func (tc *TransactionalConsumer) Commit(ctx context.Context) error
- func (tc *TransactionalConsumer) CommitSync(ctx context.Context, offsets map[string]map[int32]int64) error
- func (tc *TransactionalConsumer) Committed(topic string, partition int32) (int64, error)
- func (tc *TransactionalConsumer) MarkTransactionAborted(txnID transaction.TransactionID)
- func (tc *TransactionalConsumer) Poll(ctx context.Context) ([]ConsumerRecord, error)
- func (tc *TransactionalConsumer) Position(topic string, partition int32) (int64, error)
- func (tc *TransactionalConsumer) Seek(topic string, partition int32, offset int64) error
- func (tc *TransactionalConsumer) Stats() TransactionalConsumerStats
- func (tc *TransactionalConsumer) UpdateLastStableOffset(topic string, partition int32, lso int64)
- type TransactionalConsumerConfig
- type TransactionalConsumerStats
- type TransactionalProducer
- func (tp *TransactionalProducer) AbortTransaction(ctx context.Context) error
- func (tp *TransactionalProducer) BeginTransaction(ctx context.Context) error
- func (tp *TransactionalProducer) Close() error
- func (tp *TransactionalProducer) CommitTransaction(ctx context.Context) error
- func (tp *TransactionalProducer) Send(ctx context.Context, topic string, partition int32, message protocol.Message) error
- func (tp *TransactionalProducer) SendOffsetsToTransaction(ctx context.Context, groupID string, offsets map[string]map[int32]int64) error
- func (tp *TransactionalProducer) Stats() TransactionalProducerStats
- type TransactionalProducerConfig
- type TransactionalProducerStats
Constants ¶
This section is empty.
Variables ¶
var (
// Configuration errors
ErrNoBrokers = errors.New("no brokers specified")
ErrInvalidTimeout = errors.New("invalid timeout value")
ErrInvalidMaxConnections = errors.New("invalid max connections value")
ErrInvalidRetries = errors.New("invalid retry count")
// Connection errors
ErrNoConnection = errors.New("no connection available")
ErrConnectionClosed = errors.New("connection closed")
ErrAllBrokersFailed = errors.New("all brokers failed")
ErrConnectionPoolClosed = errors.New("connection pool closed")
// Request errors
ErrRequestTimeout = errors.New("request timeout")
ErrInvalidResponse = errors.New("invalid response")
ErrRequestFailed = errors.New("request failed")
// Client errors
ErrClientClosed = errors.New("client is closed")
ErrInvalidTopic = errors.New("invalid topic name")
ErrInvalidPartition = errors.New("invalid partition")
ErrInvalidOffset = errors.New("invalid offset")
// Producer errors
ErrProducerClosed = errors.New("producer is closed")
ErrMessageTooLarge = errors.New("message too large")
ErrBatchFull = errors.New("batch is full")
// Consumer errors
ErrConsumerClosed = errors.New("consumer is closed")
ErrNoMessages = errors.New("no messages available")
// Security errors
ErrInvalidTLSConfig = errors.New("invalid TLS configuration")
ErrInvalidSASLConfig = errors.New("invalid SASL configuration")
ErrTLSHandshakeFailed = errors.New("TLS handshake failed")
ErrAuthenticationFailed = errors.New("authentication failed")
)
Functions ¶
This section is empty.
Types ¶
type BrokerStats ¶
BrokerStats holds per-broker statistics
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client represents a StreamBus client
func (*Client) CreateTopic ¶
CreateTopic creates a new topic
func (*Client) DeleteTopic ¶
DeleteTopic deletes a topic
func (*Client) Fetch ¶
func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error)
Fetch fetches messages from a topic partition
func (*Client) HealthCheck ¶
HealthCheck sends a health check request to a broker
func (*Client) ListTopics ¶
ListTopics lists all topics
type ClientStats ¶
type ClientStats struct {
RequestsSent int64
RequestsFailed int64
BytesWritten int64
BytesRead int64
Uptime time.Duration
PoolStats PoolStats
}
ClientStats holds client statistics
type Config ¶
type Config struct {
// Broker addresses (e.g., []string{"localhost:9092", "localhost:9093"})
Brokers []string
// Connection timeout
ConnectTimeout time.Duration
// Read timeout for requests
ReadTimeout time.Duration
// Write timeout for requests
WriteTimeout time.Duration
// Maximum number of connections per broker
MaxConnectionsPerBroker int
// Retry configuration
MaxRetries int
RetryBackoff time.Duration
RetryMaxDelay time.Duration
// Request timeout
RequestTimeout time.Duration
// Keep-alive configuration
KeepAlive bool
KeepAlivePeriod time.Duration
// Producer configuration
ProducerConfig ProducerConfig
// Consumer configuration
ConsumerConfig ConsumerConfig
// Security configuration
Security *SecurityConfig
}
Config holds client configuration
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns default client configuration
type ConnectionPool ¶
type ConnectionPool struct {
// contains filtered or unexported fields
}
ConnectionPool manages connections to brokers
func NewConnectionPool ¶
func NewConnectionPool(config *Config) *ConnectionPool
NewConnectionPool creates a new connection pool
func (*ConnectionPool) Close ¶
func (p *ConnectionPool) Close() error
Close closes all connections in the pool
func (*ConnectionPool) Get ¶
func (p *ConnectionPool) Get(broker string) (*connection, error)
Get gets a connection to the specified broker
func (*ConnectionPool) Put ¶
func (p *ConnectionPool) Put(conn *connection)
Put returns a connection to the pool
func (*ConnectionPool) Remove ¶
func (p *ConnectionPool) Remove(conn *connection) error
Remove removes a connection from the pool
func (*ConnectionPool) Stats ¶
func (p *ConnectionPool) Stats() PoolStats
Stats returns pool statistics
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer reads messages from StreamBus topics
func NewConsumer ¶
NewConsumer creates a new consumer
func NewConsumerWithConfig ¶
func NewConsumerWithConfig(client *Client, topic string, partitionID uint32, config ConsumerConfig) *Consumer
NewConsumerWithConfig creates a new consumer with custom config
func (*Consumer) CurrentOffset ¶
CurrentOffset returns the current offset
func (*Consumer) GetEndOffset ¶
GetEndOffset gets the end offset for the partition
func (*Consumer) SeekToBeginning ¶
SeekToBeginning seeks to the beginning of the partition
type ConsumerConfig ¶
type ConsumerConfig struct {
// Consumer group ID
GroupID string
// Offset to start consuming from (earliest, latest, or specific offset)
StartOffset int64
// Maximum bytes to fetch per request
MaxFetchBytes uint32
// Minimum bytes before server responds
MinFetchBytes uint32
// Maximum wait time for server to accumulate min bytes
MaxWaitTime time.Duration
// Auto-commit offset interval
AutoCommitInterval time.Duration
}
ConsumerConfig holds consumer-specific configuration
type ConsumerMetrics ¶
ConsumerMetrics contains consumer metrics
type ConsumerRecord ¶
ConsumerRecord represents a consumed message with metadata
type ConsumerState ¶
type ConsumerState int
ConsumerState represents the state of a group consumer
const (
StateUnjoined ConsumerState = iota
StateJoining
StateRebalancing
StateStable
)
type ConsumerStats ¶
type ConsumerStats struct {
Topic string
PartitionID uint32
CurrentOffset int64
MessagesRead int64
BytesRead int64
FetchCount int64
}
ConsumerStats holds consumer statistics
type DefaultRebalanceListener ¶
type DefaultRebalanceListener struct{}
DefaultRebalanceListener is a no-op rebalance listener
func (*DefaultRebalanceListener) OnPartitionsAssigned ¶
func (l *DefaultRebalanceListener) OnPartitionsAssigned(partitions map[string][]int32)
func (*DefaultRebalanceListener) OnPartitionsRevoked ¶
func (l *DefaultRebalanceListener) OnPartitionsRevoked(partitions map[string][]int32)
type FetchRequest ¶
FetchRequest represents a fetch request
type FetchResponse ¶
type FetchResponse struct {
Topic string
Partition int32
Messages []protocol.Message
HighWaterMark int64
}
FetchResponse represents a fetch response
type GroupConsumer ¶
type GroupConsumer struct {
// contains filtered or unexported fields
}
GroupConsumer is a consumer that coordinates with other consumers in a group
func NewGroupConsumer ¶
func NewGroupConsumer(client *Client, config GroupConsumerConfig) (*GroupConsumer, error)
NewGroupConsumer creates a new group consumer
func (*GroupConsumer) Assignment ¶
func (gc *GroupConsumer) Assignment() map[string][]int32
Assignment returns the current partition assignment
func (*GroupConsumer) Close ¶
func (gc *GroupConsumer) Close() error
Close closes the consumer and leaves the group
func (*GroupConsumer) CommitSync ¶
CommitSync commits offsets synchronously
func (*GroupConsumer) SetRebalanceListener ¶
func (gc *GroupConsumer) SetRebalanceListener(listener RebalanceListener)
SetRebalanceListener sets a custom rebalance listener
func (*GroupConsumer) Stats ¶
func (gc *GroupConsumer) Stats() GroupConsumerStats
Stats returns consumer statistics
type GroupConsumerConfig ¶
type GroupConsumerConfig struct {
// Consumer group ID
GroupID string
// Topics to subscribe to
Topics []string
// Session timeout (how long coordinator waits before removing member)
SessionTimeoutMs int32
// Rebalance timeout (max time for rebalance)
RebalanceTimeoutMs int32
// Heartbeat interval (how often to send heartbeats)
HeartbeatIntervalMs int32
// Assignment strategy
AssignmentStrategy string // "range", "roundrobin", "sticky"
// Auto commit offsets
AutoCommit bool
// Auto commit interval
AutoCommitIntervalMs int32
// Client ID for identification
ClientID string
}
GroupConsumerConfig holds group consumer configuration
func DefaultGroupConsumerConfig ¶
func DefaultGroupConsumerConfig() GroupConsumerConfig
DefaultGroupConsumerConfig returns default configuration
type GroupConsumerStats ¶
type GroupConsumerStats struct {
GroupID string
MemberID string
State ConsumerState
RebalanceCount int64
MessagesRead int64
OffsetsCommitted int64
}
GroupConsumerStats holds statistics for a group consumer
type IsolationLevel ¶
type IsolationLevel int
IsolationLevel represents the transaction isolation level for consumers
const (
// ReadUncommitted reads all messages including those in open transactions
ReadUncommitted IsolationLevel = iota
// ReadCommitted only reads messages from committed transactions
ReadCommitted
)
type PartitionConsumer ¶
type PartitionConsumer struct {
// contains filtered or unexported fields
}
PartitionConsumer is a consumer that is aware of partitions and can consume from multiple partitions
func NewPartitionConsumer ¶
func NewPartitionConsumer(client *Client, topic string, partitions []uint32) *PartitionConsumer
NewPartitionConsumer creates a new partition-aware consumer
func NewPartitionConsumerWithConfig ¶
func NewPartitionConsumerWithConfig(client *Client, topic string, partitions []uint32, config ConsumerConfig) *PartitionConsumer
NewPartitionConsumerWithConfig creates a new partition consumer with custom config
func (*PartitionConsumer) Close ¶
func (pc *PartitionConsumer) Close() error
Close closes the partition consumer
func (*PartitionConsumer) FetchAll ¶
func (pc *PartitionConsumer) FetchAll() (map[uint32][]protocol.Message, error)
FetchAll fetches from all assigned partitions
func (*PartitionConsumer) FetchFromPartition ¶
func (pc *PartitionConsumer) FetchFromPartition(partitionID uint32) ([]protocol.Message, error)
FetchFromPartition fetches messages from a specific partition
func (*PartitionConsumer) FetchRoundRobin ¶
func (pc *PartitionConsumer) FetchRoundRobin() ([]protocol.Message, error)
FetchRoundRobin fetches from partitions in round-robin fashion
func (*PartitionConsumer) GetOffsets ¶
func (pc *PartitionConsumer) GetOffsets() map[uint32]int64
GetOffsets returns current offsets for all partitions
func (*PartitionConsumer) GetPartitionInfo ¶
func (pc *PartitionConsumer) GetPartitionInfo(partitionID uint32) (PartitionInfo, error)
GetPartitionInfo returns information about a specific partition
func (*PartitionConsumer) Metrics ¶
func (pc *PartitionConsumer) Metrics() ConsumerMetrics
Metrics returns consumer metrics
func (*PartitionConsumer) PollPartitions ¶
func (pc *PartitionConsumer) PollPartitions(ctx context.Context, interval time.Duration, handler func(uint32, []protocol.Message) error) error
PollPartitions polls all partitions for new messages
func (*PartitionConsumer) SeekAll ¶
func (pc *PartitionConsumer) SeekAll(offset int64) error
SeekAll seeks all partitions to the same offset
func (*PartitionConsumer) SeekPartition ¶
func (pc *PartitionConsumer) SeekPartition(partitionID uint32, offset int64) error
SeekPartition seeks to a specific offset in a partition
type PartitionInfo ¶
type PartitionInfo struct {
PartitionID uint32
Offset int64
HighWater int64
Lag int64
LastFetch time.Time
}
PartitionInfo contains information about a partition
type PendingMessage ¶
PendingMessage represents a message pending in a transaction
type PoolStats ¶
type PoolStats struct {
TotalConnections int
ActiveConnections int
IdleConnections int
BrokerStats map[string]BrokerStats
}
PoolStats holds pool statistics
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer sends messages to StreamBus topics
func NewProducerWithConfig ¶
func NewProducerWithConfig(client *Client, config ProducerConfig) *Producer
NewProducerWithConfig creates a new producer with custom config
func (*Producer) SendMessages ¶
SendMessages sends multiple messages
func (*Producer) SendMessagesToPartition ¶
func (p *Producer) SendMessagesToPartition(topic string, partitionID uint32, messages []protocol.Message) error
SendMessagesToPartition sends multiple messages to a specific partition
func (*Producer) SendToPartition ¶
SendToPartition sends a message to a specific partition
type ProducerConfig ¶
type ProducerConfig struct {
// Whether to require acknowledgment from server
RequireAck bool
// Batch size for batching messages (0 = no batching)
BatchSize int
// Maximum time to wait before flushing batch
BatchTimeout time.Duration
// Compression type (none, gzip, snappy, lz4)
Compression string
// Maximum number of in-flight requests
MaxInFlightRequests int
}
ProducerConfig holds producer-specific configuration
type ProducerState ¶
type ProducerState int
ProducerState represents the state of a transactional producer
const (
ProducerStateUninitialized ProducerState = iota
ProducerStateReady
ProducerStateInTransaction
ProducerStateCommitting
ProducerStateAborting
ProducerStateFenced
ProducerStateClosed
)
type ProducerStats ¶
type ProducerStats struct {
MessagesSent int64
MessagesFailed int64
BatchesSent int64
BytesSent int64
}
ProducerStats holds producer statistics
type RebalanceListener ¶
type RebalanceListener interface {
// OnPartitionsRevoked is called before partitions are revoked
OnPartitionsRevoked(partitions map[string][]int32)
// OnPartitionsAssigned is called after new partitions are assigned
OnPartitionsAssigned(partitions map[string][]int32)
}
RebalanceListener is called during rebalancing events
type SASLConfig ¶
type SASLConfig struct {
// Enable SASL
Enabled bool
// SASL mechanism (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512)
Mechanism string
// Username
Username string
// Password
Password string
}
SASLConfig holds SASL authentication configuration
type SecurityConfig ¶
type SecurityConfig struct {
// TLS configuration
TLS *TLSConfig
// SASL configuration
SASL *SASLConfig
// API Key
APIKey string
}
SecurityConfig holds client security configuration
func (*SecurityConfig) Validate ¶
func (s *SecurityConfig) Validate() error
Validate validates security configuration
type TLSConfig ¶
type TLSConfig struct {
// Enable TLS
Enabled bool
// Certificate file (for client certificate authentication)
CertFile string
// Key file (for client certificate authentication)
KeyFile string
// CA certificate file (to verify server certificate)
CAFile string
// Skip server certificate verification (not recommended for production)
InsecureSkipVerify bool
// Server name for certificate verification
ServerName string
// contains filtered or unexported fields
}
TLSConfig holds TLS configuration for the client
type Transaction ¶
type Transaction struct {
ID transaction.TransactionID
StartTime time.Time
Partitions map[string][]int32 // topic -> partitions
Messages []PendingMessage
}
Transaction represents an active transaction
type TransactionalConsumer ¶
type TransactionalConsumer struct {
// contains filtered or unexported fields
}
TransactionalConsumer provides read-committed isolation for consuming messages
func NewTransactionalConsumer ¶
func NewTransactionalConsumer(config *TransactionalConsumerConfig) (*TransactionalConsumer, error)
NewTransactionalConsumer creates a new transactional consumer
func (*TransactionalConsumer) Close ¶
func (tc *TransactionalConsumer) Close() error
Close closes the consumer
func (*TransactionalConsumer) Commit ¶
func (tc *TransactionalConsumer) Commit(ctx context.Context) error
Commit commits the current position
func (*TransactionalConsumer) CommitSync ¶
func (tc *TransactionalConsumer) CommitSync(ctx context.Context, offsets map[string]map[int32]int64) error
CommitSync commits offsets synchronously
func (*TransactionalConsumer) Committed ¶
func (tc *TransactionalConsumer) Committed(topic string, partition int32) (int64, error)
Committed returns the last committed offset for a partition
func (*TransactionalConsumer) MarkTransactionAborted ¶
func (tc *TransactionalConsumer) MarkTransactionAborted(txnID transaction.TransactionID)
MarkTransactionAborted marks a transaction as aborted. Messages from aborted transactions will be filtered in read-committed mode.
func (*TransactionalConsumer) Poll ¶
func (tc *TransactionalConsumer) Poll(ctx context.Context) ([]ConsumerRecord, error)
Poll fetches messages from subscribed topics
func (*TransactionalConsumer) Position ¶
func (tc *TransactionalConsumer) Position(topic string, partition int32) (int64, error)
Position returns the current position for a partition
func (*TransactionalConsumer) Seek ¶
func (tc *TransactionalConsumer) Seek(topic string, partition int32, offset int64) error
Seek moves the consumer position to a specific offset
func (*TransactionalConsumer) Stats ¶
func (tc *TransactionalConsumer) Stats() TransactionalConsumerStats
Stats returns consumer statistics
func (*TransactionalConsumer) UpdateLastStableOffset ¶
func (tc *TransactionalConsumer) UpdateLastStableOffset(topic string, partition int32, lso int64)
UpdateLastStableOffset updates the last stable offset for a partition. This is called internally when transaction markers are read.
type TransactionalConsumerConfig ¶
type TransactionalConsumerConfig struct {
// Client is the underlying StreamBus client
Client *Client
// Topics to consume from
Topics []string
// GroupID for consumer group membership (optional)
GroupID string
// IsolationLevel determines which messages are visible
IsolationLevel IsolationLevel
// MaxPollRecords limits records returned per fetch
MaxPollRecords int
// AutoCommitEnabled enables automatic offset commits
AutoCommitEnabled bool
}
TransactionalConsumerConfig holds configuration for transactional consumer
func DefaultTransactionalConsumerConfig ¶
func DefaultTransactionalConsumerConfig() *TransactionalConsumerConfig
DefaultTransactionalConsumerConfig returns default configuration
type TransactionalConsumerStats ¶
type TransactionalConsumerStats struct {
MessagesConsumed int64
MessagesFiltered int64 // Messages filtered due to aborted transactions
AbortedTransactions int // Number of known aborted transactions
IsolationLevel IsolationLevel // Current isolation level
}
TransactionalConsumerStats holds consumer statistics
type TransactionalProducer ¶
type TransactionalProducer struct {
// contains filtered or unexported fields
}
TransactionalProducer provides exactly-once semantics for message production
func NewTransactionalProducer ¶
func NewTransactionalProducer(client *Client, config TransactionalProducerConfig) (*TransactionalProducer, error)
NewTransactionalProducer creates a new transactional producer
func (*TransactionalProducer) AbortTransaction ¶
func (tp *TransactionalProducer) AbortTransaction(ctx context.Context) error
AbortTransaction aborts the current transaction
func (*TransactionalProducer) BeginTransaction ¶
func (tp *TransactionalProducer) BeginTransaction(ctx context.Context) error
BeginTransaction starts a new transaction
func (*TransactionalProducer) Close ¶
func (tp *TransactionalProducer) Close() error
Close closes the transactional producer
func (*TransactionalProducer) CommitTransaction ¶
func (tp *TransactionalProducer) CommitTransaction(ctx context.Context) error
CommitTransaction commits the current transaction
func (*TransactionalProducer) Send ¶
func (tp *TransactionalProducer) Send(ctx context.Context, topic string, partition int32, message protocol.Message) error
Send sends a message within the current transaction
func (*TransactionalProducer) SendOffsetsToTransaction ¶
func (tp *TransactionalProducer) SendOffsetsToTransaction(ctx context.Context, groupID string, offsets map[string]map[int32]int64) error
SendOffsetsToTransaction adds consumer group offsets to the transaction
func (*TransactionalProducer) Stats ¶
func (tp *TransactionalProducer) Stats() TransactionalProducerStats
Stats returns producer statistics
type TransactionalProducerConfig ¶
type TransactionalProducerConfig struct {
// Unique transaction ID for this producer
TransactionID string
// Transaction timeout
TransactionTimeout time.Duration
// Maximum time to wait for coordinator response
RequestTimeout time.Duration
}
TransactionalProducerConfig holds configuration for transactional producer
func DefaultTransactionalProducerConfig ¶
func DefaultTransactionalProducerConfig() TransactionalProducerConfig
DefaultTransactionalProducerConfig returns default configuration
type TransactionalProducerStats ¶
type TransactionalProducerStats struct {
ProducerID transaction.ProducerID
ProducerEpoch transaction.ProducerEpoch
State ProducerState
TransactionsCommitted int64
TransactionsAborted int64
MessagesProduced int64
}
TransactionalProducerStats holds producer statistics