Documentation
¶
Index ¶
- Variables
- type PgMQ
- func NewFromPgxConfig(ctx context.Context, pgConf *pgxpool.Config) (pgMQ *PgMQ, pool *pgxpool.Pool, err error)
- func NewFromPgxConnStr(ctx context.Context, connString string) (pgMQ *PgMQ, pool *pgxpool.Pool, err error)
- func NewFromPgxPool(ctx context.Context, srcpool *pgxpool.Pool) (pgMQ *PgMQ, pool *pgxpool.Pool, err error)
- func (p *PgMQ) Archive(ctx context.Context, queue string, msgID int64) (archived bool, err error)
- func (p *PgMQ) ArchiveBatch(ctx context.Context, queue string, msgIDs []int64) (archived []int64, err error)
- func (p *PgMQ) ArchiveBatchTX(ctx context.Context, queue string, msgIDs []int64) (tx pgx.Tx, archived []int64, err error)
- func (p *PgMQ) ArchiveBatchWithTX(ctx context.Context, tx pgx.Tx, queue string, msgIDs []int64) (archived []int64, err error)
- func (p *PgMQ) ArchiveTX(ctx context.Context, queue string, msgID int64) (tx pgx.Tx, archived bool, err error)
- func (p *PgMQ) ArchiveWithTX(ctx context.Context, tx pgx.Tx, queue string, msgID int64) (archived bool, err error)
- func (p *PgMQ) Close()
- func (p *PgMQ) CreateQueue(ctx context.Context, queue string) (err error)
- func (p *PgMQ) CreateUnloggedQueue(ctx context.Context, queue string) (err error)
- func (p *PgMQ) Delete(ctx context.Context, queue string, msgID int64) (deleted bool, err error)
- func (p *PgMQ) DeleteBatch(ctx context.Context, queue string, msgIDs []int64) (deleted []int64, err error)
- func (p *PgMQ) DeleteBatchTX(ctx context.Context, queue string, msgIDs []int64) (tx pgx.Tx, deleted []int64, err error)
- func (p *PgMQ) DeleteBatchWithTX(ctx context.Context, tx pgx.Tx, queue string, msgIDs []int64) (deleted []int64, err error)
- func (p *PgMQ) DeleteTX(ctx context.Context, queue string, msgID int64) (tx pgx.Tx, deleted bool, err error)
- func (p *PgMQ) DeleteWithTX(ctx context.Context, tx pgx.Tx, queue string, msgID int64) (deleted bool, err error)
- func (p *PgMQ) DropQueue(ctx context.Context, queue string) (err error)
- func (p *PgMQ) Exec(ctx context.Context, sql string, args ...any) (r pgx.Rows, err error)
- func (p *PgMQ) ExecTX(ctx context.Context, sql string, args ...any) (tx pgx.Tx, r pgx.Rows, err error)
- func (p *PgMQ) ExecWithTX(ctx context.Context, tx pgx.Tx, sql string, args ...any) (r pgx.Rows, err error)
- func (p *PgMQ) Ping() error
- func (p *PgMQ) Pool() *pgxpool.Pool
- func (p *PgMQ) Pop(ctx context.Context, queue string) (_ *PgMQMessage, err error)
- func (p *PgMQ) PopTX(ctx context.Context, queue string) (tx pgx.Tx, _ *PgMQMessage, err error)
- func (p *PgMQ) PopWithTX(ctx context.Context, tx pgx.Tx, queue string) (_ *PgMQMessage, err error)
- func (p *PgMQ) Read(ctx context.Context, queue string, vt int64, condition json.RawMessage) (_ *PgMQMessage, err error)
- func (p *PgMQ) ReadBatch(ctx context.Context, queue string, vt int64, numMsgs int64, ...) (msgs []*PgMQMessage, err error)
- func (p *PgMQ) ReadBatchTX(ctx context.Context, queue string, vt int64, numMsgs int64, ...) (tx pgx.Tx, msgs []*PgMQMessage, err error)
- func (p *PgMQ) ReadBatchWithTX(ctx context.Context, tx pgx.Tx, queue string, vt int64, numMsgs int64, ...) (msgs []*PgMQMessage, err error)
- func (p *PgMQ) ReadTX(ctx context.Context, queue string, vt int64, condition json.RawMessage) (tx pgx.Tx, _ *PgMQMessage, err error)
- func (p *PgMQ) ReadWithTX(ctx context.Context, tx pgx.Tx, queue string, vt int64, ...) (_ *PgMQMessage, err error)
- func (p *PgMQ) Send(ctx context.Context, queue string, msg json.RawMessage, ...) (int64, error)
- func (p *PgMQ) SendBatch(ctx context.Context, queue string, msgs []json.RawMessage, ...) ([]int64, error)
- func (p *PgMQ) SendBatchTX(ctx context.Context, queue string, msgs []json.RawMessage, ...) (tx pgx.Tx, msgIDs []int64, err error)
- func (p *PgMQ) SendBatchWithDelay(ctx context.Context, queue string, msgs []json.RawMessage, ...) (msgIDs []int64, err error)
- func (p *PgMQ) SendBatchWithDelayTX(ctx context.Context, queue string, msgs []json.RawMessage, ...) (tx pgx.Tx, msgIDs []int64, err error)
- func (p *PgMQ) SendBatchWithTX(ctx context.Context, tx pgx.Tx, queue string, msgs []json.RawMessage, ...) (msgIDs []int64, err error)
- func (p *PgMQ) SendBatchWithTXDelay(ctx context.Context, tx pgx.Tx, queue string, msgs []json.RawMessage, ...) (msgIDs []int64, err error)
- func (p *PgMQ) SendTX(ctx context.Context, queue string, msg json.RawMessage, ...) (pgx.Tx, int64, error)
- func (p *PgMQ) SendWithDelay(ctx context.Context, queue string, msg json.RawMessage, ...) (msgID int64, err error)
- func (p *PgMQ) SendWithDelayTX(ctx context.Context, queue string, msg json.RawMessage, ...) (tx pgx.Tx, msgID int64, err error)
- func (p *PgMQ) SendWithTX(ctx context.Context, tx pgx.Tx, queue string, msg json.RawMessage, ...) (int64, error)
- func (p *PgMQ) SendWithTxDelay(ctx context.Context, tx pgx.Tx, queue string, msg json.RawMessage, ...) (msgID int64, err error)
- func (p *PgMQ) SetVT(ctx context.Context, queue string, msgID int64, invTime int64) (_ int64, err error)
- func (p *PgMQ) SetVTTX(ctx context.Context, queue string, msgID int64, invTime int64) (tx pgx.Tx, _ int64, err error)
- func (p *PgMQ) SetVTWithTX(ctx context.Context, tx pgx.Tx, queue string, msgID int64, invTime int64) (_ int64, err error)
- func (p *PgMQ) WithDefaultTxOptions(newTxOpt pgx.TxOptions) *PgMQ
- func (p *PgMQ) WithDefaultVT(newVT int64) *PgMQ
- type PgMQMessage
- type PgMQQueueRecord
Constants ¶
This section is empty.
Variables ¶
var ErrNoRows = errors.New("gopgmq: no rows in result set")
Functions ¶
This section is empty.
Types ¶
type PgMQ ¶
type PgMQ struct { ActiveOps atomic.Int64 // number of ongoing activities - used to properly shutdown // contains filtered or unexported fields }
func NewFromPgxConfig ¶
func NewFromPgxConfig(ctx context.Context, pgConf *pgxpool.Config) (pgMQ *PgMQ, pool *pgxpool.Pool, err error)
NewFromPgxConfig creates a PgMQ object if pgConf is valid and a connection is established with the underlying database
func NewFromPgxConnStr ¶
func NewFromPgxConnStr(ctx context.Context, connString string) (pgMQ *PgMQ, pool *pgxpool.Pool, err error)
NewFromPgxConnStr creates a PgMQ object if connString is valid and a connection is established with the underlying database
func NewFromPgxPool ¶
func NewFromPgxPool(ctx context.Context, srcpool *pgxpool.Pool) (pgMQ *PgMQ, pool *pgxpool.Pool, err error)
NewFromPgxPool creates a PgMQ object if the provided srcpool has a valid connection to the underlying database and the pgmq extension is available
func (*PgMQ) Archive ¶
Archive moves a message from the queue table to the archive table by its id. View messages on the archive table with sql:
SELECT * FROM pgmq.a_<queue_name>;
func (*PgMQ) ArchiveBatch ¶
func (p *PgMQ) ArchiveBatch(ctx context.Context, queue string, msgIDs []int64) (archived []int64, err error)
ArchiveBatch moves a batch of messages from the queue table to the archive table by their ids. View messages on the archive table with sql:
SELECT * FROM pgmq.a_<queue_name>;
func (*PgMQ) ArchiveBatchTX ¶
func (p *PgMQ) ArchiveBatchTX(ctx context.Context, queue string, msgIDs []int64) (tx pgx.Tx, archived []int64, err error)
ArchiveBatchTX creates a transaction and moves a batch of messages from the queue table to the archive table by their ids. Messages can be viewed on the archive table with sql:
SELECT * FROM pgmq.a_<queue_name>;
Returned transaction will be committed / rolled back by the caller
func (*PgMQ) ArchiveBatchWithTX ¶
func (p *PgMQ) ArchiveBatchWithTX(ctx context.Context, tx pgx.Tx, queue string, msgIDs []int64) (archived []int64, err error)
ArchiveBatchWithTX receives a transaction and moves a batch of messages from the queue table to the archive table by their ids. Messages can be viewed on the archive table with sql:
SELECT * FROM pgmq.a_<queue_name>;
Transaction will be committed / rolled back by the caller
func (*PgMQ) ArchiveTX ¶
func (p *PgMQ) ArchiveTX(ctx context.Context, queue string, msgID int64) (tx pgx.Tx, archived bool, err error)
ArchiveTX creates a transaction and moves a message from the queue table to the archive table by its id. Messages can be viewed on the archive table with sql:
SELECT * FROM pgmq.a_<queue_name>;
Returned transaction will be committed / rolled back by the caller
func (*PgMQ) ArchiveWithTX ¶
func (p *PgMQ) ArchiveWithTX(ctx context.Context, tx pgx.Tx, queue string, msgID int64) (archived bool, err error)
ArchiveWithTX receives a transaction and moves a message from the queue table to the archive table by its id. Messages can be viewed on the archive table with sql:
SELECT * FROM pgmq.a_<queue_name>;
Transaction will be committed / rolled back by the caller
func (*PgMQ) Close ¶
func (p *PgMQ) Close()
Close will cancel the context and wait for all transactions to be terminated
func (*PgMQ) CreateQueue ¶
CreateQueue creates a new unpartitioned queue. It prepares the queue's tables, indexes, and metadata.
func (*PgMQ) CreateUnloggedQueue ¶
CreateUnloggedQueue creates a new unlogged queue, which uses an unlogged table under the hood. This sets up the queue's tables, indexes, and metadata.
func (*PgMQ) Delete ¶
Delete deletes a message from the queue by its id. This is a permanent delete and cannot be undone. If you want to retain a log of the message, use the Archive method.
func (*PgMQ) DeleteBatch ¶
func (p *PgMQ) DeleteBatch(ctx context.Context, queue string, msgIDs []int64) (deleted []int64, err error)
DeleteBatch deletes a batch of messages from the queue by their ids. This is a permanent delete and cannot be undone. If you want to retain a log of the messages, use the ArchiveBatch method.
func (*PgMQ) DeleteBatchTX ¶
func (p *PgMQ) DeleteBatchTX(ctx context.Context, queue string, msgIDs []int64) (tx pgx.Tx, deleted []int64, err error)
DeleteBatchTX creates a transaction and deletes a batch of messages from the queue by their ids. This is a permanent delete and cannot be undone. To retain a log of the messages, the ArchiveBatch method has to be used.
Returned transaction will be committed / rolled back by the caller
func (*PgMQ) DeleteBatchWithTX ¶
func (p *PgMQ) DeleteBatchWithTX(ctx context.Context, tx pgx.Tx, queue string, msgIDs []int64) (deleted []int64, err error)
DeleteBatchWithTX receives a transaction and deletes a batch of messages from the queue by their ids. This is a permanent delete and cannot be undone. To retain a log of the messages, the ArchiveBatch method has to be used.
Transaction will be committed / rolled back by the caller
func (*PgMQ) DeleteTX ¶
func (p *PgMQ) DeleteTX(ctx context.Context, queue string, msgID int64) (tx pgx.Tx, deleted bool, err error)
DeleteTX creates a transaction and deletes a message from the queue by its id. This is a permanent delete and cannot be undone. To retain a log of the message, the Archive method has to be used.
Returned transaction will be committed / rolled back by the caller
func (*PgMQ) DeleteWithTX ¶
func (p *PgMQ) DeleteWithTX(ctx context.Context, tx pgx.Tx, queue string, msgID int64) (deleted bool, err error)
DeleteWithTX receives a transaction and deletes a message from the queue by its id. This is a permanent delete and cannot be undone. To retain a log of the message, the Archive method has to be used.
Transaction will be committed / rolled back by the caller
func (*PgMQ) DropQueue ¶
DropQueue deletes the given queue. It deletes the queue's tables, indices, and metadata. It will return an error if the queue does not exist.
func (*PgMQ) Exec ¶
Exec executes the given query against the current DB connection
Returned pgx.Rows must be closed by the calling code
func (*PgMQ) ExecTX ¶
func (p *PgMQ) ExecTX(ctx context.Context, sql string, args ...any) (tx pgx.Tx, r pgx.Rows, err error)
ExecTX creates a transaction and executes the given query against the current DB connection
Returned transaction will be committed / rolled back by the caller
func (*PgMQ) ExecWithTX ¶
func (p *PgMQ) ExecWithTX(ctx context.Context, tx pgx.Tx, sql string, args ...any) (r pgx.Rows, err error)
ExecWithTX receives a transaction and executes the given query against the current DB connection
Transaction will be committed / rolled back by the caller
func (*PgMQ) Pop ¶
Pop reads a single message from the queue and deletes it at the same time. Similar to Read and ReadBatch, if no messages are available an ErrNoRows is returned. Unlike these methods, the visibility timeout does not apply. This is because the message is immediately deleted.
func (*PgMQ) PopTX ¶
PopTX creates a transaction, reads a single message from the queue and deletes it at the same time. Similar to ReadTX and ReadBatchTX, if no messages are available an ErrNoRows is returned. The visibility timeout does not apply - the message is immediately deleted.
Returned transaction will be committed / rolled back by the caller
func (*PgMQ) PopWithTX ¶
PopWithTX receives a transaction, reads a single message from the queue and deletes it at the same time. Similar to ReadWithTX and ReadBatchWithTX, if no messages are available an ErrNoRows is returned. The visibility timeout does not apply - the message is immediately deleted.
Transaction will be committed / rolled back by the caller
func (*PgMQ) Read ¶
func (p *PgMQ) Read(ctx context.Context, queue string, vt int64, condition json.RawMessage) (_ *PgMQMessage, err error)
Read reads a single message from the queue. If the queue is empty or all messages are invisible, an ErrNoRows error is returned. If a message is returned, it is made invisible for the duration of the visibility timeout (vt) in seconds. A negative visibility value means using the default visibility value.
func (*PgMQ) ReadBatch ¶
func (p *PgMQ) ReadBatch(ctx context.Context, queue string, vt int64, numMsgs int64, condition json.RawMessage) (msgs []*PgMQMessage, err error)
ReadBatch reads a specified number of messages from the queue. Any messages that are returned are made invisible for the duration of the visibility timeout (vt) in seconds. A negative visibility value means using the default visibility value.
func (*PgMQ) ReadBatchTX ¶
func (p *PgMQ) ReadBatchTX(ctx context.Context, queue string, vt int64, numMsgs int64, condition json.RawMessage) (tx pgx.Tx, msgs []*PgMQMessage, err error)
ReadBatchTX creates a transaction and reads a specified number of messages from the queue. Any messages that are returned are made invisible for the duration of the visibility timeout (vt) in seconds. If vt is negative, the default visibility duration is used.
Returned transaction will be committed / rolled back by the caller
func (*PgMQ) ReadBatchWithTX ¶
func (p *PgMQ) ReadBatchWithTX(ctx context.Context, tx pgx.Tx, queue string, vt int64, numMsgs int64, condition json.RawMessage) (msgs []*PgMQMessage, err error)
ReadBatchWithTX receives a transaction and reads a specified number of messages from the queue. Any messages that are returned are made invisible for the duration of the visibility timeout (vt) in seconds. If vt is negative, the default visibility duration is used.
Transaction will be committed / rolled back by the caller
func (*PgMQ) ReadTX ¶
func (p *PgMQ) ReadTX(ctx context.Context, queue string, vt int64, condition json.RawMessage) (tx pgx.Tx, _ *PgMQMessage, err error)
ReadTX creates a transaction and reads a single message from the queue. If the queue is empty or all messages are invisible, an ErrNoRows error is returned. If a message is returned, it is made invisible for the duration of the visibility timeout (vt) in seconds. If vt is negative, the default visibility duration is used.
Returned transaction will be committed / rolled back by the caller
func (*PgMQ) ReadWithTX ¶
func (p *PgMQ) ReadWithTX(ctx context.Context, tx pgx.Tx, queue string, vt int64, condition json.RawMessage) (_ *PgMQMessage, err error)
ReadWithTX receives a transaction and reads a single message from the queue. If the queue is empty or all messages are invisible, an ErrNoRows error is returned. If a message is returned, it is made invisible for the duration of the visibility timeout (vt) in seconds. If vt is negative, the default visibility duration is used. A transaction is received and used.
Transaction will be committed / rolled back by the caller
func (*PgMQ) Send ¶
func (p *PgMQ) Send(ctx context.Context, queue string, msg json.RawMessage, headers json.RawMessage) (int64, error)
Send sends a single message to a queue. The message id, unique to the queue, is returned.
func (*PgMQ) SendBatch ¶
func (p *PgMQ) SendBatch(ctx context.Context, queue string, msgs []json.RawMessage, headers []json.RawMessage) ([]int64, error)
SendBatch sends a batch of messages to a queue. The message ids, unique to the queue, are returned.
func (*PgMQ) SendBatchTX ¶
func (p *PgMQ) SendBatchTX(ctx context.Context, queue string, msgs []json.RawMessage, headers []json.RawMessage) (tx pgx.Tx, msgIDs []int64, err error)
SendBatchTX starts a transaction and sends a batch of messages to a queue. The transaction and the message ids, unique to the queue, are returned.
Returned transaction will be committed / rolled back by the caller
func (*PgMQ) SendBatchWithDelay ¶
func (p *PgMQ) SendBatchWithDelay(ctx context.Context, queue string, msgs []json.RawMessage, headers []json.RawMessage, delay int64) (msgIDs []int64, err error)
SendBatchWithDelay sends a batch of messages to a queue with a delay. The delay is specified in seconds. A negative duration value means using the default visibility value. The message ids, unique to the queue, are returned.
func (*PgMQ) SendBatchWithDelayTX ¶
func (p *PgMQ) SendBatchWithDelayTX(ctx context.Context, queue string, msgs []json.RawMessage, headers []json.RawMessage, delay int64) (tx pgx.Tx, msgIDs []int64, err error)
SendBatchWithDelayTX starts a transaction and sends a batch of messages to a queue with a delay. The delay is specified in seconds; negative delay value means use the default value. The transaction and the message ids, unique to the queue, are returned.
Returned transaction will be committed / rolled back by the caller
func (*PgMQ) SendBatchWithTX ¶
func (p *PgMQ) SendBatchWithTX(ctx context.Context, tx pgx.Tx, queue string, msgs []json.RawMessage, headers []json.RawMessage) (msgIDs []int64, err error)
SendBatchWithTX receives a transaction and sends a batch of messages to a queue. The message ids, unique to the queue, are returned.
Transaction will be committed / rolled back by the caller
func (*PgMQ) SendBatchWithTXDelay ¶
func (p *PgMQ) SendBatchWithTXDelay(ctx context.Context, tx pgx.Tx, queue string, msgs []json.RawMessage, headers []json.RawMessage, delay int64) (msgIDs []int64, err error)
SendBatchWithTXDelay receives a transaction and sends a batch of messages to a queue with a delay. The delay is specified in seconds; negative delay value means use the default value. The message ids, unique to the queue, are returned.
Transaction will be committed / rolled back by the caller
func (*PgMQ) SendTX ¶
func (p *PgMQ) SendTX(ctx context.Context, queue string, msg json.RawMessage, headers json.RawMessage) (pgx.Tx, int64, error)
SendTX starts a transaction and sends a single message to a queue. The transaction and the message id, unique to the queue, are returned.
Returned transaction will be committed / rolled back by the caller
func (*PgMQ) SendWithDelay ¶
func (p *PgMQ) SendWithDelay(ctx context.Context, queue string, msg json.RawMessage, headers json.RawMessage, delay int64) (msgID int64, err error)
SendWithDelay sends a single message to a queue with a delay. The delay is specified in seconds. A negative duration value means using the default visibility value. The message id, unique to the queue, is returned.
func (*PgMQ) SendWithDelayTX ¶
func (p *PgMQ) SendWithDelayTX(ctx context.Context, queue string, msg json.RawMessage, headers json.RawMessage, delay int64) (tx pgx.Tx, msgID int64, err error)
SendWithDelayTX starts a transaction and sends a single message to a queue with a delay. The delay is specified in seconds; negative delay value means use the default value. The transaction and the message id, unique to the queue, are returned.
Returned transaction will be committed / rolled back by the caller
func (*PgMQ) SendWithTX ¶
func (p *PgMQ) SendWithTX(ctx context.Context, tx pgx.Tx, queue string, msg json.RawMessage, headers json.RawMessage) (int64, error)
SendWithTX receives a transaction and sends a single message to a queue. The message id, unique to the queue, is returned.
Transaction will be committed / rolled back by the caller
func (*PgMQ) SendWithTxDelay ¶
func (p *PgMQ) SendWithTxDelay(ctx context.Context, tx pgx.Tx, queue string, msg json.RawMessage, headers json.RawMessage, delay int64) (msgID int64, err error)
SendWithTxDelay receives a transaction and sends a single message to a queue with a delay. The delay is specified in seconds; negative delay value means use the default value. The message id, unique to the queue, is returned.
Transaction will be committed / rolled back by the caller
func (*PgMQ) SetVT ¶
func (p *PgMQ) SetVT(ctx context.Context, queue string, msgID int64, invTime int64) (_ int64, err error)
SetVT sets a new visibility time for a message from the queue.
If invTime is negative, then the defaultVT value is applied
func (*PgMQ) SetVTTX ¶
func (p *PgMQ) SetVTTX(ctx context.Context, queue string, msgID int64, invTime int64) (tx pgx.Tx, _ int64, err error)
SetVTTX creates a transaction and sets a new visibility time for a message from the queue.
Returned transaction will be committed / rolled back by the caller
func (*PgMQ) SetVTWithTX ¶
func (p *PgMQ) SetVTWithTX(ctx context.Context, tx pgx.Tx, queue string, msgID int64, invTime int64) (_ int64, err error)
SetVTWithTX receives a transaction and sets a new visibility time for a message from the queue.
Transaction will be committed / rolled back by the caller
func (*PgMQ) WithDefaultTxOptions ¶
WithDefaultTxOptions will change the default TX options
func (*PgMQ) WithDefaultVT ¶
WithDefaultVT will change the default visibility time value