Documentation
¶
Index ¶
- Constants
- func ActorMiddleware(cfg ActorMiddlewareConfig) func(http.Handler) http.Handler
- func ContextWithActor(ctx context.Context, actor Actor) context.Context
- func ConvertBase64EncodedUUID(value string) (uuid.UUID, error)
- func Key(schema, table string) string
- func OptionallyConvertBase64EncodedUUID(payloadValue string) (any, error)
- type Actor
- type ActorMiddlewareConfig
- type Bundle
- type BundleCapabilitiesLimits
- type BundleRow
- type BundleSource
- type CapabilitiesResponse
- type CommittedBundleChunkInvalidError
- type CommittedBundleNotFoundError
- type CommittedBundleRowsResponse
- type ConnectInvalidError
- type ConnectRequest
- type ConnectResponse
- type DiscoveredSchema
- type ErrorResponse
- type ForeignKeyConstraint
- type HTTPSyncHandlers
- func (h *HTTPSyncHandlers) HandleCapabilities(w http.ResponseWriter, r *http.Request)
- func (h *HTTPSyncHandlers) HandleCommitPushSession(w http.ResponseWriter, r *http.Request)
- func (h *HTTPSyncHandlers) HandleConnect(w http.ResponseWriter, r *http.Request)
- func (h *HTTPSyncHandlers) HandleCreatePushSession(w http.ResponseWriter, r *http.Request)
- func (h *HTTPSyncHandlers) HandleCreateSnapshotSession(w http.ResponseWriter, r *http.Request)
- func (h *HTTPSyncHandlers) HandleDeletePushSession(w http.ResponseWriter, r *http.Request)
- func (h *HTTPSyncHandlers) HandleDeleteSnapshotSession(w http.ResponseWriter, r *http.Request)
- func (h *HTTPSyncHandlers) HandleGetCommittedBundleRows(w http.ResponseWriter, r *http.Request)
- func (h *HTTPSyncHandlers) HandleGetSnapshotChunk(w http.ResponseWriter, r *http.Request)
- func (h *HTTPSyncHandlers) HandleHealth(w http.ResponseWriter, r *http.Request)
- func (h *HTTPSyncHandlers) HandlePull(w http.ResponseWriter, r *http.Request)
- func (h *HTTPSyncHandlers) HandlePushSessionChunk(w http.ResponseWriter, r *http.Request)
- func (h *HTTPSyncHandlers) HandleStatus(w http.ResponseWriter, r *http.Request)
- type HistoryPrunedError
- type InitializationExpiredError
- type InitializationStaleError
- type PayloadExtractor
- func (p *PayloadExtractor) Base64Field(key string) []byte
- func (p *PayloadExtractor) Base64FieldRequired(key string) ([]byte, error)
- func (p *PayloadExtractor) BoolField(key string) *bool
- func (p *PayloadExtractor) BoolFieldRequired(key string) (bool, error)
- func (p *PayloadExtractor) Float64Field(key string) *float64
- func (p *PayloadExtractor) Float64FieldRequired(key string) (float64, error)
- func (p *PayloadExtractor) HasField(key string) bool
- func (p *PayloadExtractor) Int64Field(key string) *int64
- func (p *PayloadExtractor) Int64FieldRequired(key string) (int64, error)
- func (p *PayloadExtractor) StrField(key string) *string
- func (p *PayloadExtractor) StrFieldRequired(key string) (string, error)
- func (p *PayloadExtractor) UUIDField(key string) *uuid.UUID
- func (p *PayloadExtractor) UUIDFieldRequired(key string) (uuid.UUID, error)
- type PullResponse
- type PushChunkInvalidError
- type PushChunkOutOfOrderError
- type PushCommitInvalidError
- type PushConflictDetails
- type PushConflictError
- type PushConflictResponse
- type PushRequestRow
- type PushSessionChunkRequest
- type PushSessionChunkResponse
- type PushSessionCommitResponse
- type PushSessionCreateRequest
- type PushSessionCreateResponse
- type PushSessionExpiredError
- type PushSessionForbiddenError
- type PushSessionInvalidError
- type PushSessionNotFoundError
- type PushValidationError
- type RegisteredTable
- type RegisteredTableSpec
- type SchemaDiscovery
- type ScopeInitializingError
- type ScopeManager
- type ScopeManagerConfig
- type ScopeUninitializedError
- type ScopeWriteInvalidError
- type ScopeWriteNoCapturedChangesError
- type ScopeWriteOptions
- type ScopeWriteResult
- type ServiceConfig
- type SnapshotChunkInvalidError
- type SnapshotChunkResponse
- type SnapshotRow
- type SnapshotSession
- type SnapshotSessionCreateRequest
- type SnapshotSessionExpiredError
- type SnapshotSessionForbiddenError
- type SnapshotSessionInvalidError
- type SnapshotSessionLimitExceededError
- type SnapshotSessionNotFoundError
- type SnapshotSourceReplacement
- type SourceReplacementInvalidError
- type SourceRetiredError
- type SourceRetiredResponse
- type SourceSequenceChangedError
- type SourceSequenceOutOfOrderError
- type SourceTupleHistoryPrunedError
- type StageMetricsRecorder
- type StageMetricsRecorderFunc
- type StageTiming
- type StatusResponse
- type SyncKey
- type SyncService
- func (s *SyncService) Bootstrap(ctx context.Context) error
- func (s *SyncService) Close(ctx context.Context) error
- func (s *SyncService) CommitPushSession(ctx context.Context, actor Actor, pushID string) (_ *PushSessionCommitResponse, err error)
- func (s *SyncService) Connect(ctx context.Context, actor Actor, req *ConnectRequest) (_ *ConnectResponse, err error)
- func (s *SyncService) CreatePushSession(ctx context.Context, actor Actor, req *PushSessionCreateRequest) (_ *PushSessionCreateResponse, err error)
- func (s *SyncService) CreateSnapshotSession(ctx context.Context, actor Actor) (_ *SnapshotSession, err error)
- func (s *SyncService) CreateSnapshotSessionWithRequest(ctx context.Context, actor Actor, req *SnapshotSessionCreateRequest) (_ *SnapshotSession, err error)
- func (s *SyncService) DeletePushSession(ctx context.Context, actor Actor, pushID string) error
- func (s *SyncService) DeleteSnapshotSession(ctx context.Context, actor Actor, snapshotID string) (err error)
- func (s *SyncService) GetCapabilities() CapabilitiesResponse
- func (s *SyncService) GetCommittedBundleRows(ctx context.Context, actor Actor, bundleSeq int64, afterRowOrdinal *int64, ...) (_ *CommittedBundleRowsResponse, err error)
- func (s *SyncService) GetSchemaVersion() int
- func (s *SyncService) GetSnapshotChunk(ctx context.Context, actor Actor, snapshotID string, afterRowOrdinal int64, ...) (_ *SnapshotChunkResponse, err error)
- func (s *SyncService) GetStatus(ctx context.Context) (*StatusResponse, error)
- func (s *SyncService) IsTableRegistered(schemaName, tableName string) bool
- func (s *SyncService) Pool() *pgxpool.Pool
- func (s *SyncService) ProcessPull(ctx context.Context, actor Actor, afterBundleSeq int64, maxBundles int, ...) (resp *PullResponse, err error)
- func (s *SyncService) UploadPushChunk(ctx context.Context, actor Actor, pushID string, req *PushSessionChunkRequest) (_ *PushSessionChunkResponse, err error)
- func (s *SyncService) WithinSyncBundle(ctx context.Context, actor Actor, source BundleSource, ...) (err error)
- type UnsupportedSchemaError
Constants ¶
const ( OpInsert = "INSERT" OpUpdate = "UPDATE" OpDelete = "DELETE" )
Operation constants for change operations
const SourceIDHeader = "Oversync-Source-ID"
const SyncProtocolVersion = "v1"
Variables ¶
This section is empty.
Functions ¶
func ActorMiddleware ¶ added in v0.2.0
func ActorMiddleware(cfg ActorMiddlewareConfig) func(http.Handler) http.Handler
ActorMiddleware injects the runtime actor from trusted auth context plus sync source header.
func ContextWithActor ¶
ContextWithActor stores the runtime actor in context for transport/runtime handoff.
Types ¶
type ActorMiddlewareConfig ¶ added in v0.2.0
ActorMiddlewareConfig configures the standard HTTP actor middleware.
type Bundle ¶
type Bundle struct {
BundleSeq int64 `json:"bundle_seq"`
SourceID string `json:"source_id"`
SourceBundleID int64 `json:"source_bundle_id"`
RowCount int64 `json:"row_count,omitempty"`
BundleHash string `json:"bundle_hash,omitempty"`
Rows []BundleRow `json:"rows"`
}
Bundle represents one committed durable sync unit in the target bundle-based contract.
type BundleCapabilitiesLimits ¶
type BundleCapabilitiesLimits struct {
MaxRowsPerBundle int `json:"max_rows_per_bundle,omitempty"`
MaxBytesPerBundle int `json:"max_bytes_per_bundle,omitempty"`
MaxBundlesPerPull int `json:"max_bundles_per_pull,omitempty"`
DefaultRowsPerPushChunk int `json:"default_rows_per_push_chunk,omitempty"`
MaxRowsPerPushChunk int `json:"max_rows_per_push_chunk,omitempty"`
PushSessionTTLSeconds int `json:"push_session_ttl_seconds,omitempty"`
DefaultRowsPerCommittedBundleChunk int `json:"default_rows_per_committed_bundle_chunk,omitempty"`
MaxRowsPerCommittedBundleChunk int `json:"max_rows_per_committed_bundle_chunk,omitempty"`
DefaultRowsPerSnapshotChunk int `json:"default_rows_per_snapshot_chunk,omitempty"`
MaxRowsPerSnapshotChunk int `json:"max_rows_per_snapshot_chunk,omitempty"`
SnapshotSessionTTLSeconds int `json:"snapshot_session_ttl_seconds,omitempty"`
MaxRowsPerSnapshotSession int64 `json:"max_rows_per_snapshot_session,omitempty"`
MaxBytesPerSnapshotSession int64 `json:"max_bytes_per_snapshot_session,omitempty"`
InitializationLeaseTTLSeconds int `json:"initialization_lease_ttl_seconds,omitempty"`
}
BundleCapabilitiesLimits reports bundle-oriented guardrails in the target contract.
type BundleRow ¶
type BundleRow struct {
Schema string `json:"schema"`
Table string `json:"table"`
Key SyncKey `json:"key"`
Op string `json:"op"`
RowVersion int64 `json:"row_version"`
Payload json.RawMessage `json:"payload,omitempty"`
}
BundleRow describes one normalized row effect inside a committed sync bundle.
type BundleSource ¶
BundleSource identifies one server-side committed bundle source.
type CapabilitiesResponse ¶
type CapabilitiesResponse struct {
ProtocolVersion string `json:"protocol_version"`
SchemaVersion int `json:"schema_version"`
AppName string `json:"app_name,omitempty"`
RegisteredTables []string `json:"registered_tables,omitempty"`
RegisteredTableSpecs []RegisteredTableSpec `json:"registered_table_specs,omitempty"`
Features map[string]bool `json:"features"`
BundleLimits *BundleCapabilitiesLimits `json:"bundle_limits,omitempty"`
}
CapabilitiesResponse describes the currently supported sync protocol surface.
type CommittedBundleChunkInvalidError ¶
type CommittedBundleChunkInvalidError struct {
Message string
}
func (*CommittedBundleChunkInvalidError) Error ¶
func (e *CommittedBundleChunkInvalidError) Error() string
type CommittedBundleNotFoundError ¶
type CommittedBundleNotFoundError struct {
BundleSeq int64
}
func (*CommittedBundleNotFoundError) Error ¶
func (e *CommittedBundleNotFoundError) Error() string
type CommittedBundleRowsResponse ¶
type CommittedBundleRowsResponse struct {
BundleSeq int64 `json:"bundle_seq"`
SourceID string `json:"source_id"`
SourceBundleID int64 `json:"source_bundle_id"`
RowCount int64 `json:"row_count"`
BundleHash string `json:"bundle_hash"`
Rows []BundleRow `json:"rows"`
NextRowOrdinal int64 `json:"next_row_ordinal"`
HasMore bool `json:"has_more"`
}
type ConnectInvalidError ¶ added in v0.1.2
type ConnectInvalidError struct {
Message string
}
func (*ConnectInvalidError) Error ¶ added in v0.1.2
func (e *ConnectInvalidError) Error() string
type ConnectRequest ¶ added in v0.1.2
type ConnectRequest struct {
HasLocalPendingRows bool `json:"has_local_pending_rows"`
}
type ConnectResponse ¶ added in v0.1.2
type DiscoveredSchema ¶
type DiscoveredSchema struct {
TableOrder []string // Ordered list of schema.table (parents first, cycle members grouped deterministically)
OrderIdx map[string]int // schema.table -> order index for O(1) lookups
Dependencies map[string][]string
CycleGroup map[string]int // schema.table -> SCC id (>0 only when part of a multi-table cycle)
Cycles map[int][]string // SCC id -> sorted cycle members
}
DiscoveredSchema contains the discovered table relationships
type ErrorResponse ¶
ErrorResponse represents an error response
type ForeignKeyConstraint ¶
type ForeignKeyConstraint struct {
ConstraintName string // Name of the FK constraint
TableSchema string // Schema of the child table
TableName string // Name of the child table
ColumnName string // Column in child table
Ordinal int // Column ordinal within the FK constraint
RefSchema string // Schema of the referenced table
RefTable string // Referenced parent table
RefColumn string // Referenced column in parent table
MatchOption string // SIMPLE / FULL / PARTIAL
OnDeleteAction string // CASCADE / RESTRICT / SET NULL / SET DEFAULT / NO ACTION
OnUpdateAction string // CASCADE / RESTRICT / SET NULL / SET DEFAULT / NO ACTION
}
ForeignKeyConstraint represents a foreign key relationship
type HTTPSyncHandlers ¶
type HTTPSyncHandlers struct {
// contains filtered or unexported fields
}
HTTPSyncHandlers provides HTTP handlers for the two-way sync API
func NewHTTPSyncHandlers ¶
func NewHTTPSyncHandlers(service *SyncService, logger *slog.Logger) *HTTPSyncHandlers
NewHTTPSyncHandlers creates a new instance of sync handlers
func (*HTTPSyncHandlers) HandleCapabilities ¶
func (h *HTTPSyncHandlers) HandleCapabilities(w http.ResponseWriter, r *http.Request)
HandleCapabilities returns the current sync capabilities surface.
func (*HTTPSyncHandlers) HandleCommitPushSession ¶
func (h *HTTPSyncHandlers) HandleCommitPushSession(w http.ResponseWriter, r *http.Request)
func (*HTTPSyncHandlers) HandleConnect ¶ added in v0.1.2
func (h *HTTPSyncHandlers) HandleConnect(w http.ResponseWriter, r *http.Request)
func (*HTTPSyncHandlers) HandleCreatePushSession ¶
func (h *HTTPSyncHandlers) HandleCreatePushSession(w http.ResponseWriter, r *http.Request)
func (*HTTPSyncHandlers) HandleCreateSnapshotSession ¶
func (h *HTTPSyncHandlers) HandleCreateSnapshotSession(w http.ResponseWriter, r *http.Request)
HandleCreateSnapshotSession creates one frozen snapshot session for chunked hydrate/recover.
func (*HTTPSyncHandlers) HandleDeletePushSession ¶
func (h *HTTPSyncHandlers) HandleDeletePushSession(w http.ResponseWriter, r *http.Request)
func (*HTTPSyncHandlers) HandleDeleteSnapshotSession ¶
func (h *HTTPSyncHandlers) HandleDeleteSnapshotSession(w http.ResponseWriter, r *http.Request)
HandleDeleteSnapshotSession deletes an existing frozen snapshot session.
func (*HTTPSyncHandlers) HandleGetCommittedBundleRows ¶
func (h *HTTPSyncHandlers) HandleGetCommittedBundleRows(w http.ResponseWriter, r *http.Request)
func (*HTTPSyncHandlers) HandleGetSnapshotChunk ¶
func (h *HTTPSyncHandlers) HandleGetSnapshotChunk(w http.ResponseWriter, r *http.Request)
HandleGetSnapshotChunk returns one chunk of rows from a frozen snapshot session.
func (*HTTPSyncHandlers) HandleHealth ¶
func (h *HTTPSyncHandlers) HandleHealth(w http.ResponseWriter, r *http.Request)
HandleHealth returns a readiness-oriented health response derived from the service status snapshot.
func (*HTTPSyncHandlers) HandlePull ¶
func (h *HTTPSyncHandlers) HandlePull(w http.ResponseWriter, r *http.Request)
HandlePull processes bundle pull requests.
func (*HTTPSyncHandlers) HandlePushSessionChunk ¶
func (h *HTTPSyncHandlers) HandlePushSessionChunk(w http.ResponseWriter, r *http.Request)
func (*HTTPSyncHandlers) HandleStatus ¶
func (h *HTTPSyncHandlers) HandleStatus(w http.ResponseWriter, r *http.Request)
HandleStatus returns the current lifecycle and operability status snapshot.
type HistoryPrunedError ¶
func (*HistoryPrunedError) Error ¶
func (e *HistoryPrunedError) Error() string
func (*HistoryPrunedError) Is ¶
func (e *HistoryPrunedError) Is(target error) bool
type InitializationExpiredError ¶ added in v0.1.2
type InitializationExpiredError struct {
Message string
}
func (*InitializationExpiredError) Error ¶ added in v0.1.2
func (e *InitializationExpiredError) Error() string
type InitializationStaleError ¶ added in v0.1.2
type InitializationStaleError struct {
Message string
}
func (*InitializationStaleError) Error ¶ added in v0.1.2
func (e *InitializationStaleError) Error() string
type PayloadExtractor ¶
type PayloadExtractor struct {
// contains filtered or unexported fields
}
PayloadExtractor provides utilities for extracting typed fields from JSON payloads. This is commonly used in ApplyUpsert handlers to parse sync payloads.
func NewPayloadExtractor ¶
func NewPayloadExtractor(payload []byte) (*PayloadExtractor, error)
NewPayloadExtractor creates a new PayloadExtractor from JSON payload bytes.
func NewPayloadExtractorFromMap ¶
func NewPayloadExtractorFromMap(data map[string]any) *PayloadExtractor
NewPayloadExtractorFromMap creates a new PayloadExtractor from an existing map.
func (*PayloadExtractor) Base64Field ¶
func (p *PayloadExtractor) Base64Field(key string) []byte
Base64Field extracts a nullable base64-decoded byte slice from the payload. Returns nil if the field is missing, null, not a string, or cannot be decoded.
func (*PayloadExtractor) Base64FieldRequired ¶
func (p *PayloadExtractor) Base64FieldRequired(key string) ([]byte, error)
Base64FieldRequired extracts a required base64-decoded byte slice from the payload. Returns an error if the field is missing, null, not a string, or cannot be decoded.
func (*PayloadExtractor) BoolField ¶
func (p *PayloadExtractor) BoolField(key string) *bool
BoolField extracts a nullable bool from the payload. Accepts bool values, numeric values (0=false, non-zero=true), and string values ("true"/"false", "1"/"0"). Returns nil if the field is missing, null, or cannot be converted.
func (*PayloadExtractor) BoolFieldRequired ¶
func (p *PayloadExtractor) BoolFieldRequired(key string) (bool, error)
BoolFieldRequired extracts a required bool from the payload. Returns an error if the field is missing, null, or cannot be converted.
func (*PayloadExtractor) Float64Field ¶
func (p *PayloadExtractor) Float64Field(key string) *float64
Float64Field extracts a nullable float64 from the payload. Accepts both numeric values and numeric strings. Returns nil if the field is missing, null, or cannot be converted.
func (*PayloadExtractor) Float64FieldRequired ¶
func (p *PayloadExtractor) Float64FieldRequired(key string) (float64, error)
Float64FieldRequired extracts a required float64 from the payload. Returns an error if the field is missing, null, or cannot be converted.
func (*PayloadExtractor) HasField ¶
func (p *PayloadExtractor) HasField(key string) bool
HasField checks if a field exists in the payload (even if it's null).
func (*PayloadExtractor) Int64Field ¶
func (p *PayloadExtractor) Int64Field(key string) *int64
Int64Field extracts a nullable int64 from the payload. Accepts both numeric values and numeric strings. Returns nil if the field is missing, null, or cannot be converted.
func (*PayloadExtractor) Int64FieldRequired ¶
func (p *PayloadExtractor) Int64FieldRequired(key string) (int64, error)
Int64FieldRequired extracts a required int64 from the payload. Returns an error if the field is missing, null, or cannot be converted.
func (*PayloadExtractor) StrField ¶
func (p *PayloadExtractor) StrField(key string) *string
StrField extracts a nullable string from the payload. Returns nil if the field is missing, null, or not a string.
func (*PayloadExtractor) StrFieldRequired ¶
func (p *PayloadExtractor) StrFieldRequired(key string) (string, error)
StrFieldRequired extracts a required string from the payload. Returns an error if the field is missing, null, or not a string.
func (*PayloadExtractor) UUIDField ¶
func (p *PayloadExtractor) UUIDField(key string) *uuid.UUID
UUIDField extracts a nullable UUID from the payload. Accepts string values that can be parsed as UUIDs. Returns nil if the field is missing, null, or cannot be parsed as UUID.
func (*PayloadExtractor) UUIDFieldRequired ¶
func (p *PayloadExtractor) UUIDFieldRequired(key string) (uuid.UUID, error)
UUIDFieldRequired extracts a required UUID from the payload. Returns an error if the field is missing, null, or cannot be parsed as UUID.
type PullResponse ¶
type PullResponse struct {
StableBundleSeq int64 `json:"stable_bundle_seq"`
Bundles []Bundle `json:"bundles"`
HasMore bool `json:"has_more"`
}
PullResponse returns one or more complete committed bundles.
type PushChunkInvalidError ¶
type PushChunkInvalidError struct {
Message string
}
func (*PushChunkInvalidError) Error ¶
func (e *PushChunkInvalidError) Error() string
type PushChunkOutOfOrderError ¶
func (*PushChunkOutOfOrderError) Error ¶
func (e *PushChunkOutOfOrderError) Error() string
type PushCommitInvalidError ¶
type PushCommitInvalidError struct {
Message string
}
func (*PushCommitInvalidError) Error ¶
func (e *PushCommitInvalidError) Error() string
type PushConflictDetails ¶
type PushConflictDetails struct {
Schema string `json:"schema"`
Table string `json:"table"`
Key SyncKey `json:"key"`
Op string `json:"op"`
BaseRowVersion int64 `json:"base_row_version"`
ServerRowVersion int64 `json:"server_row_version"`
ServerRowDeleted bool `json:"server_row_deleted"`
ServerRow json.RawMessage `json:"server_row"`
}
PushConflictDetails describes one authoritative row state that rejected a push commit. The payload is deterministic:
- schema/table are normalized lowercase identifiers
- key uses the same canonical sync-key JSON shape as the rest of the API
- server_row_version is 0 when no authoritative row_state exists yet
- server_row_deleted distinguishes tombstones from an absent row when server_row is null
- server_row is the canonical wire-format full row after-image, or null if deleted/missing
type PushConflictError ¶
type PushConflictError struct {
Message string
Conflict *PushConflictDetails
}
func (*PushConflictError) Error ¶
func (e *PushConflictError) Error() string
type PushConflictResponse ¶
type PushConflictResponse struct {
Error string `json:"error"`
Message string `json:"message"`
Conflict *PushConflictDetails `json:"conflict,omitempty"`
}
PushConflictResponse preserves the legacy error/message envelope while adding machine-readable conflict details.
type PushRequestRow ¶
type PushRequestRow struct {
Schema string `json:"schema"`
Table string `json:"table"`
Key SyncKey `json:"key"`
Op string `json:"op"`
BaseRowVersion int64 `json:"base_row_version"`
Payload json.RawMessage `json:"payload,omitempty"`
}
PushRequestRow is one locally dirty row intent sent by the client.
type PushSessionChunkRequest ¶
type PushSessionChunkRequest struct {
StartRowOrdinal int64 `json:"start_row_ordinal"`
Rows []PushRequestRow `json:"rows"`
}
type PushSessionCreateResponse ¶
type PushSessionCreateResponse struct {
PushID string `json:"push_id,omitempty"`
Status string `json:"status"`
PlannedRowCount int64 `json:"planned_row_count,omitempty"`
NextExpectedRowOrdinal int64 `json:"next_expected_row_ordinal,omitempty"`
BundleSeq int64 `json:"bundle_seq,omitempty"`
SourceID string `json:"source_id,omitempty"`
SourceBundleID int64 `json:"source_bundle_id,omitempty"`
RowCount int64 `json:"row_count,omitempty"`
BundleHash string `json:"bundle_hash,omitempty"`
}
type PushSessionExpiredError ¶
type PushSessionExpiredError struct {
PushID string
}
func (*PushSessionExpiredError) Error ¶
func (e *PushSessionExpiredError) Error() string
type PushSessionForbiddenError ¶
type PushSessionForbiddenError struct {
PushID string
}
func (*PushSessionForbiddenError) Error ¶
func (e *PushSessionForbiddenError) Error() string
type PushSessionInvalidError ¶
type PushSessionInvalidError struct {
Message string
}
func (*PushSessionInvalidError) Error ¶
func (e *PushSessionInvalidError) Error() string
type PushSessionNotFoundError ¶
type PushSessionNotFoundError struct {
PushID string
}
func (*PushSessionNotFoundError) Error ¶
func (e *PushSessionNotFoundError) Error() string
type PushValidationError ¶
type PushValidationError struct {
Message string
}
func (*PushValidationError) Error ¶
func (e *PushValidationError) Error() string
type RegisteredTable ¶
type RegisteredTable struct {
Schema string `json:"schema"` // Schema name (e.g., "public", "crm", "business")
Table string `json:"table"` // Table name (e.g., "users", "posts")
SyncKeyColumns []string `json:"sync_key_columns,omitempty"` // Ordered sync key columns for the target bundle-based protocol
}
RegisteredTable represents a table that is registered for sync operations
type RegisteredTableSpec ¶
type RegisteredTableSpec struct {
Schema string `json:"schema"`
Table string `json:"table"`
SyncKeyColumns []string `json:"sync_key_columns,omitempty"`
}
RegisteredTableSpec describes one registered sync table in the newer contract surface.
type SchemaDiscovery ¶
type SchemaDiscovery struct {
// contains filtered or unexported fields
}
SchemaDiscovery handles automatic discovery of table relationships and ordering
func NewSchemaDiscovery ¶
func NewSchemaDiscovery(pool *pgxpool.Pool, logger *slog.Logger) *SchemaDiscovery
NewSchemaDiscovery creates a new schema discovery instance
func (*SchemaDiscovery) DiscoverSchemaWithDependencyOverrides ¶
func (sd *SchemaDiscovery) DiscoverSchemaWithDependencyOverrides( ctx context.Context, registeredTables map[string]bool, registeredTableInfo map[string]registeredTableRuntimeInfo, dependencyOverrides map[string][]string, ) (*DiscoveredSchema, error)
DiscoverSchemaWithDependencyOverrides runs schema discovery and merges explicit dependencies (ordering constraints) into the discovered dependency graph. Overrides only affect ordering (TableOrder/OrderIdx/Dependencies) and do not add FK validation rules.
type ScopeInitializingError ¶ added in v0.1.2
func (*ScopeInitializingError) Error ¶ added in v0.1.2
func (e *ScopeInitializingError) Error() string
type ScopeManager ¶ added in v0.2.0
type ScopeManager struct {
// contains filtered or unexported fields
}
func NewScopeManager ¶ added in v0.2.0
func NewScopeManager(service *SyncService, cfg ScopeManagerConfig) *ScopeManager
func (*ScopeManager) ExecWrite ¶ added in v0.2.0
func (m *ScopeManager) ExecWrite( ctx context.Context, scopeID string, opts ScopeWriteOptions, fn func(tx pgx.Tx) error, ) (_ *ScopeWriteResult, err error)
type ScopeManagerConfig ¶ added in v0.2.0
type ScopeUninitializedError ¶ added in v0.1.2
type ScopeUninitializedError struct {
UserID string
}
func (*ScopeUninitializedError) Error ¶ added in v0.1.2
func (e *ScopeUninitializedError) Error() string
type ScopeWriteInvalidError ¶ added in v0.2.0
type ScopeWriteInvalidError struct {
Message string
}
func (*ScopeWriteInvalidError) Error ¶ added in v0.2.0
func (e *ScopeWriteInvalidError) Error() string
type ScopeWriteNoCapturedChangesError ¶ added in v0.2.0
func (*ScopeWriteNoCapturedChangesError) Error ¶ added in v0.2.0
func (e *ScopeWriteNoCapturedChangesError) Error() string
type ScopeWriteOptions ¶ added in v0.2.0
type ScopeWriteOptions struct {
WriterID string
}
type ScopeWriteResult ¶ added in v0.2.0
type ServiceConfig ¶
type ServiceConfig struct {
MaxSupportedSchemaVersion int // Current schema version to return
AppName string // Application name for connection tracking
RegisteredTables []RegisteredTable // Schema.table combinations allowed for sync (required)
MaxRowsPerBundle int // Maximum number of row effects allowed in one committed bundle (0 = unlimited)
MaxBytesPerBundle int // Maximum JSON payload size allowed in one committed bundle (0 = unlimited)
// Push-session chunking limits. Zero uses the runtime defaults.
DefaultRowsPerPushChunk int
MaxRowsPerPushChunk int
PushSessionTTL time.Duration
InitializationLeaseTTL time.Duration
// Committed-bundle row fetch limits. Zero uses the runtime defaults.
DefaultRowsPerCommittedBundleChunk int
MaxRowsPerCommittedBundleChunk int
// Snapshot chunking limits. Zero uses the runtime defaults.
DefaultRowsPerSnapshotChunk int
MaxRowsPerSnapshotChunk int
SnapshotSessionTTL time.Duration
MaxRowsPerSnapshotSession int64
MaxBytesPerSnapshotSession int64
RetainedBundlesPerUser int64
RetentionPruneBatchSize int64
// UploadLockTimeout bounds lock waits inside upload transactions.
// Zero disables SET LOCAL lock_timeout so lock waits are governed only by the request context,
// which is the reliability-first default.
UploadLockTimeout time.Duration
// DependencyOverrides optionally adds explicit ordering constraints on top of discovered
// DB FKs. Keys and values are "schema.table". Only affects ordering, not FK validation.
DependencyOverrides map[string][]string
// StageMetrics optionally records per-stage timings for sync hot paths.
// The recorder is called synchronously; it must be fast and concurrency-safe.
StageMetrics StageMetricsRecorder
// LogStageTimings logs per-stage timings via the service logger at DEBUG.
// Useful for profiling; keep disabled in production.
LogStageTimings bool
}
ServiceConfig holds configuration for the sync service
type SnapshotChunkInvalidError ¶
type SnapshotChunkInvalidError struct {
Message string
}
func (*SnapshotChunkInvalidError) Error ¶
func (e *SnapshotChunkInvalidError) Error() string
type SnapshotChunkResponse ¶
type SnapshotChunkResponse struct {
SnapshotID string `json:"snapshot_id"`
SnapshotBundleSeq int64 `json:"snapshot_bundle_seq"`
Rows []SnapshotRow `json:"rows"`
NextRowOrdinal int64 `json:"next_row_ordinal"`
HasMore bool `json:"has_more"`
}
SnapshotChunkResponse returns one chunk of snapshot rows from a frozen session.
type SnapshotRow ¶
type SnapshotRow struct {
Schema string `json:"schema"`
Table string `json:"table"`
Key SyncKey `json:"key"`
RowVersion int64 `json:"row_version"`
Payload json.RawMessage `json:"payload"`
}
SnapshotRow is the current after-image for one non-deleted row in the bundle-based contract.
type SnapshotSession ¶
type SnapshotSession struct {
SnapshotID string `json:"snapshot_id"`
SnapshotBundleSeq int64 `json:"snapshot_bundle_seq"`
RowCount int64 `json:"row_count"`
ByteCount int64 `json:"byte_count,omitempty"`
ExpiresAt string `json:"expires_at"`
}
SnapshotSession describes one frozen server-side snapshot session.
type SnapshotSessionCreateRequest ¶ added in v0.2.0
type SnapshotSessionCreateRequest struct {
SourceReplacement *SnapshotSourceReplacement `json:"source_replacement,omitempty"`
}
type SnapshotSessionExpiredError ¶
type SnapshotSessionExpiredError struct {
SnapshotID string
}
func (*SnapshotSessionExpiredError) Error ¶
func (e *SnapshotSessionExpiredError) Error() string
type SnapshotSessionForbiddenError ¶
type SnapshotSessionForbiddenError struct {
SnapshotID string
}
func (*SnapshotSessionForbiddenError) Error ¶
func (e *SnapshotSessionForbiddenError) Error() string
type SnapshotSessionInvalidError ¶ added in v0.2.0
type SnapshotSessionInvalidError struct {
Message string
}
func (*SnapshotSessionInvalidError) Error ¶ added in v0.2.0
func (e *SnapshotSessionInvalidError) Error() string
type SnapshotSessionLimitExceededError ¶
func (*SnapshotSessionLimitExceededError) Error ¶
func (e *SnapshotSessionLimitExceededError) Error() string
type SnapshotSessionNotFoundError ¶
type SnapshotSessionNotFoundError struct {
SnapshotID string
}
func (*SnapshotSessionNotFoundError) Error ¶
func (e *SnapshotSessionNotFoundError) Error() string
type SnapshotSourceReplacement ¶ added in v0.2.0
type SourceReplacementInvalidError ¶ added in v0.2.0
type SourceReplacementInvalidError struct {
Message string
}
func (*SourceReplacementInvalidError) Error ¶ added in v0.2.0
func (e *SourceReplacementInvalidError) Error() string
type SourceRetiredError ¶ added in v0.2.0
func (*SourceRetiredError) Error ¶ added in v0.2.0
func (e *SourceRetiredError) Error() string
type SourceRetiredResponse ¶ added in v0.2.0
type SourceSequenceChangedError ¶ added in v0.2.0
type SourceSequenceChangedError struct {
UserID string
SourceID string
Expected int64
Actual int64
}
func (*SourceSequenceChangedError) Error ¶ added in v0.2.0
func (e *SourceSequenceChangedError) Error() string
type SourceSequenceOutOfOrderError ¶ added in v0.2.0
type SourceSequenceOutOfOrderError struct {
UserID string
SourceID string
Expected int64
Actual int64
}
func (*SourceSequenceOutOfOrderError) Error ¶ added in v0.2.0
func (e *SourceSequenceOutOfOrderError) Error() string
type SourceTupleHistoryPrunedError ¶ added in v0.2.0
type SourceTupleHistoryPrunedError struct {
UserID string
SourceID string
SourceBundleID int64
MaxCommittedSourceBundleIDHint int64
}
func (*SourceTupleHistoryPrunedError) Error ¶ added in v0.2.0
func (e *SourceTupleHistoryPrunedError) Error() string
type StageMetricsRecorder ¶
type StageMetricsRecorder interface {
ObserveStage(ctx context.Context, timing StageTiming)
}
type StageMetricsRecorderFunc ¶
type StageMetricsRecorderFunc func(ctx context.Context, timing StageTiming)
func (StageMetricsRecorderFunc) ObserveStage ¶
func (f StageMetricsRecorderFunc) ObserveStage(ctx context.Context, timing StageTiming)
type StageTiming ¶
type StatusResponse ¶
type StatusResponse struct {
Status string `json:"status"` // healthy or unhealthy
Version string `json:"version"` // API version
AppName string `json:"app_name"` // Application name
Lifecycle string `json:"lifecycle"` // running, shutting_down, closed
AcceptingOperations bool `json:"accepting_operations"` // Whether new sync operations are accepted
InFlightOperations int `json:"inflight_operations"` // Current in-flight sync operations
RegisteredTables []string `json:"registered_tables"` // Tables registered for sync
Features map[string]bool `json:"features"` // Enabled features
UserStateRetentionFloorAheadCount int64 `json:"user_state_retention_floor_ahead_count"` // user_state rows with retained_bundle_floor > next_bundle_seq - 1
LatestBundleSeqMax int64 `json:"latest_bundle_seq_max"` // highest committed bundle seq visible across sync.user_state
RetainedBundleFloorMin int64 `json:"retained_bundle_floor_min"` // minimum retained_bundle_floor across sync.user_state
RetainedBundleFloorMax int64 `json:"retained_bundle_floor_max"` // maximum retained_bundle_floor across sync.user_state
RetainedBundleWindowMin int64 `json:"retained_bundle_window_min"` // minimum retained history window (latest bundle seq - retained floor)
RetainedBundleWindowMax int64 `json:"retained_bundle_window_max"` // maximum retained history window (latest bundle seq - retained floor)
HistoryPrunedErrorCount int64 `json:"history_pruned_error_count"` // total history_pruned responses observed by the server
AcceptedPushReplayCount int64 `json:"accepted_push_replay_count"` // total accepted-push replay hits observed by the server
RejectedRegisteredWriteCount int64 `json:"rejected_registered_write_count"` // total writes rejected for missing sync bundle context
CommittedBundleCount int64 `json:"committed_bundle_count"` // total committed bundle count visible in sync.bundle_log
CommittedBundleBytes int64 `json:"committed_bundle_bytes"` // total committed bundle bytes visible in sync.bundle_log
}
StatusResponse represents service status response
type SyncService ¶
type SyncService struct {
// contains filtered or unexported fields
}
SyncService provides the core synchronization functionality This is the main SDK component that developers integrate into their applications
func NewRuntimeService ¶
func NewRuntimeService(pool *pgxpool.Pool, config *ServiceConfig, logger *slog.Logger) (*SyncService, error)
NewRuntimeService creates a runtime-only sync service instance from an existing pool. It does not mutate database schema or discover runtime topology.
func (*SyncService) Bootstrap ¶
func (s *SyncService) Bootstrap(ctx context.Context) error
Bootstrap initializes sync metadata and the runtime topology snapshot. Topology is prepared at bootstrap time and is restart-only for now; runtime schema changes are not re-discovered automatically by SyncService.
func (*SyncService) Close ¶
func (s *SyncService) Close(ctx context.Context) error
Close gracefully shuts down the sync service. It rejects new runtime operations, waits for in-flight work to drain, and is safe to call multiple times. Note: This does NOT close the database pool - the caller is responsible for pool lifecycle.
func (*SyncService) CommitPushSession ¶
func (s *SyncService) CommitPushSession(ctx context.Context, actor Actor, pushID string) (_ *PushSessionCommitResponse, err error)
func (*SyncService) Connect ¶ added in v0.1.2
func (s *SyncService) Connect(ctx context.Context, actor Actor, req *ConnectRequest) (_ *ConnectResponse, err error)
func (*SyncService) CreatePushSession ¶
func (s *SyncService) CreatePushSession(ctx context.Context, actor Actor, req *PushSessionCreateRequest) (_ *PushSessionCreateResponse, err error)
func (*SyncService) CreateSnapshotSession ¶
func (s *SyncService) CreateSnapshotSession(ctx context.Context, actor Actor) (_ *SnapshotSession, err error)
func (*SyncService) CreateSnapshotSessionWithRequest ¶ added in v0.2.0
func (s *SyncService) CreateSnapshotSessionWithRequest(ctx context.Context, actor Actor, req *SnapshotSessionCreateRequest) (_ *SnapshotSession, err error)
func (*SyncService) DeletePushSession ¶
func (*SyncService) DeleteSnapshotSession ¶
func (*SyncService) GetCapabilities ¶
func (s *SyncService) GetCapabilities() CapabilitiesResponse
GetCapabilities returns the currently supported sync protocol surface.
func (*SyncService) GetCommittedBundleRows ¶
func (s *SyncService) GetCommittedBundleRows(ctx context.Context, actor Actor, bundleSeq int64, afterRowOrdinal *int64, maxRows int) (_ *CommittedBundleRowsResponse, err error)
func (*SyncService) GetSchemaVersion ¶
func (s *SyncService) GetSchemaVersion() int
GetSchemaVersion returns the current schema version
func (*SyncService) GetSnapshotChunk ¶
func (s *SyncService) GetSnapshotChunk(ctx context.Context, actor Actor, snapshotID string, afterRowOrdinal int64, maxRows int) (_ *SnapshotChunkResponse, err error)
func (*SyncService) GetStatus ¶
func (s *SyncService) GetStatus(ctx context.Context) (*StatusResponse, error)
GetStatus returns the current service lifecycle and bundle-era operability snapshot.
func (*SyncService) IsTableRegistered ¶
func (s *SyncService) IsTableRegistered(schemaName, tableName string) bool
IsTableRegistered checks if a schema.table combination is registered for sync operations
func (*SyncService) Pool ¶
func (s *SyncService) Pool() *pgxpool.Pool
Pool returns the underlying database connection pool This allows advanced users to execute custom queries
func (*SyncService) ProcessPull ¶
func (s *SyncService) ProcessPull( ctx context.Context, actor Actor, afterBundleSeq int64, maxBundles int, targetBundleSeq int64, ) (resp *PullResponse, err error)
func (*SyncService) UploadPushChunk ¶
func (s *SyncService) UploadPushChunk(ctx context.Context, actor Actor, pushID string, req *PushSessionChunkRequest) (_ *PushSessionChunkResponse, err error)
func (*SyncService) WithinSyncBundle ¶
func (s *SyncService) WithinSyncBundle( ctx context.Context, actor Actor, source BundleSource, fn func(tx pgx.Tx) error, ) (err error)
type UnsupportedSchemaError ¶
type UnsupportedSchemaError struct {
Message string
}
UnsupportedSchemaError reports a bootstrap-time schema shape that the current production-ready contract intentionally does not support.
func (*UnsupportedSchemaError) Error ¶
func (e *UnsupportedSchemaError) Error() string
func (*UnsupportedSchemaError) Is ¶
func (e *UnsupportedSchemaError) Is(target error) bool
Source Files
¶
- actor.go
- actor_middleware.go
- binary_wire.go
- bundle_capture.go
- connect.go
- constants.go
- http_handlers.go
- metrics.go
- opcodes.go
- payload_helpers.go
- pruning.go
- pull.go
- push.go
- push_sessions.go
- rest_models.go
- retention.go
- retry.go
- schema_discovery.go
- schema_init.go
- scope_manager.go
- scope_state.go
- service.go
- snapshot_sessions.go
- source_state.go
- sync_keys.go
- unsupported_schema.go
- upload_support.go
- validation.go