sync

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

Package sync provides cross-machine observation sync via PostgreSQL.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ApplyMutation

func ApplyMutation(ctx context.Context, engramAPI string, m RemoteMutation) error

ApplyMutation applies a single mutation to the local engram instance via its HTTP API.

func ConnectPG

func ConnectPG(ctx context.Context, dsn string) (*pgx.Conn, error)

ConnectPG parses the DSN and connects with client-side TCP keepalive.

func Pull

func Pull(ctx context.Context, pgConn *pgx.Conn, cfg *config.Config, logger *slog.Logger) (applied int, cursor int64, err error)

Pull fetches new mutations from PG in batches and applies them to the local engram instance. Returns the total number of mutations applied and the final cursor.

func Push

func Push(ctx context.Context, sqliteDB *sql.DB, pgConn *pgx.Conn, cfg *config.Config, logger *slog.Logger) (pushed int, highWater int64, err error)

Push reads new mutations from local SQLite and inserts them into PostgreSQL. Returns the number of mutations pushed and the new high-water mark.

func PushCursorFile

func PushCursorFile() string

PushCursorFile returns the path to the local push cursor file.

func ReadCursor

func ReadCursor(ctx context.Context, pgConn *pgx.Conn, machineID string) (int64, error)

ReadCursor reads this machine's pull cursor from PostgreSQL.

func ReadPushCursor

func ReadPushCursor(path string) (int64, error)

ReadPushCursor reads the last pushed seq from the local cursor file.

func SDNotify

func SDNotify(state string) bool

SDNotify sends a message to systemd's notification socket. Returns false if NOTIFY_SOCKET is not set (not running under systemd).

func UpdateCursor

func UpdateCursor(ctx context.Context, pgConn *pgx.Conn, machineID string, seq int64) error

UpdateCursor updates this machine's pull cursor in PostgreSQL.

func WatchdogLoop

func WatchdogLoop(ctx context.Context)

WatchdogLoop sends WATCHDOG=1 heartbeats to systemd at half the configured WatchdogSec interval. If WATCHDOG_USEC is not set, this is a no-op. Blocks until ctx is canceled.

func WritePushCursor

func WritePushCursor(path string, seq int64) error

WritePushCursor writes the push cursor to the local file.

Types

type Daemon

type Daemon struct {
	// contains filtered or unexported fields
}

Daemon runs the sync loop: pushes local mutations to PG and pulls remote mutations via LISTEN/NOTIFY with polling fallback.

func NewDaemon

func NewDaemon(cfg *config.Config, sqliteDB *sql.DB, dsn string, logger *slog.Logger) *Daemon

NewDaemon creates a new sync daemon.

func (*Daemon) Run

func (d *Daemon) Run(ctx context.Context) error

Run starts the daemon and blocks until the context is canceled.

type Mutation

type Mutation struct {
	Seq        int64
	TargetKey  string
	Entity     string
	EntityKey  string
	Op         string
	Payload    string
	Source     string
	Project    string
	OccurredAt string
}

Mutation represents a row from engram's sync_mutations table.

func ReadMutationsFromSQLite

func ReadMutationsFromSQLite(db *sql.DB, afterSeq int64) ([]Mutation, error)

ReadMutationsFromSQLite reads new mutations from engram's local SQLite database.

type RemoteMutation

type RemoteMutation struct {
	ID            int64
	SourceSeq     int64
	SourceMachine string
	Entity        string
	EntityKey     string
	Op            string
	Payload       json.RawMessage
	Scope         string
	Project       string
	OccurredAt    time.Time
}

RemoteMutation represents a row from the central PG engram_sync_mutations table.

func FetchMutations

func FetchMutations(ctx context.Context, pgConn *pgx.Conn, cfg *config.Config, afterSeq int64) ([]RemoteMutation, error)

FetchMutations fetches new mutations from PG for this machine, applying scope filters.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL