sarama

package
v0.1.0-alpha.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2026 | License: Apache-2.0 | Imports: 8 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the package. Each message names the setting or
// dependency that is required. Match them with errors.Is rather than by
// comparing message text.
//
// NOTE(review): the code that returns these is not visible in this listing;
// presumably New validates its Config and returns them — confirm.
var (
	// ErrMissingTopic carries the message "sarama topic is required".
	ErrMissingTopic         = errors.New("sarama topic is required")
	// ErrMissingProducer carries the message "sarama producer is required".
	ErrMissingProducer      = errors.New("sarama producer is required")
	// ErrMissingConsumerGroup carries the message
	// "sarama consumer group is required".
	ErrMissingConsumerGroup = errors.New("sarama consumer group is required")
)

Functions

This section is empty.

Types

type Assignment

// Assignment is the value passed to RebalanceHandler.OnRebalanceStart and
// RebalanceHandler.OnRebalanceEnd.
//
// NOTE(review): MemberID, GenerationID and Claims have the same names and
// types as the accessors on IBM sarama's ConsumerGroupSession, so this is
// presumably a snapshot of one consumer-group session — confirm against the
// code that builds it.
type Assignment struct {
	// Group is presumably the consumer group name (cf. Config.Group) — confirm.
	Group        string
	// MemberID is presumably the member ID the group coordinator assigned to
	// this consumer — confirm.
	MemberID     string
	// GenerationID is presumably the consumer-group generation the assignment
	// belongs to — confirm.
	GenerationID int32
	// Claims presumably maps topic name to the partition numbers claimed by
	// this member — confirm.
	Claims       map[string][]int32
}

type Broker

// Broker is the type returned by New. The methods listed for it are Close,
// Consume, PollErrors, Publish, PublishBatch, ReportHealth and Subscribe.
// All of its fields are unexported, so values are obtained through New rather
// than built as literals.
type Broker struct {
	// contains filtered or unexported fields
}

func New

func New(cfg Config) (*Broker, error)

func (*Broker) Close

func (b *Broker) Close() error

func (*Broker) Consume

func (b *Broker) Consume(ctx context.Context) (<-chan capmq.Delivery, error)

func (*Broker) PollErrors

func (b *Broker) PollErrors(ctx context.Context)

func (*Broker) Publish

func (b *Broker) Publish(ctx context.Context, message capmq.Message) error

func (*Broker) PublishBatch

func (b *Broker) PublishBatch(ctx context.Context, messages ...capmq.Message) ([]capmq.PublishResult, error)

func (*Broker) ReportHealth

func (b *Broker) ReportHealth(context.Context) (caphealth.Report, error)

func (*Broker) Subscribe

func (b *Broker) Subscribe(ctx context.Context, subscription capmq.Subscription) (<-chan capmq.Delivery, error)

type BrokerEvent

// BrokerEvent is the value delivered to Observer.ObserveBrokerEvent. Kind
// identifies the event using one of the EventKind constants; the remaining
// fields carry details about it.
//
// NOTE(review): which fields are populated for which Kind is not visible in
// this listing — confirm against the emitting code before relying on any
// field being set.
type BrokerEvent struct {
	// Kind is the event category (see the EventKind constants).
	Kind      EventKind
	// Topic is presumably the topic the event relates to — confirm.
	Topic     string
	// Group is presumably the consumer group the event relates to — confirm.
	Group     string
	// Partition is presumably the partition number.
	// NOTE(review): declared int here, while Assignment.Claims uses int32; if
	// both hold partition numbers, callers need a conversion — confirm.
	Partition int
	// Offset is presumably the message offset within the partition — confirm.
	Offset    int64
	// MemberID is presumably the consumer-group member ID (cf.
	// Assignment.MemberID) — confirm.
	MemberID  string
	// Err is presumably non-nil only for error-type kinds such as
	// EventPublishError and EventConsumerError — confirm.
	Err       error
	// Time is the timestamp attached to the event; whether it is the emit
	// time or a broker/message timestamp is not shown — confirm.
	Time      time.Time
	// Metadata holds additional string key/value details; the keys in use are
	// not shown in this listing.
	Metadata  map[string]string
}

type Config

// Config is the argument to New.
//
// NOTE(review): defaults and validation rules are not visible in this
// listing. The sentinel errors ErrMissingTopic, ErrMissingProducer and
// ErrMissingConsumerGroup suggest Topic, Producer and ConsumerGroup are
// checked somewhere — confirm which fields are mandatory and when.
type Config struct {
	// Brokers is presumably the list of Kafka broker addresses — confirm.
	Brokers          []string
	// Topic is presumably the topic used for publishing and consuming —
	// confirm.
	Topic            string
	// Group is presumably the consumer group name — confirm.
	Group            string
	// ClientID is presumably the client identifier reported to the brokers —
	// confirm.
	ClientID         string
	// Callback is a capmq.ProducerCallback; when it is invoked is not shown.
	Callback         capmq.ProducerCallback
	// ErrorHandler is a capmq.ErrorHandler; which errors reach it is not
	// shown.
	ErrorHandler     capmq.ErrorHandler
	// Producer is an IBM sarama SyncProducer.
	// NOTE(review): presumably lets callers inject a pre-built or fake
	// producer — confirm whether New creates one when this is nil.
	Producer         ibmsarama.SyncProducer
	// ConsumerGroup is an IBM sarama ConsumerGroup.
	// NOTE(review): presumably lets callers inject a pre-built or fake
	// consumer group — confirm whether New creates one when this is nil.
	ConsumerGroup    ibmsarama.ConsumerGroup
	// SaramaConfig is a pointer to an IBM sarama Config; how it is combined
	// with the other fields is not shown.
	SaramaConfig     *ibmsarama.Config
	// Observer receives BrokerEvent values via ObserveBrokerEvent.
	// NOTE(review): presumably optional (nil disables observation) — confirm.
	Observer         Observer
	// RebalanceHandler receives Assignment values via OnRebalanceStart and
	// OnRebalanceEnd.
	// NOTE(review): presumably optional — confirm.
	RebalanceHandler RebalanceHandler
}

type EventKind

// EventKind is the string type of BrokerEvent.Kind.
type EventKind string
// EventKind values declared by the package. The string values are snake_case
// forms of the constant names.
//
// NOTE(review): the code that emits each kind is not visible in this listing;
// the groupings below follow the names only.
const (
	// Publish outcomes.
	EventPublishSuccess EventKind = "publish_success"
	EventPublishError   EventKind = "publish_error"
	// Consumer-side events.
	EventDelivery       EventKind = "delivery"
	EventOffsetCommit   EventKind = "offset_commit"
	EventOffsetReset    EventKind = "offset_reset"
	EventDeadLetter     EventKind = "dead_letter"
	EventConsumerError  EventKind = "consumer_error"
	// Rebalance boundaries (cf. RebalanceHandler.OnRebalanceStart and
	// RebalanceHandler.OnRebalanceEnd).
	EventRebalanceStart EventKind = "rebalance_start"
	EventRebalanceEnd   EventKind = "rebalance_end"
)

type Observer

// Observer is the interface stored in Config.Observer. Its single method
// receives a context and a BrokerEvent and returns nothing, so an
// implementation has no way to report failure back to the caller.
//
// NOTE(review): the calling goroutine and whether calls may be concurrent are
// not shown — confirm before doing blocking work in an implementation.
type Observer interface {
	// ObserveBrokerEvent is called with one BrokerEvent.
	ObserveBrokerEvent(context.Context, BrokerEvent)
}

type RebalanceHandler

// RebalanceHandler is the interface stored in Config.RebalanceHandler. Both
// methods receive a context and an Assignment and return nothing.
//
// NOTE(review): presumably tied to the EventRebalanceStart and
// EventRebalanceEnd event kinds; the exact points in the consumer-group
// lifecycle at which each method fires are not shown — confirm.
type RebalanceHandler interface {
	// OnRebalanceStart is called with the Assignment at the start of a
	// rebalance (per its name; timing not shown).
	OnRebalanceStart(context.Context, Assignment)
	// OnRebalanceEnd is called with the Assignment at the end of a rebalance
	// (per its name; timing not shown).
	OnRebalanceEnd(context.Context, Assignment)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL