oversync

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 4, 2026 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
// Operation constants for change operations. Each constant is the untyped
// string naming one row operation: "INSERT", "UPDATE", or "DELETE".
const (
	OpInsert = "INSERT"
	OpUpdate = "UPDATE"
	OpDelete = "DELETE"
)

Operation constants for change operations

View Source
// SourceIDHeader is the header name "Oversync-Source-ID".
// NOTE(review): presumably the "sync source header" that ActorMiddleware reads — confirm.
const SourceIDHeader = "Oversync-Source-ID"
View Source
// SyncProtocolVersion is the sync protocol version string, currently "v1".
// NOTE(review): presumably the value reported in CapabilitiesResponse.ProtocolVersion — confirm.
const SyncProtocolVersion = "v1"

Variables

This section is empty.

Functions

func ActorMiddleware added in v0.2.0

func ActorMiddleware(cfg ActorMiddlewareConfig) func(http.Handler) http.Handler

ActorMiddleware injects the runtime actor from trusted auth context plus sync source header.

func ContextWithActor

func ContextWithActor(ctx context.Context, actor Actor) context.Context

ContextWithActor stores the runtime actor in context for transport/runtime handoff.

func ConvertBase64EncodedUUID

func ConvertBase64EncodedUUID(value string) (uuid.UUID, error)

func Key

func Key(schema, table string) string

Key creates a normalized schema.table key (public helper).

func OptionallyConvertBase64EncodedUUID

func OptionallyConvertBase64EncodedUUID(payloadValue string) (any, error)

Types

type Actor

// Actor represents the authenticated runtime identity for sync operations.
// In JSON, user_id is always encoded and source_id is omitted when empty.
type Actor struct {
	UserID   string `json:"user_id"`
	SourceID string `json:"source_id,omitempty"`
}

Actor represents the authenticated runtime identity for sync operations.

func ActorFromContext

func ActorFromContext(ctx context.Context) (Actor, bool)

ActorFromContext retrieves the runtime actor from context.

type ActorMiddlewareConfig added in v0.2.0

// ActorMiddlewareConfig configures the standard HTTP actor middleware.
// UserIDFromContext is a callback that receives a context and returns a user
// ID string or an error.
// NOTE(review): presumably it resolves the user from trusted auth context, as
// described for ActorMiddleware — confirm.
type ActorMiddlewareConfig struct {
	UserIDFromContext func(context.Context) (string, error)
}

ActorMiddlewareConfig configures the standard HTTP actor middleware.

type Bundle

// Bundle represents one committed durable sync unit in the target
// bundle-based contract. In JSON, row_count and bundle_hash are omitted when
// zero/empty; bundle_seq, source_id, source_bundle_id and rows are always
// encoded.
type Bundle struct {
	BundleSeq      int64       `json:"bundle_seq"`
	SourceID       string      `json:"source_id"`
	SourceBundleID int64       `json:"source_bundle_id"`
	RowCount       int64       `json:"row_count,omitempty"`
	BundleHash     string      `json:"bundle_hash,omitempty"`
	Rows           []BundleRow `json:"rows"`
}

Bundle represents one committed durable sync unit in the target bundle-based contract.

type BundleCapabilitiesLimits

// BundleCapabilitiesLimits reports bundle-oriented guardrails in the target
// contract. Every field is tagged omitempty, so a zero-valued limit is left
// out of the JSON encoding. The two per-snapshot-session totals are int64;
// all other limits are int. TTL fields are expressed in seconds, per their
// names and JSON keys.
type BundleCapabilitiesLimits struct {
	MaxRowsPerBundle                   int   `json:"max_rows_per_bundle,omitempty"`
	MaxBytesPerBundle                  int   `json:"max_bytes_per_bundle,omitempty"`
	MaxBundlesPerPull                  int   `json:"max_bundles_per_pull,omitempty"`
	DefaultRowsPerPushChunk            int   `json:"default_rows_per_push_chunk,omitempty"`
	MaxRowsPerPushChunk                int   `json:"max_rows_per_push_chunk,omitempty"`
	PushSessionTTLSeconds              int   `json:"push_session_ttl_seconds,omitempty"`
	DefaultRowsPerCommittedBundleChunk int   `json:"default_rows_per_committed_bundle_chunk,omitempty"`
	MaxRowsPerCommittedBundleChunk     int   `json:"max_rows_per_committed_bundle_chunk,omitempty"`
	DefaultRowsPerSnapshotChunk        int   `json:"default_rows_per_snapshot_chunk,omitempty"`
	MaxRowsPerSnapshotChunk            int   `json:"max_rows_per_snapshot_chunk,omitempty"`
	SnapshotSessionTTLSeconds          int   `json:"snapshot_session_ttl_seconds,omitempty"`
	MaxRowsPerSnapshotSession          int64 `json:"max_rows_per_snapshot_session,omitempty"`
	MaxBytesPerSnapshotSession         int64 `json:"max_bytes_per_snapshot_session,omitempty"`
	InitializationLeaseTTLSeconds      int   `json:"initialization_lease_ttl_seconds,omitempty"`
}

BundleCapabilitiesLimits reports bundle-oriented guardrails in the target contract.

type BundleRow

// BundleRow describes one normalized row effect inside a committed sync
// bundle. Payload is held as raw JSON and is omitted from the encoding when
// empty.
// NOTE(review): Op presumably holds one of OpInsert/OpUpdate/OpDelete — confirm.
type BundleRow struct {
	Schema     string          `json:"schema"`
	Table      string          `json:"table"`
	Key        SyncKey         `json:"key"`
	Op         string          `json:"op"`
	RowVersion int64           `json:"row_version"`
	Payload    json.RawMessage `json:"payload,omitempty"`
}

BundleRow describes one normalized row effect inside a committed sync bundle.

type BundleSource

// BundleSource identifies one server-side committed bundle source by the
// pair (SourceID, SourceBundleID). It declares no JSON tags.
type BundleSource struct {
	SourceID       string
	SourceBundleID int64
}

BundleSource identifies one server-side committed bundle source.

type CapabilitiesResponse

// CapabilitiesResponse describes the currently supported sync protocol
// surface. protocol_version, schema_version and features are always encoded;
// the remaining fields are omitted when empty. BundleLimits is a pointer and
// is omitted from JSON when nil.
type CapabilitiesResponse struct {
	ProtocolVersion      string                    `json:"protocol_version"`
	SchemaVersion        int                       `json:"schema_version"`
	AppName              string                    `json:"app_name,omitempty"`
	RegisteredTables     []string                  `json:"registered_tables,omitempty"`
	RegisteredTableSpecs []RegisteredTableSpec     `json:"registered_table_specs,omitempty"`
	Features             map[string]bool           `json:"features"`
	BundleLimits         *BundleCapabilitiesLimits `json:"bundle_limits,omitempty"`
}

