objectserver

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2018 License: Apache-2.0 Imports: 49 Imported by: 0

Documentation

Overview

Package objectserver provides a Object Server implementation.

Hummingbird Replication

The hummingbird object server is backwards-compatible with the python-swift replicator, but hummingbird-to-hummingbird replication uses its own simple protocol that it sends over a hijacked HTTP connection.

Messages are 32-bit length prefixed JSON serialized structures. The basic flow looks like:

replicator sends a BeginReplicationRequest{Device string, Partition string, NeedHashes bool}
server responds with a BeginReplicationResponse{Hashes map[string]string}
for each object file in the partition where suffix hash doesn't match {
	replicator sends a SyncFileRequest{Path string, Xattrs string, Size int}
	server responds with a SyncFileResponse{Exists bool, NewerExists bool, GoAhead bool}
	if response.GoAhead is true {
		replicator sends raw file body
		server responds with a FileUploadResponse{Success bool}
	}
}

The replicator limits concurrency per-device and overall. When the server gets a BeginReplicationRequest, it'll wait up to 60 seconds for a slot to open up before rejecting it.

Unlike python-swift, the replicator will only read each filesystem once per pass.

Index

Constants

This section is empty.

Variables

View Source
var AuditForeverInterval = 30 * time.Second

AuditForeverInterval represents how often a auditor check should be performed.

View Source
var DriveFullError = errors.New("Drive Full")

DriveFullError can be returned by Object.SetData and Object.Delete if the disk is too full for the operation.

View Source
var LockPathError = errors.New("Error locking path")
View Source
var PathNotDirError = errors.New("Path is not a directory")
View Source
var RepUnmountedError = fmt.Errorf("Device unmounted")

Functions

func AsyncDir added in v0.0.2

func AsyncDir(policy int) string

func GetHashes

func GetHashes(driveRoot string, device string, partition string, recalculate []string, reclaimAge int64, policy int, logger srv.LowLevelLogger) (map[string]string, error)

func HashCleanupListDir

func HashCleanupListDir(hashDir string, reclaimAge int64) ([]string, error)

func InvalidateHash

func InvalidateHash(hashDir string) error

InvalidateHash invalidates the hashdir's suffix hash, indicating it needs to be recalculated.

func MetadataHash added in v1.2.0

func MetadataHash(metadata map[string]string) string

MetadataHash returns a hash of the contents of the metadata.

func MetadataMerge added in v1.2.0

func MetadataMerge(a map[string]string, b map[string]string) map[string]string

MetadataMerge will return the result of merging the a and b metadata sets; neither a nor b should be used after calling this method.

func MoveParts

func MoveParts(args []string, cnf srv.ConfigLoader)

MoveParts takes two object .ring.gz files as []string{oldRing, newRing} and dispatches priority replication jobs to rebalance data in line with any ring changes.

func NewReplicator

func NewReplicator(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (ipPort *srv.IpPort, server srv.Server, logger srv.LowLevelLogger, err error)

func NewServer added in v0.0.2

func NewServer(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (*srv.IpPort, srv.Server, srv.LowLevelLogger, error)

func ObjHash added in v0.0.2

func ObjHash(vars map[string]string, hashPathPrefix string, hashPathSuffix string) string

func ObjHashDir

func ObjHashDir(vars map[string]string, driveRoot string, hashPathPrefix string, hashPathSuffix string, policy int) string

func ObjectFiles

func ObjectFiles(directory string) (string, string)

func ObjectMetadata

func ObjectMetadata(dataFile string, metaFile string) (map[string]string, error)

func OneTimeChan

func OneTimeChan() chan time.Time

OneTimeChan returns a channel that will yield the current time once, then is closed.

func OpenObjectMetadata

func OpenObjectMetadata(fd uintptr, metaFile string) (map[string]string, error)

func PolicyDir

func PolicyDir(policy int) string

func QuarantineHash

func QuarantineHash(hashDir string) error

func RecalculateSuffixHash

func RecalculateSuffixHash(suffixDir string, reclaimAge int64) (string, error)

func RegisterObjectEngine

func RegisterObjectEngine(name string, newEngine ObjectEngineConstructor)

RegisterObjectEngine lets you tell hummingbird about a new object engine.

func RestoreDevice

func RestoreDevice(args []string, cnf srv.ConfigLoader)

RestoreDevice takes an IP address and device name such as []string{"172.24.0.1", "sda1"} and attempts to restores its data from peers.

func SendPriRepJob added in v0.0.2

func SendPriRepJob(job *PriorityRepJob, client *http.Client) (string, bool)

func TempDirPath

func TempDirPath(driveRoot string, device string) string

func UnPolicyDir

func UnPolicyDir(dir string) (int, error)

Types

type Auditor

type Auditor struct {
	*AuditorDaemon
	// contains filtered or unexported fields
}

Auditor keeps track of general audit data.

type AuditorDaemon

type AuditorDaemon struct {
	// contains filtered or unexported fields
}

AuditorDaemon keeps track of object specific audit data.

func NewAuditorDaemon added in v0.0.2

func NewAuditorDaemon(serverconf conf.Config, flags *flag.FlagSet, cnf srv.ConfigLoader) (*AuditorDaemon, error)

NewAuditor returns a new AuditorDaemon with the given conf.

func (*AuditorDaemon) Run

func (d *AuditorDaemon) Run()

Run a single audit pass.

func (*AuditorDaemon) RunForever

func (d *AuditorDaemon) RunForever()

RunForever triggering audit passes every time AuditForeverInterval has passed.

type BeginReplicationRequest

type BeginReplicationRequest struct {
	Device     string
	Partition  string
	NeedHashes bool
}

type BeginReplicationResponse

type BeginReplicationResponse struct {
	Hashes map[string]string
}

type DeviceStats added in v0.0.2

type DeviceStats struct {
	Stats              map[string]int64
	LastCheckin        time.Time
	PassStarted        time.Time
	DeviceStarted      time.Time
	LastPassFinishDate time.Time
	LastPassDuration   time.Duration
	CancelCount        int64
	FilesSent          int64
	BytesSent          int64
	PartitionsDone     int64
	PartitionsTotal    int64
	TotalPasses        int64
	PriorityRepsDone   int64
}

type ECAuditFuncs added in v1.2.0

type ECAuditFuncs interface {
	AuditShard(path string, hash string, skipMd5 bool) (int64, error)
	AuditNurseryObject(path string, metabytes []byte, skipMd5 bool) (int64, error)
}

type FileUploadResponse

type FileUploadResponse struct {
	Success bool
	Msg     string
}

type IndexDB added in v1.2.0

type IndexDB struct {
	RingPartPower uint // GLH: Temp exported for fakelist
	// contains filtered or unexported fields
}

IndexDB will track a set of objects.

This is the "index.db" per disk. Right now it just handles whole objects, but eventually we'd like to add either slab support or direct database embedding for small objects. But, those details should be transparent from users of a IndexDB.

This is different from the standard Swift full replica object tracking in that the directory structure is much shallower, there are a configurable number of databases per drive instead of a ton of hashes.pkl files, and the version tracking / consolidation is much simpler.

The IndexDB stores the newest object contents it knows about and discards any older ones, like the standard Swift's .data files. It does not have .meta files at all, and certainly not stacked to infinity .meta files. Instead the metadata is stored in the database as JSON.

A given IndexDB may not even store any metadata, such as in an EC system, with just "key" IndexDBs storing the metadata.

func NewIndexDB added in v1.2.0

func NewIndexDB(dbpath, filepath, temppath string, ringPartPower, dbPartPower, subdirs int, reserve int64, logger *zap.Logger) (*IndexDB, error)

NewIndexDB creates a IndexDB to manage a set of objects.

The ringPartPower is defined by the ring in use, but should be greater than the dbPartPower. The dbPartPower will define how many databases are created (e.g. dbPartPower = 6 gives 64 databases). The subdirs value will define how many subdirectories are created where object content files are placed.

func (*IndexDB) Close added in v1.2.0

func (ot *IndexDB) Close()

Close closes all the underlying databases for the IndexDB; you should discard the IndexDB instance after this call.

func (*IndexDB) Commit added in v1.2.0

func (ot *IndexDB) Commit(f fs.AtomicFileWriter, hsh string, shard int, timestamp int64, method string, metahash string, metadata []byte, nursery bool, shardhash string) error

Commit moves the temporary file (from TempFile) into place and records its information in the database. It may actually discard it completely if there is already a newer object information in place for the hash:shard.

Shard is mostly for EC type policies; just use 0 if you're using a full replica policy.

Timestamp is the timestamp for the object contents, not necessarily the metadata.

func (*IndexDB) List added in v1.2.0

func (ot *IndexDB) List(startHash, stopHash, marker string, limit int) ([]*IndexDBItem, error)

List returns the items for the ringPart given.

This is for replication, auditing, that sort of thing.

func (*IndexDB) ListObjectsToStabilize added in v1.2.0

func (ot *IndexDB) ListObjectsToStabilize() ([]*IndexDBItem, error)

ListObjectsToStabilize lists all objects that are in the nursery or set to restabilzed

func (*IndexDB) Lookup added in v1.2.0

func (ot *IndexDB) Lookup(hsh string, shard int, justStable bool) (*IndexDBItem, error)

Lookup returns the stored information for the hsh and shard. Will return (nil, error) if there is an error. (nil, nil) if not found

func (*IndexDB) Remove added in v1.2.0

func (ot *IndexDB) Remove(hsh string, shard int, timestamp int64, nursery bool) error

Remove removes an entry from the database and its backing disk file.

func (*IndexDB) RingPartRange added in v1.2.0

func (ot *IndexDB) RingPartRange(ringPart int) (string, string)

func (*IndexDB) SetRestablized added in v1.2.0

func (ot *IndexDB) SetRestablized(hsh string, shard int, timestamp int64) error

func (*IndexDB) TempFile added in v1.2.0

func (ot *IndexDB) TempFile(hsh string, shard int, timestamp int64, sizeHint int64, nursery bool) (fs.AtomicFileWriter, error)

TempFile returns a temporary file to write to for eventually adding the hash:shard to the IndexDB with Commit; may return (nil, nil) if there is already a newer or equal timestamp in place for the hash:shard.

func (*IndexDB) WholeObjectPath added in v1.2.0

func (ot *IndexDB) WholeObjectPath(hsh string, shard int, timestamp int64, nursery bool) (string, error)

type IndexDBItem added in v1.2.0

type IndexDBItem struct {
	Hash        string
	Shard       int
	Timestamp   int64
	Metahash    string `json:"-"`
	Nursery     bool
	Metabytes   []byte `json:"-"`
	Deletion    bool
	Path        string
	ShardHash   string
	Restabilize bool
}

IndexDBItem is a single item returned by List.

type NoMoreNodes

type NoMoreNodes struct{}

func (*NoMoreNodes) Next

func (n *NoMoreNodes) Next() *ring.Device

type NurseryObjectEngine added in v0.0.2

type NurseryObjectEngine interface {
	ObjectEngine
	GetObjectsToStabilize(device string, c chan ObjectStabilizer, cancel chan struct{})
	GetObjectsToReplicate(prirep PriorityRepJob, c chan ObjectStabilizer, cancel chan struct{})
}

type Object

type Object interface {
	// Exists determines whether or not there is an object to serve. Deleted objects do not exist, even if there is a tombstone.
	Exists() bool
	// Quarantine removes the file's data, presumably after determining it's been corrupted.
	Quarantine() error
	// Metadata returns the object's metadata.  Will be nil if the object doesn't exist.
	Metadata() map[string]string
	// ContentLength returns the object's content-length.
	ContentLength() int64
	// CopyRange copies a range of data from the object to the writer.
	CopyRange(io.Writer, int64, int64) (int64, error)
	// Copy copies an object's entire contents to the writer(s).
	Copy(...io.Writer) (int64, error)
	// SetData sets the data for the object, given the size (if known).  It returns a writer and an error if any.
	SetData(size int64) (io.Writer, error)
	// Commit saves a new object data that was started with SetData.
	Commit(metadata map[string]string) error
	// CommitMetadata updates the object's metadata.
	CommitMetadata(metadata map[string]string) error
	// Delete deletes the object.
	Delete(metadata map[string]string) error
	// Close releases any resources held by the Object instance.
	Close() error
	// Repr returns a representation of the object, used for logging.
	Repr() string
}

type ObjectEngine

type ObjectEngine interface {
	// New creates a new instance of the Object, for interacting with a single object.
	New(vars map[string]string, needData bool, asyncWG *sync.WaitGroup) (Object, error)
	GetReplicationDevice(oring ring.Ring, dev *ring.Device, policy int, r *Replicator) (ReplicationDevice, error)
}

ObjectEngine is the type you have to give hummingbird to create a new object engine.

func SwiftEngineConstructor

func SwiftEngineConstructor(config conf.Config, policy *conf.Policy, flags *flag.FlagSet) (ObjectEngine, error)

SwiftEngineConstructor creates a SwiftEngine given the object server configs.

type ObjectEngineConstructor

type ObjectEngineConstructor func(conf.Config, *conf.Policy, *flag.FlagSet) (ObjectEngine, error)

ObjectEngineConstructor> is a function that, given configs and flags, returns an ObjectEngine

func FindEngine

func FindEngine(name string) (ObjectEngineConstructor, error)

FindEngine returns the registered object engine with the given name.

type ObjectServer

type ObjectServer struct {
	// contains filtered or unexported fields
}

func (*ObjectServer) AcquireDevice

func (server *ObjectServer) AcquireDevice(next http.Handler) http.Handler

func (*ObjectServer) Background added in v0.0.2

func (server *ObjectServer) Background(flags *flag.FlagSet) chan struct{}

func (*ObjectServer) DiskUsageHandler

func (server *ObjectServer) DiskUsageHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) Finalize

func (server *ObjectServer) Finalize()

func (*ObjectServer) GetHandler

func (server *ObjectServer) GetHandler(config conf.Config, metricsPrefix string) http.Handler

func (*ObjectServer) HealthcheckHandler

func (server *ObjectServer) HealthcheckHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) LogRequest

func (server *ObjectServer) LogRequest(next http.Handler) http.Handler

func (*ObjectServer) ObjDeleteHandler

func (server *ObjectServer) ObjDeleteHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) ObjGetHandler

func (server *ObjectServer) ObjGetHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) ObjPostHandler

func (server *ObjectServer) ObjPostHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) ObjPutHandler

func (server *ObjectServer) ObjPutHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) OptionsHandler

func (server *ObjectServer) OptionsHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) ReconHandler

func (server *ObjectServer) ReconHandler(writer http.ResponseWriter, request *http.Request)

func (*ObjectServer) Type added in v0.0.2

func (server *ObjectServer) Type() string

type ObjectStabilizer added in v0.0.2

type ObjectStabilizer interface {
	Object
	// Stabilize object- move to stable location / erasure code / do nothing / etc
	Stabilize(ring.Ring, *ring.Device, int) error
	Replicate(PriorityRepJob) error
}

type PolicyHandlerRegistrator added in v0.0.2

type PolicyHandlerRegistrator interface {
	RegisterHandlers(addRoute func(method, path string, handler http.HandlerFunc))
}

type PriorityRepJob

type PriorityRepJob struct {
	Partition  uint64       `json:"partition"`
	FromDevice *ring.Device `json:"from_device"`
	ToDevice   *ring.Device `json:"to_device"`
	Policy     int          `json:"policy"`
}

