consumerserver

package
v0.4.1
Note: this package is not in the latest version of its module.

Published: Dec 15, 2021 License: MIT Imports: 12 Imported by: 0

Documentation

Constants

const PackageName = "component.ekafka.consumerserver"

PackageName is the name of this component.

Variables

var ErrRecoverableError error = errors.New("recoverable error is retryable")
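
The error text suggests that a handler can return this sentinel to mark a failure as retryable. How the component reacts to it is not documented on this page. As a minimal sketch of the usual Go pattern for wrapping and detecting a sentinel error, the example below uses a local stand-in (`errRecoverable`) instead of importing the package. The helpers `handle` and `isRetryable` are hypothetical names introduced only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errRecoverable is a local stand-in for consumerserver.ErrRecoverableError
// (assumption: the real sentinel is compared the same way).
var errRecoverable = errors.New("recoverable error is retryable")

// handle is a hypothetical message handler. It wraps the sentinel with
// extra context when the payload is empty.
func handle(payload string) error {
	if payload == "" {
		return fmt.Errorf("empty payload: %w", errRecoverable)
	}
	return nil
}

// isRetryable reports whether err is, or wraps, the recoverable sentinel.
func isRetryable(err error) bool {
	return errors.Is(err, errRecoverable)
}

func main() {
	fmt.Println(isRetryable(handle("")))      // wrapped sentinel
	fmt.Println(isRetryable(handle("hello"))) // nil error
}
```

Wrapping with `%w` keeps the sentinel detectable through `errors.Is` while still letting the handler attach context to the error.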

Functions

func DefaultConfig

func DefaultConfig() *config

DefaultConfig returns a default config.

Types

type Component

type Component struct {
	ServerCtx context.Context
	// contains filtered or unexported fields
}

Component starts an Ego server for message consuming.

func NewConsumerServerComponent

func NewConsumerServerComponent(name string, config *config, ekafkaComponent *ekafka.Component, logger *elog.Component) *Component

NewConsumerServerComponent creates a new server instance.

func (*Component) Consumer added in v0.2.0

func (cmp *Component) Consumer() *ekafka.Consumer

Consumer returns the default Consumer.

func (*Component) ConsumerGroup added in v0.2.0

func (cmp *Component) ConsumerGroup() *ekafka.ConsumerGroup

ConsumerGroup returns the default ConsumerGroup.

func (*Component) GracefulStop

func (cmp *Component) GracefulStop(ctx context.Context) error

GracefulStop stops the server.

func (*Component) Info

func (cmp *Component) Info() *server.ServiceInfo

Info returns server info, used by governor and consumer balancer.

func (*Component) Init

func (cmp *Component) Init() error

Init ...

func (*Component) Name

func (cmp *Component) Name() string

Name returns the name of this instance.

func (*Component) OnConsumerGroupStart added in v0.2.0

func (cmp *Component) OnConsumerGroupStart(handler OnConsumerGroupStartHandler) error

OnConsumerGroupStart ...

func (*Component) OnEachMessage added in v0.1.1

func (cmp *Component) OnEachMessage(consumptionErrors chan<- error, handler OnEachMessageHandler) error

OnEachMessage ...
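
The signature pairs a per-message handler with a send-only error channel. One plausible reading, stated here as an assumption and not as the component's documented behavior, is that errors returned by the handler are forwarded to that channel. The sketch below models the shape with local stand-ins only: `message` replaces `kafka.Message`, and `consume`, `collect`, and `rejectEmpty` are hypothetical helpers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// message is a local stand-in for kafka.Message.
type message struct {
	Value []byte
}

// handlerFunc mirrors the shape of OnEachMessageHandler.
type handlerFunc func(ctx context.Context, msg message) error

// consume is a hypothetical loop: it calls handler once per message,
// forwards every non-nil error to errs, and closes errs when done.
func consume(ctx context.Context, msgs []message, errs chan<- error, handler handlerFunc) {
	defer close(errs)
	for _, m := range msgs {
		if err := handler(ctx, m); err != nil {
			errs <- err
		}
	}
}

// collect runs consume in a goroutine and gathers the forwarded errors.
func collect(msgs []message, handler handlerFunc) []error {
	errs := make(chan error)
	go consume(context.Background(), msgs, errs, handler)
	var got []error
	for err := range errs {
		got = append(got, err)
	}
	return got
}

// rejectEmpty is an example handler that fails on empty messages.
func rejectEmpty(ctx context.Context, m message) error {
	if len(m.Value) == 0 {
		return errors.New("empty message")
	}
	return nil
}

func main() {
	msgs := []message{{Value: []byte("ok")}, {Value: nil}}
	fmt.Println(len(collect(msgs, rejectEmpty)))
}
```

With an unbuffered channel, something must keep receiving from it, otherwise the sending side blocks on the first error.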

func (*Component) OnStart added in v0.1.1

func (cmp *Component) OnStart(handler OnStartHandler) error

OnStart ...

func (*Component) PackageName

func (cmp *Component) PackageName() string

PackageName returns the package name.

func (*Component) Start

func (cmp *Component) Start() error

Start starts consuming messages.

func (*Component) Stop

func (cmp *Component) Stop() error

Stop stops the server.

type Container

type Container struct {
	// contains filtered or unexported fields
}

func DefaultContainer

func DefaultContainer() *Container

DefaultContainer returns the default Container.

func Load

func Load(key string) *Container

Load loads the configuration and initializes the Container.

func (*Container) Build

func (c *Container) Build(options ...Option) *Component

Build builds the Container, applying the given options, and returns the resulting Component.

type OnConsumerGroupStartHandler added in v0.2.0

type OnConsumerGroupStartHandler = func(ctx context.Context, consumerGroup *ekafka.ConsumerGroup) error

OnConsumerGroupStartHandler ...

type OnEachMessageHandler added in v0.1.1

type OnEachMessageHandler = func(ctx context.Context, message kafka.Message) error

OnEachMessageHandler ...

type OnStartHandler added in v0.1.1

type OnStartHandler = func(ctx context.Context, consumer *ekafka.Consumer) error

OnStartHandler ...

type Option

type Option func(c *Container)

func WithDebug

func WithDebug(debug bool) Option

WithDebug enables debug mode.

func WithEkafka

func WithEkafka(ekafkaComponent *ekafka.Component) Option

WithEkafka ...
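
`Option`, `WithDebug`, and `Build(options ...Option)` follow Go's functional options pattern. The sketch below shows how that pattern typically works, using lowercase local stand-ins (`container`, `option`, `withDebug`, `build`) and an assumed `debug` field. The package's actual fields are unexported and not shown on this page.

```go
package main

import "fmt"

// container is a local stand-in for Container; the debug field is an
// assumption made for illustration.
type container struct {
	debug bool
}

// option mirrors the shape of Option: a function that mutates a container.
type option func(c *container)

// withDebug returns an option that sets the debug flag.
func withDebug(debug bool) option {
	return func(c *container) {
		c.debug = debug
	}
}

// build applies each option in order and returns the configured container.
// (The real Build returns a *Component; this sketch stops at configuration.)
func (c *container) build(options ...option) *container {
	for _, opt := range options {
		opt(c)
	}
	return c
}

func main() {
	c := (&container{}).build(withDebug(true))
	fmt.Println(c.debug)
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.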
