dbproxy

package
v0.407.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 30, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package dbproxy provides a db.Querier implementation that transparently routes write operations while keeping reads fast.

Write strategy (secondary instances):

  1. Attempt the write directly against the local RW SQLite connection (WAL mode allows concurrent writes when there is no lock contention).
  2. If SQLite returns BUSY or LOCKED (another writer holds the lock) the operation is forwarded to the primary instance via ZMQ JSON-RPC, which serialises writes as the single authoritative writer.
  3. Only if the proxy call also fails is an error returned to the caller.

Primary instances use the embedded Querier directly; no proxying occurs.

Index

Constants

View Source
const MethodDBWrite = "db.write"

MethodDBWrite is the JSON-RPC method name for proxied write operations.

Variables

View Source
var DefaultWriteTimeouts = WriteTimeout{
	Default: 5 * time.Second,
	Long:    30 * time.Second,
}

DefaultWriteTimeouts is used when no explicit timeout is provided.

Functions

func DispatchWrite added in v0.306.0

func DispatchWrite(ctx context.Context, q db.Querier, req WriteRequest) (json.RawMessage, error)

DispatchWrite is the exported entry point for dispatching a write request. The writecoordinator calls this from its serialisation loop.

func ProxyWriteWithResult added in v0.326.0

func ProxyWriteWithResult[R any](ctx context.Context, p *DBProxy, method string, params any) (R, error)

ProxyWriteWithResult forwards a write method and decodes a typed result.

func RegisterHandlers

func RegisterHandlers(bus BusRegistrar, q db.Querier)

RegisterHandlers registers the db.write JSON-RPC handler on the given bus. Only the primary instance should call this.

func RegisterHandlersWithCoordinator added in v0.306.0

func RegisterHandlersWithCoordinator(bus BusRegistrar, s WriteSubmitter)

RegisterHandlersWithCoordinator registers the db.write handler using a WriteSubmitter for serialisation. Both RegisterHandlers and this function can coexist; the coordinator path is opt-in and replaces the direct-dispatch path on the primary.

func RegisterRemembrancesDispatcher added in v0.326.0

func RegisterRemembrancesDispatcher(d RemembrancesDispatcher)

RegisterRemembrancesDispatcher registers the RAG write dispatcher on the primary. Must be called before any secondary instance attempts to forward remembrances writes. Calling it more than once replaces the previous registration.

Types

type BusRegistrar added in v0.306.0

type BusRegistrar interface {
	RegisterMethod(method string, handler ipc.HandlerFunc)
}

BusRegistrar is the minimal interface needed to register dbproxy RPC methods.

type DBProxy

type DBProxy struct {
	db.Querier // local reads and direct write attempts — embedded interface
	// contains filtered or unexported fields
}

DBProxy implements db.Querier. Reads are served from the embedded local querier. Writes first attempt the local RW connection (WAL mode); on SQLITE_BUSY/LOCKED they are forwarded via ZMQ JSON-RPC to the primary.

When client is nil the proxy behaves identically to the embedded querier (useful for the primary instance itself, which never proxies).

func New

func New(local db.Querier, client *ipc.Client, rpcAddr string) *DBProxy

New creates a DBProxy backed by local for reads and direct write attempts. Pass a non-nil client and the primary's rpcAddr to enable write proxying. Pass client=nil for primary instances (writes go directly to the local querier).

func NewWithInstanceID added in v0.306.0

func NewWithInstanceID(local db.Querier, client *ipc.Client, rpcAddr, instanceID string) *DBProxy

NewWithInstanceID is like New but records the caller's instance ID in every WriteMeta so the primary can attribute writes to the originating secondary.

func (*DBProxy) CreateFile

func (p *DBProxy) CreateFile(ctx context.Context, arg db.CreateFileParams) (db.File, error)

func (*DBProxy) CreateMessage

func (p *DBProxy) CreateMessage(ctx context.Context, arg db.CreateMessageParams) (db.Message, error)

func (*DBProxy) CreateProject

func (p *DBProxy) CreateProject(ctx context.Context, arg db.CreateProjectParams) (db.Project, error)

func (*DBProxy) CreateSession

func (p *DBProxy) CreateSession(ctx context.Context, arg db.CreateSessionParams) (db.Session, error)

func (*DBProxy) DeactivateLowestSkill

func (p *DBProxy) DeactivateLowestSkill(ctx context.Context) error

func (*DBProxy) DeleteFile

func (p *DBProxy) DeleteFile(ctx context.Context, id string) error

func (*DBProxy) DeleteMessage

func (p *DBProxy) DeleteMessage(ctx context.Context, id string) error

func (*DBProxy) DeleteProject

func (p *DBProxy) DeleteProject(ctx context.Context, id string) error

func (*DBProxy) DeleteSession

func (p *DBProxy) DeleteSession(ctx context.Context, id string) error

func (*DBProxy) DeleteSessionFiles

func (p *DBProxy) DeleteSessionFiles(ctx context.Context, sessionID string) error

func (*DBProxy) DeleteSessionMessages

func (p *DBProxy) DeleteSessionMessages(ctx context.Context, sessionID string) error

func (*DBProxy) IncrementSkillUsage

func (p *DBProxy) IncrementSkillUsage(ctx context.Context, id string) error

func (*DBProxy) InsertPromptTemplate

func (p *DBProxy) InsertPromptTemplate(ctx context.Context, arg db.InsertPromptTemplateParams) (db.PromptTemplate, error)

func (*DBProxy) InsertSessionScore

func (p *DBProxy) InsertSessionScore(ctx context.Context, arg db.InsertSessionScoreParams) (db.SessionScore, error)

func (*DBProxy) InsertSkill

func (p *DBProxy) InsertSkill(ctx context.Context, arg db.InsertSkillParams) (db.SkillLibrary, error)

func (*DBProxy) MarkProjectInitialized

func (p *DBProxy) MarkProjectInitialized(ctx context.Context, id string) error

func (*DBProxy) ProbePrimary added in v0.326.0

func (p *DBProxy) ProbePrimary(ctx context.Context) error

ProbePrimary sends an instance.ping JSON-RPC call to the primary with a 2-second timeout and returns nil on success. Returns an error if the primary is unreachable, the call times out, or the proxy is not configured. Always returns nil when this instance is the primary (no proxy needed).

func (*DBProxy) UpdateFile

func (p *DBProxy) UpdateFile(ctx context.Context, arg db.UpdateFileParams) (db.File, error)

func (*DBProxy) UpdateMessage

func (p *DBProxy) UpdateMessage(ctx context.Context, arg db.UpdateMessageParams) error

func (*DBProxy) UpdateProjectLastOpened

func (p *DBProxy) UpdateProjectLastOpened(ctx context.Context, arg db.UpdateProjectLastOpenedParams) error

func (*DBProxy) UpdateProjectStatus

func (p *DBProxy) UpdateProjectStatus(ctx context.Context, arg db.UpdateProjectStatusParams) error

func (*DBProxy) UpdateSession

func (p *DBProxy) UpdateSession(ctx context.Context, arg db.UpdateSessionParams) (db.Session, error)

func (*DBProxy) WriteWithRetry added in v0.326.0

func (p *DBProxy) WriteWithRetry(ctx context.Context, method string, params any, timeout time.Duration) error

WriteWithRetry forwards a void write with the provided timeout and retry logic.

type RemembrancesDispatcher added in v0.326.0

type RemembrancesDispatcher interface {
	DispatchRemembrancesWrite(ctx context.Context, method string, params json.RawMessage) (json.RawMessage, error)
}

RemembrancesDispatcher defines the contract for dispatching RAG (KB, Events, Code) write operations on the primary instance. It decouples the dbproxy infrastructure package from the rag business-logic packages, avoiding circular imports.

The primary instance registers an implementation via RegisterRemembrancesDispatcher. When dispatchWrite receives a method name not handled by db.Querier, it delegates to this dispatcher (if registered) so that KB, Events and Code indexing writes forwarded from secondary instances are correctly persisted.

type WriteError added in v0.306.0

type WriteError struct {
	Code    WriteErrorCode `json:"code"`
	Message string         `json:"message"`
	Method  string         `json:"method"`
}

WriteError is a structured error returned by the write channel.

func (*WriteError) Error added in v0.306.0

func (e *WriteError) Error() string

func (*WriteError) IsRetryable added in v0.306.0

func (e *WriteError) IsRetryable() bool

IsRetryable reports whether the error is transient and the operation may succeed on retry.

type WriteErrorCode added in v0.306.0

type WriteErrorCode string

WriteErrorCode identifies the category of a write-channel failure.

const (
	ErrCodeTimeout        WriteErrorCode = "TIMEOUT"
	ErrCodeUnreachable    WriteErrorCode = "UNREACHABLE"
	ErrCodeMethodNotFound WriteErrorCode = "METHOD_NOT_FOUND"
	ErrCodeInvalidParams  WriteErrorCode = "INVALID_PARAMS"
	ErrCodeConflict       WriteErrorCode = "CONFLICT"
	ErrCodeInternal       WriteErrorCode = "INTERNAL"
)

type WriteMeta added in v0.306.0

type WriteMeta struct {
	SourceInstanceID string `json:"source_instance_id"`
	RequestID        string `json:"request_id"`
	Timestamp        string `json:"timestamp"` // RFC3339
}

WriteMeta carries tracing metadata attached to every proxied write request. The primary logs this on each write, making write provenance easy to trace.

type WriteRequest

type WriteRequest struct {
	Meta   WriteMeta       `json:"meta"`
	Method string          `json:"method"`
	Params json.RawMessage `json:"params"`
}

WriteRequest is the JSON-RPC params struct for a proxied write.

type WriteSubmitter added in v0.306.0

type WriteSubmitter interface {
	Submit(ctx context.Context, req WriteRequest) (json.RawMessage, error)
}

WriteSubmitter serialises and executes a write request, returning the JSON result. Implemented by writecoordinator.Coordinator to avoid circular imports.

type WriteTimeout added in v0.306.0

type WriteTimeout struct {
	Default time.Duration
	Long    time.Duration
}

WriteTimeout groups deadline durations for different write categories.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL