all

package
v2.3.0 Latest
Published: Jun 16, 2022 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

package all

This package is the canonical location for all migrations being made against the single shared kv.Store implementation used by InfluxDB (while it remains a single store).

The array all.Migrations contains the list of migration specifications which drives the serial set of migration operations required to correctly configure the backing metadata store for InfluxDB.
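
For illustration, the array has roughly the following shape (a sketch only: the entries mirror the migration variables documented below, and the elision stands in for the remaining numbered migrations):

var Migrations = [...]migration.Spec{
	Migration0001_InitialMigration,
	Migration0002_AddURMByUserIndex,
	Migration0003_TaskOwnerIDUpMigration,
	// ... one entry per numbered migration ...
	Migration0020_Add_remotes_replications_metrics_buckets,
}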

This package is arranged like so:

doc.go - this piece of documentation.
all.go - definition of the Migrations array, referencing each of the named migrations defined in the numbered migration files (below).
migration.go - an implementation of migration.Spec for convenience.
000X_migration_name.go (example) - N files containing the specific implementations of each migration enumerated in `all.go`.
...

Managing this list of files and all.go can be fiddly. There is a buildable CLI utility called `kvmigrate` in the `internal/cmd/kvmigrate` package. Its `create` command automatically creates a new migration file in the expected location and appends the corresponding entry to the Migrations array in all.go, as shown below.
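
For example, a hypothetical invocation from the repository root (the exact arguments are an assumption; check the command's own help output):

go run ./internal/cmd/kvmigrate create "my new migration"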

Index

Constants

This section is empty.

Variables

var Migration0001_InitialMigration = kv.InitialMigration{}

Migration0001_InitialMigration contains all the buckets created before migrations were introduced in the kv package.

Migration0002_AddURMByUserIndex creates the URM by user index and populates missing entries based on the source.

var Migration0003_TaskOwnerIDUpMigration = UpOnlyMigration(
	"migrate task owner id",
	func(ctx context.Context, store kv.SchemaStore) error {
		var ownerlessTasks []*taskmodel.Task

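		// First pass: read-only scan of the task bucket to collect tasks
		// without a valid owner ID.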
		err := store.View(ctx, func(tx kv.Tx) error {
			taskBucket, err := tx.Bucket(taskBucket)
			if err != nil {
				return taskmodel.ErrUnexpectedTaskBucketErr(err)
			}

			c, err := taskBucket.ForwardCursor([]byte{})
			if err != nil {
				return taskmodel.ErrUnexpectedTaskBucketErr(err)
			}

			for k, v := c.Next(); k != nil; k, v = c.Next() {
				kvTask := &kvTask{}
				if err := json.Unmarshal(v, kvTask); err != nil {
					return taskmodel.ErrInternalTaskServiceError(err)
				}

				t := kvToInfluxTask(kvTask)

				if !t.OwnerID.Valid() {
					ownerlessTasks = append(ownerlessTasks, t)
				}
			}
			if err := c.Err(); err != nil {
				return err
			}

			return c.Close()
		})
		if err != nil {
			return err
		}

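		// Second pass: resolve an owner for each collected task and write
		// the updated task back to the bucket.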
		for _, t := range ownerlessTasks {

			err := store.Update(ctx, func(tx kv.Tx) error {
				taskKey, err := taskKey(t.ID)
				if err != nil {
					return err
				}
				b, err := tx.Bucket(taskBucket)
				if err != nil {
					return taskmodel.ErrUnexpectedTaskBucketErr(err)
				}

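				// Attempt 1: derive the owner from the authorization
				// recorded on the task.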
				if !t.OwnerID.Valid() {
					v, err := b.Get(taskKey)
					if kv.IsNotFound(err) {
						return taskmodel.ErrTaskNotFound
					}
					authType := struct {
						AuthorizationID platform.ID `json:"authorizationID"`
					}{}
					if err := json.Unmarshal(v, &authType); err != nil {
						return taskmodel.ErrInternalTaskServiceError(err)
					}

					encodedID, err := authType.AuthorizationID.Encode()
					if err == nil {
						authBucket, err := tx.Bucket([]byte("authorizationsv1"))
						if err != nil {
							return err
						}

						a, err := authBucket.Get(encodedID)
						if err == nil {
							auth := &influxdb.Authorization{}
							if err := json.Unmarshal(a, auth); err != nil {
								return err
							}

							t.OwnerID = auth.GetUserID()
						}
					}

				}

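				// Attempt 2: fall back to an owner of the task's
				// organization via user resource mappings.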
				if !t.OwnerID.Valid() {
					b, err := tx.Bucket([]byte("userresourcemappingsv1"))
					if err != nil {
						return err
					}

					id, err := t.OrganizationID.Encode()
					if err != nil {
						return err
					}

					cur, err := b.ForwardCursor(id, kv.WithCursorPrefix(id))
					if err != nil {
						return err
					}

					for k, v := cur.Next(); k != nil; k, v = cur.Next() {
						m := &influxdb.UserResourceMapping{}
						if err := json.Unmarshal(v, m); err != nil {
							return err
						}
						if m.ResourceID == t.OrganizationID && m.ResourceType == influxdb.OrgsResourceType && m.UserType == influxdb.Owner {
							t.OwnerID = m.UserID
							break
						}
					}

					if err := cur.Close(); err != nil {
						return err
					}
				}

				if !t.OwnerID.Valid() {
					return &errors.Error{
						Code: errors.EInternal,
						Msg:  "could not populate owner ID for task",
					}
				}

				taskBytes, err := json.Marshal(t)
				if err != nil {
					return taskmodel.ErrInternalTaskServiceError(err)
				}

				err = b.Put(taskKey, taskBytes)
				if err != nil {
					return taskmodel.ErrUnexpectedTaskBucketErr(err)
				}
				return nil
			})
			if err != nil {
				return err
			}
		}
		return nil
	},
)

Migration0003_TaskOwnerIDUpMigration adds missing owner IDs to some legacy tasks.

var Migration0004_AddDbrpBuckets = migration.CreateBuckets(
	"create DBRP buckets",
	dbrpBucket,
	dbrpIndexBucket,
	dbrpDefaultBucket,
)

Migration0004_AddDbrpBuckets creates the buckets necessary for the DBRP Service to operate.

var Migration0005_AddPkgerBuckets = migration.CreateBuckets(
	"create pkger stacks buckets",
	pkgerStacksBucket,
	pkgerStackIndexBucket,
)

Migration0005_AddPkgerBuckets creates the buckets necessary for the pkger service to operate.

var Migration0006_DeleteBucketSessionsv1 = migration.DeleteBuckets("delete sessionsv1 bucket", []byte("sessionsv1"))

Migration0006_DeleteBucketSessionsv1 removes the sessionsv1 bucket from the backing kv store.

var Migration0007_CreateMetaDataBucket = migration.CreateBuckets(
	"Create TSM metadata buckets",
	meta.BucketName)
var Migration0008_LegacyAuthBuckets = migration.CreateBuckets(
	"Create Legacy authorization buckets",
	[]byte("legacy/authorizationsv1"), []byte("legacy/authorizationindexv1"))
var Migration0009_LegacyAuthPasswordBuckets = migration.CreateBuckets(
	"Create legacy auth password bucket",
	[]byte("legacy/authorizationPasswordv1"))

Migration0010_AddIndexTelegrafByOrg adds the index of telegraf configs by organization ID.

var Migration0011_PopulateDashboardsOwnerId = UpOnlyMigration("populate dashboards owner id", func(ctx context.Context, store kv.SchemaStore) error {
	var urmBucket = []byte("userresourcemappingsv1")
	type userResourceMapping struct {
		UserID       platform.ID           `json:"userID"`
		UserType     influxdb.UserType     `json:"userType"`
		MappingType  influxdb.MappingType  `json:"mappingType"`
		ResourceType influxdb.ResourceType `json:"resourceType"`
		ResourceID   platform.ID           `json:"resourceID"`
	}

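	// Collect every dashboard-owner user resource mapping.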
	var mappings []*userResourceMapping
	if err := store.View(ctx, func(tx kv.Tx) error {
		bkt, err := tx.Bucket(urmBucket)
		if err != nil {
			return err
		}

		cursor, err := bkt.ForwardCursor(nil)
		if err != nil {
			return err
		}

		return kv.WalkCursor(ctx, cursor, func(_, v []byte) (bool, error) {
			var mapping userResourceMapping
			if err := json.Unmarshal(v, &mapping); err != nil {
				return false, err
			}

			if mapping.ResourceType == influxdb.DashboardsResourceType &&
				mapping.UserType == influxdb.Owner {
				mappings = append(mappings, &mapping)
			}

			return true, nil
		})
	}); err != nil {
		return err
	}

	var dashboardsBucket = []byte("dashboardsv2")
	// dashboard represents all visual and query data for a dashboard.
	type dashboard struct {
		ID             platform.ID            `json:"id,omitempty"`
		OrganizationID platform.ID            `json:"orgID,omitempty"`
		Name           string                 `json:"name"`
		Description    string                 `json:"description"`
		Cells          []*influxdb.Cell       `json:"cells"`
		Meta           influxdb.DashboardMeta `json:"meta"`
		OwnerID        *platform.ID           `json:"owner,omitempty"`
	}

	var (
		batchSize = 100
		flush     = func(batch []*userResourceMapping) (err error) {
			ids := make([][]byte, len(batch))
			for i, urm := range batch {
				ids[i], err = urm.ResourceID.Encode()
				if err != nil {
					return
				}
			}

			return store.Update(ctx, func(tx kv.Tx) error {
				bkt, err := tx.Bucket(dashboardsBucket)
				if err != nil {
					return err
				}

				values, err := bkt.GetBatch(ids...)
				if err != nil {
					return err
				}

				for i, value := range values {
					var dashboard dashboard
					if err := json.Unmarshal(value, &dashboard); err != nil {
						return err
					}

					if dashboard.OwnerID != nil {
						fmt.Printf("dashboard %q already has owner %q\n", dashboard.ID, dashboard.OwnerID)
						continue
					}

					dashboard.OwnerID = &batch[i].UserID

					updated, err := json.Marshal(dashboard)
					if err != nil {
						return err
					}

					if err := bkt.Put(ids[i], updated); err != nil {
						return err
					}
				}

				return nil
			})
		}
	)

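	// Walk the collected mappings in batches of batchSize.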
	for i := 0; i < len(mappings); i += batchSize {
		end := i + batchSize
		if end > len(mappings) {
			end = len(mappings)
		}

		if err := flush(mappings[i:end]); err != nil {
			return err
		}
	}

	return nil
})

Migration0011_PopulateDashboardsOwnerId backfills owner IDs on dashboards based on the presence of user resource mappings.

var Migration0013_RepairDBRPOwnerAndBucketIDs = UpOnlyMigration(
	"repair DBRP owner and bucket IDs",
	func(ctx context.Context, store kv.SchemaStore) error {
		type oldStyleMapping struct {
			ID              platform.ID `json:"id"`
			Database        string      `json:"database"`
			RetentionPolicy string      `json:"retention_policy"`
			Default         bool        `json:"default"`

			// These 2 fields were renamed.
			OrganizationID platform.ID `json:"organization_id"`
			BucketID       platform.ID `json:"bucket_id"`
		}

		// Collect DBRPs that are using the old schema.
		var mappings []*oldStyleMapping
		if err := store.View(ctx, func(tx kv.Tx) error {
			bkt, err := tx.Bucket(dbrpBucket)
			if err != nil {
				return err
			}

			cursor, err := bkt.ForwardCursor(nil)
			if err != nil {
				return err
			}

			return kv.WalkCursor(ctx, cursor, func(_, v []byte) (bool, error) {
				var mapping oldStyleMapping
				if err := json.Unmarshal(v, &mapping); err != nil {
					return false, err
				}

				if mapping.OrganizationID.Valid() && mapping.BucketID.Valid() {
					mappings = append(mappings, &mapping)
				}

				return true, nil
			})
		}); err != nil {
			return err
		}

		type newStyleDbrpMapping struct {
			ID              platform.ID `json:"id"`
			Database        string      `json:"database"`
			RetentionPolicy string      `json:"retention_policy"`
			Default         bool        `json:"default"`

			// New names for the 2 renamed fields.
			OrganizationID platform.ID `json:"orgID"`
			BucketID       platform.ID `json:"bucketID"`
		}
		batchSize := 100
		writeBatch := func(batch []*oldStyleMapping) (err error) {
			ids := make([][]byte, len(batch))
			for i, mapping := range batch {
				ids[i], err = mapping.ID.Encode()
				if err != nil {
					return
				}
			}

			return store.Update(ctx, func(tx kv.Tx) error {
				bkt, err := tx.Bucket(dbrpBucket)
				if err != nil {
					return err
				}

				values, err := bkt.GetBatch(ids...)
				if err != nil {
					return err
				}

				for i, value := range values {
					var mapping newStyleDbrpMapping
					if err := json.Unmarshal(value, &mapping); err != nil {
						return err
					}

					if !mapping.OrganizationID.Valid() {
						mapping.OrganizationID = batch[i].OrganizationID
					}
					if !mapping.BucketID.Valid() {
						mapping.BucketID = batch[i].BucketID
					}

					updated, err := json.Marshal(mapping)
					if err != nil {
						return err
					}

					if err := bkt.Put(ids[i], updated); err != nil {
						return err
					}
				}

				return nil
			})
		}

		for i := 0; i < len(mappings); i += batchSize {
			end := i + batchSize
			if end > len(mappings) {
				end = len(mappings)
			}
			if err := writeBatch(mappings[i:end]); err != nil {
				return err
			}
		}

		return nil
	},
)
var Migration0014_ReindexDBRPs = kv.NewIndexMigration(dbrp.ByOrgIDIndexMapping)
var Migration0015_RecordShardGroupDurationsInBucketMetadata = UpOnlyMigration(
	"record shard group durations in bucket metadata",
	repairMissingShardGroupDurations,
)
var Migration0016_AddAnnotationsNotebooksToOperToken = &Migration{
	name: "add annotations and notebooks resource types to operator token",
	up: migrateTokensMigration(
		func(t influxdb.Authorization) bool {
			return permListsMatch(preNotebooksAnnotationsOpPerms(), t.Permissions)
		},
		func(t *influxdb.Authorization) {
			t.Permissions = append(t.Permissions, notebooksAndAnnotationsPerms(0)...)
		},
	),
	down: migrateTokensMigration(
		func(t influxdb.Authorization) bool {
			return permListsMatch(append(preNotebooksAnnotationsOpPerms(), notebooksAndAnnotationsPerms(0)...), t.Permissions)
		},
		func(t *influxdb.Authorization) {
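			// Rebuild the permission list in place, dropping annotations
			// and notebooks permissions.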
			newPerms := t.Permissions[:0]
			for _, p := range t.Permissions {
				switch p.Resource.Type {
				case influxdb.AnnotationsResourceType:
				case influxdb.NotebooksResourceType:
				default:
					newPerms = append(newPerms, p)
				}
			}
			t.Permissions = newPerms
		},
	),
}
var Migration0017_AddAnnotationsNotebooksToAllAccessTokens = &Migration{
	name: "add annotations and notebooks resource types to all-access tokens",
	up: migrateTokensMigration(
		func(t influxdb.Authorization) bool {
			return permListsMatch(preNotebooksAnnotationsAllAccessPerms(t.OrgID, t.UserID), t.Permissions)
		},
		func(t *influxdb.Authorization) {
			t.Permissions = append(t.Permissions, notebooksAndAnnotationsPerms(t.OrgID)...)
		},
	),
	down: migrateTokensMigration(
		func(t influxdb.Authorization) bool {
			return permListsMatch(append(preNotebooksAnnotationsAllAccessPerms(t.OrgID, t.UserID), notebooksAndAnnotationsPerms(t.OrgID)...), t.Permissions)
		},
		func(t *influxdb.Authorization) {
			newPerms := t.Permissions[:0]
			for _, p := range t.Permissions {
				switch p.Resource.Type {
				case influxdb.AnnotationsResourceType:
				case influxdb.NotebooksResourceType:
				default:
					newPerms = append(newPerms, p)
				}
			}
			t.Permissions = newPerms
		},
	),
}
var Migration0018_RepairMissingShardGroupDurations = UpOnlyMigration(
	"repair missing shard group durations",
	repairMissingShardGroupDurations,
)

NOTE: Down() is purposefully left as a no-op here because this migration fills in values that were missing because of a logic bug, and doesn't actually modify the metadata schema.

var Migration0019_AddRemotesReplicationsToTokens = &Migration{
	name: "add remotes and replications resource types to operator and all-access tokens",
	up: migrateTokensMigration(
		func(t influxdb.Authorization) bool {
			return permListsMatch(preReplicationOpPerms(), t.Permissions) ||
				permListsMatch(preReplicationAllAccessPerms(t.OrgID, t.UserID), t.Permissions)
		},
		func(t *influxdb.Authorization) {
			if permListsMatch(preReplicationOpPerms(), t.Permissions) {
				t.Permissions = append(t.Permissions, remotesAndReplicationsPerms(0)...)
			} else {
				t.Permissions = append(t.Permissions, remotesAndReplicationsPerms(t.OrgID)...)
			}
		},
	),
	down: migrateTokensMigration(
		func(t influxdb.Authorization) bool {
			return permListsMatch(append(preReplicationOpPerms(), remotesAndReplicationsPerms(0)...), t.Permissions) ||
				permListsMatch(append(preReplicationAllAccessPerms(t.OrgID, t.UserID), remotesAndReplicationsPerms(t.OrgID)...), t.Permissions)
		},
		func(t *influxdb.Authorization) {
			newPerms := t.Permissions[:0]
			for _, p := range t.Permissions {
				switch p.Resource.Type {
				case influxdb.RemotesResourceType:
				case influxdb.ReplicationsResourceType:
				default:
					newPerms = append(newPerms, p)
				}
			}
			t.Permissions = newPerms
		},
	),
}
var Migration0020_Add_remotes_replications_metrics_buckets = migration.CreateBuckets(
	"create remotes and replications metrics buckets",
	remoteMetricsBucket,
	replicationsMetricsBucket,
)

Migrations contains all the migrations required for the entire kv store backing InfluxDB's metadata.

Functions

func Up

func Up(ctx context.Context, logger *zap.Logger, store kv.SchemaStore) error

Up is a convenience method which creates a migrator for all migrations and calls Up on it.
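
A minimal usage sketch, assuming the Bolt-backed store from the influxdb module satisfies kv.SchemaStore (bolt.NewKVStore, the import layout, and the file path are assumptions, not part of this package's API):

package main

import (
	"context"

	"github.com/influxdata/influxdb/v2/bolt"
	"github.com/influxdata/influxdb/v2/kv/migration/all"
	"go.uber.org/zap"
)

func main() {
	ctx := context.Background()

	logger, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}

	// Open the backing kv store (assumed Bolt-backed; the path is illustrative).
	store := bolt.NewKVStore(logger, "influxd.bolt")
	if err := store.Open(ctx); err != nil {
		logger.Fatal("opening kv store", zap.Error(err))
	}
	defer store.Close()

	// Apply every migration in all.Migrations, in order.
	if err := all.Up(ctx, logger, store); err != nil {
		logger.Fatal("running migrations", zap.Error(err))
	}
}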

Types

type Migration

type Migration struct {
	// contains filtered or unexported fields
}

Migration is a type which implements the migration package's Spec interface. It can be used to conveniently create migration specs for the all package.

func UpOnlyMigration

func UpOnlyMigration(name string, up MigrationFunc) *Migration

UpOnlyMigration is a migration with an up function and a no-op down function.
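
For instance, a minimal sketch of a forward-only migration in this package's style (the variable name and body are hypothetical):

var Migration9999_ExampleMigration = UpOnlyMigration(
	"example migration",
	func(ctx context.Context, store kv.SchemaStore) error {
		// Forward-only change goes here; the generated Down is a no-op.
		return nil
	},
)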

func (*Migration) Down

func (m *Migration) Down(ctx context.Context, store kv.SchemaStore) error

Down delegates to the underlying anonymous down migration function.

func (*Migration) MigrationName

func (m *Migration) MigrationName() string

MigrationName returns the underlying name of the migration.

func (*Migration) Up

func (m *Migration) Up(ctx context.Context, store kv.SchemaStore) error

Up delegates to the underlying anonymous up migration function.

type MigrationFunc

type MigrationFunc func(context.Context, kv.SchemaStore) error

MigrationFunc is a function which can be used as either an up or down operation.
