kafka

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2021 License: MIT Imports: 8 Imported by: 1

README

Kafka dialect

A high-level Apache Kafka consumer/producer dialect. This dialect allowes to consume/produce from Apache Kafka in your projects using commander.

Connection string

A connection string is required to connect to the Kafka cluster. The string is build out of key's and value's in a linux like flag syntax: "key=value". An connection string could consist out of the following flags:

Key Required Default Description
brokers true `` Should contain the ip addresses of the brokers in the Kafka cluster
group false `` The Kafka consumer group used to consume messages, when defined is a new consumer group set-up and is the latest marked offset stored. When no group is defined/given is a partition consumer created
version true `` The Kafka version of the cluster
initial-offset false newest The initial offset used when setting up a partition consumer. The initial offset could be one of the following values: (int)0.../"newest"/"oldest"
Example
brokers=192.168.2.1,192.168.2.2 group=example version=2.1.1
brokers=192.168.2.1,192.168.2.2 initial-offset=oldest version=2.1.1

Getting started

Once you have the your connectionstring defined are you able to initialize the Kafka dialect.

connectionstring := "..."
dialect := kafka.NewDialect(connectionstring)

Documentation

Index

Constants

View Source
const (
	OffsetNewest = "newest"
	OffsetOldest = "oldest"
)

Initial offset key values

View Source
const (
	BrokersKey           = "brokers"
	GroupKey             = "group"
	VersionKey           = "version"
	InitialOffsetKey     = "initial-offset"
	ConnectionTimeoutKey = "connection-timeout"
)

These const's contain the connection string keys to different values

Variables

View Source
var (
	DefaultConnectionTimeout = 5 * time.Second
)

Default config value's

Functions

func ValidateConnectionKeyVal

func ValidateConnectionKeyVal(values ConnectionMap) error

ValidateConnectionKeyVal validates if all required valyues are set in the given connectionmap

Types

type Config

type Config struct {
	Brokers           []string
	Group             string
	Version           sarama.KafkaVersion
	InitialOffset     int64
	ConnectionTimeout time.Duration
}

Config contains all the plausible configuration options

func NewConfig

func NewConfig(values ConnectionMap) (Config, error)

NewConfig constructs a Config from the given connection map

type ConnectionMap

type ConnectionMap map[string]string

ConnectionMap contains the connectionstring as a key/value map

func ParseConnectionstring

func ParseConnectionstring(connectionstring string) ConnectionMap

ParseConnectionstring parses the given connectionstring and returns a map with all key/values

type Dialect

type Dialect struct {
	Connection Config
	Config     *sarama.Config
	// contains filtered or unexported fields
}

Dialect represents the kafka dialect

func NewDialect

func NewDialect(connectionstring string) (*Dialect, error)

NewDialect initializes and constructs a new Kafka dialect

func (*Dialect) Close

func (dialect *Dialect) Close() error

Close closes the Kafka consumers and producers

func (*Dialect) Consumer

func (dialect *Dialect) Consumer() types.Consumer

Consumer returns the dialect as consumer

func (*Dialect) Healthy

func (dialect *Dialect) Healthy() bool

Healthy returns a boolean that reprisents if the dialect is healthy

func (*Dialect) Open

func (dialect *Dialect) Open(topics []types.Topic) (err error)

Open opens a kafka consumer and producer

func (*Dialect) Producer

func (dialect *Dialect) Producer() types.Producer

Producer returns the dialect as producer

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL