Documentation ¶
Index ¶
- type Dispatcher
- type DispatcherConfig
- type DispatcherImpl
- func (d *DispatcherImpl) ObserveMetrics(interval time.Duration)
- func (d *DispatcherImpl) SecretChanged(ctx context.Context, secret *corev1.Secret)
- func (d *DispatcherImpl) Shutdown()
- func (d *DispatcherImpl) UpdateSubscriptions(ctx context.Context, channelRef types.NamespacedName, ...) commonconsumer.SubscriberStatusMap
- type Handler
- type SubscriberWrapper
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Dispatcher ¶
type Dispatcher interface { SecretChanged(ctx context.Context, secret *corev1.Secret) Shutdown() UpdateSubscriptions(ctx context.Context, channelRef types.NamespacedName, subscriberSpecs []eventingduck.SubscriberSpec) commonconsumer.SubscriberStatusMap }
Dispatcher Interface
func NewDispatcher ¶
func NewDispatcher(dispatcherConfig DispatcherConfig, controlServer controlprotocol.ServerHandler, enqueue func(ref types.NamespacedName)) (Dispatcher, <-chan commonconsumer.ManagerEvent)
NewDispatcher Is The Dispatcher Constructor
type DispatcherConfig ¶
type DispatcherConfig struct { Logger *zap.Logger ClientId string Brokers []string Topic string ChannelKey string StatsReporter metrics.StatsReporter MetricsRegistry gometrics.Registry SaramaConfig *sarama.Config }
DispatcherConfig Defines A Dispatcher Config Struct To Hold Configuration
type DispatcherImpl ¶
type DispatcherImpl struct { DispatcherConfig MetricsStopChan chan struct{} MetricsStoppedChan chan struct{} // contains filtered or unexported fields }
DispatcherImpl Is A Struct With Configuration & ConsumerGroup State
func (*DispatcherImpl) ObserveMetrics ¶ added in v0.22.0
func (d *DispatcherImpl) ObserveMetrics(interval time.Duration)
ObserveMetrics Is An Async Process For Observing Kafka Metrics
func (*DispatcherImpl) SecretChanged ¶ added in v0.21.0
func (d *DispatcherImpl) SecretChanged(ctx context.Context, secret *corev1.Secret)
SecretChanged is called by the secretObserver handler function in main() so that settings specific to the dispatcher may be changed if necessary
func (*DispatcherImpl) UpdateSubscriptions ¶
func (d *DispatcherImpl) UpdateSubscriptions(ctx context.Context, channelRef types.NamespacedName, subscriberSpecs []eventingduck.SubscriberSpec) commonconsumer.SubscriberStatusMap
UpdateSubscriptions manages the Dispatcher's Subscriptions to align with new state
type Handler ¶
type Handler struct { Logger *zap.Logger GroupId string Subscriber *eventingduck.SubscriberSpec MessageDispatcher channel.MessageDispatcher // contains filtered or unexported fields }
Handler Struct implementing the KafkaConsumerHandler Interface
func NewHandler ¶
func NewHandler(logger *zap.Logger, groupId string, subscriber *eventingduck.SubscriberSpec) *Handler
NewHandler creates a new Handler instance.
func (*Handler) GetConsumerGroup ¶ added in v0.24.0
GetConsumerGroup returns the ConsumerGroup ID of the Handler
func (*Handler) Handle ¶ added in v0.24.0
func (h *Handler) Handle(ctx context.Context, consumerMessage *sarama.ConsumerMessage) (bool, error)
Handle is responsible for processing the individual ConsumerMessages. The first return bool indicates whether to MarkOffset in the ConsumerGroup and the second error value will be sent to the ConsumerGroups error channel as well as the SetReady() function.
type SubscriberWrapper ¶
type SubscriberWrapper struct { eventingduck.SubscriberSpec GroupId string }
SubscriberWrapper Defines A Knative Eventing SubscriberSpec Wrapper Enhanced With Sarama ConsumerGroup ID
func NewSubscriberWrapper ¶
func NewSubscriberWrapper(subscriberSpec eventingduck.SubscriberSpec, groupId string) *SubscriberWrapper
NewSubscriberWrapper Is The SubscriberWrapper Constructor