datamanager

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2020 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultSyncInterval                = 5 * time.Second
	DefaultCheckpointInterval          = 10 * time.Second
	DefaultCheckpointCleanInterval     = 5 * time.Minute
	DefaultEtcdWalCleanInterval        = 2 * time.Second
	DefaultStorageWalCleanInterval     = 5 * time.Minute
	DefaultCompactChangeGroupsInterval = 1 * time.Second
	DefaultEtcdPingerInterval          = 1 * time.Second
	DefaultEtcdWalsKeepNum             = 100
	DefaultMinCheckpointWalsNum        = 100
)
View Source
const (
	DefaultMaxDataFileSize = 10 * 1024 * 1024
)

Variables

View Source
var (
	DataFileRegexp       = regexp.MustCompile(`^([a-zA-Z0-9]+-[a-zA-Z0-9]+)-([a-zA-Z0-9-]+)\.(data|index)$`)
	DataStatusFileRegexp = regexp.MustCompile(`^([a-zA-Z0-9]+-[a-zA-Z0-9]+)\.status$`)
)
View Source
var (
	ErrCompacted   = errors.New("required revision has been compacted")
	ErrConcurrency = errors.New("wal concurrency error: change groups already updated")
)
View Source
var ErrNoDataStatus = errors.New("no data status files")

ErrNoDataStatus represent when there's no data status files in the ost

Functions

This section is empty.

Types

type Action

type Action struct {
	ActionType ActionType
	DataType   string
	ID         string
	Data       []byte
}

type ActionGroup

type ActionGroup struct {
	DataStatusFile          *DataStatusFile
	StartActionIndex        int
	ActionsSize             int
	PreviousDataStatusFiles []*DataStatusFile
}

type ActionType

type ActionType string
const (
	ActionTypePut    ActionType = "put"
	ActionTypeDelete ActionType = "delete"
)

type ChangeGroupsUpdateToken

type ChangeGroupsUpdateToken struct {
	CurRevision           int64                 `json:"cur_revision"`
	ChangeGroupsRevisions changeGroupsRevisions `json:"change_groups_revisions"`
}

type DataEntry

type DataEntry struct {
	ID       string `json:"id,omitempty"`
	DataType string `json:"data_type,omitempty"`
	Data     []byte `json:"data,omitempty"`
}

type DataFileIndex

type DataFileIndex struct {
	Index map[string]int64 `json:"index,omitempty"`
}

type DataManager

type DataManager struct {
	// contains filtered or unexported fields
}

func NewDataManager

func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerConfig) (*DataManager, error)

func (*DataManager) CleanOldCheckpoints added in v0.4.0

func (d *DataManager) CleanOldCheckpoints(ctx context.Context) error

func (*DataManager) DataFileBasePath added in v0.4.0

func (d *DataManager) DataFileBasePath(dataType, name string) string

func (*DataManager) DataFileIndexPath

func (d *DataManager) DataFileIndexPath(dataType, name string) string

func (*DataManager) DataFilePath

func (d *DataManager) DataFilePath(dataType, name string) string

func (*DataManager) DataTypeDir added in v0.4.0

func (d *DataManager) DataTypeDir(dataType string) string

func (*DataManager) Export added in v0.2.0

func (d *DataManager) Export(ctx context.Context, w io.Writer) error

func (*DataManager) FirstAvailableWalData

func (d *DataManager) FirstAvailableWalData(ctx context.Context) (*WalData, int64, error)

FirstAvailableWalData returns the first (the one with smaller sequence) wal and returns it (or nil if not available) and the etcd revision at the time of the operation

func (*DataManager) GetChangeGroupsUpdateToken

func (d *DataManager) GetChangeGroupsUpdateToken(cgNames []string) (*ChangeGroupsUpdateToken, error)

func (*DataManager) GetDataStatus added in v0.4.0

func (d *DataManager) GetDataStatus(dataSequence *sequence.Sequence) (*DataStatus, error)

func (*DataManager) GetFirstDataStatus added in v0.4.0

func (d *DataManager) GetFirstDataStatus() (*DataStatus, error)

func (*DataManager) GetFirstDataStatusSequence added in v0.4.0

func (d *DataManager) GetFirstDataStatusSequence() (*sequence.Sequence, error)

func (*DataManager) GetFirstDataStatusSequences added in v0.4.0

func (d *DataManager) GetFirstDataStatusSequences(n int) ([]*sequence.Sequence, error)

func (*DataManager) GetLastDataStatus

func (d *DataManager) GetLastDataStatus() (*DataStatus, error)

func (*DataManager) GetLastDataStatusSequence added in v0.4.0

func (d *DataManager) GetLastDataStatusSequence() (*sequence.Sequence, error)

func (*DataManager) GetLastDataStatusSequences added in v0.4.0

func (d *DataManager) GetLastDataStatusSequences(n int) ([]*sequence.Sequence, error)

func (*DataManager) HasOSTWal

func (d *DataManager) HasOSTWal(walseq string) (bool, error)

func (*DataManager) Import added in v0.2.0

func (d *DataManager) Import(ctx context.Context, r io.Reader) error

func (*DataManager) InitEtcd

func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) error

func (*DataManager) LastCommittedStorageWal

func (d *DataManager) LastCommittedStorageWal(ctx context.Context) (string, int64, error)

func (*DataManager) ListEtcdChangeGroups

func (d *DataManager) ListEtcdChangeGroups(ctx context.Context, revision int64) (changeGroupsRevisions, error)

func (*DataManager) ListEtcdWals

func (d *DataManager) ListEtcdWals(ctx context.Context, revision int64) <-chan *ListEtcdWalsElement

func (*DataManager) ListOSTWals

func (d *DataManager) ListOSTWals(start string) <-chan *WalFile

func (*DataManager) Read

func (d *DataManager) Read(dataType, id string) (io.Reader, error)

func (*DataManager) ReadObject

func (d *DataManager) ReadObject(dataType, id string, cgNames []string) (io.ReadCloser, *ChangeGroupsUpdateToken, error)

func (*DataManager) ReadWal

func (d *DataManager) ReadWal(walseq string) (*WalHeader, error)

func (*DataManager) ReadWalData

func (d *DataManager) ReadWalData(walFileID string) (io.ReadCloser, error)

func (*DataManager) Run

func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error

func (*DataManager) SetMaintenanceMode added in v0.2.0

func (d *DataManager) SetMaintenanceMode(maintenanceMode bool)

SetMaintenanceMode sets the datamanager in maintenance mode. This method must be called before invoking the Run method

func (*DataManager) Watch

func (d *DataManager) Watch(ctx context.Context, revision int64) <-chan *WatchElement

func (*DataManager) WriteWal

WriteWal writes the provided actions in a wal file. The wal will be marked as "committed" on etcd if the provided group changes aren't changed in the meantime or a optimistic concurrency error will be returned and the wal won't be committed

TODO(sgotti) save inside the wal file also the previous committed wal to handle possible objectstorage list operation eventual consistency gaps (list won't report a wal at seq X but a wal at X+n, if this kind of eventual consistency ever exists)

func (*DataManager) WriteWalAdditionalOps

func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken, cmp []etcdclientv3.Cmp, then []etcdclientv3.Op) (*ChangeGroupsUpdateToken, error)

type DataManagerConfig

type DataManagerConfig struct {
	BasePath                string
	E                       *etcd.Store
	OST                     *objectstorage.ObjStorage
	DataTypes               []string
	EtcdWalsKeepNum         int
	CheckpointInterval      time.Duration
	CheckpointCleanInterval time.Duration
	// MinCheckpointWalsNum is the minimum number of wals required before doing a checkpoint
	MinCheckpointWalsNum int
	MaxDataFileSize      int64
	MaintenanceMode      bool
}

type DataStatus

type DataStatus struct {
	DataSequence string `json:"data_sequence,omitempty"`
	WalSequence  string `json:"wal_sequence,omitempty"`
	// an entry id ordered list of files for a specific data type (map key)
	Files map[string][]*DataStatusFile `json:"files,omitempty"`
}

type DataStatusFile

type DataStatusFile struct {
	ID string `json:"id,omitempty"`
	// the last entry id in this file
	LastEntryID string `json:"last_entry_id,omitempty"`
}

type ListEtcdWalsElement

type ListEtcdWalsElement struct {
	WalData *WalData
	Err     error
}

type WalChanges

type WalChanges struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewWalChanges

func NewWalChanges(dataTypes []string) *WalChanges

func (*WalChanges) String

func (c *WalChanges) String() string

type WalData

type WalData struct {
	WalDataFileID       string
	WalStatus           WalStatus
	WalSequence         string
	PreviousWalSequence string

	// internal values not saved
	Revision int64 `json:"-"`
}

type WalFile

type WalFile struct {
	WalSequence string
	Err         error
}

type WalHeader

type WalHeader struct {
	WalDataFileID       string
	PreviousWalSequence string
}

type WalStatus

type WalStatus string
const (
	// WalStatusCommitted represent a wal written to the objectstorage
	WalStatusCommitted WalStatus = "committed"
	// WalStatusCommittedStorage represent the .committed marker file written to the objectstorage
	WalStatusCommittedStorage WalStatus = "committed_storage"
	// WalStatusCheckpointed mean that all the wal actions have been executed on the objectstorage
	WalStatusCheckpointed WalStatus = "checkpointed"
)

type WalsData

type WalsData struct {
	LastCommittedWalSequence string
	Revision                 int64 `json:"-"`
}

type WatchElement

type WatchElement struct {
	Revision              int64
	WalData               *WalData
	ChangeGroupsRevisions changeGroupsRevisions

	Err error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL