Documentation
¶
Overview ¶
Package avro is a generated GoMock package.
Package avro is a generated GoMock package.
Index ¶
- type DecodeProcessor
- func (p *DecodeProcessor) Configure(ctx context.Context, c config.Config) error
- func (p *DecodeProcessor) Open(context.Context) error
- func (p *DecodeProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord
- func (p *DecodeProcessor) SetSchemaRegistry(registry schemaregistry.Registry)
- func (p *DecodeProcessor) Specification() (sdk.Specification, error)
- func (p *DecodeProcessor) Teardown(ctx context.Context) error
- type EncodeProcessor
- func (p *EncodeProcessor) Configure(ctx context.Context, c config.Config) error
- func (p *EncodeProcessor) Open(context.Context) error
- func (p *EncodeProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord
- func (p *EncodeProcessor) SetSchemaRegistry(registry schemaregistry.Registry)
- func (p *EncodeProcessor) Specification() (sdk.Specification, error)
- func (p *EncodeProcessor) Teardown(context.Context) error
- type MockDecoder
- type MockDecoderDecodeCall
- func (c *MockDecoderDecodeCall) Do(f func(context.Context, opencdc.RawData) (opencdc.StructuredData, error)) *MockDecoderDecodeCall
- func (c *MockDecoderDecodeCall) DoAndReturn(f func(context.Context, opencdc.RawData) (opencdc.StructuredData, error)) *MockDecoderDecodeCall
- func (c *MockDecoderDecodeCall) Return(arg0 opencdc.StructuredData, arg1 error) *MockDecoderDecodeCall
- type MockDecoderMockRecorder
- type MockEncoder
- type MockEncoderEncodeCall
- func (c *MockEncoderEncodeCall) Do(f func(context.Context, opencdc.StructuredData) (opencdc.RawData, error)) *MockEncoderEncodeCall
- func (c *MockEncoderEncodeCall) DoAndReturn(f func(context.Context, opencdc.StructuredData) (opencdc.RawData, error)) *MockEncoderEncodeCall
- func (c *MockEncoderEncodeCall) Return(arg0 opencdc.RawData, arg1 error) *MockEncoderEncodeCall
- type MockEncoderMockRecorder
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DecodeProcessor ¶ added in v0.13.4
type DecodeProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example ¶
url, cleanup := schemaregistrytest.ExampleSchemaRegistryURL("ExampleDecodeProcessor", 54322) defer cleanup() client, err := schemaregistry.NewClient(log.Nop(), sr.URLs(url)) if err != nil { panic(fmt.Sprintf("failed to create schema registry client: %v", err)) } _, err = client.CreateSchema(context.Background(), "example-decode", sr.Schema{ Type: sr.TypeAvro, Schema: ` { "type":"record", "name":"record", "fields":[ {"name":"myString","type":"string"}, {"name":"myInt","type":"int"} ] }`, }) if err != nil { panic(fmt.Sprintf("failed to create schema: %v", err)) } p := NewDecodeProcessor(log.Nop()) p.SetSchemaRegistry(client) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Decode a record field in Avro format", Description: `This example shows the usage of the ` + "`avro.decode`" + ` processor. The processor decodes the record's` + "`.Key`" + ` field using the schema that is downloaded from the schema registry and needs to exist under the subject` + "`example-decode`" + `. In this example we use the following schema: ` + "```json" + ` { "type":"record", "name":"record", "fields":[ {"name":"myString","type":"string"}, {"name":"myInt","type":"int"} ] } ` + "```", Config: config.Config{ "field": ".Key", }, Have: opencdc.Record{ Position: opencdc.Position("test-position"), Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Key: opencdc.RawData([]byte{0, 0, 0, 0, 1, 6, 98, 97, 114, 2}), }, Want: sdk.SingleRecord{ Position: opencdc.Position("test-position"), Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Key: opencdc.StructuredData{ "myString": "bar", "myInt": 1, }, }, })
Output: processor transformed record: --- before +++ after @@ -1,12 +1,15 @@ { "position": "dGVzdC1wb3NpdGlvbg==", "operation": "create", "metadata": { "key1": "val1" }, - "key": "\u0000\u0000\u0000\u0000\u0001\u0006bar\u0002", + "key": { + "myInt": 1, + "myString": "bar" + }, "payload": { "before": null, "after": null } }
func NewDecodeProcessor ¶
func NewDecodeProcessor(logger log.CtxLogger) *DecodeProcessor
func (*DecodeProcessor) Open ¶ added in v0.13.4
func (p *DecodeProcessor) Open(context.Context) error
func (*DecodeProcessor) Process ¶ added in v0.13.4
func (p *DecodeProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*DecodeProcessor) SetSchemaRegistry ¶ added in v0.13.4
func (p *DecodeProcessor) SetSchemaRegistry(registry schemaregistry.Registry)
func (*DecodeProcessor) Specification ¶ added in v0.13.4
func (p *DecodeProcessor) Specification() (sdk.Specification, error)
type EncodeProcessor ¶ added in v0.13.4
type EncodeProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example (AutoRegister) ¶
url, cleanup := schemaregistrytest.ExampleSchemaRegistryURL("ExampleEncodeProcessor_autoRegister", 54322) defer cleanup() client, err := schemaregistry.NewClient(log.Nop(), sr.URLs(url)) if err != nil { panic(fmt.Sprintf("failed to create schema registry client: %v", err)) } p := NewEncodeProcessor(log.Nop()) p.SetSchemaRegistry(client) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Auto-register schema", Description: `This example shows the usage of the ` + "`avro.encode`" + ` processor with the ` + "`autoRegister`" + ` schema strategy. The processor encodes the record's ` + "`.Payload.After`" + ` field using the schema that is extracted from the data and registered on the fly under the subject ` + "`example-autoRegister`" + `.`, Config: config.Config{ "schema.strategy": "autoRegister", "schema.autoRegister.subject": "example-autoRegister", }, Have: opencdc.Record{ Position: opencdc.Position("test-position"), Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Payload: opencdc.Change{ After: opencdc.StructuredData{ "myString": "bar", "myInt": 1, "myFloat": 2.3, "myMap": map[string]any{ "foo": true, "bar": 2.2, }, "myStruct": opencdc.StructuredData{ "foo": 1, "bar": false, }, }, }, }, Want: sdk.SingleRecord{ Position: opencdc.Position("test-position"), Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Payload: opencdc.Change{ After: opencdc.RawData([]byte{0, 0, 0, 0, 1, 102, 102, 102, 102, 102, 102, 2, 64, 2, 154, 153, 153, 153, 153, 153, 1, 64, 1, 6, 98, 97, 114, 0, 2}), }, }, })
Output: processor transformed record: --- before +++ after @@ -1,24 +1,12 @@ { "position": "dGVzdC1wb3NpdGlvbg==", "operation": "create", "metadata": { "key1": "val1" }, "key": null, "payload": { "before": null, - "after": { - "myFloat": 2.3, - "myInt": 1, - "myMap": { - "bar": 2.2, - "foo": true - }, - "myString": "bar", - "myStruct": { - "bar": false, - "foo": 1 - } - } + "after": "\u0000\u0000\u0000\u0000\u0001ffffff\u0002@\u0002\ufffd\ufffd\ufffd\ufffd\ufffd\ufffd\u0001@\u0001\u0006bar\u0000\u0002" } }
Example (PreRegistered) ¶
url, cleanup := schemaregistrytest.ExampleSchemaRegistryURL("ExampleEncodeProcessor_preRegistered", 54322) defer cleanup() client, err := schemaregistry.NewClient(log.Nop(), sr.URLs(url)) if err != nil { panic(fmt.Sprintf("failed to create schema registry client: %v", err)) } _, err = client.CreateSchema(context.Background(), "example-preRegistered", sr.Schema{ Type: sr.TypeAvro, Schema: ` { "type":"record", "name":"record", "fields":[ {"name":"myString","type":"string"}, {"name":"myInt","type":"int"} ] }`, }) if err != nil { panic(fmt.Sprintf("failed to create schema: %v", err)) } p := NewEncodeProcessor(log.Nop()) p.SetSchemaRegistry(client) exampleutil.RunExample(p, exampleutil.Example{ Summary: "Pre-register schema", Description: `This example shows the usage of the ` + "`avro.encode`" + ` processor with the ` + "`preRegistered`" + ` schema strategy. When using this strategy, the schema has to be manually pre-registered. In this example we use the following schema: ` + "```json" + ` { "type":"record", "name":"record", "fields":[ {"name":"myString","type":"string"}, {"name":"myInt","type":"int"} ] } ` + "```" + ` The processor encodes the record's` + "`.Key`" + ` field using the above schema.`, Config: config.Config{ "schema.strategy": "preRegistered", "schema.preRegistered.subject": "example-preRegistered", "schema.preRegistered.version": "1", "field": ".Key", }, Have: opencdc.Record{ Position: opencdc.Position("test-position"), Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Key: opencdc.StructuredData{ "myString": "bar", "myInt": 1, }, }, Want: sdk.SingleRecord{ Position: opencdc.Position("test-position"), Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Key: opencdc.RawData([]byte{0, 0, 0, 0, 1, 6, 98, 97, 114, 2}), }, })
Output: processor transformed record: --- before +++ after @@ -1,15 +1,12 @@ { "position": "dGVzdC1wb3NpdGlvbg==", "operation": "create", "metadata": { "key1": "val1" }, - "key": { - "myInt": 1, - "myString": "bar" - }, + "key": "\u0000\u0000\u0000\u0000\u0001\u0006bar\u0002", "payload": { "before": null, "after": null } }
func NewEncodeProcessor ¶
func NewEncodeProcessor(logger log.CtxLogger) *EncodeProcessor
func (*EncodeProcessor) Open ¶ added in v0.13.4
func (p *EncodeProcessor) Open(context.Context) error
func (*EncodeProcessor) Process ¶ added in v0.13.4
func (p *EncodeProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*EncodeProcessor) SetSchemaRegistry ¶ added in v0.13.4
func (p *EncodeProcessor) SetSchemaRegistry(registry schemaregistry.Registry)
func (*EncodeProcessor) Specification ¶ added in v0.13.4
func (p *EncodeProcessor) Specification() (sdk.Specification, error)
type MockDecoder ¶
type MockDecoder struct {
// contains filtered or unexported fields
}
MockDecoder is a mock of decoder interface.
func NewMockDecoder ¶
func NewMockDecoder(ctrl *gomock.Controller) *MockDecoder
NewMockDecoder creates a new mock instance.
func (*MockDecoder) Decode ¶
func (m *MockDecoder) Decode(ctx context.Context, b opencdc.RawData) (opencdc.StructuredData, error)
Decode mocks base method.
func (*MockDecoder) EXPECT ¶
func (m *MockDecoder) EXPECT() *MockDecoderMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
type MockDecoderDecodeCall ¶ added in v0.11.0
MockDecoderDecodeCall wrap *gomock.Call
func (*MockDecoderDecodeCall) Do ¶ added in v0.11.0
func (c *MockDecoderDecodeCall) Do(f func(context.Context, opencdc.RawData) (opencdc.StructuredData, error)) *MockDecoderDecodeCall
Do rewrite *gomock.Call.Do
func (*MockDecoderDecodeCall) DoAndReturn ¶ added in v0.11.0
func (c *MockDecoderDecodeCall) DoAndReturn(f func(context.Context, opencdc.RawData) (opencdc.StructuredData, error)) *MockDecoderDecodeCall
DoAndReturn rewrite *gomock.Call.DoAndReturn
func (*MockDecoderDecodeCall) Return ¶ added in v0.11.0
func (c *MockDecoderDecodeCall) Return(arg0 opencdc.StructuredData, arg1 error) *MockDecoderDecodeCall
Return rewrite *gomock.Call.Return
type MockDecoderMockRecorder ¶
type MockDecoderMockRecorder struct {
// contains filtered or unexported fields
}
MockDecoderMockRecorder is the mock recorder for MockDecoder.
func (*MockDecoderMockRecorder) Decode ¶
func (mr *MockDecoderMockRecorder) Decode(ctx, b any) *MockDecoderDecodeCall
Decode indicates an expected call of Decode.
type MockEncoder ¶
type MockEncoder struct {
// contains filtered or unexported fields
}
MockEncoder is a mock of encoder interface.
func NewMockEncoder ¶
func NewMockEncoder(ctrl *gomock.Controller) *MockEncoder
NewMockEncoder creates a new mock instance.
func (*MockEncoder) EXPECT ¶
func (m *MockEncoder) EXPECT() *MockEncoderMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockEncoder) Encode ¶
func (m *MockEncoder) Encode(ctx context.Context, sd opencdc.StructuredData) (opencdc.RawData, error)
Encode mocks base method.
type MockEncoderEncodeCall ¶ added in v0.11.0
MockEncoderEncodeCall wrap *gomock.Call
func (*MockEncoderEncodeCall) Do ¶ added in v0.11.0
func (c *MockEncoderEncodeCall) Do(f func(context.Context, opencdc.StructuredData) (opencdc.RawData, error)) *MockEncoderEncodeCall
Do rewrite *gomock.Call.Do
func (*MockEncoderEncodeCall) DoAndReturn ¶ added in v0.11.0
func (c *MockEncoderEncodeCall) DoAndReturn(f func(context.Context, opencdc.StructuredData) (opencdc.RawData, error)) *MockEncoderEncodeCall
DoAndReturn rewrite *gomock.Call.DoAndReturn
func (*MockEncoderEncodeCall) Return ¶ added in v0.11.0
func (c *MockEncoderEncodeCall) Return(arg0 opencdc.RawData, arg1 error) *MockEncoderEncodeCall
Return rewrite *gomock.Call.Return
type MockEncoderMockRecorder ¶
type MockEncoderMockRecorder struct {
// contains filtered or unexported fields
}
MockEncoderMockRecorder is the mock recorder for MockEncoder.
func (*MockEncoderMockRecorder) Encode ¶
func (mr *MockEncoderMockRecorder) Encode(ctx, sd any) *MockEncoderEncodeCall
Encode indicates an expected call of Encode.