CapabilitiesResponse describes the currently supported sync protocol surface.

type CommittedBundleChunkInvalidError

// CommittedBundleChunkInvalidError is an error type (Error is declared on the
// pointer receiver) that carries a free-form Message.
// NOTE(review): presumably reported for an invalid committed-bundle chunk request — confirm.
type CommittedBundleChunkInvalidError struct {
	Message string
}

func (*CommittedBundleChunkInvalidError) Error

type CommittedBundleNotFoundError

// CommittedBundleNotFoundError is an error type (Error is declared on the
// pointer receiver) that carries the BundleSeq it refers to.
// NOTE(review): presumably BundleSeq is the sequence that could not be found — confirm.
type CommittedBundleNotFoundError struct {
	BundleSeq int64
}

func (*CommittedBundleNotFoundError) Error

type CommittedBundleRowsResponse

// CommittedBundleRowsResponse is a JSON shape carrying bundle metadata
// (bundle_seq, source_id, source_bundle_id, row_count, bundle_hash), a slice
// of BundleRow, and the paging fields next_row_ordinal and has_more. No field
// uses omitempty, so all keys are always encoded.
// NOTE(review): presumably written by HandleGetCommittedBundleRows — confirm.
type CommittedBundleRowsResponse struct {
	BundleSeq      int64       `json:"bundle_seq"`
	SourceID       string      `json:"source_id"`
	SourceBundleID int64       `json:"source_bundle_id"`
	RowCount       int64       `json:"row_count"`
	BundleHash     string      `json:"bundle_hash"`
	Rows           []BundleRow `json:"rows"`
	NextRowOrdinal int64       `json:"next_row_ordinal"`
	HasMore        bool        `json:"has_more"`
}

type ConnectInvalidError added in v0.1.2

// ConnectInvalidError is an error type (Error is declared on the pointer
// receiver) that carries a free-form Message.
// NOTE(review): presumably reported for an invalid connect request — confirm.
type ConnectInvalidError struct {
	Message string
}

func (*ConnectInvalidError) Error added in v0.1.2

func (e *ConnectInvalidError) Error() string

type ConnectRequest added in v0.1.2

// ConnectRequest is the request argument of SyncService.Connect. It carries a
// single flag, encoded as has_local_pending_rows.
type ConnectRequest struct {
	HasLocalPendingRows bool `json:"has_local_pending_rows"`
}

type ConnectResponse added in v0.1.2

// ConnectResponse is the result type of SyncService.Connect. resolution is
// always encoded; initialization_id, lease_expires_at and retry_after_seconds
// are omitted when empty/zero. Note that RetryAfterSec is encoded under the
// JSON key retry_after_seconds.
type ConnectResponse struct {
	Resolution       string `json:"resolution"`
	InitializationID string `json:"initialization_id,omitempty"`
	LeaseExpiresAt   string `json:"lease_expires_at,omitempty"`
	RetryAfterSec    int    `json:"retry_after_seconds,omitempty"`
}

type DiscoveredSchema

// DiscoveredSchema contains the discovered table relationships. It is the
// result type of SchemaDiscovery.DiscoverSchemaWithDependencyOverrides.
// NOTE(review): Dependencies presumably maps schema.table to the tables it
// depends on — confirm.
type DiscoveredSchema struct {
	TableOrder   []string       // Ordered list of schema.table (parents first, cycle members grouped deterministically)
	OrderIdx     map[string]int // schema.table -> order index for O(1) lookups
	Dependencies map[string][]string
	CycleGroup   map[string]int   // schema.table -> SCC id (>0 only when part of a multi-table cycle)
	Cycles       map[int][]string // SCC id -> sorted cycle members
}

DiscoveredSchema contains the discovered table relationships

type ErrorResponse

// ErrorResponse represents an error response. It encodes to JSON with the two
// keys error and message, both always present.
type ErrorResponse struct {
	Error   string `json:"error"`
	Message string `json:"message"`
}

ErrorResponse represents an error response

type ForeignKeyConstraint

// ForeignKeyConstraint represents a foreign key relationship between a child
// table column and a referenced parent table column.
// NOTE(review): each value appears to describe one column of a constraint
// (see Ordinal), so a multi-column FK would span several values — confirm.
type ForeignKeyConstraint struct {
	ConstraintName string // Name of the FK constraint
	TableSchema    string // Schema of the child table
	TableName      string // Name of the child table
	ColumnName     string // Column in child table
	Ordinal        int    // Column ordinal within the FK constraint
	RefSchema      string // Schema of the referenced table
	RefTable       string // Referenced parent table
	RefColumn      string // Referenced column in parent table
	MatchOption    string // SIMPLE / FULL / PARTIAL
	OnDeleteAction string // CASCADE / RESTRICT / SET NULL / SET DEFAULT / NO ACTION
	OnUpdateAction string // CASCADE / RESTRICT / SET NULL / SET DEFAULT / NO ACTION
}

ForeignKeyConstraint represents a foreign key relationship

type HTTPSyncHandlers

// HTTPSyncHandlers provides HTTP handlers for the two-way sync API.
// Instances are created with NewHTTPSyncHandlers; all fields are unexported.
type HTTPSyncHandlers struct {
	// contains filtered or unexported fields
}

HTTPSyncHandlers provides HTTP handlers for the two-way sync API

func NewHTTPSyncHandlers

func NewHTTPSyncHandlers(service *SyncService, logger *slog.Logger) *HTTPSyncHandlers

NewHTTPSyncHandlers creates a new instance of sync handlers

func (*HTTPSyncHandlers) HandleCapabilities

func (h *HTTPSyncHandlers) HandleCapabilities(w http.ResponseWriter, r *http.Request)

HandleCapabilities returns the current sync capabilities surface.

func (*HTTPSyncHandlers) HandleCommitPushSession

func (h *HTTPSyncHandlers) HandleCommitPushSession(w http.ResponseWriter, r *http.Request)

func (*HTTPSyncHandlers) HandleConnect added in v0.1.2

func (h *HTTPSyncHandlers) HandleConnect(w http.ResponseWriter, r *http.Request)

func (*HTTPSyncHandlers) HandleCreatePushSession

func (h *HTTPSyncHandlers) HandleCreatePushSession(w http.ResponseWriter, r *http.Request)

func (*HTTPSyncHandlers) HandleCreateSnapshotSession

func (h *HTTPSyncHandlers) HandleCreateSnapshotSession(w http.ResponseWriter, r *http.Request)

HandleCreateSnapshotSession creates one frozen snapshot session for chunked hydrate/recover.

func (*HTTPSyncHandlers) HandleDeletePushSession

func (h *HTTPSyncHandlers) HandleDeletePushSession(w http.ResponseWriter, r *http.Request)

func (*HTTPSyncHandlers) HandleDeleteSnapshotSession

func (h *HTTPSyncHandlers) HandleDeleteSnapshotSession(w http.ResponseWriter, r *http.Request)

HandleDeleteSnapshotSession deletes an existing frozen snapshot session.

func (*HTTPSyncHandlers) HandleGetCommittedBundleRows

func (h *HTTPSyncHandlers) HandleGetCommittedBundleRows(w http.ResponseWriter, r *http.Request)

func (*HTTPSyncHandlers) HandleGetSnapshotChunk

func (h *HTTPSyncHandlers) HandleGetSnapshotChunk(w http.ResponseWriter, r *http.Request)

HandleGetSnapshotChunk returns one chunk of rows from a frozen snapshot session.

func (*HTTPSyncHandlers) HandleHealth

func (h *HTTPSyncHandlers) HandleHealth(w http.ResponseWriter, r *http.Request)

HandleHealth returns a readiness-oriented health response derived from the service status snapshot.

func (*HTTPSyncHandlers) HandlePull

func (h *HTTPSyncHandlers) HandlePull(w http.ResponseWriter, r *http.Request)

HandlePull processes bundle pull requests.

func (*HTTPSyncHandlers) HandlePushSessionChunk

func (h *HTTPSyncHandlers) HandlePushSessionChunk(w http.ResponseWriter, r *http.Request)

func (*HTTPSyncHandlers) HandleStatus

func (h *HTTPSyncHandlers) HandleStatus(w http.ResponseWriter, r *http.Request)

HandleStatus returns the current lifecycle and operability status snapshot.

type HistoryPrunedError

// HistoryPrunedError is an error type with Error and Is methods declared on
// the pointer receiver. It carries the user ID, the sequence that was
// provided, and the retained floor.
// NOTE(review): presumably reported when ProvidedSeq is older than
// RetainedFloor — confirm.
type HistoryPrunedError struct {
	UserID        string
	ProvidedSeq   int64
	RetainedFloor int64
}

func (*HistoryPrunedError) Error

func (e *HistoryPrunedError) Error() string

func (*HistoryPrunedError) Is

func (e *HistoryPrunedError) Is(target error) bool

type InitializationExpiredError added in v0.1.2

// InitializationExpiredError is an error type (Error is declared on the
// pointer receiver) that carries a free-form Message.
// NOTE(review): presumably reported when an initialization lease has expired — confirm.
type InitializationExpiredError struct {
	Message string
}

func (*InitializationExpiredError) Error added in v0.1.2

type InitializationStaleError added in v0.1.2

// InitializationStaleError is an error type (Error is declared on the pointer
// receiver) that carries a free-form Message.
// NOTE(review): presumably reported for a stale initialization ID — confirm.
type InitializationStaleError struct {
	Message string
}

func (*InitializationStaleError) Error added in v0.1.2

func (e *InitializationStaleError) Error() string

type PayloadExtractor

// PayloadExtractor provides utilities for extracting typed fields from JSON
// payloads. This is commonly used in ApplyUpsert handlers to parse sync
// payloads. Create one with NewPayloadExtractor (from JSON bytes) or
// NewPayloadExtractorFromMap (from an existing map).
type PayloadExtractor struct {
	// contains filtered or unexported fields
}

PayloadExtractor provides utilities for extracting typed fields from JSON payloads. This is commonly used in ApplyUpsert handlers to parse sync payloads.

func NewPayloadExtractor

func NewPayloadExtractor(payload []byte) (*PayloadExtractor, error)

NewPayloadExtractor creates a new PayloadExtractor from JSON payload bytes.

func NewPayloadExtractorFromMap

func NewPayloadExtractorFromMap(data map[string]any) *PayloadExtractor

NewPayloadExtractorFromMap creates a new PayloadExtractor from an existing map.

func (*PayloadExtractor) Base64Field

func (p *PayloadExtractor) Base64Field(key string) []byte

Base64Field extracts a nullable base64-decoded byte slice from the payload. Returns nil if the field is missing, null, not a string, or cannot be decoded.

func (*PayloadExtractor) Base64FieldRequired

func (p *PayloadExtractor) Base64FieldRequired(key string) ([]byte, error)

Base64FieldRequired extracts a required base64-decoded byte slice from the payload. Returns an error if the field is missing, null, not a string, or cannot be decoded.

func (*PayloadExtractor) BoolField

func (p *PayloadExtractor) BoolField(key string) *bool

BoolField extracts a nullable bool from the payload. Accepts bool values, numeric values (0=false, non-zero=true), and string values ("true"/"false", "1"/"0"). Returns nil if the field is missing, null, or cannot be converted.

func (*PayloadExtractor) BoolFieldRequired

func (p *PayloadExtractor) BoolFieldRequired(key string) (bool, error)

BoolFieldRequired extracts a required bool from the payload. Returns an error if the field is missing, null, or cannot be converted.

func (*PayloadExtractor) Float64Field

func (p *PayloadExtractor) Float64Field(key string) *float64

Float64Field extracts a nullable float64 from the payload. Accepts both numeric values and numeric strings. Returns nil if the field is missing, null, or cannot be converted.

func (*PayloadExtractor) Float64FieldRequired

func (p *PayloadExtractor) Float64FieldRequired(key string) (float64, error)

Float64FieldRequired extracts a required float64 from the payload. Returns an error if the field is missing, null, or cannot be converted.

func (*PayloadExtractor) HasField

func (p *PayloadExtractor) HasField(key string) bool

HasField checks if a field exists in the payload (even if it's null).

func (*PayloadExtractor) Int64Field

func (p *PayloadExtractor) Int64Field(key string) *int64

Int64Field extracts a nullable int64 from the payload. Accepts both numeric values and numeric strings. Returns nil if the field is missing, null, or cannot be converted.

func (*PayloadExtractor) Int64FieldRequired

func (p *PayloadExtractor) Int64FieldRequired(key string) (int64, error)

Int64FieldRequired extracts a required int64 from the payload. Returns an error if the field is missing, null, or cannot be converted.

func (*PayloadExtractor) StrField

func (p *PayloadExtractor) StrField(key string) *string

StrField extracts a nullable string from the payload. Returns nil if the field is missing, null, or not a string.

func (*PayloadExtractor) StrFieldRequired

func (p *PayloadExtractor) StrFieldRequired(key string) (string, error)

StrFieldRequired extracts a required string from the payload. Returns an error if the field is missing, null, or not a string.

func (*PayloadExtractor) UUIDField

func (p *PayloadExtractor) UUIDField(key string) *uuid.UUID

UUIDField extracts a nullable UUID from the payload. Accepts string values that can be parsed as UUIDs. Returns nil if the field is missing, null, or cannot be parsed as UUID.

func (*PayloadExtractor) UUIDFieldRequired

func (p *PayloadExtractor) UUIDFieldRequired(key string) (uuid.UUID, error)

UUIDFieldRequired extracts a required UUID from the payload. Returns an error if the field is missing, null, or cannot be parsed as UUID.

type PullResponse

// PullResponse returns one or more complete committed bundles, together with
// a stable_bundle_seq value and a has_more paging flag. All three keys are
// always encoded.
type PullResponse struct {
	StableBundleSeq int64    `json:"stable_bundle_seq"`
	Bundles         []Bundle `json:"bundles"`
	HasMore         bool     `json:"has_more"`
}

PullResponse returns one or more complete committed bundles.

type PushChunkInvalidError

// PushChunkInvalidError is an error type (Error is declared on the pointer
// receiver) that carries a free-form Message.
// NOTE(review): presumably reported for an invalid push-session chunk — confirm.
type PushChunkInvalidError struct {
	Message string
}

func (*PushChunkInvalidError) Error

func (e *PushChunkInvalidError) Error() string

type PushChunkOutOfOrderError

// PushChunkOutOfOrderError is an error type (Error is declared on the pointer
// receiver) that carries a push ID plus Expected and Actual int64 values.
// NOTE(review): Expected/Actual are presumably row ordinals (compare
// PushSessionChunkRequest.StartRowOrdinal) — confirm.
type PushChunkOutOfOrderError struct {
	PushID   string
	Expected int64
	Actual   int64
}

func (*PushChunkOutOfOrderError) Error

func (e *PushChunkOutOfOrderError) Error() string

type PushCommitInvalidError

// PushCommitInvalidError is an error type (Error is declared on the pointer
// receiver) that carries a free-form Message.
// NOTE(review): presumably reported when a push-session commit is invalid — confirm.
type PushCommitInvalidError struct {
	Message string
}

func (*PushCommitInvalidError) Error

func (e *PushCommitInvalidError) Error() string

type PushConflictDetails

// PushConflictDetails describes one authoritative row state that rejected a
// push commit. The payload is deterministic:
//
//   - schema/table are normalized lowercase identifiers
//   - key uses the same canonical sync-key JSON shape as the rest of the API
//   - server_row_version is 0 when no authoritative row_state exists yet
//   - server_row_deleted distinguishes tombstones from an absent row when
//     server_row is null
//   - server_row is the canonical wire-format full row after-image, or null
//     if deleted/missing
type PushConflictDetails struct {
	Schema           string          `json:"schema"`
	Table            string          `json:"table"`
	Key              SyncKey         `json:"key"`
	Op               string          `json:"op"`
	BaseRowVersion   int64           `json:"base_row_version"`
	ServerRowVersion int64           `json:"server_row_version"`
	ServerRowDeleted bool            `json:"server_row_deleted"`
	ServerRow        json.RawMessage `json:"server_row"`
}

PushConflictDetails describes one authoritative row state that rejected a push commit. The payload is deterministic:

  • schema/table are normalized lowercase identifiers
  • key uses the same canonical sync-key JSON shape as the rest of the API
  • server_row_version is 0 when no authoritative row_state exists yet
  • server_row_deleted distinguishes tombstones from an absent row when server_row is null
  • server_row is the canonical wire-format full row after-image, or null if deleted/missing

type PushConflictError

// PushConflictError is an error type (Error is declared on the pointer
// receiver) that carries a Message and a pointer to the PushConflictDetails
// for the conflicting row. Conflict is a pointer and may therefore be nil.
type PushConflictError struct {
	Message  string
	Conflict *PushConflictDetails
}

func (*PushConflictError) Error

func (e *PushConflictError) Error() string

type PushConflictResponse

// PushConflictResponse preserves the legacy error/message envelope while
// adding machine-readable conflict details. The conflict key is omitted from
// JSON when Conflict is nil.
type PushConflictResponse struct {
	Error    string               `json:"error"`
	Message  string               `json:"message"`
	Conflict *PushConflictDetails `json:"conflict,omitempty"`
}

PushConflictResponse preserves the legacy error/message envelope while adding machine-readable conflict details.

type PushRequestRow

// PushRequestRow is one locally dirty row intent sent by the client. Payload
// is held as raw JSON and is omitted from the encoding when empty.
// NOTE(review): Op presumably holds one of OpInsert/OpUpdate/OpDelete — confirm.
type PushRequestRow struct {
	Schema         string          `json:"schema"`
	Table          string          `json:"table"`
	Key            SyncKey         `json:"key"`
	Op             string          `json:"op"`
	BaseRowVersion int64           `json:"base_row_version"`
	Payload        json.RawMessage `json:"payload,omitempty"`
}

PushRequestRow is one locally dirty row intent sent by the client.

type PushSessionChunkRequest

// PushSessionChunkRequest carries one chunk of PushRequestRow values together
// with the start_row_ordinal of the chunk.
// NOTE(review): presumably the request body decoded by HandlePushSessionChunk — confirm.
type PushSessionChunkRequest struct {
	StartRowOrdinal int64            `json:"start_row_ordinal"`
	Rows            []PushRequestRow `json:"rows"`
}

type PushSessionChunkResponse

// PushSessionChunkResponse carries a push_id and the
// next_expected_row_ordinal for that push session.
// NOTE(review): presumably the response body written by HandlePushSessionChunk — confirm.
type PushSessionChunkResponse struct {
	PushID                 string `json:"push_id"`
	NextExpectedRowOrdinal int64  `json:"next_expected_row_ordinal"`
}

type PushSessionCommitResponse

// PushSessionCommitResponse is the result type of
// SyncService.CommitPushSession. It carries bundle metadata (bundle_seq,
// source_id, source_bundle_id, row_count, bundle_hash) without any rows; all
// keys are always encoded.
type PushSessionCommitResponse struct {
	BundleSeq      int64  `json:"bundle_seq"`
	SourceID       string `json:"source_id"`
	SourceBundleID int64  `json:"source_bundle_id"`
	RowCount       int64  `json:"row_count"`
	BundleHash     string `json:"bundle_hash"`
}

type PushSessionCreateRequest

// PushSessionCreateRequest is the request argument of
// SyncService.CreatePushSession. source_bundle_id and planned_row_count are
// always encoded; initialization_id is omitted when empty.
type PushSessionCreateRequest struct {
	SourceBundleID   int64  `json:"source_bundle_id"`
	PlannedRowCount  int64  `json:"planned_row_count"`
	InitializationID string `json:"initialization_id,omitempty"`
}

type PushSessionCreateResponse

// PushSessionCreateResponse is the result type of
// SyncService.CreatePushSession. Only status is always encoded; every other
// field is tagged omitempty. The fields fall into two groups: push-session
// fields (push_id, planned_row_count, next_expected_row_ordinal) and
// committed-bundle fields (bundle_seq, source_id, source_bundle_id,
// row_count, bundle_hash).
// NOTE(review): presumably status selects which group is populated — confirm.
type PushSessionCreateResponse struct {
	PushID                 string `json:"push_id,omitempty"`
	Status                 string `json:"status"`
	PlannedRowCount        int64  `json:"planned_row_count,omitempty"`
	NextExpectedRowOrdinal int64  `json:"next_expected_row_ordinal,omitempty"`
	BundleSeq              int64  `json:"bundle_seq,omitempty"`
	SourceID               string `json:"source_id,omitempty"`
	SourceBundleID         int64  `json:"source_bundle_id,omitempty"`
	RowCount               int64  `json:"row_count,omitempty"`
	BundleHash             string `json:"bundle_hash,omitempty"`
}

type PushSessionExpiredError

// PushSessionExpiredError is an error type (Error is declared on the pointer
// receiver) that carries the PushID it refers to.
// NOTE(review): presumably reported when the push session's TTL has elapsed — confirm.
type PushSessionExpiredError struct {
	PushID string
}

func (*PushSessionExpiredError) Error

func (e *PushSessionExpiredError) Error() string

type PushSessionForbiddenError

// PushSessionForbiddenError is an error type (Error is declared on the
// pointer receiver) that carries the PushID it refers to.
// NOTE(review): presumably reported when the actor may not use the session — confirm.
type PushSessionForbiddenError struct {
	PushID string
}

func (*PushSessionForbiddenError) Error

func (e *PushSessionForbiddenError) Error() string

type PushSessionInvalidError

// PushSessionInvalidError is an error type (Error is declared on the pointer
// receiver) that carries a free-form Message.
// NOTE(review): presumably reported for an invalid push-session request — confirm.
type PushSessionInvalidError struct {
	Message string
}

func (*PushSessionInvalidError) Error

func (e *PushSessionInvalidError) Error() string

type PushSessionNotFoundError

// PushSessionNotFoundError is an error type (Error is declared on the pointer
// receiver) that carries the PushID it refers to.
// NOTE(review): presumably PushID is the session that could not be found — confirm.
type PushSessionNotFoundError struct {
	PushID string
}

func (*PushSessionNotFoundError) Error

func (e *PushSessionNotFoundError) Error() string

type PushValidationError

// PushValidationError is an error type (Error is declared on the pointer
// receiver) that carries a free-form Message.
// NOTE(review): presumably reported when pushed rows fail validation — confirm.
type PushValidationError struct {
	Message string
}

func (*PushValidationError) Error

func (e *PushValidationError) Error() string

type RegisteredTable

// RegisteredTable represents a table that is registered for sync operations.
// Values of this type populate ServiceConfig.RegisteredTables.
// sync_key_columns is omitted from JSON when empty.
type RegisteredTable struct {
	Schema         string   `json:"schema"`                     // Schema name (e.g., "public", "crm", "business")
	Table          string   `json:"table"`                      // Table name (e.g., "users", "posts")
	SyncKeyColumns []string `json:"sync_key_columns,omitempty"` // Ordered sync key columns for the target bundle-based protocol
}

RegisteredTable represents a table that is registered for sync operations

type RegisteredTableSpec

// RegisteredTableSpec describes one registered sync table in the newer
// contract surface. It has the same fields and JSON keys as RegisteredTable
// and appears in CapabilitiesResponse.RegisteredTableSpecs.
type RegisteredTableSpec struct {
	Schema         string   `json:"schema"`
	Table          string   `json:"table"`
	SyncKeyColumns []string `json:"sync_key_columns,omitempty"`
}

RegisteredTableSpec describes one registered sync table in the newer contract surface.

type SchemaDiscovery

// SchemaDiscovery handles automatic discovery of table relationships and
// ordering. Instances are created with NewSchemaDiscovery; all fields are
// unexported.
type SchemaDiscovery struct {
	// contains filtered or unexported fields
}

SchemaDiscovery handles automatic discovery of table relationships and ordering

func NewSchemaDiscovery

func NewSchemaDiscovery(pool *pgxpool.Pool, logger *slog.Logger) *SchemaDiscovery

NewSchemaDiscovery creates a new schema discovery instance

func (*SchemaDiscovery) DiscoverSchemaWithDependencyOverrides

func (sd *SchemaDiscovery) DiscoverSchemaWithDependencyOverrides(
	ctx context.Context,
	registeredTables map[string]bool,
	registeredTableInfo map[string]registeredTableRuntimeInfo,
	dependencyOverrides map[string][]string,
) (*DiscoveredSchema, error)

DiscoverSchemaWithDependencyOverrides runs schema discovery and merges explicit dependencies (ordering constraints) into the discovered dependency graph. Overrides only affect ordering (TableOrder/OrderIdx/Dependencies) and do not add FK validation rules.

type ScopeInitializingError added in v0.1.2

// ScopeInitializingError is an error type (Error is declared on the pointer
// receiver) that carries a user ID and a lease expiry time.
// NOTE(review): presumably reported while another initialization lease for
// the user's scope is still held — confirm.
type ScopeInitializingError struct {
	UserID         string
	LeaseExpiresAt time.Time
}

func (*ScopeInitializingError) Error added in v0.1.2

func (e *ScopeInitializingError) Error() string

type ScopeManager added in v0.2.0

// ScopeManager is an opaque type created with NewScopeManager from a
// *SyncService and a ScopeManagerConfig. Its exported method shown in this
// package's documentation is ExecWrite.
type ScopeManager struct {
	// contains filtered or unexported fields
}

func NewScopeManager added in v0.2.0

func NewScopeManager(service *SyncService, cfg ScopeManagerConfig) *ScopeManager

func (*ScopeManager) ExecWrite added in v0.2.0

func (m *ScopeManager) ExecWrite(
	ctx context.Context,
	scopeID string,
	opts ScopeWriteOptions,
	fn func(tx pgx.Tx) error,
) (_ *ScopeWriteResult, err error)

type ScopeManagerConfig added in v0.2.0

// ScopeManagerConfig is the configuration passed to NewScopeManager. Its only
// field is a *slog.Logger.
// NOTE(review): behavior when Logger is nil is not visible here — confirm.
type ScopeManagerConfig struct {
	Logger *slog.Logger
}

type ScopeUninitializedError added in v0.1.2

// ScopeUninitializedError is an error type (Error is declared on the pointer
// receiver) that carries the UserID it refers to.
// NOTE(review): presumably reported when the user's scope has not been initialized — confirm.
type ScopeUninitializedError struct {
	UserID string
}

func (*ScopeUninitializedError) Error added in v0.1.2

func (e *ScopeUninitializedError) Error() string

type ScopeWriteInvalidError added in v0.2.0

// ScopeWriteInvalidError is an error type (Error is declared on the pointer
// receiver) that carries a free-form Message.
// NOTE(review): presumably reported by ScopeManager.ExecWrite for invalid input — confirm.
type ScopeWriteInvalidError struct {
	Message string
}

func (*ScopeWriteInvalidError) Error added in v0.2.0

func (e *ScopeWriteInvalidError) Error() string

type ScopeWriteNoCapturedChangesError added in v0.2.0

// ScopeWriteNoCapturedChangesError is an error type (Error is declared on the
// pointer receiver) that carries a scope ID and a writer ID.
// NOTE(review): presumably reported when a scope write captured no row changes — confirm.
type ScopeWriteNoCapturedChangesError struct {
	ScopeID  string
	WriterID string
}

func (*ScopeWriteNoCapturedChangesError) Error added in v0.2.0

type ScopeWriteOptions added in v0.2.0

// ScopeWriteOptions is the options argument of ScopeManager.ExecWrite. Its
// only field is WriterID.
type ScopeWriteOptions struct {
	WriterID string
}

type ScopeWriteResult added in v0.2.0

// ScopeWriteResult is the result type of ScopeManager.ExecWrite. It carries
// an AutoInitialized flag and a pointer to a Bundle.
// NOTE(review): Bundle is presumably the bundle committed by the write and
// may be nil — confirm.
type ScopeWriteResult struct {
	AutoInitialized bool
	Bundle          *Bundle
}

type ServiceConfig

// ServiceConfig holds configuration for the sync service. A pointer to it is
// passed to NewRuntimeService. Per the field comments below, RegisteredTables
// is required, the bundle row/byte limits treat 0 as unlimited, and the
// chunking limits fall back to runtime defaults when zero.
type ServiceConfig struct {
	MaxSupportedSchemaVersion int               // Current schema version to return
	AppName                   string            // Application name for connection tracking
	RegisteredTables          []RegisteredTable // Schema.table combinations allowed for sync (required)

	MaxRowsPerBundle  int // Maximum number of row effects allowed in one committed bundle (0 = unlimited)
	MaxBytesPerBundle int // Maximum JSON payload size allowed in one committed bundle (0 = unlimited)
	// Push-session chunking limits. Zero uses the runtime defaults.
	DefaultRowsPerPushChunk int
	MaxRowsPerPushChunk     int
	PushSessionTTL          time.Duration
	InitializationLeaseTTL  time.Duration
	// Committed-bundle row fetch limits. Zero uses the runtime defaults.
	DefaultRowsPerCommittedBundleChunk int
	MaxRowsPerCommittedBundleChunk     int
	// Snapshot chunking limits. Zero uses the runtime defaults.
	DefaultRowsPerSnapshotChunk int
	MaxRowsPerSnapshotChunk     int
	SnapshotSessionTTL          time.Duration
	MaxRowsPerSnapshotSession   int64
	MaxBytesPerSnapshotSession  int64
	RetainedBundlesPerUser      int64
	RetentionPruneBatchSize     int64
	// UploadLockTimeout bounds lock waits inside upload transactions.
	// Zero disables SET LOCAL lock_timeout so lock waits are governed only by the request context,
	// which is the reliability-first default.
	UploadLockTimeout time.Duration

	// DependencyOverrides optionally adds explicit ordering constraints on top of discovered
	// DB FKs. Keys and values are "schema.table". Only affects ordering, not FK validation.
	DependencyOverrides map[string][]string

	// StageMetrics optionally records per-stage timings for sync hot paths.
	// The recorder is called synchronously; it must be fast and concurrency-safe.
	StageMetrics StageMetricsRecorder

	// LogStageTimings logs per-stage timings via the service logger at DEBUG.
	// Useful for profiling; keep disabled in production.
	LogStageTimings bool
}

ServiceConfig holds configuration for the sync service

type SnapshotChunkInvalidError

// SnapshotChunkInvalidError is an error type (Error is declared on the
// pointer receiver) that carries a free-form Message.
// NOTE(review): presumably reported for an invalid snapshot chunk request — confirm.
type SnapshotChunkInvalidError struct {
	Message string
}

func (*SnapshotChunkInvalidError) Error

func (e *SnapshotChunkInvalidError) Error() string

type SnapshotChunkResponse

// SnapshotChunkResponse returns one chunk of snapshot rows from a frozen
// session, together with the snapshot identifiers and the paging fields
// next_row_ordinal and has_more. All keys are always encoded.
type SnapshotChunkResponse struct {
	SnapshotID        string        `json:"snapshot_id"`
	SnapshotBundleSeq int64         `json:"snapshot_bundle_seq"`
	Rows              []SnapshotRow `json:"rows"`
	NextRowOrdinal    int64         `json:"next_row_ordinal"`
	HasMore           bool          `json:"has_more"`
}

SnapshotChunkResponse returns one chunk of snapshot rows from a frozen session.

type SnapshotRow

// SnapshotRow is the current after-image for one non-deleted row in the
// bundle-based contract. Unlike BundleRow it has no Op field, and payload is
// always encoded (no omitempty).
type SnapshotRow struct {
	Schema     string          `json:"schema"`
	Table      string          `json:"table"`
	Key        SyncKey         `json:"key"`
	RowVersion int64           `json:"row_version"`
	Payload    json.RawMessage `json:"payload"`
}

SnapshotRow is the current after-image for one non-deleted row in the bundle-based contract.

type SnapshotSession

// SnapshotSession describes one frozen server-side snapshot session. It is
// the result type of SyncService.CreateSnapshotSession. byte_count is omitted
// from JSON when zero; expires_at is carried as a string.
// NOTE(review): the timestamp format of expires_at is not visible here — confirm.
type SnapshotSession struct {
	SnapshotID        string `json:"snapshot_id"`
	SnapshotBundleSeq int64  `json:"snapshot_bundle_seq"`
	RowCount          int64  `json:"row_count"`
	ByteCount         int64  `json:"byte_count,omitempty"`
	ExpiresAt         string `json:"expires_at"`
}

SnapshotSession describes one frozen server-side snapshot session.

type SnapshotSessionCreateRequest added in v0.2.0

// SnapshotSessionCreateRequest carries an optional SnapshotSourceReplacement;
// source_replacement is omitted from JSON when the pointer is nil.
// NOTE(review): presumably the request type of
// SyncService.CreateSnapshotSessionWithRequest — confirm.
type SnapshotSessionCreateRequest struct {
	SourceReplacement *SnapshotSourceReplacement `json:"source_replacement,omitempty"`
}

type SnapshotSessionExpiredError

// SnapshotSessionExpiredError is an error type (Error is declared on the
// pointer receiver) that carries the SnapshotID it refers to.
// NOTE(review): presumably reported when the snapshot session's TTL has elapsed — confirm.
type SnapshotSessionExpiredError struct {
	SnapshotID string
}

func (*SnapshotSessionExpiredError) Error

type SnapshotSessionForbiddenError

// SnapshotSessionForbiddenError is an error type (Error is declared on the
// pointer receiver) that carries the SnapshotID it refers to.
// NOTE(review): presumably reported when the actor may not use the session — confirm.
type SnapshotSessionForbiddenError struct {
	SnapshotID string
}

func (*SnapshotSessionForbiddenError) Error

type SnapshotSessionInvalidError added in v0.2.0

// SnapshotSessionInvalidError is an error type (Error is declared on the
// pointer receiver) that carries a free-form Message.
// NOTE(review): presumably reported for an invalid snapshot-session request — confirm.
type SnapshotSessionInvalidError struct {
	Message string
}

func (*SnapshotSessionInvalidError) Error added in v0.2.0

type SnapshotSessionLimitExceededError

// SnapshotSessionLimitExceededError is an error type (Error is declared on
// the pointer receiver) that names the exceeded Dimension and carries the
// Actual and Limit values.
// NOTE(review): Dimension presumably distinguishes the row and byte limits
// (MaxRowsPerSnapshotSession / MaxBytesPerSnapshotSession) — confirm.
type SnapshotSessionLimitExceededError struct {
	Dimension string
	Actual    int64
	Limit     int64
}

func (*SnapshotSessionLimitExceededError) Error

type SnapshotSessionNotFoundError

// SnapshotSessionNotFoundError is an error type (Error is declared on the
// pointer receiver) that carries the SnapshotID it refers to.
// NOTE(review): presumably SnapshotID is the session that could not be found — confirm.
type SnapshotSessionNotFoundError struct {
	SnapshotID string
}

func (*SnapshotSessionNotFoundError) Error

type SnapshotSourceReplacement added in v0.2.0

// SnapshotSourceReplacement names a previous source ID, a new source ID, and
// a reason string. It is referenced by
// SnapshotSessionCreateRequest.SourceReplacement. All three JSON keys are
// always encoded.
type SnapshotSourceReplacement struct {
	PreviousSourceID string `json:"previous_source_id"`
	NewSourceID      string `json:"new_source_id"`
	Reason           string `json:"reason"`
}

type SourceReplacementInvalidError added in v0.2.0

// SourceReplacementInvalidError is an error type (Error is declared on the
// pointer receiver) that carries a free-form Message.
// NOTE(review): presumably reported for an invalid SnapshotSourceReplacement — confirm.
type SourceReplacementInvalidError struct {
	Message string
}

func (*SourceReplacementInvalidError) Error added in v0.2.0

type SourceRetiredError added in v0.2.0

// SourceRetiredError is an error type (Error is declared on the pointer
// receiver) that carries a user ID, the source ID concerned, and the ID of
// the source that replaced it.
// NOTE(review): ReplacedBySourceID is presumably empty when no replacement is
// known (SourceRetiredResponse tags it omitempty) — confirm.
type SourceRetiredError struct {
	UserID             string
	SourceID           string
	ReplacedBySourceID string
}

func (*SourceRetiredError) Error added in v0.2.0

func (e *SourceRetiredError) Error() string

type SourceRetiredResponse added in v0.2.0

// SourceRetiredResponse is a JSON shape that combines the error/message
// envelope with source_id and replaced_by_source_id. replaced_by_source_id is
// omitted when empty; the other keys are always encoded.
type SourceRetiredResponse struct {
	Error              string `json:"error"`
	Message            string `json:"message"`
	SourceID           string `json:"source_id"`
	ReplacedBySourceID string `json:"replaced_by_source_id,omitempty"`
}

type SourceSequenceChangedError added in v0.2.0

// SourceSequenceChangedError is an error type (Error is declared on the
// pointer receiver) that carries a user ID, a source ID, and Expected and
// Actual int64 values.
// NOTE(review): Expected/Actual are presumably source bundle sequence numbers — confirm.
type SourceSequenceChangedError struct {
	UserID   string
	SourceID string
	Expected int64
	Actual   int64
}

func (*SourceSequenceChangedError) Error added in v0.2.0

type SourceSequenceOutOfOrderError added in v0.2.0

// SourceSequenceOutOfOrderError is an error type (Error is declared on the
// pointer receiver) that carries a user ID, a source ID, and Expected and
// Actual int64 values. It has the same fields as SourceSequenceChangedError.
// NOTE(review): Expected/Actual are presumably source bundle sequence numbers — confirm.
type SourceSequenceOutOfOrderError struct {
	UserID   string
	SourceID string
	Expected int64
	Actual   int64
}

func (*SourceSequenceOutOfOrderError) Error added in v0.2.0

type SourceTupleHistoryPrunedError added in v0.2.0

// SourceTupleHistoryPrunedError is an error type (Error is declared on the
// pointer receiver) that carries a user ID, a (SourceID, SourceBundleID)
// pair, and a hint for the maximum committed source bundle ID.
// NOTE(review): presumably reported when history for that source tuple has
// been pruned; the hint's exact guarantee is not visible here — confirm.
type SourceTupleHistoryPrunedError struct {
	UserID                         string
	SourceID                       string
	SourceBundleID                 int64
	MaxCommittedSourceBundleIDHint int64
}

func (*SourceTupleHistoryPrunedError) Error added in v0.2.0

type StageMetricsRecorder

// StageMetricsRecorder is the single-method interface accepted by
// ServiceConfig.StageMetrics. ObserveStage receives a context and one
// StageTiming. ServiceConfig documents that the recorder is called
// synchronously and must be fast and concurrency-safe.
type StageMetricsRecorder interface {
	ObserveStage(ctx context.Context, timing StageTiming)
}

type StageMetricsRecorderFunc

// StageMetricsRecorderFunc is a function type with the same parameters as
// StageMetricsRecorder.ObserveStage. It declares an ObserveStage method with
// that signature, so a plain function can be used as a StageMetricsRecorder.
// NOTE(review): ObserveStage presumably just calls f; its body is not shown — confirm.
type StageMetricsRecorderFunc func(ctx context.Context, timing StageTiming)

func (StageMetricsRecorderFunc) ObserveStage

func (f StageMetricsRecorderFunc) ObserveStage(ctx context.Context, timing StageTiming)

type StageTiming

// StageTiming is the value passed to StageMetricsRecorder.ObserveStage. It
// names an Operation and a Stage, and carries a Duration, a Count, an Attempt
// number, and an Error flag.
// NOTE(review): the meaning of Count and the base of Attempt (0 or 1) are not
// visible here — confirm.
type StageTiming struct {
	Operation string
	Stage     string
	Duration  time.Duration
	Count     int
	Attempt   int
	Error     bool
}

type StatusResponse

// StatusResponse represents service status response. No field uses omitempty,
// so every key is always encoded. Note that InFlightOperations is encoded
// under the JSON key inflight_operations (no underscore between "in" and
// "flight").
type StatusResponse struct {
	Status                            string          `json:"status"`                                 // healthy or unhealthy
	Version                           string          `json:"version"`                                // API version
	AppName                           string          `json:"app_name"`                               // Application name
	Lifecycle                         string          `json:"lifecycle"`                              // running, shutting_down, closed
	AcceptingOperations               bool            `json:"accepting_operations"`                   // Whether new sync operations are accepted
	InFlightOperations                int             `json:"inflight_operations"`                    // Current in-flight sync operations
	RegisteredTables                  []string        `json:"registered_tables"`                      // Tables registered for sync
	Features                          map[string]bool `json:"features"`                               // Enabled features
	UserStateRetentionFloorAheadCount int64           `json:"user_state_retention_floor_ahead_count"` // user_state rows with retained_bundle_floor > next_bundle_seq - 1
	LatestBundleSeqMax                int64           `json:"latest_bundle_seq_max"`                  // highest committed bundle seq visible across sync.user_state
	RetainedBundleFloorMin            int64           `json:"retained_bundle_floor_min"`              // minimum retained_bundle_floor across sync.user_state
	RetainedBundleFloorMax            int64           `json:"retained_bundle_floor_max"`              // maximum retained_bundle_floor across sync.user_state
	RetainedBundleWindowMin           int64           `json:"retained_bundle_window_min"`             // minimum retained history window (latest bundle seq - retained floor)
	RetainedBundleWindowMax           int64           `json:"retained_bundle_window_max"`             // maximum retained history window (latest bundle seq - retained floor)
	HistoryPrunedErrorCount           int64           `json:"history_pruned_error_count"`             // total history_pruned responses observed by the server
	AcceptedPushReplayCount           int64           `json:"accepted_push_replay_count"`             // total accepted-push replay hits observed by the server
	RejectedRegisteredWriteCount      int64           `json:"rejected_registered_write_count"`        // total writes rejected for missing sync bundle context
	CommittedBundleCount              int64           `json:"committed_bundle_count"`                 // total committed bundle count visible in sync.bundle_log
	CommittedBundleBytes              int64           `json:"committed_bundle_bytes"`                 // total committed bundle bytes visible in sync.bundle_log
}

StatusResponse represents service status response

type SyncKey

type SyncKey map[string]any

SyncKey represents the canonical structured identity for a synced row.

type SyncService

type SyncService struct {
	// contains filtered or unexported fields
}

SyncService provides the core synchronization functionality. This is the main SDK component that developers integrate into their applications.

func NewRuntimeService

func NewRuntimeService(pool *pgxpool.Pool, config *ServiceConfig, logger *slog.Logger) (*SyncService, error)

NewRuntimeService creates a runtime-only sync service instance from an existing pool. It does not mutate database schema or discover runtime topology.

func (*SyncService) Bootstrap

func (s *SyncService) Bootstrap(ctx context.Context) error

Bootstrap initializes sync metadata and the runtime topology snapshot. Topology is prepared at bootstrap time and is restart-only for now; runtime schema changes are not re-discovered automatically by SyncService.

func (*SyncService) Close

func (s *SyncService) Close(ctx context.Context) error

Close gracefully shuts down the sync service. It rejects new runtime operations, waits for in-flight work to drain, and is safe to call multiple times. Note: This does NOT close the database pool - the caller is responsible for pool lifecycle.

func (*SyncService) CommitPushSession

func (s *SyncService) CommitPushSession(ctx context.Context, actor Actor, pushID string) (_ *PushSessionCommitResponse, err error)

func (*SyncService) Connect added in v0.1.2

func (s *SyncService) Connect(ctx context.Context, actor Actor, req *ConnectRequest) (_ *ConnectResponse, err error)

func (*SyncService) CreatePushSession

func (s *SyncService) CreatePushSession(ctx context.Context, actor Actor, req *PushSessionCreateRequest) (_ *PushSessionCreateResponse, err error)

func (*SyncService) CreateSnapshotSession

func (s *SyncService) CreateSnapshotSession(ctx context.Context, actor Actor) (_ *SnapshotSession, err error)

func (*SyncService) CreateSnapshotSessionWithRequest added in v0.2.0

func (s *SyncService) CreateSnapshotSessionWithRequest(ctx context.Context, actor Actor, req *SnapshotSessionCreateRequest) (_ *SnapshotSession, err error)

func (*SyncService) DeletePushSession

func (s *SyncService) DeletePushSession(ctx context.Context, actor Actor, pushID string) error

func (*SyncService) DeleteSnapshotSession

func (s *SyncService) DeleteSnapshotSession(ctx context.Context, actor Actor, snapshotID string) (err error)

func (*SyncService) GetCapabilities

func (s *SyncService) GetCapabilities() CapabilitiesResponse

GetCapabilities returns the currently supported sync protocol surface.

func (*SyncService) GetCommittedBundleRows

func (s *SyncService) GetCommittedBundleRows(ctx context.Context, actor Actor, bundleSeq int64, afterRowOrdinal *int64, maxRows int) (_ *CommittedBundleRowsResponse, err error)

func (*SyncService) GetSchemaVersion

func (s *SyncService) GetSchemaVersion() int

GetSchemaVersion returns the current schema version.

func (*SyncService) GetSnapshotChunk

func (s *SyncService) GetSnapshotChunk(ctx context.Context, actor Actor, snapshotID string, afterRowOrdinal int64, maxRows int) (_ *SnapshotChunkResponse, err error)

func (*SyncService) GetStatus

func (s *SyncService) GetStatus(ctx context.Context) (*StatusResponse, error)

GetStatus returns the current service lifecycle and bundle-era operability snapshot.

func (*SyncService) IsTableRegistered

func (s *SyncService) IsTableRegistered(schemaName, tableName string) bool

IsTableRegistered reports whether a schema.table combination is registered for sync operations.

func (*SyncService) Pool

func (s *SyncService) Pool() *pgxpool.Pool

Pool returns the underlying database connection pool. This allows advanced users to execute custom queries.

func (*SyncService) ProcessPull

func (s *SyncService) ProcessPull(
	ctx context.Context,
	actor Actor,
	afterBundleSeq int64,
	maxBundles int,
	targetBundleSeq int64,
) (resp *PullResponse, err error)

func (*SyncService) UploadPushChunk

func (s *SyncService) UploadPushChunk(ctx context.Context, actor Actor, pushID string, req *PushSessionChunkRequest) (_ *PushSessionChunkResponse, err error)

func (*SyncService) WithinSyncBundle

func (s *SyncService) WithinSyncBundle(
	ctx context.Context,
	actor Actor,
	source BundleSource,
	fn func(tx pgx.Tx) error,
) (err error)

type UnsupportedSchemaError

type UnsupportedSchemaError struct {
	Message string
}

UnsupportedSchemaError reports a bootstrap-time schema shape that the current production-ready contract intentionally does not support.

func (*UnsupportedSchemaError) Error

func (e *UnsupportedSchemaError) Error() string

func (*UnsupportedSchemaError) Is

func (e *UnsupportedSchemaError) Is(target error) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL