Versions in this module Expand all Collapse all v6 v6.0.2 Mar 30, 2021 Changes in this version + type ByteDecoder struct + func (d ByteDecoder) Decode(b []byte) (interface{}, error) + type ByteEncoder struct + func (e ByteEncoder) Encode(v interface{}) ([]byte, error) + type Decoder interface + Decode func([]byte) (interface{}, error) + type DecoderFunc func(value []byte) (interface{}, error) + func (f DecoderFunc) Decode(value []byte) (interface{}, error) + var NilDecoder DecoderFunc = func([]byte) (interface{}, error) { ... } + type Encoder interface + Encode func(interface{}) ([]byte, error) + type EncoderFunc func(interface{}) ([]byte, error) + func (f EncoderFunc) Encode(value interface{}) ([]byte, error) + type Metadata []*PartitionOffset + func (m Metadata) Merge(v streams.Metadata, s streams.MetadataStrategy) streams.Metadata + func (m Metadata) WithOrigin(o streams.MetadataOrigin) + type PartitionOffset struct + Offset int64 + Origin streams.MetadataOrigin + Partition int32 + Topic string + type Sink struct + func NewSink(c *SinkConfig) (*Sink, error) + func (p *Sink) Close() error + func (p *Sink) Commit(ctx context.Context) error + func (p *Sink) Process(msg streams.Message) error + func (p *Sink) WithPipe(pipe streams.Pipe) + type SinkConfig struct + BatchSize int + Brokers []string + KeyEncoder Encoder + Topic string + ValueEncoder Encoder + func NewSinkConfig() *SinkConfig + func (c *SinkConfig) Validate() error + type Source struct + func NewSource(c *SourceConfig) (*Source, error) + func (s *Source) Cleanup(_ sarama.ConsumerGroupSession) error + func (s *Source) Close() error + func (s *Source) Commit(v interface{}) error + func (s *Source) Consume() (streams.Message, error) + func (s *Source) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error + func (s *Source) Setup(session sarama.ConsumerGroupSession) error + type SourceConfig struct + Brokers []string + BufferSize int + Ctx context.Context + ErrorsBufferSize int + GroupID string + KeyDecoder Decoder + Topic string + ValueDecoder Decoder + func NewSourceConfig() *SourceConfig + func (c *SourceConfig) Validate() error + type StringDecoder struct + func (d StringDecoder) Decode(b []byte) (interface{}, error) + type StringEncoder struct + func (e StringEncoder) Encode(v interface{}) ([]byte, error) Other modules containing this package github.com/rafalmnich/streams