kafka

package
v1.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2024 License: MIT Imports: 8 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ElementKey = micro.ElementKey("KafkaComponent")

ElementKey is ElementKey for kafka

View Source
var ErrCodeTimedOut = kafka.ErrTimedOut

ErrCodeTimedOut - kafka timeout error

Functions

func SetDefaultConfig

func SetDefaultConfig()

SetDefaultConfig -

Types

type Component

type Component struct {
	micro.EmptyComponent
	// contains filtered or unexported fields
}

Component is Component for kafka

func (*Component) Init

func (c *Component) Init(server *micro.Server) error

Init the component

func (*Component) Name

func (c *Component) Name() string

Name of the component

func (*Component) PostStop

func (c *Component) PostStop(ctx context.Context) error

PostStop called after Stop()

func (*Component) PreInit

func (c *Component) PreInit(ctx context.Context) error

PreInit called before Init()

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start the component

type Config

type Config struct {
	Enable           bool   `toml:"enable"`
	BootStrapServers string `toml:"bootstrapserveres"` // BootStrapServers Broker
	ClientID         string `toml:"clientid"`          // GroupID for producer
	MessageMaxBytes  int    `toml:"messagemaxbytes"`   // MessageMaxBytes
	Topic            string `toml:"topic"`
	GroupID          string `toml:"groupid"`
}

Config struct

func GetConfig

func GetConfig() *Config

GetConfig -

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

Kafka -

func New

func New(config *Config, logger logging.ILogger) *Kafka

New -

func (*Kafka) Close

func (k *Kafka) Close()

Close -

func (*Kafka) ConsumerData

func (k *Kafka) ConsumerData(t time.Duration) (*kafka.Message, kafka.ErrorCode, error)

ConsumerData -

func (*Kafka) CreateConsumer

func (k *Kafka) CreateConsumer() error

CreateConsumer -

func (*Kafka) CreateProducerKeepalived

func (k *Kafka) CreateProducerKeepalived(ctx context.Context)

CreateProducerKeepalived -

func (*Kafka) ProduceData

func (k *Kafka) ProduceData(topic string, key, value []byte)

ProduceData -

func (*Kafka) ProduceDataSimple

func (k *Kafka) ProduceDataSimple(value []byte)

ProduceDataSimple -

func (*Kafka) ProduceDataWithTimeKey

func (k *Kafka) ProduceDataWithTimeKey(topic string, value []byte)

ProduceDataWithTimeKey -

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL