Documentation
¶
Index ¶
- Constants
- func BitmapPath(filePath string) string
- func GetFreeDiskSpace(path string) (freeBytesAvail, totalNumberOfBytes, totalNumberFreeBytes uint64, err error)
- type ChunkBitmap
- func (b *ChunkBitmap) ChunkSizeBytes() int
- func (b *ChunkBitmap) FileSize() int64
- func (b *ChunkBitmap) Has(chunkIdx int) bool
- func (b *ChunkBitmap) HasRange(offset int64, size int) bool
- func (b *ChunkBitmap) Have() int
- func (b *ChunkBitmap) IsComplete() bool
- func (b *ChunkBitmap) NextMissing(from int) int
- func (b *ChunkBitmap) Progress() float64
- func (b *ChunkBitmap) Save(path string) error
- func (b *ChunkBitmap) Set(chunkIdx int)
- func (b *ChunkBitmap) SetRange(offset int64, size int)
- func (b *ChunkBitmap) Total() int
- type Dir
- func (d *Dir) Access(path string, _mask uint32) (errCode int)
- func (d *Dir) AddRemoteFile(logger *slog.Logger, path string, name string, stat *winfuse.Stat_t) error
- func (d *Dir) Chmod(path string, mode uint32) (errCode int)
- func (d *Dir) Chown(path string, uid uint32, gid uint32) (errCode int)
- func (d *Dir) Create(path string, flags int, mode uint32) (errCode int, fh uint64)
- func (d *Dir) CreateEx(path string, mode uint32, fi *winfuse.FileInfo_t) (errCode int)
- func (d *Dir) Destroy()
- func (d *Dir) EditRemoteFile(logger *slog.Logger, path string, name string, stat *winfuse.Stat_t) error
- func (d *Dir) Flush(path string, fh uint64) (errCode int)
- func (d *Dir) Fsync(path string, datasync bool, fh uint64) (errCode int)
- func (d *Dir) Fsyncdir(path string, datasync bool, fh uint64) (errCode int)
- func (d *Dir) Getattr(path string, stat *winfuse.Stat_t, fh uint64) (errCode int)
- func (d *Dir) Getxattr(path string, name string) (errCode int, data []byte)
- func (d *Dir) Init()
- func (d *Dir) Link(oldpath string, newpath string) (errCode int)
- func (d *Dir) Listxattr(path string, fill func(name string) bool) (errCode int)
- func (d *Dir) Mkdir(path string, mode uint32) (errCode int)
- func (d *Dir) MkdirFromPeer(path string, mode uint32) (errCode int)
- func (d *Dir) Mknod(path string, mode uint32, dev uint64) (errCode int)
- func (d *Dir) Open(path string, flags int) (errCode int, retFh uint64)
- func (d *Dir) OpenEx(path string, fi *winfuse.FileInfo_t) (errCode int)
- func (d *Dir) Opendir(path string) (errCode int, retFh uint64)
- func (d *Dir) Read(path string, buff []byte, offset int64, fh uint64) (errCode int)
- func (d *Dir) Readdir(path string, fill func(name string, stat *winfuse.Stat_t, offset int64) bool, ...) (errCode int)
- func (d *Dir) Readlink(path string) (errCode int, target string)
- func (d *Dir) Release(path string, fh uint64) (errCode int)
- func (d *Dir) Releasedir(path string, fh uint64) (errCode int)
- func (d *Dir) Removexattr(path string, name string) (errCode int)
- func (d *Dir) Rename(oldpath string, newpath string) (errCode int)
- func (d *Dir) Rmdir(path string) (errCode int)
- func (d *Dir) RmdirFromPeer(path string) (errCode int)
- func (d *Dir) Setxattr(path string, name string, value []byte, flags int) (errCode int)
- func (d *Dir) Statfs(path string, stat *winfuse.Statfs_t) (errCode int)
- func (d *Dir) Symlink(target string, newpath string) (errCode int)
- func (d *Dir) Truncate(path string, size int64, fh uint64) (errCode int)
- func (d *Dir) Unlink(path string) (errCode int)
- func (d *Dir) UnlinkFromPeer(path string) (errCode int)
- func (d *Dir) Utimens(path string, tmsp []winfuse.Timespec) (errCode int)
- func (d *Dir) Write(path string, buff []byte, offset int64, fh uint64) (errCode int)
- type DownloadState
- func (ds *DownloadState) CanRetry() bool
- func (ds *DownloadState) Checksum() uint64
- func (ds *DownloadState) IsComplete() bool
- func (ds *DownloadState) Progress() float64
- func (ds *DownloadState) RecordAttempt() int32
- func (ds *DownloadState) Reset(totalSize uint64)
- func (ds *DownloadState) UpdateChecksum(data []byte)
- func (ds *DownloadState) UpdateProgress(offset int64, bytesRead int)
- type FS
- type File
- type HandleEntry
- type OpenFileCounter
- type StreamPool
- type WriteStats
Constants ¶
const ChunkSize = 512 * 1024 // 512 KiB
ChunkSize is the granularity for tracking downloaded segments.
const FilesystemBlockSize = 2 << 17 // 256 KiB - optimal for Linux cp (uses 128 KiB blocks)
const StreamPoolSize = 4
StreamPoolSize is the number of parallel gRPC streams per file. Matches typical FUSE readahead parallelism on Linux.
Variables ¶
This section is empty.
Functions ¶
func BitmapPath ¶
BitmapPath returns the .kdbitmap sidecar path for a given file path.
func GetFreeDiskSpace ¶
Types ¶
type ChunkBitmap ¶
type ChunkBitmap struct {
// contains filtered or unexported fields
}
ChunkBitmap tracks which chunks of a file have been downloaded. Thread-safe: Has() uses RLock, Set() uses Lock.
func LoadChunkBitmap ¶
func LoadChunkBitmap(path string, expectedFileSize int64) (*ChunkBitmap, error)
LoadChunkBitmap reads a bitmap from a .kdbitmap file. Returns an error if the file is corrupt or the file size stored in it doesn't match expectedFileSize.
func NewChunkBitmap ¶
func NewChunkBitmap(fileSize int64) *ChunkBitmap
NewChunkBitmap creates a bitmap for a file of the given size using ChunkSize granularity. Returns nil for size <= 0 (empty files need no tracking).
func NewChunkBitmapWithSize ¶
func NewChunkBitmapWithSize(fileSize int64, chunkSize int) *ChunkBitmap
NewChunkBitmapWithSize creates a bitmap with a custom chunk size.
func (*ChunkBitmap) ChunkSizeBytes ¶
func (b *ChunkBitmap) ChunkSizeBytes() int
ChunkSizeBytes returns the chunk size used by this bitmap.
func (*ChunkBitmap) FileSize ¶
func (b *ChunkBitmap) FileSize() int64
FileSize returns the tracked file size.
func (*ChunkBitmap) Has ¶
func (b *ChunkBitmap) Has(chunkIdx int) bool
Has returns true if the chunk at chunkIdx has been downloaded.
func (*ChunkBitmap) HasRange ¶
func (b *ChunkBitmap) HasRange(offset int64, size int) bool
HasRange returns true if all chunks covering [offset, offset+size) are downloaded.
func (*ChunkBitmap) Have ¶
func (b *ChunkBitmap) Have() int
Have returns the number of downloaded chunks.
func (*ChunkBitmap) IsComplete ¶
func (b *ChunkBitmap) IsComplete() bool
IsComplete returns true if all chunks have been downloaded.
func (*ChunkBitmap) NextMissing ¶
func (b *ChunkBitmap) NextMissing(from int) int
NextMissing returns the index of the first missing chunk starting from `from`. Returns -1 if no missing chunks remain from that point.
func (*ChunkBitmap) Progress ¶
func (b *ChunkBitmap) Progress() float64
Progress returns download completion as a fraction [0.0, 1.0].
func (*ChunkBitmap) Save ¶
func (b *ChunkBitmap) Save(path string) error
Save writes the bitmap state to a .kdbitmap file.
func (*ChunkBitmap) Set ¶
func (b *ChunkBitmap) Set(chunkIdx int)
Set marks a chunk as downloaded. Idempotent.
func (*ChunkBitmap) SetRange ¶
func (b *ChunkBitmap) SetRange(offset int64, size int)
SetRange marks all chunks covering [offset, offset+size) as downloaded.
func (*ChunkBitmap) Total ¶
func (b *ChunkBitmap) Total() int
Total returns the total number of chunks.
type Dir ¶
type Dir struct {
Inode uint64 `json:"inode"` // Inodes must be unique and not re-used.
Name string `json:"name"`
RelativePath string `json:"relativePath"` // Relative (to root) path in the mounted filesystem.
RealPathOfFile string `json:"pathOnLocalSystem"` // The Path on the local system.
PeerLastEdit uint64 `json:"peerLastEdit"`
IsLocalPresent bool `json:"isLocalPresent"`
LocalDownloadFolder string // The folder where the files from the peer are downloaded.
Parent *Dir
Root *Dir
OpenFileHandlers map[uint64]*HandleEntry
OpenMapLock sync.RWMutex
Adm sync.RWMutex
AllDirMap map[string]*Dir
AfmLock sync.RWMutex
AllFileMap map[string]*File
OnLocalChange func(event types.FileEvent)
OpenStreamProvider func() types.FileStreamProvider
// Collab sync options (propagated from FS).
PrefetchOnOpen bool // If true, fetch entire file on Open() and write to local disk.
PushOnWrite bool // If true, async push deltas to peer on Write().
RemoteFilesLock sync.RWMutex
RemoteFiles map[string]*File
// PrefetchSem limits concurrent prefetch goroutines.
// Without this, a large clone (600+ files) spawns 600+ simultaneous
// StreamFile gRPC streams which overwhelm the connection.
PrefetchSem chan struct{}
// contains filtered or unexported fields
}
func (*Dir) AddRemoteFile ¶
func (*Dir) Create ¶
Create creates a new file.
From the open(2) man page on Intel macOS:
"The flags specified for the oflag argument must include exactly one of the following file access modes: O_RDONLY (open for reading only), O_WRONLY (open for writing only), O_RDWR (open for reading and writing). In addition any combination of the following values can be or'ed in oflag: O_APPEND (append on each write), O_CREAT (create file if it does not exist), O_TRUNC (truncate size to 0), O_EXCL (error if O_CREAT and the file exists)."
Use winfuse.O_ACCMODE to extract access mode (portable across macOS/Linux/Windows).
func (*Dir) CreateEx ¶
CreateEx implements FileSystemOpenEx interface for per-file direct_io control.
func (*Dir) EditRemoteFile ¶
func (*Dir) MkdirFromPeer ¶
MkdirFromPeer creates a directory without notifying the peer (to avoid loops).
func (*Dir) OpenEx ¶
func (d *Dir) OpenEx(path string, fi *winfuse.FileInfo_t) (errCode int)
OpenEx implements FileSystemOpenEx interface for per-file direct_io control.
func (*Dir) Rename ¶
High-level macOS apps use rename-swap (RENAME_SWAP), which is really fun in my experience. Note: cgofuse does not expose renamex_np with the RENAME_SWAP flag, so when apps attempt an atomic rename-swap, we fall back to a basic rename.
func (*Dir) RmdirFromPeer ¶
RmdirFromPeer removes a directory without notifying the peer (to avoid loops).
func (*Dir) Truncate ¶
Note: On Windows, open does not have a truncate flag, so Open is immediately followed by Truncate.
func (*Dir) UnlinkFromPeer ¶
UnlinkFromPeer removes a file without notifying the peer (to avoid loops).
type DownloadState ¶
type DownloadState struct {
TotalSize atomic.Uint64 // Expected total bytes from peer.
BytesDownloaded atomic.Uint64 // Bytes successfully written to local cache.
LastReadOffset atomic.Int64 // Last successfully read offset.
StartedAt atomic.Int64 // Unix nano when download started.
LastSuccessAt atomic.Int64 // Unix nano of last successful read.
AttemptCount atomic.Int32 // Reconnection attempts since last success.
MaxRetries int // Max retries before giving up (default 5).
// contains filtered or unexported fields
}
DownloadState tracks download progress for resumption on reconnect.
func (*DownloadState) CanRetry ¶
func (ds *DownloadState) CanRetry() bool
CanRetry checks if we should attempt reconnection.
func (*DownloadState) Checksum ¶
func (ds *DownloadState) Checksum() uint64
Checksum returns the current xxHash3 checksum of received data.
func (*DownloadState) IsComplete ¶
func (ds *DownloadState) IsComplete() bool
IsComplete returns true if all bytes have been downloaded.
func (*DownloadState) Progress ¶
func (ds *DownloadState) Progress() float64
Progress returns download completion percentage (0-100).
func (*DownloadState) RecordAttempt ¶
func (ds *DownloadState) RecordAttempt() int32
RecordAttempt increments the retry counter.
func (*DownloadState) Reset ¶
func (ds *DownloadState) Reset(totalSize uint64)
Reset clears download state for a new download.
func (*DownloadState) UpdateChecksum ¶
func (ds *DownloadState) UpdateChecksum(data []byte)
UpdateChecksum adds data to the running checksum. Call this with the actual bytes received (in order received, not by offset).
func (*DownloadState) UpdateProgress ¶
func (ds *DownloadState) UpdateProgress(offset int64, bytesRead int)
UpdateProgress records successful read progress and updates checksum.
type FS ¶
type FS struct {
OnLocalChange func(event types.FileEvent)
OpenStreamProvider func() types.FileStreamProvider
// Collab sync options (set from env before Mount).
PrefetchOnOpen bool // If true, fetch entire file on Open() and write to local disk.
PushOnWrite bool // If true, async push deltas to peer on Write().
Root *Dir
// contains filtered or unexported fields
}
type File ¶
type File struct {
Inode uint64 `json:"inode"` // Inodes must be unique and not re-used.
CurrentHandleID uint64 // Opaque FUSE handle ID for the currently-open fd.
Name string `json:"name"`
RelativePath string `json:"relativePath"` // Relative (to root) path in the mounted filesystem.
RealPathOfFile string // The Path on the local system.
Parent *Dir
Root *Dir
LastEditTime uint64 `json:"lastEdit"` // Use time.Now().UnixNano().
CreatedTime uint64 `json:"createdAt"`
PeerLastEdit uint64 `json:"peerLastEdit"`
IsLocalPresent bool `json:"isLocalPresent"`
NotLocalSynced bool
NotRemoteSynced bool
LocalNewer bool
HadEdits bool
// WasTruncatedToZero tracks if Truncate(size=0) was explicitly called.
// Used with HadEdits to distinguish legitimate empty files from transient states.
WasTruncatedToZero bool
// LastNotifiedSize tracks the file size we last sent to peer in ADD_FILE.
// Used to avoid sending duplicate notifications with same size during file copy.
LastNotifiedSize int64
// PeerStoppedSharing is set when peer sends REMOVE_FILE but download is in progress.
// Once download completes (Release with 0 open handles), the file reference is removed.
PeerStoppedSharing bool
StreamProvider types.FileStreamProvider
StreamPool *StreamPool // Pool of parallel gRPC streams for on-demand reads.
StreamCancel context.CancelFunc // Cancel function for the stream context.
CacheFD *os.File // Persistent cache file descriptor for on-demand writes.
CacheWg sync.WaitGroup // Tracks in-flight async cache writes; waited on in Release.
// Download resumption state.
Download DownloadState
// Bitmap tracks which 512 KiB chunks have been downloaded from the remote peer.
// nil for local-origin files or empty files (size=0).
Bitmap *ChunkBitmap
// PrefetchCancel cancels the background prefetch goroutine for this file.
PrefetchCancel context.CancelFunc
OnLocalChange func(event types.FileEvent)
// contains filtered or unexported fields
}
func (*File) CountOpenDescriptors ¶
CountOpenDescriptors returns the number of open file handles.
func (*File) NotifyPeer ¶
func (f *File) NotifyPeer()
type HandleEntry ¶
type HandleEntry struct {
FD int // The actual kernel file descriptor for syscalls.
File *File // The File metadata struct.
}
HandleEntry maps an opaque FUSE file handle to the actual kernel fd and File metadata. Opaque handles prevent fd-recycling races: the kernel reuses fd numbers after close(), but handle IDs are monotonically increasing and never reused.
type OpenFileCounter ¶
type OpenFileCounter struct {
// contains filtered or unexported fields
}
Every Create and Open call must have a corresponding Release call.
func (*OpenFileCounter) CountOpenDescriptors ¶
func (ofc *OpenFileCounter) CountOpenDescriptors() uint64
func (*OpenFileCounter) Open ¶
func (ofc *OpenFileCounter) Open()
func (*OpenFileCounter) Release ¶
func (ofc *OpenFileCounter) Release() uint64
type StreamPool ¶
type StreamPool struct {
// contains filtered or unexported fields
}
StreamPool holds N parallel gRPC streams for a single file. FUSE reads pick a stream by chunk-index modulo N, eliminating lock contention between concurrent readahead requests.
func NewStreamPool ¶
func NewStreamPool(provider types.FileStreamProvider, ctx context.Context, inode uint64, path string, n int) (*StreamPool, error)
NewStreamPool opens n parallel gRPC streams for the given file. On partial failure, already-opened streams are closed.
type WriteStats ¶
type WriteStats struct {
// contains filtered or unexported fields
}
WriteStats tracks timing for Write operations (for profiling).