Documentation ¶
Index ¶
- func NewHandler(service *Service) *rpc.Router
- type CodeModePair
- type CodeModePairs
- type Config
- type Handler
- func (h *Handler) Admin() interface{}
- func (h *Handler) Alloc(ctx context.Context, size uint64, blobSize uint32, ...) (*access.Location, error)
- func (h *Handler) Delete(ctx context.Context, location *access.Location) error
- func (h *Handler) Get(ctx context.Context, w io.Writer, location access.Location, ...) (func() error, error)
- func (h *Handler) Put(ctx context.Context, rc io.Reader, size int64, hasherMap access.HasherMap) (*access.Location, error)
- func (h *Handler) PutAt(ctx context.Context, rc io.Reader, clusterID proto.ClusterID, vid proto.Vid, ...) error
- type LimitConfig
- type Limiter
- type Reader
- type Service
- func (s *Service) Alloc(c *rpc.Context)
- func (s *Service) Close()
- func (s *Service) Delete(c *rpc.Context)
- func (s *Service) DeleteBlob(c *rpc.Context)
- func (s *Service) Get(c *rpc.Context)
- func (s *Service) Limit(c *rpc.Context)
- func (s *Service) Put(c *rpc.Context)
- func (s *Service) PutAt(c *rpc.Context)
- func (s *Service) RegisterAdminHandler()
- func (s *Service) RegisterService()
- func (s *Service) Sign(c *rpc.Context)
- type Status
- type StreamConfig
- type StreamHandler
- type Writer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewHandler ¶
NewHandler returns app server handler
Types ¶
type CodeModePair ¶
CodeModePair codemode with pair of tactic and policy
type CodeModePairs ¶
type CodeModePairs map[codemode.CodeMode]CodeModePair
CodeModePairs map of CodeModePair
func (CodeModePairs) SelectCodeMode ¶
func (c CodeModePairs) SelectCodeMode(size int64) codemode.CodeMode
SelectCodeMode select codemode by size
type Config ¶
type Config struct { cmd.Config ServiceRegister consul.Config `json:"service_register"` Stream StreamConfig `json:"stream"` Limit LimitConfig `json:"limit"` }
Config service configs
type Handler ¶
type Handler struct { StreamConfig // contains filtered or unexported fields }
Handler stream handler
func (*Handler) Admin ¶
func (h *Handler) Admin() interface{}
Admin returns internal admin interface.
func (*Handler) Alloc ¶
func (h *Handler) Alloc(ctx context.Context, size uint64, blobSize uint32, assignClusterID proto.ClusterID, codeMode codemode.CodeMode) (*access.Location, error)
Alloc access interface /alloc
required: size, file size optional: blobSize > 0, alloc with blobSize assignClusterID > 0, assign to alloc in this cluster certainly codeMode > 0, alloc in this codemode return: a location of file
func (*Handler) Get ¶
func (h *Handler) Get(ctx context.Context, w io.Writer, location access.Location, readSize, offset uint64) (func() error, error)
Get read file
required: location, readSize optional: offset(default is 0) first return value is data transfer to copy data after argument checking Read data shards firstly, if blob size is small or read few bytes then ec reconstruct-read, try to reconstruct from N+X to N+M Just read essential bytes in each shard when reconstruct-read. sorted N+X is, such as we use mode EC6P10L2, X=2 and Read from idc=2 shards like this data N 6 | parity M 10 | local L 2 d1 d2 d3 d4 d5 d6 p1 .. p5 p6 .. p10 l1 l2 idc 1 1 1 2 2 2 1 2 1 2
sorted d4 d5 d6 p6 .. p10 d1 d2 d3 p1 .. p5 read-1 [d4 p10] read-2 [d4 p10 d1] read-3 [d4 p10 d1 d2] ... read-9 [d4 p5] failed
func (*Handler) Put ¶
func (h *Handler) Put(ctx context.Context, rc io.Reader, size int64, hasherMap access.HasherMap) (*access.Location, error)
Put put one object
required: size, file size optional: hasher map to calculate hash.Hash
func (*Handler) PutAt ¶
func (h *Handler) PutAt(ctx context.Context, rc io.Reader, clusterID proto.ClusterID, vid proto.Vid, bid proto.BlobID, size int64, hasherMap access.HasherMap) error
PutAt access interface /putat, put one blob
required: rc file reader required: clusterID VolumeID BlobID required: size, one blob size optional: hasherMap, computing hash
type LimitConfig ¶
type LimitConfig struct { NameRps map[string]int `json:"name_rps"` // request with name n/s ReaderMBps int `json:"reader_mbps"` // read with MB/s WriterMBps int `json:"writer_mbps"` // write with MB/s }
LimitConfig configuration of limiter
type Limiter ¶
type Limiter interface { // Acquire acquire with one request per second Acquire(name string) error // Release release of one request per second Release(name string) // Reader return io.Reader with bandwidth rate limit Reader(ctx context.Context, r io.Reader) io.Reader // Writer return io.Writer with bandwidth rate limit Writer(ctx context.Context, w io.Writer) io.Writer // Status returns running status // TODO: calculate rate limit wait concurrent Status() Status }
Limiter rps and bps limiter
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader limited reader
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service rpc service
func (*Service) DeleteBlob ¶
DeleteBlob delete one blob
func (*Service) RegisterAdminHandler ¶
func (s *Service) RegisterAdminHandler()
RegisterAdminHandler register admin handler to profile
func (*Service) RegisterService ¶
func (s *Service) RegisterService()
RegisterService register service to rpc
type Status ¶
type Status struct { Config LimitConfig `json:"config"` // configuration status Running map[string]int `json:"running"` // running request ReadWait int `json:"read_wait"` // wait reading duration WriteWait int `json:"write_wait"` // wait writing duration }
Status running status
type StreamConfig ¶
type StreamConfig struct { IDC string `json:"idc"` MaxBlobSize uint32 `json:"max_blob_size"` DiskPunishIntervalS int `json:"disk_punish_interval_s"` DiskTimeoutPunishIntervalS int `json:"disk_timeout_punish_interval_s"` ServicePunishIntervalS int `json:"service_punish_interval_s"` AllocRetryTimes int `json:"alloc_retry_times"` AllocRetryIntervalMS int `json:"alloc_retry_interval_ms"` EncoderEnableVerify bool `json:"encoder_enableverify"` EncoderConcurrency int `json:"encoder_concurrency"` MinReadShardsX int `json:"min_read_shards_x"` ShardCrcDisabled bool `json:"shard_crc_disabled"` MemPoolSizeClasses map[int]int `json:"mem_pool_size_classes"` // CodeModesPutQuorums // just for one AZ is down, cant write quorum in all AZs CodeModesPutQuorums map[codemode.CodeMode]int `json:"code_mode_put_quorums"` ClusterConfig controller.ClusterConfig `json:"cluster_config"` BlobnodeConfig blobnode.Config `json:"blobnode_config"` ProxyConfig proxy.Config `json:"proxy_config"` // hystrix command config AllocCommandConfig hystrix.CommandConfig `json:"alloc_command_config"` RWCommandConfig hystrix.CommandConfig `json:"rw_command_config"` }
StreamConfig access stream handler config
type StreamHandler ¶
type StreamHandler interface { // Alloc access interface /alloc // required: size, file size // optional: blobSize > 0, alloc with blobSize // assignClusterID > 0, assign to alloc in this cluster certainly // codeMode > 0, alloc in this codemode // return: a location of file Alloc(ctx context.Context, size uint64, blobSize uint32, assignClusterID proto.ClusterID, codeMode codemode.CodeMode) (*access.Location, error) // PutAt access interface /putat, put one blob // required: rc file reader // required: clusterID VolumeID BlobID // required: size, one blob size // optional: hasherMap, computing hash PutAt(ctx context.Context, rc io.Reader, clusterID proto.ClusterID, vid proto.Vid, bid proto.BlobID, size int64, hasherMap access.HasherMap) error // Put put one object // required: size, file size // optional: hasher map to calculate hash.Hash Put(ctx context.Context, rc io.Reader, size int64, hasherMap access.HasherMap) (*access.Location, error) // Get read file // required: location, readSize // optional: offset(default is 0) // // first return value is data transfer to copy data after argument checking // // Read data shards firstly, if blob size is small or read few bytes // then ec reconstruct-read, try to reconstruct from N+X to N+M // // sorted N+X is, such as we use mode EC6P10L2, X=2 and Read from idc=2 // shards like this // data N 6 | parity M 10 | local L 2 // d1 d2 d3 d4 d5 d6 p1 .. p5 p6 .. p10 l1 l2 // idc 1 1 1 2 2 2 1 2 1 2 // //sorted d4 d5 d6 p6 .. p10 d1 d2 d3 p1 .. p5 //read-1 [d4 p10] //read-2 [d4 p10 d1] //read-3 [d4 p10 d1 d2] //... //read-9 [d4 p5] //failed Get(ctx context.Context, w io.Writer, location access.Location, readSize, offset uint64) (func() error, error) // Delete delete all blobs in this location Delete(ctx context.Context, location *access.Location) error // Admin returns internal admin interface. Admin() interface{} }
StreamHandler stream http handler
func NewStreamHandler ¶
func NewStreamHandler(cfg *StreamConfig, stopCh <-chan struct{}) StreamHandler
NewStreamHandler returns a stream handler