Documentation ¶
Index ¶
- Variables
- func Logger(ctx context.Context) *zerolog.Logger
- func ParseConfig(ctx context.Context, cfg config.Config, target any, params config.Parameters) error
- type ErrorRecord
- type FilterRecord
- type ProcessedRecord
- type Processor
- type ProcessorFunc
- type ProcessorMiddleware
- type ProcessorMiddlewareOption
- type ProcessorWithSchemaDecode
- type ProcessorWithSchemaDecodeConfig
- type ProcessorWithSchemaEncode
- type ProcessorWithSchemaEncodeConfig
- type Reference
- type ReferenceResolver
- type SingleRecord
- type Specification
- type UnimplementedProcessor
- func (UnimplementedProcessor) Configure(context.Context, config.Config) error
- func (UnimplementedProcessor) MiddlewareOptions() []ProcessorMiddlewareOption
- func (UnimplementedProcessor) Open(context.Context) error
- func (UnimplementedProcessor) Process(context.Context, []opencdc.Record) []ProcessedRecord
- func (UnimplementedProcessor) Specification() (Specification, error)
- func (UnimplementedProcessor) Teardown(context.Context) error
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrUnimplemented is returned in functions of plugins that don't implement
	// a certain method.
	ErrUnimplemented = errors.New("the processor plugin does not implement " +
		"this action, please check the source code of the processor and make sure " +
		"all required processor methods are implemented")

	ErrFilterRecord = errors.New("filter out this record")
)
Functions ¶
func Logger ¶
func Logger(ctx context.Context) *zerolog.Logger
Logger returns the logger for the processor. Pass it the context that was handed to the processor method you are in (Configure, Open, Process, Teardown) so that log messages include contextual information.
func ParseConfig ¶
func ParseConfig(
	ctx context.Context,
	cfg config.Config,
	target any,
	params config.Parameters,
) error
ParseConfig sanitizes the configuration, applies defaults, validates it and copies the values into the target object. It combines the functionality provided by github.com/conduitio/conduit-commons/config.Config into a single convenient function. It is intended to be used in the Configure method of a processor to parse the configuration map.
The function does the following:
- Removes leading and trailing spaces from all keys and values in the configuration.
- Applies the default values defined in the parameter specifications to the configuration.
- Validates the configuration by checking for unrecognized parameters, type validations, and value validations.
- Copies configuration values into the target object. The target object must be a pointer to a struct.
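The first three steps above can be sketched with the standard library alone. This is an illustration of the flow, not the SDK's implementation: `param` and `parse` are hypothetical names, type and value validation are left out, and the final copy into a target struct is omitted.

```go
package main

import (
	"fmt"
	"strings"
)

// param is a hypothetical, simplified stand-in for a parameter specification.
type param struct {
	Default string
}

// parse sanitizes cfg, applies defaults and rejects unrecognized keys.
func parse(cfg map[string]string, params map[string]param) (map[string]string, error) {
	out := make(map[string]string, len(params))
	// Step 1: trim leading and trailing spaces from keys and values.
	for k, v := range cfg {
		out[strings.TrimSpace(k)] = strings.TrimSpace(v)
	}
	// Step 2: fill in defaults for parameters that were not supplied.
	for name, p := range params {
		if _, ok := out[name]; !ok && p.Default != "" {
			out[name] = p.Default
		}
	}
	// Step 3: reject keys that no parameter describes.
	for k := range out {
		if _, ok := params[k]; !ok {
			return nil, fmt.Errorf("unrecognized parameter %q", k)
		}
	}
	return out, nil
}

func main() {
	params := map[string]param{"foo": {}, "bar": {Default: "42"}}

	got, err := parse(map[string]string{" foo": "bar "}, params)
	fmt.Println(got, err)

	_, err = parse(map[string]string{"typo": "x"}, params)
	fmt.Println(err)
}
```

Running the steps in this order means defaults are never overwritten by whitespace-only differences in keys, and an unknown key is reported even when every known parameter has a default.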
Example ¶
cfg := map[string]string{
	"foo": "bar ", // will be sanitized
	// "bar" is missing, will be set to the default value
	"nested.baz": "1m",
}

params := config.Parameters{
	"foo": config.Parameter{Type: config.ParameterTypeString},
	"bar": config.Parameter{
		Type:    config.ParameterTypeInt,
		Default: "42",
	},
	"nested.baz": config.Parameter{Type: config.ParameterTypeDuration},
}

var target struct {
	Foo    string `json:"foo"`
	Bar    int    `json:"bar"`
	Nested struct {
		Baz time.Duration `json:"baz"`
	} `json:"nested"`
}

err := ParseConfig(context.Background(), cfg, &target, params)
if err != nil {
	panic(err)
}

fmt.Printf("%+v", target)
Output: {Foo:bar Bar:42 Nested:{Baz:1m0s}}
Types ¶
type ErrorRecord ¶
type ErrorRecord struct {
	// Error is the error cause.
	Error error `json:"error"`
}
ErrorRecord is a record that failed to be processed and will be nacked.
func (ErrorRecord) MarshalJSON ¶ added in v0.2.0
func (e ErrorRecord) MarshalJSON() ([]byte, error)
type FilterRecord ¶
type FilterRecord struct{}
FilterRecord is a record that will be acked and filtered out of the pipeline.
type ProcessedRecord ¶
type ProcessedRecord interface {
// contains filtered or unexported methods
}
ProcessedRecord is a record returned by the processor.
type Processor ¶
type Processor interface {
	// Specification contains the metadata of this processor like name, version,
	// description and a list of parameters expected in the configuration.
	Specification() (Specification, error)

	// Configure is the first function to be called in a processor. It provides the
	// processor with the configuration that needs to be validated and stored.
	// In case the configuration is not valid it should return an error.
	// Configure should not open connections or any other resources. It should solely
	// focus on parsing and validating the configuration itself.
	Configure(context.Context, config.Config) error

	// Open is called after Configure to signal the processor it can prepare to
	// start writing records. If needed, the processor should open connections and
	// start background jobs in this function.
	Open(context.Context) error

	// Process takes a number of records and processes them right away.
	// It should return a slice of ProcessedRecord that matches the length of
	// the input slice. If an error occurred while processing a specific record
	// it should be reflected in the ProcessedRecord with the same index as the
	// input record that caused the error.
	// Process should be idempotent, as it may be called multiple times with the
	// same records (e.g. after a restart when records were not flushed).
	Process(context.Context, []opencdc.Record) []ProcessedRecord

	// Teardown signals to the processor that the pipeline is shutting down and
	// there will be no more calls to any other function. After Teardown returns,
	// the processor will be discarded.
	Teardown(context.Context) error

	// MiddlewareOptions returns a list of ProcessorMiddlewareOption that can be
	// used to configure the default middleware for this processor.
	MiddlewareOptions() []ProcessorMiddlewareOption
	// contains filtered or unexported methods
}
Processor receives records, manipulates them and returns the processed records.
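The contract of Process (one result per input record, at the same index) can be sketched as follows. All types here are hypothetical stand-ins defined for the example so that it compiles on its own; they only mimic the roles of SingleRecord, FilterRecord and ErrorRecord and are not the SDK's definitions.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the SDK types; only the shape matters here.
type record struct{ Payload string }

type processedRecord interface{ isProcessed() }

type singleRecord record
type filterRecord struct{}
type errorRecord struct{ Err error }

func (singleRecord) isProcessed() {}
func (filterRecord) isProcessed() {}
func (errorRecord) isProcessed()  {}

// process returns exactly one result per input record, at the same index.
// It has no side effects, so calling it twice with the same input is safe.
func process(records []record) []processedRecord {
	out := make([]processedRecord, len(records))
	for i, r := range records {
		switch {
		case r.Payload == "":
			// The failure is reported at the index of the offending record.
			out[i] = errorRecord{Err: errors.New("empty payload")}
		case r.Payload == "skip":
			out[i] = filterRecord{}
		default:
			out[i] = singleRecord{Payload: r.Payload + "!"}
		}
	}
	return out
}

func main() {
	for i, p := range process([]record{{"a"}, {"skip"}, {""}}) {
		fmt.Printf("%d: %T\n", i, p)
	}
}
```

Pre-allocating the output with the input length and writing by index makes it hard to return a slice of the wrong length by accident.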
func ProcessorWithMiddleware ¶ added in v0.2.0
func ProcessorWithMiddleware(p Processor, middleware ...ProcessorMiddleware) Processor
ProcessorWithMiddleware wraps the processor into the supplied middleware.
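As a rough illustration of what wrapping means, the sketch below applies a list of middleware to a processor one after another. The `processor` and `middleware` interfaces, `tag`, and `withMiddleware` are hypothetical and reduced to the minimum. The wrapping order (last middleware ends up outermost) is an assumption of this sketch and may not match the SDK.

```go
package main

import (
	"fmt"
	"strings"
)

// processor and middleware are hypothetical, pared-down stand-ins.
type processor interface {
	Process(in []string) []string
}

type middleware interface {
	Wrap(processor) processor
}

// upper is a trivial processor that upper-cases every record.
type upper struct{}

func (upper) Process(in []string) []string {
	out := make([]string, len(in))
	for i, s := range in {
		out[i] = strings.ToUpper(s)
	}
	return out
}

// tag is a middleware that appends a label to every record after the
// wrapped processor has run.
type tag struct{ label string }

type tagged struct {
	processor
	label string
}

func (t tag) Wrap(p processor) processor {
	return tagged{processor: p, label: t.label}
}

func (t tagged) Process(in []string) []string {
	out := t.processor.Process(in)
	for i := range out {
		out[i] += t.label
	}
	return out
}

// withMiddleware wraps p in each middleware in turn. Assumption: the last
// middleware in the list ends up outermost; the SDK's order may differ.
func withMiddleware(p processor, mws ...middleware) processor {
	for _, m := range mws {
		p = m.Wrap(p)
	}
	return p
}

func main() {
	p := withMiddleware(upper{}, tag{"-a"}, tag{"-b"})
	fmt.Println(p.Process([]string{"x"}))
}
```

Because each wrapper itself satisfies the processor interface, callers cannot tell a wrapped processor from a bare one.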
type ProcessorFunc ¶
type ProcessorFunc struct {
	UnimplementedProcessor
	// contains filtered or unexported fields
}
ProcessorFunc is an adapter allowing use of a function as a Processor.
func NewProcessorFunc ¶
func NewProcessorFunc(specs Specification, f func(context.Context, opencdc.Record) (opencdc.Record, error)) ProcessorFunc
NewProcessorFunc creates a ProcessorFunc from a function and specifications. This is useful for creating simple processors without needing to implement the full Processor interface.
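The adapter idea can be shown in isolation: a function that handles one record is lifted into a batch Process method. The names `funcProcessor`, `newFuncProcessor` and `result` are hypothetical, and how the real ProcessorFunc maps per-record errors onto processed records is an assumption here.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// result is a hypothetical stand-in for a processed record: either a value
// or the error that occurred for the record at that index.
type result struct {
	Value string
	Err   error
}

// funcProcessor adapts a per-record function to a batch Process method.
type funcProcessor struct {
	f func(string) (string, error)
}

func newFuncProcessor(f func(string) (string, error)) funcProcessor {
	return funcProcessor{f: f}
}

// Process calls f once per record and keeps results aligned with inputs.
func (p funcProcessor) Process(in []string) []result {
	out := make([]result, len(in))
	for i, rec := range in {
		v, err := p.f(rec)
		out[i] = result{Value: v, Err: err}
	}
	return out
}

func main() {
	p := newFuncProcessor(func(s string) (string, error) {
		if s == "" {
			return "", errors.New("empty record")
		}
		return strings.ToUpper(s), nil
	})
	for _, r := range p.Process([]string{"a", "", "b"}) {
		fmt.Println(r.Value, r.Err)
	}
}
```

In this sketch a failing record does not stop the batch; the remaining records are still processed.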
func (ProcessorFunc) Process ¶
func (f ProcessorFunc) Process(ctx context.Context, records []opencdc.Record) []ProcessedRecord
func (ProcessorFunc) Specification ¶
func (f ProcessorFunc) Specification() (Specification, error)
type ProcessorMiddleware ¶ added in v0.2.0
ProcessorMiddleware wraps a Processor and adds functionality to it.
func DefaultProcessorMiddleware ¶ added in v0.2.0
func DefaultProcessorMiddleware(opts ...ProcessorMiddlewareOption) []ProcessorMiddleware
DefaultProcessorMiddleware returns a slice of middleware that is added to all processors by default.
type ProcessorMiddlewareOption ¶ added in v0.2.0
type ProcessorMiddlewareOption interface {
Apply(ProcessorMiddleware)
}
ProcessorMiddlewareOption is an option that can be used to configure a ProcessorMiddleware.
type ProcessorWithSchemaDecode ¶ added in v0.2.0
type ProcessorWithSchemaDecode struct {
Config ProcessorWithSchemaDecodeConfig
}
ProcessorWithSchemaDecode is a middleware that decodes the key and/or payload of a record using a schema before passing it to the processor. It takes the schema subject and version from the record metadata, fetches the schema from the schema service, and decodes the key and/or payload using the schema. If the schema subject and version are not found in the record metadata, it logs a warning and skips decoding the key and/or payload. This middleware is useful when the source connector sends the data with the schema attached.
It adds two parameters to the processor config:
- `sdk.schema.decode.key.enabled` - Whether to decode the record key using its corresponding schema from the schema registry.
- `sdk.schema.decode.payload.enabled` - Whether to decode the record payload using its corresponding schema from the schema registry.
func (*ProcessorWithSchemaDecode) Wrap ¶ added in v0.2.0
func (s *ProcessorWithSchemaDecode) Wrap(impl Processor) Processor
Wrap a Processor into the schema middleware. It will apply default configuration values if they are not explicitly set.
type ProcessorWithSchemaDecodeConfig ¶ added in v0.2.0
type ProcessorWithSchemaDecodeConfig struct {
	// Whether to decode the record payload using its corresponding schema from the schema registry.
	// If unset, defaults to true.
	PayloadEnabled *bool
	// Whether to decode the record key using its corresponding schema from the schema registry.
	// If unset, defaults to true.
	KeyEnabled *bool
}
ProcessorWithSchemaDecodeConfig is the configuration for the ProcessorWithSchemaDecode middleware. Fields set to their zero value are ignored and will be set to the default value.
ProcessorWithSchemaDecodeConfig can be used as a ProcessorMiddlewareOption.
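The `*bool` fields make it possible to tell "unset" apart from "explicitly false". A minimal sketch of that pattern, using a hypothetical `decodeConfig` mirror of the struct and a hypothetical `orDefault` helper:

```go
package main

import "fmt"

// decodeConfig is a hypothetical mirror of the config shape above.
type decodeConfig struct {
	PayloadEnabled *bool
	KeyEnabled     *bool
}

// orDefault returns *v if v is set, and def otherwise.
func orDefault(v *bool, def bool) bool {
	if v == nil {
		return def
	}
	return *v
}

func main() {
	off := false
	cfg := decodeConfig{KeyEnabled: &off} // PayloadEnabled is left unset

	fmt.Println(orDefault(cfg.PayloadEnabled, true)) // true: unset falls back to the default
	fmt.Println(orDefault(cfg.KeyEnabled, true))     // false: explicit value wins
}
```

With a plain `bool` field, an explicit `false` would be indistinguishable from the zero value and would be replaced by the default.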
func (ProcessorWithSchemaDecodeConfig) Apply ¶ added in v0.2.0
func (c ProcessorWithSchemaDecodeConfig) Apply(m ProcessorMiddleware)
Apply sets the default configuration for the ProcessorWithSchemaDecode middleware.
func (ProcessorWithSchemaDecodeConfig) SchemaKeyEnabledParameterName ¶ added in v0.2.0
func (c ProcessorWithSchemaDecodeConfig) SchemaKeyEnabledParameterName() string
func (ProcessorWithSchemaDecodeConfig) SchemaPayloadEnabledParameterName ¶ added in v0.2.0
func (c ProcessorWithSchemaDecodeConfig) SchemaPayloadEnabledParameterName() string
type ProcessorWithSchemaEncode ¶ added in v0.2.0
type ProcessorWithSchemaEncode struct {
Config ProcessorWithSchemaEncodeConfig
}
ProcessorWithSchemaEncode is a middleware that encodes the record payload and/or key with a schema. It only encodes the record key/payload if the schema subject and version in the record metadata match the schema subject and version of the incoming record. If the schema subject and version are not found in the record metadata, or if they were changed by the processor, it logs a warning and skips the encoding.
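The condition described above can be read as a comparison of the schema metadata before and after processing. The sketch below shows one way to express it. The metadata keys and the `shouldEncode` helper are hypothetical and chosen for this example only; they are not the keys the SDK uses.

```go
package main

import "fmt"

// Hypothetical metadata keys, chosen for this sketch only.
const (
	keySubject = "schema.subject"
	keyVersion = "schema.version"
)

// shouldEncode reports whether the schema info is present in the incoming
// metadata and unchanged in the outgoing metadata.
func shouldEncode(in, out map[string]string) bool {
	for _, k := range []string{keySubject, keyVersion} {
		before, okIn := in[k]
		after, okOut := out[k]
		if !okIn || !okOut || before != after {
			return false
		}
	}
	return true
}

func main() {
	in := map[string]string{keySubject: "orders", keyVersion: "1"}
	same := map[string]string{keySubject: "orders", keyVersion: "1"}
	changed := map[string]string{keySubject: "orders", keyVersion: "2"}

	fmt.Println(shouldEncode(in, same))    // true
	fmt.Println(shouldEncode(in, changed)) // false
	fmt.Println(shouldEncode(nil, same))   // false
}
```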
func (*ProcessorWithSchemaEncode) Wrap ¶ added in v0.2.0
func (p *ProcessorWithSchemaEncode) Wrap(impl Processor) Processor
Wrap a Processor into the schema middleware. It will apply default configuration values if they are not explicitly set.
type ProcessorWithSchemaEncodeConfig ¶ added in v0.2.0
type ProcessorWithSchemaEncodeConfig struct {
	// Whether to encode the record payload using its corresponding schema from the schema registry.
	// If unset, defaults to true.
	PayloadEnabled *bool
	// Whether to encode the record key using its corresponding schema from the schema registry.
	// If unset, defaults to true.
	KeyEnabled *bool
}
ProcessorWithSchemaEncodeConfig is the configuration for the ProcessorWithSchemaEncode middleware. Fields set to their zero value are ignored and will be set to the default value.
ProcessorWithSchemaEncodeConfig can be used as a ProcessorMiddlewareOption.
func (ProcessorWithSchemaEncodeConfig) Apply ¶ added in v0.2.0
func (c ProcessorWithSchemaEncodeConfig) Apply(m ProcessorMiddleware)
Apply sets the default configuration for the ProcessorWithSchemaEncode middleware.
func (ProcessorWithSchemaEncodeConfig) SchemaKeyEnabledParameterName ¶ added in v0.2.0
func (c ProcessorWithSchemaEncodeConfig) SchemaKeyEnabledParameterName() string
func (ProcessorWithSchemaEncodeConfig) SchemaPayloadEnabledParameterName ¶ added in v0.2.0
func (c ProcessorWithSchemaEncodeConfig) SchemaPayloadEnabledParameterName() string
type Reference ¶
Reference is an interface that represents a reference to a field in a record. It can be used to get and set the value of the field dynamically using input provided by the user.
type ReferenceResolver ¶
ReferenceResolver is a type that knows how to resolve a reference to a field in a record. It is used to specify the target of a processor's output.
Example (Nested) ¶
rec := opencdc.Record{
	Key: opencdc.StructuredData{
		"foo": map[string]any{
			"bar": "baz",
		},
	},
}

resolver, err := NewReferenceResolver(".Key.foo.bar")
if err != nil {
	panic(err)
}

ref, err := resolver.Resolve(&rec)
if err != nil {
	panic(err)
}

fmt.Println("ref value:", ref.Get())

fmt.Println("setting the field now ...")
err = ref.Set("qux")
if err != nil {
	panic(err)
}

fmt.Println("new value:", rec.Key)
Output:
ref value: baz
setting the field now ...
new value: map[foo:map[bar:qux]]
Example (SetNonExistingField) ¶
rec := opencdc.Record{} // empty record

resolver, err := NewReferenceResolver(".Payload.After.foo.bar")
if err != nil {
	panic(err)
}

ref, err := resolver.Resolve(&rec)
if err != nil {
	panic(err)
}

fmt.Println("ref value:", ref.Get())

fmt.Println("setting the field now ...")
err = ref.Set("hello")
if err != nil {
	panic(err)
}

fmt.Println("new value:", rec.Payload.After)
Output:
ref value: <nil>
setting the field now ...
new value: map[foo:map[bar:hello]]
Example (Simple) ¶
rec := opencdc.Record{
	Position: []byte("my position"),
}

resolver, err := NewReferenceResolver(".Position")
if err != nil {
	panic(err)
}

ref, err := resolver.Resolve(&rec)
if err != nil {
	panic(err)
}

fmt.Println("ref value:", ref.Get())

fmt.Println("setting the position is not allowed, let's try it")
err = ref.Set("foo")
fmt.Println(err)
Output:
ref value: my position
setting the position is not allowed, let's try it
cannot set .Position: cannot set immutable reference
func NewReferenceResolver ¶
func NewReferenceResolver(input string) (ReferenceResolver, error)
NewReferenceResolver creates a new reference resolver from the input string. The input string is a reference to a field in a record. It can be a simple field name or a path to a nested field. The returned resolver can be used to resolve a reference to the specified field in a record and manipulate that field (get or set the value).
Examples of valid references include:
- .Position
- .Operation
- .Key
- .Metadata.foo (to access a simple metadata value)
- .Metadata["f.o.123"] (to access a metadata value via a key containing non-alphanumeric characters)
- .Payload.Before.foo (to access a nested field in payload before)
- .Payload.After["1"]["2"] (to access nested fields in payload after whose keys contain non-alphanumeric characters)
func (ReferenceResolver) Resolve ¶
func (r ReferenceResolver) Resolve(rec *opencdc.Record) (Reference, error)
Resolve resolves the reference to a field in the record. If the reference cannot be resolved an error is returned. If the reference is valid but the field does not exist in the record, the field will be created. The returned reference can be used to set the value of the field.
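The "create the field if it does not exist" behaviour can be illustrated on a plain nested map. This is a simplified sketch, not the SDK's resolver: `ref` and `resolve` are hypothetical, the path syntax is reduced to dot-separated keys without a leading dot or bracket notation, and only maps are supported.

```go
package main

import (
	"fmt"
	"strings"
)

// ref points at one key inside a (possibly nested) map.
type ref struct {
	parent map[string]any
	key    string
}

func (r ref) Get() any  { return r.parent[r.key] }
func (r ref) Set(v any) { r.parent[r.key] = v }

// resolve walks a dot-separated path such as "foo.bar" through root,
// creating intermediate maps that do not exist yet.
func resolve(root map[string]any, path string) (ref, error) {
	parts := strings.Split(path, ".")
	cur := root
	for _, p := range parts[:len(parts)-1] {
		next, ok := cur[p]
		if !ok {
			// Missing intermediate field: create it and keep walking.
			m := map[string]any{}
			cur[p] = m
			cur = m
			continue
		}
		m, ok := next.(map[string]any)
		if !ok {
			return ref{}, fmt.Errorf("field %q is not a map", p)
		}
		cur = m
	}
	return ref{parent: cur, key: parts[len(parts)-1]}, nil
}

func main() {
	data := map[string]any{}

	r, err := resolve(data, "foo.bar")
	if err != nil {
		panic(err)
	}
	fmt.Println(r.Get()) // <nil>

	r.Set("hello")
	fmt.Println(data) // map[foo:map[bar:hello]]
}
```

Holding on to the parent map and the final key, rather than the value itself, is what lets the reference write to a field that did not exist when it was resolved.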
type SingleRecord ¶
SingleRecord is a single processed record that will continue down the pipeline.
type Specification ¶
type Specification struct {
	// Name is the name of the processor.
	Name string `json:"name"`
	// Summary is a brief description of the processor and what it does.
	Summary string `json:"summary"`
	// Description is a more long form area appropriate for README-like text
	// that the author can provide for documentation about the specified
	// Parameters.
	Description string `json:"description"`
	// Version string. Should be a semver prepended with `v`, e.g. `v1.54.3`.
	Version string `json:"version"`
	// Author declares the entity that created or maintains this processor.
	Author string `json:"author"`
	// Parameters describe how to configure the processor.
	Parameters config.Parameters `json:"parameters"`
}
Specification is returned by a processor when its Specification method is called. It contains the processor's metadata and describes the configuration parameters the processor accepts.
type UnimplementedProcessor ¶
type UnimplementedProcessor struct{}
UnimplementedProcessor should be embedded to have forward compatible implementations.
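Embedding works for forward compatibility because the embedded type's methods are promoted to the outer type: if the interface later gains a method, the embedded struct supplies a default and existing implementations keep compiling. The sketch below uses a hypothetical, reduced `processor` interface and a hypothetical `unimplemented` type. The default return values are assumptions of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

var errUnimplemented = errors.New("method not implemented")

// processor is a hypothetical, reduced version of the interface.
type processor interface {
	Open() error
	Process(in []string) ([]string, error)
	Teardown() error
}

// unimplemented supplies a default for every method of processor.
type unimplemented struct{}

func (unimplemented) Open() error                        { return nil }
func (unimplemented) Process([]string) ([]string, error) { return nil, errUnimplemented }
func (unimplemented) Teardown() error                    { return nil }

// reverse only overrides Process; Open and Teardown are promoted from
// the embedded unimplemented struct.
type reverse struct {
	unimplemented
}

func (reverse) Process(in []string) ([]string, error) {
	out := make([]string, len(in))
	for i, s := range in {
		out[len(in)-1-i] = s
	}
	return out, nil
}

// Compile-time check that reverse satisfies the interface.
var _ processor = reverse{}

func main() {
	var p processor = reverse{}
	fmt.Println(p.Open())
	fmt.Println(p.Process([]string{"a", "b", "c"}))
}
```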
func (UnimplementedProcessor) Configure ¶
func (UnimplementedProcessor) Configure(context.Context, config.Config) error
Configure is optional and can be overridden in the actual implementation.
func (UnimplementedProcessor) MiddlewareOptions ¶ added in v0.2.0
func (UnimplementedProcessor) MiddlewareOptions() []ProcessorMiddlewareOption
MiddlewareOptions is optional and can be overridden in the actual implementation.
func (UnimplementedProcessor) Open ¶
func (UnimplementedProcessor) Open(context.Context) error
Open is optional and can be overridden in the actual implementation.
func (UnimplementedProcessor) Process ¶
func (UnimplementedProcessor) Process(context.Context, []opencdc.Record) []ProcessedRecord
Process needs to be overridden in the actual implementation.
func (UnimplementedProcessor) Specification ¶
func (UnimplementedProcessor) Specification() (Specification, error)
Specification needs to be overridden in the actual implementation.
func (UnimplementedProcessor) Teardown ¶
func (UnimplementedProcessor) Teardown(context.Context) error
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
internal | |
| Package pprocutils provides the functionality for Conduit to set up utilities for processors. |
mock | Package mock is a generated GoMock package. |
proto | |
| Package wasm provides the functionality for communicating with Conduit as a standalone plugin. |