pgo

command module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2025 License: Apache-2.0 Imports: 1 Imported by: 1

README

pgo (/pɪɡəʊ/): Postgres integrations in Go

usability status: experimental

This project follows a Work → Right → Fast approach:

  1. make it work
  2. refine it
  3. optimize it

It's in the early stage, so the code may be rough/incomplete. Join us in building and improving it!

go install github.com/edgeflare/pgo@main # or make build or download from release page

PostgREST compatible REST API

pgo rest --config pkg/config/example.config.yaml

See godoc and pgo rest --help for more.

Stream Postgres changes to NATS, MQTT, Kafka, Clickhouse, etc

asciicast

  1. Start Postgres, NATS, Kafka, MQTT broker and pgo pipeline as containers
git clone git@github.com:edgeflare/pgo.git

make image

# Set KAFKA_CFG_ADVERTISED_LISTENERS env var in docs/docker-compose.yaml to host IP for local access,
# as opposed to from within container network. adjust Kafka brokers IP in docs/pipeline-example.docker.yaml
make up # docker compose up
  1. Postgres
  • As a source: Create a test table, eg users in source postgres database
PGUSER=postgres PGPASSWORD=secret PGHOST=localhost PGDATABASE=testdb psql
CREATE TABLE IF NOT EXISTS public.users (
  id INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  name TEXT
);

ALTER TABLE public.users REPLICA IDENTITY FULL;
  • As a sink
PGUSER=postgres PGPASSWORD=secret PGHOST=localhost PGDATABASE=testdb PGPORT=5431 psql
  • Create the same users table in sink database for mirroring. altering replica identity may not be needed in sink

  • Create a second table in sink database which stores transformed data

CREATE SCHEMA IF NOT EXISTS another_schema;

CREATE TABLE IF NOT EXISTS  another_schema.transformed_users (
  uuid UUID DEFAULT gen_random_uuid(), -- because we're extracting only `name` field
  -- new_id INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, -- to handle UPDATE operations, primary key column type must match in source and sink
  new_name TEXT
);

pgo caches the table schemas for simpler parsing of CDC events (rows). To update pgo cache with newly created tables, either docker restart pgo_pgo_1 or NOTIFY it to reload cache by executing on database

NOTIFY pgo, 'reload schema';
  1. Subscribe
  • MQTT: /any/prefix/schemaName/tableName/operation topic (testing with mosquitto client)
mosquitto_sub -t pgo/public/users/c # operation: c=create, u=update, d=delete, r=read
  • Kafka: topic convention is [prefix].[schema_name].[table_name].[operation]. use any kafka client eg kaf
kaf consume pgo.public.users.c --follow # consume messages until program execution
  • NATS:
nats sub -s nats://localhost:4222 'pgo.public.users.>' # wildcard. includes all nested parts
# nats sub -s nats://localhost:4222 'pgo.public.users.c' # specific
  1. INSERT (or update etc) into users table
INSERT INTO users (name) VALUES ('alice');
INSERT INTO users (name) VALUES ('bob');

And notice NATS, MQTT, Kafka, postgres-sink, or debug peer's respective subscriber receiving the message. It's not Postgres only source. Other peers too can be sources (not all peers fully functional yet).

Clean up

make down

It's also possible to import functions, etc around

  • net/http.Handler
    • router
    • middleware (authentication, logging, CORS, RequestID, ...)
    • Postgres middleware attaches a pgxpool.Conn to request context for authorized user; useful for RLS

If you're curious, start by browsing the examples, skimming over any doc.go, *.md files.

Contributing

Please see CONTRIBUTING.md.

License

Apache License 2.0

Documentation

The Go Gopher

There is no documentation for this package.

Directories

Path Synopsis
cmd
pgo
examples
rag
internal
pkg
pglogrepl
Package pglogrepl provides Debezium-compatible change data capture (CDC) events from PostgreSQL write-ahead logs (WAL).
Package pglogrepl provides Debezium-compatible change data capture (CDC) events from PostgreSQL write-ahead logs (WAL).
pgx
Wrapper utils around github.com/jackc/pgx
Wrapper utils around github.com/jackc/pgx
pgx/role
Package role provides functions for managing PostgreSQL roles, including creating, updating, retrieving, and deleting roles.
Package role provides functions for managing PostgreSQL roles, including creating, updating, retrieving, and deleting roles.
pgx/schema
Package schema provides functionality for caching PostgreSQL objects' metadata eg schema of tables / views, function def etc.
Package schema provides functionality for caching PostgreSQL objects' metadata eg schema of tables / views, function def etc.
pipeline
Package pipeline provides a framework for managing data pipelines from/to PostgreSQL to/from various `Peer`s (ie data source/destination).
Package pipeline provides a framework for managing data pipelines from/to PostgreSQL to/from various `Peer`s (ie data source/destination).
pipeline/peer/kafka
Package kafka provides a real-time Kafka-based interface for PostgreSQL, similar to how PostgREST exposes PostgreSQL over HTTP.
Package kafka provides a real-time Kafka-based interface for PostgreSQL, similar to how PostgREST exposes PostgreSQL over HTTP.
pipeline/peer/mqtt
Package mqtt provides a real-time MQTT-based interface for PostgreSQL, similar to how PostgREST exposes PostgreSQL over HTTP.
Package mqtt provides a real-time MQTT-based interface for PostgreSQL, similar to how PostgREST exposes PostgreSQL over HTTP.
pipeline/peer/nats
Package nats provides a real-time NATS-based interface for PostgreSQL, similar to how PostgREST exposes PostgreSQL over HTTP.
Package nats provides a real-time NATS-based interface for PostgreSQL, similar to how PostgREST exposes PostgreSQL over HTTP.
pipeline/transform
Package transform provides utilities for applying transformations to change data capture (CDC) events in pipelines.
Package transform provides utilities for applying transformations to change data capture (CDC) events in pipelines.
rest
Package rest provides a PostgreSQL REST API server similar to PostgREST.
Package rest provides a PostgreSQL REST API server similar to PostgREST.
x
x/rag
Package rag provides functions to integrate Retrieval Augmented Generation (RAG) capabilities in PostgreSQL tables using the pgvector extension.
Package rag provides functions to integrate Retrieval Augmented Generation (RAG) capabilities in PostgreSQL tables using the pgvector extension.
proto

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL