Documentation
¶
Index ¶
- Variables
- type BaseSQL
- func (b *BaseSQL) ApplyConfig(ctx context.Context, cfg *Config) error
- func (b *BaseSQL) Close() error
- func (b *BaseSQL) WithReadOnlyTransaction(ctx context.Context, fn func(tx *sql.Tx) error) error
- func (b *BaseSQL) WithSerializableTransaction(ctx context.Context, fn func(tx *sql.Tx) error) error
- func (b *BaseSQL) WithTransaction(ctx context.Context, opts *TxOptions, fn func(tx *sql.Tx) error) error
- type Clock
- type Config
- type LeaseRuntime
- type LeaseRuntimeCalculator
- type QueryBuilder
- func (qb *QueryBuilder) BuildClaimMessageQuery() string
- func (qb *QueryBuilder) BuildCountByStateQuery() string
- func (qb *QueryBuilder) BuildDeleteMessageQuery() string
- func (qb *QueryBuilder) BuildFindEarliestDeadlineQuery() string
- func (qb *QueryBuilder) BuildFindExpiredMessagesQuery() string
- func (qb *QueryBuilder) BuildGetQueueMetadataQuery() string
- func (qb *QueryBuilder) BuildGetScheduledMessagesQuery() string
- func (qb *QueryBuilder) BuildInsertMessageQuery() string
- func (qb *QueryBuilder) BuildInsertQueueQuery() string
- func (qb *QueryBuilder) BuildOldestMessageAgeQuery(_ string) string
- func (qb *QueryBuilder) BuildUpdateMessageStateQuery() string
- func (qb *QueryBuilder) BuildUpdateStateCountersQuery(increment bool, stateKey string) string
- type SQLDialect
- type StateManager
- type TxOptions
Constants ¶
This section is empty.
Variables ¶
var (
ErrMaxExtensionReached = fmt.Errorf("maximum lease extension reached")
)
Common errors
Functions ¶
This section is empty.
Types ¶
type BaseSQL ¶
type BaseSQL struct {
DB *sql.DB
Logger *log.Logger
Serializer *common.ProtoSerializer
KeyManager *keymanager.EncryptionKeyManager
Dialect SQLDialect
Clock *Clock
StateManager *StateManager
LeaseRuntime *LeaseRuntimeCalculator
}
BaseSQL provides common SQL storage functionality shared across SQL backends. This eliminates code duplication between SQLite, Postgres, MySQL, etc.
func NewBaseSQL ¶
func NewBaseSQL( db *sql.DB, logger *log.Logger, keyManager *keymanager.EncryptionKeyManager, dialect SQLDialect, ) *BaseSQL
NewBaseSQL creates a new BaseSQL instance
func (*BaseSQL) ApplyConfig ¶
ApplyConfig applies configuration to the database connection
func (*BaseSQL) WithReadOnlyTransaction ¶
WithReadOnlyTransaction executes fn within a read-only transaction
func (*BaseSQL) WithSerializableTransaction ¶
func (b *BaseSQL) WithSerializableTransaction( ctx context.Context, fn func(tx *sql.Tx) error, ) error
WithSerializableTransaction executes fn within a serializable transaction. Used for operations requiring strict isolation (e.g., message claiming in SQLite).
func (*BaseSQL) WithTransaction ¶
func (b *BaseSQL) WithTransaction( ctx context.Context, opts *TxOptions, fn func(tx *sql.Tx) error, ) error
WithTransaction executes fn within a transaction with proper error handling and rollback. This provides a consistent transaction pattern across all SQL backends.
type Clock ¶
type Clock struct{}
Clock provides timestamp operations for consistency across storage layer
type Config ¶
type Config struct {
// Connection pooling
MaxOpenConns int
MaxIdleConns int
ConnMaxLifetime int // seconds
// SQLite-specific
BusyTimeout int // milliseconds
WALMode bool
Synchronous string // NORMAL, FULL, etc.
}
Config holds SQL storage configuration
type LeaseRuntime ¶
type LeaseRuntime struct {
LeaseStartedAt int64
LeaseExpiry int64
LastHeartbeatAt int64
HeartbeatExpiry int64
LeaseExtensionUsed int64
}
LeaseRuntime represents the calculated runtime values for a message lease. All times are Unix milliseconds for consistency with database storage.
type LeaseRuntimeCalculator ¶
type LeaseRuntimeCalculator struct {
// contains filtered or unexported fields
}
LeaseRuntimeCalculator calculates lease expiry times and manages lease extensions. This provides a centralized way to compute lease-related timestamps.
func NewLeaseRuntimeCalculator ¶
func NewLeaseRuntimeCalculator(clock *Clock) *LeaseRuntimeCalculator
NewLeaseRuntimeCalculator creates a new lease runtime calculator
func (*LeaseRuntimeCalculator) CalculateLeaseRuntime ¶
func (lrc *LeaseRuntimeCalculator) CalculateLeaseRuntime(policy *commonpb.LeasePolicy) *LeaseRuntime
CalculateLeaseRuntime computes all lease-related timestamps from a lease policy. This is called when a message transitions to RUNNING state.
func (*LeaseRuntimeCalculator) ExtendLease ¶
func (lrc *LeaseRuntimeCalculator) ExtendLease( policy *commonpb.LeasePolicy, currentExtensionUsed int64, requestedExtensionMs int64, ) (*LeaseRuntime, error)
ExtendLease calculates a new lease expiry when extending a lease. Returns updated lease runtime or error if max extension reached.
func (*LeaseRuntimeCalculator) Heartbeat ¶
func (lrc *LeaseRuntimeCalculator) Heartbeat() *LeaseRuntime
Heartbeat updates the heartbeat expiry for an active lease. Returns updated runtime with new heartbeat timestamp.
type QueryBuilder ¶
type QueryBuilder struct {
// contains filtered or unexported fields
}
QueryBuilder helps construct SQL queries with dialect-specific syntax. This allows the same logical query to work across different SQL databases.
func NewQueryBuilder ¶
func NewQueryBuilder(dialect SQLDialect) *QueryBuilder
NewQueryBuilder creates a new QueryBuilder for the given dialect
func (*QueryBuilder) BuildClaimMessageQuery ¶
func (qb *QueryBuilder) BuildClaimMessageQuery() string
BuildClaimMessageQuery builds a query to claim a PENDING message. Returns different queries based on dialect capabilities.
func (*QueryBuilder) BuildCountByStateQuery ¶
func (qb *QueryBuilder) BuildCountByStateQuery() string
BuildCountByStateQuery builds a query to count messages by state
func (*QueryBuilder) BuildDeleteMessageQuery ¶
func (qb *QueryBuilder) BuildDeleteMessageQuery() string
BuildDeleteMessageQuery builds a query to delete a message
func (*QueryBuilder) BuildFindEarliestDeadlineQuery ¶
func (qb *QueryBuilder) BuildFindEarliestDeadlineQuery() string
BuildFindEarliestDeadlineQuery builds a query to find the earliest lease expiry
func (*QueryBuilder) BuildFindExpiredMessagesQuery ¶
func (qb *QueryBuilder) BuildFindExpiredMessagesQuery() string
BuildFindExpiredMessagesQuery builds a query to find messages with expired leases
func (*QueryBuilder) BuildGetQueueMetadataQuery ¶
func (qb *QueryBuilder) BuildGetQueueMetadataQuery() string
BuildGetQueueMetadataQuery builds a query to retrieve queue metadata
func (*QueryBuilder) BuildGetScheduledMessagesQuery ¶
func (qb *QueryBuilder) BuildGetScheduledMessagesQuery() string
BuildGetScheduledMessagesQuery builds a query to find messages ready for activation
func (*QueryBuilder) BuildInsertMessageQuery ¶
func (qb *QueryBuilder) BuildInsertMessageQuery() string
BuildInsertMessageQuery builds a query to insert a new message with idempotency support
func (*QueryBuilder) BuildInsertQueueQuery ¶
func (qb *QueryBuilder) BuildInsertQueueQuery() string
BuildInsertQueueQuery builds a query to insert a new queue
func (*QueryBuilder) BuildOldestMessageAgeQuery ¶
func (qb *QueryBuilder) BuildOldestMessageAgeQuery(_ string) string
BuildOldestMessageAgeQuery returns the minimum created_at for pending messages in a priority range. Used for aging-based priority weighting.
func (*QueryBuilder) BuildUpdateMessageStateQuery ¶
func (qb *QueryBuilder) BuildUpdateMessageStateQuery() string
BuildUpdateMessageStateQuery builds a query to update message state
func (*QueryBuilder) BuildUpdateStateCountersQuery ¶
func (qb *QueryBuilder) BuildUpdateStateCountersQuery(increment bool, stateKey string) string
BuildUpdateStateCountersQuery builds a query to update queue state counters. Uses JSON functions to increment/decrement state counts.
type SQLDialect ¶
type SQLDialect interface {
// Query syntax
Placeholder(n int) string // Positional parameter syntax: $1 (Postgres) vs ? (SQLite/MySQL)
CurrentTimestamp() string // CURRENT_TIMESTAMP vs NOW()
JSONSet() string // json_set (SQLite) vs jsonb_set (Postgres)
JSONSetPath(key string) string // Path argument for JSONSet
JSONExtract() string // json_extract (SQLite) vs jsonb_extract_path_text (Postgres)
JSONExtractPath(key string) string // Path argument for JSONExtract
ToJSON(value string) string // Convert value to JSON: raw value (SQLite) vs to_jsonb() (Postgres)
UnixMillis(column string) string // Extract unix milliseconds from timestamp column
// Type mappings
BlobType() string // BLOB vs BYTEA
TimestampType() string // TIMESTAMP vs TIMESTAMPTZ
BigIntType() string // INTEGER vs BIGINT
JSONType() string // TEXT vs JSONB
// Feature support
SupportsReturning() bool // RETURNING clause support
SupportsSkipLocked() bool // SELECT FOR UPDATE SKIP LOCKED
SupportsAdvisoryLocks() bool // Postgres advisory locks
RequiresSerializableForClaim() bool // Needs SERIALIZABLE isolation for claiming
// Connection configuration
SetWALMode() string // SQLite: PRAGMA journal_mode=WAL
SetBusyTimeout(ms int) string // SQLite: PRAGMA busy_timeout
SetSynchronous(level string) string // PRAGMA synchronous
}
SQLDialect abstracts database-specific SQL syntax and capabilities. Each SQL backend (SQLite, Postgres, MySQL) implements this interface.
type StateManager ¶
type StateManager struct {
// contains filtered or unexported fields
}
StateManager handles message state counter updates for queue statistics. This provides O(1) queue state lookups by maintaining pre-computed counters.
func NewStateManager ¶
func NewStateManager(dialect SQLDialect) *StateManager
NewStateManager creates a new StateManager instance
func (*StateManager) GetStateCounts ¶
func (sm *StateManager) GetStateCounts( ctx context.Context, db *sql.DB, queueName string, ) (map[string]int64, error)
GetStateCounts retrieves the current state counts for a queue
func (*StateManager) UpdateCounters ¶
func (sm *StateManager) UpdateCounters( ctx context.Context, tx *sql.Tx, queueName string, oldState, newState messagepb.Message_Metadata_State, ) error
UpdateCounters atomically updates state counters when a message transitions. This is called within a transaction to ensure consistency.
type TxOptions ¶
type TxOptions struct {
Isolation sql.IsolationLevel
Timeout time.Duration
ReadOnly bool
}
TxOptions holds transaction configuration
func DefaultTxOptions ¶
func DefaultTxOptions() *TxOptions
DefaultTxOptions returns default transaction options