pghub

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2024 License: MIT Imports: 15 Imported by: 0

README

pg-hub-g

release coverage license

Lightweight Golang event infrastructure built on PostgreSQL Notify/Listen.

overview

The pg-hub package provides lightweight event routing infrastructure based on PostgreSQL notify and listen. In basic use cases, it enables database triggers to send events to a node client application. In more sophisticated systems, the RDBMS becomes a top-level broker, enabling database logic and multiple node clients to produce and consume events.

Each event includes three fields, topic, key, and value. The topic is an alphanumeric string that corresponds directly to a PSQL notification channel. The key field contains an ordered list of 64-bit signed integers, and the value field is a JSON object. The key and value fields are encoded into the notification payload, while the topic field corresponds to a PostgreSQL notification channel.

example

import Pg from 'pg'
import PgHub from 'pg-hub'

const pool = new Pg.Pool(config);
const hub = new PgHub.Hub(pool, 5000);

const result1 = await hub.start();

const consumer = hub.consumer((event) => console.log('topic=' + event.topic + ' key=' + event.key + ' value=' + event.value));
consumer.subscribe('topic1');

hub.notify('topic1');
hub.notify('topic1', 100n, {a:1, b:'text', c:[0,1,4]});
hub.notify('topic1', [101n,102n,103n], {a:2, b:{d:1, e:'xyzzy'}, c:[0,1,4]});
hub.notify('topic1', null, [1,2,3]);
hub.notify('topic1', 42n, null);

await new Promise((resolve) => setTimeout(resolve, 1000));

const result2 = await hub.stop();

details

The pg-hub object model includes Hub and Consumer object classes. The Hub class wraps a PostgreSQL connection using node-postgres. Hub objects support event producers with an outbound notify method, and they allow creation of event consumers with a consumer method. The Consumer class represents an event consumer. Consumer objects wrap a callback hook, which receives events based on one or more topic subscriptions.

hub object

A hub is created as follows:

import PgHub from 'pg-hub';
const hub = new PgHub.Hub(pool, reconnectTime, logger, name);

The Hub constructor includes four parameters, pool, reconnectTime, logger, and name. The pool parameter is a Pg.Pool object, used to connect to the PostgreSQL server. The reconnectTime parameter is the time, in milliseconds, that the hub waits between connection (or reconnection) attempts. If inluded, the logger parameter contains an object that exposes four logging methods, debug(), info(), warn(), and error(). The methods may be set to null to disable the corresponding level of logging, and the logger parameter may be omitted (or set to null) to disable logging altogether. The name parameter includes an optional symbolic name, which is exposed through the object's name property but not otherwise used by the hub object itself.

Once created, a hub exposes four public methods: start(), stop(), notify(), and consumer(). The start method opens a connection to the database, drawing a persistent client from the pool parameter passed during construction. The stop method likewise closes down the connection and releases associated resources, allowing the caller to await final completion. The notify method sends an outbound event, and the the consumer method creates a Consumer object to receive inbound events.

hub.start();
const consumer = hub.consumer(event => {});
hub.notify(topic, key, value);
await hub.stop();
consumer object
const consumer = hub.consumer(event => {});
consumer.subsribe('topic0');
consumer.subscribe(['topic1, 'topic2']);
consumer.close();
supervisory notifications

A hub can produce six supervisory notifications, received by consumers as events with a null topic, an empty set of keys, and a value object as follows:

{action:'connect',final:false}
{action:'disconnect',final:false}
{action:'reconnect',final:false}
{action:'stop',final:true}
{action:'subscribe',topic:'topic_name',final:false}
{action:'close',final:true}

The action property describes the cause of notification, and the final property indicates whether the notification is final. In the case of the subscribe action, the additional topic field indicates the topic of the new subscription.

The connect action informs consumers that the hub has connected to the RDBMS and started normal operations (see start() above). The disconnect action informs consumers that the hub has lost its connection, and the reconnect action informs consumers that the connection has been restored. During the gap between a disconnect and reconnect, events can be lost and consumers should act accordingly. The stop action informs consumers that their associated hub has stopped operation (see stop() above). The subscribe action informs the consumer that it has subscribed to the associated topic (see subscribe() above), and the close notification informs the consumer that it has been closed (see close() above).

PostgreSQL Interface
Underlying Event Details

Events are encoded into text format, compatible with the PostgreSQL notify command. Implementation details are available here

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EncodeEvent

func EncodeEvent(topic string, key []int64, value any) (string, error)

func GetStats

func GetStats(stats *Stats)

func SetLogger

func SetLogger(logger *slog.Logger) bool

Types

type Action

type Action uint
const (
	ActionConnect Action = iota
	ActionDisconnect
	ActionReconnect
	ActionStop
	ActionSubscribe
	ActionClose
)

type Advisory

type Advisory struct {
	// contains filtered or unexported fields
}

func (*Advisory) Action

func (s *Advisory) Action() Action

func (*Advisory) Final

func (s *Advisory) Final() bool

func (*Advisory) Topic

func (s *Advisory) Topic() string

type Config

type Config struct {
	Pool         *pgxpool.Pool
	Logger       *slog.Logger
	ConnectRetry time.Duration
}

type ConfirmHook

type ConfirmHook func(bool)

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (c *Consumer) Close() error

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(topics ...string) error

type Event

type Event struct {
	// contains filtered or unexported fields
}

func DecodeEvent

func DecodeEvent(notification *pgconn.Notification) (*Event, error)

func (*Event) Key

func (event *Event) Key() []int64

func (*Event) Topic

func (event *Event) Topic() string

func (*Event) Value

func (event *Event) Value() any

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

func New

func New(name string, config *Config) (*Hub, error)

func (*Hub) Consumer

func (hub *Hub) Consumer(notify NotifyHook, topics ...string) (*Consumer, error)

func (*Hub) GetStats

func (hub *Hub) GetStats(stats *HubStats)

func (*Hub) Notify

func (hub *Hub) Notify(topic string, key []int64, value any, confirm ConfirmHook) error

func (*Hub) Start

func (hub *Hub) Start() error

func (*Hub) Stop

func (hub *Hub) Stop(wait bool) error

type HubStats

type HubStats struct {
	Valid             bool
	RxEventCount      uint64
	SubscribeCount    uint32
	ListenCount       uint32
	ListenFailCount   uint32
	UnlistenCount     uint32
	UnlistenFailCount uint32
	NotifyCount       uint32
	NotifyFailCount   uint32
	ConnectCount      uint64
	ConnectFailCount  uint64
	Q1Max             uint32
	Q2Max             uint32
	ConsumerCount     uint32
}

type NotifyHook

type NotifyHook func(*Event)

type Stats

type Stats struct {
	Valid      bool
	MsgMax     uint32
	StartCount uint32
	QueueMax   uint32
	HubCount   uint32
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL