generic

package
v0.7.0
Warning

This package is not in the latest version of its module.

Published: Feb 16, 2021 License: MIT Imports: 10 Imported by: 0

README

Generic Sink utilities

Sinks should be discussed in the abstract: managing the flow of data from the changelog into a sink is complex, and we prefer to build reliable abstractions once and reuse them everywhere.

With this focus, the generic package provides composable sink constructors.

Documentation

Overview

Package generic is a suite of components that can be composed to make sinks. By extracting complex, non-sink-specific behaviour, we minimise the effort involved in implementing a new sink while ensuring each sink has consistent semantics.

Index

Constants

const (
	SchemaHandlerFailed SchemaHandlerOutcome = "failed" // configuration failed
	SchemaHandlerNoop                        = "noop"   // nothing was changed, no action required
	SchemaHandlerUpdate                      = "update" // sink was updated, the returned inserter takes precedence
)

Variables

var EmptyInsertResult = NewInsertResult().Resolve(0, nil, nil)

EmptyInsertResult represents an insertion that did no work. It can be used as the base element when recursively folding insertions together.

var SinkBuilder = sinkBuilderFunc(func(opts ...func(*sink)) Sink {
	s := &sink{
		builders: []func(AsyncInserter) AsyncInserter{},
		router:   NewRouter(),
	}

	for _, opt := range opts {
		opt(s)
	}

	return s
})

SinkBuilder allows sink implementations to easily compose the sink-specific implementations into a generic sink implementation that fulfils the Sink contract.

Functions

func NewInsertResult

func NewInsertResult() *insertResult

Types

type AckCallback

type AckCallback func(changelog.Entry)

AckCallback acknowledges successful publication of every message up to this one. It is not guaranteed to be called for any intermediate messages.

type AsyncInserter

type AsyncInserter interface {
	// Insert has the same signature as the synchronous Inserter, but returns an
	// InsertResult that will be fulfilled at some later time.
	Insert(context.Context, []*changelog.Modification) InsertResult

	// Flush is called to force a write of buffered changelog modifications to the underlying
	// inserter. The returned result resolves with the sum of rows inserted, and the highest
	// LSN successfully written, if any that were flushed had an associated LSN.
	Flush(context.Context) InsertResult
}

func NewAsyncInserter

func NewAsyncInserter(i Inserter) AsyncInserter

NewAsyncInserter converts a synchronous inserter to the async contract
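
As a rough illustration of adapting a synchronous insert to an asynchronous contract, the sketch below runs the insert in a goroutine and hands back a handle that can be waited on later. The names `pending`, `async` and `Modification` are hypothetical stand-ins, and running the insert in its own goroutine is an assumption rather than a description of how NewAsyncInserter is implemented.

```go
package main

import "fmt"

// Modification is a hypothetical stand-in for changelog.Modification.
type Modification struct{ ID int }

// syncInsert is a simplified synchronous insert: rows in, count and error out.
type syncInsert func([]Modification) (int, error)

// pending is a minimal promise: Get blocks until the insert has finished.
type pending struct {
	done  chan struct{}
	count int
	err   error
}

// Get waits for the insert to finish, then returns its outcome.
func (p *pending) Get() (int, error) {
	<-p.done
	return p.count, p.err
}

// async adapts a synchronous insert so that callers receive a promise
// immediately instead of blocking on the write.
func async(insert syncInsert) func([]Modification) *pending {
	return func(mods []Modification) *pending {
		p := &pending{done: make(chan struct{})}
		go func() {
			defer close(p.done) // closing done publishes count and err to Get
			p.count, p.err = insert(mods)
		}()
		return p
	}
}

// asyncDemo issues two inserts before waiting on either result.
func asyncDemo() int {
	insert := async(func(mods []Modification) (int, error) { return len(mods), nil })
	first := insert([]Modification{{1}, {2}})
	second := insert([]Modification{{3}})
	a, _ := first.Get()
	b, _ := second.Get()
	return a + b
}

func main() { fmt.Println(asyncDemo()) }
```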

func NewBufferedInserter

func NewBufferedInserter(i AsyncInserter, bufferSize int) AsyncInserter

NewBufferedInserter wraps any async inserter with a buffer. When an insertion overflows the buffer, the buffered modifications are passed to the underlying inserter. Calling Flush will also push buffered inserts into the underlying system.
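
The buffering behaviour described above might look something like the following sketch. To stay short it buffers in front of a plain function rather than an AsyncInserter, and `bufferedInserter`, `Modification` and the flush-when-full rule are illustrative assumptions, not the package's implementation.

```go
package main

import "fmt"

// Modification is a hypothetical stand-in for changelog.Modification.
type Modification struct{ ID int }

// bufferedInserter collects modifications and only hands them to the underlying
// insert function once the buffer is full, or when Flush is called.
type bufferedInserter struct {
	underlying func([]Modification) int // returns the number of rows written
	buffer     []Modification
	size       int
}

// Insert appends to the buffer, flushing first whenever the buffer is full.
func (b *bufferedInserter) Insert(mods []Modification) (written int) {
	for _, m := range mods {
		if len(b.buffer) == b.size {
			written += b.Flush()
		}
		b.buffer = append(b.buffer, m)
	}
	return written
}

// Flush pushes everything buffered so far into the underlying inserter.
func (b *bufferedInserter) Flush() int {
	if len(b.buffer) == 0 {
		return 0
	}
	batch := b.buffer
	b.buffer = nil
	return b.underlying(batch)
}

// bufferDemo inserts five modifications through a buffer of two and reports
// the batch sizes that reached the underlying inserter.
func bufferDemo() []int {
	var batches []int
	b := &bufferedInserter{size: 2, underlying: func(batch []Modification) int {
		batches = append(batches, len(batch))
		return len(batch)
	}}
	b.Insert([]Modification{{1}, {2}, {3}, {4}, {5}})
	b.Flush() // without this, the final modification would stay buffered
	return batches
}

func main() { fmt.Println(bufferDemo()) }
```

The final Flush matters: anything still sitting in the buffer has not been written, which is why the AsyncInserter contract exposes Flush alongside Insert.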

type InsertResult

type InsertResult interface {
	Get(context.Context) (count int, lsn *uint64, err error)
	Fold(InsertResult) InsertResult
}

InsertResult is a promise interface for async insert operations. Get() blocks until the result is fulfilled, while Fold() merges in the result of another InsertResult, taking the higher of the two LSNs (if provided).
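
To make the promise and fold semantics concrete, here is a minimal sketch of a channel-backed result. The `result` type is a hypothetical stand-in for the unexported insertResult; summing the counts and keeping the first error are assumptions made for illustration, while taking the higher LSN follows the description above.

```go
package main

import (
	"context"
	"fmt"
)

// result is a hypothetical stand-in for the package's unexported insertResult.
type result struct {
	done  chan struct{} // closed once the result is resolved
	count int
	lsn   *uint64
	err   error
}

func newResult() *result { return &result{done: make(chan struct{})} }

// resolve fulfils the promise and returns it, so calls can be chained.
func (r *result) resolve(count int, lsn *uint64, err error) *result {
	r.count, r.lsn, r.err = count, lsn, err
	close(r.done)
	return r
}

// Get blocks until the result is resolved or the context is cancelled.
func (r *result) Get(ctx context.Context) (int, *uint64, error) {
	select {
	case <-r.done:
		return r.count, r.lsn, r.err
	case <-ctx.Done():
		return 0, nil, ctx.Err()
	}
}

// Fold returns a result that resolves once both inputs have, summing the
// counts and keeping the higher LSN. The first error wins (an assumption).
func (r *result) Fold(other *result) *result {
	folded := newResult()
	go func() {
		<-r.done
		<-other.done
		count, lsn, err := r.count+other.count, r.lsn, r.err
		if lsn == nil || (other.lsn != nil && *other.lsn > *lsn) {
			lsn = other.lsn
		}
		if err == nil {
			err = other.err
		}
		folded.resolve(count, lsn, err)
	}()
	return folded
}

// foldDemo folds two insertions onto an empty base element.
func foldDemo() (int, uint64) {
	a, b := uint64(10), uint64(42)
	empty := newResult().resolve(0, nil, nil) // plays the role of EmptyInsertResult
	total := empty.Fold(newResult().resolve(2, &a, nil)).Fold(newResult().resolve(3, &b, nil))
	count, lsn, _ := total.Get(context.Background())
	return count, *lsn
}

func main() { fmt.Println(foldDemo()) }
```

Starting the fold from an already-resolved empty result is what makes it a convenient base element: folding onto it changes neither the count nor the LSN.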

type Inserter

type Inserter interface {
	// Insert receives changelog modifications to insert into a table. It returns the count
	// of rows inserted, and the highest non-nil LSN from the batch of modifications
	// confirmed to be written to the sink.
	Insert(context.Context, []*changelog.Modification) (count int, lsn *uint64, err error)
}

Inserter provides a synchronous interface around inserting data into a sink

func NewInstrumentedInserter

func NewInstrumentedInserter(route Route, i Inserter) Inserter

NewInstrumentedInserter wraps an existing synchronous inserter so that every insert is logged, has its batch size and duration captured in metrics, and creates a new span.

type InserterFunc

type InserterFunc func(context.Context, []*changelog.Modification) (count int, lsn *uint64, err error)

InserterFunc can create an Inserter from an anonymous function.
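
The function-adapter pattern behind InserterFunc can be sketched as follows. `Modification` is a hypothetical stand-in for changelog.Modification, and the method body that simply calls the function is the conventional form of this pattern (as with http.HandlerFunc), assumed here rather than copied from the package.

```go
package main

import (
	"context"
	"fmt"
)

// Modification is a hypothetical stand-in for changelog.Modification.
type Modification struct{ LSN *uint64 }

// Inserter and InserterFunc mirror the declarations documented above.
type Inserter interface {
	Insert(context.Context, []*Modification) (count int, lsn *uint64, err error)
}

type InserterFunc func(context.Context, []*Modification) (count int, lsn *uint64, err error)

// Insert calls the function itself, which is what lets a closure satisfy Inserter.
func (i InserterFunc) Insert(ctx context.Context, mods []*Modification) (int, *uint64, error) {
	return i(ctx, mods)
}

// countingInserter writes nowhere: it reports the batch size and the highest
// non-nil LSN, as the Inserter contract requires.
var countingInserter Inserter = InserterFunc(
	func(ctx context.Context, mods []*Modification) (int, *uint64, error) {
		var highest *uint64
		for _, m := range mods {
			if m.LSN != nil && (highest == nil || *m.LSN > *highest) {
				highest = m.LSN
			}
		}
		return len(mods), highest, nil
	})

// inserterFuncDemo inserts three modifications, one of which has no LSN.
func inserterFuncDemo() (int, uint64) {
	low, high := uint64(7), uint64(9)
	count, lsn, _ := countingInserter.Insert(context.Background(),
		[]*Modification{{LSN: &high}, {LSN: nil}, {LSN: &low}})
	return count, *lsn
}

func main() { fmt.Println(inserterFuncDemo()) }
```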

func (InserterFunc) Insert

func (i InserterFunc) Insert(ctx context.Context, modifications []*changelog.Modification) (count int, lsn *uint64, err error)

type MemoryInserter

type MemoryInserter struct {
	sync.Mutex
	// contains filtered or unexported fields
}

MemoryInserter is a reference implementation of an inserter, storing modifications in an in-memory buffer. It satisfies all requirements of an inserter, including race-safety.

Beyond offering a useful reference implementation, this can be used for testing generic inserter logic without being coupled to an actual backend.
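
A mutex-guarded in-memory inserter in this spirit could be sketched as below. The `memoryInserter` type, its simplified Insert signature, and the idea that Store flattens the recorded batches are assumptions for illustration; only the embedded sync.Mutex is taken from the declaration above.

```go
package main

import (
	"fmt"
	"sync"
)

// Modification is a hypothetical stand-in for changelog.Modification.
type Modification struct{ ID int }

// memoryInserter records every batch it receives, guarded by a mutex so that
// concurrent inserts do not race on the slice.
type memoryInserter struct {
	sync.Mutex
	batches [][]Modification
}

// Insert stores the batch and reports how many rows it contained.
func (i *memoryInserter) Insert(mods []Modification) int {
	i.Lock()
	defer i.Unlock()
	i.batches = append(i.batches, mods)
	return len(mods)
}

// Store flattens all recorded batches into a single slice.
func (i *memoryInserter) Store() []Modification {
	i.Lock()
	defer i.Unlock()
	var store []Modification
	for _, batch := range i.batches {
		store = append(store, batch...)
	}
	return store
}

// memoryDemo inserts from four goroutines at once and reports what was kept.
func memoryDemo() (batches, stored int) {
	ins := &memoryInserter{}
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			ins.Insert([]Modification{{ID: id}, {ID: id + 100}})
		}(w)
	}
	wg.Wait() // all inserts have finished, so reading batches is safe
	return len(ins.batches), len(ins.Store())
}

func main() { fmt.Println(memoryDemo()) }
```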

func NewMemoryInserter

func NewMemoryInserter() *MemoryInserter

func (*MemoryInserter) Batches

func (i *MemoryInserter) Batches() [][]*changelog.Modification

func (*MemoryInserter) Insert

func (i *MemoryInserter) Insert(ctx context.Context, modifications []*changelog.Modification) (count int, lsn *uint64, err error)

func (*MemoryInserter) Store

func (i *MemoryInserter) Store() []*changelog.Modification

type Route

type Route string

Route is a string representing a routing key for incoming modifications. Inserters are associated with these routes.

type Router

type Router interface {
	// Register notifies the router that all subsequent insertions for the given namespace
	// should be routed to a new inserter. Register returns an InsertResult from flushing
	// any inserters that were previously routed via this namespace. Any previous inserter's
	// flush is added to the router's in-flight result, preserving the semantics of flush to
	// ensure we appropriately handle failed flushes.
	Register(context.Context, Route, AsyncInserter) InsertResult

	// Otherwise, a router looks exactly like a normal AsyncInserter. It should be
	// transparent that each insert is routed to other inserters, and it should be possible
	// to compose this with other inserter constructs.
	AsyncInserter
}
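
The routing idea can be sketched in a few lines. This simplified version is synchronous and skips the flush-on-Register behaviour described in the comments above; `router`, `insertFunc`, `Modification` and the choice to use the modification's namespace as the route are all illustrative assumptions.

```go
package main

import "fmt"

// Route mirrors the documented routing key type.
type Route string

// Modification is a hypothetical stand-in for changelog.Modification.
type Modification struct {
	Namespace string
	ID        int
}

// insertFunc is a simplified inserter: it returns the number of rows written.
type insertFunc func([]Modification) int

// router dispatches each modification to the inserter registered for its route.
type router struct {
	routes map[Route]insertFunc
}

// Register points a route at a new inserter, replacing any previous one.
// Unlike the documented Router, this sketch does not flush the old inserter.
func (r *router) Register(route Route, ins insertFunc) { r.routes[route] = ins }

// Insert groups the batch by namespace and forwards each group to its inserter.
func (r *router) Insert(mods []Modification) (int, error) {
	grouped := map[Route][]Modification{}
	for _, m := range mods {
		route := Route(m.Namespace)
		grouped[route] = append(grouped[route], m)
	}
	total := 0
	for route, batch := range grouped {
		ins, ok := r.routes[route]
		if !ok {
			return total, fmt.Errorf("no inserter registered for route %q", route)
		}
		total += ins(batch)
	}
	return total, nil
}

// routerDemo registers two routes and sends a mixed batch through the router.
func routerDemo() (users, orders, total int) {
	r := &router{routes: map[Route]insertFunc{}}
	r.Register("public.users", func(b []Modification) int { users += len(b); return len(b) })
	r.Register("public.orders", func(b []Modification) int { orders += len(b); return len(b) })
	total, _ = r.Insert([]Modification{
		{"public.users", 1}, {"public.orders", 2}, {"public.users", 3},
	})
	return users, orders, total
}

func main() { fmt.Println(routerDemo()) }
```

Because the router exposes the same Insert shape as the inserters it dispatches to, it can sit anywhere in a chain of wrappers, which is the composability the interface comment asks for.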

func NewRouter

func NewRouter() Router

type SchemaHandler

type SchemaHandler interface {
	Handle(context.Context, *changelog.Schema) (Inserter, SchemaHandlerOutcome, error)
}

SchemaHandler responds to new schemas by idempotently updating and configuring the Sink to receive corresponding modifications. It returns an Inserter that can be used to handle modifications associated with the given schema.

func SchemaHandlerCacheOnFingerprint

func SchemaHandlerCacheOnFingerprint(handler SchemaHandler) SchemaHandler

SchemaHandlerCacheOnFingerprint caches schema handler responses on the fingerprint of the received schema. Any subsequent identical schema receives the cached response from the previous handler call.
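
One way to picture fingerprint caching is the memoising wrapper below. `Schema`, its `Fingerprint` field, the simplified `handler` signature and the decision to cache only successful responses are assumptions for this sketch, not details taken from the package.

```go
package main

import "fmt"

// Schema is a hypothetical stand-in for changelog.Schema; Fingerprint is
// assumed to be a stable hash of the schema's contents.
type Schema struct{ Fingerprint, Table string }

// handler is a simplified schema handler that returns only an outcome.
type handler func(Schema) (outcome string, err error)

// cacheOnFingerprint wraps a handler so that each distinct fingerprint reaches
// the wrapped handler once; repeats receive the remembered outcome. Failures
// are not cached (an assumption), and the map is not safe for concurrent use.
func cacheOnFingerprint(next handler) handler {
	cache := map[string]string{}
	return func(s Schema) (string, error) {
		if outcome, ok := cache[s.Fingerprint]; ok {
			return outcome, nil
		}
		outcome, err := next(s)
		if err == nil {
			cache[s.Fingerprint] = outcome
		}
		return outcome, err
	}
}

// cacheDemo counts how many calls reach the wrapped handler.
func cacheDemo() int {
	calls := 0
	h := cacheOnFingerprint(func(s Schema) (string, error) {
		calls++
		return "update", nil
	})
	h(Schema{"abc", "users"})
	h(Schema{"abc", "users"}) // identical fingerprint: served from the cache
	h(Schema{"def", "users"}) // changed schema: reaches the handler again
	return calls
}

func main() { fmt.Println(cacheDemo()) }
```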

func SchemaHandlerGlobalInserter

func SchemaHandlerGlobalInserter(inserter Inserter, schemaHandler func(context.Context, *changelog.Schema) error) SchemaHandler

SchemaHandlerGlobalInserter is used to register a single global inserter for all modifications to this sink, along with a handler function that is used to respond to new schemas but is not expected to return a modified inserter.

type SchemaHandlerFunc

type SchemaHandlerFunc func(context.Context, *changelog.Schema) (Inserter, SchemaHandlerOutcome, error)

SchemaHandlerFunc is shorthand for creating a handler from a function

func (SchemaHandlerFunc) Handle

type SchemaHandlerOutcome

type SchemaHandlerOutcome string

type Sink

type Sink interface {
	Consume(context.Context, changelog.Changelog, AckCallback) error
}

Sink is a generic sink destination for a changelog. It consumes entries until either an error occurs or the entries run out.

If the process producing the changelog is long-running, then the AckCallback is used to acknowledge successful writes into the sink. If you need to wait for all writes to be completely processed by the sink, then wait for Consume to return.
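
The relationship between Consume and the AckCallback can be sketched as a batching loop. Here the changelog is modelled as a channel of entries and `Entry`, `consume` and the fixed batch size are hypothetical simplifications. The point is that an acknowledgement arrives only for the last entry of each written batch, which is why intermediate entries may never be acknowledged individually.

```go
package main

import "fmt"

// Entry is a hypothetical stand-in for changelog.Entry, reduced to an LSN.
type Entry struct{ LSN uint64 }

// consume drains the changelog, writing entries in batches of batchSize. After
// each successful write it acknowledges only the last entry of the batch.
func consume(changelog <-chan Entry, batchSize int, write func([]Entry) error, ack func(Entry)) error {
	var batch []Entry
	flush := func() error {
		if len(batch) == 0 {
			return nil
		}
		if err := write(batch); err != nil {
			return err
		}
		ack(batch[len(batch)-1])
		batch = nil
		return nil
	}
	for entry := range changelog {
		batch = append(batch, entry)
		if len(batch) == batchSize {
			if err := flush(); err != nil {
				return err
			}
		}
	}
	// The changelog is exhausted: write whatever is left before returning.
	return flush()
}

// consumeDemo feeds five entries through batches of two and records the acks.
func consumeDemo() []uint64 {
	changelog := make(chan Entry, 5)
	for lsn := uint64(1); lsn <= 5; lsn++ {
		changelog <- Entry{LSN: lsn}
	}
	close(changelog)

	var acked []uint64
	err := consume(changelog, 2,
		func([]Entry) error { return nil },
		func(e Entry) { acked = append(acked, e.LSN) })
	if err != nil {
		panic(err)
	}
	return acked
}

func main() { fmt.Println(consumeDemo()) }
```

A caller that only needs to know everything has been written can ignore the acknowledgements and wait for the function to return, matching the guidance above.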
