cache

package
v0.0.0-...-50c0b52 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 29, 2026 License: AGPL-3.0 Imports: 58 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SourceModeJuiceFS    string = "juicefs"
	SourceModeMountPoint string = "mountpoint"
)
View Source
const (
	HostPrefix string = "cache-host"
	Version    string = "dev"
)

Variables

View Source
var (
	BufferSize1MB  = 1 * 1024 * 1024
	BufferSize4MB  = 4 * 1024 * 1024
	BufferSize16MB = 16 * 1024 * 1024
)

Standard buffer sizes, aligned with typical chunk sizes.

View Source
var (
	ErrCoordinatorUnavailable  = errors.New("cache coordinator unavailable")
	ErrInvalidHostRegistration = errors.New("invalid cache host registration")
)
View Source
var (
	ErrHostNotFound            = errors.New("host not found")
	ErrUnableToReachHost       = errors.New("unable to reach host")
	ErrInvalidHostVersion      = errors.New("invalid host version")
	ErrContentNotFound         = errors.New("content not found")
	ErrClientNotFound          = errors.New("client not found")
	ErrCacheLockHeld           = errors.New("cache lock held")
	ErrUnableToPopulateContent = errors.New("unable to populate content from original source")
	ErrFSMountFailure          = errors.New("failed to mount cachefs")
	ErrUnableToAcquireLock     = errors.New("unable to acquire lock")
)
View Source
var (
	ErrChannelClosed    = errors.New("redis: channel closed")
	ErrConnectionIssue  = errors.New("redis: connection issue")
	ErrUnknownRedisMode = errors.New("redis: unknown mode")
)
View Source
var (
	Logger *logger
)
View Source
var MetadataKeys = &metadataKeys{}

Functions

func DialWithTimeout

func DialWithTimeout(ctx context.Context, addr string) (net.Conn, error)

func GenerateFsID

func GenerateFsID(name string) string

GenerateFsID generates a filesystem node ID from the given name. Note that the signature accepts only name; no parent ID parameter is taken.

func GetLogger

func GetLogger() *logger

func GetPrivateIpAddr

func GetPrivateIpAddr() (string, error)

func GetPublicIpAddr

func GetPublicIpAddr() (string, error)

func InitLogger

func InitLogger(debugMode bool, prettyLogs bool)

func Mount

func Mount(ctx context.Context, opts FSSystemOpts) (func() error, <-chan error, *fuse.Server, error)

func SHA1StringToUint64

func SHA1StringToUint64(hash string) (uint64, error)

SHA1StringToUint64 converts the first 8 bytes of a SHA-1 hash string to a uint64.

func ToSlice

func ToSlice(v interface{}) []interface{}

ToSlice flattens a struct using its field tags so it can be used by HSet. Struct fields must have the redis tag on them; otherwise they will be ignored.

func ToStruct

func ToStruct(m map[string]string, out interface{}) error

ToStruct copies the result of HGetAll into the provided struct. If a field cannot be parsed, it is left at Go's zero value for its type. Struct fields must have the redis tag on them; otherwise they will be ignored.

func WithClientName

func WithClientName(name string) func(*redis.UniversalOptions)

Types

type BufferPool

type BufferPool struct {
	// contains filtered or unexported fields
}

BufferPool provides a pool of reusable byte slices to reduce allocations. It is optimized for 1-4MB chunks, as recommended in the optimization plan.

func NewBufferPool

func NewBufferPool() *BufferPool

NewBufferPool creates a new buffer pool with predefined size buckets

func (*BufferPool) Get

func (bp *BufferPool) Get(size int) []byte

Get retrieves a buffer of at least the requested size

func (*BufferPool) Put

func (bp *BufferPool) Put(buf []byte)

Put returns a buffer to the pool

type CacheFS

type CacheFS struct {
	MetadataStore CacheMetadataStore
	Client        *Client
	Config        ClientConfig
	// contains filtered or unexported fields
}

func NewFileSystem

func NewFileSystem(ctx context.Context, opts FSSystemOpts) (*CacheFS, error)

NewFileSystem initializes a new CacheFS with root metadata.

func (*CacheFS) Root

func (bfs *CacheFS) Root() (fs.InodeEmbedder, error)

type CacheFSNode

type CacheFSNode struct {
	Path     string
	ID       string
	PID      string
	Name     string
	Target   string
	Hash     string
	Attr     fuse.Attr
	Prefetch *bool
}

type CacheMetadataStore

type CacheMetadataStore interface {
	SetClientLock(ctx context.Context, hash string, host string) error
	RemoveClientLock(ctx context.Context, hash string, host string) error
	SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
	RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
	RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
	SetFsNode(ctx context.Context, id string, metadata *FSMetadata) error
	GetFsNode(ctx context.Context, id string) (*FSMetadata, error)
	RemoveFsNode(ctx context.Context, id string) error
	RemoveFsNodeChild(ctx context.Context, pid, id string) error
	GetFsNodeChildren(ctx context.Context, id string) ([]*FSMetadata, error)
	AddFsNodeChild(ctx context.Context, pid, id string) error
}

func NewRedisCacheMetadataStore

func NewRedisCacheMetadataStore(_ GlobalConfig, serverConfig ServerConfig) (CacheMetadataStore, error)

func NewRedisCacheMetadataStoreWithClient

func NewRedisCacheMetadataStoreWithClient(_ GlobalConfig, _ ServerConfig, client redis.UniversalClient) CacheMetadataStore

type CacheMetrics

