v1.2.51 Latest

This package is not in the latest version of its module.

Go to latest
Published: Jun 13, 2020 License: MIT Imports: 14 Imported by: 0




This section is empty.


This section is empty.


This section is empty.


type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer is a temporary in-memory holding area for changelog records.

func NewBuffer

func NewBuffer(p producer.Producer, size int, flushInterval time.Duration, logger log.Logger) *Buffer

NewBuffer creates a new Buffer object

func (*Buffer) Clear

func (b *Buffer) Clear()

Clear clears the Buffer

func (*Buffer) Close

func (b *Buffer) Close()

func (*Buffer) Delete

func (b *Buffer) Delete(record *data.Record)

func (*Buffer) Records

func (b *Buffer) Records() []*data.Record

func (*Buffer) Store

func (b *Buffer) Store(record *data.Record)

Store stores the record in the Buffer.

type Builder

type Builder func(id string, topic string, partition int32, opts ...Options) (Changelog, error)

type Changelog

type Changelog interface {
	ReadAll(ctx context.Context) ([]*data.Record, error)
	Put(ctx context.Context, record *data.Record) error
	PutAll(ctx context.Context, record []*data.Record) error
	Delete(ctx context.Context, record *data.Record) error
	DeleteAll(ctx context.Context, record []*data.Record) error
}

func NewMockChangelog

func NewMockChangelog(bufferSize int) Changelog

func NewStateChangelog

func NewStateChangelog(config *StateChangelogConfig, opts ...Options) (Changelog, error)

func NewStoreChangelog

func NewStoreChangelog(applicationId string, topic string, partition int32, opts ...Options) (Changelog, error)

type Options

type Options func(config *options)

func Buffered

func Buffered(size int) Options

func FlushInterval

func FlushInterval(d time.Duration) Options

func Producer

func Producer(p producer.Producer) Options

type ReplicaManager

type ReplicaManager struct {
	// contains filtered or unexported fields
}

func NewReplicaManager

func NewReplicaManager(conf *ReplicaManagerConf) (*ReplicaManager, error)

func (*ReplicaManager) GetCache

func (m *ReplicaManager) GetCache(tp consumer.TopicPartition) (*cache, error)

func (*ReplicaManager) StartReplicas

func (m *ReplicaManager) StartReplicas(tps []consumer.TopicPartition) error

func (*ReplicaManager) StopReplicas

func (m *ReplicaManager) StopReplicas(tps []consumer.TopicPartition) error

type ReplicaManagerConf

type ReplicaManagerConf struct {
	Consumer      consumer.PartitionConsumerBuilder
	Backend       backend.Builder
	Logger        log.Logger
	Tps           []consumer.TopicPartition
	OffsetManager offsets.Manager
}

type StateChangelogConfig

type StateChangelogConfig struct {
	ChangelogId    string
	ApplicationId  string
	Producer       producer.Producer
	Topic          string
	Partition      int32
	Logger         log.Logger
	ReplicaManager *ReplicaManager
	Metrics        metrics.Reporter
	Consumer       consumer.PartitionConsumerBuilder
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL