Documentation ¶
Index ¶
- Constants
- Variables
- func BindingName(name string) func(cfg *bindingConfig)
- func LogLevel(level log.LoggingLevel) func(cfg *bindingConfig)
- func NewKafkaError(code int64, text string, causes ...interface{}) *CodedError
- func Use()
- type AckMode
- type Binder
- type BinderLifecycle
- type BinderOption
- type BinderOptions
- type BinderProperties
- type BindingLifecycle
- type BindingProperties
- type ConsumerDispatchFinalizer
- type ConsumerDispatchInterceptor
- type ConsumerGroupProperties
- type ConsumerHandlerInterceptor
- type ConsumerOptions
- type ConsumerProperties
- type DispatchOptions
- type Encoder
- type GroupConsumer
- type Headers
- type HealthIndicator
- type KafkaProperties
- type LoggerOptions
- type Message
- type MessageContext
- type MessageFilterFunc
- type MessageHandlerFunc
- type MessageLogger
- type MessageMetadata
- type MessageOptions
- type Metadata
- type Net
- type Producer
- type ProducerMessageFinalizer
- type ProducerMessageInterceptor
- type ProducerOptions
- func AckTimeout(timeout time.Duration) ProducerOptions
- func KeyEncoder(enc Encoder) ProducerOptions
- func Partitions(partitionCount int, replicationFactor int) ProducerOptions
- func RequireAllAck() ProducerOptions
- func RequireLocalAck() ProducerOptions
- func RequireNoAck() ProducerOptions
- func WithProducerProperties(p *ProducerProperties) ProducerOptions
- type ProducerProperties
- type ProvisioningProperties
- type SASL
- type SaramaBinder
- type SaramaKafkaBinder
- func (b *SaramaKafkaBinder) Client() sarama.Client
- func (b *SaramaKafkaBinder) CloseProducer(ctx context.Context, topic string)
- func (b *SaramaKafkaBinder) Consume(topic string, group string, options ...ConsumerOptions) (GroupConsumer, error)
- func (b *SaramaKafkaBinder) Done() <-chan struct{}
- func (b *SaramaKafkaBinder) Initialize(ctx context.Context) (err error)
- func (b *SaramaKafkaBinder) ListTopics() (topics []string)
- func (b *SaramaKafkaBinder) Produce(topic string, options ...ProducerOptions) (Producer, error)
- func (b *SaramaKafkaBinder) Shutdown(ctx context.Context) error
- func (b *SaramaKafkaBinder) Start(ctx context.Context) (err error)
- func (b *SaramaKafkaBinder) Subscribe(topic string, options ...ConsumerOptions) (Subscriber, error)
- type Subscriber
- type TLS
Constants ¶
const ( MIMETypeJson = "application/json;charset=utf-8" MIMETypeBinary = "application/octet-stream" MIMETypeText = "text/plain" )
const ( ErrorTypeCodeBinding = Reserved + iota<<errorutils.ErrorTypeOffset ErrorTypeCodeProducer ErrorTypeCodeConsumer )
All "Type" values are used as mask
const ( ErrorSubTypeCodeBindingInternal = ErrorTypeCodeBinding + iota<<errorutils.ErrorSubTypeOffset ErrorSubTypeCodeConnectivity ErrorSubTypeCodeProvisioning )
All "SubType" values are used as masks. These are sub-types of ErrorTypeCodeBinding.
const ( ErrorCodeIllegalState = ErrorSubTypeCodeProvisioning + iota ErrorCodeProducerExists ErrorCodeConsumerExists ErrorCodeAutoCreateTopicFailed ErrorCodeAutoAddPartitionsFailed ErrorCodeIllegalLifecycleState )
ErrorSubTypeCodeProvisioning
const ( ErrorSubTypeCodeProducerGeneral = ErrorTypeCodeProducer + iota<<errorutils.ErrorSubTypeOffset ErrorSubTypeCodeIllegalProducerUsage ErrorSubTypeCodeEncoding )
All "SubType" values are used as masks. These are sub-types of ErrorTypeProducer.
const ( ErrorSubTypeCodeConsumerGeneral = ErrorTypeCodeConsumer + iota<<errorutils.ErrorSubTypeOffset ErrorSubTypeCodeIllegalConsumerUsage ErrorSubTypeCodeDecoding )
All "SubType" values are used as masks. These are sub-types of ErrorTypeConsumer.
const ( ConfigKafkaPrefix = "kafka" ConfigKafkaBindingPrefix = "kafka.bindings" ConfigKafkaDefaultBindingPrefix = "kafka.bindings.default" )
const (
ErrorCodeBindingInternal = ErrorSubTypeCodeBindingInternal + iota
)
ErrorSubTypeCodeBindingInternal
const (
ErrorCodeBrokerNotReachable = ErrorSubTypeCodeConnectivity + iota
)
ErrorSubTypeCodeConnectivity
const (
FxGroup = "kafka"
)
const (
HeaderContentType = "contentType"
)
const ( // Reserved kafka reserved error range Reserved = 0x1a << errorutils.ReservedOffset )
Variables ¶
var ( ErrorCategoryKafka = NewErrorCategory(Reserved, errors.New("error type: kafka")) ErrorTypeBinding = NewErrorType(ErrorTypeCodeBinding, errors.New("error type: binding")) ErrorTypeProducer = NewErrorType(ErrorTypeCodeProducer, errors.New("error type: producer")) ErrorTypeConsumer = NewErrorType(ErrorTypeCodeConsumer, errors.New("error type: consumer")) ErrorSubTypeBindingInternal = NewErrorSubType(ErrorSubTypeCodeBindingInternal, errors.New("error sub-type: internal")) ErrorSubTypeConnectivity = NewErrorSubType(ErrorSubTypeCodeConnectivity, errors.New("error sub-type: connectivity")) ErrorSubTypeProvisioning = NewErrorSubType(ErrorSubTypeCodeProvisioning, errors.New("error sub-type: provisioning")) ErrorSubTypeProducerGeneral = NewErrorSubType(ErrorSubTypeCodeProducerGeneral, errors.New("error sub-type: producer")) ErrorSubTypeIllegalProducerUsage = NewErrorSubType(ErrorSubTypeCodeIllegalProducerUsage, errors.New("error sub-type: producer api usage")) ErrorSubTypeEncoding = NewErrorSubType(ErrorSubTypeCodeEncoding, errors.New("error sub-type: encoding")) ErrorSubTypeConsumerGeneral = NewErrorSubType(ErrorSubTypeCodeConsumerGeneral, errors.New("error sub-type: consumer")) ErrorSubTypeIllegalConsumerUsage = NewErrorSubType(ErrorSubTypeCodeIllegalConsumerUsage, errors.New("error sub-type: consumer api usage")) ErrorSubTypeDecoding = NewErrorSubType(ErrorSubTypeCodeDecoding, errors.New("error sub-type: decoding")) ErrorStartClosedBinding = NewKafkaError(ErrorCodeIllegalLifecycleState, "error: cannot start closed binding") )
Error types; these can be used with errors.Is.
var Module = &bootstrap.Module{ Precedence: bootstrap.KafkaPrecedence, Options: []fx.Option{ fx.Provide(BindKafkaProperties, ProvideKafkaBinder), fx.Invoke(initialize), }, }
Functions ¶
func BindingName ¶
func BindingName(name string) func(cfg *bindingConfig)
BindingName is a ProducerOptions or ConsumerOptions that specifies the name of the binding. This name is used to read BindingProperties from bootstrap.ApplicationConfig. If not specified, the lower-cased topic name is used. Regardless of whether a name is specified or whether the corresponding BindingProperties is found, any ProducerOptions or ConsumerOptions used at compile time still apply. The overriding order is as follows:
BindingProperties with matching name > BindingProperties with name "default" > ProducerOptions or ConsumerOptions > prepared defaults during initialization
func LogLevel ¶
func LogLevel(level log.LoggingLevel) func(cfg *bindingConfig)
LogLevel is a ProducerOptions or ConsumerOptions that specifies the log level of a Producer, Subscriber or Consumer.
func NewKafkaError ¶
Types ¶
type Binder ¶
type Binder interface { // Produce create a message Producer to given topic, if not existed already. This would also auto-create topics if configured correctly // The returned Producer may also implement BindingLifecycle, in which case BindingLifecycle.Close can be used to manually release resources Produce(topic string, options ...ProducerOptions) (Producer, error) // Subscribe create a Subscriber of given topic, if not existed already. // The returned Subscriber may also implement BindingLifecycle, in which case BindingLifecycle.Close can be used to manually release resources Subscribe(topic string, options ...ConsumerOptions) (Subscriber, error) // Consume create a GroupConsumer of given topic and group, if not existed already. // The returned GroupConsumer may also implement BindingLifecycle, in which case BindingLifecycle.Close can be used to manually release resources Consume(topic string, group string, options ...ConsumerOptions) (GroupConsumer, error) // ListTopics list of topics of all managed bindings ListTopics() []string }
func ProvideKafkaBinder ¶
func ProvideKafkaBinder(di binderDI) Binder
type BinderLifecycle ¶
type BinderLifecycle interface { // Initialize should be called only once, before Shutdown is executed. Initialize(ctx context.Context) error // Start should be called only once, before Shutdown is executed. // If the provided context is cancellable, the lifecycle is shutdown automatically when it happens Start(ctx context.Context) error // Shutdown can be called multiple times, but would do nothing if called more than once. // Important: // - Once called, Calling Initialize or Start would cause unexpected error or resource leak // - Once called, any Producer, Subscriber or GroupConsumer issued by this Binder are closed automatically. // They should be discarded Shutdown(ctx context.Context) error // Done channel used for monitoring lifecycle. Channel is closed during shutdown Done() <-chan struct{} }
type BinderOption ¶
type BinderOption struct { ApplicationConfig bootstrap.ApplicationConfig Properties KafkaProperties ProducerInterceptors []ProducerMessageInterceptor ConsumerInterceptors []ConsumerDispatchInterceptor HandlerInterceptors []ConsumerHandlerInterceptor TLSCertsManager certs.Manager }
type BinderOptions ¶
type BinderOptions func(opt *BinderOption)
type BinderProperties ¶
type BindingLifecycle ¶
type BindingLifecycle interface { // Start initialize any connection and internal run loops. // Must be called after any configuration and before Producer, Subscriber or GroupConsumer is used Start(ctx context.Context) error // Close must be called to release any resource. Once called, regardless the return, this instance must be discarded Close() error // Closed returns if Close has been called Closed() bool }
BindingLifecycle is the interface that controls the lifecycle of any Producer, Subscriber and GroupConsumer.
type BindingProperties ¶
type BindingProperties struct { Producer ProducerProperties `json:"producer"` Consumer ConsumerProperties `json:"consumer"` }
type ConsumerDispatchFinalizer ¶
type ConsumerDispatchFinalizer interface { // Finalize is called after message processing is finished by handlers. // Note: the *MessageContext will be discarded after all finalizers finished processing. // So modifying given message context would only affect subsequent ConsumerDispatchFinalizer.Finalize on same message // Note 2: Finalize may also choose to handle given err and returns nil error. // In such case, subsequent ConsumerDispatchFinalizer.Finalize on same message would be invoked as if there was no error Finalize(msgCtx *MessageContext, err error) (*MessageContext, error) }
ConsumerDispatchFinalizer is the interface for other packages to finalize message processing. When any ConsumerDispatchInterceptor also implements ConsumerDispatchFinalizer, the Finalize function will be invoked after message processing has finished.
type ConsumerDispatchInterceptor ¶
type ConsumerDispatchInterceptor interface { // Intercept is called before message is decoded and consumed by registered handlers. // Implementations can modify fields of MessageContext to manipulate behaviour. // When error is returned, dispatcher would cancel operation. Otherwise, a non-nil MessageContext must be returned Intercept(msgCtx *MessageContext) (*MessageContext, error) }
type ConsumerGroupProperties ¶
type ConsumerHandlerInterceptor ¶
type ConsumerHandlerInterceptor interface { // BeforeHandling is called after message is decoded and before handled by each registered handlers. // Implementations can modify fields of MessageContext to manipulate behaviour. // When error is returned, dispatcher would cancel operation. Otherwise, a non-nil MessageContext must be returned BeforeHandling(ctx context.Context, msg *Message) (context.Context, error) // AfterHandling is called after each registered handlers handles the message AfterHandling(ctx context.Context, msg *Message, err error) (context.Context, error) }
type ConsumerOptions ¶
type ConsumerOptions func(cfg *bindingConfig)
func WithConsumerProperties ¶
func WithConsumerProperties(p *ConsumerProperties) ConsumerOptions
WithConsumerProperties applies options configured via ConsumerProperties.
type ConsumerProperties ¶
type ConsumerProperties struct { LogLevel *log.LoggingLevel `json:"log-level"` Backoff *utils.Duration `json:"backoff-interval"` Group ConsumerGroupProperties `json:"group"` }
type DispatchOptions ¶
type DispatchOptions func(h *handler)
func FilterOnHeader ¶
func FilterOnHeader(header string, matcher matcher.StringMatcher) DispatchOptions
FilterOnHeader returns a DispatchOptions specifying that the handler should be invoked when certain message header exists and matches the provided matcher
type GroupConsumer ¶
type GroupConsumer interface { // Topic returns the Topic name Topic() string // Group returns the group name Group() string // AddHandler register a message handler function that would process received messages. // Note: A GroupConsumer without a registered handler simply ignore all received messages. // If AddHandler is called after Start, it may miss some messages AddHandler(handlerFunc MessageHandlerFunc, opts ...DispatchOptions) error }
GroupConsumer provides consumer group workflow
type HealthIndicator ¶
type HealthIndicator struct {
// contains filtered or unexported fields
}
func NewHealthIndicator ¶
func NewHealthIndicator(binder Binder) *HealthIndicator
func (*HealthIndicator) Name ¶
func (i *HealthIndicator) Name() string
type KafkaProperties ¶
type KafkaProperties struct { Brokers utils.CommaSeparatedSlice `json:"brokers"` Net Net `json:"net"` Metadata Metadata `json:"metadata"` Binder BinderProperties `json:"binder"` ClientId string `json:"client-id"` }
func BindKafkaProperties ¶
func BindKafkaProperties(ctx *bootstrap.ApplicationContext) KafkaProperties
type LoggerOptions ¶
type LoggerOptions func(opt *loggerOption)
type MessageContext ¶
type MessageContext struct { context.Context Source interface{} Topic string Message Message RawMessage interface{} // contains filtered or unexported fields }
MessageContext is for internal use only; it is used by interceptors and processors.
type MessageFilterFunc ¶
type MessageHandlerFunc ¶
type MessageHandlerFunc interface{}
MessageHandlerFunc is a message handling function that conforms to the following signature:
func (ctx context.Context, [OPTIONAL_INPUT_PARAMS...]) error
Where OPTIONAL_INPUT_PARAMS may contain the following components (their order is not important):
- PAYLOAD_PARAM < AnyPayloadType >: message payload, where PayloadType could be any type other than interface, function or chan. If PayloadType is interface{}, raw []byte will be used
- HEADERS_PARAM < Headers >: message headers
- METADATA_PARAM < *MessageMetadata >: message metadata, includes timestamp, keys, partition, etc.
- MESSAGE_PARAM < *Message >: raw message, where Message.Payload would be PayloadType if PAYLOAD_PARAM is also present, or []byte
For Example:
func Handle(ctx context.Context, payload *MyStruct) error func Handle(ctx context.Context, payload *MyStruct, meta *MessageMetadata) error func Handle(ctx context.Context, payload map[string]interface{}) error func Handle(ctx context.Context, headers Headers, payload *MyStruct) error func Handle(ctx context.Context, payload *MyStruct, raw *Message) error func Handle(ctx context.Context, raw *Message) error
type MessageLogger ¶
type MessageLogger interface { WithLevel(level log.LoggingLevel) MessageLogger LogSentMessage(ctx context.Context, msg interface{}) LogReceivedMessage(ctx context.Context, msg interface{}) }
type MessageMetadata ¶
type MessageOptions ¶
type MessageOptions func(config *messageConfig)
func WithEncoder ¶
func WithEncoder(valueEncoder Encoder) MessageOptions
WithEncoder specifies how the message payload is encoded. Default is "application/json;charset=utf-8"
func WithKey ¶
func WithKey(key interface{}) MessageOptions
WithKey specifies the key used for the message. The key is typically used for partitioning. Supported values depend on the KeyEncoder option of the Producer. The default encoder supports the following types:
- uuid.UUID
- string
- []byte
- encoding.BinaryMarshaler
type Producer ¶
type Producer interface { // Topic returns the Topic name Topic() string // Send publish given message to the TOPIC with pre-configured producer settings // supported message types are: // - *Message // - Message // - any type of body, the body will be serialized using value encoder from options Send(ctx context.Context, message interface{}, options ...MessageOptions) error // ReadyCh is used to monitor producer status. The channel is closed once the Producer is ready to send messages. ReadyCh() <-chan struct{} }
type ProducerMessageFinalizer ¶
type ProducerMessageFinalizer interface { // Finalize is called after message delivery is confirmed. // "confirmed" status depends on Ack mode of the message. // e.g. // - if the message uses RequireNoAck, Finalize is called right after sending the message // - if the message uses RequireAllAck, Finalize is called when Ack is received from all replicas // // Finalize may also be invoked in different goroutine if delivery mode is "sync" // // Note: the *MessageContext will be discarded after all finalizers finished processing. // So modifying given message context would only affect subsequent ProducerMessageFinalizer.Finalize on same message // Note 2: Finalize may also choose to handle given err and returns nil error. // In such case, subsequent ProducerMessageFinalizer.Finalize on same message would be invoked as if there was no error Finalize(msgCtx *MessageContext, partition int32, offset int64, err error) (*MessageContext, error) }
ProducerMessageFinalizer is the interface for other packages to finalize the message sending process. When any ProducerMessageInterceptor also implements ProducerMessageFinalizer, the Finalize function will be invoked after message delivery is confirmed.
type ProducerMessageInterceptor ¶
type ProducerMessageInterceptor interface { // Intercept is called before raw message is prepared and send. // Implementations can modify fields of MessageContext to manipulate sending behaviour. // When error is returned, Producer would cancel operation. Otherwise, a non-nil MessageContext must be returned Intercept(msgCtx *MessageContext) (*MessageContext, error) }
type ProducerOptions ¶
type ProducerOptions func(cfg *bindingConfig)
func AckTimeout ¶
func AckTimeout(timeout time.Duration) ProducerOptions
func KeyEncoder ¶
func KeyEncoder(enc Encoder) ProducerOptions
KeyEncoder configures Producer with given encoder for serializing message key
func Partitions ¶
func Partitions(partitionCount int, replicationFactor int) ProducerOptions
Partitions configures the Producer's topic provisioning by specifying the minimum number of partitions required and their replica number (min.insync.replicas), in case topics are auto-created.
func RequireAllAck ¶
func RequireAllAck() ProducerOptions
RequireAllAck waits for all in-sync replicas to commit before responding. The minimum number of in-sync replicas is configured on the broker via the `min.insync.replicas` configuration Key.
func RequireLocalAck ¶
func RequireLocalAck() ProducerOptions
RequireLocalAck waits for only the local commit to succeed before responding.
func RequireNoAck ¶
func RequireNoAck() ProducerOptions
RequireNoAck doesn't send any response, the TCP ACK is all you get.
func WithProducerProperties ¶
func WithProducerProperties(p *ProducerProperties) ProducerOptions
WithProducerProperties applies options configured via ProducerProperties.
type ProducerProperties ¶
type ProducerProperties struct { LogLevel *log.LoggingLevel `json:"log-level"` AckMode *AckMode `json:"ack-mode"` AckTimeout *utils.Duration `json:"ack-timeout"` MaxRetry *int `json:"max-retry"` Backoff *utils.Duration `json:"backoff-interval"` Provisioning ProvisioningProperties `json:"provisioning"` }
type ProvisioningProperties ¶
type ProvisioningProperties struct { // AutoCreateTopic when topic doesn't exist, whether attempt to create one AutoCreateTopic *bool `json:"auto-create-topic"` // AutoAddPartitions when actual partition counts is less than PartitionCount, whether attempt to add more partitions AutoAddPartitions *bool `json:"auto-add-partitions"` // AllowLowerPartitions when actual partition counts is less than PartitionCount but AutoAddPartitions is false, // whether return an error AllowLowerPartitions *bool `json:"allow-lower-partitions"` // PartitionCount number of partitions of given topic PartitionCount *int32 `json:"partition-count"` // ReplicationFactor number of replicas per partition when creating topic ReplicationFactor *int16 `json:"replication-factor"` }
type SASL ¶
type SASL struct { // Whether or not to use SASL authentication when connecting to the broker // (defaults to false). Enable bool `json:"enabled"` // Whether or not to send the Kafka SASL handshake first if enabled // (defaults to true). You should only set this to false if you're using // a non-Kafka SASL proxy. Handshake bool `json:"handshake"` //username and password for SASL/PLAIN authentication User string `json:"user"` Password string `josn:"password"` }
type SaramaBinder ¶
type SaramaKafkaBinder ¶
func NewBinder ¶
func NewBinder(ctx context.Context, opts ...BinderOptions) *SaramaKafkaBinder
func (*SaramaKafkaBinder) Client ¶
func (b *SaramaKafkaBinder) Client() sarama.Client
func (*SaramaKafkaBinder) CloseProducer ¶
func (b *SaramaKafkaBinder) CloseProducer(ctx context.Context, topic string)
CloseProducer releases resources for dynamic producers.
func (*SaramaKafkaBinder) Consume ¶
func (b *SaramaKafkaBinder) Consume(topic string, group string, options ...ConsumerOptions) (GroupConsumer, error)
func (*SaramaKafkaBinder) Done ¶
func (b *SaramaKafkaBinder) Done() <-chan struct{}
func (*SaramaKafkaBinder) Initialize ¶
func (b *SaramaKafkaBinder) Initialize(ctx context.Context) (err error)
Initialize implements BinderLifecycle. It prepares the binder for use, negotiates default configs, etc.
func (*SaramaKafkaBinder) ListTopics ¶
func (b *SaramaKafkaBinder) ListTopics() (topics []string)
func (*SaramaKafkaBinder) Produce ¶
func (b *SaramaKafkaBinder) Produce(topic string, options ...ProducerOptions) (Producer, error)
func (*SaramaKafkaBinder) Shutdown ¶
func (b *SaramaKafkaBinder) Shutdown(ctx context.Context) error
Shutdown implements BinderLifecycle. It closes resources.
func (*SaramaKafkaBinder) Start ¶
func (b *SaramaKafkaBinder) Start(ctx context.Context) (err error)
Start implements BinderLifecycle. It starts all bindings that have not been started yet (Producer, Subscriber, GroupConsumer, etc).
func (*SaramaKafkaBinder) Subscribe ¶
func (b *SaramaKafkaBinder) Subscribe(topic string, options ...ConsumerOptions) (Subscriber, error)
type Subscriber ¶
type Subscriber interface { // Topic returns the Topic name Topic() string // Partitions returns subscribed partitions Partitions() []int32 // AddHandler register a message handler function that would process received messages. // Note: A Subscriber without a registered handler simply ignore all received messages. // If AddHandler is called after Start, it may miss some messages AddHandler(handlerFunc MessageHandlerFunc, opts ...DispatchOptions) error }
Subscriber provides Pub-Sub workflow
type TLS ¶
type TLS struct { Enable bool `json:"enabled"` Certs certs.SourceProperties `json:"certs"` }