Documentation
¶
Index ¶
- func NewErrorProcessor(log.CtxLogger) sdk.Processor
- type ErrorProcessor
- func (p *ErrorProcessor) Configure(ctx context.Context, cfg config.Config) error
- func (*ErrorProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption
- func (p *ErrorProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
- func (p *ErrorProcessor) Specification() (sdk.Specification, error)
- type FilterProcessor
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ErrorProcessor ¶ added in v0.13.4
type ErrorProcessor struct { sdk.UnimplementedProcessor // contains filtered or unexported fields }
Example ¶
p := NewErrorProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: `Error record with custom error message`, Description: `This example shows how to configure the error processor to return a custom error message for a record using a Go template.`, Config: config.Config{ "message": "custom error message with data from record: {{.Metadata.foo}}", }, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Metadata: map[string]string{"foo": "bar"}, Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}}, }, Want: sdk.ErrorRecord{ Error: cerrors.New("custom error message with data from record: bar"), }, })
Output: processor returned error: custom error message with data from record: bar
func (*ErrorProcessor) MiddlewareOptions ¶ added in v0.13.4
func (*ErrorProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption
func (*ErrorProcessor) Process ¶ added in v0.13.4
func (p *ErrorProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*ErrorProcessor) Specification ¶ added in v0.13.4
func (p *ErrorProcessor) Specification() (sdk.Specification, error)
type FilterProcessor ¶ added in v0.13.4
type FilterProcessor struct {
sdk.UnimplementedProcessor
}
Example ¶
p := NewFilterProcessor(log.Nop()) exampleutil.RunExample(p, exampleutil.Example{ Summary: `Filter out the record`, Config: config.Config{}, Have: opencdc.Record{ Operation: opencdc.OperationCreate, Metadata: map[string]string{"key1": "val1"}, Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}, Before: opencdc.StructuredData{"bar": "baz"}}, }, Want: sdk.FilterRecord{}, })
Output: processor filtered record out
func NewFilterProcessor ¶
func NewFilterProcessor(log.CtxLogger) *FilterProcessor
func (*FilterProcessor) MiddlewareOptions ¶ added in v0.13.4
func (*FilterProcessor) MiddlewareOptions() []sdk.ProcessorMiddlewareOption
func (*FilterProcessor) Process ¶ added in v0.13.4
func (p *FilterProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord
func (*FilterProcessor) Specification ¶ added in v0.13.4
func (p *FilterProcessor) Specification() (sdk.Specification, error)
Click to show internal directories.
Click to hide internal directories.