opencdc

package
v0.6.0
Warning

This package is not in the latest version of its module.

Published: Jun 9, 2025 License: Apache-2.0 Imports: 11 Imported by: 146

Documentation


Constants

const (
	// OpenCDCVersion is a constant that should be used as the value in the
	// metadata field MetadataOpenCDCVersion. It ensures the OpenCDC format
	// version can be easily identified in case the record gets marshaled into
	// a different untyped format (e.g. JSON).
	OpenCDCVersion = "v1"

	// MetadataOpenCDCVersion is a Record.Metadata key for the version of the
	// OpenCDC format (e.g. "v1"). This field exists to ensure the OpenCDC
	// format version can be easily identified in case the record gets marshaled
	// into a different untyped format (e.g. JSON).
	MetadataOpenCDCVersion = "opencdc.version"
	// MetadataCreatedAt is a Record.Metadata key for the time when the record
	// was created in the 3rd party system. The expected format is a unix
	// timestamp in nanoseconds.
	MetadataCreatedAt = "opencdc.createdAt"
	// MetadataReadAt is a Record.Metadata key for the time when the record was
	// read from the 3rd party system. The expected format is a unix timestamp
	// in nanoseconds.
	MetadataReadAt = "opencdc.readAt"
	// MetadataCollection is a Record.Metadata key for the name of the collection
	// the record originated from and/or should be stored in.
	MetadataCollection = "opencdc.collection"

	// MetadataKeySchemaSubject is a Record.Metadata key for the subject of the schema of
	// the record's .Key field.
	MetadataKeySchemaSubject = "opencdc.key.schema.subject"
	// MetadataKeySchemaVersion is a Record.Metadata key for the version of the schema of
	// the record's .Key field.
	MetadataKeySchemaVersion = "opencdc.key.schema.version"

	// MetadataPayloadSchemaSubject is a Record.Metadata key for the subject of the schema of
	// the record's .Payload field.
	MetadataPayloadSchemaSubject = "opencdc.payload.schema.subject"
	// MetadataPayloadSchemaVersion is a Record.Metadata key for the version of the schema of
	// the record's .Payload field.
	MetadataPayloadSchemaVersion = "opencdc.payload.schema.version"

	// MetadataFileName is a Record.Metadata key for the original file name,
	// applicable when the record originates from a file-based source.
	MetadataFileName = "opencdc.file.name"
	// MetadataFileSize is a Record.Metadata key for the total size (in bytes)
	// of the original file, if the record is derived from a file.
	MetadataFileSize = "opencdc.file.size"
	// MetadataFileHash is a Record.Metadata key for the hash (e.g., SHA-256) of
	// the complete file, used when the record originates from a file.
	MetadataFileHash = "opencdc.file.hash"
	// MetadataFileChunked is a Record.Metadata key that indicates whether
	// the record represents a chunk of a larger file (i.e., part of a chunked transfer).
	MetadataFileChunked = "opencdc.file.chunked"
	// MetadataFileChunkIndex is a Record.Metadata key for the one-based index
	// of the current chunk, if the record is part of a chunked file.
	MetadataFileChunkIndex = "opencdc.file.chunk.index"
	// MetadataFileChunkCount is a Record.Metadata key indicating the total
	// number of chunks the file was split into, when chunking is used.
	MetadataFileChunkCount = "opencdc.file.chunk.count"

	// MetadataConduitSourcePluginName is a Record.Metadata key for the name of
	// the source plugin that created this record.
	MetadataConduitSourcePluginName = "conduit.source.plugin.name"
	// MetadataConduitSourcePluginVersion is a Record.Metadata key for the
	// version of the source plugin that created this record.
	MetadataConduitSourcePluginVersion = "conduit.source.plugin.version"
	// MetadataConduitDestinationPluginName is a Record.Metadata key for the
	// name of the destination plugin that has written this record
	// (only available in records once they are written by a destination).
	MetadataConduitDestinationPluginName = "conduit.destination.plugin.name"
	// MetadataConduitDestinationPluginVersion is a Record.Metadata key for the
	// version of the destination plugin that has written this record
	// (only available in records once they are written by a destination).
	MetadataConduitDestinationPluginVersion = "conduit.destination.plugin.version"

	// MetadataConduitSourceConnectorID is a Record.Metadata key for the ID of
	// the source connector that produced this record.
	MetadataConduitSourceConnectorID = "conduit.source.connector.id"
	// MetadataConduitDLQNackError is a Record.Metadata key for the error that
	// caused a record to be nacked and pushed to the dead-letter queue.
	MetadataConduitDLQNackError = "conduit.dlq.nack.error"
	// MetadataConduitDLQNackNodeID is a Record.Metadata key for the ID of the
	// internal node that nacked the record.
	MetadataConduitDLQNackNodeID = "conduit.dlq.nack.node.id"
)

Variables

var (
	// ErrMetadataFieldNotFound is returned in metadata utility functions when a
	// metadata field is not found.
	ErrMetadataFieldNotFound = errors.New("metadata field not found")
	// ErrUnknownOperation is returned when trying to parse an Operation string
	// and encountering an unknown operation.
	ErrUnknownOperation = errors.New("unknown operation")
	// ErrInvalidProtoDataType is returned when a proto data type cannot be
	// converted to raw or structured data.
	ErrInvalidProtoDataType = errors.New("invalid proto data type")
)

Functions

func WithJSONMarshalOptions

func WithJSONMarshalOptions(ctx context.Context, options *JSONMarshalOptions) context.Context

WithJSONMarshalOptions attaches JSONMarshalOptions to a context.

Types

type Change

type Change struct {
	// Before contains the data before the operation occurred. This field is
	// optional and should only be populated for operations OperationUpdate and
	// OperationDelete (if the system supports fetching the data before the
	// operation).
	Before Data `json:"before"`
	// After contains the data after the operation occurred. This field should
	// be populated for all operations except OperationDelete.
	After Data `json:"after"`
}
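The field comments amount to a small consistency rule between the operation and which side of the change is set. As an illustration, checkChange below is a hypothetical helper (not part of the package) that encodes that rule; it uses local stand-ins for Operation and Change, with a plain map in place of Data.

```go
package main

import "fmt"

// operation is a local stand-in for opencdc.Operation.
type operation int

const (
	opCreate operation = iota + 1
	opUpdate
	opDelete
	opSnapshot
)

// change mirrors opencdc.Change, using a plain map instead of opencdc.Data.
type change struct {
	Before map[string]any
	After  map[string]any
}

// checkChange encodes the field docs: After is expected for every operation
// except delete, and Before should only appear for update or delete.
func checkChange(op operation, c change) error {
	if op != opDelete && c.After == nil {
		return fmt.Errorf("operation %d: After must be populated", op)
	}
	if c.Before != nil && op != opUpdate && op != opDelete {
		return fmt.Errorf("operation %d: Before should only be set for update or delete", op)
	}
	return nil
}

func main() {
	upd := change{Before: map[string]any{"name": "a"}, After: map[string]any{"name": "b"}}
	fmt.Println(checkChange(opUpdate, upd))             // <nil>
	fmt.Println(checkChange(opCreate, change{}) != nil) // true
}
```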

func (*Change) FromProto

func (c *Change) FromProto(proto *opencdcv1.Change) error

FromProto takes data from the supplied proto object and populates the receiver. If the proto object is nil, the receiver is set to its zero value. If the function returns an error, the receiver could be partially populated.

func (Change) ToProto

func (c Change) ToProto(proto *opencdcv1.Change) error

ToProto takes data from the receiver and populates the supplied proto object. If the function returns an error, the proto object could be partially populated.

type Data

type Data interface {
	Bytes() []byte
	Clone() Data
	ToProto(*opencdcv1.Data) error
	// contains filtered or unexported methods
}

Data is an interface for data that can be represented as bytes. The only types implementing Data are RawData and StructuredData.
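The "contains filtered or unexported methods" line indicates that Data has at least one unexported method, which is the usual Go technique for a closed (sealed) interface: code outside the package cannot add implementations, and consumers branch on the concrete type with a type switch. Below is a sketch of that technique; the method name isData, the lowercase types, and the JSON encoding in structuredData.Bytes are all assumptions for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// data imitates the sealed opencdc.Data interface: the unexported isData
// method means only types declared in this package can satisfy it.
type data interface {
	Bytes() []byte
	isData()
}

// rawData and structuredData are local stand-ins for RawData and StructuredData.
type rawData []byte
type structuredData map[string]any

func (d rawData) Bytes() []byte { return []byte(d) }

func (rawData) isData() {}

// Bytes encodes the map as JSON; the encoding is an assumption of this sketch.
func (d structuredData) Bytes() []byte {
	b, err := json.Marshal(d)
	if err != nil {
		return nil
	}
	return b
}

func (structuredData) isData() {}

// describe shows the usual way to branch on the two concrete types.
func describe(d data) string {
	switch v := d.(type) {
	case rawData:
		return fmt.Sprintf("raw (%d bytes)", len(v))
	case structuredData:
		return fmt.Sprintf("structured (%d fields)", len(v))
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(describe(rawData("hello")))                    // raw (5 bytes)
	fmt.Println(describe(structuredData{"id": 1, "ok": true})) // structured (2 fields)
}
```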

type JSONMarshalOptions

type JSONMarshalOptions struct {
	// RawDataAsString is a flag that indicates whether the RawData type should
	// be serialized as a string. If set to false, RawData will be serialized as
	// a base64-encoded string. If set to true, RawData will be serialized as a
	// string without conversion.
	RawDataAsString bool
}

JSONMarshalOptions can customize how a record is serialized to JSON. It can be attached to a context using WithJSONMarshalOptions and supplied to json.MarshalContext (from github.com/goccy/go-json) to customize the serialization behavior.
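The two RawDataAsString modes can be reproduced with encoding/json from the standard library, which encodes a []byte as a base64 string by default. marshalRaw is a hypothetical helper that only approximates RawData's behavior; it is not RawData.MarshalJSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// marshalRaw imitates the two documented RawDataAsString modes.
func marshalRaw(b []byte, rawDataAsString bool) ([]byte, error) {
	if rawDataAsString {
		// Emit the bytes as a JSON string without base64 conversion.
		return json.Marshal(string(b))
	}
	// encoding/json encodes a []byte as a base64 string by default.
	return json.Marshal(b)
}

func main() {
	asBase64, _ := marshalRaw([]byte("hello"), false)
	asString, _ := marshalRaw([]byte("hello"), true)
	fmt.Println(string(asBase64)) // "aGVsbG8="
	fmt.Println(string(asString)) // "hello"
}
```

One caveat of the string mode: encoding/json replaces invalid UTF-8 sequences with U+FFFD when encoding a string, so arbitrary binary data does not survive it intact. That is a plausible reason for base64 being the default, though the page does not say so.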

type JSONSerializer

type JSONSerializer JSONMarshalOptions

JSONSerializer is a RecordSerializer that serializes records to JSON using the configured options.

func (JSONSerializer) Serialize

func (s JSONSerializer) Serialize(r Record) ([]byte, error)

type Metadata

type Metadata map[string]string

func (Metadata) GetCollection added in v0.2.0

func (m Metadata) GetCollection() (string, error)

GetCollection returns the value for key MetadataCollection. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitDLQNackError

func (m Metadata) GetConduitDLQNackError() (string, error)

GetConduitDLQNackError returns the value for key MetadataConduitDLQNackError. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitDLQNackNodeID

func (m Metadata) GetConduitDLQNackNodeID() (string, error)

GetConduitDLQNackNodeID returns the value for key MetadataConduitDLQNackNodeID. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitDestinationPluginName

func (m Metadata) GetConduitDestinationPluginName() (string, error)

GetConduitDestinationPluginName returns the value for key MetadataConduitDestinationPluginName. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitDestinationPluginVersion

func (m Metadata) GetConduitDestinationPluginVersion() (string, error)

GetConduitDestinationPluginVersion returns the value for key MetadataConduitDestinationPluginVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitSourceConnectorID

func (m Metadata) GetConduitSourceConnectorID() (string, error)

GetConduitSourceConnectorID returns the value for key MetadataConduitSourceConnectorID. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitSourcePluginName

func (m Metadata) GetConduitSourcePluginName() (string, error)

GetConduitSourcePluginName returns the value for key MetadataConduitSourcePluginName. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitSourcePluginVersion

func (m Metadata) GetConduitSourcePluginVersion() (string, error)

GetConduitSourcePluginVersion returns the value for key MetadataConduitSourcePluginVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetCreatedAt

func (m Metadata) GetCreatedAt() (time.Time, error)

GetCreatedAt parses the value for key MetadataCreatedAt as a unix timestamp. If the value does not exist or the value is empty the function returns ErrMetadataFieldNotFound. If the value is not a valid unix timestamp in nanoseconds the function returns an error.

func (Metadata) GetFileChunkCount added in v0.5.3

func (m Metadata) GetFileChunkCount() (int, error)

GetFileChunkCount gets the metadata value for key MetadataFileChunkCount. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetFileChunkIndex added in v0.5.3

func (m Metadata) GetFileChunkIndex() (int, error)

GetFileChunkIndex gets the metadata value for key MetadataFileChunkIndex. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetFileChunked added in v0.5.3

func (m Metadata) GetFileChunked() (bool, error)

GetFileChunked gets the metadata value for key MetadataFileChunked.

func (Metadata) GetFileHash added in v0.5.3

func (m Metadata) GetFileHash() (string, error)

GetFileHash gets the metadata value for key MetadataFileHash. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetFileName added in v0.5.3

func (m Metadata) GetFileName() (string, error)

GetFileName gets the metadata value for key MetadataFileName. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetFileSize added in v0.5.3

func (m Metadata) GetFileSize() (int64, error)

GetFileSize gets the metadata value for key MetadataFileSize. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
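The file-related keys are meant to be used together when a file-based source splits a large file across several records. Below is a hypothetical splitFile helper, assuming values are stored as base-10 strings; it numbers chunks from 1 as the MetadataFileChunkIndex comment describes. Marking a single-chunk file as not chunked is a choice made for this sketch, not something the page specifies.

```go
package main

import (
	"fmt"
	"strconv"
)

// Metadata keys copied from the constants documented above.
const (
	metadataFileName       = "opencdc.file.name"
	metadataFileSize       = "opencdc.file.size"
	metadataFileChunked    = "opencdc.file.chunked"
	metadataFileChunkIndex = "opencdc.file.chunk.index"
	metadataFileChunkCount = "opencdc.file.chunk.count"
)

// chunk is a hypothetical pairing of one chunk's bytes with its metadata.
type chunk struct {
	Data     []byte
	Metadata map[string]string
}

// splitFile cuts content into chunks of at most chunkSize bytes.
func splitFile(name string, content []byte, chunkSize int) []chunk {
	if chunkSize <= 0 {
		return nil
	}
	count := (len(content) + chunkSize - 1) / chunkSize // ceiling division
	chunks := make([]chunk, 0, count)
	for i := 0; i < count; i++ {
		start := i * chunkSize
		end := start + chunkSize
		if end > len(content) {
			end = len(content)
		}
		chunks = append(chunks, chunk{
			Data: content[start:end],
			Metadata: map[string]string{
				metadataFileName:       name,
				metadataFileSize:       strconv.Itoa(len(content)),
				metadataFileChunked:    strconv.FormatBool(count > 1),
				metadataFileChunkIndex: strconv.Itoa(i + 1), // one-based
				metadataFileChunkCount: strconv.Itoa(count),
			},
		})
	}
	return chunks
}

func main() {
	for _, c := range splitFile("report.csv", []byte("abcdefghij"), 4) {
		fmt.Printf("chunk %s of %s: %q\n",
			c.Metadata[metadataFileChunkIndex], c.Metadata[metadataFileChunkCount], c.Data)
	}
}
```

Each chunk's Data slices into the original content rather than copying it, which is cheap but means the caller should not mutate content afterwards.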

func (Metadata) GetKeySchemaSubject added in v0.3.0

func (m Metadata) GetKeySchemaSubject() (string, error)

GetKeySchemaSubject returns the value for key MetadataKeySchemaSubject. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetKeySchemaVersion added in v0.3.0

func (m Metadata) GetKeySchemaVersion() (int, error)

GetKeySchemaVersion returns the value for key MetadataKeySchemaVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetOpenCDCVersion

func (m Metadata) GetOpenCDCVersion() (string, error)

GetOpenCDCVersion returns the value for key MetadataOpenCDCVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetPayloadSchemaSubject added in v0.3.0

func (m Metadata) GetPayloadSchemaSubject() (string, error)

GetPayloadSchemaSubject returns the value for key MetadataPayloadSchemaSubject. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetPayloadSchemaVersion added in v0.3.0

func (m Metadata) GetPayloadSchemaVersion() (int, error)

GetPayloadSchemaVersion returns the value for key MetadataPayloadSchemaVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetReadAt

func (m Metadata) GetReadAt() (time.Time, error)

GetReadAt parses the value for key MetadataReadAt as a unix timestamp. If the value does not exist or the value is empty the function returns ErrMetadataFieldNotFound. If the value is not a valid unix timestamp in nanoseconds the function returns an error.

func (Metadata) SetCollection added in v0.2.0

func (m Metadata) SetCollection(collection string)

SetCollection sets the metadata value for key MetadataCollection.

func (Metadata) SetConduitDLQNackError

func (m Metadata) SetConduitDLQNackError(err string)

SetConduitDLQNackError sets the metadata value for key MetadataConduitDLQNackError.

func (Metadata) SetConduitDLQNackNodeID

func (m Metadata) SetConduitDLQNackNodeID(id string)

SetConduitDLQNackNodeID sets the metadata value for key MetadataConduitDLQNackNodeID.

func (Metadata) SetConduitDestinationPluginName

func (m Metadata) SetConduitDestinationPluginName(name string)

SetConduitDestinationPluginName sets the metadata value for key MetadataConduitDestinationPluginName.

func (Metadata) SetConduitDestinationPluginVersion

func (m Metadata) SetConduitDestinationPluginVersion(version string)

SetConduitDestinationPluginVersion sets the metadata value for key MetadataConduitDestinationPluginVersion.

func (Metadata) SetConduitSourceConnectorID

func (m Metadata) SetConduitSourceConnectorID(id string)

SetConduitSourceConnectorID sets the metadata value for key MetadataConduitSourceConnectorID.

func (Metadata) SetConduitSourcePluginName

func (m Metadata) SetConduitSourcePluginName(name string)

SetConduitSourcePluginName sets the metadata value for key MetadataConduitSourcePluginName.

func (Metadata) SetConduitSourcePluginVersion

func (m Metadata) SetConduitSourcePluginVersion(version string)

SetConduitSourcePluginVersion sets the metadata value for key MetadataConduitSourcePluginVersion.

func (Metadata) SetCreatedAt

func (m Metadata) SetCreatedAt(createdAt time.Time)

SetCreatedAt sets the metadata value for key MetadataCreatedAt as a unix timestamp in nanoseconds.

func (Metadata) SetFileChunkCount added in v0.5.3

func (m Metadata) SetFileChunkCount(i int)

SetFileChunkCount sets the metadata value for key MetadataFileChunkCount.

func (Metadata) SetFileChunkIndex added in v0.5.3

func (m Metadata) SetFileChunkIndex(i int)

SetFileChunkIndex sets the metadata value for key MetadataFileChunkIndex.

func (Metadata) SetFileChunked added in v0.5.3

func (m Metadata) SetFileChunked(ok bool)

SetFileChunked sets the metadata value for key MetadataFileChunked.

func (Metadata) SetFileHash added in v0.5.3

func (m Metadata) SetFileHash(filehash string)

SetFileHash sets the metadata value for key MetadataFileHash.

func (Metadata) SetFileName added in v0.5.3

func (m Metadata) SetFileName(filename string)

SetFileName sets the metadata value for key MetadataFileName.

func (Metadata) SetFileSize added in v0.5.3

func (m Metadata) SetFileSize(filesize int64)

SetFileSize sets the metadata value for key MetadataFileSize.

func (Metadata) SetKeySchemaSubject added in v0.3.0

func (m Metadata) SetKeySchemaSubject(subject string)

SetKeySchemaSubject sets the metadata value for key MetadataKeySchemaSubject.

func (Metadata) SetKeySchemaVersion added in v0.3.0

func (m Metadata) SetKeySchemaVersion(version int)

SetKeySchemaVersion sets the metadata value for key MetadataKeySchemaVersion.

func (Metadata) SetOpenCDCVersion

func (m Metadata) SetOpenCDCVersion()

SetOpenCDCVersion sets the metadata value for key MetadataOpenCDCVersion to the current version of OpenCDC used.

func (Metadata) SetPayloadSchemaSubject added in v0.3.0

func (m Metadata) SetPayloadSchemaSubject(subject string)

SetPayloadSchemaSubject sets the metadata value for key MetadataPayloadSchemaSubject.

func (Metadata) SetPayloadSchemaVersion added in v0.3.0

func (m Metadata) SetPayloadSchemaVersion(version int)

SetPayloadSchemaVersion sets the metadata value for key MetadataPayloadSchemaVersion.

func (Metadata) SetReadAt

func (m Metadata) SetReadAt(createdAt time.Time)

SetReadAt sets the metadata value for key MetadataReadAt as a unix timestamp in nanoseconds.

type Operation

type Operation int

Operation defines what triggered the creation of a record.

const (
	OperationCreate   Operation = iota + 1 // create
	OperationUpdate                        // update
	OperationDelete                        // delete
	OperationSnapshot                      // snapshot
)

func (Operation) MarshalText

func (o Operation) MarshalText() ([]byte, error)

func (Operation) String

func (i Operation) String() string

func (*Operation) UnmarshalText

func (o *Operation) UnmarshalText(b []byte) error

type Position

type Position []byte

Position is a unique identifier for a record being processed. It's a Source's responsibility to choose and assign record positions, as they will be used by the Source in subsequent pipeline runs.
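Since the Source picks the encoding, one straightforward option is to marshal a small source-specific struct to JSON. filePosition is a hypothetical position for a source that tails a file; the Source would decode it on restart to know where to resume. Nothing on this page prescribes JSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Position is a local stand-in for opencdc.Position.
type Position []byte

// filePosition is a hypothetical source-specific position.
type filePosition struct {
	File   string `json:"file"`
	Offset int64  `json:"offset"`
}

// toPosition encodes the struct into opaque position bytes.
func (p filePosition) toPosition() (Position, error) {
	b, err := json.Marshal(p)
	return Position(b), err
}

// parseFilePosition decodes bytes previously produced by toPosition.
func parseFilePosition(p Position) (filePosition, error) {
	var fp filePosition
	err := json.Unmarshal(p, &fp)
	return fp, err
}

func main() {
	pos, _ := filePosition{File: "orders.log", Offset: 4096}.toPosition()
	fmt.Println(string(pos)) // {"file":"orders.log","offset":4096}

	// On restart, decode the last acknowledged position and resume from it.
	resume, err := parseFilePosition(pos)
	fmt.Println(resume.File, resume.Offset, err) // orders.log 4096 <nil>
}
```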

func (Position) String

func (p Position) String() string

String is used when displaying the position in logs.

type RawData

type RawData []byte

RawData contains unstructured data in the form of a byte slice.

func (RawData) Bytes

func (d RawData) Bytes() []byte

func (RawData) Clone

func (d RawData) Clone() Data

func (RawData) MarshalJSON

func (d RawData) MarshalJSON(ctx context.Context) ([]byte, error)

func (RawData) ToProto

func (d RawData) ToProto(proto *opencdcv1.Data) error

ToProto takes data from the receiver and populates the supplied proto object.

type Record

type Record struct {
	// Position uniquely represents the record.
	Position Position `json:"position"`
	// Operation defines what triggered the creation of a record. There are four
	// possibilities: create, update, delete or snapshot. The first three
	// operations are encountered during normal CDC operation, while "snapshot"
	// is meant to represent records during an initial load. Depending on the
	// operation, the record will contain either the payload before the change,
	// after the change, both or none (see field Payload).
	Operation Operation `json:"operation"`
	// Metadata contains additional information regarding the record.
	Metadata Metadata `json:"metadata"`

	// Key represents a value that should identify the entity (e.g. database
	// row).
	Key Data `json:"key"`
	// Payload holds the payload change (data before and after the operation
	// occurred).
	Payload Change `json:"payload"`
	// contains filtered or unexported fields
}

Record represents a single data record produced by a source and/or consumed by a destination connector. Record should be used as a value, not a pointer, except when (de)serializing the record. Note that methods related to (de)serializing the record mutate the record and are thus not thread-safe (see SetSerializer, FromProto and UnmarshalJSON).

func (Record) Bytes

func (r Record) Bytes() []byte

Bytes returns the serialized representation of the Record. By default, this function returns a JSON representation. The serialization logic can be changed using SetSerializer.

func (Record) Clone

func (r Record) Clone() Record
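Clone has no doc comment here, but it addresses a subtlety of the value-semantics advice for Record: copying a Record copies the struct, yet the Metadata map (and the backing array of Position) is still shared between the copies. The trimmed-down sketch below shows the problem and a deep copy; record and clone are local stand-ins, and the real method presumably also copies Key and Payload.

```go
package main

import "fmt"

// record is a trimmed-down, hypothetical stand-in for opencdc.Record.
type record struct {
	Position []byte
	Metadata map[string]string
}

// clone returns a copy that shares no memory with r.
func (r record) clone() record {
	var c record
	if r.Position != nil {
		c.Position = append([]byte(nil), r.Position...)
	}
	if r.Metadata != nil {
		c.Metadata = make(map[string]string, len(r.Metadata))
		for k, v := range r.Metadata {
			c.Metadata[k] = v
		}
	}
	return c
}

func main() {
	orig := record{Position: []byte("p1"), Metadata: map[string]string{"opencdc.collection": "users"}}

	shallow := orig // copies the struct, but both values share one map
	shallow.Metadata["opencdc.collection"] = "orders"
	fmt.Println(orig.Metadata["opencdc.collection"]) // orders

	deep := orig.clone()
	deep.Metadata["opencdc.collection"] = "invoices"
	fmt.Println(orig.Metadata["opencdc.collection"]) // orders (unchanged by the deep copy)
}
```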

func (*Record) FromProto

func (r *Record) FromProto(proto *opencdcv1.Record) error

FromProto takes data from the supplied proto object and populates the receiver. If the proto object is nil, the receiver is set to its zero value. If the function returns an error, the receiver could be partially populated.

func (Record) Map

func (r Record) Map() map[string]interface{}

func (*Record) SetSerializer

func (r *Record) SetSerializer(serializer RecordSerializer)

SetSerializer sets the serializer used to encode the record into bytes. If serializer is nil, the serializing behavior is reset to the default (JSON). This method mutates the receiver and is not thread-safe.

func (Record) ToProto

func (r Record) ToProto(proto *opencdcv1.Record) error

ToProto takes data from the receiver and populates the supplied proto object. If the function returns an error, the proto object could be partially populated.

func (*Record) UnmarshalJSON

func (r *Record) UnmarshalJSON(b []byte) error

type RecordSerializer

type RecordSerializer interface {
	Serialize(Record) ([]byte, error)
}

RecordSerializer is a type that can serialize a record to bytes. It's used in destination connectors to change the output structure and format.
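Taken together with Record.Bytes and Record.SetSerializer, this describes a pluggable serializer with a JSON default that SetSerializer(nil) restores. Below is a self-contained sketch of the pattern; record, recordSerializer, defaultJSON and payloadOnly are local stand-ins. How the real Record.Bytes handles a serializer error is not documented here, so returning nil is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// record is a local stand-in for opencdc.Record.
type record struct {
	Operation  string           `json:"operation"`
	Payload    map[string]any   `json:"payload"`
	serializer recordSerializer // unexported, so json.Marshal ignores it
}

// recordSerializer mirrors opencdc.RecordSerializer.
type recordSerializer interface {
	Serialize(record) ([]byte, error)
}

// defaultJSON serializes the whole record.
type defaultJSON struct{}

func (defaultJSON) Serialize(r record) ([]byte, error) { return json.Marshal(r) }

// payloadOnly is a hypothetical custom serializer that changes the output structure.
type payloadOnly struct{}

func (payloadOnly) Serialize(r record) ([]byte, error) { return json.Marshal(r.Payload) }

// SetSerializer mutates the receiver; passing nil restores the default.
func (r *record) SetSerializer(s recordSerializer) { r.serializer = s }

// Bytes uses the configured serializer, falling back to JSON.
func (r record) Bytes() []byte {
	s := r.serializer
	if s == nil {
		s = defaultJSON{}
	}
	b, err := s.Serialize(r)
	if err != nil {
		return nil // assumption: the real error handling is not documented here
	}
	return b
}

func main() {
	r := record{Operation: "create", Payload: map[string]any{"id": 1}}
	fmt.Println(string(r.Bytes())) // {"operation":"create","payload":{"id":1}}

	r.SetSerializer(payloadOnly{})
	fmt.Println(string(r.Bytes())) // {"id":1}

	r.SetSerializer(nil)
	fmt.Println(string(r.Bytes())) // {"operation":"create","payload":{"id":1}}
}
```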

type StructuredData

type StructuredData map[string]interface{}

StructuredData contains data in the form of a map with string keys and arbitrary values.

func (StructuredData) Bytes

func (d StructuredData) Bytes() []byte

func (StructuredData) Clone

func (d StructuredData) Clone() Data

func (StructuredData) ToProto

func (d StructuredData) ToProto(proto *opencdcv1.Data) error

ToProto takes data from the receiver and populates the supplied proto object.
