Documentation
¶
Index ¶
- func OpenConnection(ctx context.Context, config *ConnectionConfig) (*sql.DB, error)
- type Config
- type ConnectionConfig
- type Dialect
- func (d *Dialect) BigIntType() string
- func (d *Dialect) BlobType() string
- func (d *Dialect) CurrentTimestamp() string
- func (d *Dialect) JSONExtract() string
- func (d *Dialect) JSONExtractPath(key string) string
- func (d *Dialect) JSONSet() string
- func (d *Dialect) JSONSetPath(key string) string
- func (d *Dialect) JSONType() string
- func (d *Dialect) Placeholder(n int) string
- func (d *Dialect) RequiresSerializableForClaim() bool
- func (d *Dialect) SetBusyTimeout(ms int) string
- func (d *Dialect) SetSynchronous(level string) string
- func (d *Dialect) SetWALMode() string
- func (d *Dialect) SupportsAdvisoryLocks() bool
- func (d *Dialect) SupportsReturning() bool
- func (d *Dialect) SupportsSkipLocked() bool
- func (d *Dialect) TimestampType() string
- func (d *Dialect) ToJSON(value string) string
- func (d *Dialect) UnixMillis(column string) string
- type SchemaManager
- func (m *SchemaManager) EnsureVersionTable(ctx context.Context, db *sql.DB) error
- func (m *SchemaManager) GetVersion(ctx context.Context, db *sql.DB) (uint, bool, error)
- func (m *SchemaManager) Initialize(ctx context.Context, db *sql.DB) error
- func (m *SchemaManager) Migrate(ctx context.Context, db *sql.DB, targetVersion uint) error
- func (m *SchemaManager) SetVersion(ctx context.Context, db *sql.DB, version uint, description string) error
- func (m *SchemaManager) Version(ctx context.Context, db *sql.DB) (uint, bool, error)
- type Storage
- func (s *Storage) AcknowledgeMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
- func (s *Storage) ClaimMessage(ctx context.Context, queueName string, workerId string, attemptId string) (*messagepb.Message, error)
- func (s *Storage) Close() error
- func (s *Storage) CreateQueue(ctx context.Context, queue *queuepb.Queue) error
- func (s *Storage) CreateSchedule(ctx context.Context, schedule *schedulepb.Schedule) error
- func (s *Storage) DeleteDLQMessage(ctx context.Context, queueName string, messageId string) error
- func (s *Storage) DeleteQueue(ctx context.Context, name string) error
- func (s *Storage) DeleteSchedule(ctx context.Context, scheduleId string) error
- func (s *Storage) EnqueueMessage(ctx context.Context, queueName string, message *messagepb.Message) error
- func (s *Storage) ExtendMessageLease(ctx context.Context, queueName string, messageId string, attemptId string, ...) error
- func (s *Storage) FindExpiredMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)
- func (s *Storage) GetDLQMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)
- func (s *Storage) GetQueue(ctx context.Context, name string) (*queuepb.Queue, error)
- func (s *Storage) GetQueueMetadata(ctx context.Context, name string) (*queuepb.QueueMetadata, error)
- func (s *Storage) GetSchedule(ctx context.Context, scheduleId string) (*schedulepb.Schedule, error)
- func (s *Storage) GetScheduleHistory(ctx context.Context, scheduleId string, limit int64) (*schedulepb.ScheduleHistory, error)
- func (s *Storage) HeartbeatMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
- func (s *Storage) ListQueues(ctx context.Context) ([]*queuepb.Queue, error)
- func (s *Storage) ListSchedules(ctx context.Context, queueName string) ([]*schedulepb.Schedule, error)
- func (s *Storage) NackMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
- func (s *Storage) PauseSchedule(ctx context.Context, scheduleId string) error
- func (s *Storage) PeekMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)
- func (s *Storage) PurgeDLQ(ctx context.Context, queueName string) (int64, error)
- func (s *Storage) ReclaimExpiredMessage(ctx context.Context, queueName string, message *messagepb.Message) error
- func (s *Storage) RecordScheduleExecution(ctx context.Context, scheduleId string, messageId string, executionTime int64) error
- func (s *Storage) ResumeSchedule(ctx context.Context, scheduleId string) error
- func (s *Storage) RetryDLQMessage(ctx context.Context, queueName string, messageId string) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func OpenConnection ¶
OpenConnection opens and configures a PostgreSQL database connection.
Types ¶
type Config ¶
type Config struct {
// Conn carries connection details (DSN overrides discrete fields when set).
Conn ConnectionConfig
Logger *log.Logger
KeyManager *keymanager.EncryptionKeyManager
}
Config holds Postgres storage configuration
type ConnectionConfig ¶
type ConnectionConfig struct {
DSN string
Host string
Port int
User string
Password string
Database string
SSLMode string
// PostgreSQL Client Certificate Configuration (for mTLS with database)
ClientCertFile string // Path to PostgreSQL client certificate file
ClientKeyFile string // Path to PostgreSQL client key file
RootCertFile string // Path to PostgreSQL root CA certificate file
MaxOpenConns int
MaxIdleConns int
ConnMaxLifetime time.Duration
}
ConnectionConfig holds PostgreSQL connection configuration.
func DefaultConnectionConfig ¶
func DefaultConnectionConfig() *ConnectionConfig
DefaultConnectionConfig returns a baseline configuration for local development.
type Dialect ¶
type Dialect struct{}
Dialect implements sql.SQLDialect for PostgreSQL databases.
func (*Dialect) BigIntType ¶
BigIntType returns the PostgreSQL big integer type.
func (*Dialect) CurrentTimestamp ¶
CurrentTimestamp returns the PostgreSQL current timestamp expression.
func (*Dialect) JSONExtract ¶
JSONExtract returns the PostgreSQL JSON extraction function.
func (*Dialect) JSONExtractPath ¶
JSONExtractPath returns a JSON path argument for jsonb_extract_path_text
func (*Dialect) JSONSetPath ¶
JSONSetPath returns a JSON path for jsonb_set (text[] literal)
func (*Dialect) Placeholder ¶
Placeholder returns the PostgreSQL positional parameter syntax.
func (*Dialect) RequiresSerializableForClaim ¶
RequiresSerializableForClaim indicates if SERIALIZABLE isolation is required.
func (*Dialect) SetBusyTimeout ¶
SetBusyTimeout returns an empty string; not applicable to PostgreSQL.
func (*Dialect) SetSynchronous ¶
SetSynchronous returns an empty string; not applicable to PostgreSQL.
func (*Dialect) SetWALMode ¶
SetWALMode returns an empty string; not applicable to PostgreSQL.
func (*Dialect) SupportsAdvisoryLocks ¶
SupportsAdvisoryLocks indicates advisory lock support.
func (*Dialect) SupportsReturning ¶
SupportsReturning indicates RETURNING clause support.
func (*Dialect) SupportsSkipLocked ¶
SupportsSkipLocked indicates SKIP LOCKED support.
func (*Dialect) TimestampType ¶
TimestampType returns the PostgreSQL timestamp type.
func (*Dialect) UnixMillis ¶
UnixMillis extracts unix milliseconds from a timestamp expression.
type SchemaManager ¶
type SchemaManager struct {
// contains filtered or unexported fields
}
SchemaManager handles PostgreSQL schema initialization and versioning.
func NewSchemaManager ¶
func NewSchemaManager() *SchemaManager
NewSchemaManager creates a new PostgreSQL schema manager.
func (*SchemaManager) EnsureVersionTable ¶
func (*SchemaManager) GetVersion ¶
func (*SchemaManager) Initialize ¶
Initialize creates the initial schema if no version is present.
func (*SchemaManager) SetVersion ¶
type Storage ¶
type Storage struct {
*repositorysql.BaseSQL
// contains filtered or unexported fields
}
Storage implements the persistence.Storage interface for Postgres
func NewStorage ¶
NewStorage creates a new Postgres storage instance
func (*Storage) AcknowledgeMessage ¶
func (s *Storage) AcknowledgeMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
AcknowledgeMessage marks a message as completed.
func (*Storage) ClaimMessage ¶
func (s *Storage) ClaimMessage(ctx context.Context, queueName string, workerId string, attemptId string) (*messagepb.Message, error)
ClaimMessage claims the next available message from a queue.
func (*Storage) CreateQueue ¶
func (*Storage) CreateSchedule ¶
func (*Storage) DeleteDLQMessage ¶
DeleteDLQMessage permanently deletes a message from DLQ.
func (*Storage) DeleteQueue ¶
DeleteQueue deletes a queue.
func (*Storage) DeleteSchedule ¶
DeleteSchedule deletes a schedule.
func (*Storage) EnqueueMessage ¶
func (*Storage) ExtendMessageLease ¶
func (s *Storage) ExtendMessageLease(ctx context.Context, queueName string, messageId string, attemptId string, extensionMs int64) error
ExtendMessageLease extends the lease on a message.
func (*Storage) FindExpiredMessages ¶
func (*Storage) GetDLQMessages ¶
func (*Storage) GetQueueMetadata ¶
func (s *Storage) GetQueueMetadata(ctx context.Context, name string) (*queuepb.QueueMetadata, error)
GetQueueMetadata retrieves queue metadata by name.
func (*Storage) GetSchedule ¶
GetSchedule retrieves a schedule by ID.
func (*Storage) GetScheduleHistory ¶
func (s *Storage) GetScheduleHistory(ctx context.Context, scheduleId string, limit int64) (*schedulepb.ScheduleHistory, error)
GetScheduleHistory returns the execution history for a schedule
func (*Storage) HeartbeatMessage ¶
func (s *Storage) HeartbeatMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
HeartbeatMessage updates the heartbeat for a message.
func (*Storage) ListQueues ¶
ListQueues returns all queues.
func (*Storage) ListSchedules ¶
func (s *Storage) ListSchedules(ctx context.Context, queueName string) ([]*schedulepb.Schedule, error)
ListSchedules returns schedules for a queue.
func (*Storage) NackMessage ¶
func (s *Storage) NackMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
NackMessage marks a message as failed.
func (*Storage) PauseSchedule ¶
PauseSchedule pauses a schedule.
func (*Storage) PeekMessages ¶
func (s *Storage) PeekMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)
PeekMessages retrieves messages without claiming them.
func (*Storage) PurgeDLQ ¶
PurgeDLQ deletes all errored messages for a queue and returns the count deleted.
func (*Storage) ReclaimExpiredMessage ¶
func (s *Storage) ReclaimExpiredMessage(ctx context.Context, queueName string, message *messagepb.Message) error
ReclaimExpiredMessage moves an expired message back to pending or DLQ.
func (*Storage) RecordScheduleExecution ¶
func (s *Storage) RecordScheduleExecution(ctx context.Context, scheduleId string, messageId string, executionTime int64) error
RecordScheduleExecution records a schedule execution.
func (*Storage) ResumeSchedule ¶
ResumeSchedule resumes a paused schedule.