v0.3.1 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2023 License: Apache-2.0 Imports: 9 Imported by: 0



Package changestreams provides functionality for reading Cloud Spanner change streams.


package main

import (


func main() {
	ctx := context.Background()
	reader, err := changestreams.NewReader(ctx, "myproject", "myinstance", "mydb", "mystream")
	if err != nil {
		log.Fatalf("failed to create a reader: %v", err)
	defer reader.Close()

	if err := reader.Read(ctx, func(result *changestreams.ReadResult) error {
		for _, cr := range result.ChangeRecords {
			for _, dcr := range cr.DataChangeRecords {
				fmt.Printf("[%s] %s %s\n", dcr.CommitTimestamp, dcr.ModType, dcr.TableName)
		return nil
	}); err != nil {
		log.Fatalf("failed to read: %v", err)



This section is empty.


This section is empty.


This section is empty.


type ChangeRecord

type ChangeRecord struct {
	DataChangeRecords      []*DataChangeRecord      `spanner:"data_change_record" json:"data_change_record"`
	HeartbeatRecords       []*HeartbeatRecord       `spanner:"heartbeat_record" json:"heartbeat_record"`
	ChildPartitionsRecords []*ChildPartitionsRecord `spanner:"child_partitions_record" json:"child_partitions_record"`

ChangeRecord is the single unit of the records from the change stream.

type ChildPartition

// ChildPartition contains the child partition token, together with the
// tokens of its parent partitions.
type ChildPartition struct {
	Token                 string   `spanner:"token" json:"token"`
	ParentPartitionTokens []string `spanner:"parent_partition_tokens" json:"parent_partition_tokens"`
}

ChildPartition contains the child partition token.

type ChildPartitionsRecord

type ChildPartitionsRecord struct {
	StartTimestamp  time.Time         `spanner:"start_timestamp" json:"start_timestamp"`
	RecordSequence  string            `spanner:"record_sequence" json:"record_sequence"`
	ChildPartitions []*ChildPartition `spanner:"child_partitions" json:"child_partitions"`

ChildPartitionsRecord contains the child partitions of the stream.

type ColumnType

type ColumnType struct {
	Name            string           `spanner:"name" json:"name"`
	Type            spanner.NullJSON `spanner:"type" json:"type"`
	IsPrimaryKey    bool             `spanner:"is_primary_key" json:"is_primary_key"`
	OrdinalPosition int64            `spanner:"ordinal_position" json:"ordinal_position"`

ColumnType is the metadata of the column.

type Config

type Config struct {
	// If StartTimestamp is a zero value of time.Time, reader reads from the current timestamp.
	StartTimestamp time.Time
	// If EndTimestamp is a zero value of time.Time, reader reads until it is cancelled.
	EndTimestamp         time.Time
	HeartbeatInterval    time.Duration
	SpannerClientConfig  spanner.ClientConfig
	SpannerClientOptions []option.ClientOption

Config is the configuration for the reader.

type DataChangeRecord

type DataChangeRecord struct {
	CommitTimestamp                      time.Time     `spanner:"commit_timestamp" json:"commit_timestamp"`
	RecordSequence                       string        `spanner:"record_sequence" json:"record_sequence"`
	ServerTransactionID                  string        `spanner:"server_transaction_id" json:"server_transaction_id"`
	IsLastRecordInTransactionInPartition bool          `spanner:"is_last_record_in_transaction_in_partition" json:"is_last_record_in_transaction_in_partition"`
	TableName                            string        `spanner:"table_name" json:"table_name"`
	ColumnTypes                          []*ColumnType `spanner:"column_types" json:"column_types"`
	Mods                                 []*Mod        `spanner:"mods" json:"mods"`
	ModType                              string        `spanner:"mod_type" json:"mod_type"`
	ValueCaptureType                     string        `spanner:"value_capture_type" json:"value_capture_type"`
	NumberOfRecordsInTransaction         int64         `spanner:"number_of_records_in_transaction" json:"number_of_records_in_transaction"`
	NumberOfPartitionsInTransaction      int64         `spanner:"number_of_partitions_in_transaction" json:"number_of_partitions_in_transaction"`
	TransactionTag                       string        `spanner:"transaction_tag" json:"transaction_tag"`
	IsSystemTransaction                  bool          `spanner:"is_system_transaction" json:"is_system_transaction"`

DataChangeRecord contains a set of changes to the table.

type HeartbeatRecord

// HeartbeatRecord is the heartbeat record returned from Cloud Spanner.
// It carries a single timestamp.
type HeartbeatRecord struct {
	Timestamp time.Time `spanner:"timestamp" json:"timestamp"`
}

HeartbeatRecord is the heartbeat record returned from Cloud Spanner.

type Mod

type Mod struct {
	Keys      spanner.NullJSON `spanner:"keys" json:"keys"`
	NewValues spanner.NullJSON `spanner:"new_values" json:"new_values"`
	OldValues spanner.NullJSON `spanner:"old_values" json:"old_values"`

Mod describes the changes that were made to the table.

type ReadResult

type ReadResult struct {
	PartitionToken string          `json:"partition_token"`
	ChangeRecords  []*ChangeRecord `spanner:"ChangeRecord" json:"change_record"`

ReadResult holds the change records read from a partition.

type Reader added in v0.2.0

// Reader is the change stream reader.
type Reader struct {
	// contains filtered or unexported fields
}

Reader is the change stream reader.

func NewReader added in v0.2.0

// NewReader creates a new reader for the change stream identified by the
// given project, instance, database and stream IDs.
func NewReader(ctx context.Context, projectID, instanceID, databaseID, streamID string) (*Reader, error)

NewReader creates a new reader.

func NewReaderWithConfig added in v0.2.0

// NewReaderWithConfig creates a new reader with a given configuration.
// It takes the same identifiers as NewReader plus a Config value.
func NewReaderWithConfig(ctx context.Context, projectID, instanceID, databaseID, streamID string, config Config) (*Reader, error)

NewReaderWithConfig creates a new reader with a given configuration.

func (*Reader) Close added in v0.2.0

// Close closes the reader. It returns no value.
func (r *Reader) Close()

Close closes the reader.

func (*Reader) Read added in v0.2.0

// Read starts reading the change stream and passes each ReadResult to f.
// If f returns an error, Read finishes the process and returns the error.
// Once Read has been called, the reader must not be reused anywhere else
// (Read is not reentrant).
func (r *Reader) Read(ctx context.Context, f func(result *ReadResult) error) error

Read starts reading the change stream.

If the function f returns an error, Read stops processing and returns that error. Once this method has been called, the reader must not be reused anywhere else (i.e. Read is not reentrant).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL