Documentation ¶
Overview ¶
Package dbproxy provides a db.Querier implementation that transparently routes write operations while keeping reads fast.
Write strategy (secondary instances):
- Attempt the write directly against the local RW SQLite connection. In WAL mode readers do not block the writer, so the attempt succeeds whenever no other connection holds the write lock.
- If SQLite returns SQLITE_BUSY or SQLITE_LOCKED (another writer holds the lock), the operation is forwarded to the primary instance via ZMQ JSON-RPC. The primary serialises writes as the single authoritative writer.
- Only if the proxy call also fails is an error returned to the caller.
Primary instances use the embedded Querier directly; no proxying occurs.
Index ¶
- Constants
- Variables
- func DispatchWrite(ctx context.Context, q db.Querier, req WriteRequest) (json.RawMessage, error)
- func ProxyWriteWithResult[R any](ctx context.Context, p *DBProxy, method string, params any) (R, error)
- func RegisterHandlers(bus BusRegistrar, q db.Querier)
- func RegisterHandlersWithCoordinator(bus BusRegistrar, s WriteSubmitter)
- func RegisterRemembrancesDispatcher(d RemembrancesDispatcher)
- type BusRegistrar
- type DBProxy
- func (p *DBProxy) CreateFile(ctx context.Context, arg db.CreateFileParams) (db.File, error)
- func (p *DBProxy) CreateMessage(ctx context.Context, arg db.CreateMessageParams) (db.Message, error)
- func (p *DBProxy) CreateProject(ctx context.Context, arg db.CreateProjectParams) (db.Project, error)
- func (p *DBProxy) CreateSession(ctx context.Context, arg db.CreateSessionParams) (db.Session, error)
- func (p *DBProxy) DeactivateLowestSkill(ctx context.Context) error
- func (p *DBProxy) DeleteFile(ctx context.Context, id string) error
- func (p *DBProxy) DeleteMessage(ctx context.Context, id string) error
- func (p *DBProxy) DeleteProject(ctx context.Context, id string) error
- func (p *DBProxy) DeleteSession(ctx context.Context, id string) error
- func (p *DBProxy) DeleteSessionFiles(ctx context.Context, sessionID string) error
- func (p *DBProxy) DeleteSessionMessages(ctx context.Context, sessionID string) error
- func (p *DBProxy) IncrementSkillUsage(ctx context.Context, id string) error
- func (p *DBProxy) InsertPromptTemplate(ctx context.Context, arg db.InsertPromptTemplateParams) (db.PromptTemplate, error)
- func (p *DBProxy) InsertSessionScore(ctx context.Context, arg db.InsertSessionScoreParams) (db.SessionScore, error)
- func (p *DBProxy) InsertSkill(ctx context.Context, arg db.InsertSkillParams) (db.SkillLibrary, error)
- func (p *DBProxy) MarkProjectInitialized(ctx context.Context, id string) error
- func (p *DBProxy) ProbePrimary(ctx context.Context) error
- func (p *DBProxy) UpdateFile(ctx context.Context, arg db.UpdateFileParams) (db.File, error)
- func (p *DBProxy) UpdateMessage(ctx context.Context, arg db.UpdateMessageParams) error
- func (p *DBProxy) UpdateProjectLastOpened(ctx context.Context, arg db.UpdateProjectLastOpenedParams) error
- func (p *DBProxy) UpdateProjectStatus(ctx context.Context, arg db.UpdateProjectStatusParams) error
- func (p *DBProxy) UpdateSession(ctx context.Context, arg db.UpdateSessionParams) (db.Session, error)
- func (p *DBProxy) WriteWithRetry(ctx context.Context, method string, params any, timeout time.Duration) error
- type RemembrancesDispatcher
- type WriteError
- type WriteErrorCode
- type WriteMeta
- type WriteRequest
- type WriteSubmitter
- type WriteTimeout
Constants ¶
const MethodDBWrite = "db.write"
MethodDBWrite is the JSON-RPC method name for proxied write operations.
Variables ¶
var DefaultWriteTimeouts = WriteTimeout{
	Default: 5 * time.Second,
	Long:    30 * time.Second,
}
DefaultWriteTimeouts is used when no explicit timeout is provided.
Functions ¶
func DispatchWrite ¶ added in v0.306.0
func DispatchWrite(ctx context.Context, q db.Querier, req WriteRequest) (json.RawMessage, error)
DispatchWrite is the exported entry point for dispatching a write request. The writecoordinator calls this from its serialisation loop.
func ProxyWriteWithResult ¶ added in v0.326.0
func ProxyWriteWithResult[R any](ctx context.Context, p *DBProxy, method string, params any) (R, error)
ProxyWriteWithResult forwards a write method and decodes a typed result.
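The decoding half of a generic helper like this can be illustrated in isolation. The sketch below assumes the forwarded call yields a JSON payload; `decodeResult` and `session` are hypothetical names, and the real function may decode differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeResult unmarshals a raw JSON result into the caller's chosen type R.
func decodeResult[R any](raw json.RawMessage) (R, error) {
	var out R
	if err := json.Unmarshal(raw, &out); err != nil {
		var zero R
		return zero, fmt.Errorf("decode write result: %w", err)
	}
	return out, nil
}

// session is a hypothetical result type standing in for a type such as
// db.Session; its fields are invented for this example.
type session struct {
	ID    string `json:"id"`
	Title string `json:"title"`
}
```

A caller would pick the result type explicitly, for example `decodeResult[session](raw)`, because R cannot be inferred from the arguments.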
func RegisterHandlers ¶
func RegisterHandlers(bus BusRegistrar, q db.Querier)
RegisterHandlers registers the db.write JSON-RPC handler on the given bus. Only the primary instance should call this.
func RegisterHandlersWithCoordinator ¶ added in v0.306.0
func RegisterHandlersWithCoordinator(bus BusRegistrar, s WriteSubmitter)
RegisterHandlersWithCoordinator registers the db.write handler using a WriteSubmitter for serialisation. RegisterHandlers and this function can coexist in the same program; the coordinator path is opt-in and, when used, replaces the direct-dispatch path on the primary.
func RegisterRemembrancesDispatcher ¶ added in v0.326.0
func RegisterRemembrancesDispatcher(d RemembrancesDispatcher)
RegisterRemembrancesDispatcher registers the RAG write dispatcher on the primary. Must be called before any secondary instance attempts to forward remembrances writes. Calling it more than once replaces the previous registration.
Types ¶
type BusRegistrar ¶ added in v0.306.0
type BusRegistrar interface {
RegisterMethod(method string, handler ipc.HandlerFunc)
}
BusRegistrar is the minimal interface needed to register dbproxy RPC methods.
type DBProxy ¶
type DBProxy struct {
db.Querier // local reads and direct write attempts — embedded interface
// contains filtered or unexported fields
}
DBProxy implements db.Querier. Reads are served from the embedded local querier. Writes first attempt the local RW connection (WAL mode); on SQLITE_BUSY or SQLITE_LOCKED they are forwarded via ZMQ JSON-RPC to the primary.
When client is nil the proxy behaves identically to the embedded querier (useful for the primary instance itself, which never proxies).
func New ¶
New creates a DBProxy backed by local for reads and direct write attempts. Pass a non-nil client and the primary's rpcAddr to enable write proxying. Pass client=nil for primary instances (writes go directly to the local querier).
func NewWithInstanceID ¶ added in v0.306.0
NewWithInstanceID is like New but records the caller's instance ID in every WriteMeta so the primary can attribute writes to the originating secondary.
func (*DBProxy) CreateFile ¶
func (*DBProxy) CreateMessage ¶
func (*DBProxy) CreateProject ¶
func (*DBProxy) CreateSession ¶
func (*DBProxy) DeactivateLowestSkill ¶
func (*DBProxy) DeleteMessage ¶
func (*DBProxy) DeleteProject ¶
func (*DBProxy) DeleteSession ¶
func (*DBProxy) DeleteSessionFiles ¶
func (*DBProxy) DeleteSessionMessages ¶
func (*DBProxy) IncrementSkillUsage ¶
func (*DBProxy) InsertPromptTemplate ¶
func (p *DBProxy) InsertPromptTemplate(ctx context.Context, arg db.InsertPromptTemplateParams) (db.PromptTemplate, error)
func (*DBProxy) InsertSessionScore ¶
func (p *DBProxy) InsertSessionScore(ctx context.Context, arg db.InsertSessionScoreParams) (db.SessionScore, error)
func (*DBProxy) InsertSkill ¶
func (p *DBProxy) InsertSkill(ctx context.Context, arg db.InsertSkillParams) (db.SkillLibrary, error)
func (*DBProxy) MarkProjectInitialized ¶
func (*DBProxy) ProbePrimary ¶ added in v0.326.0
ProbePrimary sends an instance.ping JSON-RPC call to the primary with a 2-second timeout and returns nil on success. It returns an error if the primary is unreachable, the call times out, or the proxy is not configured. When this instance is the primary it always returns nil, because no proxy is needed.
func (*DBProxy) UpdateFile ¶
func (*DBProxy) UpdateMessage ¶
func (*DBProxy) UpdateProjectLastOpened ¶
func (*DBProxy) UpdateProjectStatus ¶
func (*DBProxy) UpdateSession ¶
type RemembrancesDispatcher ¶ added in v0.326.0
type RemembrancesDispatcher interface {
DispatchRemembrancesWrite(ctx context.Context, method string, params json.RawMessage) (json.RawMessage, error)
}
RemembrancesDispatcher defines the contract for dispatching RAG (KB, Events, Code) write operations on the primary instance. It decouples the dbproxy infrastructure package from the rag business-logic packages, avoiding circular imports.
The primary instance registers an implementation via RegisterRemembrancesDispatcher. When dispatchWrite receives a method name not handled by db.Querier, it delegates to this dispatcher (if registered) so that KB, Events and Code indexing writes forwarded from secondary instances are correctly persisted.
type WriteError ¶ added in v0.306.0
type WriteError struct {
Code WriteErrorCode `json:"code"`
Message string `json:"message"`
Method string `json:"method"`
}
WriteError is a structured error returned by the write channel.
func (*WriteError) Error ¶ added in v0.306.0
func (e *WriteError) Error() string
func (*WriteError) IsRetryable ¶ added in v0.306.0
func (e *WriteError) IsRetryable() bool
IsRetryable reports whether the error is transient and the operation may succeed on retry.
type WriteErrorCode ¶ added in v0.306.0
type WriteErrorCode string
WriteErrorCode identifies the category of a write-channel failure.
const (
	ErrCodeTimeout        WriteErrorCode = "TIMEOUT"
	ErrCodeUnreachable    WriteErrorCode = "UNREACHABLE"
	ErrCodeMethodNotFound WriteErrorCode = "METHOD_NOT_FOUND"
	ErrCodeInvalidParams  WriteErrorCode = "INVALID_PARAMS"
	ErrCodeConflict       WriteErrorCode = "CONFLICT"
	ErrCodeInternal       WriteErrorCode = "INTERNAL"
)
type WriteMeta ¶ added in v0.306.0
type WriteMeta struct {
SourceInstanceID string `json:"source_instance_id"`
RequestID string `json:"request_id"`
Timestamp string `json:"timestamp"` // RFC3339
}
WriteMeta carries tracing metadata attached to every proxied write request. The primary logs this on each write, making write provenance easy to trace.
type WriteRequest ¶
type WriteRequest struct {
Meta WriteMeta `json:"meta"`
Method string `json:"method"`
Params json.RawMessage `json:"params"`
}
WriteRequest is the JSON-RPC params struct for a proxied write.
type WriteSubmitter ¶ added in v0.306.0
type WriteSubmitter interface {
Submit(ctx context.Context, req WriteRequest) (json.RawMessage, error)
}
WriteSubmitter serialises and executes a write request, returning the JSON result. It is implemented by writecoordinator.Coordinator; declaring the interface here avoids a circular import between the two packages.