Documentation
¶
Index ¶
- func NewErrorProcessor(log.CtxLogger) sdk.Processor
- type CloneProcessor
- type ErrorProcessor
- func (p *ErrorProcessor) Configure(ctx context.Context, cfg config.Config) error
- func (*ErrorProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption
- func (p *ErrorProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
- func (p *ErrorProcessor) Specification() (sdk.Specification, error)
- type FilterProcessor
- type SplitProcessor
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type CloneProcessor ¶ added in v0.14.0
type CloneProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example (Simple) ¶
p := NewCloneProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Clone record into multiple records", Description: "This example takes a record and clones it once, producing 2 records, each containing the same data, except for the metadata field `clone.index`.", Config: config.Config{"count": "1"}, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Metadata: map[string]string{"foo": "bar"}, Key: opencdc.StructuredData{"id": 123}, Payload: opencdc.Change{After: opencdc.StructuredData{ "name": "Alice", "age": 30, }}, }, Want: sdk.MultiRecord{ opencdc.Record{ Operation: opencdc.OperationCreate, Metadata: map[string]string{ "foo": "bar", "clone.index": "0", }, Key: opencdc.StructuredData{"id": 123}, Payload: opencdc.Change{After: opencdc.StructuredData{ "name": "Alice", "age": 30, }}, }, opencdc.Record{ Operation: opencdc.OperationCreate, Metadata: map[string]string{ "foo": "bar", "clone.index": "1", }, Key: opencdc.StructuredData{"id": 123}, Payload: opencdc.Change{After: opencdc.StructuredData{ "name": "Alice", "age": 30, }}, }, }, })
Output: processor transformed record: --- before +++ after @@ -1,17 +1,38 @@ -{ +[ + { - "position": null, + "position": null, - "operation": "create", + "operation": "create", - "metadata": { + "metadata": { - "foo": "bar" + "clone.index": "0", + "foo": "bar" + }, + "key": { + "id": 123 + }, + "payload": { + "before": null, + "after": { + "age": 30, + "name": "Alice" + } + } + }, + { + "position": null, + "operation": "create", + "metadata": { + "clone.index": "1", + "foo": "bar" - }, + }, - "key": { + "key": { - "id": 123 + "id": 123 - }, + }, - "payload": { + "payload": { - "before": null, + "before": null, - "after": { + "after": { - "age": 30, + "age": 30, - "name": "Alice" + "name": "Alice" - } + } - } + } -} + } +]
func NewCloneProcessor ¶ added in v0.14.0
func NewCloneProcessor(log.CtxLogger) *CloneProcessor
func (*CloneProcessor) Process ¶ added in v0.14.0
func (p *CloneProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*CloneProcessor) Specification ¶ added in v0.14.0
func (p *CloneProcessor) Specification() (sdk.Specification, error)
type ErrorProcessor ¶ added in v0.13.4
type ErrorProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example ¶
p := NewErrorProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: `Error record with custom error message`, Description: `This example shows how to configure the error processor to return a custom error message for a record using a Go template.`, Config: config.Config{ "message": "custom error message with data from record: {{.Metadata.foo}}", }, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Metadata: map[string]string{"foo": "bar"}, Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}}, }, Want: sdk.ErrorRecord{ Error: cerrors.New("custom error message with data from record: bar"), }, })
Output: processor returned error: custom error message with data from record: bar
func (*ErrorProcessor) MiddlewareOptions ¶ added in v0.13.4
func (*ErrorProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption
func (*ErrorProcessor) Process ¶ added in v0.13.4
func (p *ErrorProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*ErrorProcessor) Specification ¶ added in v0.13.4
func (p *ErrorProcessor) Specification() (sdk.Specification, error)
type FilterProcessor ¶ added in v0.13.4
type FilterProcessor struct {
sdk.UnimplementedProcessor
}
Example ¶
p := NewFilterProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: `Filter out the record`, Config: config.Config{}, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}}, }, Want: sdk.FilterRecord{}, })
Output: processor filtered record out
func NewFilterProcessor ¶
func NewFilterProcessor(log.CtxLogger) *FilterProcessor
func (*FilterProcessor) MiddlewareOptions ¶ added in v0.13.4
func (*FilterProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption
func (*FilterProcessor) Process ¶ added in v0.13.4
func (p *FilterProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*FilterProcessor) Specification ¶ added in v0.13.4
func (p *FilterProcessor) Specification() (sdk.Specification, error)
type SplitProcessor ¶ added in v0.14.0
type SplitProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example (Simple) ¶
p := NewSplitProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Split array into multiple records", Description: "This example takes the array in field `.Payload.After.users` and splits it into separate records, each containing one element.", Config: config.Config{"field": ".Payload.After.users"}, Have: opencdc.Record{ Operation: opencdc.OperationUpdate, Key: opencdc.StructuredData{"id": 123}, Payload: opencdc.Change{After: opencdc.StructuredData{ "users": []map[string]any{ {"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}, {"name": "Charlie", "age": 35}, }, }}, }, Want: sdk.MultiRecord{ opencdc.Record{ Operation: opencdc.OperationUpdate, Metadata: map[string]string{"split.index": "0"}, Key: opencdc.StructuredData{"id": 123}, Payload: opencdc.Change{After: opencdc.StructuredData{ "users": map[string]any{"name": "Alice", "age": 30}, }}, }, opencdc.Record{ Operation: opencdc.OperationUpdate, Metadata: map[string]string{"split.index": "1"}, Key: opencdc.StructuredData{"id": 123}, Payload: opencdc.Change{After: opencdc.StructuredData{ "users": map[string]any{"name": "Bob", "age": 25}, }}, }, opencdc.Record{ Operation: opencdc.OperationUpdate, Metadata: map[string]string{"split.index": "2"}, Key: opencdc.StructuredData{"id": 123}, Payload: opencdc.Change{After: opencdc.StructuredData{ "users": map[string]any{"name": "Charlie", "age": 35}, }}, }, }, })
Output: processor transformed record: --- before +++ after @@ -1,27 +1,59 @@ -{ +[ + { - "position": null, + "position": null, - "operation": "update", + "operation": "update", - "metadata": null, + "metadata": { + "split.index": "0" + }, - "key": { + "key": { - "id": 123 + "id": 123 - }, + }, - "payload": { - "before": null, - "after": { - "users": [ - { - "age": 30, - "name": "Alice" - }, - { - "age": 25, + "payload": { + "before": null, + "after": { + "users": { + "age": 30, + "name": "Alice" + } + } + } + }, + { + "position": null, + "operation": "update", + "metadata": { + "split.index": "1" + }, + "key": { + "id": 123 + }, + "payload": { + "before": null, + "after": { + "users": { + "age": 25, + "name": "Bob" + } + } + } + }, + { + "position": null, + "operation": "update", + "metadata": { + "split.index": "2" + }, + "key": { + "id": 123 + }, + "payload": { + "before": null, - "name": "Bob" + "after": { - }, - { + "users": { "age": 35, "name": "Charlie" } - ] + } } } -} +]
func NewSplitProcessor ¶ added in v0.14.0
func NewSplitProcessor(log.CtxLogger) *SplitProcessor
func (*SplitProcessor) Process ¶ added in v0.14.0
func (p *SplitProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*SplitProcessor) Specification ¶ added in v0.14.0
func (p *SplitProcessor) Specification() (sdk.Specification, error)
Source Files
¶
Click to show internal directories.
Click to hide internal directories.