avro

package
v0.13.4 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

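Package avro provides processors that decode and encode record fields in the Avro format (DecodeProcessor and EncodeProcessor). It also contains generated GoMock mocks of the package's decoder and encoder interfaces.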

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DecodeProcessor added in v0.13.4

type DecodeProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
Example
url, cleanup := schemaregistrytest.ExampleSchemaRegistryURL("ExampleDecodeProcessor", 54322)
defer cleanup()

client, err := schemaregistry.NewClient(log.Nop(), sr.URLs(url))
if err != nil {
	panic(fmt.Sprintf("failed to create schema registry client: %v", err))
}

_, err = client.CreateSchema(context.Background(), "example-decode", sr.Schema{
	Type: sr.TypeAvro,
	Schema: `
{
  "type":"record",
  "name":"record",
  "fields":[
    {"name":"myString","type":"string"},
    {"name":"myInt","type":"int"}
  ]
}`,
})
if err != nil {
	panic(fmt.Sprintf("failed to create schema: %v", err))
}

p := NewDecodeProcessor(log.Nop())
p.SetSchemaRegistry(client)

exampleutil.RunExample(p, exampleutil.Example{
	Summary: "Decode a record field in Avro format",
	Description: `This example shows the usage of the ` + "`avro.decode`" + ` processor.
The processor decodes the record's ` + "`.Key`" + ` field using the schema that is
downloaded from the schema registry and needs to exist under the subject ` + "`example-decode`" + `.
In this example we use the following schema:

` + "```json" + `
{
  "type":"record",
  "name":"record",
  "fields":[
    {"name":"myString","type":"string"},
    {"name":"myInt","type":"int"}
  ]
}
` + "```",
	Config: config.Config{
		"field": ".Key",
	},
	Have: opencdc.Record{
		Position:  opencdc.Position("test-position"),
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"key1": "val1"},
		Key:       opencdc.RawData([]byte{0, 0, 0, 0, 1, 6, 98, 97, 114, 2}),
	},
	Want: sdk.SingleRecord{
		Position:  opencdc.Position("test-position"),
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"key1": "val1"},
		Key: opencdc.StructuredData{
			"myString": "bar",
			"myInt":    1,
		},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,12 +1,15 @@
 {
   "position": "dGVzdC1wb3NpdGlvbg==",
   "operation": "create",
   "metadata": {
     "key1": "val1"
   },
-  "key": "\u0000\u0000\u0000\u0000\u0001\u0006bar\u0002",
+  "key": {
+    "myInt": 1,
+    "myString": "bar"
+  },
   "payload": {
     "before": null,
     "after": null
   }
 }

func NewDecodeProcessor

func NewDecodeProcessor(logger log.CtxLogger) *DecodeProcessor

func (*DecodeProcessor) Configure added in v0.13.4

func (p *DecodeProcessor) Configure(ctx context.Context, c config.Config) error

func (*DecodeProcessor) Open added in v0.13.4

func (*DecodeProcessor) Process added in v0.13.4

func (p *DecodeProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*DecodeProcessor) SetSchemaRegistry added in v0.13.4

func (p *DecodeProcessor) SetSchemaRegistry(registry schemaregistry.Registry)

func (*DecodeProcessor) Specification added in v0.13.4

func (p *DecodeProcessor) Specification() (sdk.Specification, error)

func (*DecodeProcessor) Teardown added in v0.13.4

func (p *DecodeProcessor) Teardown(ctx context.Context) error

type EncodeProcessor added in v0.13.4

type EncodeProcessor struct {
	sdk.UnimplementedProcessor
	// contains filtered or unexported fields
}
Example (AutoRegister)
url, cleanup := schemaregistrytest.ExampleSchemaRegistryURL("ExampleEncodeProcessor_autoRegister", 54322)
defer cleanup()

client, err := schemaregistry.NewClient(log.Nop(), sr.URLs(url))
if err != nil {
	panic(fmt.Sprintf("failed to create schema registry client: %v", err))
}

p := NewEncodeProcessor(log.Nop())
p.SetSchemaRegistry(client)

exampleutil.RunExample(p, exampleutil.Example{
	Summary: "Auto-register schema",
	Description: `This example shows the usage of the ` + "`avro.encode`" + ` processor
with the ` + "`autoRegister`" + ` schema strategy. The processor encodes the record's
` + "`.Payload.After`" + ` field using the schema that is extracted from the data
and registered on the fly under the subject ` + "`example-autoRegister`" + `.`,
	Config: config.Config{
		"schema.strategy":             "autoRegister",
		"schema.autoRegister.subject": "example-autoRegister",
	},
	Have: opencdc.Record{
		Position:  opencdc.Position("test-position"),
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"key1": "val1"},
		Payload: opencdc.Change{
			After: opencdc.StructuredData{
				"myString": "bar",
				"myInt":    1,
				"myFloat":  2.3,
				"myMap": map[string]any{
					"foo": true,
					"bar": 2.2,
				},
				"myStruct": opencdc.StructuredData{
					"foo": 1,
					"bar": false,
				},
			},
		},
	},
	Want: sdk.SingleRecord{
		Position:  opencdc.Position("test-position"),
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"key1": "val1"},
		Payload: opencdc.Change{
			After: opencdc.RawData([]byte{0, 0, 0, 0, 1, 102, 102, 102, 102, 102, 102, 2, 64, 2, 154, 153, 153, 153, 153, 153, 1, 64, 1, 6, 98, 97, 114, 0, 2}),
		},
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,24 +1,12 @@
 {
   "position": "dGVzdC1wb3NpdGlvbg==",
   "operation": "create",
   "metadata": {
     "key1": "val1"
   },
   "key": null,
   "payload": {
     "before": null,
-    "after": {
-      "myFloat": 2.3,
-      "myInt": 1,
-      "myMap": {
-        "bar": 2.2,
-        "foo": true
-      },
-      "myString": "bar",
-      "myStruct": {
-        "bar": false,
-        "foo": 1
-      }
-    }
+    "after": "\u0000\u0000\u0000\u0000\u0001ffffff\u0002@\u0002\ufffd\ufffd\ufffd\ufffd\ufffd\ufffd\u0001@\u0001\u0006bar\u0000\u0002"
   }
 }
Example (PreRegistered)
url, cleanup := schemaregistrytest.ExampleSchemaRegistryURL("ExampleEncodeProcessor_preRegistered", 54322)
defer cleanup()

client, err := schemaregistry.NewClient(log.Nop(), sr.URLs(url))
if err != nil {
	panic(fmt.Sprintf("failed to create schema registry client: %v", err))
}

_, err = client.CreateSchema(context.Background(), "example-preRegistered", sr.Schema{
	Type: sr.TypeAvro,
	Schema: `
{
  "type":"record",
  "name":"record",
  "fields":[
    {"name":"myString","type":"string"},
    {"name":"myInt","type":"int"}
  ]
}`,
})
if err != nil {
	panic(fmt.Sprintf("failed to create schema: %v", err))
}

p := NewEncodeProcessor(log.Nop())
p.SetSchemaRegistry(client)

exampleutil.RunExample(p, exampleutil.Example{
	Summary: "Pre-register schema",
	Description: `This example shows the usage of the ` + "`avro.encode`" + ` processor
with the ` + "`preRegistered`" + ` schema strategy. When using this strategy, the
schema has to be manually pre-registered. In this example we use the following schema:

` + "```json" + `
{
  "type":"record",
  "name":"record",
  "fields":[
    {"name":"myString","type":"string"},
    {"name":"myInt","type":"int"}
  ]
}
` + "```" + `

The processor encodes the record's ` + "`.Key`" + ` field using the above schema.`,
	Config: config.Config{
		"schema.strategy":              "preRegistered",
		"schema.preRegistered.subject": "example-preRegistered",
		"schema.preRegistered.version": "1",
		"field":                        ".Key",
	},
	Have: opencdc.Record{
		Position:  opencdc.Position("test-position"),
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"key1": "val1"},
		Key: opencdc.StructuredData{
			"myString": "bar",
			"myInt":    1,
		},
	},
	Want: sdk.SingleRecord{
		Position:  opencdc.Position("test-position"),
		Operation: opencdc.OperationCreate,
		Metadata:  map[string]string{"key1": "val1"},
		Key:       opencdc.RawData([]byte{0, 0, 0, 0, 1, 6, 98, 97, 114, 2}),
	},
})
Output:

processor transformed record:
--- before
+++ after
@@ -1,15 +1,12 @@
 {
   "position": "dGVzdC1wb3NpdGlvbg==",
   "operation": "create",
   "metadata": {
     "key1": "val1"
   },
-  "key": {
-    "myInt": 1,
-    "myString": "bar"
-  },
+  "key": "\u0000\u0000\u0000\u0000\u0001\u0006bar\u0002",
   "payload": {
     "before": null,
     "after": null
   }
 }

func NewEncodeProcessor

func NewEncodeProcessor(logger log.CtxLogger) *EncodeProcessor

func (*EncodeProcessor) Configure added in v0.13.4

func (p *EncodeProcessor) Configure(ctx context.Context, c config.Config) error

func (*EncodeProcessor) Open added in v0.13.4

func (*EncodeProcessor) Process added in v0.13.4

func (p *EncodeProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord

func (*EncodeProcessor) SetSchemaRegistry added in v0.13.4

func (p *EncodeProcessor) SetSchemaRegistry(registry schemaregistry.Registry)

func (*EncodeProcessor) Specification added in v0.13.4

func (p *EncodeProcessor) Specification() (sdk.Specification, error)

func (*EncodeProcessor) Teardown added in v0.13.4

func (p *EncodeProcessor) Teardown(context.Context) error

type MockDecoder

type MockDecoder struct {
	// contains filtered or unexported fields
}

MockDecoder is a mock of the decoder interface.

func NewMockDecoder

func NewMockDecoder(ctrl *gomock.Controller) *MockDecoder

NewMockDecoder creates a new mock instance.

func (*MockDecoder) Decode

Decode mocks base method.

func (*MockDecoder) EXPECT

func (m *MockDecoder) EXPECT() *MockDecoderMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

type MockDecoderDecodeCall added in v0.11.0

type MockDecoderDecodeCall struct {
	*gomock.Call
}

MockDecoderDecodeCall wraps *gomock.Call.

func (*MockDecoderDecodeCall) Do added in v0.11.0

Do rewrites *gomock.Call.Do.

func (*MockDecoderDecodeCall) DoAndReturn added in v0.11.0

DoAndReturn rewrites *gomock.Call.DoAndReturn.

func (*MockDecoderDecodeCall) Return added in v0.11.0

Return rewrites *gomock.Call.Return.

type MockDecoderMockRecorder

type MockDecoderMockRecorder struct {
	// contains filtered or unexported fields
}

MockDecoderMockRecorder is the mock recorder for MockDecoder.

func (*MockDecoderMockRecorder) Decode

Decode indicates an expected call of Decode.

type MockEncoder

type MockEncoder struct {
	// contains filtered or unexported fields
}

MockEncoder is a mock of the encoder interface.

func NewMockEncoder

func NewMockEncoder(ctrl *gomock.Controller) *MockEncoder

NewMockEncoder creates a new mock instance.

func (*MockEncoder) EXPECT

func (m *MockEncoder) EXPECT() *MockEncoderMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockEncoder) Encode

Encode mocks base method.

type MockEncoderEncodeCall added in v0.11.0

type MockEncoderEncodeCall struct {
	*gomock.Call
}

MockEncoderEncodeCall wraps *gomock.Call.

func (*MockEncoderEncodeCall) Do added in v0.11.0

Do rewrites *gomock.Call.Do.

func (*MockEncoderEncodeCall) DoAndReturn added in v0.11.0

DoAndReturn rewrites *gomock.Call.DoAndReturn.

func (*MockEncoderEncodeCall) Return added in v0.11.0

Return rewrites *gomock.Call.Return.

type MockEncoderMockRecorder

type MockEncoderMockRecorder struct {
	// contains filtered or unexported fields
}

MockEncoderMockRecorder is the mock recorder for MockEncoder.

func (*MockEncoderMockRecorder) Encode

func (mr *MockEncoderMockRecorder) Encode(ctx, sd any) *MockEncoderEncodeCall

Encode indicates an expected call of Encode.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL