Documentation ¶
Index ¶
- Constants
- Variables
- type Action
- type ActionGroup
- type ActionType
- type ChangeGroupsUpdateToken
- type DataEntry
- type DataFileIndex
- type DataManager
- func (d *DataManager) DataFileIndexPath(dataType, id string) string
- func (d *DataManager) DataFilePath(dataType, id string) string
- func (d *DataManager) Export(ctx context.Context, w io.Writer) error
- func (d *DataManager) FirstAvailableWalData(ctx context.Context) (*WalData, int64, error)
- func (d *DataManager) GetChangeGroupsUpdateToken(cgNames []string) (*ChangeGroupsUpdateToken, error)
- func (d *DataManager) GetLastDataStatus() (*DataStatus, error)
- func (d *DataManager) GetLastDataStatusPath() (string, error)
- func (d *DataManager) HasOSTWal(walseq string) (bool, error)
- func (d *DataManager) Import(ctx context.Context, r io.Reader) error
- func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) error
- func (d *DataManager) LastCommittedStorageWal(ctx context.Context) (string, int64, error)
- func (d *DataManager) ListEtcdChangeGroups(ctx context.Context, revision int64) (changeGroupsRevisions, error)
- func (d *DataManager) ListEtcdWals(ctx context.Context, revision int64) <-chan *ListEtcdWalsElement
- func (d *DataManager) ListOSTWals(start string) <-chan *WalFile
- func (d *DataManager) Read(dataType, id string) (io.Reader, error)
- func (d *DataManager) ReadObject(dataType, id string, cgNames []string) (io.ReadCloser, *ChangeGroupsUpdateToken, error)
- func (d *DataManager) ReadWal(walseq string) (io.ReadCloser, error)
- func (d *DataManager) ReadWalData(walFileID string) (io.ReadCloser, error)
- func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error
- func (d *DataManager) SetMaintenanceMode(maintenanceMode bool)
- func (d *DataManager) Watch(ctx context.Context, revision int64) <-chan *WatchElement
- func (d *DataManager) WriteWal(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken) (*ChangeGroupsUpdateToken, error)
- func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken, ...) (*ChangeGroupsUpdateToken, error)
- type DataManagerConfig
- type DataStatus
- type DataStatusFile
- type ListEtcdWalsElement
- type WalChanges
- type WalData
- type WalFile
- type WalHeader
- type WalStatus
- type WalsData
- type WatchElement
Constants ¶
const ( DefaultCheckpointInterval = 10 * time.Second DefaultEtcdWalsKeepNum = 100 DefaultMinCheckpointWalsNum = 100 )
const (
DefaultMaxDataFileSize = 10 * 1024 * 1024
)
Variables ¶
var ( ErrCompacted = errors.New("required revision has been compacted") ErrConcurrency = errors.New("wal concurrency error: change groups already updated") )
Functions ¶
This section is empty.
Types ¶
type ActionGroup ¶
type ActionGroup struct { DataStatusFile *DataStatusFile StartActionIndex int ActionsSize int PreviousDataStatusFiles []*DataStatusFile }
type ActionType ¶
type ActionType string
const ( ActionTypePut ActionType = "put" ActionTypeDelete ActionType = "delete" )
type ChangeGroupsUpdateToken ¶
type ChangeGroupsUpdateToken struct { CurRevision int64 `json:"cur_revision"` ChangeGroupsRevisions changeGroupsRevisions `json:"change_groups_revisions"` }
type DataFileIndex ¶
type DataManager ¶
type DataManager struct {
// contains filtered or unexported fields
}
func NewDataManager ¶
func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerConfig) (*DataManager, error)
func (*DataManager) DataFileIndexPath ¶
func (d *DataManager) DataFileIndexPath(dataType, id string) string
func (*DataManager) DataFilePath ¶
func (d *DataManager) DataFilePath(dataType, id string) string
func (*DataManager) FirstAvailableWalData ¶
FirstAvailableWalData returns the first available wal (the one with the smallest sequence), or nil if none is available, together with the etcd revision at the time of the operation.
func (*DataManager) GetChangeGroupsUpdateToken ¶
func (d *DataManager) GetChangeGroupsUpdateToken(cgNames []string) (*ChangeGroupsUpdateToken, error)
func (*DataManager) GetLastDataStatus ¶
func (d *DataManager) GetLastDataStatus() (*DataStatus, error)
func (*DataManager) GetLastDataStatusPath ¶
func (d *DataManager) GetLastDataStatusPath() (string, error)
func (*DataManager) InitEtcd ¶
func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) error
func (*DataManager) LastCommittedStorageWal ¶
func (*DataManager) ListEtcdChangeGroups ¶
func (d *DataManager) ListEtcdChangeGroups(ctx context.Context, revision int64) (changeGroupsRevisions, error)
func (*DataManager) ListEtcdWals ¶
func (d *DataManager) ListEtcdWals(ctx context.Context, revision int64) <-chan *ListEtcdWalsElement
func (*DataManager) ListOSTWals ¶
func (d *DataManager) ListOSTWals(start string) <-chan *WalFile
func (*DataManager) ReadObject ¶
func (d *DataManager) ReadObject(dataType, id string, cgNames []string) (io.ReadCloser, *ChangeGroupsUpdateToken, error)
func (*DataManager) ReadWal ¶
func (d *DataManager) ReadWal(walseq string) (io.ReadCloser, error)
func (*DataManager) ReadWalData ¶
func (d *DataManager) ReadWalData(walFileID string) (io.ReadCloser, error)
func (*DataManager) Run ¶
func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error
func (*DataManager) SetMaintenanceMode ¶ added in v0.2.0
func (d *DataManager) SetMaintenanceMode(maintenanceMode bool)
SetMaintenanceMode sets the datamanager in maintenance mode. This method must be called before invoking the Run method.
func (*DataManager) Watch ¶
func (d *DataManager) Watch(ctx context.Context, revision int64) <-chan *WatchElement
func (*DataManager) WriteWal ¶
func (d *DataManager) WriteWal(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken) (*ChangeGroupsUpdateToken, error)
WriteWal writes the provided actions in a wal file. The wal will be marked as "committed" on etcd if the provided change groups haven't been updated in the meantime; otherwise an optimistic concurrency error will be returned and the wal won't be committed.
TODO(sgotti) also save the previous committed wal inside the wal file, to handle possible eventual consistency gaps in objectstorage list operations (the list won't report a wal at seq X but will report a wal at X+n, if this kind of eventual consistency ever exists).
func (*DataManager) WriteWalAdditionalOps ¶
func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken, cmp []etcdclientv3.Cmp, then []etcdclientv3.Op) (*ChangeGroupsUpdateToken, error)
type DataManagerConfig ¶
type DataManagerConfig struct {
	BasePath           string
	E                  *etcd.Store
	OST                *objectstorage.ObjStorage
	DataTypes          []string
	EtcdWalsKeepNum    int
	CheckpointInterval time.Duration
	// MinCheckpointWalsNum is the minimum number of wals required before doing a checkpoint
	MinCheckpointWalsNum int
	MaxDataFileSize      int64
	MaintenanceMode      bool
}
type DataStatus ¶
type DataStatus struct {
	DataSequence string `json:"data_sequence,omitempty"`
	WalSequence  string `json:"wal_sequence,omitempty"`
	// an entry-id-ordered list of files for a specific data type (map key)
	Files map[string][]*DataStatusFile `json:"files,omitempty"`
}
type DataStatusFile ¶
type ListEtcdWalsElement ¶
type WalChanges ¶
func NewWalChanges ¶
func NewWalChanges(dataTypes []string) *WalChanges
func (*WalChanges) String ¶
func (c *WalChanges) String() string
type WalStatus ¶
type WalStatus string
const (
	// WalStatusCommitted represents a wal written to the objectstorage
	WalStatusCommitted WalStatus = "committed"
	// WalStatusCommittedStorage represents the .committed marker file written to the objectstorage
	WalStatusCommittedStorage WalStatus = "committed_storage"
	// WalStatusCheckpointed means that all the wal actions have been executed on the objectstorage
	WalStatusCheckpointed WalStatus = "checkpointed"
)