Documentation
Constants
This section is empty.
Variables
This section is empty.
Functions
Types
type PassThroughSerializer
type PassThroughSerializer struct {
}
func NewPassThroughSerializer
func NewPassThroughSerializer() *PassThroughSerializer
func (*PassThroughSerializer) MessageToProduceMessage
func (ts *PassThroughSerializer) MessageToProduceMessage(value string) string
type StreamConfig
type StreamConfig struct {
// contains filtered or unexported fields
}
func NewStreamConfig
func NewStreamConfig(ms common.MessageSerializer, topic common.Topic) StreamConfig
func (StreamConfig) MessageSerializer
func (ss StreamConfig) MessageSerializer() common.MessageSerializer
func (StreamConfig) Topic
func (ss StreamConfig) Topic() common.Topic
type TopicStreamer
type TopicStreamer struct {
// contains filtered or unexported fields
}
func NewTopicStreamer
func NewTopicStreamer(brokers []string, topic common.Topic, args ...interface{}) *TopicStreamer
NewTopicStreamer creates a topic streamer that streams messages from one topic to other topics. The streamer is configured with a list of brokers and the topic to stream from. To override the default configuration of the sarama consumer and producer, pass the configurations as additional arguments, consumer first and producer second. As the last example shows, nil may be passed in place of the consumer configuration.
- ts := NewTopicStreamer(brokers, topic)
- ts := NewTopicStreamer(brokers, topic, consumerConfig, producerConfig)
- ts := NewTopicStreamer(brokers, topic, nil, producerConfig)
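The optional-argument convention in the calls above can be illustrated with a minimal, self-contained sketch. This is not the package's implementation: `Config`, `defaultConfig`, and `resolveConfigs` are hypothetical stand-ins, and the rule that a nil or missing argument keeps the default is an assumption drawn from the third call form.

```go
package main

import "fmt"

// Config is a hypothetical stand-in for a sarama configuration value.
type Config struct {
	Name string
}

// defaultConfig returns the fallback used when no override is supplied.
func defaultConfig(name string) *Config {
	return &Config{Name: name}
}

// resolveConfigs interprets optional positional arguments in the order
// (consumer, producer). A missing, nil, or wrongly typed argument keeps
// the default. This behavior is assumed, not taken from the package.
func resolveConfigs(args ...interface{}) (consumer, producer *Config) {
	consumer = defaultConfig("default-consumer")
	producer = defaultConfig("default-producer")
	if len(args) > 0 {
		// A type assertion on an untyped nil yields ok == false.
		if c, ok := args[0].(*Config); ok && c != nil {
			consumer = c
		}
	}
	if len(args) > 1 {
		if p, ok := args[1].(*Config); ok && p != nil {
			producer = p
		}
	}
	return consumer, producer
}

func main() {
	custom := &Config{Name: "custom-producer"}
	c, p := resolveConfigs(nil, custom)
	fmt.Println(c.Name, p.Name)
}
```

Positional `interface{}` arguments keep the common call short, at the cost of compile-time type checking: a value of the wrong type is silently ignored in this sketch.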
func (*TopicStreamer) AddConfig
func (ts *TopicStreamer) AddConfig(config StreamConfig)
func (*TopicStreamer) Configs (added in v0.1.1)
func (ts *TopicStreamer) Configs() []StreamConfig
func (*TopicStreamer) Consumer (added in v0.1.1)
func (ts *TopicStreamer) Consumer() *internal.StreamConsumer
func (*TopicStreamer) Run
func (ts *TopicStreamer) Run()
func (*TopicStreamer) Stop
func (ts *TopicStreamer) Stop()
func (*TopicStreamer) Topic (added in v0.1.1)
func (ts *TopicStreamer) Topic() common.Topic