access

package
v3.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2025 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
// Sizing limits for the access client.
const (
	// HashSize dummy hash size
	HashSize = 0

	// MaxLocationBlobs max blobs length in Location
	MaxLocationBlobs int = 4
	// MaxDeleteLocations max locations of delete request
	MaxDeleteLocations int = 1024
	// MaxBlobSize max blob size for allocation
	MaxBlobSize uint32 = 1 << 25 // 32MB
)
View Source
// GetShardMode values: read from a random shard replica or from the leader.
const (
	GetShardModeRandom = GetShardMode(iota)
	GetShardModeLeader
)

Variables

View Source
// Sentinel errors returned while unmarshaling stream-blob protobuf data.
var (
	ErrInvalidLengthStreamBlob        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowStreamBlob          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupStreamBlob = fmt.Errorf("proto: unexpected end of group")
)
View Source
// ClientWithReqidContext exposes the internal request-id context helper.
var ClientWithReqidContext = withReqidContext

Functions

func WithRequestID

func WithRequestID(ctx context.Context, rid interface{}) context.Context

WithRequestID traces the request id through the full life of the request. The second parameter rid may be one of the types below:

a string,
an interface { String() string },
an interface { TraceID() string },
an interface { RequestID() string },

Types

type API

// API is the access client interface for s3-style blob operations.
type API interface {
	// Put object once if size is not greater than MaxSizePutOnce, otherwise put blobs one by one.
	// return a location and map of hash summary bytes you expected.
	//
	// If PutArgs' body is of type *bytes.Buffer, *bytes.Reader, or *strings.Reader,
	// GetBody is populated, then the Put once request has retry ability.
	Put(ctx context.Context, args *PutArgs) (location proto.Location, hashSumMap HashSumMap, err error)
	// Get object, range is supported.
	Get(ctx context.Context, args *GetArgs) (body io.ReadCloser, err error)
	// Delete all blobs in these locations.
	//
	// Returns:
	// - (nil, nil): all blobs deleted successfully.
	// - (nil, ErrIllegalArguments): when args is invalid.
	// - (failedLocations, err): returns the list of locations that have not yet been deleted.
	Delete(ctx context.Context, args *DeleteArgs) (failedLocations []proto.Location, err error)
}

API is the access api for s3. To trace the request id, the ctx is best created with WithRequestID(ctx, rid).

func New

func New(cfg Config) (API, error)

New returns an access API

type AllocArgs

// AllocArgs for service /alloc.
type AllocArgs struct {
	Size            uint64            `json:"size"`
	BlobSize        uint32            `json:"blob_size"`
	AssignClusterID proto.ClusterID   `json:"assign_cluster_id"`
	CodeMode        codemode.CodeMode `json:"code_mode"`
}

AllocArgs for service /alloc

func (*AllocArgs) IsValid

func (args *AllocArgs) IsValid() bool

IsValid is valid alloc args

type AllocResp

// AllocResp is the alloc response result with upload tokens.
type AllocResp struct {
	Location proto.Location `json:"location"`
	Tokens   []string       `json:"tokens"`
}

AllocResp is the alloc response result with tokens. If size mod blobsize == 0, the length of tokens equals the length of the location's blobs; otherwise there is an additional token for uploading the last blob.

type AllocSliceArgs

// AllocSliceArgs are arguments for allocating a slice of a named blob.
// NOTE(review): FailSlice presumably identifies a failed slice to be replaced — confirm against server handler.
type AllocSliceArgs struct {
	ClusterID proto.ClusterID
	CodeMode  codemode.CodeMode
	BlobName  string
	Size      uint64
	FailSlice proto.Slice
}

func (*AllocSliceArgs) IsValid

func (args *AllocSliceArgs) IsValid() bool

type Client

// Client extends API with named-blob operations (list/get/delete/put by blob name).
type Client interface {
	API
	ListBlob(ctx context.Context, args *ListBlobArgs) (shardnode.ListBlobRet, error)
	GetBlob(ctx context.Context, args *GetBlobArgs) (io.ReadCloser, error)
	DeleteBlob(ctx context.Context, args *DelBlobArgs) error
	PutBlob(ctx context.Context, args *PutBlobArgs) (proto.ClusterID, HashSumMap, error)
}

type Config

// Config is the access client config.
type Config struct {
	// ConnMode rpc connection timeout setting
	ConnMode RPCConnectMode `json:"connection_mode"`
	// ClientTimeoutMs the whole request and response timeout
	ClientTimeoutMs int64 `json:"client_timeout_ms"`
	// BodyBandwidthMBPs reading body timeout, request or response
	//   timeout = ContentLength/BodyBandwidthMBPs + BodyBaseTimeoutMs
	BodyBandwidthMBPs float64 `json:"body_bandwidth_mbps"`
	// BodyBaseTimeoutMs base timeout for read body
	BodyBaseTimeoutMs int64 `json:"body_base_timeout_ms"`

	// Consul is consul config for discovering service
	Consul ConsulConfig `json:"consul"`
	// ServiceIntervalS is interval seconds for discovering service hosts,
	//   at least 5 seconds and default is 5 minutes.
	ServiceIntervalS int `json:"service_interval_s"`
	// PriorityAddrs priority addrs of access service when retry
	PriorityAddrs []string `json:"priority_addrs"`
	// MaxSizePutOnce max size using once-put object interface, default is 256MB.
	MaxSizePutOnce int64 `json:"max_size_put_once"`
	// MaxPartRetry max retry times when putting one part, 0 means forever
	MaxPartRetry int `json:"max_part_retry"`
	// MaxHostRetry max retry hosts of access, default all hosts.
	MaxHostRetry int `json:"max_host_retry"`
	// PartConcurrence concurrence of put parts
	PartConcurrence int `json:"part_concurrence"`

	// rpc selector config
	// Failure retry interval, default value is 300s;
	// if FailRetryIntervalS < 0, removal of failed hosts is disabled.
	FailRetryIntervalS int `json:"fail_retry_interval_s"`
	// Within MaxFailsPeriodS, if the number of failures is greater than or equal to MaxFails,
	// the host is considered disconnected.
	MaxFailsPeriodS int `json:"max_fails_period_s"`
	// HostTryTimes number of host failure retries
	HostTryTimes int `json:"host_try_times"`

	// RPCConfig user-defined rpc config.
	// All connections will use this config if it is not nil;
	// ConnMode is ignored when this rpc config is set.
	RPCConfig *rpc.Config `json:"rpc_config"`

	// LogLevel client output logging level.
	LogLevel log.Level `json:"log_level"`

	// Logger traces all logging to this logger if set.
	// It is an io.WriteCloser that writes to the specified filename.
	// YOU should CLOSE it after you do not use the client anymore.
	Logger *Logger `json:"logger"`
}

Config access client config

type ConsulConfig

type ConsulConfig = api.Config

ConsulConfig is an alias of consul api.Config. Fixup: the client and the sdk use the same config type.

type CreateBlobArgs

// CreateBlobArgs are arguments for creating a named blob.
type CreateBlobArgs struct {
	ClusterID proto.ClusterID
	CodeMode  codemode.CodeMode
	BlobName  string
	Size      uint64
	SliceSize uint32
}

func (*CreateBlobArgs) IsValid

func (args *CreateBlobArgs) IsValid() bool

type CreateBlobRet

// CreateBlobRet is the create-blob result holding the allocated location.
type CreateBlobRet struct {
	Location proto.Location
}

type DelBlobArgs

// DelBlobArgs are arguments for deleting a named blob.
type DelBlobArgs struct {
	ClusterID proto.ClusterID
	BlobName  string
}

func (*DelBlobArgs) IsValid

func (args *DelBlobArgs) IsValid() bool

type DeleteArgs

// DeleteArgs for service /delete.
// Locations length is bounded by MaxDeleteLocations.
type DeleteArgs struct {
	Locations []proto.Location `json:"locations"`
}

DeleteArgs for service /delete

func (*DeleteArgs) IsValid

func (args *DeleteArgs) IsValid() bool

IsValid is valid delete args

type DeleteBlobArgs

// DeleteBlobArgs for service /deleteblob.
type DeleteBlobArgs struct {
	ClusterID proto.ClusterID `json:"clusterid"`
	Vid       proto.Vid       `json:"volumeid"`
	BlobID    proto.BlobID    `json:"blobid"`
	Size      int64           `json:"size"`
	Token     string          `json:"token"`
}

DeleteBlobArgs for service /deleteblob

func (*DeleteBlobArgs) IsValid

func (args *DeleteBlobArgs) IsValid() bool

IsValid is valid delete blob args

type DeleteResp

// DeleteResp is the delete response carrying any locations that failed to delete.
type DeleteResp struct {
	FailedLocations []proto.Location `json:"failed_locations,omitempty"`
}

DeleteResp delete response with failed locations

type GetArgs

// GetArgs for service /get. Offset and ReadSize select the byte range.
type GetArgs struct {
	Location proto.Location `json:"location"`
	Offset   uint64         `json:"offset"`
	ReadSize uint64         `json:"read_size"`
	Writer   io.Writer      `json:"-"`
}

GetArgs for service /get

func (*GetArgs) IsValid

func (args *GetArgs) IsValid() bool

IsValid is valid get args

type GetBlobArgs

// GetBlobArgs are arguments for reading a named blob.
// Offset and ReadSize select the byte range; data is written to Writer.
type GetBlobArgs struct {
	ClusterID proto.ClusterID
	Mode      GetShardMode
	BlobName  string

	Offset   uint64
	ReadSize uint64
	Writer   io.Writer
}

func (*GetBlobArgs) IsValid

func (args *GetBlobArgs) IsValid() bool

IsValid is valid get args

type GetShardCommonArgs

// GetShardCommonArgs are common arguments addressing a shard for blob requests.
type GetShardCommonArgs struct {
	ClusterID proto.ClusterID
	ShardID   proto.ShardID
	Mode      GetShardMode
	BlobName  string
}

type GetShardMode

// GetShardMode selects which shard replica serves a read (random or leader).
type GetShardMode int

type HashAlgorithm

type HashAlgorithm uint8

HashAlgorithm hash.Hash algorithm when uploading data

// Defined hash algorithms; values are bit flags so they can be OR-combined.
const (
	HashAlgDummy  HashAlgorithm = 1 << iota
	HashAlgCRC32                // crc32 with IEEE
	HashAlgMD5                  // md5
	HashAlgSHA1                 // sha1
	HashAlgSHA256               // sha256
)

defined hash algorithm

func (HashAlgorithm) ToHashSumMap

func (alg HashAlgorithm) ToHashSumMap() HashSumMap

ToHashSumMap returns a new HashSumMap, decode from rpc url argument

func (HashAlgorithm) ToHasher

func (alg HashAlgorithm) ToHasher() hash.Hash

ToHasher returns a new hash.Hash computing a checksum. The value of the algorithm should be one of the HashAlg* constants.

type HashSumMap

type HashSumMap map[HashAlgorithm][]byte

HashSumMap save checksum in rpc calls

func (HashSumMap) All

func (h HashSumMap) All() map[string]interface{}

All returns readable checksum

func (HashSumMap) GetSum

func (h HashSumMap) GetSum(key HashAlgorithm) (interface{}, bool)

GetSum get checksum value and ok via HashAlgorithm

HashAlgDummy  returns nil, bool
HashAlgCRC32  returns uint32, bool
HashAlgMD5    returns string(32), bool
HashAlgSHA1   returns string(40), bool
HashAlgSHA256 returns string(64), bool

func (HashSumMap) GetSumVal

func (h HashSumMap) GetSumVal(key HashAlgorithm) interface{}

GetSumVal get checksum only value via HashAlgorithm

func (HashSumMap) ToHashAlgorithm

func (h HashSumMap) ToHashAlgorithm() HashAlgorithm

ToHashAlgorithm returns HashAlgorithm, encode to rpc url argument

type HasherMap

type HasherMap map[HashAlgorithm]hash.Hash

HasherMap map hasher of HashAlgorithm

func (HasherMap) ToHashAlgorithm

func (h HasherMap) ToHashAlgorithm() HashAlgorithm

ToHashAlgorithm returns HashAlgorithm

func (HasherMap) ToWriter

func (h HasherMap) ToWriter() io.Writer

ToWriter returns io writer

type ListBlobArgs

// ListBlobArgs are arguments for listing blobs, filtered by Prefix and
// paginated via Marker/Count.
type ListBlobArgs struct {
	ClusterID proto.ClusterID
	ShardID   proto.ShardID
	Mode      GetShardMode
	Prefix    string
	Marker    string
	Count     uint64
}

func (*ListBlobArgs) IsValid

func (args *ListBlobArgs) IsValid() bool

type ListBlobEncodeMarker

// ListBlobEncodeMarker is the protobuf message encoding a list-blob
// continuation marker together with its sharding range.
type ListBlobEncodeMarker struct {
	Range                sharding.Range `protobuf:"bytes,1,opt,name=range,proto3" json:"range"`
	Marker               string         `protobuf:"bytes,2,opt,name=marker,proto3" json:"marker,omitempty"`
	XXX_NoUnkeyedLiteral struct{}       `json:"-"`
	XXX_unrecognized     []byte         `json:"-"`
	XXX_sizecache        int32          `json:"-"`
}

func (*ListBlobEncodeMarker) Descriptor

func (*ListBlobEncodeMarker) Descriptor() ([]byte, []int)

func (*ListBlobEncodeMarker) GetMarker

func (m *ListBlobEncodeMarker) GetMarker() string

func (*ListBlobEncodeMarker) GetRange

func (m *ListBlobEncodeMarker) GetRange() sharding.Range

func (*ListBlobEncodeMarker) Marshal

func (m *ListBlobEncodeMarker) Marshal() (dAtA []byte, err error)

func (*ListBlobEncodeMarker) MarshalTo

func (m *ListBlobEncodeMarker) MarshalTo(dAtA []byte) (int, error)

func (*ListBlobEncodeMarker) MarshalToSizedBuffer

func (m *ListBlobEncodeMarker) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ListBlobEncodeMarker) MarshalToString

func (args *ListBlobEncodeMarker) MarshalToString() (string, error)

func (*ListBlobEncodeMarker) ProtoMessage

func (*ListBlobEncodeMarker) ProtoMessage()

func (*ListBlobEncodeMarker) Reset

func (m *ListBlobEncodeMarker) Reset()

func (*ListBlobEncodeMarker) Size

func (m *ListBlobEncodeMarker) Size() (n int)

func (*ListBlobEncodeMarker) String

func (m *ListBlobEncodeMarker) String() string

func (*ListBlobEncodeMarker) Unmarshal

func (m *ListBlobEncodeMarker) Unmarshal(dAtA []byte) error

func (*ListBlobEncodeMarker) UnmarshalFromString

func (args *ListBlobEncodeMarker) UnmarshalFromString(marker string) error

func (*ListBlobEncodeMarker) XXX_DiscardUnknown

func (m *ListBlobEncodeMarker) XXX_DiscardUnknown()

func (*ListBlobEncodeMarker) XXX_Marshal

func (m *ListBlobEncodeMarker) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ListBlobEncodeMarker) XXX_Merge

func (m *ListBlobEncodeMarker) XXX_Merge(src proto.Message)

func (*ListBlobEncodeMarker) XXX_Size

func (m *ListBlobEncodeMarker) XXX_Size() int

func (*ListBlobEncodeMarker) XXX_Unmarshal

func (m *ListBlobEncodeMarker) XXX_Unmarshal(b []byte) error

type Logger

type Logger = lumberjack.Logger

Logger alias of lumberjack Logger See more at: https://github.com/natefinch/lumberjack

type PutArgs

// PutArgs for service /put.
type PutArgs struct {
	Size   int64         `json:"size"`
	Hashes HashAlgorithm `json:"hashes,omitempty"`
	Body   io.Reader     `json:"-"`

	// GetBody defines an optional func to return a new copy of Body.
	// It is used for client requests when a redirect requires reading
	// the body more than once. Use of GetBody still requires setting Body.
	//
	// If set, request.GetBody is forcibly reset to this func.
	GetBody func() (io.ReadCloser, error) `json:"-"`
}

PutArgs for service /put. Hashes specifies how to calculate checksums as a bit set; e.g. HashAlgCRC32 | HashAlgMD5 equals 2 + 4 = 6.

func (*PutArgs) IsValid

func (args *PutArgs) IsValid() bool

IsValid is valid put args

type PutAtArgs

// PutAtArgs for service /putat.
type PutAtArgs struct {
	ClusterID proto.ClusterID `json:"clusterid"`
	Vid       proto.Vid       `json:"volumeid"`
	BlobID    proto.BlobID    `json:"blobid"`
	Size      int64           `json:"size"`
	Hashes    HashAlgorithm   `json:"hashes,omitempty"`
	Token     string          `json:"token"`
	Body      io.Reader       `json:"-"`
}

PutAtArgs for service /putat

func (*PutAtArgs) IsValid

func (args *PutAtArgs) IsValid() bool

IsValid is valid putat args

type PutAtResp

// PutAtResp is the putat response result with computed checksums.
type PutAtResp struct {
	HashSumMap HashSumMap `json:"hashsum"`
}

PutAtResp putat response result

type PutBlobArgs

// PutBlobArgs are arguments for putting a named blob.
type PutBlobArgs struct {
	CodeMode codemode.CodeMode
	BlobName string
	NeedSeal bool

	Size   uint64
	Hashes HashAlgorithm
	Body   io.Reader
}

func (*PutBlobArgs) IsValid

func (args *PutBlobArgs) IsValid() bool

type PutResp

// PutResp is the put response result.
type PutResp struct {
	Location   proto.Location `json:"location"`
	HashSumMap HashSumMap     `json:"hashsum,omitempty"`
}

PutResp put response result

type RPCConnectMode

type RPCConnectMode uint8

RPCConnectMode self-defined rpc client connection config setting

// Predefined rpc connection modes, ordered by timeout/bandwidth profile.
const (
	DefaultConnMode RPCConnectMode = iota
	QuickConnMode
	GeneralConnMode
	SlowConnMode
	NoLimitConnMode
)

timeout (short to long): quick --> general --> default --> slow --> nolimit; speed: 40MB --> 20MB --> 10MB --> 4MB --> nolimit.

type SealBlobArgs

// SealBlobArgs are arguments for sealing a named blob with its slices.
type SealBlobArgs struct {
	ClusterID proto.ClusterID
	BlobName  string
	Size      uint64
	Slices    []proto.Slice
}

func (*SealBlobArgs) IsValid

func (args *SealBlobArgs) IsValid() bool

type SignArgs

// SignArgs for service /sign.
type SignArgs struct {
	Locations []proto.Location `json:"locations"`
	Location  proto.Location   `json:"location"`
}

SignArgs for service /sign. Locations are the signed locations obtained from /alloc; Location is the to-be-signed location that you merged yourself.

func (*SignArgs) IsValid

func (args *SignArgs) IsValid() bool

IsValid is valid sign args

type SignResp

// SignResp is the sign response with the crc-signed location.
type SignResp struct {
	Location proto.Location `json:"location"`
}

SignResp sign response location with crc

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL