Package kafka

README

Kafka

The Kafka module provides an abstraction for interfacing with Kafka so that application code can focus on message processing.

Binder

The Binder is the main interface for working with Kafka. The Kafka module provides a kafka.Binder interface which your application can inject. Once your application has a reference to the Binder, you can create message producers from it, or add your message consumers/subscribers to it.

Example

  1. Activate the Kafka module:
kafka.Use()
  2. Add Kafka properties to application.yml:
# The following configuration serves as an example.
# Values specified under `kafka.bindings.default.*` are the same as the hardcoded defaults.
#
# To overwrite defaults, add a section with the prefix `kafka.bindings.<your binding name>`,
# and specify the binding name when using the Binder with the `BindingName(...)` option.
kafka:
  bindings:
    default:
      producer:
        log-level: "debug"
        ack-mode: "local" # all, local or none
        ack-timeout: 10s
        max-retry: 3
        backoff-interval: 100ms
        provisioning:
          auto-create-topic: true
          auto-add-partitions: true
          allow-lower-partitions: true
          partition-count: 1
          replication-factor: 1
      consumer:
        log-level: "debug"
        join-timeout: 60s
        max-retry: 4
        backoff-interval: 2s
    binding-name:
      producer:
        ...
      consumer:
        ...
  3. Inject the kafka.Binder into your application:
fx.Provide(NewComponent)

To create a producer from the Binder:

func NewComponent(b kafka.Binder) (*MyComponent, error) {
	p, err := b.Produce("MY_TOPIC", kafka.BindingName("my-binding-name"))
	if err != nil {
		return nil, err
	}
	return &MyComponent{Producer: p}, nil
}

Here you have a component that holds a reference to a message producer. The BindingName option allows binding-specific configuration to be applied to your producer. See the documentation on BindingName for more details.
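As a hedged sketch, the component might use the injected producer as follows. MyEvent, the Publish method, and the choice of key are illustrative assumptions; Send and WithKey are documented below.

// MyEvent is a hypothetical payload type; any JSON-encodable value works
// with the default encoder.
type MyEvent struct {
	OrderID string `json:"orderId"`
}

func (c *MyComponent) Publish(ctx context.Context, evt *MyEvent) error {
	// The payload is serialized by the producer's value encoder (JSON by default);
	// the string key drives partitioning.
	return c.Producer.Send(ctx, evt, kafka.WithKey(evt.OrderID))
}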

To add a consumer to the Binder, use fx.Invoke to register a function that is executed eagerly on application start. See the fx documentation for the difference between fx.Invoke and fx.Provide.

fx.Invoke(AddConsumer)

func AddConsumer(binder kafka.Binder) error {
	mc := &MyConsumer{}
	// "my-group" is an example consumer group name; use your own group here.
	consumer, e := binder.Consume("MY_TOPIC", "my-group", kafka.BindingName("my-binding-name"))
	if e != nil {
		return e
	}
	if e := consumer.AddHandler(mc.MyMessageHandler); e != nil {
		return e
	}
	return nil
}

*MyConsumer has a method that implements kafka.MessageHandlerFunc.

See kafka.MessageHandlerFunc for details on which method signatures are acceptable as message handler functions in the consumer.AddHandler call.
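For illustration, a minimal sketch of such a handler, assuming the message payload is JSON that decodes into a hypothetical MyMessage struct:

type MyMessage struct {
	ID   string `json:"id"`
	Text string `json:"text"`
}

// MyMessageHandler conforms to kafka.MessageHandlerFunc: the typed payload
// parameter is decoded from the message body, and Headers carries the headers.
func (c *MyConsumer) MyMessageHandler(ctx context.Context, headers kafka.Headers, payload *MyMessage) error {
	fmt.Printf("received %s (contentType=%s)\n", payload.ID, headers[kafka.HeaderContentType])
	return nil
}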

See kafka.Binder for additional details on creating Producers, Consumers and Subscribers.

Documentation

Index

Constants

const (
	MIMETypeJson   = "application/json;charset=utf-8"
	MIMETypeBinary = "application/octet-stream"
	MIMETypeText   = "text/plain"
)
const (
	ErrorTypeCodeBinding = Reserved + iota<<errorutils.ErrorTypeOffset
	ErrorTypeCodeProducer
	ErrorTypeCodeConsumer
)

All "Type" values are used as mask

const (
	ErrorSubTypeCodeBindingInternal = ErrorTypeCodeBinding + iota<<errorutils.ErrorSubTypeOffset
	ErrorSubTypeCodeConnectivity
	ErrorSubTypeCodeProvisioning
)

All "SubType" values are used as mask sub-types of ErrorTypeCodeBinding

const (
	ErrorCodeIllegalState = ErrorSubTypeCodeProvisioning + iota
	ErrorCodeProducerExists
	ErrorCodeConsumerExists
	ErrorCodeAutoCreateTopicFailed
	ErrorCodeAutoAddPartitionsFailed
	ErrorCodeIllegalLifecycleState
)

ErrorSubTypeCodeProvisioning

const (
	ErrorSubTypeCodeProducerGeneral = ErrorTypeCodeProducer + iota<<errorutils.ErrorSubTypeOffset
	ErrorSubTypeCodeIllegalProducerUsage
	ErrorSubTypeCodeEncoding
)

All "SubType" values are used as mask sub-types of ErrorTypeProducer

const (
	ErrorSubTypeCodeConsumerGeneral = ErrorTypeCodeConsumer + iota<<errorutils.ErrorSubTypeOffset
	ErrorSubTypeCodeIllegalConsumerUsage
	ErrorSubTypeCodeDecoding
)

All "SubType" values are used as mask sub-types of ErrorTypeConsumer

const (
	ConfigKafkaPrefix               = "kafka"
	ConfigKafkaBindingPrefix        = "kafka.bindings"
	ConfigKafkaDefaultBindingPrefix = "kafka.bindings.default"
)
const (
	ErrorCodeBindingInternal = ErrorSubTypeCodeBindingInternal + iota
)

ErrorSubTypeCodeBindingInternal

const (
	ErrorCodeBrokerNotReachable = ErrorSubTypeCodeConnectivity + iota
)

ErrorSubTypeCodeConnectivity

const (
	FxGroup = "kafka"
)
const (
	HeaderContentType = "contentType"
)
const (
	// Reserved kafka reserved error range
	Reserved = 0x1a << errorutils.ReservedOffset
)

Variables

var (
	ErrorCategoryKafka = NewErrorCategory(Reserved, errors.New("error type: kafka"))
	ErrorTypeBinding   = NewErrorType(ErrorTypeCodeBinding, errors.New("error type: binding"))
	ErrorTypeProducer  = NewErrorType(ErrorTypeCodeProducer, errors.New("error type: producer"))
	ErrorTypeConsumer  = NewErrorType(ErrorTypeCodeConsumer, errors.New("error type: consumer"))

	ErrorSubTypeBindingInternal = NewErrorSubType(ErrorSubTypeCodeBindingInternal, errors.New("error sub-type: internal"))
	ErrorSubTypeConnectivity    = NewErrorSubType(ErrorSubTypeCodeConnectivity, errors.New("error sub-type: connectivity"))
	ErrorSubTypeProvisioning    = NewErrorSubType(ErrorSubTypeCodeProvisioning, errors.New("error sub-type: provisioning"))

	ErrorSubTypeProducerGeneral      = NewErrorSubType(ErrorSubTypeCodeProducerGeneral, errors.New("error sub-type: producer"))
	ErrorSubTypeIllegalProducerUsage = NewErrorSubType(ErrorSubTypeCodeIllegalProducerUsage, errors.New("error sub-type: producer api usage"))
	ErrorSubTypeEncoding             = NewErrorSubType(ErrorSubTypeCodeEncoding, errors.New("error sub-type: encoding"))
	ErrorSubTypeConsumerGeneral      = NewErrorSubType(ErrorSubTypeCodeConsumerGeneral, errors.New("error sub-type: consumer"))
	ErrorSubTypeIllegalConsumerUsage = NewErrorSubType(ErrorSubTypeCodeIllegalConsumerUsage, errors.New("error sub-type: consumer api usage"))
	ErrorSubTypeDecoding             = NewErrorSubType(ErrorSubTypeCodeDecoding, errors.New("error sub-type: decoding"))

	ErrorStartClosedBinding = NewKafkaError(ErrorCodeIllegalLifecycleState, "error: cannot start closed binding")
)

These error types can be used with errors.Is

var Module = &bootstrap.Module{
	Precedence: bootstrap.KafkaPrecedence,
	Options: []fx.Option{
		fx.Provide(BindKafkaProperties, ProvideKafkaBinder),
		fx.Provide(tracingProvider()),
		fx.Invoke(initialize),
	},
}

Functions

func BindingName

func BindingName(name string) func(cfg *bindingConfig)

BindingName is a ProducerOptions or ConsumerOptions that specifies the name of the binding. This name is used to read BindingProperties from bootstrap.ApplicationConfig. If not specified, the lower-cased topic name is used. Regardless of whether a name is specified or a corresponding BindingProperties is found, any ProducerOptions or ConsumerOptions used at compile time still apply. The overriding order is as follows:

BindingProperties with matching name >
BindingProperties with name "default" >
ProducerOptions or ConsumerOptions >
prepared defaults during initialization

func LogLevel

func LogLevel(level log.LoggingLevel) func(cfg *bindingConfig)

LogLevel is a ProducerOptions or ConsumerOptions that specifies the log level of a Producer, Subscriber or Consumer

func NewKafkaError

func NewKafkaError(code int64, text string, causes ...interface{}) *CodedError

func Use

func Use()

Use allows a service to include this module in main()

Types

type AckMode

type AckMode string
const (
	AckModeModeAll   AckMode = "all"
	AckModeModeLocal AckMode = "local"
	AckModeModeNone  AckMode = "none"
)

func (*AckMode) UnmarshalText

func (m *AckMode) UnmarshalText(data []byte) error

type Binder

type Binder interface {
	// Produce creates a message Producer for the given topic, if one does not already exist. This also auto-creates topics if configured to do so.
	// The returned Producer may also implement BindingLifecycle, in which case BindingLifecycle.Close can be used to manually release resources
	Produce(topic string, options ...ProducerOptions) (Producer, error)

	// Subscribe creates a Subscriber of the given topic, if one does not already exist.
	// The returned Subscriber may also implement BindingLifecycle, in which case BindingLifecycle.Close can be used to manually release resources
	Subscribe(topic string, options ...ConsumerOptions) (Subscriber, error)

	// Consume creates a GroupConsumer of the given topic and group, if one does not already exist.
	// The returned GroupConsumer may also implement BindingLifecycle, in which case BindingLifecycle.Close can be used to manually release resources
	Consume(topic string, group string, options ...ConsumerOptions) (GroupConsumer, error)

	// ListTopics lists the topics of all managed bindings
	ListTopics() []string
}
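For the pub-sub workflow (as opposed to the consumer-group workflow shown in the README), a hedged sketch using Subscribe, with the binding name as an assumption:

func AddSubscriber(binder kafka.Binder) error {
	sub, err := binder.Subscribe("MY_TOPIC", kafka.BindingName("my-binding-name"))
	if err != nil {
		return err
	}
	// With only a *kafka.Message parameter declared, Message.Payload is the raw []byte.
	return sub.AddHandler(func(ctx context.Context, raw *kafka.Message) error {
		return nil
	})
}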

func ProvideKafkaBinder

func ProvideKafkaBinder(di binderDI) Binder

type BinderLifecycle

type BinderLifecycle interface {
	// Initialize should be called only once, before Shutdown is executed.
	Initialize(ctx context.Context) error
	// Start should be called only once, before Shutdown is executed.
	// If the provided context is cancellable, the lifecycle is shut down automatically when the context is cancelled
	Start(ctx context.Context) error
	// Shutdown can be called multiple times, but does nothing after the first call.
	// Important:
	// - Once called, calling Initialize or Start would cause unexpected errors or resource leaks
	// - Once called, any Producer, Subscriber or GroupConsumer issued by this Binder is closed automatically.
	//	 They should be discarded
	Shutdown(ctx context.Context) error
	// Done returns a channel used for monitoring the lifecycle. The channel is closed during shutdown
	Done() <-chan struct{}
}
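A hedged sketch of driving the lifecycle manually (the fx Module above normally does this for you), using NewBinder from this package:

func run(ctx context.Context) error {
	// ctx should be cancellable: per Start's contract, cancelling it shuts the binder down.
	binder := kafka.NewBinder(ctx)
	if err := binder.Initialize(ctx); err != nil {
		return err
	}
	if err := binder.Start(ctx); err != nil {
		return err
	}
	<-binder.Done() // closed once the binder has shut down
	return nil
}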

type BinderOption

type BinderOption struct {
	ApplicationConfig    bootstrap.ApplicationConfig
	Properties           KafkaProperties
	ProducerInterceptors []ProducerMessageInterceptor
	ConsumerInterceptors []ConsumerDispatchInterceptor
	HandlerInterceptors  []ConsumerHandlerInterceptor
	TLSCertsManager      certs.Manager
}

type BinderOptions

type BinderOptions func(opt *BinderOption)

type BinderProperties

type BinderProperties struct {
	InitialHeartbeat       utils.Duration `json:"init-heartbeat"`
	HeartbeatCurveFactor   float64        `json:"heartbeat-curve-factor"`
	HeartbeatCurveMidpoint float64        `json:"heartbeat-curve-midpoint"`
	WatchdogHeartbeat      utils.Duration `json:"watchdog-heartbeat"`
}

type BindingLifecycle

type BindingLifecycle interface {
	// Start initializes any connections and internal run loops.
	// Must be called after any configuration and before the Producer, Subscriber or GroupConsumer is used
	Start(ctx context.Context) error

	// Close must be called to release any resources. Once called, regardless of the return value, this instance must be discarded
	Close() error

	// Closed reports whether Close has been called
	Closed() bool
}

BindingLifecycle is the interface that controls the lifecycle of any Producer, Subscriber and GroupConsumer
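Because implementing BindingLifecycle is optional for any given binding, releasing resources early takes a type assertion; a minimal sketch:

// closeEarly releases a dynamically created producer once it is no longer needed.
func closeEarly(p kafka.Producer) {
	if lc, ok := p.(kafka.BindingLifecycle); ok {
		_ = lc.Close() // p must be discarded after Close, regardless of the result
	}
}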

type BindingProperties

type BindingProperties struct {
	Producer ProducerProperties `json:"producer"`
	Consumer ConsumerProperties `json:"consumer"`
}

type ConsumerDispatchFinalizer

type ConsumerDispatchFinalizer interface {
	// Finalize is called after message processing is finished by handlers.
	// Note: the *MessageContext is discarded after all finalizers have finished processing,
	// 		 so modifying the given message context only affects subsequent ConsumerDispatchFinalizer.Finalize calls on the same message
	// Note 2: Finalize may also choose to handle the given err and return a nil error.
	//		   In that case, subsequent ConsumerDispatchFinalizer.Finalize calls on the same message are invoked as if there were no error
	Finalize(msgCtx *MessageContext, err error) (*MessageContext, error)
}

ConsumerDispatchFinalizer is the interface for other packages to finalize message processing. When any ConsumerDispatchInterceptor also implements ConsumerDispatchFinalizer, the Finalize function is invoked after message processing finishes

type ConsumerDispatchInterceptor

type ConsumerDispatchInterceptor interface {
	// Intercept is called before the message is decoded and consumed by registered handlers.
	// Implementations can modify fields of the MessageContext to manipulate behaviour.
	// When an error is returned, the dispatcher cancels the operation. Otherwise, a non-nil MessageContext must be returned
	Intercept(msgCtx *MessageContext) (*MessageContext, error)
}
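A hedged sketch of an interceptor, assuming a hypothetical dispatchTimeKey context key. Such interceptors can be supplied via BinderOption.ConsumerInterceptors.

type dispatchTimeKey struct{}

type timingInterceptor struct{}

func (timingInterceptor) Intercept(msgCtx *kafka.MessageContext) (*kafka.MessageContext, error) {
	// Record when dispatch started; handlers can read this back from their ctx.
	msgCtx.Context = context.WithValue(msgCtx.Context, dispatchTimeKey{}, time.Now())
	return msgCtx, nil
}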

type ConsumerGroupProperties

type ConsumerGroupProperties struct {
	JoinTimeout *utils.Duration `json:"join-timeout"`
	MaxRetry    *int            `json:"max-retry"`
	Backoff     *utils.Duration `json:"backoff-interval"`
}

type ConsumerHandlerInterceptor

type ConsumerHandlerInterceptor interface {
	// BeforeHandling is called after the message is decoded and before it is handled by each registered handler.
	// Implementations can modify fields of the MessageContext to manipulate behaviour.
	// When an error is returned, the dispatcher cancels the operation. Otherwise, a non-nil context must be returned
	BeforeHandling(ctx context.Context, msg *Message) (context.Context, error)

	// AfterHandling is called after each registered handler handles the message
	AfterHandling(ctx context.Context, msg *Message, err error) (context.Context, error)
}
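A hypothetical interceptor pairing BeforeHandling and AfterHandling to measure per-handler latency (startKey is an assumed context key type):

type startKey struct{}

type latencyInterceptor struct{}

func (latencyInterceptor) BeforeHandling(ctx context.Context, msg *kafka.Message) (context.Context, error) {
	return context.WithValue(ctx, startKey{}, time.Now()), nil
}

func (latencyInterceptor) AfterHandling(ctx context.Context, msg *kafka.Message, err error) (context.Context, error) {
	if start, ok := ctx.Value(startKey{}).(time.Time); ok {
		fmt.Printf("handler took %v (err=%v)\n", time.Since(start), err)
	}
	return ctx, err // pass the handler's error through unchanged
}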

type ConsumerOptions

type ConsumerOptions func(cfg *bindingConfig)

func WithConsumerProperties

func WithConsumerProperties(p *ConsumerProperties) ConsumerOptions

WithConsumerProperties applies options configured via ConsumerProperties

type ConsumerProperties

type ConsumerProperties struct {
	LogLevel *log.LoggingLevel       `json:"log-level"`
	Backoff  *utils.Duration         `json:"backoff-interval"`
	Group    ConsumerGroupProperties `json:"group"`
}

type DispatchOptions

type DispatchOptions func(h *handler)

func AddInterceptors added in v0.14.0

func AddInterceptors(interceptors ...ConsumerHandlerInterceptor) DispatchOptions

AddInterceptors returns a DispatchOptions that adds ConsumerHandlerInterceptors to a MessageHandlerFunc

func FilterOnHeader

func FilterOnHeader(header string, matcher matcher.StringMatcher) DispatchOptions

FilterOnHeader returns a DispatchOptions specifying that the handler should only be invoked when the given message header exists and matches the provided matcher
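A hedged sketch of attaching a header filter to a handler; jsonHandler is a hypothetical MessageHandlerFunc, and jsonMatcher stands in for any matcher.StringMatcher you construct via the matcher package (constructors not shown here):

func registerJSONHandler(consumer kafka.GroupConsumer, jsonMatcher matcher.StringMatcher) error {
	// Dispatch to jsonHandler only when the contentType header matches jsonMatcher.
	return consumer.AddHandler(
		jsonHandler,
		kafka.FilterOnHeader(kafka.HeaderContentType, jsonMatcher),
	)
}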

type Dispatcher added in v0.14.0

type Dispatcher struct {
	Interceptors []ConsumerDispatchInterceptor
	Logger       MessageLogger
	// contains filtered or unexported fields
}

Dispatcher processes a MessageContext and dispatches it to registered MessageHandlerFunc. This struct is intended for Subscriber or GroupConsumer implementors. It should not be used directly by applications.

func (*Dispatcher) AddHandler added in v0.14.0

func (d *Dispatcher) AddHandler(fn MessageHandlerFunc, opts ...DispatchOptions) error

func (*Dispatcher) Dispatch added in v0.14.0

func (d *Dispatcher) Dispatch(msgCtx *MessageContext) (err error)

type Encoder

type Encoder interface {
	// MIMEType returns the MIME type value when using the Encoder
	MIMEType() string
	Encode(v interface{}) ([]byte, error)
}

type GroupConsumer

type GroupConsumer interface {
	// Topic returns the Topic name
	Topic() string

	// Group returns the group name
	Group() string

	// AddHandler registers a message handler function that processes received messages.
	// Note: A GroupConsumer without a registered handler simply ignores all received messages.
	// 		 If AddHandler is called after Start, it may miss some messages
	AddHandler(handlerFunc MessageHandlerFunc, opts ...DispatchOptions) error
}

GroupConsumer provides consumer group workflow

type Headers

type Headers map[string]string

type HealthIndicator

type HealthIndicator struct {
	// contains filtered or unexported fields
}

func NewHealthIndicator

func NewHealthIndicator(binder Binder) *HealthIndicator

func (*HealthIndicator) Health

func (*HealthIndicator) Name

func (i *HealthIndicator) Name() string

type KafkaProperties

type KafkaProperties struct {
	Brokers  utils.CommaSeparatedSlice `json:"brokers"`
	Net      Net                       `json:"net"`
	Metadata Metadata                  `json:"metadata"`
	Binder   BinderProperties          `json:"binder"`
	ClientId string                    `json:"client-id"`
}

func BindKafkaProperties

func BindKafkaProperties(ctx *bootstrap.ApplicationContext) KafkaProperties

type LoggerOptions

type LoggerOptions func(opt *loggerOption)

type Message

type Message struct {
	Headers Headers
	Payload interface{}
}

type MessageContext

type MessageContext struct {
	context.Context

	Source     interface{}
	Topic      string
	Message    Message
	RawMessage interface{}
	// contains filtered or unexported fields
}

MessageContext is for internal use only; it is used by Interceptors and processors

type MessageFilterFunc

type MessageFilterFunc func(ctx context.Context, msg *Message) (shouldHandle bool)

type MessageHandlerFunc

type MessageHandlerFunc interface{}

MessageHandlerFunc is a message handling function that conforms to the following signature:

	func (ctx context.Context, [OPTIONAL_INPUT_PARAMS...]) error

 Where OPTIONAL_INPUT_PARAMS may contain the following components (order is not important):
	- PAYLOAD_PARAM 	< AnyPayloadType >: 	message payload, where PayloadType could be any type other than interface, function or chan.
		                         				If PayloadType is interface{}, raw []byte will be used
	- HEADERS_PARAM 	< Headers >: 			message headers
	- METADATA_PARAM 	< *MessageMetadata >: 	message metadata, includes timestamp, keys, partition, etc.
	- MESSAGE_PARAM 	< *Message >: 			raw message, where Message.Payload would be PayloadType if PAYLOAD_PARAM is also present, or []byte

For example:

func Handle(ctx context.Context, payload *MyStruct) error
func Handle(ctx context.Context, payload *MyStruct, meta *MessageMetadata) error
func Handle(ctx context.Context, payload map[string]interface{}) error
func Handle(ctx context.Context, headers Headers, payload *MyStruct) error
func Handle(ctx context.Context, payload *MyStruct, raw *Message) error
func Handle(ctx context.Context, raw *Message) error

type MessageLogger

type MessageLogger interface {
	WithLevel(level log.LoggingLevel) MessageLogger
	LogSentMessage(ctx context.Context, msg interface{})
	LogReceivedMessage(ctx context.Context, msg interface{})
}

type MessageMetadata

type MessageMetadata struct {
	Key       []byte
	Partition int
	Offset    int
	Timestamp time.Time
}

type MessageOptions

type MessageOptions func(config *messageConfig)

func WithEncoder

func WithEncoder(valueEncoder Encoder) MessageOptions

WithEncoder specifies how the message payload is encoded. Default is "application/json;charset=utf-8"

func WithKey

func WithKey(key interface{}) MessageOptions

WithKey specifies the key used for the message. The key is typically used for partitioning. Supported values depend on the KeyEncoder option on the Producer. The default encoder supports the following types:

  • uuid.UUID
  • string
  • []byte
  • encoding.BinaryMarshaler
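For instance, a hedged sketch keying messages by an aggregate ID so that related messages land on the same partition (Order is a hypothetical type):

func publishOrder(ctx context.Context, p kafka.Producer, order *Order) error {
	// A plain string key is handled by the default key encoder.
	return p.Send(ctx, order, kafka.WithKey(order.ID))
}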

type Metadata

type Metadata struct {
	RefreshFrequency utils.Duration `json:"refresh-frequency"`
}

type Net

type Net struct {
	Sasl SASL `json:"sasl"`
	Tls  TLS  `json:"tls"`
}

type Producer

type Producer interface {
	// Topic returns the Topic name
	Topic() string

	// Send publishes the given message to the topic with pre-configured producer settings.
	// Supported message types are:
	// 	- *Message
	// 	- Message
	//  - any other type, used as the body; the body is serialized using the value encoder from options
	Send(ctx context.Context, message interface{}, options ...MessageOptions) error

	// ReadyCh is used to monitor producer status. The channel is closed once the Producer is ready to send messages.
	ReadyCh() <-chan struct{}
}
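A minimal sketch of gating the first Send on readiness, using the caller's context as a guard:

func sendWhenReady(ctx context.Context, p kafka.Producer, msg interface{}) error {
	select {
	case <-p.ReadyCh(): // closed once the producer is ready
	case <-ctx.Done():
		return ctx.Err()
	}
	return p.Send(ctx, msg)
}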

type ProducerMessageFinalizer

type ProducerMessageFinalizer interface {
	// Finalize is called after message delivery is confirmed.
	// "confirmed" status depends on the Ack mode of the message.
	// e.g.
	// 	- if the message uses RequireNoAck, Finalize is called right after sending the message
	// 	- if the message uses RequireAllAck, Finalize is called when an Ack is received from all replicas
	//
	// Finalize may also be invoked in a different goroutine if the delivery mode is "sync"
	//
	// Note: the *MessageContext is discarded after all finalizers have finished processing,
	// 		 so modifying the given message context only affects subsequent ProducerMessageFinalizer.Finalize calls on the same message
	// Note 2: Finalize may also choose to handle the given err and return a nil error.
	//		   In that case, subsequent ProducerMessageFinalizer.Finalize calls on the same message are invoked as if there were no error
	Finalize(msgCtx *MessageContext, partition int32, offset int64, err error) (*MessageContext, error)
}

ProducerMessageFinalizer is the interface for other packages to finalize the message sending process. When any ProducerMessageInterceptor also implements ProducerMessageFinalizer, the Finalize function is invoked after message delivery is confirmed

type ProducerMessageInterceptor

type ProducerMessageInterceptor interface {
	// Intercept is called before the raw message is prepared and sent.
	// Implementations can modify fields of the MessageContext to manipulate sending behaviour.
	// When an error is returned, the Producer cancels the operation. Otherwise, a non-nil MessageContext must be returned
	Intercept(msgCtx *MessageContext) (*MessageContext, error)
}

type ProducerOptions

type ProducerOptions func(cfg *bindingConfig)

func AckTimeout

func AckTimeout(timeout time.Duration) ProducerOptions

func KeyEncoder

func KeyEncoder(enc Encoder) ProducerOptions

KeyEncoder configures the Producer with the given encoder for serializing the message key

func Partitions

func Partitions(partitionCount int, replicationFactor int) ProducerOptions

Partitions configures the Producer's topic provisioning by specifying the minimum number of partitions required and their replica count (min.insync.replicas), in case topics are auto-created

func RequireAllAck

func RequireAllAck() ProducerOptions

RequireAllAck waits for all in-sync replicas to commit before responding. The minimum number of in-sync replicas is configured on the broker via the `min.insync.replicas` configuration key.

func RequireLocalAck

func RequireLocalAck() ProducerOptions

RequireLocalAck waits for only the local commit to succeed before responding.

func RequireNoAck

func RequireNoAck() ProducerOptions

RequireNoAck doesn't send any response, the TCP ACK is all you get.
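A hedged sketch combining the ack-related options at binding time (topic name and timeout are illustrative):

func newReliableProducer(binder kafka.Binder) (kafka.Producer, error) {
	return binder.Produce("MY_TOPIC",
		kafka.RequireAllAck(),            // wait for all in-sync replicas
		kafka.AckTimeout(10*time.Second), // bound the wait for acknowledgements
	)
}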

func WithProducerProperties

func WithProducerProperties(p *ProducerProperties) ProducerOptions

WithProducerProperties applies options configured via ProducerProperties

type ProducerProperties

type ProducerProperties struct {
	LogLevel     *log.LoggingLevel      `json:"log-level"`
	AckMode      *AckMode               `json:"ack-mode"`
	AckTimeout   *utils.Duration        `json:"ack-timeout"`
	MaxRetry     *int                   `json:"max-retry"`
	Backoff      *utils.Duration        `json:"backoff-interval"`
	Provisioning ProvisioningProperties `json:"provisioning"`
}

type ProvisioningProperties

type ProvisioningProperties struct {
	// AutoCreateTopic whether to attempt to create the topic when it doesn't exist
	AutoCreateTopic *bool `json:"auto-create-topic"`

	// AutoAddPartitions whether to attempt to add partitions when the actual partition count is less than PartitionCount
	AutoAddPartitions *bool `json:"auto-add-partitions"`

	// AllowLowerPartitions whether to tolerate it, instead of returning an error, when the actual partition count
	// is less than PartitionCount but AutoAddPartitions is false
	AllowLowerPartitions *bool `json:"allow-lower-partitions"`

	// PartitionCount number of partitions of the given topic
	PartitionCount *int32 `json:"partition-count"`

	// ReplicationFactor number of replicas per partition when creating the topic
	ReplicationFactor *int16 `json:"replication-factor"`
}

type SASL

type SASL struct {
	// Whether or not to use SASL authentication when connecting to the broker
	// (defaults to false).
	Enable bool `json:"enabled"`
	// Whether or not to send the Kafka SASL handshake first if enabled
	// (defaults to true). You should only set this to false if you're using
	// a non-Kafka SASL proxy.
	Handshake bool `json:"handshake"`
	// Username and password for SASL/PLAIN authentication
	User     string `json:"user"`
	Password string `json:"password"`
}

type SaramaBinder

type SaramaBinder interface {
	Binder
	Client() sarama.Client
}

type SaramaKafkaBinder

type SaramaKafkaBinder struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewBinder

func NewBinder(ctx context.Context, opts ...BinderOptions) *SaramaKafkaBinder

func (*SaramaKafkaBinder) Client

func (b *SaramaKafkaBinder) Client() sarama.Client

func (*SaramaKafkaBinder) CloseProducer

func (b *SaramaKafkaBinder) CloseProducer(ctx context.Context, topic string)

CloseProducer releases resources for dynamic producers

func (*SaramaKafkaBinder) Consume

func (b *SaramaKafkaBinder) Consume(topic string, group string, options ...ConsumerOptions) (GroupConsumer, error)

func (*SaramaKafkaBinder) Done

func (b *SaramaKafkaBinder) Done() <-chan struct{}

func (*SaramaKafkaBinder) Initialize

func (b *SaramaKafkaBinder) Initialize(ctx context.Context) (err error)

Initialize implements BinderLifecycle. It prepares the binder for use, negotiates default configs, etc.

func (*SaramaKafkaBinder) ListTopics

func (b *SaramaKafkaBinder) ListTopics() (topics []string)

func (*SaramaKafkaBinder) Produce

func (b *SaramaKafkaBinder) Produce(topic string, options ...ProducerOptions) (Producer, error)

func (*SaramaKafkaBinder) Shutdown

func (b *SaramaKafkaBinder) Shutdown(ctx context.Context) error

Shutdown implements BinderLifecycle, closing resources

func (*SaramaKafkaBinder) Start

func (b *SaramaKafkaBinder) Start(ctx context.Context) (err error)

Start implements BinderLifecycle, starting all bindings that have not yet started (Producer, Subscriber, GroupConsumer, etc.).

func (*SaramaKafkaBinder) Subscribe

func (b *SaramaKafkaBinder) Subscribe(topic string, options ...ConsumerOptions) (Subscriber, error)

type Subscriber

type Subscriber interface {
	// Topic returns the Topic name
	Topic() string

	// Partitions returns subscribed partitions
	Partitions() []int32

	// AddHandler registers a message handler function that processes received messages.
	// Note: A Subscriber without a registered handler simply ignores all received messages.
	// 		 If AddHandler is called after Start, it may miss some messages
	AddHandler(handlerFunc MessageHandlerFunc, opts ...DispatchOptions) error
}

Subscriber provides a pub-sub workflow

type TLS

type TLS struct {
	Enable bool                   `json:"enabled"`
	Certs  certs.SourceProperties `json:"certs"`
}
