Documentation
¶
Overview ¶
Package changestreams provides the functionality for reading the Cloud Spanner change streams.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/cloudspannerecosystem/spanner-change-streams-tail/changestreams"
)
func main() {
ctx := context.Background()
reader, err := changestreams.NewReader(ctx, "myproject", "myinstance", "mydb", "mystream")
if err != nil {
log.Fatalf("failed to create a reader: %v", err)
}
defer reader.Close()
if err := reader.Read(ctx, func(result *changestreams.ReadResult) error {
for _, cr := range result.ChangeRecords {
for _, dcr := range cr.DataChangeRecords {
fmt.Printf("[%s] %s %s\n", dcr.CommitTimestamp, dcr.ModType, dcr.TableName)
}
}
return nil
}); err != nil {
log.Fatalf("failed to read: %v", err)
}
}
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChangeRecord ¶
type ChangeRecord struct {
DataChangeRecords []*DataChangeRecord `spanner:"data_change_record" json:"data_change_record"`
HeartbeatRecords []*HeartbeatRecord `spanner:"heartbeat_record" json:"heartbeat_record"`
ChildPartitionsRecords []*ChildPartitionsRecord `spanner:"child_partitions_record" json:"child_partitions_record"`
}
ChangeRecord is the single unit of the records from the change stream.
type ChildPartition ¶
type ChildPartition struct {
Token string `spanner:"token" json:"token"`
ParentPartitionTokens []string `spanner:"parent_partition_tokens" json:"parent_partition_tokens"`
}
ChildPartition contains the child partition token.
type ChildPartitionsRecord ¶
type ChildPartitionsRecord struct {
StartTimestamp time.Time `spanner:"start_timestamp" json:"start_timestamp"`
RecordSequence string `spanner:"record_sequence" json:"record_sequence"`
ChildPartitions []*ChildPartition `spanner:"child_partitions" json:"child_partitions"`
}
ChildPartitionsRecord contains the child partitions of the stream.
type ColumnType ¶
type ColumnType struct {
Name string `spanner:"name" json:"name"`
Type spanner.NullJSON `spanner:"type" json:"type"`
IsPrimaryKey bool `spanner:"is_primary_key" json:"is_primary_key"`
OrdinalPosition int64 `spanner:"ordinal_position" json:"ordinal_position"`
}
ColumnType is the metadata of the column.
type Config ¶
type Config struct {
// If StartTimestamp is a zero value of time.Time, reader reads from the current timestamp.
StartTimestamp time.Time
// If EndTimestamp is a zero value of time.Time, reader reads until it is cancelled.
EndTimestamp time.Time
HeartbeatInterval time.Duration
SpannerClientConfig spanner.ClientConfig
SpannerClientOptions []option.ClientOption
}
Config is the configuration for the reader.
type DataChangeRecord ¶
type DataChangeRecord struct {
CommitTimestamp time.Time `spanner:"commit_timestamp" json:"commit_timestamp"`
RecordSequence string `spanner:"record_sequence" json:"record_sequence"`
ServerTransactionID string `spanner:"server_transaction_id" json:"server_transaction_id"`
IsLastRecordInTransactionInPartition bool `spanner:"is_last_record_in_transaction_in_partition" json:"is_last_record_in_transaction_in_partition"`
TableName string `spanner:"table_name" json:"table_name"`
ColumnTypes []*ColumnType `spanner:"column_types" json:"column_types"`
Mods []*Mod `spanner:"mods" json:"mods"`
ModType string `spanner:"mod_type" json:"mod_type"`
ValueCaptureType string `spanner:"value_capture_type" json:"value_capture_type"`
NumberOfRecordsInTransaction int64 `spanner:"number_of_records_in_transaction" json:"number_of_records_in_transaction"`
NumberOfPartitionsInTransaction int64 `spanner:"number_of_partitions_in_transaction" json:"number_of_partitions_in_transaction"`
TransactionTag string `spanner:"transaction_tag" json:"transaction_tag"`
IsSystemTransaction bool `spanner:"is_system_transaction" json:"is_system_transaction"`
}
DataChangeRecord contains a set of changes to the table.
type HeartbeatRecord ¶
HeartbeatRecord is the heartbeat record returned from Cloud Spanner.
type Mod ¶
type Mod struct {
Keys spanner.NullJSON `spanner:"keys" json:"keys"`
NewValues spanner.NullJSON `spanner:"new_values" json:"new_values"`
OldValues spanner.NullJSON `spanner:"old_values" json:"old_values"`
}
Mod is the changes that were made on the table.
type ReadResult ¶
type ReadResult struct {
PartitionToken string `json:"partition_token"`
ChangeRecords []*ChangeRecord `spanner:"ChangeRecord" json:"change_record"`
}
ReadResult is the result of the read change records from the partition.
type Reader ¶ added in v0.2.0
type Reader struct {
// contains filtered or unexported fields
}
Reader is the change stream reader.
func NewReader ¶ added in v0.2.0
func NewReader(ctx context.Context, projectID, instanceID, databaseID, streamID string) (*Reader, error)
NewReader creates a new reader.
func NewReaderWithConfig ¶ added in v0.2.0
func NewReaderWithConfig(ctx context.Context, projectID, instanceID, databaseID, streamID string, config Config) (*Reader, error)
NewReaderWithConfig creates a new reader with a given configuration.