queue

package
v0.2.0
Published: May 3, 2026 License: MPL-2.0 Imports: 12 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Conflict

type Conflict struct {
	Path          string
	LocalVersion  *FileVersion
	RemoteVersion *FileVersion
	DetectedAt    time.Time
}

Conflict represents a detected file conflict.

type ConflictResolution

type ConflictResolution int

ConflictResolution defines how to resolve a file conflict.

const (
	KeepBoth      ConflictResolution = iota // Default: rename local as .conflict.TIMESTAMP
	LastWriteWins                           // Higher mtime wins
	AskUser                                 // UI prompt for choice
)

type ConflictResolver

type ConflictResolver struct {
	Strategy ConflictResolution
	BaseDir  string // Local filesystem base directory
}

ConflictResolver handles file conflicts when both peers modify the same file.

func NewConflictResolver

func NewConflictResolver(baseDir string) *ConflictResolver

NewConflictResolver creates a resolver rooted at baseDir, with KeepBoth as the default strategy.

func (*ConflictResolver) CleanupConflict

func (r *ConflictResolver) CleanupConflict(conflictPath string) error

CleanupConflict removes a conflict file once the user has resolved the conflict.

func (*ConflictResolver) DetectConflict

func (r *ConflictResolver) DetectConflict(path string, localVer, remoteVer *FileVersion) *Conflict

DetectConflict checks if there's a conflict between local and remote versions. Returns nil if no conflict (identical content or one is clearly newer).
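
The decision described above can be sketched as follows. This is only an illustration, not the package's implementation: the helper detectConflict, the 2-second clearlyNewerNanos threshold, and the handling of a nil version are all assumptions, because the documentation does not say how "clearly newer" is decided. The types are redeclared locally so that the sketch compiles on its own.

```go
package main

import "fmt"

// FileVersion mirrors the documented struct so the sketch compiles on its own.
type FileVersion struct {
	Path       string
	Size       int64
	Mtime      int64 // unix nano
	Checksum   uint64
	SourcePeer string
}

// Conflict is a trimmed stand-in for the documented type (DetectedAt omitted).
type Conflict struct {
	Path          string
	LocalVersion  *FileVersion
	RemoteVersion *FileVersion
}

// clearlyNewerNanos is a hypothetical threshold (2s), not taken from the package.
const clearlyNewerNanos int64 = 2_000_000_000

// detectConflict is an illustrative helper, not the package's implementation.
func detectConflict(path string, local, remote *FileVersion) *Conflict {
	if local == nil || remote == nil {
		return nil // assumption: a file present on one side only is not a conflict
	}
	if local.Size == remote.Size && local.Checksum == remote.Checksum {
		return nil // identical content
	}
	gap := local.Mtime - remote.Mtime
	if gap < 0 {
		gap = -gap
	}
	if gap > clearlyNewerNanos {
		return nil // assumption: a large mtime gap means one side is clearly newer
	}
	return &Conflict{Path: path, LocalVersion: local, RemoteVersion: remote}
}

func main() {
	local := &FileVersion{Path: "a.txt", Size: 10, Mtime: 1_000_000_000, Checksum: 1, SourcePeer: "local"}
	remote := &FileVersion{Path: "a.txt", Size: 12, Mtime: 1_500_000_000, Checksum: 2, SourcePeer: "ab12"}
	// Different content, mtimes 0.5s apart: reported as a conflict.
	fmt.Println(detectConflict("a.txt", local, remote) != nil) // true
}
```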

func (*ConflictResolver) GetLocalVersion

func (r *ConflictResolver) GetLocalVersion(path string) (*FileVersion, error)

GetLocalVersion reads the current local file version info.

func (*ConflictResolver) ListConflicts

func (r *ConflictResolver) ListConflicts() ([]string, error)

ListConflicts returns all .conflict files in the base directory.

func (*ConflictResolver) Resolve

func (r *ConflictResolver) Resolve(conflict *Conflict) (conflictPath string, err error)

Resolve handles a conflict according to the configured strategy. For KeepBoth it returns the path to which the local version was moved; otherwise it returns an empty string.

type FileVersion

type FileVersion struct {
	Path       string
	Size       int64
	Mtime      int64  // unix nano
	Checksum   uint64 // xxHash3
	SourcePeer string // "local" or peer fingerprint prefix
}

FileVersion tracks file state for conflict detection.

type OperationType

type OperationType int

OperationType defines the type of filesystem operation being queued.

const (
	OpWrite OperationType = iota
	OpCreate
	OpDelete
	OpRename
	OpMkdir
	OpRmdir
)

func (OperationType) String

func (t OperationType) String() string

String returns the operation type as a string.

type QueuedOperation

type QueuedOperation struct {
	ID        uint64        // Monotonic operation ID
	Type      OperationType // Type of operation
	Path      string        // Relative path in mounted filesystem
	OldPath   string        // For RENAME operations: the source path
	Data      []byte        // For WRITE: the data to write
	Offset    int64         // For WRITE: file offset
	Size      int64         // File size (for CREATE/EDIT)
	Mode      uint32        // File mode/permissions
	Mtime     int64         // Modification time (unix nano)
	Checksum  uint64        // xxHash3 of Data (for integrity verification)
	CreatedAt time.Time     // When this operation was queued
	Retries   int           // Number of replay attempts
}

QueuedOperation represents a pending filesystem operation to be sent to the peer.

type ReplayFunc

type ReplayFunc func(op *QueuedOperation) error

ReplayFunc is called for each operation during flush. It returns nil on success and an error on failure.

type WriteQueue

type WriteQueue struct {

	// Configuration
	MaxOps   int   // Maximum queued operations (default 1000)
	MaxBytes int64 // Maximum queued data bytes (default 100 MB)
	// contains filtered or unexported fields
}

WriteQueue persists pending filesystem operations to survive crashes. When the peer is disconnected, operations are queued locally and replayed when the connection is restored.

Storage format: One JSON file per operation in a directory. Files are named: {id}_{timestamp}.json

func NewWriteQueue

func NewWriteQueue(dir string, sessionID string, logger *slog.Logger) (*WriteQueue, error)

NewWriteQueue creates a new write queue with file-based persistence. Operations are stored as JSON files in the specified directory.

func (*WriteQueue) Clear

func (q *WriteQueue) Clear() error

Clear removes all queued operations (use with caution).

func (*WriteQueue) Close

func (q *WriteQueue) Close() error

Close is a no-op for file-based storage (for interface compatibility).

func (*WriteQueue) Count

func (q *WriteQueue) Count() int

Count returns the number of pending operations.

func (*WriteQueue) DisableQueueing

func (q *WriteQueue) DisableQueueing()

DisableQueueing disables operation queueing (call when connected).

func (*WriteQueue) EnableQueueing

func (q *WriteQueue) EnableQueueing()

EnableQueueing enables operation queueing (call when disconnected).

func (*WriteQueue) Enqueue

func (q *WriteQueue) Enqueue(op *QueuedOperation) error

Enqueue adds an operation to the queue. If queueing is disabled it returns nil without storing anything, and the caller should send the operation directly.

func (*WriteQueue) EnqueueCreate

func (q *WriteQueue) EnqueueCreate(path string, size int64, mode uint32, mtime int64) error

EnqueueCreate is a convenience method for file creation.

func (*WriteQueue) EnqueueDelete

func (q *WriteQueue) EnqueueDelete(path string) error

EnqueueDelete is a convenience method for file deletion.

func (*WriteQueue) EnqueueRename

func (q *WriteQueue) EnqueueRename(oldPath, newPath string) error

EnqueueRename is a convenience method for file rename.

func (*WriteQueue) EnqueueWrite

func (q *WriteQueue) EnqueueWrite(path string, data []byte, offset int64) error

EnqueueWrite is a convenience method for write operations.

func (*WriteQueue) Flush

func (q *WriteQueue) Flush(replay ReplayFunc) []error

Flush replays all queued operations using the provided replay function. Successfully replayed operations are removed from the queue. Failed operations are retried up to 3 times before being marked as conflicts.

func (*WriteQueue) IsEnabled

func (q *WriteQueue) IsEnabled() bool

IsEnabled returns true if queueing is currently enabled.

func (*WriteQueue) Stats

func (q *WriteQueue) Stats() (enqueued, replayed, failed uint64)

Stats returns queue statistics.

func (*WriteQueue) TotalBytes

func (q *WriteQueue) TotalBytes() int64

TotalBytes returns the total size of queued data.
