Versions in this module

v1

v1.1.0 - Oct 19, 2022

Changes in this version:

+ type ChangeOrEmptyNotificationConsumer interface
    + Empty func(ctx context.Context, ackTime gocql.UUID) error
+ type ProgressManagerWithStartTime interface
    + GetApplicationReadStartTime func(ctx context.Context) (time.Time, error)
    + SaveApplicationReadStartTime func(ctx context.Context, startTime time.Time) error
  type TableBackedProgressManager
    + func (tbpm *TableBackedProgressManager) GetApplicationReadStartTime(ctx context.Context) (time.Time, error)
    + func (tbpm *TableBackedProgressManager) SaveApplicationReadStartTime(ctx context.Context, startTime time.Time) error

v1.0.0 - Mar 23, 2021

Changes in this version:

+ const Insert
+ const PartitionDelete
+ const PostImage
+ const PreImage
+ const RangeDeleteEndExclusive
+ const RangeDeleteEndInclusive
+ const RangeDeleteStartExclusive
+ const RangeDeleteStartInclusive
+ const RowDelete
+ const Update
+ var ErrNoGenerationsPresent = errors.New("there are no generations present")
+ var ErrNoSupportedGenerationTablesPresent = errors.New("no supported generation tables are present")
+ type AdvancedReaderConfig struct
    + ChangeAgeLimit time.Duration
    + ConfidenceWindowSize time.Duration
    + PostEmptyQueryDelay time.Duration
    + PostFailedQueryDelay time.Duration
    + PostNonEmptyQueryDelay time.Duration
    + QueryTimeWindowSize time.Duration
+ type AtomicChange struct
    + IsDeleted bool
    + Value interface{}
+ type Change struct
    + Delta []*ChangeRow
    + PostImage []*ChangeRow
    + PreImage []*ChangeRow
    + StreamID StreamID
    + Time gocql.UUID
    + func (c *Change) GetCassandraTimestamp() int64
+ type ChangeConsumer interface
    + Consume func(ctx context.Context, change Change) error
    + End func() error
+ type ChangeConsumerFactory interface
    + CreateChangeConsumer func(ctx context.Context, input CreateChangeConsumerInput) (ChangeConsumer, error)
    + func MakeChangeConsumerFactoryFromFunc(f ChangeConsumerFunc) ChangeConsumerFactory
+ type ChangeConsumerFunc func(ctx context.Context, tableName string, change Change) error
+ type ChangeRow struct
    + func (c *ChangeRow) Columns() []gocql.ColumnInfo
    + func (c *ChangeRow) GetAtomicChange(column string) AtomicChange
    + func (c *ChangeRow) GetDeletedElements(columnName string) (interface{}, bool)
    + func (c *ChangeRow) GetListChange(column string) ListChange
    + func (c *ChangeRow) GetMapChange(column string) MapChange
    + func (c *ChangeRow) GetOperation() OperationType
    + func (c *ChangeRow) GetSetChange(column string) SetChange
    + func (c *ChangeRow) GetTTL() int64
    + func (c *ChangeRow) GetType(columnName string) (gocql.TypeInfo, bool)
    + func (c *ChangeRow) GetUDTChange(column string) UDTChange
    + func (c *ChangeRow) GetValue(columnName string) (interface{}, bool)
    + func (c *ChangeRow) IsDeleted(columnName string) (bool, bool)
    + func (c *ChangeRow) String() string
+ type CreateChangeConsumerInput struct
    + ProgressReporter *ProgressReporter
    + StreamID StreamID
    + TableName string
+ type ListChange struct
    + AppendedElements interface{}
    + IsReset bool
    + RemovedElements []gocql.UUID
+ type Logger interface
    + Printf func(format string, v ...interface{})
+ type MapChange struct
    + AddedElements interface{}
    + IsReset bool
    + RemovedElements interface{}
+ type OperationType int8
    + func (ot OperationType) String() string
+ type PeriodicProgressReporter struct
    + func NewPeriodicProgressReporter(logger Logger, interval time.Duration, reporter *ProgressReporter) *PeriodicProgressReporter
    + func (ppr *PeriodicProgressReporter) SaveAndStop(ctx context.Context) error
    + func (ppr *PeriodicProgressReporter) Start(ctx context.Context)
    + func (ppr *PeriodicProgressReporter) Stop()
    + func (ppr *PeriodicProgressReporter) Update(newTime gocql.UUID)
+ type Progress struct
    + LastProcessedRecordTime gocql.UUID
+ type ProgressManager interface
    + GetCurrentGeneration func(ctx context.Context) (time.Time, error)
    + GetProgress func(ctx context.Context, gen time.Time, table string, streamID StreamID) (Progress, error)
    + SaveProgress func(ctx context.Context, gen time.Time, table string, streamID StreamID, ...) error
    + StartGeneration func(ctx context.Context, gen time.Time) error
+ type ProgressReporter struct
    + func (pr *ProgressReporter) MarkProgress(ctx context.Context, progress Progress) error
+ type Reader struct
    + func NewReader(ctx context.Context, config *ReaderConfig) (*Reader, error)
    + func (r *Reader) Run(ctx context.Context) error
    + func (r *Reader) Stop()
    + func (r *Reader) StopAt(at time.Time)
+ type ReaderConfig struct
    + Advanced AdvancedReaderConfig
    + ChangeConsumerFactory ChangeConsumerFactory
    + Consistency gocql.Consistency
    + Logger Logger
    + ProgressManager ProgressManager
    + Session *gocql.Session
    + TableNames []string
    + func (rc *ReaderConfig) Copy() *ReaderConfig
+ type SetChange struct
    + AddedElements interface{}
    + IsReset bool
    + RemovedElements interface{}
+ type StreamID []byte
    + func (sid StreamID) String() string
+ type TableBackedProgressManager struct
    + func NewTableBackedProgressManager(session *gocql.Session, progressTableName string, applicationName string) (*TableBackedProgressManager, error)
    + func (tbpm *TableBackedProgressManager) GetCurrentGeneration(ctx context.Context) (time.Time, error)
    + func (tbpm *TableBackedProgressManager) GetProgress(ctx context.Context, gen time.Time, tableName string, streamID StreamID) (Progress, error)
    + func (tbpm *TableBackedProgressManager) SaveProgress(ctx context.Context, gen time.Time, tableName string, streamID StreamID, ...) error
    + func (tbpm *TableBackedProgressManager) SetMaxConcurrency(maxConcurrentOps int64)
    + func (tbpm *TableBackedProgressManager) SetTTL(ttl int32)
    + func (tbpm *TableBackedProgressManager) StartGeneration(ctx context.Context, gen time.Time) error
+ type UDTChange struct
    + AddedFields map[string]interface{}
    + IsReset bool
    + RemovedFields []string
    + RemovedFieldsIndices []int16