Published: Mar 13, 2023 License: Apache-2.0



Package changestreams provides functionality for reading Cloud Spanner change streams.


package main

import (
	"context"
	"fmt"
	"log"

	// NOTE(review): import path assumed from the project name; it is not shown
	// in this excerpt — confirm before relying on it.
	"github.com/cloudspannerecosystem/spanner-change-streams-tail/changestreams"
)


func main() {
	ctx := context.Background()
	reader, err := changestreams.NewReader(ctx, "myproject", "myinstance", "mydb", "mystream")
	if err != nil {
		log.Fatalf("failed to create a reader: %v", err)
	defer reader.Close()

	if err := reader.Read(ctx, func(result *changestreams.ReadResult) error {
		for _, cr := range result.ChangeRecords {
			for _, dcr := range cr.DataChangeRecords {
				fmt.Printf("[%s] %s %s\n", dcr.CommitTimestamp, dcr.ModType, dcr.TableName)
		return nil
	}); err != nil {
		log.Fatalf("failed to read: %v", err)



type ChangeRecord

type ChangeRecord struct {
	DataChangeRecords      []*DataChangeRecord      `spanner:"data_change_record" json:"data_change_record"`
	HeartbeatRecords       []*HeartbeatRecord       `spanner:"heartbeat_record" json:"heartbeat_record"`
	ChildPartitionsRecords []*ChildPartitionsRecord `spanner:"child_partitions_record" json:"child_partitions_record"`

ChangeRecord is the single unit of the records from the change stream.

type ChildPartition

// ChildPartition contains the child partition token.
//
// Token is the child's own token; ParentPartitionTokens lists the tokens of
// the partitions it was derived from.
type ChildPartition struct {
	Token                 string   `spanner:"token" json:"token"`
	ParentPartitionTokens []string `spanner:"parent_partition_tokens" json:"parent_partition_tokens"`
}

ChildPartition contains the child partition token.

type ChildPartitionsRecord

type ChildPartitionsRecord struct {
	StartTimestamp  time.Time         `spanner:"start_timestamp" json:"start_timestamp"`
	RecordSequence  string            `spanner:"record_sequence" json:"record_sequence"`
	ChildPartitions []*ChildPartition `spanner:"child_partitions" json:"child_partitions"`

ChildPartitionsRecord contains the child partitions of the stream.

type ColumnType

type ColumnType struct {
	Name            string           `spanner:"name" json:"name"`
	Type            spanner.NullJSON `spanner:"type" json:"type"`
	IsPrimaryKey    bool             `spanner:"is_primary_key" json:"is_primary_key"`
	OrdinalPosition int64            `spanner:"ordinal_position" json:"ordinal_position"`

ColumnType is the metadata of the column.

type Config

type Config struct {
	// If StartTimestamp is a zero value of time.Time, reader reads from the current timestamp.
	StartTimestamp time.Time
	// If EndTimestamp is a zero value of time.Time, reader reads until it is cancelled.
	EndTimestamp         time.Time
	HeartbeatInterval    time.Duration
	SpannerClientConfig  spanner.ClientConfig
	SpannerClientOptions []option.ClientOption

Config is the configuration for the reader.

type DataChangeRecord

type DataChangeRecord struct {
	CommitTimestamp                      time.Time     `spanner:"commit_timestamp" json:"commit_timestamp"`
	RecordSequence                       string        `spanner:"record_sequence" json:"record_sequence"`
	ServerTransactionID                  string        `spanner:"server_transaction_id" json:"server_transaction_id"`
	IsLastRecordInTransactionInPartition bool          `spanner:"is_last_record_in_transaction_in_partition" json:"is_last_record_in_transaction_in_partition"`
	TableName                            string        `spanner:"table_name" json:"table_name"`
	ColumnTypes                          []*ColumnType `spanner:"column_types" json:"column_types"`
	Mods                                 []*Mod        `spanner:"mods" json:"mods"`
	ModType                              string        `spanner:"mod_type" json:"mod_type"`
	ValueCaptureType                     string        `spanner:"value_capture_type" json:"value_capture_type"`
	NumberOfRecordsInTransaction         int64         `spanner:"number_of_records_in_transaction" json:"number_of_records_in_transaction"`
	NumberOfPartitionsInTransaction      int64         `spanner:"number_of_partitions_in_transaction" json:"number_of_partitions_in_transaction"`
	TransactionTag                       string        `spanner:"transaction_tag" json:"transaction_tag"`
	IsSystemTransaction                  bool          `spanner:"is_system_transaction" json:"is_system_transaction"`

DataChangeRecord contains a set of changes to the table.

type HeartbeatRecord

// HeartbeatRecord is the heartbeat record returned from Cloud Spanner.
//
// Its only field is the heartbeat's timestamp.
type HeartbeatRecord struct {
	Timestamp time.Time `spanner:"timestamp" json:"timestamp"`
}

HeartbeatRecord is the heartbeat record returned from Cloud Spanner.

type Mod

type Mod struct {
	Keys      spanner.NullJSON `spanner:"keys" json:"keys"`
	NewValues spanner.NullJSON `spanner:"new_values" json:"new_values"`
	OldValues spanner.NullJSON `spanner:"old_values" json:"old_values"`

Mod describes the changes that were made on the table.

type ReadResult

type ReadResult struct {
	PartitionToken string          `json:"partition_token"`
	ChangeRecords  []*ChangeRecord `spanner:"ChangeRecord" json:"change_record"`

ReadResult holds the change records read from a partition.

type Reader added in v0.2.0

// Reader is the change stream reader.
//
// Its fields are unexported; this listing does not show them.
type Reader struct {
	// contains filtered or unexported fields
}

Reader is the change stream reader.

func NewReader added in v0.2.0

// NewReader creates a new reader.
//
// It takes the project, instance, database and change stream IDs and returns
// the reader or an error.
func NewReader(ctx context.Context, projectID, instanceID, databaseID, streamID string) (*Reader, error)

NewReader creates a new reader.

func NewReaderWithConfig added in v0.2.0

// NewReaderWithConfig creates a new reader with a given configuration.
//
// It takes the same identifiers as NewReader plus a Config value.
func NewReaderWithConfig(ctx context.Context, projectID, instanceID, databaseID, streamID string, config Config) (*Reader, error)

NewReaderWithConfig creates a new reader with a given configuration.

func (*Reader) Close added in v0.2.0

// Close closes the reader.
func (r *Reader) Close()

Close closes the reader.

func (*Reader) Read added in v0.2.0

// Read starts reading the change stream.
//
// If function f returns an error, Read finishes the process and returns the
// error. Once this method is called, the reader must not be reused in any
// other places (i.e. Read is not reentrant).
func (r *Reader) Read(ctx context.Context, f func(result *ReadResult) error) error

Read starts reading the change stream.

If f returns an error, Read stops reading and returns that error. Once Read has been called, the reader must not be used anywhere else (Read is not reentrant).

