pqstream

package module
v0.0.0-...-c9027a0 Latest
Published: Oct 18, 2019 License: MIT Imports: 15 Imported by: 2

README

pqstream

pqstream is a program that streams changes out of a postgres database with the intent of populating other systems and enabling stream processing of data sets.

installation

$ go get -u github.com/tmc/pqstream/cmd/{pqs,pqsd}

basic usage

create an example database:

$ createdb dbname
$ echo "create table notes (id serial, created_at timestamp, note text)" | psql dbname

connect the agent:

$ pqsd -connect postgresql://user:pass@host/dbname

connect the cli:

$ pqs

at this point you will see streams of database operations rendered to stdout:

(in a psql shell):

dbname=# insert into notes values (default, default, 'here is a sample note');
INSERT 0 1
dbname=# insert into notes values (default, default, 'here is a sample note');
INSERT 0 1
dbname=# update notes set note = 'here is an updated note' where id=1;
UPDATE 1
dbname=# delete from notes where id = 1;
DELETE 1
dbname=#

our client should now show our operations:

$ pqs
{"schema":"public","table":"notes","op":"INSERT","id":"1","payload":{"created_at":null,"id":1,"note":"here is a sample note"}}
{"schema":"public","table":"notes","op":"INSERT","id":"2","payload":{"created_at":null,"id":2,"note":"here is a sample note"}}
{"schema":"public","table":"notes","op":"UPDATE","id":"1","payload":{"created_at":null,"id":1,"note":"here is an updated note"},"changes":{"note":"here is a sample note"}}
{"schema":"public","table":"notes","op":"DELETE","id":"1","payload":{"created_at":null,"id":1,"note":"here is an updated note"}}
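
each line of output is a standalone JSON object, so a consumer can decode it with the standard library. the sketch below is one way to do that; the Event type and ParseEvent function are hypothetical names that mirror the fields visible in the sample output, not types exported by pqstream. in the UPDATE sample, "changes" appears to carry the previous value of each modified column.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event mirrors the JSON fields visible in the sample pqs output.
// The type name and field types are assumptions made for this sketch.
type Event struct {
	Schema  string                 `json:"schema"`
	Table   string                 `json:"table"`
	Op      string                 `json:"op"`
	ID      string                 `json:"id"`
	Payload map[string]interface{} `json:"payload"`
	Changes map[string]interface{} `json:"changes,omitempty"`
}

// ParseEvent decodes a single line of pqs output into an Event.
func ParseEvent(line string) (Event, error) {
	var e Event
	err := json.Unmarshal([]byte(line), &e)
	return e, err
}

func main() {
	line := `{"schema":"public","table":"notes","op":"UPDATE","id":"1","payload":{"created_at":null,"id":1,"note":"here is an updated note"},"changes":{"note":"here is a sample note"}}`
	e, err := ParseEvent(line)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	// Print the operation, its target, and the value recorded under "changes".
	fmt.Printf("%s on %s.%s (id %s), changes[note]=%v\n",
		e.Op, e.Schema, e.Table, e.ID, e.Changes["note"])
}
```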

field redaction

If sensitive fields (e.g. PII) must not be exported, pass the -redactions flag to pqsd:

$ pqsd -connect postgresql://user:pass@host/dbname -redactions='{"public":{"users":["first_name","last_name","email"]}}'

The redactions value is encoded as JSON and conforms to the following layout:

'{"schema":{"table":["field1","field2"]}}'
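
To check a redactions value before handing it to pqsd, the layout can be decoded with encoding/json. This is a minimal sketch: the redactions type and parseRedactions function are hypothetical local names that follow the schema, table, fields nesting shown above and do not call into pqstream.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// redactions follows the documented layout: schema -> table -> field names.
// It is a local stand-in defined for this sketch.
type redactions map[string]map[string][]string

// parseRedactions decodes a JSON redactions string, returning an error
// if the value does not match the expected nesting.
func parseRedactions(s string) (redactions, error) {
	var r redactions
	if err := json.Unmarshal([]byte(s), &r); err != nil {
		return nil, err
	}
	return r, nil
}

func main() {
	r, err := parseRedactions(`{"public":{"users":["first_name","last_name","email"]}}`)
	if err != nil {
		fmt.Println("invalid redactions:", err)
		return
	}
	// List every field that would be redacted, grouped by schema and table.
	for schema, tables := range r {
		for table, fields := range tables {
			fmt.Printf("%s.%s: %v\n", schema, table, fields)
		}
	}
}
```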

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type FieldRedactions

type FieldRedactions map[string]map[string][]string

FieldRedactions describes how redaction fields are specified. Top level map key is the schema, inner map key is the table and slice is the fields to redact.

func DecodeRedactions

func DecodeRedactions(r string) (FieldRedactions, error)

DecodeRedactions returns a FieldRedactions map decoded from redactions specified in json format.
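
Once decoded, a redaction map could be applied to an event payload roughly as follows. This sketch redeclares FieldRedactions locally with the documented shape so that it does not depend on the pqstream module. The redact helper is hypothetical, and dropping the keys outright is an assumption; the documentation does not say whether pqstream removes or masks redacted fields.

```go
package main

import "fmt"

// FieldRedactions has the documented shape: schema -> table -> fields.
// It is redeclared here so the sketch stands alone.
type FieldRedactions map[string]map[string][]string

// redact deletes the fields listed for schema.table from payload in place.
// Removing the keys is an assumption made for illustration.
func redact(r FieldRedactions, schema, table string, payload map[string]interface{}) {
	// Lookups on missing schemas or tables yield a nil slice, so the loop
	// is simply skipped when nothing is configured.
	for _, field := range r[schema][table] {
		delete(payload, field)
	}
}

func main() {
	r := FieldRedactions{"public": {"users": {"first_name", "last_name", "email"}}}
	payload := map[string]interface{}{
		"id":         1,
		"first_name": "Ada",
		"email":      "ada@example.com",
	}
	redact(r, "public", "users", payload)
	fmt.Println(payload)
}
```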

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server implements PQStreamServer and manages both client connections and database event monitoring.

func NewServer

func NewServer(connectionString string, opts ...ServerOption) (*Server, error)

NewServer prepares a new pqstream server.

func (*Server) Close

func (s *Server) Close() error

Close stops the pqstream server.

func (*Server) HandleEvents

func (s *Server) HandleEvents(ctx context.Context) error

HandleEvents processes events from the database and copies them to relevant clients.

func (*Server) InstallTriggers

func (s *Server) InstallTriggers() error

InstallTriggers sets up triggers to start observing changes for the set of tables in the database.

func (*Server) Listen

Listen handles a request to listen for database events and streams them to clients.

func (*Server) RemoveTriggers

func (s *Server) RemoveTriggers() error

RemoveTriggers removes triggers from the database.

type ServerOption

type ServerOption func(*Server)

ServerOption allows customization of a new server.

func WithContext

func WithContext(ctx context.Context) ServerOption

WithContext allows supplying a custom context.

func WithFieldRedactions

func WithFieldRedactions(r FieldRedactions) ServerOption

WithFieldRedactions controls which fields are redacted from the feed.

func WithLogger

func WithLogger(l logrus.FieldLogger) ServerOption

WithLogger allows attaching a custom logger.

func WithTableRegexp

func WithTableRegexp(re *regexp.Regexp) ServerOption

WithTableRegexp controls which tables are managed.

Directories

Path Synopsis
cmd/pqs
pqs is the client for pqsd which allows subscription to change events in a postgres database cluster.
cmd/pqsd
pqsd is an agent that connects to a postgresql cluster and manages stream emission.
contrib/cmd/pqsamq
pqs is the client for pqsd which allows subscription to change events in a postgres database cluster.
Package pqs is a generated protocol buffer package.
