workflow

package
v0.12.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2022 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Overview

Package workflow defines types and functions for working with Vitess workflows.

This is still a very rough sketch, far from a final API, but I want to document some things here as I go:

(1) The lines between package workflow and package workflow/vexec are, uh,

blurry at best, and definitely need serious thinking and refinement. Maybe
there shouldn't even be two separate packages at all. The reason I have the
two packages right now is because I'm operating under the assumption that
there are workflows that are vexec, and then there are other workflows. If
it's true that all workflows are vexec workflows, then probably one single
package could make more sense. For now, two packages seems the way to go,
but like I said, the boundaries are blurry, and things that belong in one
package are in the other, because I haven't gone back and moved things
around.

(2) I'm aiming for this to be a drop-in replacement (more or less) for the

function calls in go/vt/wrangler. However, I'd rather define a better
abstraction if it means having to rewrite even significant portions of the
existing wrangler code to adapt to it, than make a subpar API in the name of
backwards compatibility. I'm not sure if that's a tradeoff I'll even need to
consider in the future, but I'm putting a stake in the ground on which side
of that tradeoff I intend to fall, should it come to it.

(3) Eventually we'll need to consider how the online schema migration workflows

fit into this. I'm trying to at least be somewhat abstract in the
vexec / queryplanner APIs to fit with the QueryParams thing that wrangler
uses, which _should_ work, but who knows?? Time will tell.

Index

Constants

View Source
const (
	StreamTypeUnknown = StreamType(iota)
	StreamTypeSharded
	StreamTypeReference
)

StreamType values.

View Source
const (
	DirectionForward = TrafficSwitchDirection(iota)
	DirectionBackward
)

The following constants define the switching direction.

View Source
const (
	DropTable = TableRemovalType(iota)
	RenameTable
)

The following consts define if DropSource will drop or rename the table.

Variables

View Source
var (
	// ErrInvalidWorkflow is a catchall error type for conditions that should be
	// impossible when operating on a workflow.
	ErrInvalidWorkflow = errors.New("invalid workflow")
	// ErrMultipleSourceKeyspaces occurs when a workflow somehow has multiple
	// source keyspaces across different shard primaries. This should be
	// impossible.
	ErrMultipleSourceKeyspaces = errors.New("multiple source keyspaces for a single workflow")
	// ErrMultipleTargetKeyspaces occurs when a workflow somehow has multiple
	// target keyspaces across different shard primaries. This should be
	// impossible.
	ErrMultipleTargetKeyspaces = errors.New("multiple target keyspaces for a single workflow")
)
View Source
var (
	// ErrNoStreams occurs when no target streams are found for a workflow in a
	// target keyspace.
	ErrNoStreams = errors.New("no streams found")
)
View Source
var (
	// Frozen is the message value of frozen vreplication streams.
	Frozen = "FROZEN"
)

Functions

func HashStreams added in v0.11.0

func HashStreams(targetKeyspace string, targets map[string]*MigrationTarget) int64

HashStreams produces a stable hash based on the target keyspace and migration targets.

func ReverseWorkflowName added in v0.11.0

func ReverseWorkflowName(workflow string) string

ReverseWorkflowName returns the "reversed" name of a workflow. For a "forward" workflow, this is the workflow name with "_reversed" appended, and for a "reversed" workflow, this is the workflow name with the "_reversed" suffix removed.

func StreamMigratorFinalize added in v0.11.0

func StreamMigratorFinalize(ctx context.Context, ts ITrafficSwitcher, workflows []string) error

StreamMigratorFinalize finalizes the stream migration.

(TODO:@ajm88) in the original implementation, "it's a standalone function because it does not use the streamMigrater state". That's still true, but moving this to a method on StreamMigrator would provide a cleaner namespacing in package workflow. But, we would have to update more callers in order to do that (*wrangler.switcher's streamMigrateFinalize would need to change its signature to also take a *workflow.StreamMigrator), so we will do that in a second PR.

Types

type ITrafficSwitcher added in v0.11.0

type ITrafficSwitcher interface {
	TopoServer() *topo.Server
	TabletManagerClient() tmclient.TabletManagerClient
	Logger() logutil.Logger
	// VReplicationExec here is used when we want the (*wrangler.Wrangler)
	// implementation, which does a topo lookup on the tablet alias before
	// calling the underlying TabletManagerClient RPC.
	VReplicationExec(ctx context.Context, alias *topodatapb.TabletAlias, query string) (*querypb.QueryResult, error)

	MigrationType() binlogdatapb.MigrationType
	ReverseWorkflowName() string
	SourceKeyspaceName() string
	SourceKeyspaceSchema() *vindexes.KeyspaceSchema
	WorkflowName() string

	ForAllSources(f func(source *MigrationSource) error) error
	ForAllTargets(f func(target *MigrationTarget) error) error
}

ITrafficSwitcher is a temporary hack to allow us to move streamMigrater out of package wrangler without also needing to move trafficSwitcher in the same changeset.

After moving TrafficSwitcher to this package, this type should be removed, and StreamMigrator should be updated to contain a field of type *TrafficSwitcher instead of ITrafficSwitcher.

type MigrationSource added in v0.11.0

type MigrationSource struct {
	Position  string
	Journaled bool
	// contains filtered or unexported fields
}

MigrationSource contains the metadata for each migration source.

func NewMigrationSource added in v0.11.0

func NewMigrationSource(si *topo.ShardInfo, primary *topo.TabletInfo) *MigrationSource

NewMigrationSource returns a MigrationSource for the given shard and primary.

(TODO|@ajm188): do we always want to start with (position:"", journaled:false)?

func (*MigrationSource) GetPrimary added in v0.11.0

func (source *MigrationSource) GetPrimary() *topo.TabletInfo

GetPrimary returns the *topo.TabletInfo for the primary tablet of the migration source.

func (*MigrationSource) GetShard added in v0.11.0

func (source *MigrationSource) GetShard() *topo.ShardInfo

GetShard returns the *topo.ShardInfo for the migration source.

type MigrationTarget added in v0.11.0

type MigrationTarget struct {
	Sources  map[uint32]*binlogdatapb.BinlogSource
	Position string
	// contains filtered or unexported fields
}

MigrationTarget contains the metadata for each migration target.

func (*MigrationTarget) GetPrimary added in v0.11.0

func (target *MigrationTarget) GetPrimary() *topo.TabletInfo

GetPrimary returns the *topo.TabletInfo for the primary tablet of the migration target.

func (*MigrationTarget) GetShard added in v0.11.0

func (target *MigrationTarget) GetShard() *topo.ShardInfo

GetShard returns the *topo.ShardInfo for the migration target.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server provides an API to work with Vitess workflows, like vreplication workflows (MoveTables, Reshard, etc) and schema migration workflows.

NB: This is in alpha, and you probably don't want to depend on it (yet!). Currently, it provides only a read-only API to vreplication workflows. Write actions on vreplication workflows, and schema migration workflows entirely, are not yet supported, but planned.

func NewServer

func NewServer(ts *topo.Server, tmc tmclient.TabletManagerClient) *Server

NewServer returns a new server instance with the given topo.Server and TabletManagerClient.

func (*Server) CheckReshardingJournalExistsOnTablet added in v0.11.0

func (s *Server) CheckReshardingJournalExistsOnTablet(ctx context.Context, tablet *topodatapb.Tablet, migrationID int64) (*binlogdatapb.Journal, bool, error)

CheckReshardingJournalExistsOnTablet returns the journal (or an empty journal) and a boolean to indicate if the resharding_journal table exists on the given tablet.

(TODO:@ajm188) This should not be part of the final public API, and should be un-exported after all places in package wrangler that call this have been migrated over.

func (*Server) GetCellsWithShardReadsSwitched added in v0.11.0

func (s *Server) GetCellsWithShardReadsSwitched(
	ctx context.Context,
	keyspace string,
	si *topo.ShardInfo,
	tabletType topodatapb.TabletType,
) (cellsSwitched []string, cellsNotSwitched []string, err error)

GetCellsWithShardReadsSwitched returns the topo cells partitioned into two slices: one with the cells where shard reads have been switched for the given tablet type and one with the cells where shard reads have not been switched for the given tablet type.

This function is for use in Reshard, and "switched reads" is defined as if any one of the source shards has the query service disabled in its tablet control record.

func (*Server) GetCellsWithTableReadsSwitched added in v0.11.0

func (s *Server) GetCellsWithTableReadsSwitched(
	ctx context.Context,
	keyspace string,
	table string,
	tabletType topodatapb.TabletType,
) (cellsSwitched []string, cellsNotSwitched []string, err error)

GetCellsWithTableReadsSwitched returns the topo cells partitioned into two slices: one with the cells where table reads have been switched for the given tablet type and one with the cells where table reads have not been switched for the given tablet type.

This function is for use in MoveTables, and "switched reads" is defined as if the routing rule for a (table, tablet_type) is pointing to the target keyspace.

func (*Server) GetWorkflows

GetWorkflows returns a list of all workflows that exist in a given keyspace, with some additional filtering depending on the request parameters (for example, ActiveOnly=true restricts the search to only workflows that are currently running).

It has the same signature as the vtctlservicepb.VtctldServer's GetWorkflows rpc, and grpcvtctldserver delegates to this function.

type State added in v0.11.0

type State struct {
	Workflow       string
	SourceKeyspace string
	TargetKeyspace string
	WorkflowType   Type

	ReplicaCellsSwitched    []string
	ReplicaCellsNotSwitched []string

	RdonlyCellsSwitched    []string
	RdonlyCellsNotSwitched []string

	WritesSwitched bool
}

State represents the state of a workflow.

type StreamMigrator added in v0.11.0

type StreamMigrator struct {
	// contains filtered or unexported fields
}

func BuildStreamMigrator added in v0.11.0

func BuildStreamMigrator(ctx context.Context, ts ITrafficSwitcher, cancelMigrate bool) (*StreamMigrator, error)

BuildStreamMigrator creates a new StreamMigrator based on the given TrafficSwitcher.

func (*StreamMigrator) CancelMigration added in v0.11.0

func (sm *StreamMigrator) CancelMigration(ctx context.Context)

func (*StreamMigrator) MigrateStreams added in v0.11.0

func (sm *StreamMigrator) MigrateStreams(ctx context.Context) error

func (*StreamMigrator) StopStreams added in v0.11.0

func (sm *StreamMigrator) StopStreams(ctx context.Context) ([]string, error)

func (*StreamMigrator) Streams added in v0.11.0

func (sm *StreamMigrator) Streams() map[string][]*VReplicationStream

Streams returns a deep-copy of the StreamMigrator's streams map.

func (*StreamMigrator) Templates added in v0.11.0

func (sm *StreamMigrator) Templates() []*VReplicationStream

Templates returns a copy of the StreamMigrator's template streams.

type StreamType added in v0.11.0

type StreamType int

StreamType is an enum representing the kind of stream.

(TODO:@ajm188) This should be made package-private once the last references in package wrangler are removed.

type TableRemovalType added in v0.11.0

type TableRemovalType int

TableRemovalType specifies the way the a table will be removed during a DropSource for a MoveTables workflow.

func (TableRemovalType) String added in v0.11.0

func (trt TableRemovalType) String() string

String returns a string representation of a TableRemovalType

type TargetInfo added in v0.11.0

type TargetInfo struct {
	Targets        map[string]*MigrationTarget
	Frozen         bool
	OptCells       string
	OptTabletTypes string
}

TargetInfo contains the metadata for a set of targets involved in a workflow.

func BuildTargets added in v0.11.0

func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string) (*TargetInfo, error)

BuildTargets collects MigrationTargets and other metadata (see TargetInfo) from a workflow in the target keyspace.

It returns ErrNoStreams if there are no targets found for the workflow.

type TrafficSwitchDirection added in v0.11.0

type TrafficSwitchDirection int

TrafficSwitchDirection specifies the switching direction.

type Type added in v0.11.0

type Type string

Type is the type of a workflow.

const (
	TypeReshard    Type = "Reshard"
	TypeMoveTables Type = "MoveTables"
)

Workflow types.

type VReplicationStream added in v0.11.0

type VReplicationStream struct {
	ID           uint32
	Workflow     string
	BinlogSource *binlogdatapb.BinlogSource
	Position     mysql.Position
}

VReplicationStream represents a single stream of a vreplication workflow.

type VReplicationStreams added in v0.11.0

type VReplicationStreams []*VReplicationStream

VReplicationStreams wraps a slice of VReplicationStream objects to provide some aggregate functionality.

func (VReplicationStreams) Copy added in v0.11.0

Copy returns a copy of the list of streams. All fields except .Position are copied.

func (VReplicationStreams) ToSlice added in v0.11.0

func (streams VReplicationStreams) ToSlice() []*VReplicationStream

ToSlice unwraps a VReplicationStreams object into the underlying slice of VReplicationStream objects.

func (VReplicationStreams) Values added in v0.11.0

func (streams VReplicationStreams) Values() string

Values returns a string representing the IDs of the VReplicationStreams for use in an IN clause.

(TODO|@ajm188) This currently returns the literal ")" if len(streams) == 0. We should probably update this function to return the full "IN" clause, and then if len(streams) == 0, return "1 != 1" so that we can still execute a valid SQL query.

func (VReplicationStreams) Workflows added in v0.11.0

func (streams VReplicationStreams) Workflows() []string

Workflows returns a list of unique workflow names in the list of vreplication streams.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL