queue

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2018 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Register

func Register(name string, queueHandler queueHandler) error

func Use

func Use(name string) (queueHandler, error)

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (c *Consumer) Close()

关闭消费者

func (*Consumer) MarkOffset

func (c *Consumer) MarkOffset(message *sarama.ConsumerMessage)

func (*Consumer) Messages

func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage

type Handler

type Handler interface {
	Initiate(ctx context.Context) error
	NewQueue(ctx context.Context, name string, config map[string]interface{}) (QueueHandler, error)
	GetQueue(name string) (QueueHandler, error)
}

func NewKafkaHandler

func NewKafkaHandler() Handler

type Instance

type Instance struct {
	Config  *config.Instance
	Utility *utility.Instance
	// contains filtered or unexported fields
}

func NewInstance

func NewInstance() *Instance

func (*Instance) GetQueue

func (this *Instance) GetQueue(name string) (QueueHandler, error)

func (*Instance) HandlerName

func (this *Instance) HandlerName() string

func (*Instance) Initiate

func (this *Instance) Initiate(ctx context.Context) (newCtx context.Context, err error)

func (*Instance) NewQueue

func (this *Instance) NewQueue(ctx context.Context, name string, config map[string]interface{}) (QueueHandler, error)

func (*Instance) OnRequestShutdown

func (this *Instance) OnRequestShutdown(c *routing.Context) error

func (*Instance) OnRequestStartup

func (this *Instance) OnRequestStartup(c *routing.Context) error

func (*Instance) OnShutdown

func (this *Instance) OnShutdown(ctx context.Context) (context.Context, error)

func (*Instance) OnStartup

func (this *Instance) OnStartup(ctx context.Context) (context.Context, error)

func (*Instance) Use

func (this *Instance) Use(ctx context.Context, handlerName string) error

type KafkaHandler

type KafkaHandler struct {
	// contains filtered or unexported fields
}

func (*KafkaHandler) GetQueue

func (this *KafkaHandler) GetQueue(name string) (QueueHandler, error)

func (*KafkaHandler) Initiate

func (this *KafkaHandler) Initiate(ctx context.Context) error

func (*KafkaHandler) NewQueue

func (this *KafkaHandler) NewQueue(ctx context.Context, name string, config map[string]interface{}) (QueueHandler, error)

type KafkaQueue

type KafkaQueue struct {
	// contains filtered or unexported fields
}

func (*KafkaQueue) GetConfig

func (this *KafkaQueue) GetConfig() map[string]interface{}

func (*KafkaQueue) NewConsumer

func (this *KafkaQueue) NewConsumer(ctx context.Context, groupName string, topic string) (*Consumer, error)

创建消费者

func (*KafkaQueue) NewProducer

func (this *KafkaQueue) NewProducer(ctx context.Context, topic string, errHandles ...func(context.Context, string, string, error)) (*Producer, error)

创建生产者

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) Produce

func (p *Producer) Produce(message []byte)

生产

type QueueHandler

type QueueHandler interface {
	NewConsumer(ctx context.Context, group string, topic string) (*Consumer, error)
	NewProducer(ctx context.Context, topic string, errHandles ...func(context.Context, string, string, error)) (*Producer, error)
	GetConfig() map[string]interface{}
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL