filesystem

package
v0.2.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 3, 2026 License: MPL-2.0 Imports: 21 Imported by: 0

Documentation

Constants

const ChunkSize = 512 * 1024 // 512 KiB

ChunkSize is the granularity for tracking downloaded segments.

const FilesystemBlockSize = 2 << 17 // 256 KiB - optimal for Linux cp (uses 128 KiB blocks)
const StreamPoolSize = 4

StreamPoolSize is the number of parallel gRPC streams per file. Matches typical FUSE readahead parallelism on Linux.

Variables

This section is empty.

Functions

func BitmapPath

func BitmapPath(filePath string) string

BitmapPath returns the .kdbitmap sidecar path for a given file path.

func GetFreeDiskSpace

func GetFreeDiskSpace(path string) (freeBytesAvail, totalNumberOfBytes, totalNumberFreeBytes uint64, err error)

Types

type ChunkBitmap

type ChunkBitmap struct {
	// contains filtered or unexported fields
}

ChunkBitmap tracks which chunks of a file have been downloaded. Thread-safe: Has() uses RLock, Set() uses Lock.
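The locking scheme described above can be sketched as follows. This is a minimal illustration, not the package's implementation: `sketchBitmap`, its fields, and the one-bit-per-chunk layout are assumptions, since the real ChunkBitmap fields are unexported.

```go
package main

import (
	"fmt"
	"sync"
)

// sketchBitmap is a hypothetical stand-in for ChunkBitmap; the real type's
// fields are unexported and may differ.
type sketchBitmap struct {
	mu   sync.RWMutex
	bits []uint64 // one bit per chunk (assumed layout)
	n    int      // total number of chunks
}

func newSketchBitmap(n int) *sketchBitmap {
	return &sketchBitmap{bits: make([]uint64, (n+63)/64), n: n}
}

// Has takes the read lock, so concurrent readers do not block each other.
func (b *sketchBitmap) Has(i int) bool {
	if i < 0 || i >= b.n {
		return false
	}
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.bits[i/64]&(1<<(uint(i)%64)) != 0
}

// Set takes the write lock. Setting an already-set bit changes nothing,
// which is what makes the operation idempotent.
func (b *sketchBitmap) Set(i int) {
	if i < 0 || i >= b.n {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.bits[i/64] |= 1 << (uint(i) % 64)
}

func main() {
	b := newSketchBitmap(100)
	var wg sync.WaitGroup
	for i := 0; i < 100; i += 2 {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			b.Set(i) // concurrent writers are serialized by the write lock
		}(i)
	}
	wg.Wait()
	fmt.Println(b.Has(0), b.Has(1), b.Has(98))
}
```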

func LoadChunkBitmap

func LoadChunkBitmap(path string, expectedFileSize int64) (*ChunkBitmap, error)

LoadChunkBitmap reads a bitmap from a .kdbitmap file. Returns an error if the file is corrupt or the stored file size doesn't match expectedFileSize.

func NewChunkBitmap

func NewChunkBitmap(fileSize int64) *ChunkBitmap

NewChunkBitmap creates a bitmap for a file of the given size using ChunkSize granularity. Returns nil for size <= 0 (empty files need no tracking).
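As a rough guide to how many chunks a bitmap would track, the sketch below rounds the file size up to whole chunks. `chunkCount` and `defaultChunkSize` are illustrative names, and the rounding rule is an assumption about how a trailing partial chunk is handled.

```go
package main

import "fmt"

const defaultChunkSize = 512 * 1024 // mirrors ChunkSize (512 KiB)

// chunkCount returns how many chunks are needed to cover fileSize bytes,
// rounding up so a trailing partial chunk still gets its own slot.
// It returns 0 for non-positive sizes, matching "empty files need no tracking".
func chunkCount(fileSize int64, chunkSize int) int {
	if fileSize <= 0 || chunkSize <= 0 {
		return 0
	}
	cs := int64(chunkSize)
	return int((fileSize + cs - 1) / cs)
}

func main() {
	sizes := []int64{0, 1, defaultChunkSize, defaultChunkSize + 1, 10 << 20}
	for _, size := range sizes {
		fmt.Printf("%d bytes -> %d chunks\n", size, chunkCount(size, defaultChunkSize))
	}
}
```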

func NewChunkBitmapWithSize

func NewChunkBitmapWithSize(fileSize int64, chunkSize int) *ChunkBitmap

NewChunkBitmapWithSize creates a bitmap with a custom chunk size.

func (*ChunkBitmap) ChunkSizeBytes

func (b *ChunkBitmap) ChunkSizeBytes() int

ChunkSizeBytes returns the chunk size used by this bitmap.

func (*ChunkBitmap) FileSize

func (b *ChunkBitmap) FileSize() int64

FileSize returns the tracked file size.

func (*ChunkBitmap) Has

func (b *ChunkBitmap) Has(chunkIdx int) bool

Has returns true if the chunk at chunkIdx has been downloaded.

func (*ChunkBitmap) HasRange

func (b *ChunkBitmap) HasRange(offset int64, size int) bool

HasRange returns true if all chunks covering [offset, offset+size) are downloaded.
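The mapping from a byte range to the chunks that cover it might look like the sketch below. `chunkSpan` and `hasRange` are hypothetical helpers operating on a plain `[]bool`; how the real method treats empty or out-of-bounds ranges is not documented, so returning false in those cases is an assumption.

```go
package main

import "fmt"

// chunkSpan returns the first and last chunk indices (inclusive) touched by
// the byte range [offset, offset+size). ok is false for an empty or invalid range.
func chunkSpan(offset int64, size int, chunkSize int) (first, last int, ok bool) {
	if offset < 0 || size <= 0 || chunkSize <= 0 {
		return 0, 0, false
	}
	cs := int64(chunkSize)
	first = int(offset / cs)
	// The range is half-open, so the last byte is at offset+size-1.
	last = int((offset + int64(size) - 1) / cs)
	return first, last, true
}

// hasRange reports whether every chunk covering the range is marked in have.
func hasRange(have []bool, offset int64, size int, chunkSize int) bool {
	first, last, ok := chunkSpan(offset, size, chunkSize)
	if !ok || last >= len(have) {
		return false
	}
	for i := first; i <= last; i++ {
		if !have[i] {
			return false
		}
	}
	return true
}

func main() {
	// Four chunks of 4 bytes each; chunk 2 is still missing.
	have := []bool{true, true, false, true}
	fmt.Println(hasRange(have, 2, 4, 4)) // bytes 2..5 touch chunks 0 and 1
	fmt.Println(hasRange(have, 6, 4, 4)) // bytes 6..9 touch chunks 1 and 2
}
```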

func (*ChunkBitmap) Have

func (b *ChunkBitmap) Have() int

Have returns the number of downloaded chunks.

func (*ChunkBitmap) IsComplete

func (b *ChunkBitmap) IsComplete() bool

IsComplete returns true if all chunks have been downloaded.

func (*ChunkBitmap) NextMissing

func (b *ChunkBitmap) NextMissing(from int) int

NextMissing returns the index of the first missing chunk starting from `from`. Returns -1 if no missing chunks remain from that point.
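A caller could use this contract to walk the remaining gaps in order. The sketch below uses a plain `[]bool` and a hypothetical `nextMissing` helper to show the loop shape; the real method works on the bitmap's internal state.

```go
package main

import "fmt"

// nextMissing returns the first index >= from whose entry is false,
// or -1 when everything from that point on is already present.
func nextMissing(have []bool, from int) int {
	if from < 0 {
		from = 0
	}
	for i := from; i < len(have); i++ {
		if !have[i] {
			return i
		}
	}
	return -1
}

func main() {
	have := []bool{true, false, true, false, false}
	// Visit each gap once by restarting the search just past the previous hit.
	for i := nextMissing(have, 0); i != -1; i = nextMissing(have, i+1) {
		fmt.Println("would fetch chunk", i)
	}
}
```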

func (*ChunkBitmap) Progress

func (b *ChunkBitmap) Progress() float64

Progress returns download completion as a fraction in the range [0.0, 1.0].

func (*ChunkBitmap) Save

func (b *ChunkBitmap) Save(path string) error

Save writes the bitmap state to a .kdbitmap file.

func (*ChunkBitmap) Set

func (b *ChunkBitmap) Set(chunkIdx int)

Set marks a chunk as downloaded. Idempotent.

func (*ChunkBitmap) SetRange

func (b *ChunkBitmap) SetRange(offset int64, size int)

SetRange marks all chunks covering [offset, offset+size) as downloaded.

func (*ChunkBitmap) Total

func (b *ChunkBitmap) Total() int

Total returns the total number of chunks.

type Dir

type Dir struct {
	Inode uint64 `json:"inode"` // Inodes must be unique and not re-used.
	Name  string `json:"name"`

	RelativePath   string `json:"relativePath"`      // Relative (to root) path in the mounted filesystem.
	RealPathOfFile string `json:"pathOnLocalSystem"` // The Path on the local system.

	PeerLastEdit   uint64 `json:"peerLastEdit"`
	IsLocalPresent bool   `json:"isLocalPresent"`

	LocalDownloadFolder string // The folder where the files from the peer are downloaded.

	Parent *Dir
	Root   *Dir

	OpenFileHandlers map[uint64]*HandleEntry
	OpenMapLock      sync.RWMutex

	Adm       sync.RWMutex
	AllDirMap map[string]*Dir

	AfmLock    sync.RWMutex
	AllFileMap map[string]*File

	OnLocalChange      func(event types.FileEvent)
	OpenStreamProvider func() types.FileStreamProvider

	// Collab sync options (propagated from FS).
	PrefetchOnOpen bool // If true, fetch entire file on Open() and write to local disk.
	PushOnWrite    bool // If true, async push deltas to peer on Write().

	RemoteFilesLock sync.RWMutex
	RemoteFiles     map[string]*File

	// PrefetchSem limits concurrent prefetch goroutines.
	// Without this, a large clone (600+ files) spawns 600+ simultaneous
	// StreamFile gRPC streams which overwhelm the connection.
	PrefetchSem chan struct{}
	// contains filtered or unexported fields
}
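The PrefetchSem comment describes a counting semaphore built from a buffered channel. A minimal sketch of that pattern follows; `runLimited` and the limit of 8 are illustrative assumptions, not values taken from the package.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited starts one goroutine per job but lets at most limit of them
// do work at the same time. It returns the highest concurrency observed.
func runLimited(jobs int, limit int, work func(i int)) int32 {
	if limit < 1 {
		limit = 1 // an unbuffered channel would block every sender forever
	}
	sem := make(chan struct{}, limit) // plays the role of PrefetchSem
	var wg sync.WaitGroup
	var running, peak atomic.Int32

	for i := 0; i < jobs; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot; blocks while the pool is full
			defer func() { <-sem }() // release the slot

			n := running.Add(1)
			for { // record the peak with a compare-and-swap loop
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			work(i)
			running.Add(-1)
		}(i)
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	// 600 jobs, as in the large-clone scenario, but never more than 8 at once.
	peak := runLimited(600, 8, func(int) { time.Sleep(time.Millisecond) })
	fmt.Println("peak concurrency:", peak)
}
```

The goroutines are still created up front; only the work inside the acquired slot is bounded, which is what keeps the number of simultaneous streams in check.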

func (*Dir) Access

func (d *Dir) Access(path string, _mask uint32) (errCode int)

func (*Dir) AddRemoteFile

func (d *Dir) AddRemoteFile(logger *slog.Logger, path string, name string, stat *winfuse.Stat_t) error

func (*Dir) Chmod

func (d *Dir) Chmod(path string, mode uint32) (errCode int)

func (*Dir) Chown

func (d *Dir) Chown(path string, uid uint32, gid uint32) (errCode int)

func (*Dir) Create

func (d *Dir) Create(path string, flags int, mode uint32) (errCode int, fh uint64)

Create creates a new file.

From open(2) man page on Intel macOS:

"The flags specified for the oflag argument must include exactly one of
 the following file access modes:
   O_RDONLY    open for reading only
   O_WRONLY    open for writing only
   O_RDWR      open for reading and writing

 In addition any combination of the following values can be or'ed in oflag:
   O_APPEND    append on each write
   O_CREAT     create file if it does not exist
   O_TRUNC     truncate size to 0
   O_EXCL      error if O_CREAT and the file exists"

Use winfuse.O_ACCMODE to extract access mode (portable across macOS/Linux/Windows).

func (*Dir) CreateEx

func (d *Dir) CreateEx(path string, mode uint32, fi *winfuse.FileInfo_t) (errCode int)

CreateEx implements FileSystemOpenEx interface for per-file direct_io control.

func (*Dir) Destroy

func (d *Dir) Destroy()

Destroy is called on unmount.

func (*Dir) EditRemoteFile

func (d *Dir) EditRemoteFile(logger *slog.Logger, path string, name string, stat *winfuse.Stat_t) error

func (*Dir) Flush

func (d *Dir) Flush(path string, fh uint64) (errCode int)

func (*Dir) Fsync

func (d *Dir) Fsync(path string, datasync bool, fh uint64) (errCode int)

func (*Dir) Fsyncdir

func (d *Dir) Fsyncdir(path string, datasync bool, fh uint64) (errCode int)

func (*Dir) Getattr

func (d *Dir) Getattr(path string, stat *winfuse.Stat_t, fh uint64) (errCode int)

func (*Dir) Getxattr

func (d *Dir) Getxattr(path string, name string) (errCode int, data []byte)

func (*Dir) Init

func (d *Dir) Init()

func (*Dir) Link

func (d *Dir) Link(oldpath string, newpath string) (errCode int)

func (*Dir) Listxattr

func (d *Dir) Listxattr(path string, fill func(name string) bool) (errCode int)

func (*Dir) Mkdir

func (d *Dir) Mkdir(path string, mode uint32) (errCode int)

func (*Dir) MkdirFromPeer

func (d *Dir) MkdirFromPeer(path string, mode uint32) (errCode int)

MkdirFromPeer creates a directory without notifying the peer (to avoid loops).

func (*Dir) Mknod

func (d *Dir) Mknod(path string, mode uint32, dev uint64) (errCode int)

func (*Dir) Open

func (d *Dir) Open(path string, flags int) (errCode int, retFh uint64)

func (*Dir) OpenEx

func (d *Dir) OpenEx(path string, fi *winfuse.FileInfo_t) (errCode int)

OpenEx implements FileSystemOpenEx interface for per-file direct_io control.

func (*Dir) Opendir

func (d *Dir) Opendir(path string) (errCode int, retFh uint64)

func (*Dir) Read

func (d *Dir) Read(path string, buff []byte, offset int64, fh uint64) (errCode int)

func (*Dir) Readdir

func (d *Dir) Readdir(path string, fill func(name string, stat *winfuse.Stat_t, offset int64) bool, offset int64, fh uint64) (errCode int)

func (*Dir) Readlink

func (d *Dir) Readlink(path string) (errCode int, target string)

func (*Dir) Release

func (d *Dir) Release(path string, fh uint64) (errCode int)

func (*Dir) Releasedir

func (d *Dir) Releasedir(path string, fh uint64) (errCode int)

func (*Dir) Removexattr

func (d *Dir) Removexattr(path string, name string) (errCode int)

func (*Dir) Rename

func (d *Dir) Rename(oldpath string, newpath string) (errCode int)

High-level macOS apps use an atomic rename-swap, which is tricky to support in practice. Note: cgofuse does not expose renamex_np with the RENAME_SWAP flag, so when an app attempts an atomic rename-swap, Rename falls back to a basic rename.

func (*Dir) Rmdir

func (d *Dir) Rmdir(path string) (errCode int)

func (*Dir) RmdirFromPeer

func (d *Dir) RmdirFromPeer(path string) (errCode int)

RmdirFromPeer removes a directory without notifying the peer (to avoid loops).

func (*Dir) Setxattr

func (d *Dir) Setxattr(path string, name string, value []byte, flags int) (errCode int)

func (*Dir) Statfs

func (d *Dir) Statfs(path string, stat *winfuse.Statfs_t) (errCode int)

func (*Dir) Symlink

func (d *Dir) Symlink(target string, newpath string) (errCode int)

func (*Dir) Truncate

func (d *Dir) Truncate(path string, size int64, fh uint64) (errCode int)

Note: On Windows, open has no truncate flag, so Open is immediately followed by Truncate.

func (*Dir) Unlink

func (d *Dir) Unlink(path string) (errCode int)

Unlink removes a file.

func (*Dir) UnlinkFromPeer

func (d *Dir) UnlinkFromPeer(path string) (errCode int)

UnlinkFromPeer removes a file without notifying the peer (to avoid loops).

func (*Dir) Utimens

func (d *Dir) Utimens(path string, tmsp []winfuse.Timespec) (errCode int)

Utimens sets file access and modification times. We return success but don't persist the changes (timestamps come from underlying storage).

func (*Dir) Write

func (d *Dir) Write(path string, buff []byte, offset int64, fh uint64) (errCode int)

Write returns the number of bytes written.

type DownloadState

type DownloadState struct {
	TotalSize       atomic.Uint64 // Expected total bytes from peer.
	BytesDownloaded atomic.Uint64 // Bytes successfully written to local cache.
	LastReadOffset  atomic.Int64  // Last successfully read offset.
	StartedAt       atomic.Int64  // Unix nano when download started.
	LastSuccessAt   atomic.Int64  // Unix nano of last successful read.
	AttemptCount    atomic.Int32  // Reconnection attempts since last success.
	MaxRetries      int           // Max retries before giving up (default 5).
	// contains filtered or unexported fields
}

DownloadState tracks download progress for resumption on reconnect.

func (*DownloadState) CanRetry

func (ds *DownloadState) CanRetry() bool

CanRetry checks if we should attempt reconnection.

func (*DownloadState) Checksum

func (ds *DownloadState) Checksum() uint64

Checksum returns the current xxHash3 checksum of received data.

func (*DownloadState) IsComplete

func (ds *DownloadState) IsComplete() bool

IsComplete returns true if all bytes have been downloaded.

func (*DownloadState) Progress

func (ds *DownloadState) Progress() float64

Progress returns download completion percentage (0-100).

func (*DownloadState) RecordAttempt

func (ds *DownloadState) RecordAttempt() int32

RecordAttempt increments the retry counter.
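CanRetry and RecordAttempt together suggest a bounded retry loop. The sketch below shows one plausible shape; `retryState`, the comparison against the maximum, and the reset on success are all assumptions inferred from the field comments, not the package's confirmed logic.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// retryState is a hypothetical reduction of DownloadState to its retry fields.
type retryState struct {
	attempts   atomic.Int32 // reconnection attempts since last success
	maxRetries int          // the field comment gives 5 as the default
}

// recordAttempt increments the counter and returns the new value.
func (r *retryState) recordAttempt() int32 { return r.attempts.Add(1) }

// canRetry reports whether another attempt is allowed (assumed rule:
// strictly fewer attempts than maxRetries).
func (r *retryState) canRetry() bool { return int(r.attempts.Load()) < r.maxRetries }

// recordSuccess clears the counter, since attempts are counted "since last success".
func (r *retryState) recordSuccess() { r.attempts.Store(0) }

func main() {
	r := &retryState{maxRetries: 5}
	for r.canRetry() {
		n := r.recordAttempt()
		fmt.Println("reconnect attempt", n)
		// A real caller would try to reconnect here and call
		// recordSuccess once a read goes through.
	}
	fmt.Println("giving up after", r.attempts.Load(), "attempts")
}
```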

func (*DownloadState) Reset

func (ds *DownloadState) Reset(totalSize uint64)

Reset clears download state for a new download.

func (*DownloadState) UpdateChecksum

func (ds *DownloadState) UpdateChecksum(data []byte)

UpdateChecksum adds data to the running checksum. Call this with the actual bytes received (in order received, not by offset).
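The ordering requirement follows from how streaming hashes work: feeding the pieces in arrival order gives the same digest as hashing the concatenated bytes, while a different order generally does not. The sketch below demonstrates this with FNV-1a from the standard library as a stand-in, because xxHash3 is not part of the standard library; `streamSum` is an illustrative helper.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// streamSum feeds each part into one running hash, in the order given.
// FNV-1a is used only as a stand-in for xxHash3.
func streamSum(parts ...[]byte) uint64 {
	h := fnv.New64a()
	for _, p := range parts {
		h.Write(p) // hash.Hash's Write never returns an error
	}
	return h.Sum64()
}

func main() {
	a, b := []byte("hello "), []byte("world")

	inOrder := streamSum(a, b)
	whole := streamSum([]byte("hello world"))
	swapped := streamSum(b, a)

	fmt.Println("in-order matches whole:", inOrder == whole)
	fmt.Println("swapped matches whole: ", swapped == whole)
}
```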

func (*DownloadState) UpdateProgress

func (ds *DownloadState) UpdateProgress(offset int64, bytesRead int)

UpdateProgress records successful read progress and updates checksum.

type FS

type FS struct {
	OnLocalChange      func(event types.FileEvent)
	OpenStreamProvider func() types.FileStreamProvider

	// Collab sync options (set from env before Mount).
	PrefetchOnOpen bool // If true, fetch entire file on Open() and write to local disk.
	PushOnWrite    bool // If true, async push deltas to peer on Write().

	Root *Dir
	// contains filtered or unexported fields
}

func NewFS

func NewFS(logger *slog.Logger) *FS

func (*FS) Mount

func (fs *FS) Mount(mountPoint string, isSecond bool, downloadPath string) error

Mount blocks for the lifetime of the FUSE session. It returns an error immediately if the mount point is invalid or host.Mount() refuses to come up; on success it returns nil only after a clean unmount.
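Because Mount blocks, callers typically run it in a goroutine and collect its result through a channel. The sketch below shows that pattern against a fake; the `mounter` interface, `fakeFS`, and `serve` are hypothetical, and it assumes Unmount is what causes Mount to return. The meaning of isSecond is not documented here, so the sketch simply passes false.

```go
package main

import (
	"fmt"
	"sync"
)

// mounter captures the two FS methods used here, so the sketch can run
// against a fake instead of a real FUSE mount.
type mounter interface {
	Mount(mountPoint string, isSecond bool, downloadPath string) error
	Unmount()
}

// fakeFS is a stand-in: Mount blocks until Unmount is called.
type fakeFS struct {
	done chan struct{}
	once sync.Once
}

func newFakeFS() *fakeFS { return &fakeFS{done: make(chan struct{})} }

func (f *fakeFS) Mount(string, bool, string) error { <-f.done; return nil }
func (f *fakeFS) Unmount()                         { f.once.Do(func() { close(f.done) }) }

// serve runs the blocking Mount in the background and returns when it
// either fails early or is unmounted after stop is signalled.
func serve(m mounter, mountPoint, downloadPath string, stop <-chan struct{}) error {
	errCh := make(chan error, 1) // buffered so the goroutine never leaks
	go func() { errCh <- m.Mount(mountPoint, false, downloadPath) }()

	select {
	case err := <-errCh:
		return err // mount failed, or the session ended on its own
	case <-stop:
		m.Unmount()
		return <-errCh // nil after a clean unmount
	}
}

func main() {
	stop := make(chan struct{})
	close(stop) // request shutdown immediately for the demo
	fmt.Println("serve returned:", serve(newFakeFS(), "/mnt/demo", "/tmp/demo", stop))
}
```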

func (*FS) Unmount

func (fs *FS) Unmount()

type File

type File struct {
	Inode           uint64 `json:"inode"` // Inodes must be unique and not re-used.
	CurrentHandleID uint64 // Opaque FUSE handle ID for the currently-open fd.
	Name            string `json:"name"`

	RelativePath string `json:"relativePath"` // Relative (to root) path in the mounted filesystem.

	RealPathOfFile string // The Path on the local system.

	Parent *Dir
	Root   *Dir

	LastEditTime uint64 `json:"lastEdit"` // Use time.Now().UnixNano().
	CreatedTime  uint64 `json:"createdAt"`

	PeerLastEdit   uint64 `json:"peerLastEdit"`
	IsLocalPresent bool   `json:"isLocalPresent"`

	NotLocalSynced  bool
	NotRemoteSynced bool

	LocalNewer bool

	HadEdits bool

	// WasTruncatedToZero tracks if Truncate(size=0) was explicitly called.
	// Used with HadEdits to distinguish legitimate empty files from transient states.
	WasTruncatedToZero bool

	// LastNotifiedSize tracks the file size we last sent to peer in ADD_FILE.
	// Used to avoid sending duplicate notifications with same size during file copy.
	LastNotifiedSize int64

	// PeerStoppedSharing is set when peer sends REMOVE_FILE but download is in progress.
	// Once download completes (Release with 0 open handles), the file reference is removed.
	PeerStoppedSharing bool

	StreamProvider types.FileStreamProvider
	StreamPool     *StreamPool        // Pool of parallel gRPC streams for on-demand reads.
	StreamCancel   context.CancelFunc // Cancel function for the stream context.
	CacheFD        *os.File           // Persistent cache file descriptor for on-demand writes.
	CacheWg        sync.WaitGroup     // Tracks in-flight async cache writes; waited on in Release.

	// Download resumption state.
	Download DownloadState

	// Bitmap tracks which 512 KiB chunks have been downloaded from the remote peer.
	// nil for local-origin files or empty files (size=0).
	Bitmap *ChunkBitmap

	// PrefetchCancel cancels the background prefetch goroutine for this file.
	PrefetchCancel context.CancelFunc

	OnLocalChange func(event types.FileEvent)
	// contains filtered or unexported fields
}

func (*File) CountOpenDescriptors

func (f *File) CountOpenDescriptors() uint64

CountOpenDescriptors returns the number of open file handles.

func (*File) NotifyPeer

func (f *File) NotifyPeer()

type HandleEntry

type HandleEntry struct {
	FD   int   // The actual kernel file descriptor for syscalls.
	File *File // The File metadata struct.
}

HandleEntry maps an opaque FUSE file handle to the actual kernel fd and File metadata. Opaque handles prevent fd-recycling races: the kernel reuses fd numbers after close(), but handle IDs are monotonically increasing and never reused.

type OpenFileCounter

type OpenFileCounter struct {
	// contains filtered or unexported fields
}

Every Create and Open call must be paired with a corresponding Release call.

func (*OpenFileCounter) CountOpenDescriptors

func (ofc *OpenFileCounter) CountOpenDescriptors() uint64

func (*OpenFileCounter) Open

func (ofc *OpenFileCounter) Open()

func (*OpenFileCounter) Release

func (ofc *OpenFileCounter) Release() uint64
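The pairing rule is easiest to see with a small counter. In the sketch below, `openCounter` is a hypothetical stand-in built on an atomic integer; it assumes Release returns the number of handles still open, and it guards against an unmatched Release wrapping the unsigned counter around.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// openCounter is a stand-in for OpenFileCounter, whose fields are unexported.
type openCounter struct {
	n atomic.Uint64
}

// Open records one more open handle.
func (c *openCounter) Open() { c.n.Add(1) }

// Release records one closed handle and returns how many remain open
// (assumed meaning of the return value). An unmatched Release is ignored
// rather than wrapping the counter around to a huge value.
func (c *openCounter) Release() uint64 {
	for {
		cur := c.n.Load()
		if cur == 0 {
			return 0
		}
		if c.n.CompareAndSwap(cur, cur-1) {
			return cur - 1
		}
	}
}

// CountOpenDescriptors returns the current number of open handles.
func (c *openCounter) CountOpenDescriptors() uint64 { return c.n.Load() }

func main() {
	var c openCounter
	c.Open()
	c.Open()
	fmt.Println("open:", c.CountOpenDescriptors())
	fmt.Println("after first release:", c.Release())
	if c.Release() == 0 {
		fmt.Println("last handle closed; safe to run deferred cleanup")
	}
}
```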

type StreamPool

type StreamPool struct {
	// contains filtered or unexported fields
}

StreamPool holds N parallel gRPC streams for a single file. FUSE reads pick a stream by chunk-index modulo N, eliminating lock contention between concurrent readahead requests.

func NewStreamPool

func NewStreamPool(provider types.FileStreamProvider, ctx context.Context, inode uint64, path string, n int) (*StreamPool, error)

NewStreamPool opens n parallel gRPC streams for the given file. On partial failure, already-opened streams are closed.

func (*StreamPool) Close

func (p *StreamPool) Close() error

Close closes all streams in the pool.

func (*StreamPool) ReadAt

func (p *StreamPool) ReadAt(ctx context.Context, offset int64, size int64) ([]byte, error)

ReadAt routes the request to a stream selected by chunk index. Different chunks hit different streams for true parallelism.
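The routing rule (chunk index modulo pool size) can be written out directly. `streamIndex` is an illustrative helper; it assumes the chunk index is derived from the read offset using the 512 KiB ChunkSize and a pool of StreamPoolSize streams.

```go
package main

import "fmt"

const (
	chunkSize = 512 * 1024 // mirrors ChunkSize
	poolSize  = 4          // mirrors StreamPoolSize
)

// streamIndex picks a stream for a read starting at offset:
// chunk index = offset / chunk size, stream = chunk index mod pool size.
func streamIndex(offset int64, chunk int, n int) int {
	if offset < 0 || chunk <= 0 || n <= 0 {
		return 0
	}
	return int((offset / int64(chunk)) % int64(n))
}

func main() {
	// Sequential readahead requests fan out across all streams in turn.
	for i := int64(0); i < 6; i++ {
		off := i * chunkSize
		fmt.Printf("offset %8d -> stream %d\n", off, streamIndex(off, chunkSize, poolSize))
	}
}
```

Reads within the same chunk always land on the same stream, while neighbouring chunks land on different ones, which is what lets concurrent readahead requests proceed without sharing a stream.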

type WriteStats

type WriteStats struct {
	// contains filtered or unexported fields
}

WriteStats tracks timing for Write operations (for profiling).
