Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DecodeProcessor ¶ added in v0.13.4
type DecodeProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example ¶
p := NewDecodeProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Decode a base64 encoded string", Description: `This example decodes the base64 encoded string stored in ` + "`.Payload.After`" + `. Note that the result is a string, so if you want to further process the result (e.g. parse the string as JSON), you need to chain other processors (e.g. [` + "`json.decode`" + `](/docs/using/processors/builtin/json.decode)).`, Config: config.Config{ "field": ".Payload.After.foo", }, Have: opencdc.Record{ Position: opencdc.Position("test-position"), Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Key: opencdc.RawData("test-key"), Payload: opencdc.Change{ After: opencdc.StructuredData{ "foo": "YmFy", }, }, }, Want: sdk.SingleRecord{ Position: opencdc.Position("test-position"), Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Key: opencdc.RawData("test-key"), Payload: opencdc.Change{ After: opencdc.StructuredData{ "foo": "bar", }, }, }, })
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": "dGVzdC1wb3NpdGlvbg==", "operation": "create", "metadata": { "key1": "val1" }, "key": "test-key", "payload": { "before": null, "after": { - "foo": "YmFy" + "foo": "bar" } } }
func NewDecodeProcessor ¶
func NewDecodeProcessor(log.CtxLogger) *DecodeProcessor
func (*DecodeProcessor) Process ¶ added in v0.13.4
func (p *DecodeProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*DecodeProcessor) Specification ¶ added in v0.13.4
func (p *DecodeProcessor) Specification() (sdk.Specification, error)
type EncodeProcessor ¶ added in v0.13.4
type EncodeProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example (RawData) ¶
p := NewEncodeProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Encode record key to base64", Description: `TThis example takes a record containing raw data in ` + "`.Key`" + ` and converts it into a base64 encoded string.`, Config: config.Config{ "field": ".Key", }, Have: opencdc.Record{ Position: opencdc.Position("test-position"), Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Key: opencdc.RawData("test-key"), Payload: opencdc.Change{ After: opencdc.StructuredData{ "foo": "bar", }, }, }, Want: sdk.SingleRecord{ Position: opencdc.Position("test-position"), Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Key: opencdc.RawData("dGVzdC1rZXk="), Payload: opencdc.Change{ After: opencdc.StructuredData{ "foo": "bar", }, }, }, })
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": "dGVzdC1wb3NpdGlvbg==", "operation": "create", "metadata": { "key1": "val1" }, - "key": "test-key", + "key": "dGVzdC1rZXk=", "payload": { "before": null, "after": { "foo": "bar" } } }
Example (StringField) ¶
p := NewEncodeProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Encode nested value to base64", Description: `This example takes a record containing a string in ` + "`.Payload.Before.foo`" + ` and converts it into a base64 encoded string.`, Config: config.Config{ "field": ".Payload.After.foo", }, Have: opencdc.Record{ Position: opencdc.Position("test-position"), Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Key: opencdc.RawData("test-key"), Payload: opencdc.Change{ After: opencdc.StructuredData{ "foo": "bar", }, }, }, Want: sdk.SingleRecord{ Position: opencdc.Position("test-position"), Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Key: opencdc.RawData("test-key"), Payload: opencdc.Change{ After: opencdc.StructuredData{ "foo": "YmFy", }, }, }, })
Output: processor transformed record: --- before +++ after @@ -1,14 +1,14 @@ { "position": "dGVzdC1wb3NpdGlvbg==", "operation": "create", "metadata": { "key1": "val1" }, "key": "test-key", "payload": { "before": null, "after": { - "foo": "bar" + "foo": "YmFy" } } }
func NewEncodeProcessor ¶
func NewEncodeProcessor(log.CtxLogger) *EncodeProcessor
func (*EncodeProcessor) Process ¶ added in v0.13.4
func (p *EncodeProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*EncodeProcessor) Specification ¶ added in v0.13.4
func (p *EncodeProcessor) Specification() (sdk.Specification, error)
Click to show internal directories.
Click to hide internal directories.