Documentation
¶
Overview ¶
Package postgres provides database operations for the poutbox outbox system. It handles connection management, schema migration, and logical replication for processing jobs.
Example usage with polling mode:
db, err := postgres.Connect(ctx, connStr) defer db.Close() err = postgres.Migrate(ctx, db)
Example usage with logical replication mode:
db, err := postgres.Connect(ctx, connStr)
defer db.Close()
err = postgres.Migrate(ctx, db)
err = postgres.InitializeLogicalReplication(ctx, db)
stream, err := postgres.NewReplicationStream(ctx, replConnStr, 0)
defer stream.Close(ctx)
for event, err := range stream.Events(ctx) {
if err != nil {
break
}
// Process event.Change or event.Keepalive
}
Index ¶
- Constants
- func Connect(ctx context.Context, connStr string) (*sql.DB, error)
- func InitializeLogicalReplication(ctx context.Context, db *sql.DB) error
- func Migrate(ctx context.Context, db *sql.DB) error
- type DBTX
- type EnqueueScheduledParams
- type GetImmediateJobsParams
- type GetScheduledJobsReadyRow
- type InsertDeadLetterBatchParams
- type InsertDeadLetterParams
- type InsertFailedBatchParams
- type InsertFailedParams
- type InsertPartitionMetaParams
- type KeepaliveRequest
- type LSN
- type ListPartitionsRow
- type LogicalReplChange
- type MessageParser
- type PartitionOps
- type PgoutputParser
- type PoutboxCursor
- type PoutboxDeadLetter
- type PoutboxFailed
- type PoutboxImmediate
- type PoutboxPartitionMetum
- type PoutboxScheduled
- type Queries
- func (q *Queries) DeleteFailed(ctx context.Context, db DBTX, id int64) error
- func (q *Queries) DeleteFailedBatch(ctx context.Context, db DBTX, ids []int64) error
- func (q *Queries) DeleteImmediate(ctx context.Context, db DBTX, id int64) error
- func (q *Queries) DeletePartitionMeta(ctx context.Context, db DBTX, partitionName string) error
- func (q *Queries) DeleteScheduledBatch(ctx context.Context, db DBTX, ids []int64) error
- func (q *Queries) EnqueueImmediate(ctx context.Context, db DBTX, payload string) (int64, error)
- func (q *Queries) EnqueueScheduled(ctx context.Context, db DBTX, arg EnqueueScheduledParams) (int64, error)
- func (q *Queries) GetCursor(ctx context.Context, db DBTX) (PoutboxCursor, error)
- func (q *Queries) GetFailedJobsReady(ctx context.Context, db DBTX, batchSize int32) ([]PoutboxFailed, error)
- func (q *Queries) GetImmediateJobs(ctx context.Context, db DBTX, arg GetImmediateJobsParams) ([]PoutboxImmediate, error)
- func (q *Queries) GetScheduledJobsReady(ctx context.Context, db DBTX, batchSize int32) ([]GetScheduledJobsReadyRow, error)
- func (q *Queries) InsertDeadLetter(ctx context.Context, db DBTX, arg InsertDeadLetterParams) error
- func (q *Queries) InsertDeadLetterBatch(ctx context.Context, db DBTX, arg InsertDeadLetterBatchParams) error
- func (q *Queries) InsertFailed(ctx context.Context, db DBTX, arg InsertFailedParams) error
- func (q *Queries) InsertFailedBatch(ctx context.Context, db DBTX, arg InsertFailedBatchParams) error
- func (q *Queries) InsertPartitionMeta(ctx context.Context, db DBTX, arg InsertPartitionMetaParams) error
- func (q *Queries) ListPartitions(ctx context.Context, db DBTX) ([]ListPartitionsRow, error)
- func (q *Queries) UpdateCursor(ctx context.Context, db DBTX, arg UpdateCursorParams) error
- func (q *Queries) UpdateFailed(ctx context.Context, db DBTX, arg UpdateFailedParams) error
- type RelationMetadata
- type ReplicationEvent
- type ReplicationStream
- type UpdateCursorParams
- type UpdateFailedParams
Constants ¶
const ( // PublicationName is the publication used for logical replication. PublicationName = "poutbox_immediate_pub" // ReplicationSlot is the replication slot name for tracking LSN progress. ReplicationSlot = "poutbox_immediate_slot" )
const ( // ReplicationEventTypeKeepalive indicates a keepalive message from the server. ReplicationEventTypeKeepalive = "keepalive" // ReplicationEventTypeInsert indicates an insert change event. ReplicationEventTypeInsert = "insert" )
Variables ¶
This section is empty.
Functions ¶
func Connect ¶
Connect establishes a connection to Postgres using the given connection string. Configures connection pooling: 25 max open, 5 idle, 5min lifetime, 2min idle timeout.
func InitializeLogicalReplication ¶
InitializeLogicalReplication creates the publication and replication slot if they don't exist. Idempotent: safe to call multiple times.
Types ¶
type EnqueueScheduledParams ¶
type GetImmediateJobsParams ¶
type InsertDeadLetterParams ¶
type InsertDeadLetterParams struct {
ID int64
Payload string
ErrorMessage sql.NullString
RetryCount int32
}
type InsertFailedBatchParams ¶
type InsertFailedParams ¶
type InsertFailedParams struct {
ID int64
Payload string
ErrorMessage sql.NullString
RetryCount int32
}
type KeepaliveRequest ¶
type KeepaliveRequest struct {
// ServerWALEnd is the server's current write-ahead log position.
ServerWALEnd LSN
// ReplyRequested indicates if the client should send a status update.
ReplyRequested bool
}
KeepaliveRequest represents a server keepalive message.
type ListPartitionsRow ¶
type LogicalReplChange ¶
type LogicalReplChange struct {
// ID is the job identifier from the database.
ID int64
// Payload is the job data as bytes (typically JSON).
Payload []byte
// CreatedAt is when the change was created in the database.
CreatedAt time.Time
// TransactionID is the transaction ID that produced this change.
TransactionID int64
// CommitLsn is the LSN as stored in the immediate table.
CommitLsn string
// LSN is the write-ahead log position of this change.
LSN LSN
}
LogicalReplChange represents a database change captured via logical replication.
type MessageParser ¶
type MessageParser interface {
// Parse decodes raw WAL data into a pglogrepl message.
Parse(walData []byte) (pglogrepl.Message, error)
// ParseRelation extracts table metadata from a relation message.
ParseRelation(msg *pglogrepl.RelationMessage) *RelationMetadata
// ParseInsert extracts change data from an insert message.
ParseInsert(msg *pglogrepl.InsertMessageV2, relMeta *RelationMetadata, lsn LSN) (*LogicalReplChange, error)
}
MessageParser converts WAL messages into logical replication changes.
type PartitionOps ¶
type PartitionOps struct {
// contains filtered or unexported fields
}
PartitionOps manages table partitions for the immediate queue. Creates new partitions and removes old ones based on time windows.
func NewPartitionOps ¶
func NewPartitionOps(db *sql.DB) *PartitionOps
NewPartitionOps creates a new PartitionOps instance with the given database.
func (*PartitionOps) Run ¶
func (p *PartitionOps) Run(ctx context.Context, from time.Time, to time.Time, interval time.Duration, cutoffTime time.Time) error
Run creates partitions from 'from' to 'to' with the given interval. Drops partitions with end time before cutoffTime. All operations are wrapped in a transaction with locking for safety.
type PgoutputParser ¶
type PgoutputParser struct{}
PgoutputParser implements MessageParser using the pgoutput plugin format.
func NewPgoutputParser ¶
func NewPgoutputParser() *PgoutputParser
NewPgoutputParser creates a new pgoutput message parser.
func (*PgoutputParser) Parse ¶
func (p *PgoutputParser) Parse(walData []byte) (pglogrepl.Message, error)
Parse decodes raw WAL data into a pglogrepl message.
func (*PgoutputParser) ParseInsert ¶
func (p *PgoutputParser) ParseInsert(msg *pglogrepl.InsertMessageV2, relMeta *RelationMetadata, lsn LSN) (*LogicalReplChange, error)
ParseInsert extracts change data from an insert message into a LogicalReplChange.
func (*PgoutputParser) ParseRelation ¶
func (p *PgoutputParser) ParseRelation(msg *pglogrepl.RelationMessage) *RelationMetadata
ParseRelation extracts table metadata from a relation message.
type PoutboxCursor ¶
type PoutboxDeadLetter ¶
type PoutboxFailed ¶
type PoutboxImmediate ¶
type PoutboxPartitionMetum ¶
type PoutboxScheduled ¶
type Queries ¶
type Queries struct {
}
func (*Queries) DeleteFailed ¶
func (*Queries) DeleteFailedBatch ¶
func (*Queries) DeleteImmediate ¶
func (*Queries) DeletePartitionMeta ¶
func (*Queries) DeleteScheduledBatch ¶
func (*Queries) EnqueueImmediate ¶
func (*Queries) EnqueueScheduled ¶
func (*Queries) GetFailedJobsReady ¶
func (*Queries) GetImmediateJobs ¶
func (q *Queries) GetImmediateJobs(ctx context.Context, db DBTX, arg GetImmediateJobsParams) ([]PoutboxImmediate, error)
func (*Queries) GetScheduledJobsReady ¶
func (*Queries) InsertDeadLetter ¶
func (*Queries) InsertDeadLetterBatch ¶
func (*Queries) InsertFailed ¶
func (*Queries) InsertFailedBatch ¶
func (*Queries) InsertPartitionMeta ¶
func (*Queries) ListPartitions ¶
func (*Queries) UpdateCursor ¶
func (*Queries) UpdateFailed ¶
type RelationMetadata ¶
type RelationMetadata struct {
// ID is the relation OID.
ID uint32
// Name is the table name.
Name string
// Columns describes the table columns.
Columns []*pglogrepl.RelationMessageColumn
}
RelationMetadata describes a database table for logical replication.
type ReplicationEvent ¶
type ReplicationEvent struct {
// Type is the event type: "keepalive" or "insert".
Type string
// Change contains the logical replication change data for insert events.
Change *LogicalReplChange
// Keepalive contains the server keepalive request for keepalive events.
Keepalive *KeepaliveRequest
}
ReplicationEvent represents an event received from logical replication. Either Change or Keepalive will be set depending on Type.
type ReplicationStream ¶
type ReplicationStream struct {
// contains filtered or unexported fields
}
ReplicationStream receives logical replication events from Postgres. It parses WAL messages and tracks relation metadata.
func NewReplicationStream ¶
func NewReplicationStream(ctx context.Context, connStr string, startLSN string) (*ReplicationStream, error)
NewReplicationStream creates a new replication stream starting at the given LSN. Connects to Postgres, identifies the system, and starts replication.
func (*ReplicationStream) Close ¶
func (rs *ReplicationStream) Close(ctx context.Context) error
Close closes the replication stream and releases the connection.
func (*ReplicationStream) Events ¶
func (rs *ReplicationStream) Events(ctx context.Context) iter.Seq2[ReplicationEvent, error]
Events returns an iterator of replication events from the stream. Yields either insert changes or keepalive messages as they arrive.
func (*ReplicationStream) SendKeepalive ¶
func (rs *ReplicationStream) SendKeepalive(ctx context.Context, walApplyPosition LSN) error
SendKeepalive sends a status update to the server with the current apply position. Uses a separate timeout to avoid losing messages if the context is cancelled.
type UpdateCursorParams ¶
type UpdateFailedParams ¶
type UpdateFailedParams struct {
ErrorMessage sql.NullString
RetryCount int32
ID int64
}