package kstream

v1.3.2
Published: Aug 16, 2021 License: MIT Imports: 34 Imported by: 0

Documentation


Constants

const (
	LeftJoin join.Type = iota
	InnerJoin
)
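
These constants select the join behavior of Stream.JoinGlobalTable (defined below). A one-line sketch, where stream, table, keyMapper, and valMapper are hypothetical placeholders:

joined := stream.JoinGlobalTable(table, keyMapper, valMapper, kstream.InnerJoin)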

Variables

This section is empty.

Functions

This section is empty.

Types

type BuilderOption

type BuilderOption func(*DefaultBuilders)

func WithBackendBuilder

func WithBackendBuilder(builder backend.Builder) BuilderOption

func WithChangelogBuilder

func WithChangelogBuilder(builder changelog.Builder) BuilderOption

func WithConsumerBuilder

func WithConsumerBuilder(builder consumer.Builder) BuilderOption

func WithKafkaAdmin

func WithKafkaAdmin(kafkaAdmin admin.KafkaAdmin) BuilderOption

func WithOffsetManager

func WithOffsetManager(offsetManager offsets.Manager) BuilderOption

func WithPartitionConsumerBuilder

func WithPartitionConsumerBuilder(builder consumer.PartitionConsumerBuilder) BuilderOption

func WithProducerBuilder

func WithProducerBuilder(builder producer.Builder) BuilderOption

func WithStateStoreBuilder

func WithStateStoreBuilder(builder store.StateStoreBuilder) BuilderOption

func WithStoreBuilder

func WithStoreBuilder(builder store.Builder) BuilderOption
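
These options are passed to NewStreamBuilder to override individual default builders. A minimal sketch; myBackendBuilder and myProducerBuilder are hypothetical custom implementations:

builder := kstream.NewStreamBuilder(
	config, // a *StreamBuilderConfig from NewStreamBuilderConfig
	kstream.WithBackendBuilder(myBackendBuilder),
	kstream.WithProducerBuilder(myProducerBuilder),
)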

type DefaultBuilders

type DefaultBuilders struct {
	Producer producer.Builder

	Consumer          consumer.Builder
	PartitionConsumer consumer.PartitionConsumerBuilder
	Store             store.Builder
	IndexedStore      store.IndexedStoreBuilder
	Backend           backend.Builder
	StateStore        store.StateStoreBuilder
	OffsetManager     offsets.Manager
	KafkaAdmin        admin.KafkaAdmin
	// contains filtered or unexported fields
}

type GlobalTable

type GlobalTable interface {
	Stream
}

type GlobalTableOffset

type GlobalTableOffset int64

Starting offset for the global table partition.

const GlobalTableOffsetDefault GlobalTableOffset = 0

GlobalTableOffsetDefault defines the default starting offset for the GlobalTable when stream syncing starts.

const GlobalTableOffsetLatest GlobalTableOffset = -1

GlobalTableOffsetLatest starts syncing from the latest offset of the partition, skipping existing records. Suitable for topics with a delete retention policy, since such topics can contain historical data.

type GlobalTableOption

type GlobalTableOption func(options *globalTableOptions)

func GlobalTableWithBackendWriter

func GlobalTableWithBackendWriter(writer StoreWriter) GlobalTableOption

GlobalTableWithBackendWriter overrides the default persisting behavior of the GlobalTable. For example:

func(r *data.Record, store store.Store) error {
	// tombstone handling: a nil value deletes the key; return here so the
	// record is not re-written below with a nil value
	if r.Value == nil {
		return store.Backend().Delete(r.Key)
	}

	return store.Backend().Set(r.Key, r.Value, 0)
}

func GlobalTableWithLogger

func GlobalTableWithLogger(logger log.Logger) GlobalTableOption

GlobalTableWithLogger overrides the default logger for the GlobalTable (default is NoopLogger).

func GlobalTableWithOffset

func GlobalTableWithOffset(offset GlobalTableOffset) GlobalTableOption

GlobalTableWithOffset overrides the default starting offset used when GlobalTable syncing starts.
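
For example, to skip historical records and sync the table only from the newest offset (the encoder builders are hypothetical placeholders):

table := builder.GlobalTable(
	`accounts`,
	keyEnc, // a hypothetical encoding.Builder
	valEnc, // a hypothetical encoding.Builder
	`accounts-store`,
	kstream.GlobalTableWithOffset(kstream.GlobalTableOffsetLatest),
)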

func GlobalTableWithVersionComparator added in v1.3.0

func GlobalTableWithVersionComparator(comparator RecordVersionComparator) GlobalTableOption

GlobalTableWithVersionComparator sets the comparator used to decide whether an incoming record version should replace the currently stored one (default is nil).

func GlobalTableWithVersionExtractor added in v1.3.0

func GlobalTableWithVersionExtractor(extractor RecordVersionExtractor) GlobalTableOption

GlobalTableWithVersionExtractor sets the extractor used to derive a version number from incoming records (default is nil).
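
Used together, the extractor and comparator let a GlobalTable reject out-of-order updates. A sketch, assuming values decode to a hypothetical *Account with an int64 Version field, and assuming a true comparator result keeps the incoming record:

extractor := func(ctx context.Context, key, value interface{}) (int64, error) {
	// derive the version from the decoded value
	return value.(*Account).Version, nil
}

comparator := func(newVersion, currentVersion int64) bool {
	// keep the incoming record only if it is at least as new
	return newVersion >= currentVersion
}

table := builder.GlobalTable(`accounts`, keyEnc, valEnc, `accounts-store`,
	kstream.GlobalTableWithVersionExtractor(extractor),
	kstream.GlobalTableWithVersionComparator(comparator),
)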

type GlobalTableStreamConfig

type GlobalTableStreamConfig struct {
	ConsumerBuilder consumer.PartitionConsumerBuilder
	BackendBuilder  backend.Builder
	OffsetManager   offsets.Manager
	KafkaAdmin      admin.KafkaAdmin
	Metrics         metrics.Reporter
	Logger          log.Logger
}

type Instances

type Instances struct {
	// contains filtered or unexported fields
}

func NewStreams

func NewStreams(builder *StreamBuilder, options ...InstancesOptions) *Instances

func (*Instances) Start

func (ins *Instances) Start() (err error)

func (*Instances) Stop

func (ins *Instances) Stop()

Stop stops all running stream Instances and then the GlobalTables.

type InstancesOptions

type InstancesOptions func(config *instancesOptions)

func NotifyOnStart

func NotifyOnStart(c chan bool) InstancesOptions
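
NotifyOnStart signals on the given channel once the instances are up. A sketch of a typical run sequence:

started := make(chan bool, 1)
streams := kstream.NewStreams(builder, kstream.NotifyOnStart(started))

go func() {
	<-started
	// instances are running; e.g. mark the service as ready
}()

if err := streams.Start(); err != nil {
	// handle startup failure
}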

func WithConsumerOptions added in v1.2.0

func WithConsumerOptions(opt consumer.Option) InstancesOptions

type KSink

type KSink struct {
	Id              int32
	KeyEncoder      encoding.Encoder
	ValEncoder      encoding.Encoder
	Producer        producer.Producer
	ProducerBuilder producer.Builder

	TopicPrefix string

	Repartitioned bool

	KeyEncoderBuilder encoding.Builder
	ValEncoderBuilder encoding.Builder
	// contains filtered or unexported fields
}

func NewKSinkBuilder

func NewKSinkBuilder(name string, id int32, topic topic, keyEncoder encoding.Builder, valEncoder encoding.Builder, options ...SinkOption) *KSink

func (*KSink) AddChild

func (s *KSink) AddChild(node topology.Node)

func (*KSink) AddChildBuilder

func (s *KSink) AddChildBuilder(builder topology.NodeBuilder)

func (*KSink) Build

func (s *KSink) Build() (topology.Node, error)

func (*KSink) ChildBuilders

func (s *KSink) ChildBuilders() []topology.NodeBuilder

func (*KSink) Childs

func (s *KSink) Childs() []topology.Node

func (*KSink) Close

func (s *KSink) Close() error

func (*KSink) ID

func (s *KSink) ID() int32

func (*KSink) Info

func (s *KSink) Info() map[string]string

func (*KSink) Name

func (s *KSink) Name() string

func (*KSink) Next

func (*KSink) Next() bool

func (*KSink) Run

func (s *KSink) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)

func (*KSink) SinkType

func (s *KSink) SinkType() string

func (*KSink) Type

func (*KSink) Type() topology.Type

type Option

type Option func(*kStreamOptions)

func WithConfig

func WithConfig(configs StreamConfigs) Option

func WithLogger

func WithLogger(logger log.Logger) Option

func WithWorkerPoolOptions

func WithWorkerPoolOptions(poolConfig *worker_pool.PoolConfig) Option

type RecordVersionComparator added in v1.3.0

type RecordVersionComparator func(newVersion, currentVersion int64) bool

type RecordVersionExtractor added in v1.3.0

type RecordVersionExtractor func(ctx context.Context, key, value interface{}) (int64, error)

type SinkOption

type SinkOption func(sink *KSink)

func SinkWithProducer added in v1.2.0

func SinkWithProducer(p producer.Builder) SinkOption

func SinkWithRecordHeaderExtractor added in v1.2.0

func SinkWithRecordHeaderExtractor(f func(ctx context.Context, in SinkRecord) (headers data.RecordHeaders, err error)) SinkOption
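
For example, to forward the consumed record's headers onto the produced record (a sketch):

headerOpt := kstream.SinkWithRecordHeaderExtractor(
	func(ctx context.Context, in kstream.SinkRecord) (data.RecordHeaders, error) {
		// propagate inbound headers unchanged
		return in.Headers, nil
	})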

func SinkWithTombstoneFilter added in v1.2.0

func SinkWithTombstoneFilter(f func(ctx context.Context, in SinkRecord) (tombstone bool)) SinkOption
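
For example, to produce records with nil values as tombstones (a sketch):

tombstoneOpt := kstream.SinkWithTombstoneFilter(
	func(ctx context.Context, in kstream.SinkRecord) bool {
		// nil values become null-value (tombstone) records
		return in.Value == nil
	})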

func WithCustomRecord deprecated

func WithCustomRecord(f func(ctx context.Context, in SinkRecord) (out SinkRecord, err error)) SinkOption

Deprecated: Please use SinkWithRecordHeaderExtractor instead

func WithProducer deprecated

func WithProducer(p producer.Builder) SinkOption

Deprecated: Please use SinkWithProducer instead

type SinkRecord

type SinkRecord struct {
	Key, Value interface{}
	Timestamp  time.Time          // inner message timestamp; only set if Kafka is version 0.10+
	Headers    data.RecordHeaders // only set if Kafka is version 0.11+
}

type SourceNode

type SourceNode struct {
	Id int32
	// contains filtered or unexported fields
}

func (*SourceNode) AddChild

func (sn *SourceNode) AddChild(node topology.Node)

func (*SourceNode) AddChildBuilder

func (sn *SourceNode) AddChildBuilder(builder topology.NodeBuilder)

func (*SourceNode) Build

func (sn *SourceNode) Build() (topology.Node, error)

func (*SourceNode) ChildBuilders

func (sn *SourceNode) ChildBuilders() []topology.NodeBuilder

func (*SourceNode) Childs

func (sn *SourceNode) Childs() []topology.Node

func (*SourceNode) Close

func (sn *SourceNode) Close()

func (*SourceNode) Name

func (sn *SourceNode) Name() string

func (*SourceNode) Next

func (sn *SourceNode) Next() bool

func (*SourceNode) Run

func (sn *SourceNode) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)

func (*SourceNode) Type

func (sn *SourceNode) Type() topology.Type

type StoreWriter

type StoreWriter func(r *data.Record, store store.Store) error

type Stream

type Stream interface {
	Branch(branches []branch.Details, opts ...Option) []Stream
	SelectKey(selectKeyFunc processors.SelectKeyFunc) Stream
	TransformValue(valueTransformFunc processors.ValueTransformFunc) Stream
	Transform(transformer processors.TransFunc) Stream
	Filter(filter processors.FilterFunc) Stream
	Process(processor processors.ProcessFunc) Stream
	JoinGlobalTable(table Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper, typ join.Type) Stream
	JoinKTable(stream Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper) Stream
	JoinStream(stream Stream, valMapper join.ValueMapper, opts ...join.RepartitionOption) Stream
	//LeftJoin(stream Stream, keyMapper join.KeyMapper, valMapper join.ValueMapper) Stream
	Through(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, options ...SinkOption) Stream
	Materialize(topic, storeName string, keyEncoder encoding.Builder, valEncoder encoding.Builder, options ...processors.MaterializeOption) Stream
	To(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, options ...SinkOption)
}
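
A sketch of a small topology composed from this interface; the encoder builders and the filter function are hypothetical placeholders:

stream := builder.Stream(`orders`, keyEnc, valEnc)
filtered := stream.Filter(dropCancelled) // dropCancelled: a processors.FilterFunc
filtered.To(`orders-valid`, keyEnc, valEnc)

// register the topology before starting the instances
if err := builder.Build(stream); err != nil {
	// handle topology build failure
}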

type StreamBuilder

type StreamBuilder struct {
	// contains filtered or unexported fields
}

func NewStreamBuilder

func NewStreamBuilder(config *StreamBuilderConfig, options ...BuilderOption) *StreamBuilder

func (*StreamBuilder) Build

func (b *StreamBuilder) Build(streams ...Stream) error

func (*StreamBuilder) GlobalTable

func (b *StreamBuilder) GlobalTable(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, store string, options ...GlobalTableOption) GlobalTable

func (*StreamBuilder) StoreRegistry

func (b *StreamBuilder) StoreRegistry() store.Registry

func (*StreamBuilder) Stream

func (b *StreamBuilder) Stream(topic string, keyEncoder encoding.Builder, valEncoder encoding.Builder, options ...Option) Stream

type StreamBuilderConfig

type StreamBuilderConfig struct {
	ApplicationId    string
	AsyncProcessing  bool
	BootstrapServers []string // Kafka brokers
	WorkerPool       *worker_pool.PoolConfig
	Store            struct {
		BackendBuilder backend.Builder
		ChangeLog      struct {
			MinInSycReplicas  int // minimum number of in-sync replicas on other nodes
			ReplicationFactor int
			Suffix            string
			Buffered          bool
			BufferedSize      int
		}
		Http struct {
			Enabled bool
			Host    string
		}
	}
	DLQ struct {
		Enabled          bool
		BootstrapServers []string
		TopicFormat      string
		//Type             dlq.DqlType // G, T
		Topic string // used when the DLQ is global
	}
	Host      string
	ChangeLog struct {
		Enabled           bool
		Replicated        bool
		MinInSycReplicas  int // minimum number of in-sync replicas on other nodes
		ReplicationFactor int
		Suffix            string
		Buffer            struct {
			Enabled       bool
			Size          int
			FlushInterval time.Duration
		}
	}
	Consumer      *consumer.Config
	ConsumerCount int
	*sarama.Config
	Producer         *producer.Config
	KafkaLogsEnabled bool
	MetricsReporter  metrics.Reporter
	Logger           log.Logger
	DefaultBuilders  *DefaultBuilders
}

func NewStreamBuilderConfig

func NewStreamBuilderConfig() *StreamBuilderConfig
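
A sketch of populating the config before handing it to NewStreamBuilder:

config := kstream.NewStreamBuilderConfig()
config.ApplicationId = `orders-app`
config.BootstrapServers = []string{`broker-1:9092`, `broker-2:9092`}
config.ConsumerCount = 2
config.ChangeLog.Enabled = true
config.ChangeLog.ReplicationFactor = 3
config.ChangeLog.MinInSycReplicas = 2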

func (*StreamBuilderConfig) String

type StreamConfigs

type StreamConfigs map[string]interface{}

type StreamInstance

type StreamInstance struct {
	// contains filtered or unexported fields
}

func (*StreamInstance) Start

func (s *StreamInstance) Start(wg *sync.WaitGroup) error

Start starts the high-level consumer for all streams.

func (*StreamInstance) Stop

func (s *StreamInstance) Stop()
