kafka

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 17, 2018 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ByteDecoder

type ByteDecoder struct{}

ByteDecoder represents a byte decoder.

func (ByteDecoder) Decode

func (d ByteDecoder) Decode(b []byte) (interface{}, error)

Decode transforms byte data to the desired type.

type ByteEncoder

type ByteEncoder struct{}

ByteEncoder represents a byte encoder.

func (ByteEncoder) Encode

func (e ByteEncoder) Encode(v interface{}) ([]byte, error)

Encode transforms the typed data to bytes.

type Decoder

type Decoder interface {
	// Decode transforms byte data to the desired type.
	Decode([]byte) (interface{}, error)
}

Decoder represents a Kafka data decoder.

type Encoder

type Encoder interface {
	// Encode transforms the typed data to bytes.
	Encode(interface{}) ([]byte, error)
}

Encoder represents a Kafka data encoder.

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

Sink represents a Kafka streams sink.

func NewSink

func NewSink(c *SinkConfig) (*Sink, error)

NewSink creates a new Kafka sink.

func (*Sink) Close

func (p *Sink) Close() error

Close closes the processor.

func (*Sink) Process

func (p *Sink) Process(msg *streams.Message) error

Process processes the stream record.

func (*Sink) WithPipe added in v0.3.0

func (p *Sink) WithPipe(pipe streams.Pipe)

WithPipe sets the pipe on the Processor.

type SinkConfig

type SinkConfig struct {
	sarama.Config

	Brokers []string
	Topic   string

	KeyEncoder   Encoder
	ValueEncoder Encoder

	BatchSize int
}

SinkConfig represents the configuration of a Sink.

func NewSinkConfig

func NewSinkConfig() *SinkConfig

NewSinkConfig creates a new SinkConfig.

func (*SinkConfig) Validate

func (c *SinkConfig) Validate() error

Validate checks a Config instance. It will return a sarama.ConfigurationError if the specified values don't make sense.

type Source

type Source struct {
	// contains filtered or unexported fields
}

Source represents a Kafka stream source.

func NewSource

func NewSource(c *SourceConfig) (*Source, error)

NewSource creates a new Kafka stream source.

func (*Source) Close

func (s *Source) Close() error

Close closes the Source.

func (*Source) Commit

func (s *Source) Commit(v interface{}) error

Commit marks the consumed records as processed.

func (*Source) Consume

func (s *Source) Consume() (*streams.Message, error)

Consume gets the next record from the Source.

type SourceConfig

type SourceConfig struct {
	sarama.Config

	Brokers []string
	Topic   string
	GroupId string

	Ctx          context.Context
	KeyDecoder   Decoder
	ValueDecoder Decoder

	BufferSize int
}

SourceConfig represents the configuration for a Kafka stream source.

func NewSourceConfig

func NewSourceConfig() *SourceConfig

NewSourceConfig creates a new Kafka source configuration.

func (*SourceConfig) Validate

func (c *SourceConfig) Validate() error

Validate checks a Config instance. It will return a sarama.ConfigurationError if the specified values don't make sense.

type StringDecoder

type StringDecoder struct{}

StringDecoder represents a string decoder.

func (StringDecoder) Decode

func (d StringDecoder) Decode(b []byte) (interface{}, error)

Decode transforms byte data to a string.

type StringEncoder

type StringEncoder struct{}

StringEncoder represents a string encoder.

func (StringEncoder) Encode

func (e StringEncoder) Encode(v interface{}) ([]byte, error)

Encode transforms the string data to bytes.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL