Documentation ¶
Index ¶
- func NewChangeLogger(producer kafka.Producer, tp kafka.TopicPartition) topology.ChangeLogger
- func NewChangelogBuilder(store stores.StoreBuilder, opts ...ChangelogBuilderOption) topology.ChangelogSyncerBuilder
- func NewStoreBuilder(name string, keyEncoder encoding.Encoder, valEncoder encoding.Encoder, options ...StoreBuilderOption) topology.LoggableStoreBuilder
- type ChangelogBuilderOption
- func ChangelogWithSourceTopic(topic string) ChangelogBuilderOption
- func ChangelogWithTopicConfigs(config map[string]string) ChangelogBuilderOption
- func ChangelogWithTopicReplicaCount(count int16) ChangelogBuilderOption
- func ChangelogWithTopicTopicNameFormatter(fn ChangelogTopicFormatter) ChangelogBuilderOption
- type ChangelogLoggerBuilder
- type ChangelogStatus
- type ChangelogTopicFormatter
- type OffsetStore
- type StateStore
- func (str *StateStore) Delete(ctx context.Context, key interface{}) error
- func (str *StateStore) Flush() error
- func (str *StateStore) Get(ctx context.Context, key interface{}) (interface{}, error)
- func (str *StateStore) Iterator(ctx context.Context) (stores.Iterator, error)
- func (str *StateStore) PrefixedIterator(ctx context.Context, keyPrefix interface{}, prefixEncoder encoding.Encoder) (stores.Iterator, error)
- func (str *StateStore) ResetCache()
- func (str *StateStore) Set(ctx context.Context, key, value interface{}, expiry time.Duration) error
- type StoreBuilderOption
- func ChangelogSyncEnabled() StoreBuilderOption
- func LoggingDisabled() StoreBuilderOption
- func StoreBuilderWithKeyEncoder(encoder encoding.Encoder) StoreBuilderOption
- func StoreBuilderWithStoreOption(options ...stores.Option) StoreBuilderOption
- func StoreBuilderWithValEncoder(encoder encoding.Encoder) StoreBuilderOption
- func UseStoreBuilder(nativeBuilder stores.StoreBuilder) StoreBuilderOption
- func WithChangelogOptions(options ...ChangelogBuilderOption) StoreBuilderOption
- func WithChangelogSyncDisabled() StoreBuilderOption
- func WithNameFunc(fn topology.StateStoreNameFunc) StoreBuilderOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewChangeLogger ¶
func NewChangeLogger(producer kafka.Producer, tp kafka.TopicPartition) topology.ChangeLogger
func NewChangelogBuilder ¶
func NewChangelogBuilder(store stores.StoreBuilder, opts ...ChangelogBuilderOption) topology.ChangelogSyncerBuilder
func NewStoreBuilder ¶
func NewStoreBuilder(name string, keyEncoder encoding.Encoder, valEncoder encoding.Encoder, options ...StoreBuilderOption) topology.LoggableStoreBuilder
Types ¶
type ChangelogBuilderOption ¶
type ChangelogBuilderOption func(store *changelogBuilder)
func ChangelogWithSourceTopic ¶
func ChangelogWithSourceTopic(topic string) ChangelogBuilderOption
func ChangelogWithTopicConfigs ¶
func ChangelogWithTopicConfigs(config map[string]string) ChangelogBuilderOption
func ChangelogWithTopicReplicaCount ¶
func ChangelogWithTopicReplicaCount(count int16) ChangelogBuilderOption
func ChangelogWithTopicTopicNameFormatter ¶
func ChangelogWithTopicTopicNameFormatter(fn ChangelogTopicFormatter) ChangelogBuilderOption
type ChangelogLoggerBuilder ¶
type ChangelogLoggerBuilder interface {
Build(ctx topology.SubTopologyContext, store string) (topology.ChangeLogger, error)
}
type ChangelogStatus ¶
type ChangelogStatus string
type ChangelogTopicFormatter ¶
type ChangelogTopicFormatter func(storeName string) func(ctx topology.BuilderContext) string
type OffsetStore ¶
type StateStore ¶
type StateStore struct {
	stores.Store
	topology.ChangelogSyncer
	// contains filtered or unexported fields
}
func (*StateStore) Delete ¶
func (str *StateStore) Delete(ctx context.Context, key interface{}) error
func (*StateStore) Flush ¶
func (str *StateStore) Flush() error
func (*StateStore) Get ¶
func (str *StateStore) Get(ctx context.Context, key interface{}) (interface{}, error)
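func (*StateStore) Iterator ¶
func (str *StateStore) Iterator(ctx context.Context) (stores.Iterator, error)
func (*StateStore) PrefixedIterator ¶
func (str *StateStore) PrefixedIterator(ctx context.Context, keyPrefix interface{}, prefixEncoder encoding.Encoder) (stores.Iterator, error)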
func (*StateStore) ResetCache ¶
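func (str *StateStore) ResetCache()
func (*StateStore) Set ¶
func (str *StateStore) Set(ctx context.Context, key, value interface{}, expiry time.Duration) error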
type StoreBuilderOption ¶
type StoreBuilderOption func(builder *stateStoreBuilder)
func ChangelogSyncEnabled ¶
func ChangelogSyncEnabled() StoreBuilderOption
func LoggingDisabled ¶
func LoggingDisabled() StoreBuilderOption
func StoreBuilderWithKeyEncoder ¶
func StoreBuilderWithKeyEncoder(encoder encoding.Encoder) StoreBuilderOption
func StoreBuilderWithStoreOption ¶
func StoreBuilderWithStoreOption(options ...stores.Option) StoreBuilderOption
func StoreBuilderWithValEncoder ¶
func StoreBuilderWithValEncoder(encoder encoding.Encoder) StoreBuilderOption
func UseStoreBuilder ¶
func UseStoreBuilder(nativeBuilder stores.StoreBuilder) StoreBuilderOption
func WithChangelogOptions ¶
func WithChangelogOptions(options ...ChangelogBuilderOption) StoreBuilderOption
func WithChangelogSyncDisabled ¶
func WithChangelogSyncDisabled() StoreBuilderOption
func WithNameFunc ¶
func WithNameFunc(fn topology.StateStoreNameFunc) StoreBuilderOption
Click to show internal directories.
Click to hide internal directories.