Documentation
¶
Index ¶
- Constants
- func GlobalTableWithLogger(logger log.Logger) globalTableOption
- func GlobalTableWithOffset(offset GlobalTableOffset) globalTableOption
- type DefaultBuilders
- type GlobalTable
- type GlobalTableOffset
- type GlobalTableStreamConfig
- type Instances
- type InstancesOptions
- type KSink
- func (s *KSink) AddChild(node node.Node)
- func (s *KSink) AddChildBuilder(builder node.NodeBuilder)
- func (s *KSink) Build() (node.Node, error)
- func (s *KSink) ChildBuilders() []node.NodeBuilder
- func (s *KSink) Childs() []node.Node
- func (s *KSink) Close() error
- func (s *KSink) ID() int32
- func (s *KSink) Info() map[string]string
- func (s *KSink) Name() string
- func (*KSink) Next() bool
- func (s *KSink) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)
- func (s *KSink) SinkType() string
- func (*KSink) Type() node.Type
- type Option
- type Repartition
- type RepartitionOption
- type RepartitionOptions
- type RepartitionTopic
- type Side
- type SinkOption
- type SourceNode
- func (sn *SourceNode) AddChild(node node.Node)
- func (sn *SourceNode) AddChildBuilder(builder node.NodeBuilder)
- func (sn *SourceNode) Build() (node.Node, error)
- func (sn *SourceNode) ChildBuilders() []node.NodeBuilder
- func (sn *SourceNode) Childs() []node.Node
- func (sn *SourceNode) Close()
- func (sn *SourceNode) Name() string
- func (sn *SourceNode) Next() bool
- func (sn *SourceNode) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
- func (sn *SourceNode) Type() node.Type
- type Stream
- type StreamBuilder
- func (b *StreamBuilder) Build(streams ...Stream) error
- func (b *StreamBuilder) GlobalTable(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, ...) GlobalTable
- func (b *StreamBuilder) StoreRegistry() store.Registry
- func (b *StreamBuilder) Stream(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, ...) Stream
- type StreamBuilderConfig
- type StreamConfigs
- type StreamInstance
Constants ¶
View Source
const ( LeftJoin join.JoinType = iota InnerJoin )
Variables ¶
This section is empty.
Functions ¶
func GlobalTableWithLogger ¶
func GlobalTableWithOffset ¶
func GlobalTableWithOffset(offset GlobalTableOffset) globalTableOption
Types ¶
type DefaultBuilders ¶
type DefaultBuilders struct {
Producer producer.Builder
Consumer consumer.Builder
PartitionConsumer consumer.PartitionConsumerBuilder
Store store.Builder
Backend backend.Builder
StateStore store.StateStoreBuilder
OffsetManager offsets.Manager
KafkaAdmin admin.KafkaAdmin
// contains filtered or unexported fields
}
type GlobalTable ¶
type GlobalTable interface {
Stream
}
type GlobalTableOffset ¶
type GlobalTableOffset int
const GlobalTableOffsetDefault GlobalTableOffset = 0
table will start syncing from locally stored offset or topic oldest offset
const GlobalTableOffsetLatest GlobalTableOffset = -1
table will start syncing from topic latest offset (suitable for stream topics since the topic can contains historical data )
type GlobalTableStreamConfig ¶
type Instances ¶
type Instances struct {
// contains filtered or unexported fields
}
func NewStreams ¶
func NewStreams(builder *StreamBuilder, options ...InstancesOptions) *Instances
type InstancesOptions ¶
type InstancesOptions func(config *instancesOptions)
func NotifyOnStart ¶
func NotifyOnStart(c chan bool) InstancesOptions
func WithReBalanceHandler ¶
func WithReBalanceHandler(h consumer.ReBalanceHandler) InstancesOptions
type KSink ¶
type KSink struct {
Id int32
KeyEncoder encoding.Encoder
ValEncoder encoding.Encoder
Producer producer.Producer
ProducerBuilder producer.Builder
TopicPrefix string
Repartitioned bool
KeyEncoderBuilder encoding.Builder
ValEncoderBuilder encoding.Builder
// contains filtered or unexported fields
}
func NewKSinkBuilder ¶
func (*KSink) AddChildBuilder ¶
func (s *KSink) AddChildBuilder(builder node.NodeBuilder)
func (*KSink) ChildBuilders ¶
func (s *KSink) ChildBuilders() []node.NodeBuilder
type Option ¶
type Option func(*kStreamConfig)
func WithConfig ¶
func WithConfig(configs StreamConfigs) Option
func WithLogger ¶
func WithWorkerPool ¶
func WithWorkerPool(poolConfig *task_pool.PoolConfig) Option
type Repartition ¶
type Repartition struct {
Enable bool
StreamSide Side
KeyEncoder encoding.Builder
ValueEncoder encoding.Builder
Topic RepartitionTopic
}
func (Repartition) Validate ¶
func (r Repartition) Validate(s Side) error
type RepartitionOption ¶
type RepartitionOption func(sink *RepartitionOptions)
func RepartitionLeftStream ¶
func RepartitionLeftStream(keyEncodingBuilder, valueEncodingBuilder encoding.Builder) RepartitionOption
func RepartitionRightStream ¶
func RepartitionRightStream(keyEncodingBuilder, valueEncodingBuilder encoding.Builder) RepartitionOption
type RepartitionOptions ¶
type RepartitionOptions struct {
LeftTopic topic
RightTopic topic
LeftRepartition Repartition
RightRepartition Repartition
}
func (*RepartitionOptions) Apply ¶
func (iOpts *RepartitionOptions) Apply(options ...RepartitionOption)
type RepartitionTopic ¶
type SinkOption ¶
type SinkOption func(sink *KSink)
func WithProducer ¶
func WithProducer(p producer.Builder) SinkOption
type SourceNode ¶
type SourceNode struct {
Id int32
// contains filtered or unexported fields
}
func (*SourceNode) AddChild ¶
func (sn *SourceNode) AddChild(node node.Node)
func (*SourceNode) AddChildBuilder ¶
func (sn *SourceNode) AddChildBuilder(builder node.NodeBuilder)
func (*SourceNode) ChildBuilders ¶
func (sn *SourceNode) ChildBuilders() []node.NodeBuilder
func (*SourceNode) Childs ¶
func (sn *SourceNode) Childs() []node.Node
func (*SourceNode) Close ¶
func (sn *SourceNode) Close()
func (*SourceNode) Name ¶
func (sn *SourceNode) Name() string
func (*SourceNode) Next ¶
func (sn *SourceNode) Next() bool
func (*SourceNode) Run ¶
func (sn *SourceNode) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)
func (*SourceNode) Type ¶
func (sn *SourceNode) Type() node.Type
type Stream ¶
type Stream interface {
Branch(branches []branch.Details, opts ...Option) []Stream
SelectKey(selectKeyFunc processors.SelectKeyFunc) Stream
TransformValue(valueTransformFunc processors.ValueTransformFunc) Stream
Transform(transformer processors.TransFunc) Stream
Filter(filter processors.FilterFunc) Stream
Process(processor processors.ProcessFunc) Stream
JoinGlobalTable(stream Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper, typ join.JoinType) Stream
JoinKTable(stream Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper) Stream
JoinStream(stream Stream, valMapper join.ValueMapper, opts ...RepartitionOption) Stream
//LeftJoin(stream Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper) Stream
Through(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, options ...SinkOption) Stream
To(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, options ...SinkOption)
}
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
func NewStreamBuilder ¶
func NewStreamBuilder(config *StreamBuilderConfig) *StreamBuilder
func (*StreamBuilder) Build ¶
func (b *StreamBuilder) Build(streams ...Stream) error
func (*StreamBuilder) GlobalTable ¶
func (b *StreamBuilder) GlobalTable(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, store string, options ...globalTableOption) GlobalTable
func (*StreamBuilder) StoreRegistry ¶
func (b *StreamBuilder) StoreRegistry() store.Registry
type StreamBuilderConfig ¶
type StreamBuilderConfig struct {
ApplicationId string
AsyncProcessing bool
BootstrapServers []string // kafka Brokers
WorkerPool *task_pool.PoolConfig
Store struct {
BackendBuilder backend.Builder
ChangeLog struct {
MinInSycReplicas int // min number of insync replications in other nodes
ReplicationFactor int
Suffix string
Buffered bool
BufferedSize int
}
Http struct {
Host string
}
}
DLQ struct {
Enabled bool
BootstrapServers []string
TopicFormat string
//Type dlq.DqlType // G, T
Topic string // if global
}
Host string
ChangeLog struct {
Enabled bool
Replicated bool
MinInSycReplicas int // min number of insync replications in other nodes
ReplicationFactor int
Suffix string
Buffer struct {
Enabled bool
Size int
FlushInterval time.Duration
}
}
Consumer *consumer.Config
ConsumerCount int
*sarama.Config
Producer *producer.Config
MetricsReporter metrics.Reporter
Logger log.Logger
DefaultBuilders *DefaultBuilders
}
func NewStreamBuilderConfig ¶
func NewStreamBuilderConfig() *StreamBuilderConfig
func (*StreamBuilderConfig) String ¶
func (c *StreamBuilderConfig) String(b *StreamBuilder) string
type StreamConfigs ¶
type StreamConfigs map[string]interface{}
type StreamInstance ¶
type StreamInstance struct {
// contains filtered or unexported fields
}
func (*StreamInstance) Start ¶
func (s *StreamInstance) Start(wg *sync.WaitGroup) error
starts the high level consumer for all streams
func (*StreamInstance) Stop ¶
func (s *StreamInstance) Stop()
Source Files
¶
Click to show internal directories.
Click to hide internal directories.