type PriorityReplicationResult added in v1.2.0

type PriorityReplicationResult struct {
	ObjectsReplicated int64
	ObjectsErrored    int64
	Success           bool
	ErrorMsg          string
}

type RealECAuditFuncs added in v1.2.0

type RealECAuditFuncs struct{}

func (RealECAuditFuncs) AuditNurseryObject added in v1.2.0

func (RealECAuditFuncs) AuditNurseryObject(path string, metabytes []byte, skipMd5 bool) (int64, error)

AuditNurseryObject of indexdb shard, does nothing

func (RealECAuditFuncs) AuditShard added in v1.2.0

func (RealECAuditFuncs) AuditShard(path string, hash string, skipMd5 bool) (int64, error)

AuditShardHash of indexdb shard

type RepConn

type RepConn interface {
	SendMessage(v interface{}) error
	RecvMessage(v interface{}) error
	Write(data []byte) (l int, err error)
	Flush() error
	Read(data []byte) (l int, err error)
	Disconnected() bool
	Close()
}

func NewIncomingRepConn

func NewIncomingRepConn(rw *bufio.ReadWriter, c net.Conn, rcTimeout time.Duration) RepConn

func NewRepConn

func NewRepConn(dev *ring.Device, partition string, policy int, headers map[string]string, certFile, keyFile string, rcTimeout time.Duration) (RepConn, error)

type ReplicationDevice

type ReplicationDevice interface {
	Replicate()
	ReplicateLoop()
	Key() string
	Cancel()
	PriorityReplicate(w http.ResponseWriter, pri PriorityRepJob)
	UpdateStat(string, int64)
}

func GetNurseryDevice added in v1.2.0

func GetNurseryDevice(oring ring.Ring, dev *ring.Device, policy int, r *Replicator, f NurseryObjectEngine) (ReplicationDevice, error)

type Replicator

type Replicator struct {
	CertFile string
	KeyFile  string
	// contains filtered or unexported fields
}

Object replicator daemon object

func (*Replicator) Background added in v0.0.2

func (server *Replicator) Background(flags *flag.FlagSet) chan struct{}

func (*Replicator) Finalize added in v0.0.2

func (server *Replicator) Finalize()

func (*Replicator) GetHandler

func (r *Replicator) GetHandler(config conf.Config, metricsPrefix string) http.Handler

func (*Replicator) HealthcheckHandler added in v0.0.2

func (server *Replicator) HealthcheckHandler(writer http.ResponseWriter, request *http.Request)

func (*Replicator) LogRequest

func (r *Replicator) LogRequest(next http.Handler) http.Handler

func (*Replicator) ProgressReportHandler

func (r *Replicator) ProgressReportHandler(w http.ResponseWriter, req *http.Request)

ProgressReportHandler handles HTTP requests for current replication progress

func (*Replicator) Run

func (r *Replicator) Run()

Run a single replication pass. (NOTE: we will prob get rid of this because of priorityRepl)

func (*Replicator) RunForever

func (r *Replicator) RunForever()

Run replication passes in a loop until forever.

func (*Replicator) Type added in v0.0.2

func (server *Replicator) Type() string

type SwiftEngine

type SwiftEngine struct {
	// contains filtered or unexported fields
}

func (*SwiftEngine) GetReplicationDevice added in v1.2.0

func (f *SwiftEngine) GetReplicationDevice(oring ring.Ring, dev *ring.Device, policy int, r *Replicator) (ReplicationDevice, error)

func (*SwiftEngine) New

func (f *SwiftEngine) New(vars map[string]string, needData bool, asyncWG *sync.WaitGroup) (Object, error)

New returns an instance of SwiftObject with the given parameters. Metadata is read in and if needData is true, the file is opened. AsyncWG is a waitgroup if the object spawns any async operations

type SwiftObject

type SwiftObject struct {
	// contains filtered or unexported fields
}

SwiftObject implements an Object that is compatible with Swift's object server.

func (*SwiftObject) Close

func (o *SwiftObject) Close() error

Close releases any resources used by the instance of SwiftObject

func (*SwiftObject) Commit

func (o *SwiftObject) Commit(metadata map[string]string) error

Commit commits an open data file to disk, given the metadata.

func (*SwiftObject) CommitMetadata added in v0.0.2

func (o *SwiftObject) CommitMetadata(metadata map[string]string) error

func (*SwiftObject) ContentLength

func (o *SwiftObject) ContentLength() int64

ContentLength parses and returns the Content-Length for the object.

func (*SwiftObject) Copy

func (o *SwiftObject) Copy(dsts ...io.Writer) (written int64, err error)

Copy copies all data from the underlying .data file to the given writers.

func (*SwiftObject) CopyRange

func (o *SwiftObject) CopyRange(w io.Writer, start int64, end int64) (int64, error)

CopyRange copies data in the range of start to end from the underlying .data file to the writer.

func (*SwiftObject) Delete

func (o *SwiftObject) Delete(metadata map[string]string) error

Delete deletes the object.

func (*SwiftObject) Exists

func (o *SwiftObject) Exists() bool

Exists returns true if the object exists, that is if it has a .data file.

func (*SwiftObject) Metadata

func (o *SwiftObject) Metadata() map[string]string

Metadata returns the object's metadata.

func (*SwiftObject) Quarantine

func (o *SwiftObject) Quarantine() error

Quarantine removes the object's underlying files to the Quarantined directory on the device.

func (*SwiftObject) Repr

func (o *SwiftObject) Repr() string

Repr returns a string that identifies the object in some useful way, used for logging.

func (*SwiftObject) SetData

func (o *SwiftObject) SetData(size int64) (io.Writer, error)

SetData is called to set the object's data. It takes a size (if available, otherwise set to zero).

type SyncFileRequest

type SyncFileRequest struct {
	Path   string
	Xattrs string
	Size   int64
	Check  bool
	Ping   bool
	Done   bool
}

type SyncFileResponse

type SyncFileResponse struct {
	Exists      bool
	NewerExists bool
	GoAhead     bool
	Msg         string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL