Documentation
¶
Index ¶
- type CDC
- type Change
- type Changes
- type ChangesHandler
- type ChangesHandlerFunc
- type Engine
- type Installer
- type Operation
- type Option
- type Signal
- type SignalEvent
- type TriggerEngine
- func (c *TriggerEngine) Bootstrap(ctx context.Context) error
- func (c *TriggerEngine) BootstrapAndCDC(ctx context.Context) error
- func (c *TriggerEngine) CDC(ctx context.Context) error
- func (c *TriggerEngine) Close(ctx context.Context) error
- func (c *TriggerEngine) Setup(ctx context.Context) error
- func (c *TriggerEngine) Teardown(ctx context.Context) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CDC ¶
CDC represents a complete implementation of a CDC for SQLite.
Installer methods should not be called if any Engine methods are running. Only one Engine method may be called per instance of CDC with the exception of Close which may be called at any time to end a running operation. The instance is no longer valid after an Engine method completes.
func NewTriggerEngine ¶
func NewTriggerEngine(db *sql.DB, handler ChangesHandler, tables []string, options ...Option) (CDC, error)
NewTriggerEngine returns a CDC implementation based on table triggers.
This implementation works with any SQLite driver and uses only SQL operations to implement CDC. For each specified table to monitor, the implementation creates triggers for AFTER INSERT, AFTER UPDATE, and AFTER DELETE. These triggers populate a log table, named __cdc_log by default. The log table entries contain effectively identical information as the Change struct.
The before and after images are stored as JSON objects in the log table. The JSON objects are generated from the column names and values in the table.
See the TriggerEngine documentation for more details.
type Change ¶
type Change struct { Table string `json:"table"` Timestamp time.Time `json:"timestamp"` Operation Operation `json:"operation"` Before json.RawMessage `json:"before"` After json.RawMessage `json:"after"` }
type ChangesHandler ¶
ChangesHandler implementations are given batches of database changes. Each batch of changes is in the same order as the CDC log table. Batches may contain changes from multiple tables.
If a handler returns an error then the entire batch is considered failed and retried. If a handler returns nil then the entire batch is considered successful and the relevant entries are removed from the CDC log table.
type ChangesHandlerFunc ¶
ChangesHandlerFunc is an adaptor to allow the use of ordinary functions as ChangesHandler implementations. Note that you should not use this type directly and should instead always target the ChangesHandler type. For example, the appropriate use of this adaptor is:
var handler ChangesHandler = ChangesHandlerFunc(func(changes Changes) error { // handle changes })
func (ChangesHandlerFunc) HandleChanges ¶
func (fn ChangesHandlerFunc) HandleChanges(ctx context.Context, changes Changes) error
type Engine ¶
type Engine interface { // CDC-only mode processes changes made to the database. // // This mode only operates on changes that have been captured by the CDC // implementation and does not process any unchanged records. This means // that CDC-only mode does not process existing, unmodified records like // Bootstrap does. // // This mode runs until stopped or it encounters an error. CDC(ctx context.Context) error // Bootstrap-only mode processes all existing records in the database. // // This mode only operates on the current state of existing records and does // not process any captured changes. All existing records are processed as // though they are INSERT operations. For convenience, the first change // handled by bootstrap mode for any table is always a BOOTSTRAP operation // with an empty before and after image. This signal may be used in systems // that manage state based on the stream of captured changes to indicate // that the previously accumulated state is likely invalid and must be // re-built from the bootstrap data. // // This mode runs until it completes a scan of each table and then exits. Bootstrap(ctx context.Context) error // Bootstrap-and-CDC mode is a combination of the bootstrap and CDC modes. // // This mode starts with a bootstrap and then enters CDC mode once it is // complete. Any changes to data during hte bootstrap are captured and // emitted once the engine enters CDC mode. // // This mode runs until stopped or it encounters an error. BootstrapAndCDC(ctx context.Context) error // Stop any ongoing CDC operations and shut down. Close(ctx context.Context) error }
Engine represents the change handling portion of a CDC implementation.
Only one of these methods may be called per instance of CDC with the exception of Close which may be called at any time to end a running operation. Once any of these methods returns then the instance is no longer valid.
type Installer ¶
type Installer interface { // Perform any setup necessary to support CDC. Setup(ctx context.Context) error // Perform any teardown necessary to remove CDC. Teardown(ctx context.Context) error }
Installer is an optional set of methods that a CDC implementation may offer.
These methods must be present but may be no-ops if an implementation has no setup or teardown requirements. Generally, the setup method should be called before any Engine methods.
type Option ¶
type Option func(*TriggerEngine) error
func WithBlobSupport ¶
WithBlobSupport can enable or disable the storage of BLOB columns in the log table. This defaults to false because of the performance impacts of encoding BLOB type data.
func WithLogTableName ¶
WithLogTableName specifies the name of the log table. This defaults to __cdc_log but may be customized if needed.
func WithMaxBatchSize ¶
WithMaxBatchSize specifies the maximum number of changes to process in a single batch. This defaults to 50.
func WithSignal ¶
WithSignal installs a custom awakening signal that triggers the inspection of the log table when in CDC mode. The default signal is a combination of a filesystem watcher that signals when the SQLite files have changed and a 250ms timer used as a backstop for any missed filesystem events.
func WithoutSubsecondTime ¶
WithoutSubsecondTime can disable the use of subsecond timestamps in the log table. This is only needed for old versions of SQLite and should be avoided otherwise.
type Signal ¶
type Signal interface { // Waker returns a channel that will receive wake signals. If the channel // is closed by the Signal then the Signal has entered a terminal state. Waker() <-chan SignalEvent // Start implementations establish any state required to generate wake // signals. Long running tasks, such as continuous monitoring, should be // started in goroutines so that this method does not block. Start(ctx context.Context) error // Close releases any resources used by the Signal including any goroutines // started. Once a Signal is closed then it is in a terminal state and // cannot be re-used. Close() error }
Signal implementations tell a CDC engine when to awaken while in CDC mode in order to process new changes.
func NewChannelSignal ¶
func NewChannelSignal(input <-chan SignalEvent) (Signal, error)
NewChannelSignal returns a Signal implementation that proxies events from the given channel to the waker channel. This is useful for awakening based on external triggers.
func NewFSNotifySignal ¶
NewFSNotifySignal creates a new Signal implementation that uses filesystem notifications to detect changes. This implementation uses the given database client to determine the path of the main SQLite database as well as any supplemental files such as the WAL when in WAL mode.
func NewMultiSignal ¶
NewMultiSignal returns a Signal implementation that combines multiple signals into a single channel.
type SignalEvent ¶
SignalEvent indicates that a Signal has been triggered.
An engine should only check for changes if the Wake field is true. A non-nil error represents a terminal error for the Signal.
type TriggerEngine ¶
type TriggerEngine struct {
// contains filtered or unexported fields
}
TriggerEngine implements CDC using table triggers.
This implementation targets a specified set of tables and captures changes by using AFTER triggers to populate a change log table. The setup and teardown methods manage both the triggers and the log table. Currently, all target tables must be set up and torn down together and cannot be targeted individually.
The bootstrap mode is implemented by selecting batches of records from target tables. These are passed through to the bound ChangesHandler as they are selected. Each table bootstrap begins with the specified BOOTSTRAP operation event. Because this implementation of CDC uses table triggers and a persistent chang log table, a bootstrap is usually only needed once after running setup. If your system encounters a critical fault and needs to rebuild state from a bootstrap then you can safely run bootstrap again. However, subsequent runs of bootstrap mode do not clear the change log table.
The cdc mode is implemented by selecting batches of records from the change log table. The order of the log selection matches the natural sort order of the table which, itself, matches the order in which changes were made to the data. The frequency with which cdc mode checks for changes is determined by the bound Signal implementation. The default signal is a combination of a filesystem watcher and a time based interval. The filesystem watcher detects changes to the underlying SQLite files with the intent to handle changes as quickly as possible once they are persisted. However, the filesystem watcher is not infallible so a time based interval signal is included as a backstop. Generally, you are recommended to have some form of time based interval signal to augment any other signal choices.
By default, all change log entries are recorded with a millisecond precision timestamp. This precision is only available in SQLite 3.42.0 and later. If any system accessing the SQLite database is older than 3.42.0 then you must disable the subsecond timestamp with the WithoutSubsecondTime option.
By default, support for BLOB data is disabled and BLOB type columns are not included in change log records due to the performance impacts of encoding BLOB type data. If you need to handle BLOB type data then you must enable BLOB support with the WithBlobSupport option. Note, however, that this implementations identification of BLOB data is based on the declared column type and not the underlying data type. Any BLOB data in a non-BLOB column will cause a fault in this implementation. You are strongly recommended to use STRICT tables to avoid accidental BLOB data in a non-BLOB column.
func (*TriggerEngine) BootstrapAndCDC ¶
func (c *TriggerEngine) BootstrapAndCDC(ctx context.Context) error