Versions in this module Expand all Collapse all v1 v1.0.0 Aug 20, 2018 Changes in this version + func Register(name string, queueHandler queueHandler) error + func Use(name string) (queueHandler, error) + type Consumer struct + func (c *Consumer) Close() + func (c *Consumer) MarkOffset(message *sarama.ConsumerMessage) + func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage + type Handler interface + GetQueue func(name string) (QueueHandler, error) + Initiate func(ctx context.Context) error + NewQueue func(ctx context.Context, name string, config map[string]interface{}) (QueueHandler, error) + func NewKafkaHandler() Handler + type Instance struct + Config *config.Instance + Utility *utility.Instance + func NewInstance() *Instance + func (this *Instance) GetQueue(name string) (QueueHandler, error) + func (this *Instance) HandlerName() string + func (this *Instance) Initiate(ctx context.Context) (newCtx context.Context, err error) + func (this *Instance) NewQueue(ctx context.Context, name string, config map[string]interface{}) (QueueHandler, error) + func (this *Instance) OnRequestShutdown(c *routing.Context) error + func (this *Instance) OnRequestStartup(c *routing.Context) error + func (this *Instance) OnShutdown(ctx context.Context) (context.Context, error) + func (this *Instance) OnStartup(ctx context.Context) (context.Context, error) + func (this *Instance) Use(ctx context.Context, handlerName string) error + type KafkaHandler struct + func (this *KafkaHandler) GetQueue(name string) (QueueHandler, error) + func (this *KafkaHandler) Initiate(ctx context.Context) error + func (this *KafkaHandler) NewQueue(ctx context.Context, name string, config map[string]interface{}) (QueueHandler, error) + type KafkaQueue struct + func (this *KafkaQueue) GetConfig() map[string]interface{} + func (this *KafkaQueue) NewConsumer(ctx context.Context, groupName string, topic string) (*Consumer, error) + func (this *KafkaQueue) NewProducer(ctx context.Context, topic string, ...) (*Producer, error) + type Producer struct + func (p *Producer) Produce(message []byte) + type QueueHandler interface + GetConfig func() map[string]interface{} + NewConsumer func(ctx context.Context, group string, topic string) (*Consumer, error) + NewProducer func(ctx context.Context, topic string, ...) (*Producer, error)