README

scylla-cdc-go

Package scyllacdc is a library that helps develop applications that react to changes from Scylla's CDC.

It is recommended to get familiar with the Scylla CDC documentation first in order to understand the concepts used in the documentation of scyllacdc: https://docs.scylladb.com/using-scylla/cdc/

Documentation

For an explanation of how to use the library, please see the godoc documentation.

This repository also includes two example programs.

Documentation

Overview

The library hides the complexity of reading from the CDC log: it takes care of polling for changes and of handling topology changes. It reads changes from the CDC logs of selected tables and passes them to instances of ChangeConsumer, an interface that is meant to be implemented by the user.

Getting started

To start working with the library, you first need to implement your own logic for consuming changes. The simplest way to do it is to define a ChangeConsumerFunc which will be called for each change from the CDC log. For example:

func printerConsumer(ctx context.Context, tableName string, c scyllacdc.Change) error {
	fmt.Printf("[%s] %#v\n", tableName, c)
	return nil
}

For any use case more complicated than above, you will need to define a ChangeConsumer and a ChangeConsumerFactory:

type myConsumer struct {
	id        int
	tableName string
}

func (mc *myConsumer) Consume(ctx context.Context, change scyllacdc.Change) error {
	fmt.Printf("[%d] [%s] %#v\n", mc.id, mc.tableName, change)
	return nil
}

func (mc *myConsumer) End() error {
	return nil
}

type myFactory struct {
	nextID int
}

func (f *myFactory) CreateChangeConsumer(ctx context.Context, input scyllacdc.CreateChangeConsumerInput) (scyllacdc.ChangeConsumer, error) {
	f.nextID++
	return &myConsumer{
		id:        f.nextID-1,
		tableName: input.TableName,
	}, nil
}

Next, you need to create and run a scyllacdc.Reader object:

func main() {
	cluster := gocql.NewCluster("127.0.0.1")
	cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	cfg := &scyllacdc.ReaderConfig{
		Session:               session,
		TableNames:            []string{"my_keyspace.my_table"},
		ChangeConsumerFactory: scyllacdc.MakeChangeConsumerFactoryFromFunc(printerConsumer),
		// The above can be changed to:
		// ChangeConsumerFactory: &myFactory{},
	}

	reader, err := scyllacdc.NewReader(context.Background(), cfg)
	if err != nil {
		log.Fatal(err)
	}

	// React to Ctrl+C signal, and stop gracefully after the first signal.
	// Second signal exits the process.
	// The channel is buffered because signal.Notify does not block on send.
	signalC := make(chan os.Signal, 2)
	go func() {
		<-signalC
		reader.Stop()

		<-signalC
		os.Exit(1)
	}()
	signal.Notify(signalC, os.Interrupt)

	if err := reader.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Saving progress

The library supports saving progress and restoring from the last saved position. To enable it, you need to do two things:

First, you need to modify your consumer to regularly save progress. The consumer receives a *scyllacdc.ProgressReporter object which can be used to save progress at any point in the lifetime of the consumer.

The library itself doesn't regularly save progress - it only does it by itself when switching to the next CDC generation. Therefore, the consumer is responsible for saving the progress regularly.

Example:

type myConsumer struct {
	// PeriodicProgressReporter is a wrapper around ProgressReporter
	// which rate-limits saving the progress
	reporter *scyllacdc.PeriodicProgressReporter
}

func (mc *myConsumer) Consume(ctx context.Context, change scyllacdc.Change) error {
	// ... do work ...

	mc.reporter.Update(change.Time)
	return nil
}

func (mc *myConsumer) End() error {
	_ = mc.reporter.SaveAndStop(context.Background())
	return nil
}

type myFactory struct {
	// Any type with a Printf method satisfies scyllacdc.Logger,
	// for example *log.Logger.
	logger scyllacdc.Logger
}

func (f *myFactory) CreateChangeConsumer(ctx context.Context, input scyllacdc.CreateChangeConsumerInput) (scyllacdc.ChangeConsumer, error) {
	reporter := scyllacdc.NewPeriodicProgressReporter(f.logger, time.Minute, input.ProgressReporter)
	reporter.Start(ctx)
	return &myConsumer{reporter: reporter}, nil
}

Second, you need to specify an appropriate ProgressManager in the configuration. ProgressManager represents a mechanism of saving and restoring progress. You can use the provided implementation (TableBackedProgressManager), or implement it yourself.

In the main function:

cfg.ProgressManager = scyllacdc.NewTableBackedProgressManager("my_keyspace.progress_table", "my_application_name")

Processing changes

Data from the CDC log is supplied to the ChangeConsumer through Change objects, which can contain multiple ChangeRow objects. A single ChangeRow corresponds to a single, full (all columns included) row from the CDC log.

func (mc *myConsumer) Consume(ctx context.Context, change scyllacdc.Change) error {
	for _, changeRow := range change.Delta {
		// You can access CDC columns directly via
		// GetValue, IsDeleted, GetDeletedElements
		rawValue, _ := changeRow.GetValue("col_int")
		// The comma-ok assertion avoids a panic when no value is present
		intValue, _ := rawValue.(*int)
		isDeleted, _ := changeRow.IsDeleted("col_int")
		if isDeleted {
			fmt.Println("Column col_int was set to null")
		} else if intValue != nil {
			fmt.Printf("Column col_int was set to %d\n", *intValue)
		}

		// You can also use convenience functions:
		// GetAtomicChange, GetListChange, GetUDTChange, etc.
		atomicChange := changeRow.GetAtomicChange("col_text")
		// Value is nil if the column was not changed or was deleted
		strValue, _ := atomicChange.Value.(*string)
		if atomicChange.IsDeleted {
			fmt.Println("Column col_text was deleted")
		} else if strValue != nil {
			fmt.Printf("Column col_text was set to %s\n", *strValue)
		}
	}

	return nil
}

Constants

const (
	PreImage                  OperationType = 0
	Update                                  = 1
	Insert                                  = 2
	RowDelete                               = 3
	PartitionDelete                         = 4
	RangeDeleteStartInclusive               = 5
	RangeDeleteStartExclusive               = 6
	RangeDeleteEndInclusive                 = 7
	RangeDeleteEndExclusive                 = 8
	PostImage                               = 9
)

Variables

var (
	ErrNoGenerationsPresent = errors.New("there are no generations present")
)

Functions

This section is empty.

Types

type AdvancedReaderConfig

type AdvancedReaderConfig struct {
	// ConfidenceWindowSize defines a minimal age a change must have in order
	// to be read.
	//
	// Due to the eventually consistent nature of Scylla, newer writes may
	// appear in CDC log earlier than some older writes. This can cause the
	// Reader to skip the older write, therefore the need for this parameter.
	//
	// If the parameter is left as 0, the library will automatically choose
	// a default confidence window size.
	ConfidenceWindowSize time.Duration

	// The library uses select statements to fetch changes from CDC Log tables.
	// Each select fetches changes from a single table and fetches only changes
	// from a limited set of CDC streams. If such select returns one or more
	// changes then next select to this table and set of CDC streams will be
	// issued after a delay. This parameter specifies the length of the delay.
	//
	// If the parameter is left as 0, the library will automatically adjust
	// the length of the delay.
	PostNonEmptyQueryDelay time.Duration

	// The library uses select statements to fetch changes from CDC Log tables.
	// Each select fetches changes from a single table and fetches only changes
	// from a limited set of CDC streams. If such select returns no changes then
	// next select to this table and set of CDC streams will be issued after
	// a delay. This parameter specifies the length of the delay.
	//
	// If the parameter is left as 0, the library will automatically adjust
	// the length of the delay.
	PostEmptyQueryDelay time.Duration

	// If the library tries to read from the CDC log and the read operation
	// fails, it will wait some time before attempting to read again. This
	// parameter specifies the length of the delay.
	//
	// If the parameter is left as 0, the library will automatically adjust
	// the length of the delay.
	PostFailedQueryDelay time.Duration

	// Changes are queried using select statements with restriction on the time
	// those changes appeared. The restriction is bounding the time from both
	// lower and upper bounds. This parameter defines the width of the time
	// window used for the restriction.
	//
	// If the parameter is left as 0, the library will automatically adjust
	// the size of the restriction window.
	QueryTimeWindowSize time.Duration

	// When the library starts for the first time it has to start consuming
	// changes from some point in time. This parameter defines how far in the
	// past it needs to look. If the value of the parameter is set to an hour,
	// then the library will only read historical changes that are no older than
	// an hour.
	//
	// Note of caution: data in CDC Log table is automatically deleted so
	// setting this parameter to something bigger than TTL used on CDC Log won’t
	// cause changes older than this TTL to appear.
	//
	// If the parameter is left as 0, the library will automatically adjust
	// the size of the restriction window.
	ChangeAgeLimit time.Duration
}

    AdvancedReaderConfig contains advanced parameters that control the behavior of the CDC Reader. It is not recommended to change them unless really necessary. They have carefully selected default values that should work for most cases, so any change to these parameters needs to be made carefully.

    type AtomicChange

    type AtomicChange struct {
    	// Value contains the scalar value of the column.
    	// If the column was not changed or was deleted, it will be nil.
    	//
    	// Type: T.
    	Value interface{}
    
    	// IsDeleted tells if this column was set to NULL by this change.
    	IsDeleted bool
    }

      AtomicChange represents a change to a column of an atomic or a frozen type.

      type Change

      type Change struct {
      	// Corresponds to cdc$stream_id.
      	StreamID StreamID
      
      	// Corresponds to cdc$time.
      	Time gocql.UUID
      
      	// PreImage rows of the group.
      	PreImage []*ChangeRow
      
      	// Delta rows of the group.
      	Delta []*ChangeRow
      
      	// PostImage rows of the group.
      	PostImage []*ChangeRow
      }

        Change represents a group of rows from CDC log with the same cdc$stream_id and cdc$time timestamp.

        func (*Change) GetCassandraTimestamp

        func (c *Change) GetCassandraTimestamp() int64

          GetCassandraTimestamp returns a timestamp of the operation suitable to put as a TIMESTAMP parameter to a DML statement (INSERT, UPDATE, DELETE).
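
The cdc$time column is a version 1 (time-based) UUID, and write timestamps in CQL are expressed in microseconds since the Unix epoch. The sketch below shows one way such a conversion can be done with the standard library only. The helper `timeuuidMicros` is hypothetical, and it is an assumption that GetCassandraTimestamp performs an equivalent computation.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// gregorianOffset is the number of 100ns intervals between the start of the
// UUID epoch (1582-10-15) and the Unix epoch (1970-01-01).
const gregorianOffset = 0x01B21DD213814000

// timeuuidMicros is a hypothetical helper, not part of scyllacdc. It extracts
// the 60-bit timestamp from a version 1 UUID and converts it to microseconds
// since the Unix epoch.
func timeuuidMicros(u [16]byte) int64 {
	low := uint64(binary.BigEndian.Uint32(u[0:4]))
	mid := uint64(binary.BigEndian.Uint16(u[4:6]))
	hi := uint64(binary.BigEndian.Uint16(u[6:8]) & 0x0fff) // drop the version nibble
	ticks := low | mid<<32 | hi<<48
	return (int64(ticks) - gregorianOffset) / 10
}

func main() {
	// 13814000-1dd2-11b2-8080-808080808080 encodes the Unix epoch itself.
	epoch := [16]byte{0x13, 0x81, 0x40, 0x00, 0x1d, 0xd2, 0x11, 0xb2,
		0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80}
	fmt.Println(timeuuidMicros(epoch)) // prints 0
}
```

A value obtained this way is what one would pass to `USING TIMESTAMP` when replaying the change against another table.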

          type ChangeConsumer

          type ChangeConsumer interface {
          	// Processes a change from the CDC log associated with the stream of
          	// the ChangeConsumer. This method is called in a sequential manner for each
          	// row that appears in the stream.
          	//
          	// If this method returns an error, the library will stop with an error.
          	Consume(ctx context.Context, change Change) error
          
          	// Called after all rows from the stream were consumed, and the reader
          	// is about to switch to a new generation, or stop execution altogether.
          	//
          	// If this method returns an error, the library will stop with an error.
          	End() error
          }

            ChangeConsumer processes changes from a single stream of the CDC log.

            type ChangeConsumerFactory

            type ChangeConsumerFactory interface {
            	// Creates a change consumer with given parameters.
            	//
            	// If this method returns an error, the library will stop with an error.
            	CreateChangeConsumer(ctx context.Context, input CreateChangeConsumerInput) (ChangeConsumer, error)
            }

              ChangeConsumerFactory is used by the library to instantiate ChangeConsumer objects when the new generation starts.

              func MakeChangeConsumerFactoryFromFunc

              func MakeChangeConsumerFactoryFromFunc(f ChangeConsumerFunc) ChangeConsumerFactory

                MakeChangeConsumerFactoryFromFunc can be used if your processing is very simple, and you don't need to keep any per-stream state or save any progress. The function supplied as an argument will be shared by all consumers created by this factory, and will be called for each change in the CDC log.

                Please note that the consumers created by this factory do not perform any synchronization on their own when calling supplied function, therefore you need to guarantee that calling `f` is thread safe.

                type ChangeConsumerFunc

                type ChangeConsumerFunc func(ctx context.Context, tableName string, change Change) error

                  ChangeConsumerFunc can be used in conjunction with MakeChangeConsumerFactoryFromFunc if your processing is very simple. For more information, see the description of the MakeChangeConsumerFactoryFromFunc function.

                  type ChangeRow

                  type ChangeRow struct {
                  	// contains filtered or unexported fields
                  }

                    ChangeRow corresponds to a single row from the CDC log.

                    The ChangeRow uses a slightly different representation of values than gocql's MapScan in order to faithfully represent nullability of all values:

                    Scalar types such as int, text etc. are represented by a pointer to their counterpart in gocql (in this case, *int and *string). The only exception is the blob, which is encoded as []byte slice - if the column was nil, then it will contain a nil slice, if the column was not nil but just empty, then the resulting slice will be empty, but not nil.

                    Tuple types are always represented as an []interface{} slice of values in this representation (e.g. tuple<int, text> will contain an *int and a *string). If the tuple itself was null, then it will be represented as a nil []interface{} slice.

                    Lists and sets are represented as slices of the corresponding type. Because lists and sets cannot contain nils, if a value was to be represented as a pointer, it will be represented as a value instead. For example, list<int> becomes []int, but list<frozen<tuple<int, text>>> becomes [][]interface{} because the tuple type cannot be flattened.

                    Maps are represented as map[K]V, where K and V are in the "flattened" form as lists and sets.

                    UDTs are represented as map[string]interface{}, with the values of the fields represented as described here. For example, a UDT with fields (a int, b text) will be represented as a map with two values of types (*int) and (*string).

                    For a comprehensive guide on how to interpret data in the CDC log, see Scylla documentation about CDC.
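
The representation described above lends itself to a type switch. The following sketch uses only the standard library and hand-built values; `describe` is a hypothetical helper, and the set of cases is limited to the types named in the text (int, text, blob, tuple, UDT). It shows how a typed nil pointer (a null column) differs from a nil interface (no value at all), and how a nil blob differs from an empty one.

```go
package main

import "fmt"

// describe is a hypothetical helper that renders a value which follows the
// representation described above.
func describe(v interface{}) string {
	switch x := v.(type) {
	case nil:
		return "no value"
	case *int:
		if x == nil {
			return "null int"
		}
		return fmt.Sprintf("int %d", *x)
	case *string:
		if x == nil {
			return "null text"
		}
		return fmt.Sprintf("text %q", *x)
	case []byte:
		if x == nil {
			return "null blob"
		}
		return fmt.Sprintf("blob of %d bytes", len(x))
	case []interface{}:
		if x == nil {
			return "null tuple"
		}
		return fmt.Sprintf("tuple of %d fields", len(x))
	case map[string]interface{}:
		return fmt.Sprintf("udt with %d fields", len(x))
	default:
		return fmt.Sprintf("other %T", v)
	}
}

func main() {
	n, s := 7, "hi"
	fmt.Println(describe(&n))                 // int 7
	fmt.Println(describe((*int)(nil)))        // null int
	fmt.Println(describe(&s))                 // text "hi"
	fmt.Println(describe([]byte{}))           // blob of 0 bytes
	fmt.Println(describe([]byte(nil)))        // null blob
	fmt.Println(describe([]interface{}{&n, &s})) // tuple of 2 fields
}
```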

                    func (*ChangeRow) Columns

                    func (c *ChangeRow) Columns() []gocql.ColumnInfo

                      Columns returns information about data columns in the cdc log table. It contains information about all columns - both with and without cdc$ prefix.

                      func (*ChangeRow) GetAtomicChange

                      func (c *ChangeRow) GetAtomicChange(column string) AtomicChange

                        GetAtomicChange returns an AtomicChange struct for a given column. Results are undefined if the column in the base table was not an atomic type.

                        func (*ChangeRow) GetDeletedElements

                        func (c *ChangeRow) GetDeletedElements(columnName string) (interface{}, bool)

                          GetDeletedElements returns which elements were deleted from the non-atomic column. This function works only for non-atomic columns.

                          func (*ChangeRow) GetListChange

                          func (c *ChangeRow) GetListChange(column string) ListChange

                            GetListChange returns a ListChange struct for a given column. Results are undefined if the column in the base table was not a list.

                            func (*ChangeRow) GetMapChange

                            func (c *ChangeRow) GetMapChange(column string) MapChange

                              GetMapChange returns a MapChange struct for a given column. Results are undefined if the column in the base table was not a map.

                              func (*ChangeRow) GetOperation

                              func (c *ChangeRow) GetOperation() OperationType

                                GetOperation returns the type of operation this change represents.

                                func (*ChangeRow) GetSetChange

                                func (c *ChangeRow) GetSetChange(column string) SetChange

                                  GetSetChange returns a SetChange struct for a given column. Results are undefined if the column in the base table was not a set.

                                  func (*ChangeRow) GetTTL

                                  func (c *ChangeRow) GetTTL() int64

                                    GetTTL returns TTL for the operation, or 0 if no TTL was used.

                                    func (*ChangeRow) GetType

                                    func (c *ChangeRow) GetType(columnName string) (gocql.TypeInfo, bool)

                                      GetType returns gocql's representation of given column type.

                                      func (*ChangeRow) GetUDTChange

                                      func (c *ChangeRow) GetUDTChange(column string) UDTChange

                                        GetUDTChange returns a UDTChange struct for a given column. Results are undefined if the column in the base table was not a UDT.

                                        func (*ChangeRow) GetValue

                                        func (c *ChangeRow) GetValue(columnName string) (interface{}, bool)

                                          GetValue returns value that was assigned to this specific column.

                                          func (*ChangeRow) IsDeleted

                                          func (c *ChangeRow) IsDeleted(columnName string) (bool, bool)

                                            IsDeleted returns a boolean indicating if given column was set to null. This only works for clustering columns.

                                            func (*ChangeRow) String

                                            func (c *ChangeRow) String() string

                                              String is needed to implement the fmt.Stringer interface.

                                              type CreateChangeConsumerInput

                                              type CreateChangeConsumerInput struct {
                                              	// Name of the table from which the new ChangeConsumer will receive changes.
                                              	TableName string
                                              
                                              	// ID of the stream from which the new ChangeConsumer will receive changes.
                                              	StreamID StreamID
                                              
                                              	ProgressReporter *ProgressReporter
                                              }

                                                CreateChangeConsumerInput represents input to the CreateChangeConsumer function.

                                                type ListChange

                                                type ListChange struct {
                                                	// AppendedElements contains values appended to the list in the form
                                                	// of map from cell timestamps to values.
                                                	//
                                                	// For more information about how to interpret it, see "Advanced column
                                                	// types" in the CDC documentation.
                                                	//
                                                	// Type: map[gocql.UUID]T
                                                	AppendedElements interface{}
                                                
                                                	// RemovedElements contains indices of the removed elements.
                                                	//
                                                	// For more information about how to interpret it, see "Advanced column
                                                	// types" in the CDC documentation.
                                                	//
                                                	// Type: []gocql.UUID
                                                	RemovedElements []gocql.UUID
                                                
                                                	// IsReset tells if the list value was overwritten instead of being
                                                	// appended to or removed from. If it's true, then AppendedElements will
                                                	// contain the new state of the list (which can be NULL).
                                                	IsReset bool
                                                }

                                                  ListChange represents a change to a column of a type list<T>.

                                                  type Logger

                                                  type Logger interface {
                                                  	Printf(format string, v ...interface{})
                                                  }

                                                  type MapChange

                                                  type MapChange struct {
                                                  	// AddedElements contains a map of elements which were added to the map
                                                  	// by the operation.
                                                  	//
                                                  	// Type: map[K]V.
                                                  	AddedElements interface{}
                                                  
                                                  	// RemovedElements contains a slice of keys which were removed from the map
                                                  	// by the operation.
                                                  	// Please note that if the operation overwrote the old value of the map
                                                  	// instead of adding/removing elements, this field _will be nil_.
                                                  	// Instead, IsReset field will be set, and AddedElements will contain
                                                  	// the new state of the map.
                                                  	//
                                                  	// Type: []K
                                                  	RemovedElements interface{}
                                                  
                                                  	// IsReset tells if the map value was overwritten instead of being
                                                  	// appended to or removed from. If it's true, then AddedElements will
                                                  	// contain the new state of the map (which can be NULL).
                                                  	IsReset bool
                                                  }

                                                    MapChange represents a change to a column of type map<K, V>.
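
To show how the three fields fit together, here is a minimal sketch that replays a change on top of a locally kept copy of a map<text, int> column. The `mapChange` struct is a local stand-in with concrete types (the real MapChange uses interface{} fields), and `apply` is a hypothetical helper. A null column is modelled as an empty Go map.

```go
package main

import "fmt"

// mapChange is a local stand-in for scyllacdc.MapChange, with concrete types
// for a map<text, int> column.
type mapChange struct {
	AddedElements   map[string]int
	RemovedElements []string
	IsReset         bool
}

// apply is a hypothetical helper that replays one change on top of state
// and returns the resulting state.
func apply(state map[string]int, ch mapChange) map[string]int {
	if ch.IsReset || state == nil {
		// On reset, AddedElements holds the whole new value (possibly nil),
		// so the previous contents are discarded.
		state = make(map[string]int, len(ch.AddedElements))
	}
	for k, v := range ch.AddedElements {
		state[k] = v
	}
	for _, k := range ch.RemovedElements {
		delete(state, k)
	}
	return state
}

func main() {
	state := map[string]int{"a": 1, "b": 2}

	// Incremental change: add "c", remove "a".
	state = apply(state, mapChange{
		AddedElements:   map[string]int{"c": 3},
		RemovedElements: []string{"a"},
	})
	fmt.Println(state) // map[b:2 c:3]

	// Overwrite: the whole map is replaced.
	state = apply(state, mapChange{
		AddedElements: map[string]int{"z": 9},
		IsReset:       true,
	})
	fmt.Println(state) // map[z:9]
}
```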

                                                    type OperationType

                                                    type OperationType int8

                                                      OperationType corresponds to the cdc$operation column in CDC log, and describes the type of the operation given row represents.

                                                      For a comprehensive explanation of what each operation type means, see Scylla documentation about CDC.

                                                      func (OperationType) String

                                                      func (ot OperationType) String() string

                                                        String is needed to implement the fmt.Stringer interface.

                                                        type PeriodicProgressReporter

                                                        type PeriodicProgressReporter struct {
                                                        	// contains filtered or unexported fields
                                                        }

                                                          PeriodicProgressReporter is a wrapper around ProgressReporter which can be used to save progress in regular periods of time.

                                                          func NewPeriodicProgressReporter

                                                          func NewPeriodicProgressReporter(logger Logger, interval time.Duration, reporter *ProgressReporter) *PeriodicProgressReporter

                                                            NewPeriodicProgressReporter creates a new PeriodicProgressReporter with given report interval.

                                                            func (*PeriodicProgressReporter) SaveAndStop

                                                            func (ppr *PeriodicProgressReporter) SaveAndStop(ctx context.Context) error

                                                              SaveAndStop stops inner goroutine, waits until it finishes, and then saves the most recent progress.

                                                              func (*PeriodicProgressReporter) Start

                                                              func (ppr *PeriodicProgressReporter) Start(ctx context.Context)

                                                                Start spawns an internal goroutine and starts the progress reporting loop.

                                                                func (*PeriodicProgressReporter) Stop

                                                                func (ppr *PeriodicProgressReporter) Stop()

                                                                  Stop stops inner goroutine and waits until it finishes.

                                                                  func (*PeriodicProgressReporter) Update

                                                                  func (ppr *PeriodicProgressReporter) Update(newTime gocql.UUID)

                                                                    Update tells the PeriodicProgressReporter that a row has been processed.

                                                                    type Progress

                                                                    type Progress struct {
                                                                    	// LastProcessedRecordTime represents the value of the cdc$time column
                                                                    	// of the last processed record in the stream.
                                                                    	LastProcessedRecordTime gocql.UUID
                                                                    }

                                                                      Progress represents the point up to which the library has processed changes in a given stream.

                                                                      type ProgressManager

                                                                      type ProgressManager interface {
                                                                      	// GetCurrentGeneration returns the time of the generation that was
                                                                      	// last saved by StartGeneration. The library will call this function
                                                                      	// at the beginning in order to determine from which generation it should
                                                                      	// start reading first.
                                                                      	//
                                                                      	// If there is no information available about the time of the generation
                                                                      	// from which reading should start, GetCurrentGeneration can return
                                                                      	// a zero time value. In that case, reading will start from the point
                                                                      	// determined by AdvancedReaderConfig.ChangeAgeLimit.
                                                                      	//
                                                                      	// If this function returns an error, the library will stop with an error.
                                                                      	GetCurrentGeneration(ctx context.Context) (time.Time, error)
                                                                      
                                                                      	// StartGeneration is called after all changes have been read from the
                                                                      	// previous generation and the library is about to start processing
                                                                      	// the next one. The ProgressManager should save this information so that
                                                                      	// GetCurrentGeneration will return it after the library is restarted.
                                                                      	//
                                                                      	// If this function returns an error, the library will stop with an error.
                                                                      	StartGeneration(ctx context.Context, gen time.Time) error
                                                                      
                                                                      	// GetProgress retrieves information about the progress of given stream,
                                                                      	// in a given table. If there was no progress saved for this stream
                                                                      	// during this generation, GetProgress can return a zero time value
                                                                      	// and the library will start processing changes from the stream
                                                                      	// starting from the beginning of the generation.
                                                                      	//
                                                                      	// This method needs to be thread-safe, as the library is allowed to
                                                                      	// call it concurrently for different combinations of `table` and `streamID`.
                                                                      	// The library won't issue concurrent calls to this method with the same
                                                                      	// `table` and `streamID` parameters.
                                                                      	//
                                                                      	// If this function returns an error, the library will stop with an error.
                                                                      	GetProgress(ctx context.Context, gen time.Time, table string, streamID StreamID) (Progress, error)
                                                                      
                                                                      	// SaveProgress stores information about the last cdc log record which was
                                                                      	// processed successfully. If the reader is restarted, it should resume
                                                                      	// work for this stream starting from the row _after_ the last saved
                                                                      	// timestamp.
                                                                      	//
                                                                      	// This method is only called by ChangeConsumers, indirectly through
                                                                      	// the ProgressReporter struct. Within a generation, ChangeConsumers
                                                                      	// are run concurrently, therefore SaveProgress should be safe to call
                                                                      	// concurrently.
                                                                      	//
                                                                      	// Contrary to other methods, an error returned does not immediately
                                                                      	// result in the library stopping with an error. The error is propagated
                                                                      	// to the ChangeConsumer, and it can decide what to do with the error next.
                                                                      	SaveProgress(ctx context.Context, gen time.Time, table string, streamID StreamID, progress Progress) error
                                                                      }

                                                                        ProgressManager allows the library to load and save progress for each stream and table separately.
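As an illustration of the contract described above, here is a minimal in-memory sketch. It is not part of the library. To stay compilable on its own it declares local stand-ins for StreamID and Progress (the LastProcessed field is hypothetical), and it assumes that returning a zero time from GetCurrentGeneration is acceptable when nothing was saved.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// StreamID and Progress are local stand-ins so the sketch compiles on its
// own; the LastProcessed field is hypothetical.
type StreamID []byte

type Progress struct {
	LastProcessed time.Time
}

// progressKey identifies one stream of one table within one generation.
type progressKey struct {
	gen    int64 // generation time as UnixNano, safe to use as a map key
	table  string
	stream string
}

// memoryProgressManager keeps all state in memory behind a single mutex,
// so every method is safe to call concurrently.
type memoryProgressManager struct {
	mu       sync.Mutex
	current  time.Time
	progress map[progressKey]Progress
}

func newMemoryProgressManager() *memoryProgressManager {
	return &memoryProgressManager{progress: make(map[progressKey]Progress)}
}

func key(gen time.Time, table string, id StreamID) progressKey {
	return progressKey{gen: gen.UnixNano(), table: table, stream: string(id)}
}

// GetCurrentGeneration returns the generation last passed to StartGeneration,
// or the zero time if there was none (an assumption of this sketch).
func (m *memoryProgressManager) GetCurrentGeneration(ctx context.Context) (time.Time, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.current, nil
}

func (m *memoryProgressManager) StartGeneration(ctx context.Context, gen time.Time) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.current = gen
	return nil
}

// GetProgress returns the zero Progress when nothing was saved for the stream.
func (m *memoryProgressManager) GetProgress(ctx context.Context, gen time.Time, table string, streamID StreamID) (Progress, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.progress[key(gen, table, streamID)], nil
}

func (m *memoryProgressManager) SaveProgress(ctx context.Context, gen time.Time, table string, streamID StreamID, progress Progress) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.progress[key(gen, table, streamID)] = progress
	return nil
}

// demo saves progress for one stream, reads it back, and also reads a
// stream that has no saved progress.
func demo() (saved, unsaved Progress, err error) {
	ctx := context.Background()
	pm := newMemoryProgressManager()
	gen := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
	if err = pm.StartGeneration(ctx, gen); err != nil {
		return
	}
	mark := Progress{LastProcessed: gen.Add(5 * time.Second)}
	if err = pm.SaveProgress(ctx, gen, "ks.tbl", StreamID{0x01}, mark); err != nil {
		return
	}
	if saved, err = pm.GetProgress(ctx, gen, "ks.tbl", StreamID{0x01}); err != nil {
		return
	}
	unsaved, err = pm.GetProgress(ctx, gen, "ks.tbl", StreamID{0x02})
	return
}

func main() {
	saved, unsaved, err := demo()
	fmt.Println(saved.LastProcessed.IsZero(), unsaved.LastProcessed.IsZero(), err)
}
```

A manager like this loses everything on restart, so it only suits tests or short-lived tools. A single mutex is the simplest way to meet the thread-safety requirements; a sharded map would be a reasonable next step under heavy load.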

                                                                        type ProgressReporter

                                                                        type ProgressReporter struct {
                                                                        	// contains filtered or unexported fields
                                                                        }

                                                                          ProgressReporter is a helper object for the ChangeConsumer. It allows the consumer to save its progress.

                                                                          func (*ProgressReporter) MarkProgress

                                                                          func (pr *ProgressReporter) MarkProgress(ctx context.Context, progress Progress) error

                                                                            MarkProgress saves progress for the consumer associated with the ProgressReporter.

                                                                            The associated ChangeConsumer is allowed to call it anytime between its creation by ChangeConsumerFactory and the moment it is stopped (the call to (ChangeConsumer).End() finishes).

                                                                            type Reader

                                                                            type Reader struct {
                                                                            	// contains filtered or unexported fields
                                                                            }

                                                                              Reader reads changes from CDC logs of the specified tables.

                                                                              func NewReader

                                                                              func NewReader(ctx context.Context, config *ReaderConfig) (*Reader, error)

                                                                                NewReader creates a new CDC reader using the specified configuration.

                                                                                func (*Reader) Run

                                                                                func (r *Reader) Run(ctx context.Context) error

                                                                                  Run runs the CDC reader. This call is blocking and returns after an error occurs, or the reader is stopped gracefully.

                                                                                  func (*Reader) Stop

                                                                                  func (r *Reader) Stop()

                                                                                    Stop tells the reader to stop as soon as possible. There is no guarantee about how much data will have been processed in each stream when the reader stops. To make sure that, for example, all CDC log data with timestamps up to the current moment is processed, use (*Reader).StopAt(time.Now()) instead. This function does not wait until the reader stops.

                                                                                    func (*Reader) StopAt

                                                                                    func (r *Reader) StopAt(at time.Time)

                                                                                      StopAt tells the reader to stop reading changes after reaching the given timestamp. It does not guarantee that the reader won't read any changes after the timestamp, but the reader will stop once all tables and streams have advanced to or past the timestamp. This function does not wait until the reader stops.
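Because neither Stop nor StopAt waits for the reader, a caller that needs a clean shutdown has to wait for Run to return. The sketch below shows one way to arrange that. The cdcReader interface and fakeReader type are hypothetical stand-ins that only mirror the three method signatures documented above; the fake treats StopAt as reaching the timestamp immediately.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// cdcReader mirrors the Run, Stop and StopAt signatures documented above.
// It is a stand-in so the sketch compiles without the library.
type cdcReader interface {
	Run(ctx context.Context) error
	Stop()
	StopAt(at time.Time)
}

// fakeReader blocks in Run until it is told to stop.
type fakeReader struct {
	once sync.Once
	done chan struct{}
}

func newFakeReader() *fakeReader { return &fakeReader{done: make(chan struct{})} }

func (f *fakeReader) Run(ctx context.Context) error {
	select {
	case <-f.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (f *fakeReader) Stop() { f.once.Do(func() { close(f.done) }) }

// StopAt in this fake behaves as if the timestamp were reached at once.
func (f *fakeReader) StopAt(at time.Time) { f.Stop() }

// runAndDrain starts r.Run in a goroutine, waits for the shutdown signal,
// asks the reader to stop at the current time, then waits for Run to return.
func runAndDrain(ctx context.Context, r cdcReader, shutdown <-chan struct{}) error {
	errCh := make(chan error, 1)
	go func() { errCh <- r.Run(ctx) }()

	select {
	case err := <-errCh:
		return err // Run ended on its own, most likely with an error
	case <-shutdown:
	}

	r.StopAt(time.Now())
	return <-errCh // StopAt does not wait, so block here until Run returns
}

// demo requests shutdown right away and reports what Run returned.
func demo() error {
	shutdown := make(chan struct{})
	close(shutdown)
	return runAndDrain(context.Background(), newFakeReader(), shutdown)
}

func main() {
	fmt.Println("run returned:", demo())
}
```

In a real program the shutdown channel would typically be closed from an os/signal handler. Using StopAt(time.Now()) rather than Stop follows the advice in the Stop documentation when changes up to the current moment should still be processed.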

                                                                                      type ReaderConfig

                                                                                      type ReaderConfig struct {
                                                                                      	// An active gocql session to the cluster.
                                                                                      	Session *gocql.Session
                                                                                      
                                                                                      	// Names of the tables for which to read changes. This should be the name
                                                                                      	// of the base table, not the cdc log table.
                                                                                      	// Can be prefixed with keyspace name.
                                                                                      	TableNames []string
                                                                                      
                                                                                      	// Consistency to use when querying CDC log.
                                                                                      	// If not specified, QUORUM consistency will be used.
                                                                                      	Consistency gocql.Consistency
                                                                                      
                                                                                      	// Creates ChangeConsumers, which process information fetched from the CDC log.
                                                                                      	ChangeConsumerFactory ChangeConsumerFactory
                                                                                      
                                                                                      	// An object which allows the reader to read and write information about
                                                                                      	// current progress.
                                                                                      	ProgressManager ProgressManager
                                                                                      
                                                                                      	// A logger. If set, it will receive log messages useful for debugging of the library.
                                                                                      	Logger Logger
                                                                                      
                                                                                      	// Advanced parameters.
                                                                                      	Advanced AdvancedReaderConfig
                                                                                      }

                                                                                        ReaderConfig defines parameters used for creation of the CDC Reader object.

                                                                                        func (*ReaderConfig) Copy

                                                                                        func (rc *ReaderConfig) Copy() *ReaderConfig

                                                                                          Copy makes a shallow copy of the ReaderConfig.

                                                                                          type SetChange

                                                                                          type SetChange struct {
                                                                                          	// AddedElements contains a slice of values which were added to the set
                                                                                          	// by the operation. If there were any values added, it will contain
                                                                                          	// a slice of form []T, where T is gocql's representation of the element
                                                                                          	// type.
                                                                                          	//
                                                                                          	// Type: []T
                                                                                          	AddedElements interface{}
                                                                                          
                                                                                          	// RemovedElements contains a slice of values which were removed from the set
                                                                                          	// by the operation. Like AddedElements, it's either a slice or a nil
                                                                                          	// interface.
                                                                                          	//
                                                                                          	// Please note that if the operation overwrote the old value of the set
                                                                                          	// instead of adding/removing elements, this field _will be nil_.
                                                                                          	// Instead, the IsReset field will be set, and AddedElements will contain
                                                                                          	// the new state of the set.
                                                                                          	//
                                                                                          	// Type: []T
                                                                                          	RemovedElements interface{}
                                                                                          
                                                                                          	// IsReset tells if the set value was overwritten instead of being
                                                                                          	// appended to or removed from. If it's true, then AddedElements will
                                                                                          	// contain the new state of the set (which can be NULL).
                                                                                          	IsReset bool
                                                                                          }

                                                                                            SetChange represents a change to a column of type set<T>.
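The interplay of AddedElements, RemovedElements and IsReset can be sketched as a function that replays a change on a locally held set. The example assumes a column of type set<text>, so that T is string and the element slices are []string. SetChange is redeclared locally as a stand-in with the fields documented above, and applySetChange is a hypothetical helper, not a library function.

```go
package main

import (
	"fmt"
	"sort"
)

// SetChange is a local stand-in mirroring the fields documented above.
type SetChange struct {
	AddedElements   interface{}
	RemovedElements interface{}
	IsReset         bool
}

// applySetChange replays one change on a set<text> value held as a Go map.
// On a reset (or a nil state) it starts from an empty set; otherwise it
// updates the given map in place. Nil element fields are simply skipped.
func applySetChange(state map[string]struct{}, c SetChange) map[string]struct{} {
	if c.IsReset || state == nil {
		state = make(map[string]struct{})
	}
	if added, ok := c.AddedElements.([]string); ok {
		for _, e := range added {
			state[e] = struct{}{}
		}
	}
	if removed, ok := c.RemovedElements.([]string); ok {
		for _, e := range removed {
			delete(state, e)
		}
	}
	return state
}

// sortedKeys returns the set's elements in a stable order for printing.
func sortedKeys(m map[string]struct{}) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// demo applies an add, a mixed add/remove, and finally a reset.
func demo() (afterDelta, afterReset string) {
	state := applySetChange(nil, SetChange{AddedElements: []string{"a", "b"}})
	state = applySetChange(state, SetChange{
		AddedElements:   []string{"c"},
		RemovedElements: []string{"b"},
	})
	afterDelta = fmt.Sprint(sortedKeys(state))

	state = applySetChange(state, SetChange{AddedElements: []string{"x"}, IsReset: true})
	afterReset = fmt.Sprint(sortedKeys(state))
	return
}

func main() {
	afterDelta, afterReset := demo()
	fmt.Println(afterDelta) // [a c]
	fmt.Println(afterReset) // [x]
}
```

For other element types the type assertions would need to match gocql's representation of T, or the helper could use reflection.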

                                                                                            type StreamID

                                                                                            type StreamID []byte

                                                                                              StreamID represents an ID of a stream from a CDC log (cdc$stream_id column).

                                                                                              func (StreamID) String

                                                                                              func (sid StreamID) String() string

                                                                                                String is needed to implement the fmt.Stringer interface.

                                                                                                type TableBackedProgressManager

                                                                                                type TableBackedProgressManager struct {
                                                                                                	// contains filtered or unexported fields
                                                                                                }

                                                                                                  TableBackedProgressManager is a ProgressManager which saves progress in a Scylla table.

                                                                                                  The schema is as follows:

                                                                                                  CREATE TABLE IF NOT EXISTS <table name> (
                                                                                                      generation timestamp,
                                                                                                      application_name text,
                                                                                                      table_name text,
                                                                                                      stream_id blob,
                                                                                                      last_timestamp timeuuid,
                                                                                                      current_generation timestamp,
                                                                                                      PRIMARY KEY ((generation, application_name, table_name, stream_id))
                                                                                                  )
                                                                                                  

                                                                                                  Progress for each stream is stored in a separate row, indexed by generation, application_name, table_name and stream_id.

                                                                                                  To store information about the current generation, special rows with stream_id set to empty bytes are used.

                                                                                                  func NewTableBackedProgressManager

                                                                                                  func NewTableBackedProgressManager(session *gocql.Session, progressTableName string, applicationName string) (*TableBackedProgressManager, error)

                                                                                                    NewTableBackedProgressManager creates a new TableBackedProgressManager.

                                                                                                    func (*TableBackedProgressManager) GetCurrentGeneration

                                                                                                    func (tbpm *TableBackedProgressManager) GetCurrentGeneration(ctx context.Context) (time.Time, error)

                                                                                                      GetCurrentGeneration is needed to implement the ProgressManager interface.

                                                                                                      func (*TableBackedProgressManager) GetProgress

                                                                                                      func (tbpm *TableBackedProgressManager) GetProgress(ctx context.Context, gen time.Time, tableName string, streamID StreamID) (Progress, error)

                                                                                                        GetProgress is needed to implement the ProgressManager interface.

                                                                                                        func (*TableBackedProgressManager) SaveProgress

                                                                                                        func (tbpm *TableBackedProgressManager) SaveProgress(ctx context.Context, gen time.Time, tableName string, streamID StreamID, progress Progress) error

                                                                                                          SaveProgress is needed to implement the ProgressManager interface.

                                                                                                          func (*TableBackedProgressManager) SetMaxConcurrency

                                                                                                          func (tbpm *TableBackedProgressManager) SetMaxConcurrency(maxConcurrentOps int64)

                                                                                                            SetMaxConcurrency sets the maximum allowed concurrency for write operations. By default, it's 100. This function must not be called after the Reader that uses this manager has been started.

                                                                                                            func (*TableBackedProgressManager) SetTTL

                                                                                                            func (tbpm *TableBackedProgressManager) SetTTL(ttl int32)

                                                                                                              SetTTL sets the TTL used to expire progress. By default, it's 7 days.
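SetTTL takes a plain int32, so a duration has to be converted before the call. The sketch below assumes the value is expressed in seconds, as with CQL's USING TTL clause; the documentation above does not state the unit, so check this before relying on it. ttlSeconds and defaultTTL are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// ttlSeconds converts a duration to whole seconds for use as a TTL value.
// Assumption: the TTL is measured in seconds, as in CQL's USING TTL.
func ttlSeconds(d time.Duration) int32 {
	return int32(d / time.Second)
}

// defaultTTL is the documented default of 7 days under that assumption.
func defaultTTL() int32 {
	return ttlSeconds(7 * 24 * time.Hour)
}

func main() {
	fmt.Println(defaultTTL())               // 604800
	fmt.Println(ttlSeconds(36 * time.Hour)) // 129600
}
```

The result of ttlSeconds could then be passed to SetTTL on a TableBackedProgressManager before the Reader is started.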

                                                                                                              func (*TableBackedProgressManager) StartGeneration

                                                                                                              func (tbpm *TableBackedProgressManager) StartGeneration(ctx context.Context, gen time.Time) error

                                                                                                                StartGeneration is needed to implement the ProgressManager interface.

                                                                                                                type UDTChange

                                                                                                                type UDTChange struct {
                                                                                                                	// AddedFields contains a map of fields. A non-null value of a field
                                                                                                                	// indicates that the field was written to; otherwise it was not written.
                                                                                                                	AddedFields map[string]interface{}
                                                                                                                
                                                                                                                	// RemovedFields contains names of fields which were set to null
                                                                                                                	// by this operation.
                                                                                                                	RemovedFields []string
                                                                                                                
                                                                                                                	// RemovedFieldsIndices contains indices of fields which were set to null
                                                                                                                	// by this operation.
                                                                                                                	RemovedFieldsIndices []int16
                                                                                                                
                                                                                                                	// IsReset tells if the UDT was overwritten instead of only some fields
                                                                                                                	// being overwritten. If this flag is true, then nil fields in AddedFields
                                                                                                                	// will mean that those fields should be set to null.
                                                                                                                	IsReset bool
                                                                                                                }

                                                                                                                  UDTChange represents a change to a column of a UDT type.
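The field semantics above can be sketched as a function that replays a UDTChange on a locally held value. UDTChange is redeclared here as a stand-in with the documented fields, and applyUDTChange is a hypothetical helper. The sketch assumes that unwritten fields appear in AddedFields as nil interface values; if they were typed nil pointers, the nil check would have to be adapted.

```go
package main

import "fmt"

// UDTChange is a local stand-in mirroring the fields documented above.
type UDTChange struct {
	AddedFields          map[string]interface{}
	RemovedFields        []string
	RemovedFieldsIndices []int16
	IsReset              bool
}

// applyUDTChange returns the field values of a UDT column after the change.
// A field that is absent from the returned map is null. The input state is
// not modified.
func applyUDTChange(state map[string]interface{}, c UDTChange) map[string]interface{} {
	next := make(map[string]interface{})
	if !c.IsReset {
		// A partial update keeps every field that the change does not touch.
		for name, v := range state {
			next[name] = v
		}
	}
	for name, v := range c.AddedFields {
		if v != nil { // nil means "not written", or null after a reset
			next[name] = v
		}
	}
	for _, name := range c.RemovedFields {
		delete(next, name)
	}
	return next
}

// demo applies a partial write, a field removal, and a full overwrite.
func demo() (written, removed, reset map[string]interface{}) {
	state := map[string]interface{}{"street": "Main", "zip": "00-001"}

	written = applyUDTChange(state, UDTChange{
		AddedFields: map[string]interface{}{"street": "Oak", "zip": nil},
	})
	removed = applyUDTChange(written, UDTChange{RemovedFields: []string{"zip"}})
	reset = applyUDTChange(removed, UDTChange{
		AddedFields: map[string]interface{}{"street": nil, "zip": "11-111"},
		IsReset:     true,
	})
	return
}

func main() {
	written, removed, reset := demo()
	fmt.Println(written)
	fmt.Println(removed)
	fmt.Println(reset)
}
```

RemovedFieldsIndices is ignored here because the names in RemovedFields carry the same information for a map-based representation.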

                                                                                                                  Directories

                                                                                                                  Path Synopsis
                                                                                                                  examples