Documentation ¶
Overview ¶
Package sdk implements utilities for implementing a Conduit connector.
Getting started ¶
Conduit connectors can be thought of as the edges of a Conduit pipeline. They are responsible for reading records from and writing records to third party systems. Conduit uses connectors as plugins that hide the intricacies of working with a particular third party system, so that Conduit itself can focus on efficiently processing records and moving them safely from sources to destinations.
To implement a connector, start by defining a global variable of type Connector, preferably in connector.go at the root of your project to make it easy to discover.
var Connector = sdk.Connector{
    NewSpecification: Specification,  // Specification is my connector's specification
    NewSource:        NewSource,      // NewSource is the constructor for my source
    NewDestination:   NewDestination, // NewDestination is the constructor for my destination
}
Connector will be used as the starting point for accessing three main connector components that you need to provide:
- Specification contains general information about the plugin like its name and what it does. Writing a specification is relatively simple and straightforward; for more info check the corresponding field docs of Specification (a minimal sketch follows this list).
- Source is the connector part that knows how to fetch data from the third party system and convert it to a [Record].
- Destination is the connector part that knows how to write a [Record] to the third party system.
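For example, a minimal specification function might look like this (a sketch; the values are placeholders and the full set of fields is documented on Specification):

func Specification() sdk.Specification {
    return sdk.Specification{
        Name:        "my-connector",
        Summary:     "A connector for my third party system.",
        Description: "A longer description of what the connector does and how to configure it.",
        Version:     "v0.1.0",
        Author:      "Your Name",
    }
}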
General advice for implementing connectors:
- The SDK provides a structured logger that can be retrieved with Logger. It allows you to create structured and leveled output that will be included as part of the Conduit logs.
- If you want to add logging to the hot path (i.e. code that is executed for every record that is read or written) you should use the log level "trace", otherwise it can greatly impact the performance of your connector.
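As a minimal sketch, a source could log like this (nextRecord is a hypothetical helper, not part of the SDK):

func (s *Source) Read(ctx context.Context) (opencdc.Record, error) {
    logger := sdk.Logger(ctx)

    rec, err := s.nextRecord(ctx) // nextRecord is a hypothetical helper
    if err != nil {
        logger.Error().Err(err).Msg("failed to read record") // outside the hot path
        return opencdc.Record{}, err
    }

    // per-record logging should use the "trace" level
    logger.Trace().Str("position", string(rec.Position)).Msg("read record")
    return rec, nil
}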
Source ¶
A Source is responsible for continuously reading data from a third party system and returning it in the form of a [Record].
Every Source implementation needs to include an UnimplementedSource to satisfy the interface. This allows us to potentially change the interface in the future while remaining backwards compatible with existing Source implementations.
type Source struct {
    sdk.UnimplementedSource
}
You need to implement the functions required by Source and provide your own implementations. Please look at the documentation of Source for further information about individual functions.
You should also create a constructor function for your source struct. Note that this is the same function that should be set as the value of Connector.NewSource. The constructor should be used to wrap your source in the default middleware. You can add additional middleware, but unless you have a very good reason, you should always include the default middleware.
func NewSource() sdk.Source {
    return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware()...)
}
Additional tips for implementing a source:
- The SDK provides utilities for certain operations like creating records in SourceUtil. You can access it through the global variable Util.Source.
- The function Source.Ack is optional and does not have to be implemented.
- Source is responsible for creating record positions that should ideally uniquely identify a record. Think carefully about what you store in the position; it should give the source enough information to resume reading records at that exact position (see the sketch after these tips).
- The SDK provides acceptance tests; if your source doesn't pass them, your implementation has a bug¹.
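As a sketch of the last two tips, a source can encode a small cursor struct as JSON to form the record position and use Util.Source to build records (the position fields, table name and helper functions are illustrative assumptions, not part of the SDK):

// position is a hypothetical cursor that this source encodes into the record position.
type position struct {
    Table  string `json:"table"`
    LastID int64  `json:"last_id"`
}

func (p position) toOpenCDC() opencdc.Position {
    b, _ := json.Marshal(p) // sketch only; handle the error in real code
    return opencdc.Position(b)
}

// parsePosition restores the cursor from the position passed to Open.
func parsePosition(raw opencdc.Position) (position, error) {
    var p position
    if len(raw) == 0 {
        return p, nil // no previous position, start from the beginning
    }
    err := json.Unmarshal(raw, &p)
    return p, err
}

// buildRecord creates a record with the SDK utilities, attaching the position.
func buildRecord(id int64, name string) opencdc.Record {
    pos := position{Table: "users", LastID: id}.toOpenCDC()
    return sdk.Util.Source.NewRecordCreate(
        pos,
        opencdc.Metadata{},
        opencdc.StructuredData{"id": id},               // record key
        opencdc.StructuredData{"id": id, "name": name}, // record payload
    )
}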
Destination ¶
A Destination is responsible for writing records in the form of a [Record] to third party systems.
Every Destination implementation needs to include an UnimplementedDestination to satisfy the interface. This allows us to potentially change the interface in the future while remaining backwards compatible with existing Destination implementations.
type Destination struct {
    sdk.UnimplementedDestination
}
You need to implement the functions required by Destination and provide your own implementations. Please look at the documentation of Destination for further information about individual functions.
You should also create a constructor function for your destination struct. Note that this is the same function that should be set as the value of Connector.NewDestination. The constructor should be used to wrap your destination in the default middleware. You can add additional middleware, but unless you have a very good reason, you should always include the default middleware.
func NewDestination() sdk.Destination {
    return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}
Additional tips for implementing a destination:
- The SDK provides utilities for certain operations like routing records based on their operation in DestinationUtil. You can access it through the global variable Util.Destination.
- If your destination writes whole records to the third party system, you should use [Record].Bytes to get the raw record representation.
- If possible, make your destination writes idempotent. It is possible that the destination will receive the same record twice after a pipeline restart.
- Some sources won't be able to distinguish between create and update operations. If your destination updates data in place, we recommend upserting the record on both create and update operations (see the sketch after these tips).
- The SDK provides acceptance tests; if your destination doesn't pass them, your implementation has a bug¹.
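The following sketch ties several of these tips together: a Write implementation that routes each record by its operation and upserts on create, update and snapshot (handleUpsert and handleDelete are hypothetical helpers):

func (d *Destination) Write(ctx context.Context, recs []opencdc.Record) (int, error) {
    for i, rec := range recs {
        err := sdk.Util.Destination.Route(ctx, rec,
            d.handleUpsert, // create
            d.handleUpsert, // update: upsert so replayed creates are handled the same way
            d.handleDelete, // delete
            d.handleUpsert, // snapshot
        )
        if err != nil {
            // report how many records were written before the failure
            return i, err
        }
    }
    return len(recs), nil
}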
Acceptance tests ¶
The SDK provides acceptance tests that can be run in a simple Go test.¹
To run acceptance tests you should create a test file, preferably named acceptance_test.go, at the root of your project to make it easy to discover. Inside it, create a Go test that calls the function AcceptanceTest.
func TestAcceptance(t *testing.T) {
    // set up dependencies here
    sdk.AcceptanceTest(t, sdk.ConfigurableAcceptanceTestDriver{
        Config: sdk.ConfigurableAcceptanceTestDriverConfig{
            Connector:         Connector, // Connector is the global variable from your connector
            SourceConfig:      map[string]string{…},
            DestinationConfig: map[string]string{…},
        },
    })
}
AcceptanceTest uses the AcceptanceTestDriver for certain operations. The SDK already provides a default implementation for the driver with ConfigurableAcceptanceTestDriver, although you can supply your own implementation if you need to adjust the behavior of acceptance tests for your connector.
Some acceptance tests will try to write data using the destination and then read the same data using the source. Because of that, you need to make sure that both configurations point to the exact same data store (e.g. in the case of the file connector, the source and destination need to read from and write to the same file).
If your connector does not implement both sides of the connector (a source and a destination), you will need to write a custom driver that knows how to read or write, depending on which side of the connector is not implemented. Here is an example of how to do that:
type CustomAcceptanceTestDriver struct {
    sdk.ConfigurableAcceptanceTestDriver
}

func (d *CustomAcceptanceTestDriver) ReadFromDestination(t *testing.T, records []opencdc.Record) []opencdc.Record {
    // implement read
}

func (d *CustomAcceptanceTestDriver) WriteToSource(t *testing.T, records []opencdc.Record) []opencdc.Record {
    // implement write
}
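The custom driver can then be passed to AcceptanceTest in place of the default one, for example:

func TestAcceptance(t *testing.T) {
    sdk.AcceptanceTest(t, &CustomAcceptanceTestDriver{
        ConfigurableAcceptanceTestDriver: sdk.ConfigurableAcceptanceTestDriver{
            Config: sdk.ConfigurableAcceptanceTestDriverConfig{
                Connector:    Connector,
                SourceConfig: map[string]string{…},
            },
        },
    })
}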
For more information about what behavior can be customized please refer to the AcceptanceTestDriver interface.
¹Acceptance tests are currently still experimental.
Index ¶
- Variables
- func AcceptanceTest(t *testing.T, driver AcceptanceTestDriver)
- func BenchmarkSource(b *testing.B, s Source, cfg map[string]string)
- func ConnectorIDFromContext(ctx context.Context) string
- func Logger(ctx context.Context) *zerolog.Logger
- func NewDestinationPlugin(impl Destination, cfg pconnector.PluginConfig) pconnector.DestinationPlugin
- func NewSourcePlugin(impl Source, cfg pconnector.PluginConfig) pconnector.SourcePlugin
- func NewSpecifierPlugin(specs Specification, source Source, dest Destination) pconnector.SpecifierPlugin
- func Serve(c Connector)
- type AcceptanceTestDriver
- type ConfigurableAcceptanceTestDriver
- func (d ConfigurableAcceptanceTestDriver) AfterTest(t *testing.T)
- func (d ConfigurableAcceptanceTestDriver) BeforeTest(t *testing.T)
- func (d ConfigurableAcceptanceTestDriver) Connector() Connector
- func (d ConfigurableAcceptanceTestDriver) Context() context.Context
- func (d ConfigurableAcceptanceTestDriver) DestinationConfig(*testing.T) config.Config
- func (d ConfigurableAcceptanceTestDriver) GenerateData(t *testing.T) opencdc.Data
- func (d ConfigurableAcceptanceTestDriver) GenerateRecord(t *testing.T, op opencdc.Operation) opencdc.Record
- func (d ConfigurableAcceptanceTestDriver) GenerateValue(t *testing.T) interface{}
- func (d ConfigurableAcceptanceTestDriver) GoleakOptions(_ *testing.T) []goleak.Option
- func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, records []opencdc.Record) []opencdc.Record
- func (d ConfigurableAcceptanceTestDriver) ReadTimeout() time.Duration
- func (d ConfigurableAcceptanceTestDriver) Skip(t *testing.T)
- func (d ConfigurableAcceptanceTestDriver) SourceConfig(*testing.T) config.Config
- func (d ConfigurableAcceptanceTestDriver) WriteTimeout() time.Duration
- func (d ConfigurableAcceptanceTestDriver) WriteToSource(t *testing.T, records []opencdc.Record) []opencdc.Record
- type ConfigurableAcceptanceTestDriverConfig
- type Connector
- type Converter
- type DebeziumConverter
- type Destination
- type DestinationMiddleware
- type DestinationMiddlewareOption
- type DestinationUtil
- type DestinationWithBatch
- type DestinationWithBatchConfig
- type DestinationWithRateLimit
- type DestinationWithRateLimitConfig
- type DestinationWithRecordFormat
- type DestinationWithRecordFormatConfig
- func (c DestinationWithRecordFormatConfig) Apply(m DestinationMiddleware)
- func (c DestinationWithRecordFormatConfig) DefaultRecordSerializers() []RecordSerializer
- func (c DestinationWithRecordFormatConfig) RecordFormatOptionsParameterName() string
- func (c DestinationWithRecordFormatConfig) RecordFormatParameterName() string
- type DestinationWithSchemaExtraction
- type DestinationWithSchemaExtractionConfig
- type Encoder
- type GenerateDataType
- type GenericRecordSerializer
- type JSONEncoder
- type OpenCDCConverter
- type RecordSerializer
- type Source
- type SourceMiddleware
- type SourceMiddlewareOption
- type SourceUtil
- func (SourceUtil) NewRecordCreate(position opencdc.Position, metadata opencdc.Metadata, key opencdc.Data, ...) opencdc.Record
- func (SourceUtil) NewRecordDelete(position opencdc.Position, metadata opencdc.Metadata, key opencdc.Data, ...) opencdc.Record
- func (SourceUtil) NewRecordSnapshot(position opencdc.Position, metadata opencdc.Metadata, key opencdc.Data, ...) opencdc.Record
- func (SourceUtil) NewRecordUpdate(position opencdc.Position, metadata opencdc.Metadata, key opencdc.Data, ...) opencdc.Record
- type SourceWithBatch
- type SourceWithBatchConfig
- type SourceWithEncoding
- type SourceWithSchemaContext
- type SourceWithSchemaContextConfig
- type SourceWithSchemaExtraction
- type SourceWithSchemaExtractionConfig
- func (c SourceWithSchemaExtractionConfig) Apply(m SourceMiddleware)
- func (c SourceWithSchemaExtractionConfig) SchemaKeyEnabledParameterName() string
- func (c SourceWithSchemaExtractionConfig) SchemaKeySubjectParameterName() string
- func (c SourceWithSchemaExtractionConfig) SchemaPayloadEnabledParameterName() string
- func (c SourceWithSchemaExtractionConfig) SchemaPayloadSubjectParameterName() string
- func (c SourceWithSchemaExtractionConfig) SchemaTypeParameterName() string
- type Specification
- type TemplateRecordSerializer
- type UnimplementedDestination
- func (UnimplementedDestination) Configure(context.Context, config.Config) error
- func (UnimplementedDestination) LifecycleOnCreated(context.Context, config.Config) error
- func (UnimplementedDestination) LifecycleOnDeleted(context.Context, config.Config) error
- func (UnimplementedDestination) LifecycleOnUpdated(context.Context, config.Config, config.Config) error
- func (UnimplementedDestination) Open(context.Context) error
- func (UnimplementedDestination) Parameters() config.Parameters
- func (UnimplementedDestination) Teardown(context.Context) error
- func (UnimplementedDestination) Write(context.Context, []opencdc.Record) (int, error)
- type UnimplementedSource
- func (UnimplementedSource) Ack(context.Context, opencdc.Position) error
- func (UnimplementedSource) Configure(context.Context, config.Config) error
- func (UnimplementedSource) LifecycleOnCreated(context.Context, config.Config) error
- func (UnimplementedSource) LifecycleOnDeleted(context.Context, config.Config) error
- func (UnimplementedSource) LifecycleOnUpdated(context.Context, config.Config, config.Config) error
- func (UnimplementedSource) Open(context.Context, opencdc.Position) error
- func (UnimplementedSource) Parameters() config.Parameters
- func (UnimplementedSource) Read(context.Context) (opencdc.Record, error)
- func (s UnimplementedSource) ReadN(context.Context, int) ([]opencdc.Record, error)
- func (UnimplementedSource) Teardown(context.Context) error
Constants ¶
This section is empty.
Variables ¶
var (
    // ErrBackoffRetry can be returned by Source.Read to signal the SDK there is
    // no record to fetch right now and it should try again later.
    ErrBackoffRetry = errors.New("backoff retry")

    // ErrUnimplemented is returned in functions of plugins that don't implement
    // a certain method.
    ErrUnimplemented = errors.New("the connector plugin does not implement this action, please check the source code of the connector and make sure all required connector methods are implemented")
)
var Util = struct {
    // SourceUtil provides utility methods for implementing a source.
    Source SourceUtil
    // DestinationUtil provides utility methods for implementing a destination.
    Destination DestinationUtil

    // ParseConfig sanitizes the configuration, applies defaults, validates it and
    // copies the values into the target object. It combines the functionality
    // provided by github.com/conduitio/conduit-commons/config.Config into a single
    // convenient function. It is intended to be used in the Configure method of a
    // connector to parse the configuration map.
    //
    // The configuration parameters should be provided through NewSource().Parameters()
    // or NewDestination().Parameters() so that parameters from SDK middlewares are
    // included too.
    //
    // The function does the following:
    //   - Removes leading and trailing spaces from all keys and values in the
    //     configuration.
    //   - Applies the default values defined in the parameter specifications to the
    //     configuration.
    //   - Validates the configuration by checking for unrecognized parameters, type
    //     validations, and value validations.
    //   - Copies configuration values into the target object. The target object must
    //     be a pointer to a struct.
    ParseConfig func(ctx context.Context, cfg config.Config, target any, params config.Parameters) error
}{
    ParseConfig: parseConfig,
}
Util provides utilities for implementing connectors.
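As an example, a Configure method could parse the configuration into a struct like this (a sketch; the struct, its fields and tags are illustrative and follow the conduit-commons/config conventions):

type SourceConfig struct {
    // URL of the third party system (illustrative field).
    URL string `json:"url" validate:"required"`
    // PollInterval controls how often the source polls for new data.
    PollInterval time.Duration `json:"pollInterval" default:"5s"`
}

func (s *Source) Configure(ctx context.Context, cfg config.Config) error {
    var parsed SourceConfig
    err := sdk.Util.ParseConfig(ctx, cfg, &parsed, NewSource().Parameters())
    if err != nil {
        return err
    }
    s.config = parsed // s.config is a field on the Source struct
    return nil
}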
Functions ¶
func AcceptanceTest ¶ added in v0.3.0
func AcceptanceTest(t *testing.T, driver AcceptanceTestDriver)
AcceptanceTest is the acceptance test that all connector implementations should pass. It should manually be called from a test case in each implementation:
func TestAcceptance(t *testing.T) {
    // set up test dependencies ...
    sdk.AcceptanceTest(t, sdk.ConfigurableAcceptanceTestDriver{
        Config: sdk.ConfigurableAcceptanceTestDriverConfig{
            Connector:         myConnector,
            SourceConfig:      config.Config{...}, // valid source config
            DestinationConfig: config.Config{...}, // valid destination config
        },
    })
}
func BenchmarkSource ¶ added in v0.9.0
BenchmarkSource is a benchmark that any source implementation can run to figure out its performance. The benchmark expects the source resource to contain at least b.N records; this should be prepared before the benchmark is executed. The function should be called manually from a benchmark function:
func BenchmarkConnector(b *testing.B) {
    // set up test dependencies and write b.N records to source resource ...
    sdk.BenchmarkSource(
        b,
        mySourceConnector,
        map[string]string{...}, // valid source config
    )
}
The benchmark can be run with a specific number of records by supplying the option -benchtime=Nx, where N is the number of records to be benchmarked (e.g. -benchtime=100x benchmarks reading 100 records).
func ConnectorIDFromContext ¶ added in v0.10.1
ConnectorIDFromContext fetches the connector ID from the context. If the context does not contain a connector ID it returns an empty string.
func Logger ¶
Logger returns an instance of a logger that can be used for leveled and structured logging in a plugin. The logger will respect the log level configured in Conduit.
func NewDestinationPlugin ¶
func NewDestinationPlugin(impl Destination, cfg pconnector.PluginConfig) pconnector.DestinationPlugin
NewDestinationPlugin takes a Destination and wraps it into an adapter that converts it into a pconnector.DestinationPlugin. If the parameter is nil it will wrap UnimplementedDestination instead.
func NewSourcePlugin ¶
func NewSourcePlugin(impl Source, cfg pconnector.PluginConfig) pconnector.SourcePlugin
NewSourcePlugin takes a Source and wraps it into an adapter that converts it into a pconnector.SourcePlugin. If the parameter is nil it will wrap UnimplementedSource instead.
func NewSpecifierPlugin ¶
func NewSpecifierPlugin(specs Specification, source Source, dest Destination) pconnector.SpecifierPlugin
NewSpecifierPlugin takes a Specification and wraps it into an adapter that converts it into a pconnector.SpecifierPlugin.
func Serve ¶
func Serve(c Connector)
Serve starts the plugin and takes care of its whole lifecycle by blocking until the plugin can safely stop running. Any fixable errors will be output to os.Stderr and the process will exit with a status code of 1. Serve will panic for unexpected conditions where a user's fix is unknown.
It is essential that nothing gets written to stdout or stderr before this function is called, as the first output is used to perform the initial handshake.
Plugins should call Serve in their main() functions.
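A typical main package is therefore little more than the following (a minimal sketch; the connector module path is hypothetical):

package main

import (
    sdk "github.com/conduitio/conduit-connector-sdk"

    myconn "github.com/example/conduit-connector-example" // hypothetical module path
)

func main() {
    sdk.Serve(myconn.Connector)
}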
Types ¶
type AcceptanceTestDriver ¶ added in v0.3.0
type AcceptanceTestDriver interface {
    // Context returns the context to use in tests.
    Context() context.Context

    // Connector is the connector to be tested.
    Connector() Connector

    // SourceConfig should be a valid config for a source connector, reading
    // from the same location as the destination will write to.
    SourceConfig(*testing.T) config.Config
    // DestinationConfig should be a valid config for a destination connector,
    // writing to the same location as the source will read from.
    DestinationConfig(*testing.T) config.Config

    // BeforeTest is executed before each acceptance test.
    BeforeTest(*testing.T)
    // AfterTest is executed after each acceptance test.
    AfterTest(*testing.T)

    // GoleakOptions will be applied to goleak.VerifyNone. Can be used to
    // suppress false positive goroutine leaks.
    GoleakOptions(*testing.T) []goleak.Option

    // GenerateRecord will generate a new Record for a certain Operation. It's
    // the responsibility of the AcceptanceTestDriver implementation to provide
    // records with appropriate contents (e.g. appropriate type of payload).
    // The generated record will contain mixed data types in the field Key and
    // Payload (i.e. RawData and StructuredData), unless configured otherwise
    // (see ConfigurableAcceptanceTestDriverConfig.GenerateDataType).
    GenerateRecord(*testing.T, opencdc.Operation) opencdc.Record

    // WriteToSource receives a slice of records that should be prepared in the
    // 3rd party system so that the source will read them. The returned slice
    // will be used to verify the source connector can successfully execute
    // reads.
    // It is encouraged for the driver to return the same slice, unless there is
    // no way to write the records to the 3rd party system, then the returned
    // slice should contain the expected records a source should read.
    WriteToSource(*testing.T, []opencdc.Record) []opencdc.Record
    // ReadFromDestination should return a slice with the records that were
    // written to the destination. The slice will be used to verify the
    // destination has successfully executed writes.
    // The parameter contains records that were actually written to the
    // destination. These will be compared to the returned slice of records. It
    // is encouraged for the driver to only touch the input records to change
    // the order of records and to not change the records themselves.
    ReadFromDestination(*testing.T, []opencdc.Record) []opencdc.Record

    // ReadTimeout controls the time the test should wait for a read operation
    // to return before it considers the operation as failed.
    ReadTimeout() time.Duration
    // WriteTimeout controls the time the test should wait for a write operation
    // to return before it considers the operation as failed.
    WriteTimeout() time.Duration
}
AcceptanceTestDriver is the object that each test uses for fetching the connector and its configurations. The SDK provides a default implementation ConfigurableAcceptanceTestDriver that should fit most use cases. In case more flexibility is needed you can create your own driver, include the default driver in the struct and override methods as needed.
type ConfigurableAcceptanceTestDriver ¶ added in v0.3.0
type ConfigurableAcceptanceTestDriver struct {
    Config ConfigurableAcceptanceTestDriverConfig
    // contains filtered or unexported fields
}
ConfigurableAcceptanceTestDriver is the default implementation of AcceptanceTestDriver. It provides a convenient way of configuring the driver without the need of implementing a custom driver from scratch.
func (ConfigurableAcceptanceTestDriver) AfterTest ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) AfterTest(t *testing.T)
func (ConfigurableAcceptanceTestDriver) BeforeTest ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) BeforeTest(t *testing.T)
func (ConfigurableAcceptanceTestDriver) Connector ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) Connector() Connector
func (ConfigurableAcceptanceTestDriver) Context ¶ added in v0.9.0
func (d ConfigurableAcceptanceTestDriver) Context() context.Context
func (ConfigurableAcceptanceTestDriver) DestinationConfig ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) DestinationConfig(*testing.T) config.Config
func (ConfigurableAcceptanceTestDriver) GenerateData ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) GenerateData(t *testing.T) opencdc.Data
GenerateData generates either RawData or StructuredData depending on the configured data type (see ConfigurableAcceptanceTestDriverConfig.GenerateDataType).
func (ConfigurableAcceptanceTestDriver) GenerateRecord ¶ added in v0.3.0
func (ConfigurableAcceptanceTestDriver) GenerateValue ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) GenerateValue(t *testing.T) interface{}
GenerateValue generates a random value of a random builtin type.
func (ConfigurableAcceptanceTestDriver) GoleakOptions ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) GoleakOptions(_ *testing.T) []goleak.Option
func (ConfigurableAcceptanceTestDriver) ReadFromDestination ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, records []opencdc.Record) []opencdc.Record
ReadFromDestination by default opens the source and reads all records from the source. It is expected that the destination is writing to the same location the source is reading from. If the connector does not implement a source the function will fail the test.
func (ConfigurableAcceptanceTestDriver) ReadTimeout ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) ReadTimeout() time.Duration
func (ConfigurableAcceptanceTestDriver) Skip ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) Skip(t *testing.T)
func (ConfigurableAcceptanceTestDriver) SourceConfig ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) SourceConfig(*testing.T) config.Config
func (ConfigurableAcceptanceTestDriver) WriteTimeout ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) WriteTimeout() time.Duration
func (ConfigurableAcceptanceTestDriver) WriteToSource ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) WriteToSource(t *testing.T, records []opencdc.Record) []opencdc.Record
WriteToSource by default opens the destination and writes records to the destination. It is expected that the destination is writing to the same location the source is reading from. If the connector does not implement a destination the function will fail the test.
type ConfigurableAcceptanceTestDriverConfig ¶ added in v0.3.0
type ConfigurableAcceptanceTestDriverConfig struct {
    // Context is the context to use in tests. The default is a context with a
    // logger that writes to the test output.
    //nolint:containedctx // We are using this as a configuration option.
    Context context.Context
    // Connector is the connector to be tested.
    Connector Connector

    // SourceConfig should be a valid config for a source connector, reading
    // from the same location as the destination will write to.
    SourceConfig config.Config
    // DestinationConfig should be a valid config for a destination connector,
    // writing to the same location as the source will read from.
    DestinationConfig config.Config

    // BeforeTest is executed before each acceptance test.
    BeforeTest func(t *testing.T)
    // AfterTest is executed after each acceptance test.
    AfterTest func(t *testing.T)

    // GoleakOptions will be applied to goleak.VerifyNone. Can be used to
    // suppress false positive goroutine leaks.
    GoleakOptions []goleak.Option

    // Skip is a slice of regular expressions used to identify tests that should
    // be skipped. The full test name will be matched against all regular
    // expressions and the test will be skipped if a match is found.
    Skip []string

    // GenerateDataType controls which Data type will be generated in test
    // records. The default is GenerateMixedData which will produce both RawData
    // and StructuredData. To generate only one type of data set this field to
    // GenerateRawData or GenerateStructuredData.
    GenerateDataType GenerateDataType

    // ReadTimeout controls the time the test should wait for a read operation
    // to return a record before it considers the operation as failed. The
    // default timeout is 5 seconds. This value should be changed only if there
    // is a good reason (uncontrollable limitations of the 3rd party system).
    ReadTimeout time.Duration
    // WriteTimeout controls the time the test should wait for a write operation
    // to return a record before it considers the operation as failed. The
    // default timeout is 5 seconds. This value should be changed only if there
    // is a good reason (uncontrollable limitations of the 3rd party system).
    WriteTimeout time.Duration
}
ConfigurableAcceptanceTestDriverConfig contains the configuration for ConfigurableAcceptanceTestDriver.
type Connector ¶ added in v0.3.0
type Connector struct {
    // NewSpecification should create a new Specification that describes the
    // connector. This field is mandatory, if it is empty the connector won't
    // work.
    NewSpecification func() Specification
    // NewSource should create a new Source plugin. If the plugin doesn't
    // implement a source connector this field can be nil.
    NewSource func() Source
    // NewDestination should create a new Destination plugin. If the plugin
    // doesn't implement a destination connector this field can be nil.
    NewDestination func() Destination
}
Connector combines all constructors for each plugin into one struct.
type Converter ¶ added in v0.5.0
type Converter interface {
    Name() string
    Configure(map[string]string) (Converter, error)
    Convert(opencdc.Record) (any, error)
}
Converter is a type that can change the structure of a Record. It's used in destination connectors to change the output structure (e.g. opencdc records, debezium records etc.).
type DebeziumConverter ¶ added in v0.5.0
DebeziumConverter outputs a Debezium record.
func (DebeziumConverter) Configure ¶ added in v0.5.0
func (c DebeziumConverter) Configure(opt map[string]string) (Converter, error)
func (DebeziumConverter) Convert ¶ added in v0.5.0
func (c DebeziumConverter) Convert(r opencdc.Record) (any, error)
func (DebeziumConverter) Name ¶ added in v0.5.0
func (c DebeziumConverter) Name() string
type Destination ¶
type Destination interface {
    // Parameters is a map of named Parameters that describe how to configure
    // the Destination.
    Parameters() config.Parameters

    // Configure is the first function to be called in a connector. It provides the
    // connector with the configuration that needs to be validated and stored.
    // In case the configuration is not valid it should return an error.
    // Testing if your connector can reach the configured data source should be
    // done in Open, not in Configure.
    // The connector SDK will sanitize, apply defaults and validate the
    // configuration before calling this function. This means that the
    // configuration will always contain all keys defined in Parameters
    // (unprovided keys will have their default values) and all non-empty
    // values will be of the correct type.
    Configure(context.Context, config.Config) error

    // Open is called after Configure to signal the plugin it can prepare to
    // start writing records. If needed, the plugin should open connections in
    // this function.
    Open(context.Context) error

    // Write writes len(r) records from r to the destination right away without
    // caching. It should return the number of records written from r
    // (0 <= n <= len(r)) and any error encountered that caused the write to
    // stop early. Write must return a non-nil error if it returns n < len(r).
    Write(ctx context.Context, r []opencdc.Record) (n int, err error)

    // Teardown signals to the plugin that all records were written and there
    // will be no more calls to any other function. After Teardown returns, the
    // plugin should be ready for a graceful shutdown.
    Teardown(context.Context) error

    // LifecycleOnCreated is called after Configure and before Open when the
    // connector is run for the first time. This call will be skipped if the
    // connector was already started before. This method can be used to do some
    // initialization that needs to happen only once in the lifetime of a
    // connector (e.g. create a bucket). Anything that the connector creates in
    // this method is considered to be owned by this connector and should be
    // cleaned up in LifecycleOnDeleted.
    LifecycleOnCreated(ctx context.Context, config config.Config) error
    // LifecycleOnUpdated is called after Configure and before Open when the
    // connector configuration has changed since the last run. This call will be
    // skipped if the connector configuration did not change. It can be used to
    // update anything that was initialized in LifecycleOnCreated, in case the
    // configuration change affects it.
    LifecycleOnUpdated(ctx context.Context, configBefore, configAfter config.Config) error
    // LifecycleOnDeleted is called when the connector was deleted. It will be
    // the only method that is called in that case. This method can be used to
    // clean up anything that was initialized in LifecycleOnCreated.
    LifecycleOnDeleted(ctx context.Context, config config.Config) error
    // contains filtered or unexported methods
}
Destination receives records from Conduit and writes them to 3rd party resources. All implementations must embed UnimplementedDestination for forward compatibility.
func DestinationWithMiddleware ¶ added in v0.3.0
func DestinationWithMiddleware(d Destination, middleware ...DestinationMiddleware) Destination
DestinationWithMiddleware wraps the destination into the supplied middleware.
type DestinationMiddleware ¶ added in v0.3.0
type DestinationMiddleware interface {
Wrap(Destination) Destination
}
DestinationMiddleware wraps a Destination and adds functionality to it.
func DefaultDestinationMiddleware ¶ added in v0.3.0
func DefaultDestinationMiddleware(opts ...DestinationMiddlewareOption) []DestinationMiddleware
DefaultDestinationMiddleware returns a slice of middleware that should be added to all destinations unless there's a good reason not to.
type DestinationMiddlewareOption ¶ added in v0.10.0
type DestinationMiddlewareOption interface {
Apply(DestinationMiddleware)
}
DestinationMiddlewareOption can be used to change the behavior of the default destination middleware created with DefaultDestinationMiddleware.
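For example, a destination constructor can tweak the defaults of the built-in batching middleware by passing its config as an option (a sketch; the values are placeholders):

func NewDestination() sdk.Destination {
    return sdk.DestinationWithMiddleware(
        &Destination{},
        sdk.DefaultDestinationMiddleware(
            sdk.DestinationWithBatchConfig{
                BatchSize:  100,         // default batch size for this connector
                BatchDelay: time.Second, // default batch delay for this connector
            },
        )...,
    )
}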
type DestinationUtil ¶ added in v0.3.0
type DestinationUtil struct{}
DestinationUtil provides utility methods for implementing a destination. Use it by calling Util.Destination.*.
func (DestinationUtil) Route ¶ added in v0.3.0
func (DestinationUtil) Route(
    ctx context.Context,
    rec opencdc.Record,
    handleCreate func(context.Context, opencdc.Record) error,
    handleUpdate func(context.Context, opencdc.Record) error,
    handleDelete func(context.Context, opencdc.Record) error,
    handleSnapshot func(context.Context, opencdc.Record) error,
) error
Route makes it easier to implement a destination that mutates entities in place and thus handles different operations differently. It will inspect the operation on the record and based on that choose which handler to call.
Example usage:
func (d *Destination) Write(ctx context.Context, r opencdc.Record) error {
    return d.Util.Route(ctx, r,
        d.handleInsert,
        d.handleUpdate,
        d.handleDelete,
        d.handleSnapshot, // we could also reuse d.handleInsert
    )
}

func (d *Destination) handleInsert(ctx context.Context, r opencdc.Record) error {
    ...
}
type DestinationWithBatch ¶ added in v0.3.0
type DestinationWithBatch struct {
Config DestinationWithBatchConfig
}
DestinationWithBatch adds support for batching on the destination. It adds two parameters to the destination config:
- `sdk.batch.size` - Maximum size of batch before it gets written to the destination.
- `sdk.batch.delay` - Maximum delay before an incomplete batch is written to the destination.
To change the defaults of these parameters use the fields of this struct.
func (*DestinationWithBatch) Wrap ¶ added in v0.3.0
func (d *DestinationWithBatch) Wrap(impl Destination) Destination
Wrap a Destination into the batching middleware.
type DestinationWithBatchConfig ¶ added in v0.10.0
type DestinationWithBatchConfig struct {
    // BatchSize is the default value for the batch size.
    BatchSize int
    // BatchDelay is the default value for the batch delay.
    BatchDelay time.Duration
}
DestinationWithBatchConfig is the configuration for the DestinationWithBatch middleware. Fields set to their zero value are ignored and will be set to the default value.
DestinationWithBatchConfig can be used as a DestinationMiddlewareOption.
func (DestinationWithBatchConfig) Apply ¶ added in v0.10.0
func (c DestinationWithBatchConfig) Apply(m DestinationMiddleware)
Apply sets the default configuration for the DestinationWithBatch middleware.
func (DestinationWithBatchConfig) BatchDelayParameterName ¶ added in v0.10.0
func (c DestinationWithBatchConfig) BatchDelayParameterName() string
func (DestinationWithBatchConfig) BatchSizeParameterName ¶ added in v0.10.0
func (c DestinationWithBatchConfig) BatchSizeParameterName() string
type DestinationWithRateLimit ¶ added in v0.3.0
type DestinationWithRateLimit struct {
Config DestinationWithRateLimitConfig
}
DestinationWithRateLimit adds support for rate limiting to the destination. It adds two parameters to the destination config:
- `sdk.rate.perSecond` - Maximum number of records written per second (0 means no rate limit).
- `sdk.rate.burst` - Allow bursts of at most X records (0 or less means that bursts are not limited). Only takes effect if a rate limit per second is set. Note that if `sdk.batch.size` is bigger than `sdk.rate.burst`, the effective batch size will be equal to `sdk.rate.burst`.
To change the defaults of these parameters use the fields of this struct.
func (*DestinationWithRateLimit) Wrap ¶ added in v0.3.0
func (d *DestinationWithRateLimit) Wrap(impl Destination) Destination
Wrap a Destination into the rate limiting middleware.
type DestinationWithRateLimitConfig ¶ added in v0.10.0
type DestinationWithRateLimitConfig struct {
    // RatePerSecond is the default value for the rate per second.
    RatePerSecond float64
    // Burst is the default value for the allowed burst count.
    Burst int
}
DestinationWithRateLimitConfig is the configuration for the DestinationWithRateLimit middleware. Fields set to their zero value are ignored and will be set to the default value.
DestinationWithRateLimitConfig can be used as a DestinationMiddlewareOption.
func (DestinationWithRateLimitConfig) Apply ¶ added in v0.10.0
func (c DestinationWithRateLimitConfig) Apply(m DestinationMiddleware)
Apply sets the default configuration for the DestinationWithRateLimit middleware.
func (DestinationWithRateLimitConfig) RateBurstParameterName ¶ added in v0.10.0
func (c DestinationWithRateLimitConfig) RateBurstParameterName() string
func (DestinationWithRateLimitConfig) RatePerSecondParameterName ¶ added in v0.10.0
func (c DestinationWithRateLimitConfig) RatePerSecondParameterName() string
type DestinationWithRecordFormat ¶ added in v0.5.0
type DestinationWithRecordFormat struct {
Config DestinationWithRecordFormatConfig
}
DestinationWithRecordFormat adds support for changing the output format of records, specifically of the Record.Bytes method. It adds two parameters to the destination config:
- `sdk.record.format` - The format of the output record. The inclusion validation exposes a list of valid options.
- `sdk.record.format.options` - Options are used to configure the format.
func (*DestinationWithRecordFormat) Wrap ¶ added in v0.5.0
func (d *DestinationWithRecordFormat) Wrap(impl Destination) Destination
Wrap a Destination into the record format middleware.
type DestinationWithRecordFormatConfig ¶ added in v0.10.0
type DestinationWithRecordFormatConfig struct {
    // DefaultRecordFormat is the default record format.
    DefaultRecordFormat string
    RecordSerializers   []RecordSerializer
}
DestinationWithRecordFormatConfig is the configuration for the DestinationWithRecordFormat middleware. Fields set to their zero value are ignored and will be set to the default value.
DestinationWithRecordFormatConfig can be used as a DestinationMiddlewareOption.
func (DestinationWithRecordFormatConfig) Apply ¶ added in v0.10.0
func (c DestinationWithRecordFormatConfig) Apply(m DestinationMiddleware)
Apply sets the default configuration for the DestinationWithRecordFormat middleware.
func (DestinationWithRecordFormatConfig) DefaultRecordSerializers ¶ added in v0.10.0
func (c DestinationWithRecordFormatConfig) DefaultRecordSerializers() []RecordSerializer
DefaultRecordSerializers returns the list of record serializers that are used if DestinationWithRecordFormatConfig.RecordSerializers is nil.
func (DestinationWithRecordFormatConfig) RecordFormatOptionsParameterName ¶ added in v0.10.0
func (c DestinationWithRecordFormatConfig) RecordFormatOptionsParameterName() string
func (DestinationWithRecordFormatConfig) RecordFormatParameterName ¶ added in v0.10.0
func (c DestinationWithRecordFormatConfig) RecordFormatParameterName() string
type DestinationWithSchemaExtraction ¶ added in v0.10.0
type DestinationWithSchemaExtraction struct {
Config DestinationWithSchemaExtractionConfig
}
DestinationWithSchemaExtraction is a middleware that extracts and decodes the key and/or payload of a record using a schema. It takes the schema subject and version from the record metadata, fetches the schema from the schema service, and decodes the key and/or payload using the schema. If the schema subject and version is not found in the record metadata, it will log a warning and skip decoding the key and/or payload. This middleware is useful when the source connector sends the data with the schema attached. This middleware is the counterpart of SourceWithSchemaExtraction.
It adds two parameters to the destination config:
- `sdk.schema.extract.key.enabled` - Whether to extract and decode the record key with a schema.
- `sdk.schema.extract.payload.enabled` - Whether to extract and decode the record payload with a schema.
func (*DestinationWithSchemaExtraction) Wrap ¶ added in v0.10.0
func (s *DestinationWithSchemaExtraction) Wrap(impl Destination) Destination
Wrap a Destination into the schema middleware. It will apply default configuration values if they are not explicitly set.
type DestinationWithSchemaExtractionConfig ¶ added in v0.10.0
type DestinationWithSchemaExtractionConfig struct {
    // Whether to extract and decode the record payload with a schema.
    // If unset, defaults to true.
    PayloadEnabled *bool
    // Whether to extract and decode the record key with a schema.
    // If unset, defaults to true.
    KeyEnabled *bool
}
DestinationWithSchemaExtractionConfig is the configuration for the DestinationWithSchemaExtraction middleware. Fields set to their zero value are ignored and will be set to the default value.
DestinationWithSchemaExtractionConfig can be used as a DestinationMiddlewareOption.
func (DestinationWithSchemaExtractionConfig) Apply ¶ added in v0.10.0
func (c DestinationWithSchemaExtractionConfig) Apply(m DestinationMiddleware)
Apply sets the default configuration for the DestinationWithSchemaExtraction middleware.
func (DestinationWithSchemaExtractionConfig) SchemaKeyEnabledParameterName ¶ added in v0.10.0
func (c DestinationWithSchemaExtractionConfig) SchemaKeyEnabledParameterName() string
func (DestinationWithSchemaExtractionConfig) SchemaPayloadEnabledParameterName ¶ added in v0.10.0
func (c DestinationWithSchemaExtractionConfig) SchemaPayloadEnabledParameterName() string
type Encoder ¶ added in v0.5.0
type Encoder interface {
    Name() string
    Configure(options map[string]string) (Encoder, error)
    Encode(r any) ([]byte, error)
}
Encoder is a type that can encode a random struct into a byte slice. It's used in destination connectors to encode records into different formats (e.g. JSON, Avro etc.).
type GenerateDataType ¶ added in v0.3.0
type GenerateDataType int
GenerateDataType is used in acceptance tests to control what data type will be generated.
const (
    GenerateMixedData GenerateDataType = iota
    GenerateRawData
    GenerateStructuredData
)
type GenericRecordSerializer ¶ added in v0.10.0
GenericRecordSerializer is a serializer that uses a Converter and Encoder to serialize a record.
func (GenericRecordSerializer) Configure ¶ added in v0.10.0
func (rf GenericRecordSerializer) Configure(optRaw string) (RecordSerializer, error)
func (GenericRecordSerializer) Name ¶ added in v0.10.0
func (rf GenericRecordSerializer) Name() string
Name returns the name of the record serializer combined from the converter name and encoder name.
type JSONEncoder ¶ added in v0.5.0
type JSONEncoder struct{}
JSONEncoder is an Encoder that outputs JSON.
func (JSONEncoder) Configure ¶ added in v0.5.0
func (e JSONEncoder) Configure(map[string]string) (Encoder, error)
func (JSONEncoder) Name ¶ added in v0.5.0
func (e JSONEncoder) Name() string
type OpenCDCConverter ¶ added in v0.5.0
type OpenCDCConverter struct{}
OpenCDCConverter outputs an OpenCDC record (it does not change the structure of the record).
func (OpenCDCConverter) Configure ¶ added in v0.5.0
func (c OpenCDCConverter) Configure(map[string]string) (Converter, error)
func (OpenCDCConverter) Convert ¶ added in v0.5.0
func (c OpenCDCConverter) Convert(r opencdc.Record) (any, error)
func (OpenCDCConverter) Name ¶ added in v0.5.0
func (c OpenCDCConverter) Name() string
type RecordSerializer ¶ added in v0.10.0
type RecordSerializer interface {
    Name() string
    Configure(string) (RecordSerializer, error)
    opencdc.RecordSerializer
}
RecordSerializer is a type that can format a record to bytes. It's used in destination connectors to change the output structure and format.
type Source ¶
type Source interface {
    // Parameters is a map of named Parameters that describe how to configure
    // the Source.
    Parameters() config.Parameters

    // Configure is the first function to be called in a connector. It provides the
    // connector with the configuration that needs to be validated and stored.
    // In case the configuration is not valid it should return an error.
    // Testing if your connector can reach the configured data source should be
    // done in Open, not in Configure.
    // The connector SDK will sanitize, apply defaults and validate the
    // configuration before calling this function. This means that the
    // configuration will always contain all keys defined in Parameters
    // (unprovided keys will have their default values) and all non-empty
    // values will be of the correct type.
    Configure(context.Context, config.Config) error

    // Open is called after Configure to signal the plugin it can prepare to
    // start producing records. If needed, the plugin should open connections in
    // this function. The position parameter will contain the position of the
    // last record that was successfully processed, Source should therefore
    // start producing records after this position. The context passed to Open
    // will be cancelled once the plugin receives a stop signal from Conduit.
    Open(context.Context, opencdc.Position) error

    // Read returns a new Record and is supposed to block until there is either
    // a new record or the context gets cancelled. It can also return the error
    // ErrBackoffRetry to signal to the SDK it should call Read again with a
    // backoff retry.
    // If Read receives a cancelled context or the context is cancelled while
    // Read is running it must stop retrieving new records from the source
    // system and start returning records that have already been buffered. If
    // there are no buffered records left Read must return the context error to
    // signal a graceful stop. If Read returns ErrBackoffRetry while the context
    // is cancelled it will also signal that there are no records left and Read
    // won't be called again.
    // After Read returns an error the function won't be called again (except if
    // the error is ErrBackoffRetry, as mentioned above).
    // Read can be called concurrently with Ack.
    Read(context.Context) (opencdc.Record, error)
    // ReadN is the same as Read, but returns a batch of records. The connector
    // is expected to return at most n records. If there are fewer records
    // available, it should return all of them. If there are no records available
    // it should block until there are records available or the context is
    // cancelled. If the context is cancelled while ReadN is running, it should
    // return the context error.
    ReadN(context.Context, int) ([]opencdc.Record, error)

    // Ack signals to the implementation that the record with the supplied
    // position was successfully processed. This method might be called after
    // the context of Read is already cancelled, since there might be
    // outstanding acks that need to be delivered. When Teardown is called it is
    // guaranteed there won't be any more calls to Ack.
    // Ack can be called concurrently with Read.
    Ack(context.Context, opencdc.Position) error

    // Teardown signals to the plugin that there will be no more calls to any
    // other function. After Teardown returns, the plugin should be ready for a
    // graceful shutdown.
    Teardown(context.Context) error

    // LifecycleOnCreated is called after Configure and before Open when the
    // connector is run for the first time. This call will be skipped if the
    // connector was already started before. This method can be used to do some
    // initialization that needs to happen only once in the lifetime of a
    // connector (e.g. create a logical replication slot). Anything that the
    // connector creates in this method is considered to be owned by this
    // connector and should be cleaned up in LifecycleOnDeleted.
    LifecycleOnCreated(ctx context.Context, config config.Config) error
    // LifecycleOnUpdated is called after Configure and before Open when the
    // connector configuration has changed since the last run. This call will be
    // skipped if the connector configuration did not change. It can be used to
    // update anything that was initialized in LifecycleOnCreated, in case the
    // configuration change affects it.
    LifecycleOnUpdated(ctx context.Context, configBefore, configAfter config.Config) error
    // LifecycleOnDeleted is called when the connector was deleted. It will be
    // the only method that is called in that case. This method can be used to
    // clean up anything that was initialized in LifecycleOnCreated.
    LifecycleOnDeleted(ctx context.Context, config config.Config) error
    // contains filtered or unexported methods
}
Source fetches records from 3rd party resources and sends them to Conduit. All implementations must embed UnimplementedSource for forward compatibility.
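A sketch of a Read implementation that returns ErrBackoffRetry when no data is available (poll is a hypothetical helper, not part of the SDK):

func (s *Source) Read(ctx context.Context) (opencdc.Record, error) {
    rec, ok, err := s.poll(ctx) // poll is a hypothetical helper
    if err != nil {
        return opencdc.Record{}, err
    }
    if !ok {
        // no record available right now, let the SDK retry with a backoff
        return opencdc.Record{}, sdk.ErrBackoffRetry
    }
    return rec, nil
}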
func SourceWithMiddleware ¶ added in v0.3.0
func SourceWithMiddleware(s Source, middleware ...SourceMiddleware) Source
SourceWithMiddleware wraps the source into the supplied middleware.
type SourceMiddleware ¶ added in v0.3.0
SourceMiddleware wraps a Source and adds functionality to it.
func DefaultSourceMiddleware ¶ added in v0.3.0
func DefaultSourceMiddleware(opts ...SourceMiddlewareOption) []SourceMiddleware
DefaultSourceMiddleware returns a slice of middleware that should be added to all sources unless there's a good reason not to.
type SourceMiddlewareOption ¶ added in v0.10.0
type SourceMiddlewareOption interface {
Apply(SourceMiddleware)
}
SourceMiddlewareOption can be used to change the behavior of the default source middleware created with DefaultSourceMiddleware.
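Analogous to the destination options, a source constructor can adjust the defaults of the built-in middleware, for example disabling key schema extraction (a sketch; ptr is a small local helper because the config field is a pointer):

func ptr[T any](v T) *T { return &v }

func NewSource() sdk.Source {
    return sdk.SourceWithMiddleware(
        &Source{},
        sdk.DefaultSourceMiddleware(
            sdk.SourceWithSchemaExtractionConfig{
                KeyEnabled: ptr(false), // don't extract a schema for the record key
            },
        )...,
    )
}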
type SourceUtil ¶ added in v0.3.0
type SourceUtil struct{}
SourceUtil provides utility methods for implementing a source. Use it by calling Util.Source.*.
func (SourceUtil) NewRecordCreate ¶ added in v0.3.0
func (SourceUtil) NewRecordCreate(
    position opencdc.Position,
    metadata opencdc.Metadata,
    key opencdc.Data,
    payload opencdc.Data,
) opencdc.Record
NewRecordCreate can be used to instantiate a record with OperationCreate.
func (SourceUtil) NewRecordDelete ¶ added in v0.3.0
func (SourceUtil) NewRecordDelete(
    position opencdc.Position,
    metadata opencdc.Metadata,
    key opencdc.Data,
    payloadBefore opencdc.Data,
) opencdc.Record
NewRecordDelete can be used to instantiate a record with OperationDelete.
func (SourceUtil) NewRecordSnapshot ¶ added in v0.3.0
func (SourceUtil) NewRecordSnapshot(
    position opencdc.Position,
    metadata opencdc.Metadata,
    key opencdc.Data,
    payload opencdc.Data,
) opencdc.Record
NewRecordSnapshot can be used to instantiate a record with OperationSnapshot.
func (SourceUtil) NewRecordUpdate ¶ added in v0.3.0
func (SourceUtil) NewRecordUpdate(
    position opencdc.Position,
    metadata opencdc.Metadata,
    key opencdc.Data,
    payloadBefore opencdc.Data,
    payloadAfter opencdc.Data,
) opencdc.Record
NewRecordUpdate can be used to instantiate a record with OperationUpdate.
type SourceWithBatch ¶ added in v0.11.1
type SourceWithBatch struct {
Config SourceWithBatchConfig
}
SourceWithBatch adds support for batching on the source. It adds two parameters to the source config:
- `sdk.batch.size` - Maximum size of a batch before it is returned from the source.
- `sdk.batch.delay` - Maximum delay before an incomplete batch is returned from the source.
To change the defaults of these parameters use the fields of this struct.
func (*SourceWithBatch) Wrap ¶ added in v0.11.1
func (s *SourceWithBatch) Wrap(impl Source) Source
Wrap a Source into the batching middleware.
type SourceWithBatchConfig ¶ added in v0.11.1
type SourceWithBatchConfig struct {
    // BatchSize is the default value for the batch size.
    BatchSize *int
    // BatchDelay is the default value for the batch delay.
    BatchDelay *time.Duration
}
SourceWithBatchConfig is the configuration for the SourceWithBatch middleware. Fields set to their zero value are ignored and will be set to the default value.
SourceWithBatchConfig can be used as a SourceMiddlewareOption.
func (SourceWithBatchConfig) Apply ¶ added in v0.11.1
func (c SourceWithBatchConfig) Apply(m SourceMiddleware)
Apply sets the default configuration for the SourceWithBatch middleware.
func (SourceWithBatchConfig) BatchDelayParameterName ¶ added in v0.11.1
func (c SourceWithBatchConfig) BatchDelayParameterName() string
func (SourceWithBatchConfig) BatchSizeParameterName ¶ added in v0.11.1
func (c SourceWithBatchConfig) BatchSizeParameterName() string
type SourceWithEncoding ¶ added in v0.11.1
type SourceWithEncoding struct{}
SourceWithEncoding is a middleware that encodes the record payload and key with the provided schema. The schema is registered with the schema service and the schema subject is attached to the record metadata.
func (SourceWithEncoding) Apply ¶ added in v0.11.1
func (s SourceWithEncoding) Apply(SourceMiddleware)
func (SourceWithEncoding) Wrap ¶ added in v0.11.1
func (s SourceWithEncoding) Wrap(impl Source) Source
type SourceWithSchemaContext ¶ added in v0.10.0
type SourceWithSchemaContext struct {
Config SourceWithSchemaContextConfig
}
SourceWithSchemaContext is a middleware that makes it possible to configure the schema context for records read by a source.
func (*SourceWithSchemaContext) Wrap ¶ added in v0.10.0
func (s *SourceWithSchemaContext) Wrap(impl Source) Source
Wrap a Source into the schema middleware. It will apply default configuration values if they are not explicitly set.
type SourceWithSchemaContextConfig ¶ added in v0.10.0
SourceWithSchemaContextConfig is the configuration for the SourceWithSchemaContext middleware. Fields set to their zero value are ignored and will be set to the default value.
SourceWithSchemaContextConfig can be used as a SourceMiddlewareOption.
func (SourceWithSchemaContextConfig) Apply ¶ added in v0.10.0
func (c SourceWithSchemaContextConfig) Apply(m SourceMiddleware)
Apply sets the default configuration for the SourceWithSchemaContext middleware.
func (SourceWithSchemaContextConfig) EnabledParameterName ¶ added in v0.10.0
func (c SourceWithSchemaContextConfig) EnabledParameterName() string
func (SourceWithSchemaContextConfig) NameParameterName ¶ added in v0.10.0
func (c SourceWithSchemaContextConfig) NameParameterName() string
type SourceWithSchemaExtraction ¶ added in v0.10.0
type SourceWithSchemaExtraction struct {
Config SourceWithSchemaExtractionConfig
}
SourceWithSchemaExtraction is a middleware that extracts a record's payload and key schemas. The schema is extracted from the record data for each record produced by the source. The schema is registered with the schema service and the schema subject is attached to the record metadata.
func (*SourceWithSchemaExtraction) Wrap ¶ added in v0.10.0
func (s *SourceWithSchemaExtraction) Wrap(impl Source) Source
Wrap a Source into the schema middleware. It will apply default configuration values if they are not explicitly set.
type SourceWithSchemaExtractionConfig ¶ added in v0.10.0
type SourceWithSchemaExtractionConfig struct {
    // The type of the payload schema. Defaults to Avro.
    SchemaType schema.Type

    // Whether to extract and encode the record payload with a schema.
    // If unset, defaults to true.
    PayloadEnabled *bool
    // The subject of the payload schema. If unset, defaults to "payload".
    PayloadSubject *string

    // Whether to extract and encode the record key with a schema.
    // If unset, defaults to true.
    KeyEnabled *bool
    // The subject of the key schema. If unset, defaults to "key".
    KeySubject *string
}
SourceWithSchemaExtractionConfig is the configuration for the SourceWithSchemaExtraction middleware. Fields set to their zero value are ignored and will be set to the default value.
SourceWithSchemaExtractionConfig can be used as a SourceMiddlewareOption.
func (SourceWithSchemaExtractionConfig) Apply ¶ added in v0.10.0
func (c SourceWithSchemaExtractionConfig) Apply(m SourceMiddleware)
Apply sets the default configuration for the SourceWithSchemaExtraction middleware.
func (SourceWithSchemaExtractionConfig) SchemaKeyEnabledParameterName ¶ added in v0.10.0
func (c SourceWithSchemaExtractionConfig) SchemaKeyEnabledParameterName() string
func (SourceWithSchemaExtractionConfig) SchemaKeySubjectParameterName ¶ added in v0.10.0
func (c SourceWithSchemaExtractionConfig) SchemaKeySubjectParameterName() string
func (SourceWithSchemaExtractionConfig) SchemaPayloadEnabledParameterName ¶ added in v0.10.0
func (c SourceWithSchemaExtractionConfig) SchemaPayloadEnabledParameterName() string
func (SourceWithSchemaExtractionConfig) SchemaPayloadSubjectParameterName ¶ added in v0.10.0
func (c SourceWithSchemaExtractionConfig) SchemaPayloadSubjectParameterName() string
func (SourceWithSchemaExtractionConfig) SchemaTypeParameterName ¶ added in v0.10.0
func (c SourceWithSchemaExtractionConfig) SchemaTypeParameterName() string
type Specification ¶
type Specification pconnector.Specification
Specification contains general information regarding the plugin like its name and what it does.
type TemplateRecordSerializer ¶ added in v0.10.0
type TemplateRecordSerializer struct {
// contains filtered or unexported fields
}
TemplateRecordSerializer is a RecordSerializer that serializes a record using a Go template.
func (TemplateRecordSerializer) Configure ¶ added in v0.10.0
func (e TemplateRecordSerializer) Configure(tmpl string) (RecordSerializer, error)
func (TemplateRecordSerializer) Name ¶ added in v0.10.0
func (e TemplateRecordSerializer) Name() string
type UnimplementedDestination ¶
type UnimplementedDestination struct{}
UnimplementedDestination should be embedded to have forward compatible implementations.
func (UnimplementedDestination) Configure ¶
Configure needs to be overridden in the actual implementation.
func (UnimplementedDestination) LifecycleOnCreated ¶ added in v0.6.0
LifecycleOnCreated won't do anything by default.
func (UnimplementedDestination) LifecycleOnDeleted ¶ added in v0.6.0
LifecycleOnDeleted won't do anything by default.
func (UnimplementedDestination) LifecycleOnUpdated ¶ added in v0.6.0
func (UnimplementedDestination) LifecycleOnUpdated(context.Context, config.Config, config.Config) error
LifecycleOnUpdated won't do anything by default.
func (UnimplementedDestination) Open ¶
func (UnimplementedDestination) Open(context.Context) error
Open needs to be overridden in the actual implementation.
func (UnimplementedDestination) Parameters ¶ added in v0.3.0
func (UnimplementedDestination) Parameters() config.Parameters
Parameters needs to be overridden in the actual implementation.
type UnimplementedSource ¶
type UnimplementedSource struct{}
UnimplementedSource should be embedded to have forward compatible implementations.
func (UnimplementedSource) Ack ¶
Ack should be overridden if acks need to be forwarded to the source, otherwise it is optional.
func (UnimplementedSource) Configure ¶
Configure needs to be overridden in the actual implementation.
func (UnimplementedSource) LifecycleOnCreated ¶ added in v0.6.0
LifecycleOnCreated won't do anything by default.
func (UnimplementedSource) LifecycleOnDeleted ¶ added in v0.6.0
LifecycleOnDeleted won't do anything by default.
func (UnimplementedSource) LifecycleOnUpdated ¶ added in v0.6.0
LifecycleOnUpdated won't do anything by default.
func (UnimplementedSource) Parameters ¶ added in v0.3.0
func (UnimplementedSource) Parameters() config.Parameters
Parameters needs to be overridden in the actual implementation.
Source Files ¶
Directories ¶
Path | Synopsis
---|---
kafkaconnect | Package kafkaconnect contains utility functions and structures for processing Kafka Connect compatible data.