conveyor

package
v0.0.0-...-1f15b0d
Warning: This package is not in the latest version of its module.
Published: May 24, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Overview

Package conveyor delivers mutations to a target.

Constants

This section is empty.

Variables

Set is used by Wire.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Switch between BestEffort mode and Core if the applied resolved
	// timestamp is older than this threshold. A negative or zero value
	// will disable BestEffort switching.
	BestEffortWindow time.Duration

	// Force the use of BestEffort mode.
	BestEffortOnly bool

	// Write directly to staging tables. May limit compatibility with
	// schemas that contain foreign keys.
	Immediate bool
}

Config defines the behavior for a Conveyor.
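
The interaction between BestEffortWindow and BestEffortOnly can be sketched as below. This is an illustration only: the local Config copy, the Mode type, and the chooseMode helper are hypothetical, and the direction of the switch (a lag larger than the window selects BestEffort) is an assumption drawn from the field comments, not confirmed package behavior.

```go
package main

import (
	"fmt"
	"time"
)

// Config mirrors the documented fields so that this sketch compiles on
// its own. It is a local copy, not the package's type.
type Config struct {
	BestEffortWindow time.Duration
	BestEffortOnly   bool
	Immediate        bool
}

// Mode is a hypothetical label for the delivery strategy.
type Mode string

const (
	ModeBestEffort Mode = "best-effort"
	ModeCore       Mode = "core"
)

// chooseMode is a hypothetical helper. Assumption: when the applied
// resolved timestamp lags by more than BestEffortWindow, BestEffort
// mode is selected; otherwise Core mode is used.
func chooseMode(cfg Config, lag time.Duration) Mode {
	if cfg.BestEffortOnly {
		return ModeBestEffort // forced by configuration
	}
	if cfg.BestEffortWindow <= 0 {
		return ModeCore // zero or negative disables switching
	}
	if lag > cfg.BestEffortWindow {
		return ModeBestEffort
	}
	return ModeCore
}

func main() {
	cfg := Config{BestEffortWindow: time.Hour}
	fmt.Println(chooseMode(cfg, 2*time.Hour))
	fmt.Println(chooseMode(cfg, time.Minute))
}
```

Keeping the BestEffortOnly check first means the forced mode wins regardless of the window value.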

func (*Config) Bind

func (c *Config) Bind(f *pflag.FlagSet)

Bind adds configuration flags to the set.
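
A minimal sketch of the flag-binding pattern follows. To stay self-contained it uses the standard library's flag.FlagSet as a stand-in for pflag.FlagSet. The flag names, defaults, help strings, and the parseConfig helper are all hypothetical; the names the package actually registers are not shown on this page.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// Config is a local copy of the documented struct, not the package's type.
type Config struct {
	BestEffortWindow time.Duration
	BestEffortOnly   bool
	Immediate        bool
}

// Bind registers one flag per field. The flag names and the zero
// defaults are placeholders chosen for this sketch.
func (c *Config) Bind(f *flag.FlagSet) {
	f.DurationVar(&c.BestEffortWindow, "bestEffortWindow", 0,
		"lag threshold for BestEffort switching; zero or negative disables it")
	f.BoolVar(&c.BestEffortOnly, "bestEffortOnly", false,
		"force the use of BestEffort mode")
	f.BoolVar(&c.Immediate, "immediate", false,
		"enable immediate mode")
}

// parseConfig is a hypothetical helper that binds and parses in one step.
func parseConfig(args []string) (*Config, error) {
	cfg := &Config{}
	fs := flag.NewFlagSet("conveyor", flag.ContinueOnError)
	cfg.Bind(fs)
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	cfg, err := parseConfig([]string{"--bestEffortWindow=5m", "--immediate"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", *cfg)
}
```

Binding through a method on the config keeps flag definitions next to the fields they populate, so a caller only needs to supply a flag set.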

func (*Config) Preflight

func (c *Config) Preflight() error

Preflight ensures the Config is in a known-good state.

type Conveyor

type Conveyor struct {
	// contains filtered or unexported fields
}

A Conveyor delivers mutations to a target, possibly asynchronously. It provides an abstraction over various delivery strategies and it manages checkpoints across multiple partitions for a table group.

func (*Conveyor) AcceptMultiBatch

func (c *Conveyor) AcceptMultiBatch(
	ctx context.Context, batch *types.MultiBatch, options *types.AcceptOptions,
) error

AcceptMultiBatch transmits the batch. The options may be nil.

func (*Conveyor) Advance

func (c *Conveyor) Advance(ctx context.Context, partition ident.Ident, ts hlc.Time) error

Advance the checkpoint for the named partition to the given timestamp.

func (*Conveyor) Ensure

func (c *Conveyor) Ensure(ctx context.Context, partitions []ident.Ident) error

Ensure that a checkpoint exists for all named partitions.
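
The relationship between Ensure and Advance can be illustrated with a small in-memory model. Everything here is a hypothetical stand-in: partitions are plain strings rather than ident.Ident, timestamps are int64 rather than hlc.Time, and both the rejection of backward moves and the "minimum across partitions" rule in Resolved are assumptions about how a group-wide checkpoint might be derived.

```go
package main

import (
	"fmt"
	"sync"
)

// checkpoints is a hypothetical in-memory tracker of per-partition progress.
type checkpoints struct {
	mu          sync.Mutex
	byPartition map[string]int64
}

func newCheckpoints() *checkpoints {
	return &checkpoints{byPartition: make(map[string]int64)}
}

// Ensure creates a zero checkpoint for every partition not yet tracked.
func (c *checkpoints) Ensure(partitions []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, p := range partitions {
		if _, ok := c.byPartition[p]; !ok {
			c.byPartition[p] = 0
		}
	}
}

// Advance moves a single partition to ts. Assumption: unknown
// partitions and backward moves are reported as errors.
func (c *checkpoints) Advance(partition string, ts int64) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	cur, ok := c.byPartition[partition]
	if !ok {
		return fmt.Errorf("unknown partition %q", partition)
	}
	if ts < cur {
		return fmt.Errorf("partition %q cannot move back from %d to %d", partition, cur, ts)
	}
	c.byPartition[partition] = ts
	return nil
}

// Resolved returns the lowest checkpoint across all partitions, or
// false if no partition is tracked.
func (c *checkpoints) Resolved() (int64, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	first := true
	var lowest int64
	for _, ts := range c.byPartition {
		if first || ts < lowest {
			lowest = ts
			first = false
		}
	}
	return lowest, !first
}

func main() {
	c := newCheckpoints()
	c.Ensure([]string{"p1", "p2"})
	_ = c.Advance("p1", 10)
	fmt.Println(c.Resolved()) // p2 has not advanced yet
	_ = c.Advance("p2", 7)
	fmt.Println(c.Resolved())
}
```

In this model, calling Ensure before any Advance matters: a partition that is known but has not yet reported holds the group-wide value back, which is safer than ignoring it.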

func (*Conveyor) Range

func (c *Conveyor) Range() *notify.Var[hlc.Range]

Range returns the range of resolved timestamps to be processed.
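
Range and Stat both return a notify.Var, which suggests an observable value that callers can wait on. The sketch below shows one common way to build such a type: a value paired with a channel that is closed on every update. The Var and Range types here are hypothetical, simplified stand-ins written for this example; the real notify.Var and hlc.Range may differ in methods and semantics.

```go
package main

import (
	"fmt"
	"sync"
)

// Var is a hypothetical observable value. Each Set closes the current
// channel, waking any waiters, and installs a fresh one.
type Var[T any] struct {
	mu      sync.Mutex
	value   T
	changed chan struct{}
}

func NewVar[T any](initial T) *Var[T] {
	return &Var[T]{value: initial, changed: make(chan struct{})}
}

// Get returns the current value and a channel that is closed the next
// time the value is replaced.
func (v *Var[T]) Get() (T, <-chan struct{}) {
	v.mu.Lock()
	defer v.mu.Unlock()
	return v.value, v.changed
}

// Set replaces the value and notifies waiters.
func (v *Var[T]) Set(next T) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.value = next
	close(v.changed)
	v.changed = make(chan struct{})
}

// Range is a hypothetical pair of timestamp bounds.
type Range struct{ Min, Max int64 }

func main() {
	v := NewVar(Range{Min: 0, Max: 0})
	_, changed := v.Get()

	// A producer publishes a new range from another goroutine.
	go v.Set(Range{Min: 0, Max: 42})

	<-changed // blocks until the producer calls Set
	cur, _ := v.Get()
	fmt.Println(cur.Min, cur.Max)
}
```

Returning the channel together with the value lets a consumer read a snapshot and then wait for the next change without missing an update in between.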

func (*Conveyor) Refresh

func (c *Conveyor) Refresh()

Refresh is used for testing to refresh the checkpoint.

func (*Conveyor) Stat

func (c *Conveyor) Stat() *notify.Var[sequencer.Stat]

Stat returns the progress of all tables being managed.

func (*Conveyor) TableGroup

func (c *Conveyor) TableGroup() *types.TableGroup

TableGroup returns the TableGroup associated with this conveyor.

func (*Conveyor) Watcher

func (c *Conveyor) Watcher() types.Watcher

Watcher is used for testing to gain access to the underlying schema.

type Conveyors

type Conveyors struct {
	// contains filtered or unexported fields
}

Conveyors manages the plumbing necessary to deliver mutations to a target schema across multiple partitions. It is also responsible for mode-switching.

func ProvideConveyors

func ProvideConveyors(
	ctx *stopper.Context,
	acc *apply.Acceptor,
	cfg *Config,
	checkpoints *checkpoint.Checkpoints,
	retire *retire.Retire,
	sw *switcher.Switcher,
	watchers types.Watchers,
) (*Conveyors, error)

ProvideConveyors is called by Wire.

func (*Conveyors) Bootstrap

func (c *Conveyors) Bootstrap() error

Bootstrap existing schemas for recovery cases.

func (*Conveyors) Get

func (c *Conveyors) Get(schema ident.Schema) (*Conveyor, error)

Get returns a conveyor for a specific schema.

func (*Conveyors) WithKind

func (c *Conveyors) WithKind(kind string) *Conveyors

WithKind returns a new Conveyors factory for the named kind.
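
Get and WithKind together read like a keyed factory. The sketch below shows one possible shape: a factory that lazily creates and caches one conveyor per schema, and a WithKind that returns a separate factory with its own cache. All types, the kind names, the empty-schema check, and the caching behavior are assumptions made for illustration, not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// conveyor is a hypothetical stand-in that only records its identity.
type conveyor struct {
	kind   string
	schema string
}

// conveyors is a hypothetical factory keyed by schema name.
type conveyors struct {
	kind     string
	mu       sync.Mutex
	bySchema map[string]*conveyor
}

func newConveyors(kind string) *conveyors {
	return &conveyors{kind: kind, bySchema: make(map[string]*conveyor)}
}

// WithKind returns a new factory for the named kind. Assumption: the
// new factory does not share cached conveyors with the receiver.
func (c *conveyors) WithKind(kind string) *conveyors {
	return newConveyors(kind)
}

// Get returns the cached conveyor for schema, creating it on first use.
func (c *conveyors) Get(schema string) (*conveyor, error) {
	if schema == "" {
		return nil, errors.New("schema must not be empty")
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if found, ok := c.bySchema[schema]; ok {
		return found, nil
	}
	next := &conveyor{kind: c.kind, schema: schema}
	c.bySchema[schema] = next
	return next, nil
}

func main() {
	base := newConveyors("default")
	kafka := base.WithKind("kafka") // "kafka" is a placeholder kind name

	a, _ := kafka.Get("db.public")
	b, _ := kafka.Get("db.public")
	fmt.Println(a == b, a.kind, a.schema)
}
```

Caching under a mutex means repeated Get calls for the same schema are cheap and always return the same instance, which matters if each conveyor owns background work.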