type CacheMetrics struct {
	DiskCacheUsageMB  *metrics.Histogram
	DiskCacheUsagePct *metrics.Histogram
	MemCacheUsageMB   *metrics.Histogram
	MemCacheUsagePct  *metrics.Histogram

	// Cache tier hit ratios
	L0HitRatio  *metrics.Histogram // In-memory cache hits
	L1HitRatio  *metrics.Histogram // Disk cache hits
	L2MissRatio *metrics.Histogram // Remote fetch required

	// Operation counters
	L0Hits     *metrics.Counter
	L1Hits     *metrics.Counter
	L2Misses   *metrics.Counter
	TotalReads *metrics.Counter

	// Bytes served per tier
	L0BytesServed  *metrics.Counter
	L1BytesServed  *metrics.Counter
	L2BytesFetched *metrics.Counter

	// FUSE operation latencies
	FUSEReadLatency    *metrics.Histogram
	FUSELookupLatency  *metrics.Histogram
	FUSEGetattrLatency *metrics.Histogram

	// Read throughput
	ReadThroughputMBps *metrics.Histogram
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(ctx context.Context, cfg Config) (*Client, error)

func NewClientWithHostDirectory

func NewClientWithHostDirectory(ctx context.Context, cfg Config, metadataStore CacheMetadataStore, hostDirectory HostDirectory, locality string) (*Client, error)

func NewClientWithMetadataStore

func NewClientWithMetadataStore(ctx context.Context, cfg Config, metadataStore CacheMetadataStore, locality string) (*Client, error)

func (*Client) AttachLocalServer

func (c *Client) AttachLocalServer(server *Server)

func (*Client) CacheFSMetadata

func (c *Client) CacheFSMetadata(ctx context.Context, path string) (*FSMetadata, error)

CacheFSMetadata resolves a cachefs path to its content metadata without going through the cachefs FUSE mount.

func (*Client) Cleanup

func (c *Client) Cleanup() error

func (*Client) ClientLocalPageFileView

func (c *Client) ClientLocalPageFileView(hash string, offset int64, length int64, opts ClientOptions) (path string, pageOffset int64, n int, ok bool, err error)

func (*Client) ClientLocalPageFileViews

func (c *Client) ClientLocalPageFileViews(hash string, offset int64, length int64, opts ClientOptions) (views []ClientLocalPageFileView, err error)

func (*Client) DetachLocalServer

func (c *Client) DetachLocalServer(hostID string)

func (*Client) GetContent

func (c *Client) GetContent(hash string, offset int64, length int64, opts struct {
	RoutingKey string
}) ([]byte, error)

func (*Client) GetContentStream

func (c *Client) GetContentStream(hash string, offset int64, length int64, opts struct {
	RoutingKey string
}) (chan []byte, error)

func (*Client) GetNearbyHosts

func (c *Client) GetNearbyHosts() ([]*Host, error)

func (*Client) GetState

func (c *Client) GetState() error

func (*Client) HostsAvailable

func (c *Client) HostsAvailable() bool

func (*Client) IsCachedNearby

func (c *Client) IsCachedNearby(hash string, routingKey string) (bool, error)

func (*Client) IsPathCachedNearby

func (c *Client) IsPathCachedNearby(ctx context.Context, path string) bool

func (*Client) ReadContentInto

func (c *Client) ReadContentInto(ctx context.Context, hash string, offset int64, dst []byte, opts ClientOptions) (read int64, err error)

func (*Client) StoreContent

func (c *Client) StoreContent(chunks chan []byte, hash string, opts struct {
	RoutingKey string
}) (string, error)

func (*Client) StoreContentAtPath

func (c *Client) StoreContentAtPath(content []byte, cachePath string, opts StoreContentOptions) (string, error)

func (*Client) StoreContentFromFUSE

func (c *Client) StoreContentFromFUSE(source struct {
	Path string
}, opts struct {
	RoutingKey string
	Lock       bool
}) (string, error)

func (*Client) StoreContentFromLocalFile

func (c *Client) StoreContentFromLocalFile(source LocalContentSource, opts StoreContentOptions) (string, error)

StoreContentFromLocalFile streams a caller-local file to the selected cache host.

func (*Client) StoreContentFromLocalPath

func (c *Client) StoreContentFromLocalPath(source struct {
	Path      string
	CachePath string
}, opts struct {
	RoutingKey string
	Lock       bool
}) (string, error)

func (*Client) StoreContentFromLocalSource

func (c *Client) StoreContentFromLocalSource(source LocalContentSource, opts StoreContentOptions) (string, error)

StoreContentFromLocalSource asks the selected cache host to read source.Path itself. Prefer StoreContentFromLocalFile unless the source path is guaranteed to exist on cache hosts.

func (*Client) StoreContentFromS3

func (c *Client) StoreContentFromS3(source struct {
	Path        string
	BucketName  string
	Region      string
	EndpointURL string
	AccessKey   string
	SecretKey   string
}, opts struct {
	RoutingKey string
	Lock       bool
}) (string, error)

func (*Client) StoreContentFromS3Source

func (c *Client) StoreContentFromS3Source(source S3ContentSource, opts StoreContentOptions) (string, error)

func (*Client) WaitForHosts

func (c *Client) WaitForHosts(timeout time.Duration) error

type ClientConfig

type ClientConfig struct {
	Token                 string                    `key:"token" json:"token"`
	MinRetryLengthBytes   int64                     `key:"minRetryLengthBytes" json:"min_retry_length_bytes"`
	MaxGetContentAttempts int                       `key:"maxGetContentAttempts" json:"max_get_content_attempts"`
	NTopHosts             int                       `key:"nTopHosts" json:"n_top_hosts"`
	CacheFS               FSConfig                  `key:"cachefs" json:"cachefs"`
	PreferLocalCacheHost  bool                      `key:"preferLocalCacheHost" json:"prefer_local_cache_host"`
	PageFDCacheSize       int                       `key:"pageFDCacheSize" json:"page_fd_cache_size"`
	ReadTransport         ClientReadTransportConfig `key:"readTransport" json:"read_transport"`
	Prefetch              ReadPrefetchConfig        `key:"prefetch" json:"prefetch"`
}

type ClientLocalPageFileView

type ClientLocalPageFileView struct {
	Path   string
	Offset int64
	Length int
}

ClientLocalPageFileView describes a byte range inside a page file that is already present on this client/worker. It is intentionally not a remote address: consumers may mmap it, return it as a FUSE fd-backed response, or otherwise read it as a local file.

type ClientOptions

type ClientOptions = struct {
	RoutingKey string
}

type ClientReadTransportConfig

type ClientReadTransportConfig struct {
	Enabled               bool `key:"enabled" json:"enabled"`
	MaxActiveConnsPerHost int  `key:"maxActiveConnsPerHost" json:"max_active_conns_per_host"`
	MaxIdleConnsPerHost   int  `key:"maxIdleConnsPerHost" json:"max_idle_conns_per_host"`
}

type ClientRequest

type ClientRequest struct {
	// contains filtered or unexported fields
}

type ClientRequestType

type ClientRequestType int
const (
	ClientRequestTypeStorage ClientRequestType = iota
	ClientRequestTypeRetrieval
)

type Config

type Config struct {
	Enabled     bool              `key:"enabled" json:"enabled"`
	Disk        DiskConfig        `key:"disk" json:"disk"`
	Memory      MemoryConfig      `key:"memory" json:"memory"`
	Coordinator CoordinatorConfig `key:"coordinator" json:"coordinator"`
	Server      ServerConfig      `key:"server" json:"server"`
	Client      ClientConfig      `key:"client" json:"client"`
	Global      GlobalConfig      `key:"global" json:"global"`
	Metrics     MetricsConfig     `key:"metrics" json:"metrics"`
}

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

func NewCoordinator

func NewCoordinator(repository CoordinatorRepository) *Coordinator

func (*Coordinator) ListHosts

func (c *Coordinator) ListHosts(ctx context.Context, poolName, locality string) ([]CoordinatorHost, error)

func (*Coordinator) RegisterHost

func (c *Coordinator) RegisterHost(ctx context.Context, host CoordinatorHost, ttl time.Duration) error

func (*Coordinator) UnregisterHost

func (c *Coordinator) UnregisterHost(ctx context.Context, poolName, locality, logicalHostID, registrationID string) error

type CoordinatorConfig

type CoordinatorConfig struct {
	RegistrationTTLSeconds   int `key:"registrationTTLSeconds" json:"registration_ttl_seconds"`
	HeartbeatIntervalSeconds int `key:"heartbeatIntervalSeconds" json:"heartbeat_interval_seconds"`
	HostWatchIntervalSeconds int `key:"hostWatchIntervalSeconds" json:"host_watch_interval_seconds"`
}

type CoordinatorHost

type CoordinatorHost struct {
	// LogicalHostID is the stable cache routing identity. Multiple worker
	// process registrations can advertise addresses for the same logical host.
	LogicalHostID string
	// RegistrationID identifies one live worker/cache-server process lease.
	RegistrationID   string
	PoolName         string
	Locality         string
	NodeID           string
	CachePathID      string
	Addr             string
	PrivateAddr      string
	CapacityUsagePct float64
}

type CoordinatorRepository

type CoordinatorRepository interface {
	SetCacheRegistration(ctx context.Context, host CoordinatorHost, ttl time.Duration) error
	GetActiveCacheRegistration(ctx context.Context, logicalHostID string) (registrationID string, found bool, err error)
	SetActiveCacheRegistration(ctx context.Context, logicalHostID, registrationID string, ttl time.Duration) error
	ListCacheLogicalHosts(ctx context.Context, poolName, locality string) ([]string, error)
	ListCacheRegistrations(ctx context.Context, logicalHostID string) ([]string, error)
	GetCacheRegistration(ctx context.Context, logicalHostID, registrationID string) (CoordinatorHost, bool, error)
	RemoveCacheRegistration(ctx context.Context, logicalHostID, registrationID string) error
	CountCacheRegistrations(ctx context.Context, logicalHostID string) (int64, error)
	RemoveCacheLogicalHost(ctx context.Context, poolName, locality, logicalHostID string) error
}

type DiscoveryClient

type DiscoveryClient struct {
	// contains filtered or unexported fields
}

func NewDiscoveryClient

func NewDiscoveryClient(cfg GlobalConfig, hostMap *HostMap, hostDirectory HostDirectory, locality string) *DiscoveryClient

func (*DiscoveryClient) GetHostState

func (d *DiscoveryClient) GetHostState(ctx context.Context, host *Host) (*Host, error)

GetHostState attempts to connect to the gRPC service and verifies its availability.

func (*DiscoveryClient) Start

func (d *DiscoveryClient) Start(ctx context.Context) error

Start is used by cache servers to discover their closest peers.

type DiskConfig

type DiskConfig struct {
	Enabled     bool    `key:"enabled" json:"enabled"`
	HostPath    string  `key:"hostPath" json:"host_path"`
	MountPath   string  `key:"mountPath" json:"mount_path"`
	MaxUsagePct float64 `key:"maxUsagePct" json:"max_usage_pct"`
}

type ErrNodeNotFound

type ErrNodeNotFound struct {
	Id string
}

func (*ErrNodeNotFound) Error

func (e *ErrNodeNotFound) Error() string

type FSConfig

type FSConfig struct {
	Enabled            bool     `key:"enabled" json:"enabled"`
	MountPoint         string   `key:"mountPoint" json:"mount_point"`
	MaxBackgroundTasks int      `key:"maxBackgroundTasks" json:"max_background_tasks"`
	MaxWriteKB         int      `key:"maxWriteKB" json:"max_write_kb"`
	MaxReadAheadKB     int      `key:"maxReadAheadKB" json:"max_read_ahead_kb"`
	DirectMount        bool     `key:"directMount" json:"direct_mount"`
	DirectIO           bool     `key:"directIO" json:"direct_io"`
	Options            []string `key:"options" json:"options"`
}

type FSMetadata

type FSMetadata struct {
	PID       string `redis:"pid" json:"pid"`
	ID        string `redis:"id" json:"id"`
	Name      string `redis:"name" json:"name"`
	Path      string `redis:"path" json:"path"`
	Hash      string `redis:"hash" json:"hash"`
	Ino       uint64 `redis:"ino" json:"ino"`
	Size      uint64 `redis:"size" json:"size"`
	Blocks    uint64 `redis:"blocks" json:"blocks"`
	Atime     uint64 `redis:"atime" json:"atime"`
	Mtime     uint64 `redis:"mtime" json:"mtime"`
	Ctime     uint64 `redis:"ctime" json:"ctime"`
	Atimensec uint32 `redis:"atimensec" json:"atimensec"`
	Mtimensec uint32 `redis:"mtimensec" json:"mtimensec"`
	Ctimensec uint32 `redis:"ctimensec" json:"ctimensec"`
	Mode      uint32 `redis:"mode" json:"mode"`
	Nlink     uint32 `redis:"nlink" json:"nlink"`
	Rdev      uint32 `redis:"rdev" json:"rdev"`
	Blksize   uint32 `redis:"blksize" json:"blksize"`
	Padding   uint32 `redis:"padding" json:"padding"`
	Uid       uint32 `redis:"uid" json:"uid"`
	Gid       uint32 `redis:"gid" json:"gid"`
	Gen       uint64 `redis:"gen" json:"gen"`
}

func FSMetadataFromWorkerCacheProto

func FSMetadataFromWorkerCacheProto(metadata *proto.WorkerCacheFSMetadata) *FSMetadata

func (*FSMetadata) ToProto

func (m *FSMetadata) ToProto() *proto.CacheFSMetadata

func (*FSMetadata) ToWorkerCacheProto

func (m *FSMetadata) ToWorkerCacheProto() *proto.WorkerCacheFSMetadata

type FSNode

type FSNode struct {
	fs.Inode
	// contains filtered or unexported fields
}

func (*FSNode) Create

func (n *FSNode) Create(ctx context.Context, name string, flags uint32, mode uint32, out *fuse.EntryOut) (inode *fs.Inode, fh fs.FileHandle, fuseFlags uint32, errno syscall.Errno)

func (*FSNode) Getattr

func (n *FSNode) Getattr(ctx context.Context, fh fs.FileHandle, out *fuse.AttrOut) syscall.Errno

func (*FSNode) Lookup

func (n *FSNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*fs.Inode, syscall.Errno)

func (*FSNode) Mkdir

func (n *FSNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse.EntryOut) (*fs.Inode, syscall.Errno)

func (*FSNode) OnAdd

func (n *FSNode) OnAdd(ctx context.Context)

func (*FSNode) Open

func (n *FSNode) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuseFlags uint32, errno syscall.Errno)

func (*FSNode) Opendir

func (n *FSNode) Opendir(ctx context.Context) syscall.Errno

func (*FSNode) Read

func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno)

func (*FSNode) Readdir

func (n *FSNode) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno)
func (n *FSNode) Readlink(ctx context.Context) ([]byte, syscall.Errno)

func (*FSNode) Rename

func (n *FSNode) Rename(ctx context.Context, oldName string, newParent fs.InodeEmbedder, newName string, flags uint32) syscall.Errno

func (*FSNode) Rmdir

func (n *FSNode) Rmdir(ctx context.Context, name string) syscall.Errno
func (n *FSNode) Unlink(ctx context.Context, name string) syscall.Errno

type FSSystemOpts

type FSSystemOpts struct {
	Verbose       bool
	MetadataStore CacheMetadataStore
	Config        ClientConfig
	Client        *Client
}

type FileSystem

type FileSystem interface {
	Mount(opts FileSystemOpts) (func() error, <-chan error, error)
	Unmount() error
	Format() error
}

type FileSystemOpts

type FileSystemOpts struct {
	MountPoint string
	Verbose    bool
	Metadata   *Metadata
}

CacheFS types

type FileSystemStorage

type FileSystemStorage interface {
	Metadata()
	Get(string)
	ReadFile(interface{}, []byte, int64)
}

type GlobalConfig

type GlobalConfig struct {
	DefaultLocality                 string  `key:"defaultLocality" json:"default_locality"`
	ServerPort                      uint    `key:"serverPort" json:"server_port"`
	DiscoveryIntervalS              int     `key:"discoveryIntervalS" json:"discovery_interval_s"`
	DiscoveryJitterS                int     `key:"discoveryJitterS" json:"discovery_jitter_s"`
	MaxDiscoveryConcurrency         int     `key:"maxDiscoveryConcurrency" json:"max_discovery_concurrency"`
	HostMonitorIntervalS            int     `key:"hostMonitorIntervalS" json:"host_monitor_interval_s"`
	RoundTripThresholdMilliseconds  uint    `key:"rttThresholdMilliseconds" json:"rtt_threshold_ms"`
	HostStorageCapacityThresholdPct float64 `key:"hostStorageCapacityThresholdPct" json:"host_storage_capacity_threshold_pct"`
	GRPCDialTimeoutS                int     `key:"grpcDialTimeoutS" json:"grpc_dial_timeout_s"`
	GRPCMessageSizeBytes            int     `key:"grpcMessageSizeBytes" json:"grpc_message_size_bytes"`
	GRPCInitialWindowSize           int     `key:"grpcInitialWindowSize" json:"grpc_initial_window_size"`
	GRPCInitialConnWindowSize       int     `key:"grpcInitialConnWindowSize" json:"grpc_initial_conn_window_size"`
	GRPCWriteBufferSize             int     `key:"grpcWriteBufferSize" json:"grpc_write_buffer_size"`
	GRPCReadBufferSize              int     `key:"grpcReadBufferSize" json:"grpc_read_buffer_size"`
	GRPCMaxConcurrentStreams        int     `key:"grpcMaxConcurrentStreams" json:"grpc_max_concurrent_streams"`
	GRPCNumStreamWorkers            int     `key:"grpcNumStreamWorkers" json:"grpc_num_stream_workers"`
	GRPCPayloadCodecV2              bool    `key:"grpcPayloadCodecV2" json:"grpc_payload_codec_v2"`
	GRPCPayloadCodecMinBytes        int     `key:"grpcPayloadCodecMinBytes" json:"grpc_payload_codec_min_bytes"`
	DebugMode                       bool    `key:"debugMode" json:"debug_mode"`
	PrettyLogs                      bool    `key:"prettyLogs" json:"pretty_logs"`
}

func (*GlobalConfig) GetLocality

func (c *GlobalConfig) GetLocality() string

type Host

type Host struct {
	RTT              time.Duration `redis:"rtt" json:"rtt"`
	HostId           string        `redis:"host_id" json:"host_id"`
	RegistrationID   string        `redis:"registration_id" json:"registration_id"`
	PoolName         string        `redis:"pool_name" json:"pool_name"`
	Locality         string        `redis:"locality" json:"locality"`
	NodeID           string        `redis:"node_id" json:"node_id"`
	CachePathID      string        `redis:"cache_path_id" json:"cache_path_id"`
	Addr             string        `redis:"addr" json:"addr"`
	PrivateAddr      string        `redis:"private_addr" json:"private_addr"`
	CapacityUsagePct float64       `redis:"capacity_usage_pct" json:"capacity_usage_pct"`
}

func (*Host) Bytes

func (h *Host) Bytes() []byte

Bytes is needed for the rendezvous hasher

type HostDirectory

type HostDirectory interface {
	GetAvailableHosts(ctx context.Context, locality string) ([]*Host, error)
}

type HostMap

type HostMap struct {
	// contains filtered or unexported fields
}

func NewHostMap

func NewHostMap(cfg GlobalConfig, onHostAdded func(*Host) error) *HostMap

func (*HostMap) Closest

func (hm *HostMap) Closest(timeout time.Duration) (*Host, error)

Closest finds the nearest host within a given timeout. If no hosts are found, it returns an error.

func (*HostMap) ClosestWithCapacity

func (hm *HostMap) ClosestWithCapacity(timeout time.Duration) (*Host, error)

ClosestWithCapacity finds the nearest host with available storage capacity within a given timeout. If no hosts are found, it returns an error.

func (*HostMap) Get

func (hm *HostMap) Get(hostId string) *Host

func (*HostMap) GetAll

func (hm *HostMap) GetAll() []*Host

func (*HostMap) Members

func (hm *HostMap) Members() mapset.Set[string]

func (*HostMap) Remove

func (hm *HostMap) Remove(host *Host) bool

func (*HostMap) Set

func (hm *HostMap) Set(host *Host)

type JuiceFSConfig

type JuiceFSConfig struct {
	RedisURI   string `key:"redisURI" json:"redis_uri"`
	Bucket     string `key:"bucket" json:"bucket"`
	AccessKey  string `key:"accessKey" json:"access_key"`
	SecretKey  string `key:"secretKey" json:"secret_key"`
	CacheSize  int64  `key:"cacheSize" json:"cache_size"`
	BlockSize  int64  `key:"blockSize" json:"block_size"`
	Prefetch   int64  `key:"prefetch" json:"prefetch"`
	BufferSize int64  `key:"bufferSize" json:"buffer_size"`
}

type JuiceFsSource

type JuiceFsSource struct {
	// contains filtered or unexported fields
}

func (*JuiceFsSource) Format

func (s *JuiceFsSource) Format(fsName string) error

func (*JuiceFsSource) Mount

func (s *JuiceFsSource) Mount(localPath string) error

func (*JuiceFsSource) Unmount

func (s *JuiceFsSource) Unmount(localPath string) error

type LocalContentSource

type LocalContentSource struct {
	Path      string
	CachePath string
}

type MemoryConfig

type MemoryConfig struct {
	Enabled     bool  `key:"enabled" json:"enabled"`
	MaxCachePct int64 `key:"maxCachePct" json:"max_cache_pct"`
}

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

func NewMetadata

func NewMetadata(cfg MetadataConfig) (*Metadata, error)

func NewMetadataWithRedisClient

func NewMetadataWithRedisClient(client redis.UniversalClient) *Metadata

func (*Metadata) AddFsNodeChild

func (m *Metadata) AddFsNodeChild(ctx context.Context, pid, id string) error

func (*Metadata) AddHostToIndex

func (m *Metadata) AddHostToIndex(ctx context.Context, locality string, host *Host) error

func (*Metadata) GetAvailableHosts

func (m *Metadata) GetAvailableHosts(ctx context.Context, locality string, removeHostCallback func(host *Host)) ([]*Host, error)

func (*Metadata) GetFsNode

func (m *Metadata) GetFsNode(ctx context.Context, id string) (*FSMetadata, error)

func (*Metadata) GetFsNodeChildren

func (m *Metadata) GetFsNodeChildren(ctx context.Context, id string) ([]*FSMetadata, error)

func (*Metadata) GetHostIndex

func (m *Metadata) GetHostIndex(ctx context.Context, locality string) ([]*Host, error)

func (*Metadata) RefreshStoreFromContentLock

func (m *Metadata) RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

func (*Metadata) RemoveClientLock

func (m *Metadata) RemoveClientLock(ctx context.Context, clientId, hash string) error

func (*Metadata) RemoveFsNode

func (m *Metadata) RemoveFsNode(ctx context.Context, id string) error

func (*Metadata) RemoveFsNodeChild

func (m *Metadata) RemoveFsNodeChild(ctx context.Context, pid, id string) error

func (*Metadata) RemoveHostFromIndex

func (m *Metadata) RemoveHostFromIndex(ctx context.Context, locality string, host *Host) error

func (*Metadata) RemoveHostKeepAlive

func (m *Metadata) RemoveHostKeepAlive(ctx context.Context, locality string, host *Host) error

func (*Metadata) RemoveStoreFromContentLock

func (m *Metadata) RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

func (*Metadata) SetClientLock

func (m *Metadata) SetClientLock(ctx context.Context, clientId, hash string) error

func (*Metadata) SetFsNode

func (m *Metadata) SetFsNode(ctx context.Context, id string, metadata *FSMetadata) error

func (*Metadata) SetHostKeepAlive

func (m *Metadata) SetHostKeepAlive(ctx context.Context, locality string, host *Host) error

func (*Metadata) SetStoreFromContentLock

func (m *Metadata) SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

type MetadataConfig

type MetadataConfig struct {
	Mode         MetadataMode `key:"mode" json:"mode"`
	ValkeyConfig ValkeyConfig `key:"valkey" json:"valkey"`

	// Default config
	RedisAddr               string    `key:"redisAddr" json:"redis_addr"`
	RedisPasswd             string    `key:"redisPasswd" json:"redis_passwd"`
	RedisTLSEnabled         bool      `key:"redisTLSEnabled" json:"redis_tls_enabled"`
	RedisInsecureSkipVerify bool      `key:"redisInsecureSkipVerify" json:"redis_insecure_skip_verify"`
	RedisMode               RedisMode `key:"redisMode" json:"redis_mode"`
	RedisMasterName         string    `key:"redisMasterName" json:"redis_master_name"`
}

type MetadataMode

type MetadataMode string
const (
	MetadataModeDefault MetadataMode = "default"
	MetadataModeLocal   MetadataMode = "local"
)

type MetricsConfig

type MetricsConfig struct {
	PushIntervalS int    `key:"pushIntervalS" json:"push_interval_s"`
	URL           string `key:"url" json:"url"`
	Username      string `key:"username" json:"username"`
	Password      string `key:"password" json:"password"`
}

type MockCacheMetadataStore

type MockCacheMetadataStore struct {
	// contains filtered or unexported fields
}

MockCacheMetadataStore is a simple in-memory metadata store for testing. It does not require Redis or any external dependencies.

func NewMockCacheMetadataStore

func NewMockCacheMetadataStore() *MockCacheMetadataStore

func (*MockCacheMetadataStore) AddFsNodeChild

func (m *MockCacheMetadataStore) AddFsNodeChild(ctx context.Context, pid, id string) error

func (*MockCacheMetadataStore) AddHostToIndex

func (m *MockCacheMetadataStore) AddHostToIndex(ctx context.Context, locality string, host *Host) error

func (*MockCacheMetadataStore) GetAvailableHosts

func (m *MockCacheMetadataStore) GetAvailableHosts(ctx context.Context, locality string) ([]*Host, error)

func (*MockCacheMetadataStore) GetFsNode

func (m *MockCacheMetadataStore) GetFsNode(ctx context.Context, id string) (*FSMetadata, error)

func (*MockCacheMetadataStore) GetFsNodeChildren

func (m *MockCacheMetadataStore) GetFsNodeChildren(ctx context.Context, id string) ([]*FSMetadata, error)

func (*MockCacheMetadataStore) RefreshStoreFromContentLock

func (m *MockCacheMetadataStore) RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

func (*MockCacheMetadataStore) RemoveClientLock

func (m *MockCacheMetadataStore) RemoveClientLock(ctx context.Context, hash string, host string) error

func (*MockCacheMetadataStore) RemoveFsNode

func (m *MockCacheMetadataStore) RemoveFsNode(ctx context.Context, id string) error

func (*MockCacheMetadataStore) RemoveFsNodeChild

func (m *MockCacheMetadataStore) RemoveFsNodeChild(ctx context.Context, pid, id string) error

func (*MockCacheMetadataStore) RemoveHost

func (m *MockCacheMetadataStore) RemoveHost(ctx context.Context, locality string, host *Host) error

func (*MockCacheMetadataStore) RemoveStoreFromContentLock

func (m *MockCacheMetadataStore) RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

func (*MockCacheMetadataStore) SetClientLock

func (m *MockCacheMetadataStore) SetClientLock(ctx context.Context, hash string, host string) error

func (*MockCacheMetadataStore) SetFsNode

func (m *MockCacheMetadataStore) SetFsNode(ctx context.Context, id string, metadata *FSMetadata) error

func (*MockCacheMetadataStore) SetHostKeepAlive

func (m *MockCacheMetadataStore) SetHostKeepAlive(ctx context.Context, locality string, host *Host) error

func (*MockCacheMetadataStore) SetStoreFromContentLock

func (m *MockCacheMetadataStore) SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

type MountPointConfig

type MountPointConfig struct {
	BucketName     string `key:"bucketName" json:"bucket_name"`
	AccessKey      string `key:"accessKey" json:"access_key"`
	SecretKey      string `key:"secretKey" json:"secret_key"`
	Region         string `key:"region" json:"region"`
	EndpointURL    string `key:"endpointUrl" json:"endpoint_url"`
	ForcePathStyle bool   `key:"forcePathStyle" json:"force_path_style"`
}

type MountPointSource

type MountPointSource struct {
	// contains filtered or unexported fields
}

func (*MountPointSource) Format

func (s *MountPointSource) Format(fsName string) error

func (*MountPointSource) Mount

func (s *MountPointSource) Mount(localPath string) error

func (*MountPointSource) Unmount

func (s *MountPointSource) Unmount(localPath string) error

type PrefetchState

// PrefetchState tracks sequential read patterns per file/hash.
// All of its fields are unexported.
type PrefetchState struct {
	// contains filtered or unexported fields
}

PrefetchState tracks sequential read patterns per file/hash

type Prefetcher

// Prefetcher detects sequential reads and prefetches ahead.
// Instances are created with NewPrefetcher, which takes a context, a *Store
// and a *BufferPool; callers report reads to it through OnRead.
type Prefetcher struct {
	// contains filtered or unexported fields
}

Prefetcher detects sequential reads and prefetches ahead

func NewPrefetcher

func NewPrefetcher(ctx context.Context, cas *Store, bufferPool *BufferPool) *Prefetcher

NewPrefetcher creates a new prefetcher instance

func (*Prefetcher) OnRead

func (pf *Prefetcher) OnRead(hash string, offset, length int64)

OnRead should be called on each read to detect patterns

type ReadPrefetchConfig

// ReadPrefetchConfig groups the read-prefetch settings.
//
// NOTE(review): the per-field notes below are inferred from the field names
// only; confirm them against the code that consumes this struct.
type ReadPrefetchConfig struct {
	// Enabled presumably switches read prefetching on or off.
	Enabled         bool  `key:"enabled" json:"enabled"`
	// AheadBytes is presumably how far ahead of a read to prefetch, in bytes.
	AheadBytes      int64 `key:"aheadBytes" json:"ahead_bytes"`
	// Workers is presumably the number of concurrent prefetch workers.
	Workers         int   `key:"workers" json:"workers"`
	// PartLengthBytes is presumably the size of one prefetched part, in bytes.
	PartLengthBytes int64 `key:"partLengthBytes" json:"part_length_bytes"`
	// MaxPartsPerRead presumably caps the parts fetched for a single read.
	MaxPartsPerRead int   `key:"maxPartsPerRead" json:"max_parts_per_read"`
}

type RedisCacheMetadataStore

// RedisCacheMetadataStore provides host-index, filesystem-node, client-lock
// and store-from-content-lock operations (AddHostToIndex, GetFsNode,
// SetClientLock, SetStoreFromContentLock, ...), all taking a context.Context
// as their first argument.
// NOTE(review): its method signatures match those of MockCacheMetadataStore;
// presumably both implement CacheMetadataStore — confirm.
type RedisCacheMetadataStore struct {
	// contains filtered or unexported fields
}

func (*RedisCacheMetadataStore) AddFsNodeChild

func (c *RedisCacheMetadataStore) AddFsNodeChild(ctx context.Context, pid, id string) error

func (*RedisCacheMetadataStore) AddHostToIndex

func (c *RedisCacheMetadataStore) AddHostToIndex(ctx context.Context, locality string, host *Host) error

func (*RedisCacheMetadataStore) GetAvailableHosts

func (c *RedisCacheMetadataStore) GetAvailableHosts(ctx context.Context, locality string) ([]*Host, error)

func (*RedisCacheMetadataStore) GetFsNode

func (c *RedisCacheMetadataStore) GetFsNode(ctx context.Context, id string) (*FSMetadata, error)

func (*RedisCacheMetadataStore) GetFsNodeChildren

func (c *RedisCacheMetadataStore) GetFsNodeChildren(ctx context.Context, id string) ([]*FSMetadata, error)

func (*RedisCacheMetadataStore) RefreshStoreFromContentLock

func (c *RedisCacheMetadataStore) RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

func (*RedisCacheMetadataStore) RemoveClientLock

func (c *RedisCacheMetadataStore) RemoveClientLock(ctx context.Context, hash string, host string) error

func (*RedisCacheMetadataStore) RemoveFsNode

func (c *RedisCacheMetadataStore) RemoveFsNode(ctx context.Context, id string) error

func (*RedisCacheMetadataStore) RemoveFsNodeChild

func (c *RedisCacheMetadataStore) RemoveFsNodeChild(ctx context.Context, pid, id string) error

func (*RedisCacheMetadataStore) RemoveHost

func (c *RedisCacheMetadataStore) RemoveHost(ctx context.Context, locality string, host *Host) error

func (*RedisCacheMetadataStore) RemoveStoreFromContentLock

func (c *RedisCacheMetadataStore) RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

func (*RedisCacheMetadataStore) SetClientLock

func (c *RedisCacheMetadataStore) SetClientLock(ctx context.Context, hash string, host string) error

func (*RedisCacheMetadataStore) SetFsNode

func (c *RedisCacheMetadataStore) SetFsNode(ctx context.Context, id string, metadata *FSMetadata) error

func (*RedisCacheMetadataStore) SetHostKeepAlive

func (c *RedisCacheMetadataStore) SetHostKeepAlive(ctx context.Context, locality string, host *Host) error

func (*RedisCacheMetadataStore) SetStoreFromContentLock

func (c *RedisCacheMetadataStore) SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error

type RedisClient

// RedisClient wraps a redis.UniversalClient by embedding it, so the embedded
// client's methods are promoted onto RedisClient. Methods declared directly
// on *RedisClient (Keys, LRange, PSubscribe, Publish, Scan, Subscribe,
// ToSlice, ToStruct) take precedence over promoted methods of the same name.
// Instances are created with NewRedisClient.
type RedisClient struct {
	redis.UniversalClient
}

func NewRedisClient

func NewRedisClient(config RedisConfig, options ...func(*redis.UniversalOptions)) (*RedisClient, error)

func (*RedisClient) Keys

func (r *RedisClient) Keys(ctx context.Context, pattern string) ([]string, error)

Keys gets all keys matching a pattern. It actually runs a SCAN, since KEYS locks up the database.

func (*RedisClient) LRange

func (r *RedisClient) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)

func (*RedisClient) PSubscribe

func (r *RedisClient) PSubscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error, func())

func (*RedisClient) Publish

func (r *RedisClient) Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd

func (*RedisClient) Scan

func (r *RedisClient) Scan(ctx context.Context, pattern string) ([]string, error)

func (*RedisClient) Subscribe

func (r *RedisClient) Subscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error)

func (*RedisClient) ToSlice

func (r *RedisClient) ToSlice(v interface{}) []interface{}

func (*RedisClient) ToStruct

func (r *RedisClient) ToStruct(m map[string]string, out interface{}) error

type RedisConfig

// RedisConfig is the configuration value passed to NewRedisClient.
//
// Each field carries a `key` tag (camelCase) and a `json` tag (snake_case).
// NOTE(review): the field names mirror go-redis UniversalOptions settings;
// confirm how NewRedisClient maps them before relying on a specific default.
type RedisConfig struct {
	Addrs              []string      `key:"addrs" json:"addrs"`
	// Mode is a RedisMode; the package declares RedisModeSingle,
	// RedisModeCluster and RedisModeSentinel as its values.
	Mode               RedisMode     `key:"mode" json:"mode"`
	ClientName         string        `key:"clientName" json:"client_name"`
	EnableTLS          bool          `key:"enableTLS" json:"enable_tls"`
	// NOTE(review): presumably disables TLS certificate verification when
	// true; confirm it is not enabled in production configuration.
	InsecureSkipVerify bool          `key:"insecureSkipVerify" json:"insecure_skip_verify"`
	MinIdleConns       int           `key:"minIdleConns" json:"min_idle_conns"`
	MaxIdleConns       int           `key:"maxIdleConns" json:"max_idle_conns"`
	ConnMaxIdleTime    time.Duration `key:"connMaxIdleTime" json:"conn_max_idle_time"`
	ConnMaxLifetime    time.Duration `key:"connMaxLifetime" json:"conn_max_lifetime"`
	DialTimeout        time.Duration `key:"dialTimeout" json:"dial_timeout"`
	ReadTimeout        time.Duration `key:"readTimeout" json:"read_timeout"`
	WriteTimeout       time.Duration `key:"writeTimeout" json:"write_timeout"`
	MaxRedirects       int           `key:"maxRedirects" json:"max_redirects"`
	MaxRetries         int           `key:"maxRetries" json:"max_retries"`
	PoolSize           int           `key:"poolSize" json:"pool_size"`
	Username           string        `key:"username" json:"username"`
	// NOTE(review): Password and SentinelPassword look like credentials and
	// are included in JSON output by their tags; verify they are never logged.
	Password           string        `key:"password" json:"password"`
	RouteByLatency     bool          `key:"routeByLatency" json:"route_by_latency"`
	// NOTE(review): MasterName and SentinelPassword are presumably only
	// relevant when Mode is RedisModeSentinel — confirm.
	MasterName         string        `key:"masterName" json:"master_name"`
	SentinelPassword   string        `key:"sentinelPassword" json:"sentinel_password"`
}

type RedisLock

// RedisLock offers Acquire, Refresh and Release operations keyed by a string.
// It is created with NewRedisLock from a *RedisClient plus optional
// RedisLockOption values.
// NOTE(review): Acquire accepts a context.Context while Refresh and Release
// do not, so callers cannot pass cancellation or deadlines to those two calls.
type RedisLock struct {
	// contains filtered or unexported fields
}

func NewRedisLock

func NewRedisLock(client *RedisClient, opts ...RedisLockOption) *RedisLock

func (*RedisLock) Acquire

func (l *RedisLock) Acquire(ctx context.Context, key string, opts RedisLockOptions) error

func (*RedisLock) Refresh

func (l *RedisLock) Refresh(key string, opts RedisLockOptions) error

func (*RedisLock) Release

func (l *RedisLock) Release(key string) error

type RedisLockOption

// RedisLockOption is a function that receives the *RedisLock under
// construction; NewRedisLock accepts any number of them. It is distinct from
// the similarly named RedisLockOptions struct, which is passed to
// RedisLock.Acquire and RedisLock.Refresh.
type RedisLockOption func(*RedisLock)

type RedisLockOptions

// RedisLockOptions is the per-call options value taken by RedisLock.Acquire
// and RedisLock.Refresh. It is distinct from the RedisLockOption function
// type used by NewRedisLock.
type RedisLockOptions struct {
	// TtlS is presumably the lock time-to-live in seconds (going by the "S"
	// suffix) — TODO confirm the unit against the lock implementation.
	TtlS    int
	// Retries is presumably the number of acquisition retries — TODO confirm
	// whether it counts total attempts or attempts after the first.
	Retries int
}

type RedisMode

// RedisMode is a string type naming a Redis deployment mode; it is the type
// of RedisConfig.Mode.
type RedisMode string
// Declared RedisMode values.
// NOTE(review): these are package-level variables, not constants, so other
// code can reassign them; converting them to const would stop code that
// assigns to them or takes their address from compiling — check callers first.
var (
	RedisModeSingle   RedisMode = "single"
	RedisModeCluster  RedisMode = "cluster"
	RedisModeSentinel RedisMode = "sentinel"
)

type RendezvousHasher

// RendezvousHasher is the interface for a host selector: hosts are
// registered with Add, dropped with Remove, and looked up for a string key
// with GetN.
type RendezvousHasher interface {
	// Add registers one or more hosts.
	Add(hosts ...*Host)
	// Remove drops a single host.
	Remove(host *Host)
	// GetN returns a slice of hosts for key.
	// NOTE(review): presumably at most n hosts, ordered by preference for
	// the key — confirm against the implementation.
	GetN(n int, key string) []*Host
}

type S3Client

// S3Client bundles an *s3.Client with the S3SourceConfig it was built for
// and two download tuning values. It is created with NewS3Client, which
// takes an S3SourceConfig and a ServerConfig.
type S3Client struct {
	// Client is the underlying S3 client; the GetClient method has the same
	// return type.
	Client              *s3.Client
	Source              S3SourceConfig
	// NOTE(review): DownloadConcurrency and DownloadChunkSize presumably come
	// from ServerConfig.S3DownloadConcurrency and S3DownloadChunkSize —
	// confirm in NewS3Client.
	DownloadConcurrency int64
	DownloadChunkSize   int64
}

func NewS3Client

func NewS3Client(ctx context.Context, sourceConfig S3SourceConfig, serverConfig ServerConfig) (*S3Client, error)

func (*S3Client) BucketName

func (c *S3Client) BucketName() string

func (*S3Client) DownloadIntoBuffer

func (c *S3Client) DownloadIntoBuffer(ctx context.Context, key string, buffer *bytes.Buffer) error

func (*S3Client) GetClient

func (c *S3Client) GetClient() *s3.Client

func (*S3Client) Head

func (c *S3Client) Head(ctx context.Context, key string) (bool, *s3.HeadObjectOutput, error)

func (*S3Client) Open

func (c *S3Client) Open(ctx context.Context, key string) (io.ReadCloser, error)

func (*S3Client) ReadRange

func (c *S3Client) ReadRange(ctx context.Context, key string, start int64, length int64) ([]byte, error)

type S3ContentSource

// S3ContentSource carries the same six connection fields as S3SourceConfig
// (BucketName, Region, EndpointURL, AccessKey, SecretKey, ForcePathStyle)
// plus Path and CachePath. None of its fields have struct tags.
// NOTE(review): Path is presumably the object key in the bucket and CachePath
// its location in the cache filesystem — confirm with the call sites.
type S3ContentSource struct {
	Path           string
	CachePath      string
	BucketName     string
	Region         string
	EndpointURL    string
	AccessKey      string
	SecretKey      string
	ForcePathStyle bool
}

type S3SourceConfig

// S3SourceConfig is the S3 connection description passed to NewS3Client and
// stored in S3Client.Source. None of its fields have struct tags.
type S3SourceConfig struct {
	BucketName     string
	Region         string
	EndpointURL    string
	AccessKey      string
	// NOTE(review): SecretKey looks like a credential; verify it is never
	// logged or printed with %+v.
	SecretKey      string
	ForcePathStyle bool
}

type Server

// Server is the cache server type. It embeds proto.UnimplementedCacheServer
// and is created with NewServer or NewServerWithOptions.
// The package index also lists GetContent, GetState and HasContent methods
// on *Server, but without their signatures.
type Server struct {
	proto.UnimplementedCacheServer
	// contains filtered or unexported fields
}

func NewServer

func NewServer(ctx context.Context, cfg Config, locality string) (*Server, error)

func NewServerWithOptions

func NewServerWithOptions(ctx context.Context, cfg Config, locality string, options ...ServerOption) (*Server, error)

func (*Server) Close

func (cs *Server) Close() error

func (*Server) Drain

func (cs *Server) Drain()

func (*Server) GetContent

func (*Server) GetContentStream

func (cs *Server) GetContentStream(req *proto.CacheGetContentRequest, stream proto.Cache_GetContentStreamServer) error

func (*Server) GetState

func (*Server) HasContent

func (*Server) Host

func (cs *Server) Host() *Host

func (*Server) Serve

func (cs *Server) Serve(bindAddr string, advertiseHost string) (string, error)

func (*Server) StartServer

func (cs *Server) StartServer(port uint) error

func (*Server) StoreContent

func (cs *Server) StoreContent(stream proto.Cache_StoreContentServer) error

func (*Server) StoreContentInCacheFS

func (cs *Server) StoreContentInCacheFS(ctx context.Context, path string, hash string, size uint64) error

func (*Server) StoreSyntheticContentInCacheFS

func (cs *Server) StoreSyntheticContentInCacheFS(ctx context.Context, path string, hash string, size uint64) error

func (*Server) UsagePct

func (cs *Server) UsagePct() float64

type ServerConfig

// ServerConfig holds the server-side settings: disk-cache limits, page
// layout, read transport, metadata, content sources and S3 download tuning.
// NewS3Client takes a ServerConfig as its serverConfig argument.
//
// Each field carries a `key` tag (camelCase) and a `json` tag (snake_case).
type ServerConfig struct {
	DiskCacheDir                 string                    `key:"diskCacheDir" json:"disk_cache_dir"`
	// NOTE(review): DiskCacheMaxUsagePct is a float64 while MaxCachePct is an
	// int64; confirm the scale (0-1 vs 0-100) each one expects.
	DiskCacheMaxUsagePct         float64                   `key:"diskCacheMaxUsagePct" json:"disk_cache_max_usage_pct"`
	// ObjectTtlS is presumably an object time-to-live in seconds (going by
	// the "S" suffix) — TODO confirm.
	ObjectTtlS                   int                       `key:"objectTtlS" json:"object_ttl_s"`
	MaxCachePct                  int64                     `key:"maxCachePct" json:"max_cache_pct"`
	PageSizeBytes                int64                     `key:"pageSizeBytes" json:"page_size_bytes"`
	PageFileBuckets              int                       `key:"pageFileBuckets" json:"page_file_buckets"`
	SmallRangeCopyThresholdBytes int64                     `key:"smallRangeCopyThresholdBytes" json:"small_range_copy_threshold_bytes"`
	ReadTransport                ServerReadTransportConfig `key:"readTransport" json:"read_transport"`
	Metadata                     MetadataConfig            `key:"metadata" json:"metadata"`
	Sources                      []SourceConfig            `key:"sources" json:"sources"`
	// NOTE(review): the two S3 download fields presumably feed
	// S3Client.DownloadConcurrency and S3Client.DownloadChunkSize — confirm.
	S3DownloadConcurrency        int64                     `key:"s3DownloadConcurrency" json:"s3_download_concurrency"`
	S3DownloadChunkSize          int64                     `key:"s3DownloadChunkSize" json:"s3_download_chunk_size"`
}

type ServerOption

// ServerOption is a function that receives a *ServerOpts.
// NewServerWithOptions accepts any number of them; WithServerAdvertiseAddr,
// WithServerHostID and WithServerMetadataStore each return one.
type ServerOption func(*ServerOpts)

func WithServerAdvertiseAddr

func WithServerAdvertiseAddr(addr string) ServerOption

func WithServerHostID

func WithServerHostID(hostID string) ServerOption

func WithServerMetadataStore

func WithServerMetadataStore(metadataStore CacheMetadataStore) ServerOption

type ServerOpts

// ServerOpts is the options value that ServerOption functions receive.
// NOTE(review): its three fields line up by name and type with the arguments
// of WithServerHostID, WithServerMetadataStore and WithServerAdvertiseAddr;
// presumably each of those sets the matching field — confirm.
type ServerOpts struct {
	HostID        string
	MetadataStore CacheMetadataStore
	AdvertiseAddr string
}

type ServerReadTransportConfig

// ServerReadTransportConfig is the type of ServerConfig.ReadTransport and
// holds two boolean switches.
// NOTE(review): Sendfile presumably selects a sendfile-based read path and
// only matters when Enabled is true — confirm with the server code.
type ServerReadTransportConfig struct {
	Enabled  bool `key:"enabled" json:"enabled"`
	Sendfile bool `key:"sendfile" json:"sendfile"`
}

type Source

// Source is the interface returned by NewSource, NewJuiceFsSource and
// NewMountPointSource. Its three methods each take a single string and
// return an error.
type Source interface {
	// Mount takes the local path to mount at.
	Mount(localPath string) error
	// Format takes a filesystem name.
	Format(fsName string) error
	// Unmount takes the local path to unmount.
	Unmount(localPath string) error
}

func NewJuiceFsSource

func NewJuiceFsSource(config JuiceFSConfig) (Source, error)

func NewMountPointSource

func NewMountPointSource(config MountPointConfig) (Source, error)

func NewSource

func NewSource(config SourceConfig) (Source, error)

type SourceConfig

// SourceConfig is the configuration value passed to NewSource and the
// element type of ServerConfig.Sources. It carries one sub-configuration per
// backend (JuiceFS and MountPoint).
type SourceConfig struct {
	// NOTE(review): Mode is presumably one of the SourceModeJuiceFS
	// ("juicefs") or SourceModeMountPoint ("mountpoint") constants and
	// selects which sub-configuration is used — confirm in NewSource.
	Mode           string           `key:"mode" json:"mode"`
	// The `key` tags of the next two fields are abbreviated ("fsName",
	// "fsPath") while their `json` tags are spelled out.
	FilesystemName string           `key:"fsName" json:"filesystem_name"`
	FilesystemPath string           `key:"fsPath" json:"filesystem_path"`
	JuiceFS        JuiceFSConfig    `key:"juicefs" json:"juicefs"`
	MountPoint     MountPointConfig `key:"mountpoint" json:"mountpoint"`
}

type StorageLayer

// StorageLayer declares no methods, so every type satisfies it and it
// provides no compile-time checking.
// NOTE(review): looks like a placeholder — confirm whether it is still used.
type StorageLayer interface {
}

type Store

// Store is the hash-keyed content store created by NewStore. Its methods
// add content (Add, AddReader, AddReaderWithExpectedHash,
// AddPageSourceWithExpectedHash), read it back by hash and offset (Get,
// ReadAt, PageRegion), write page data (PutFullPages, PutPageRange), and
// report on the disk cache (Exists, GetDiskCacheMetrics). NewPrefetcher
// takes a *Store.
type Store struct {
	// contains filtered or unexported fields
}

func NewStore

func NewStore(ctx context.Context, currentHost *Host, locality string, metadataStore CacheMetadataStore, config Config) (*Store, error)

func (*Store) Add

func (cas *Store) Add(ctx context.Context, hash string, content []byte) error

func (*Store) AddPageSourceWithExpectedHash

func (cas *Store) AddPageSourceWithExpectedHash(ctx context.Context, expectedHash string, size int64, concurrency int, readPage func(context.Context, int64, int64, int64) ([]byte, error)) (string, int64, error)

func (*Store) AddReader

func (cas *Store) AddReader(ctx context.Context, reader io.Reader) (string, int64, error)

func (*Store) AddReaderWithExpectedHash

func (cas *Store) AddReaderWithExpectedHash(ctx context.Context, reader io.Reader, expectedHash string) (string, int64, error)

AddReaderWithExpectedHash stores a content-addressed stream into a temporary page directory, validates the full stream hash, then publishes the verified pages under the final hash path.

func (*Store) Cleanup

func (cas *Store) Cleanup()

func (*Store) Exists

func (cas *Store) Exists(hash string) bool

func (*Store) Get

func (cas *Store) Get(hash string, offset, length int64, dst []byte) (int64, error)

func (*Store) GetDiskCacheMetrics

func (cas *Store) GetDiskCacheMetrics() (int64, int64, float64, error)

func (*Store) PageRegion

func (cas *Store) PageRegion(hash string, offset int64, length int64) (path string, pageOffset int64, n int, ok bool, err error)

func (*Store) PutFullPages

func (cas *Store) PutFullPages(hash string, offset int64, data []byte)

func (*Store) PutPageRange

func (cas *Store) PutPageRange(hash string, offset int64, data []byte)

func (*Store) ReadAt

func (cas *Store) ReadAt(hash string, offset int64, dst []byte) (read int64, err error)

func (*Store) WarmRange

func (cas *Store) WarmRange(hash string, offset int64, length int64)

type StoreContentOptions

// StoreContentOptions holds two settings for a store-content call.
// NOTE(review): the meanings below are inferred from the field names only;
// confirm them against the client code that accepts this struct.
type StoreContentOptions struct {
	// RoutingKey is presumably the key used to choose the target host.
	RoutingKey string
	// Lock presumably asks for a lock to be taken while storing.
	Lock       bool
}

type ValkeyConfig

// ValkeyConfig holds connection settings for a Valkey deployment: a primary
// name, password, TLS switch, host/port, and an optional existing-primary
// host/port pair.
type ValkeyConfig struct {
	PrimaryName     string                `key:"primaryName" json:"primary_name"`
	// NOTE(review): Password looks like a credential and is included in JSON
	// output by its tag; verify it is never logged.
	Password        string                `key:"password" json:"password"`
	TLS             bool                  `key:"tls" json:"tls"`
	Host            string                `key:"host" json:"host"`
	Port            int                   `key:"port" json:"port"`
	// NOTE(review): this json tag is camelCase ("existingPrimary") while the
	// other multi-word tag in this struct is snake_case ("primary_name").
	// Changing it would alter the serialized form, so check consumers first.
	ExistingPrimary ValkeyExistingPrimary `key:"existingPrimary" json:"existingPrimary"`
}

type ValkeyExistingPrimary

// ValkeyExistingPrimary is the host/port pair stored in
// ValkeyConfig.ExistingPrimary.
type ValkeyExistingPrimary struct {
	Host string `key:"host" json:"host"`
	Port int    `key:"port" json:"port"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL