query

package
v0.0.3
Published: Jun 22, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package query defines fabriq's capability ports — explicit, engine-typed interfaces per storage capability — and the Fabric facade that exposes them. There is deliberately no unified query language: relational work speaks SQL through grove, graph work speaks openCypher, search speaks queries against declared fields. No engine types (pgx, grove, Falkor, go-elasticsearch) appear in any signature, so adapters stay swappable.

Constants

This section is empty.

Variables

This section is empty.

Functions

func SortField

func SortField(sort string) (column string, desc bool)

SortField splits a "column [DESC]" sort spec into the column and whether it is descending. Empty input yields an empty column — sort by relevance. Adapters and the validator share it so they agree on how a sort is read.
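
As a rough illustration of the contract above, the split could look like the following. splitSort is a hypothetical stand-in, not the package's code; in particular, matching the direction keyword case-insensitively and tolerating extra whitespace are assumptions.

```go
package main

import "strings"

// splitSort is a hypothetical stand-in for query.SortField: it reads a
// "column [DESC]" spec and reports the column and whether it is descending.
// Case-insensitive matching of DESC is an assumption of this sketch.
func splitSort(sort string) (column string, desc bool) {
	fields := strings.Fields(sort)
	if len(fields) == 0 {
		return "", false // empty spec: the caller sorts by relevance
	}
	column = fields[0]
	if len(fields) > 1 && strings.EqualFold(fields[1], "DESC") {
		desc = true
	}
	return column, desc
}
```

Sharing one such function between the validator and the adapters means a spec can never validate as one column and execute as another.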

func TraverseAndHydrate

func TraverseAndHydrate(ctx context.Context, reg *registry.Registry, g GraphQuerier, rel RelationalQuerier,
	cypher string, params map[string]any, into any,
) error

TraverseAndHydrate is the composed graph→relational read: run a Cypher traversal that RETURNs aggregate ids (single column), then hydrate the full rows from Postgres in exactly ONE batched relational query. The entity is inferred from into's element type via the registry, so the call site stays Graph().TraverseAndHydrate(ctx, cypher, params, &assets).

Adapters implement GraphQuerier.TraverseAndHydrate by delegating here.
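
The two-step shape described above can be sketched with trimmed-down interfaces. Everything here is illustrative: idTraverser, batchLoader and the fakes are hypothetical, the entity is passed explicitly instead of being inferred through the registry, and skipping the relational call when the traversal returns no ids is an assumption.

```go
package main

import "context"

// idTraverser and batchLoader are hypothetical, trimmed-down stand-ins for
// GraphQuerier and RelationalQuerier: only the calls this sketch needs.
type idTraverser interface {
	Query(ctx context.Context, cypher string, params map[string]any, into any) error
}

type batchLoader interface {
	GetMany(ctx context.Context, entity string, ids []string, into any) error
}

// traverseAndHydrate collects ids from the graph, then issues one batched
// relational read for all of them (never one read per id).
func traverseAndHydrate(ctx context.Context, g idTraverser, rel batchLoader,
	entity, cypher string, params map[string]any, into any,
) error {
	var ids []string
	if err := g.Query(ctx, cypher, params, &ids); err != nil {
		return err
	}
	if len(ids) == 0 {
		return nil // assumption: nothing to hydrate, skip the round trip
	}
	return rel.GetMany(ctx, entity, ids, into)
}

// fakeGraph returns fixed ids; fakeRel counts calls and records the ids.
type fakeGraph struct{ ids []string }

func (f fakeGraph) Query(_ context.Context, _ string, _ map[string]any, into any) error {
	*(into.(*[]string)) = f.ids
	return nil
}

type fakeRel struct {
	calls int
	got   []string
}

func (f *fakeRel) GetMany(_ context.Context, _ string, ids []string, _ any) error {
	f.calls++
	f.got = ids
	return nil
}

// demoTraverse wires the fakes together and reports how many relational
// calls were made and which ids they carried.
func demoTraverse() (calls int, ids []string, err error) {
	rel := &fakeRel{}
	g := fakeGraph{ids: []string{"a1", "a2"}}
	err = traverseAndHydrate(context.Background(), g, rel, "asset",
		"MATCH (a:Asset)-[:LOCATED_AT]->(:Site {id:$s}) RETURN a.id",
		map[string]any{"s": "site-1"}, nil)
	return rel.calls, rel.got, err
}
```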

func ValidateConds

func ValidateConds(conds []Cond, has func(column string) bool) error

ValidateConds checks a filter tree against an entity's columns and the operator vocabulary. has reports whether a column exists. Adapters call this before translating, so the same rules guard every engine.
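
A minimal sketch of such a walk, using local mirrors of Cond and Op instead of the package's own types. The error wording and the recursion into OR groups are assumptions about behaviour the description implies but does not spell out.

```go
package main

import "fmt"

// Local mirrors of the documented Cond and Op shapes; not the package's types.
type Op string

type Cond struct {
	Column string
	Op     Op
	Value  any
	Or     []Cond
}

// knownOps is the allowlist taken from the documented Op constants.
var knownOps = map[Op]bool{
	"eq": true, "ne": true, "gt": true, "gte": true, "lt": true, "lte": true,
	"in": true, "notin": true, "like": true, "ilike": true,
	"isnull": true, "isnotnull": true,
}

// validateConds walks the filter tree, recursing into OR groups, and rejects
// unknown columns and operators outside the allowlist.
func validateConds(conds []Cond, has func(column string) bool) error {
	for _, c := range conds {
		if len(c.Or) > 0 {
			if err := validateConds(c.Or, has); err != nil {
				return err
			}
			continue
		}
		if !has(c.Column) {
			return fmt.Errorf("unknown column %q", c.Column)
		}
		if !knownOps[c.Op] {
			return fmt.Errorf("unknown operator %q", c.Op)
		}
	}
	return nil
}
```

Because the column check runs before any translation, a column name never reaches an engine query unless the entity declares it.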

func ValidateSearchQuery

func ValidateSearchQuery(q SearchQuery, searchFields []string) error

ValidateSearchQuery checks a SearchQuery's Filter and Sort against the entity's indexed fields and the operator vocabulary — so every search adapter rejects the same unknown-column / bad-operator inputs (and the same injection surface) before translating to its engine. searchFields is the entity's declared search fields; id/tenant_id/version are always allowed.
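
The "declared fields plus id/tenant_id/version" rule can be pictured as an allowlist lookup. The helper names below are hypothetical and only the Sort half is shown; the Filter half would apply the same allowlist to every condition's column.

```go
package main

import (
	"fmt"
	"strings"
)

// indexedFields builds the allowlist: the declared search fields plus the
// always-allowed id, tenant_id and version.
func indexedFields(searchFields []string) map[string]bool {
	allowed := map[string]bool{"id": true, "tenant_id": true, "version": true}
	for _, f := range searchFields {
		allowed[f] = true
	}
	return allowed
}

// checkSort rejects a sort spec whose column is not indexed. An empty spec
// means "by relevance" and always passes.
func checkSort(sort string, searchFields []string) error {
	parts := strings.Fields(sort)
	if len(parts) == 0 {
		return nil
	}
	if !indexedFields(searchFields)[parts[0]] {
		return fmt.Errorf("sort column %q is not indexed", parts[0])
	}
	return nil
}
```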

Types

type Cond

type Cond struct {
	Column string
	Op     Op
	Value  any    // a slice for In/NotIn; ignored for IsNull/IsNotNull
	Or     []Cond // when non-empty, an OR group (Column/Op/Value ignored)
}

Cond is one engine-neutral predicate on a column — or, when Or is set, an OR group of sub-predicates. Build them with the constructors (Eq, In, Like, Or, …) rather than by hand.

func Eq

func Eq(column string, value any) Cond

Eq builds column = value.

func Gt

func Gt(column string, value any) Cond

Gt builds column > value.

func Gte

func Gte(column string, value any) Cond

Gte builds column >= value.

func ILike

func ILike(column, pattern string) Cond

ILike builds column ILIKE pattern (case-insensitive).

func In

func In(column string, values any) Cond

In builds column IN (values...); values must be a non-empty slice.

func IsNotNull

func IsNotNull(column string) Cond

IsNotNull builds column IS NOT NULL.

func IsNull

func IsNull(column string) Cond

IsNull builds column IS NULL.

func Like

func Like(column, pattern string) Cond

Like builds column LIKE pattern (case-sensitive; SQL % / _ wildcards).

func Lt

func Lt(column string, value any) Cond

Lt builds column < value.

func Lte

func Lte(column string, value any) Cond

Lte builds column <= value.

func Ne

func Ne(column string, value any) Cond

Ne builds column != value.

func NotIn

func NotIn(column string, values any) Cond

NotIn builds column NOT IN (values...); values must be a non-empty slice.

func Or

func Or(conds ...Cond) Cond

Or groups sub-conditions into a single OR predicate.

func (Cond) IsGroup

func (c Cond) IsGroup() bool

IsGroup reports whether the Cond is an OR group rather than a leaf.

type Delta

type Delta struct {
	// StreamID is the transport position (Redis stream entry ID). It maps
	// 1:1 onto the SSE "id:" field so Last-Event-ID resume works.
	StreamID string `json:"stream_id"`

	// Channel the delta was delivered on.
	Channel string `json:"channel"`

	TenantID  string          `json:"tenant_id"`
	Aggregate string          `json:"aggregate"`
	AggID     string          `json:"agg_id"`
	Version   int64           `json:"version"`
	Type      string          `json:"type"`
	At        time.Time       `json:"at"`
	Payload   json.RawMessage `json:"payload"`
}

Delta is what Subscribe channels carry: one change notification, small enough to conflate, complete enough that simple UIs can patch state without a refetch (and rich UIs can refetch on demand).
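
One way to read "small enough to conflate" is that a burst of deltas for the same aggregate can collapse to the newest one. The sketch below assumes conflation is keyed on AggID and decided by Version; the package may key or order differently.

```go
package main

// delta mirrors only the Delta fields this sketch needs.
type delta struct {
	StreamID string
	AggID    string
	Version  int64
}

// conflate collapses a burst of deltas to the newest one per aggregate,
// keeping the order in which each aggregate first appeared.
func conflate(in []delta) []delta {
	pos := make(map[string]int, len(in))
	out := make([]delta, 0, len(in))
	for _, d := range in {
		i, seen := pos[d.AggID]
		if !seen {
			pos[d.AggID] = len(out)
			out = append(out, d)
			continue
		}
		if d.Version >= out[i].Version {
			out[i] = d // a newer change supersedes the older one
		}
	}
	return out
}
```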

func DeltaFromEnvelope

func DeltaFromEnvelope(channel, streamID string, env event.Envelope) Delta

DeltaFromEnvelope projects an event envelope onto a channel.
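
Since StreamID maps onto the SSE "id:" field, a handler forwarding deltas to a browser might frame each one as below. This is a sketch of the standard server-sent events wire format, not code from this module; using the delta's Type as the SSE event name and assuming a single-line JSON payload are both assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// sseFrame formats one server-sent event. streamID goes into the "id:" field
// so the browser sends it back as Last-Event-ID when it reconnects.
// payload is assumed to be single-line JSON.
func sseFrame(streamID, eventType, payload string) string {
	var b strings.Builder
	fmt.Fprintf(&b, "id: %s\n", streamID)
	fmt.Fprintf(&b, "event: %s\n", eventType)
	fmt.Fprintf(&b, "data: %s\n\n", payload) // blank line ends the event
	return b.String()
}
```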

type Fabric

type Fabric interface {
	// Exec runs one command: the only write path for aggregates.
	Exec(ctx context.Context, cmd command.Command) (command.Result, error)

	// ExecBatch runs N commands in one transaction, ordered, all-or-nothing.
	ExecBatch(ctx context.Context, cmds []command.Command) ([]command.Result, error)

	Relational() RelationalQuerier
	Graph() GraphQuerier
	Search() SearchQuerier
	Timeseries() TSQuerier
	Vector() VectorQuerier
	Spatial() SpatialQuerier
	Document() document.Store
	Blob() blob.Store

	// Subscribe resolves the scope to a channel server-side (authz hook
	// included) and returns a conflated delta stream.
	Subscribe(ctx context.Context, scope SubscribeScope) (<-chan Delta, error)

	// WaitForProjection blocks until the named projection has applied the
	// aggregate at or beyond version, or the context deadline expires
	// (ErrProjectionLag). It is the read-your-writes helper for callers
	// that need a projection-backed query right after a command.
	WaitForProjection(ctx context.Context, projection, aggregate, aggID string, version int64) error
}

Fabric is the facade application code holds. Open() wires it from configured adapters; fabriqtest wires it from fakes.
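
The read-your-writes pattern that WaitForProjection supports can be sketched as "bound the wait, then read". The interface below is a one-method subset of Fabric; laggingFabric, errLag, the projection name "graph" and the helper names are hypothetical, and where the version comes from (presumably the command result) is left to the caller.

```go
package main

import (
	"context"
	"errors"
	"time"
)

// projectionWaiter is the one Fabric method this sketch needs.
type projectionWaiter interface {
	WaitForProjection(ctx context.Context, projection, aggregate, aggID string, version int64) error
}

// waitThenRead bounds the wait with a deadline and runs read only once the
// projection has caught up to version.
func waitThenRead(ctx context.Context, f projectionWaiter, timeout time.Duration,
	projection, aggregate, aggID string, version int64, read func(context.Context) error,
) error {
	waitCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if err := f.WaitForProjection(waitCtx, projection, aggregate, aggID, version); err != nil {
		return err
	}
	return read(ctx)
}

var errLag = errors.New("projection lag") // stand-in for ErrProjectionLag

// laggingFabric is a fake whose projection has applied versions up to applied.
type laggingFabric struct{ applied int64 }

func (l laggingFabric) WaitForProjection(ctx context.Context, _, _, _ string, version int64) error {
	if l.applied >= version {
		return nil
	}
	<-ctx.Done() // a real implementation would poll or subscribe here
	return errLag
}

// demoWait reports whether the read ran for a given applied/wanted version.
func demoWait(applied, want int64) (ran bool, err error) {
	err = waitThenRead(context.Background(), laggingFabric{applied}, 10*time.Millisecond,
		"graph", "asset", "a1", want, func(context.Context) error { ran = true; return nil })
	return ran, err
}
```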

type Geometry

type Geometry struct {
	WKT  string
	SRID int
}

Geometry is an engine-neutral geometry value: WKT plus its SRID. For example, {WKT: "POINT Z (10 20 3)", SRID: 0} (local/planar, metres) or {WKT: "POINT (-122.4 37.8)", SRID: 4326} (geographic).

type GraphQuerier

type GraphQuerier interface {
	// Query runs a read-only openCypher query. into may be *[]string for
	// single-column id traversals, or a pointer to a slice of structs for
	// multi-column rows (adapter-mapped).
	Query(ctx context.Context, cypher string, params map[string]any, into any) error

	// TraverseAndHydrate runs a traversal that RETURNs ids, then hydrates
	// the rows from Postgres in one batched relational query. The target
	// entity is inferred from into's element type via the registry. Never
	// N+1.
	TraverseAndHydrate(ctx context.Context, cypher string, params map[string]any, into any) error

	// ApplyMutations applies engine-neutral projection mutations to the
	// named target graph (projection consumers and rebuilds only).
	ApplyMutations(ctx context.Context, target string, muts []projection.Mutation) error
}

GraphQuerier queries the knowledge-graph projection. Cypher shipped in this repo must stick to the openCypher common subset — the adapters/graphtest conformance suite is the gate for engine swaps.

type ListQuery

type ListQuery struct {
	// Where: conditions ANDed together. Use the constructors; Eqs(map)
	// is the terse equality shorthand.
	Where Where
	// OrderBy: one or more comma-separated "col [ASC|DESC]" terms, e.g.
	// "sort_order ASC, created_at ASC". Empty orders by id.
	OrderBy string
	Limit   int
	Offset  int
}

ListQuery selects, filters, orders and paginates an entity's rows. The filter is a single structured, engine-neutral mechanism: Where is a list of conditions, ANDed, built with Eq, Ne, Gt/Lt, In, Like/ILike, IsNull, Or, … (and Eqs for the pure-equality case). Columns are validated against the entity — an unknown column is rejected, which is also the injection guard. Reads the structured filter cannot express drop to raw Query.

type Op

type Op string

Op is the bounded, engine-neutral comparison vocabulary the RelationalQuerier accepts on List filters. It is deliberately a fixed allowlist (not arbitrary SQL): adapters map each Op to their dialect, columns are validated against the entity, and values travel as bound parameters — so a structured filter is as injection-safe as the equality shorthand, while covering what grove's builder expresses. Anything outside this vocabulary belongs in the raw Query escape hatch.

const (
	OpEq        Op = "eq"
	OpNe        Op = "ne"
	OpGt        Op = "gt"
	OpGte       Op = "gte"
	OpLt        Op = "lt"
	OpLte       Op = "lte"
	OpIn        Op = "in"
	OpNotIn     Op = "notin"
	OpLike      Op = "like"
	OpILike     Op = "ilike"
	OpIsNull    Op = "isnull"
	OpIsNotNull Op = "isnotnull"
)

func (Op) IsSet

func (o Op) IsSet() bool

IsSet reports whether the operator is one of the known operators.

func (Op) NeedsValue

func (o Op) NeedsValue() bool

NeedsValue reports whether the operator takes a value.
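
To make "adapters map each Op to their dialect, and values travel as bound parameters" concrete, here is a hedged sketch of a Postgres-flavoured translation. It uses local mirrors of Cond and Op, leaves In/NotIn out, and assumes columns were validated beforehand; it is not the grove adapter's code.

```go
package main

import (
	"fmt"
	"strings"
)

type Op string

type Cond struct {
	Column string
	Op     Op
	Value  any
	Or     []Cond
}

// sqlOps maps part of the vocabulary onto Postgres comparison operators.
var sqlOps = map[Op]string{
	"eq": "=", "ne": "<>", "gt": ">", "gte": ">=", "lt": "<", "lte": "<=",
	"like": "LIKE", "ilike": "ILIKE",
}

// render turns conds into a WHERE fragment joined by sep. Each value is
// appended to args and referenced as $n, so values never appear in the SQL
// text. Columns are assumed to have been validated already.
func render(conds []Cond, sep string, args *[]any) (string, error) {
	parts := make([]string, 0, len(conds))
	for _, c := range conds {
		switch {
		case len(c.Or) > 0:
			inner, err := render(c.Or, " OR ", args)
			if err != nil {
				return "", err
			}
			parts = append(parts, "("+inner+")")
		case c.Op == "isnull":
			parts = append(parts, c.Column+" IS NULL")
		case c.Op == "isnotnull":
			parts = append(parts, c.Column+" IS NOT NULL")
		default:
			sym, ok := sqlOps[c.Op]
			if !ok {
				return "", fmt.Errorf("operator %q not handled in this sketch", c.Op)
			}
			*args = append(*args, c.Value)
			parts = append(parts, fmt.Sprintf("%s %s $%d", c.Column, sym, len(*args)))
		}
	}
	return strings.Join(parts, sep), nil
}
```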

type Point

type Point struct {
	Key     string // series key within the tenant, e.g. tag id
	At      time.Time
	Value   float64
	Quality int
}

Point is one telemetry sample.

type RangeQuery

type RangeQuery struct {
	Series string
	Key    string
	From   time.Time
	To     time.Time
	Bucket time.Duration // 0 = raw points
	Agg    string        // "avg", "min", "max", "last" (when Bucket > 0)
}

RangeQuery reads a time window of a series, optionally bucketed.
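
What Bucket and Agg mean can be illustrated in memory. The sketch below implements only "avg" and aligns windows with time.Truncate; how the TimescaleDB adapter aligns buckets is not stated here, so treat the alignment as an assumption.

```go
package main

import (
	"sort"
	"time"
)

// Point mirrors the fields of the documented Point that bucketing needs.
type Point struct {
	At    time.Time
	Value float64
}

// bucketAvg groups points into fixed-width windows and returns one averaged
// Point per non-empty window, oldest first. bucket must be > 0.
func bucketAvg(points []Point, bucket time.Duration) []Point {
	sums := map[int64]float64{}
	counts := map[int64]int{}
	for _, p := range points {
		start := p.At.Truncate(bucket).UnixNano() // window start as map key
		sums[start] += p.Value
		counts[start]++
	}
	starts := make([]int64, 0, len(sums))
	for s := range sums {
		starts = append(starts, s)
	}
	sort.Slice(starts, func(i, j int) bool { return starts[i] < starts[j] })
	out := make([]Point, 0, len(starts))
	for _, s := range starts {
		out = append(out, Point{At: time.Unix(0, s).UTC(), Value: sums[s] / float64(counts[s])})
	}
	return out
}

// demoBuckets averages three samples into one-minute windows.
func demoBuckets() []float64 {
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	pts := []Point{
		{At: t0.Add(10 * time.Second), Value: 2},
		{At: t0.Add(50 * time.Second), Value: 4},
		{At: t0.Add(70 * time.Second), Value: 10},
	}
	var vals []float64
	for _, p := range bucketAvg(pts, time.Minute) {
		vals = append(vals, p.Value)
	}
	return vals
}
```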

type RelationalQuerier

type RelationalQuerier interface {
	// Get loads one aggregate row by id into a model pointer.
	Get(ctx context.Context, entity, id string, into any) error

	// GetMany loads many rows in ONE batched query (WHERE id = ANY($1)) —
	// the dataloader-style hydration primitive. Order follows ids; missing
	// rows are skipped.
	GetMany(ctx context.Context, entity string, ids []string, into any) error

	// List pages through an entity's rows with a structured, engine-neutral
	// filter (Where/Filter), ordering and pagination. The filter covers
	// grove's builder expressiveness — operators, IN, LIKE, null checks,
	// OR groups — without leaking engine types; reads it cannot express
	// drop to the raw Query escape hatch.
	List(ctx context.Context, entity string, q ListQuery, into any) error

	// Query is the raw SQL escape hatch (still tenant-guarded). Use it for
	// reads the structured filter cannot express; writes belong to Exec.
	Query(ctx context.Context, into any, sql string, args ...any) error
}

RelationalQuerier reads the source of truth through grove. Every method is tenant-scoped structurally; the grove hook backstop asserts it.

type Repo

type Repo[T any] struct {
	// contains filtered or unexported fields
}

Repo is a type-safe view over one entity, parameterised by its grove model T. It is a thin generic layer over RelationalQuerier — the interface stays string/any (Go interface methods cannot be generic, and the untyped form is what adapters and fakes implement), while Repo gives call sites the entity-from-type and typed results:

repo, _ := query.For[domain.Asset](reg, f.Relational())
asset, err := repo.Get(ctx, id)            // *domain.Asset, not any
pumps, err := repo.List(ctx, query.ListQuery{Where: []query.Cond{query.Eq("kind", "pump")}})

It adds no query capability beyond the ports — just typing. The graph, search and vector queriers are optional; the relational one is required.

func For

func For[T any](reg *registry.Registry, rel RelationalQuerier) (*Repo[T], error)

For builds a typed Repo by resolving T's registered entity. T is the grove model struct (value or pointer); an unregistered type errors. The repo is relational-only until you attach projection queriers via With* (fabriq.For wires them all from the facade).

func (*Repo[T]) Entity

func (r *Repo[T]) Entity() string

Entity returns the resolved registry entity name.

func (*Repo[T]) Get

func (r *Repo[T]) Get(ctx context.Context, id string) (*T, error)

Get loads one row by id, typed.

func (*Repo[T]) GetMany

func (r *Repo[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)

GetMany loads many rows in one batched query, typed; order follows ids, missing rows are skipped.
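
A query of the form WHERE id = ANY($1) returns rows in no particular order, so "order follows ids, missing rows are skipped" implies a reordering step after the fetch. A minimal sketch, with row as a hypothetical model type:

```go
package main

// row is a hypothetical model with an id column.
type row struct {
	ID   string
	Name string
}

// orderByIDs arranges rows fetched by an unordered batched query so they
// follow ids, skipping ids that had no row.
func orderByIDs(ids []string, fetched []row) []*row {
	byID := make(map[string]*row, len(fetched))
	for i := range fetched {
		byID[fetched[i].ID] = &fetched[i]
	}
	out := make([]*row, 0, len(ids))
	for _, id := range ids {
		if r, ok := byID[id]; ok {
			out = append(out, r)
		}
	}
	return out
}
```

This is what lets a ranked id list (from search, vector or graph) keep its ranking after hydration.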

func (*Repo[T]) In

func (r *Repo[T]) In(ctx context.Context, id, rel string) ([]*T, error)

In is Out with the edge reversed: same-type neighbours one hop in along a self-edge — MATCH (n:L {id:$id})<-[:REL]-(m:L) RETURN m.id.

func (*Repo[T]) List

func (r *Repo[T]) List(ctx context.Context, q ListQuery) ([]*T, error)

List runs a structured query, typed. When result-set caching is enabled for the entity, the ordered id-list is cached (Versioned by the entity generation, TTL backstop) and hydrated through GetMany.

func (*Repo[T]) One

func (r *Repo[T]) One(ctx context.Context, where ...Cond) (*T, error)

One fetches the single row matching the given conditions (ANDed) — the "load one by something other than id" primitive (e.g. a unique serial):

pump, err := repo.One(ctx, query.Eq("serial", "SN-777"))

Zero matches is ErrNotFound; more than one is an error (One means one). It needs no ListQuery — order and pagination are meaningless for a single row — and caps the read at two to detect ambiguity cheaply.
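
The "cap the read at two" trick can be sketched as follows. The list callback stands in for the underlying List call, and both error values are placeholders introduced for the example.

```go
package main

import "errors"

var (
	errNotFound  = errors.New("not found")         // stand-in for ErrNotFound
	errAmbiguous = errors.New("more than one row") // hypothetical error value
)

// one asks list for at most two rows: zero means not found, two means the
// conditions were ambiguous, and there is no need to count any further.
func one[T any](list func(limit int) ([]*T, error)) (*T, error) {
	rows, err := list(2)
	if err != nil {
		return nil, err
	}
	switch len(rows) {
	case 0:
		return nil, errNotFound
	case 1:
		return rows[0], nil
	default:
		return nil, errAmbiguous
	}
}
```

A limit of one could not tell a unique match from an ambiguous one, and an unbounded read would pay for every duplicate; two is the smallest limit that distinguishes all three outcomes.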

func (*Repo[T]) Out

func (r *Repo[T]) Out(ctx context.Context, id, rel string) ([]*T, error)

Out returns the same-type neighbours one hop out along a self-edge, typed and hydrated from Postgres:

MATCH (n:Asset {id:$id})-[:CHILD_OF]->(m:Asset) RETURN m.id

parents, err := repo.Out(ctx, assetID, "CHILD_OF") // []*domain.Asset

rel must be an edge this entity declares whose Target is the entity itself (a self-edge) — that is what makes the []*T result sound. Edges to other entity types, and anything outside MATCH/edge/RETURN, drop to the raw Traverse escape hatch. These helpers emit only the openCypher common subset the graphtest conformance suite gates, so they stay portable across graph engines.

func (*Repo[T]) Reachable

func (r *Repo[T]) Reachable(ctx context.Context, id, rel string, minHops, maxHops int) ([]*T, error)

Reachable returns the same-type nodes reachable from id by following a self-edge between minHops and maxHops times (a variable-length path) — the typed ancestors/descendants walk:

MATCH (n:Asset {id:$id})-[:CHILD_OF*1..3]->(m:Asset) RETURN m.id

ancestors, err := repo.Reachable(ctx, assetID, "CHILD_OF", 1, 3)

minHops must be >= 1 and maxHops within [minHops, 16]. Ids are deduped (multiple paths may reach the same node) before hydration. For the reverse direction or richer shapes, use raw Traverse.
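
The bounds check, the variable-length pattern and the dedupe step could be sketched like this. Both helper names are hypothetical, and the sketch assumes label and rel come from the registry, not from user input, because they are spliced into the query text.

```go
package main

import "fmt"

// reachableCypher builds the variable-length pattern for a self-edge after
// checking the documented bounds (minHops >= 1, maxHops in [minHops, 16]).
func reachableCypher(label, rel string, minHops, maxHops int) (string, error) {
	if minHops < 1 || maxHops < minHops || maxHops > 16 {
		return "", fmt.Errorf("hops out of range: %d..%d", minHops, maxHops)
	}
	return fmt.Sprintf("MATCH (n:%s {id:$id})-[:%s*%d..%d]->(m:%s) RETURN m.id",
		label, rel, minHops, maxHops, label), nil
}

// dedupe keeps the first occurrence of each id, preserving order, since
// several paths may reach the same node.
func dedupe(ids []string) []string {
	seen := make(map[string]bool, len(ids))
	out := make([]string, 0, len(ids))
	for _, id := range ids {
		if !seen[id] {
			seen[id] = true
			out = append(out, id)
		}
	}
	return out
}
```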

func (*Repo[T]) Search

func (r *Repo[T]) Search(ctx context.Context, text string, limit int) ([]*T, error)

Search runs a full-text query against the entity's declared search fields, then hydrates the matching rows from Postgres in one batched query — typed entities in relevance order, never N+1. It is the one-line form of SearchWith; reach for SearchWith when you need filters, sort or pagination. For raw hits (highlighting, scores) use f.Search() directly.

func (*Repo[T]) SearchWith

func (r *Repo[T]) SearchWith(ctx context.Context, req SearchRequest) ([]*T, error)

SearchWith runs a structured full-text query — free text plus optional non-scoring Filter (the same Cond vocabulary as List), Sort and pagination — and hydrates the matches from Postgres in one batched query, typed:

hits, _ := repo.SearchWith(ctx, query.SearchRequest{
    Query:  "centrifugal",
    Filter: query.Where{query.Eq("kind", "pump"), query.Gte("version", 3)},
    Sort:   "name",
    Limit:  20,
})

Filter and Sort may reference only indexed fields (the declared search fields plus id/tenant_id/version). There is no raw engine-DSL form by design — full-text search has no portable raw language, so the structured query is the whole surface, which keeps the port swappable.

func (*Repo[T]) Similar

func (r *Repo[T]) Similar(ctx context.Context, embedding []float32, k int) ([]*T, error)

Similar runs a vector nearest-neighbour search and hydrates the matched rows from Postgres in one batched query — typed entities in similarity order, never N+1. For the relevance scores use f.Vector() directly.

func (*Repo[T]) Traverse

func (r *Repo[T]) Traverse(ctx context.Context, cypher string, params map[string]any) ([]*T, error)

Traverse runs a graph traversal that RETURNs ids and hydrates the full rows from Postgres in one batched query — typed, never N+1. The Cypher stays raw (the graph's swappability rests on common-subset openCypher, not a builder); only the result is typed:

assets, err := repo.Traverse(ctx,
    `MATCH (a:Asset)-[:LOCATED_AT]->(:Site {id:$s}) RETURN a.id`,
    map[string]any{"s": siteID})

func (*Repo[T]) WithGraph

func (r *Repo[T]) WithGraph(g GraphQuerier) *Repo[T]

WithGraph attaches the graph querier (enables Traverse).

func (*Repo[T]) WithResultCache

func (r *Repo[T]) WithResultCache(c cache.Cache, ks cache.Keyspace) *Repo[T]

WithResultCache enables result-set (id-list) caching for this repo, keyed in ks (an entity-keyed Versioned keyspace). Wired by fabriq.For for opted-in entities; a repo without it behaves exactly as before.

func (*Repo[T]) WithSearch

func (r *Repo[T]) WithSearch(s SearchQuerier) *Repo[T]

WithSearch attaches the search querier (enables Search).

func (*Repo[T]) WithVector

func (r *Repo[T]) WithVector(v VectorQuerier) *Repo[T]

WithVector attaches the vector querier (enables Similar).

type SearchQuerier

type SearchQuerier interface {
	// Search runs a query against an entity's declared search fields.
	Search(ctx context.Context, q SearchQuery, into any) error

	// ApplyMutations applies DocIndex/DocDeindex mutations to the named
	// target index (projection consumers and rebuilds only).
	ApplyMutations(ctx context.Context, target string, muts []projection.Mutation) error
}

SearchQuerier queries the full-text projection.

type SearchQuery

type SearchQuery struct {
	Entity string
	Query  string
	// Filter narrows results with the same Cond vocabulary as relational
	// List (Eq/In/Gt/…/Or), applied by engines in non-scoring filter
	// context. Columns must be indexed fields.
	Filter Where
	// Sort is an indexed column, optionally suffixed " DESC". Empty sorts
	// by relevance score.
	Sort string
	// Limit caps the page size (adapter default when <= 0); Offset skips
	// that many leading hits.
	Limit  int
	Offset int
}

SearchQuery is a full-text query over an entity's declared fields, optionally narrowed by structured non-scoring filters, ordered and paginated. Filter and Sort are validated against the INDEXED fields (the declared search fields plus id/tenant_id/version) — you can only filter or sort on what the index holds.

There is deliberately no raw engine-DSL field. Unlike relational (raw SQL) and graph (raw openCypher common subset), full-text search has no portable raw language — an Elasticsearch query body could not be honored by a Postgres-FTS or Typesense adapter — so a raw DSL would break the swappable-port contract. Everything expressible stays in this neutral struct; genuinely engine-specific needs belong on a dedicated adapter method, outside the port.

type SearchRequest

type SearchRequest struct {
	Query  string
	Filter Where
	Sort   string
	Limit  int
	Offset int
}

SearchRequest is the call-site form of a structured search over a typed Repo: SearchQuery without Entity, which the Repo supplies from T. Build Filter with the same constructors as relational filters (query.Eq, …).

type SpatialMatch

type SpatialMatch struct {
	ID        string
	DistanceM float64 // metres
	Meta      map[string]any
}

SpatialMatch is one nearest-neighbour hit, nearest first.

type SpatialQuerier

type SpatialQuerier interface {
	// Upsert stores or replaces the geometry for (tenant, entity, id).
	Upsert(ctx context.Context, entity, id string, geom Geometry, meta map[string]any) error
	// Within returns entities whose geometry lies within q.RadiusM of q.Center,
	// nearest-first, scanned into *[]SpatialMatch.
	Within(ctx context.Context, q SpatialQuery, into any) error
	// Delete removes the geometry for (tenant, entity, id).
	Delete(ctx context.Context, entity, id string) error
}

SpatialQuerier is the geometry port (PostGIS). Geometry is exchanged as WKT plus an SRID — engine-neutral, covering point/line/polygon. Consumers holding GeoJSON convert to WKT at the boundary. Like Vector, it is direct-write.
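
For consumers holding GeoJSON, the boundary conversion might look like the sketch below. It handles Point only and is not part of this package. GeoJSON positions are longitude then latitude in WGS 84, which is why the result carries SRID 4326.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Geometry mirrors the documented WKT-plus-SRID value.
type Geometry struct {
	WKT  string
	SRID int
}

// pointFromGeoJSON converts a GeoJSON Point into WKT. Other geometry types
// are left out of this sketch.
func pointFromGeoJSON(raw []byte) (Geometry, error) {
	var g struct {
		Type        string    `json:"type"`
		Coordinates []float64 `json:"coordinates"`
	}
	if err := json.Unmarshal(raw, &g); err != nil {
		return Geometry{}, err
	}
	if g.Type != "Point" || len(g.Coordinates) < 2 {
		return Geometry{}, fmt.Errorf("unsupported geometry %q", g.Type)
	}
	// WKT uses the same x y (longitude latitude) order as GeoJSON.
	wkt := fmt.Sprintf("POINT (%g %g)", g.Coordinates[0], g.Coordinates[1])
	return Geometry{WKT: wkt, SRID: 4326}, nil
}
```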

type SpatialQuery

type SpatialQuery struct {
	Entity  string
	Center  Geometry
	RadiusM float64 // radius in metres
	K       int     // cap; <=0 → adapter default
}

SpatialQuery is a radius search around a center point.

type SubscribeScope

type SubscribeScope struct {
	// Entity is the registry entity name.
	Entity string `json:"entity"`

	// Scope is a scope name declared in the entity's spec ("id", "site",
	// "tenant", ...).
	Scope string `json:"scope"`

	// ID is the scope id (aggregate id for "id" scopes, container id for
	// field scopes). Ignored for tenant scope — the tenant always comes
	// from the authenticated context.
	ID string `json:"id,omitempty"`
}

SubscribeScope is a subscription request. The channel is always resolved server-side from the validated scope plus the context tenant — client input never names a channel or tenant directly.

type TSQuerier

type TSQuerier interface {
	BulkWrite(ctx context.Context, series string, points []Point) error
	Range(ctx context.Context, q RangeQuery, into any) error
}

TSQuerier is the telemetry port (TimescaleDB hypertables). BulkWrite is the event-bypass ingest path: per-row events would melt the outbox, so bulk telemetry skips it and the relay publishes conflated deltas instead.

type VectorMatch

type VectorMatch struct {
	ID    string
	Score float64 // cosine similarity, higher is closer
	Meta  map[string]any
}

VectorMatch is one nearest-neighbour hit, best first.

type VectorQuerier

type VectorQuerier interface {
	Upsert(ctx context.Context, entity, id string, embedding []float32, meta map[string]any) error
	Similar(ctx context.Context, q VectorQuery, into any) error
	// Delete removes the embedding for (tenant, entity, id). Deleting a missing
	// id is a no-op. Mirrors SpatialQuerier.Delete.
	Delete(ctx context.Context, entity, id string) error
	// Get returns the stored embedding for (entity, id), or a *fabriqerr.NotFoundError
	// when absent. Mirrors RelationalQuerier.Get's get-by-id + NotFound convention.
	Get(ctx context.Context, entity, id string) ([]float32, error)
	// DeleteByMeta removes every embedding for (tenant, entity) whose meta
	// contains all key/value pairs in filter (AND-of-equals). An empty filter
	// deletes ALL embeddings for (tenant, entity) — scope intentionally.
	DeleteByMeta(ctx context.Context, entity string, filter map[string]string) error
}

VectorQuerier is the embedding port (pgvector).

type VectorQuery

type VectorQuery struct {
	Entity    string
	Embedding []float32
	K         int
	// Filter restricts matches to embeddings whose meta contains all of these
	// key/value pairs (exact-match, AND-of-equals). Empty/nil = no filter.
	Filter map[string]string
}

VectorQuery is a nearest-neighbour search.
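
The semantics of K, Filter and Score can be shown with an in-memory linear scan. This is a stand-in for illustration, not how the pgvector adapter works; stored and the helper names are hypothetical.

```go
package main

import (
	"math"
	"sort"
)

type VectorMatch struct {
	ID    string
	Score float64 // cosine similarity, higher is closer
}

// stored is a hypothetical in-memory record of one embedding.
type stored struct {
	ID        string
	Embedding []float32
	Meta      map[string]string
}

// cosine returns the cosine similarity of two equal-length vectors.
func cosine(a, b []float32) float64 {
	var dot, na, nb float64
	for i := range a {
		dot += float64(a[i]) * float64(b[i])
		na += float64(a[i]) * float64(a[i])
		nb += float64(b[i]) * float64(b[i])
	}
	if na == 0 || nb == 0 {
		return 0
	}
	return dot / (math.Sqrt(na) * math.Sqrt(nb))
}

// matches reports whether meta contains every key/value pair in filter.
func matches(meta, filter map[string]string) bool {
	for key, want := range filter {
		if got, ok := meta[key]; !ok || got != want {
			return false
		}
	}
	return true
}

// similar keeps embeddings that pass the filter and returns the k best by
// cosine similarity, best first. k <= 0 returns every match.
func similar(all []stored, query []float32, k int, filter map[string]string) []VectorMatch {
	var out []VectorMatch
	for _, s := range all {
		if len(s.Embedding) != len(query) || !matches(s.Meta, filter) {
			continue
		}
		out = append(out, VectorMatch{ID: s.ID, Score: cosine(s.Embedding, query)})
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Score > out[j].Score })
	if k > 0 && len(out) > k {
		out = out[:k]
	}
	return out
}
```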

type Where

type Where []Cond

Where is a conjunction (AND) of conditions — the assignable filter type carried by ListQuery and accepted across the query API. Build it as a literal (query.Where{Eq(...), Like(...)}), from Eqs(map), or by append.

func Eqs

func Eqs(equalities map[string]any) Where

Eqs is the terse equality shorthand: it turns a column=value map into a Where of Eq conditions, sorted by column for deterministic SQL. Use it when you just want "match these columns": ListQuery{Where: Eqs(m)}; mix with other conditions via append(Eqs(m), Like(...)).
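
Go randomises map iteration order, so the sort is what makes the output deterministic. A minimal sketch with a local Cond mirror (eqs is a hypothetical stand-in for Eqs):

```go
package main

import "sort"

// Cond mirrors the documented leaf fields; Op is a plain string here.
type Cond struct {
	Column string
	Op     string
	Value  any
}

// eqs turns a column=value map into equality conditions sorted by column,
// so two calls with the same map always produce the same condition order.
func eqs(equalities map[string]any) []Cond {
	cols := make([]string, 0, len(equalities))
	for col := range equalities {
		cols = append(cols, col)
	}
	sort.Strings(cols)
	out := make([]Cond, 0, len(cols))
	for _, col := range cols {
		out = append(out, Cond{Column: col, Op: "eq", Value: equalities[col]})
	}
	return out
}
```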
