Documentation
¶
Index ¶
- func OpenConnection(ctx context.Context, config *ConnectionConfig) (*sql.DB, error)
- type Config
- type ConnectionConfig
- type Dialect
- func (d *Dialect) BigIntType() string
- func (d *Dialect) BlobType() string
- func (d *Dialect) CurrentTimestamp() string
- func (d *Dialect) JSONExtract() string
- func (d *Dialect) JSONExtractPath(key string) string
- func (d *Dialect) JSONSet() string
- func (d *Dialect) JSONSetPath(key string) string
- func (d *Dialect) JSONType() string
- func (d *Dialect) Placeholder(n int) string
- func (d *Dialect) RequiresSerializableForClaim() bool
- func (d *Dialect) SetBusyTimeout(ms int) string
- func (d *Dialect) SetSynchronous(level string) string
- func (d *Dialect) SetWALMode() string
- func (d *Dialect) SupportsAdvisoryLocks() bool
- func (d *Dialect) SupportsReturning() bool
- func (d *Dialect) SupportsSkipLocked() bool
- func (d *Dialect) TimestampType() string
- func (d *Dialect) ToJSON(value string) string
- func (d *Dialect) UnixMillis(column string) string
- type SchemaManager
- func (m *SchemaManager) EnsureVersionTable(ctx context.Context, db *sql.DB) error
- func (m *SchemaManager) GetVersion(ctx context.Context, db *sql.DB) (uint, bool, error)
- func (m *SchemaManager) Initialize(ctx context.Context, db *sql.DB) error
- func (m *SchemaManager) Migrate(ctx context.Context, db *sql.DB, targetVersion uint) error
- func (m *SchemaManager) SetVersion(ctx context.Context, db *sql.DB, version uint, description string) error
- func (m *SchemaManager) Version(ctx context.Context, db *sql.DB) (uint, bool, error)
- type Storage
- func (s *Storage) AcknowledgeMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
- func (s *Storage) ClaimMessage(ctx context.Context, queueName string, workerId string, attemptId string) (*messagepb.Message, error)
- func (s *Storage) Close() error
- func (s *Storage) CreateQueue(ctx context.Context, queue *queuepb.Queue) error
- func (s *Storage) CreateSchedule(ctx context.Context, schedule *schedulepb.Schedule) error
- func (s *Storage) DeleteDLQMessage(ctx context.Context, queueName string, messageId string) error
- func (s *Storage) DeleteQueue(ctx context.Context, name string) error
- func (s *Storage) DeleteSchedule(ctx context.Context, scheduleId string) error
- func (s *Storage) EnqueueMessage(ctx context.Context, queueName string, message *messagepb.Message) error
- func (s *Storage) ExtendMessageLease(ctx context.Context, queueName string, messageId string, attemptId string, ...) error
- func (s *Storage) FindExpiredMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)
- func (s *Storage) GetDLQMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)
- func (s *Storage) GetQueue(ctx context.Context, name string) (*queuepb.Queue, error)
- func (s *Storage) GetQueueMetadata(ctx context.Context, name string) (*queuepb.QueueMetadata, error)
- func (s *Storage) GetSchedule(ctx context.Context, scheduleId string) (*schedulepb.Schedule, error)
- func (s *Storage) GetScheduleHistory(ctx context.Context, scheduleId string, limit int64) (*schedulepb.ScheduleHistory, error)
- func (s *Storage) HeartbeatMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
- func (s *Storage) ListQueues(ctx context.Context) ([]*queuepb.Queue, error)
- func (s *Storage) ListSchedules(ctx context.Context, queueName string) ([]*schedulepb.Schedule, error)
- func (s *Storage) NackMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
- func (s *Storage) PauseSchedule(ctx context.Context, scheduleId string) error
- func (s *Storage) PeekMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)
- func (s *Storage) PurgeDLQ(ctx context.Context, queueName string) (int64, error)
- func (s *Storage) ReclaimExpiredMessage(ctx context.Context, queueName string, message *messagepb.Message) error
- func (s *Storage) RecordScheduleExecution(ctx context.Context, scheduleId string, messageId string, executionTime int64) error
- func (s *Storage) ResumeSchedule(ctx context.Context, scheduleId string) error
- func (s *Storage) RetryDLQMessage(ctx context.Context, queueName string, messageId string) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func OpenConnection ¶
OpenConnection opens and configures a SQLite database connection
Types ¶
type Config ¶
type Config struct {
Path string
Logger *log.Logger
KeyManager *keymanager.EncryptionKeyManager
}
Config holds SQLite storage configuration
type ConnectionConfig ¶
type ConnectionConfig struct {
Path string // Database file path or ":memory:"
MaxOpenConns int // Maximum number of open connections (default: 1 for SQLite)
MaxIdleConns int // Maximum number of idle connections
ConnMaxLifetime time.Duration // Maximum connection lifetime
BusyTimeout time.Duration // Busy timeout (default: 30s)
Synchronous string // Synchronous mode: OFF, NORMAL, FULL (default: NORMAL)
}
ConnectionConfig holds SQLite connection configuration
func DefaultConnectionConfig ¶
func DefaultConnectionConfig(path string) *ConnectionConfig
DefaultConnectionConfig returns default SQLite connection configuration
type Dialect ¶
type Dialect struct{}
Dialect implements sql.SQLDialect for SQLite databases. It provides SQLite-specific SQL syntax and capabilities.
func (*Dialect) BigIntType ¶
BigIntType returns the SQLite integer type (SQLite uses INTEGER for all integers)
func (*Dialect) CurrentTimestamp ¶
CurrentTimestamp returns the SQLite current timestamp function
func (*Dialect) JSONExtract ¶
JSONExtract returns the SQLite JSON extraction function
func (*Dialect) JSONExtractPath ¶
JSONExtractPath returns a JSON path argument for json_extract
func (*Dialect) JSONSetPath ¶
JSONSetPath returns a JSON path for json_set
func (*Dialect) Placeholder ¶
Placeholder returns the SQLite positional parameter syntax
func (*Dialect) RequiresSerializableForClaim ¶
RequiresSerializableForClaim reports whether SERIALIZABLE isolation is needed for claiming
func (*Dialect) SetBusyTimeout ¶
SetBusyTimeout returns the PRAGMA to set busy timeout
func (*Dialect) SetSynchronous ¶
SetSynchronous returns the PRAGMA to set synchronous mode
func (*Dialect) SetWALMode ¶
SetWALMode returns the PRAGMA to enable WAL mode
func (*Dialect) SupportsAdvisoryLocks ¶
SupportsAdvisoryLocks reports whether advisory locks are supported
func (*Dialect) SupportsReturning ¶
SupportsReturning reports whether the RETURNING clause is supported
func (*Dialect) SupportsSkipLocked ¶
SupportsSkipLocked reports whether SELECT FOR UPDATE SKIP LOCKED is supported
func (*Dialect) TimestampType ¶
TimestampType returns the SQLite timestamp type
func (*Dialect) ToJSON ¶
ToJSON converts a value to JSON (SQLite json_set accepts raw values directly)
func (*Dialect) UnixMillis ¶
UnixMillis extracts Unix milliseconds from a timestamp column
type SchemaManager ¶
type SchemaManager struct {
// contains filtered or unexported fields
}
func NewSchemaManager ¶
func NewSchemaManager() *SchemaManager
func (*SchemaManager) EnsureVersionTable ¶
func (*SchemaManager) GetVersion ¶
func (*SchemaManager) Initialize ¶
func (*SchemaManager) SetVersion ¶
type Storage ¶
type Storage struct {
*repositorysql.BaseSQL
// contains filtered or unexported fields
}
Storage implements the persistence.Storage interface for SQLite
func NewStorage ¶
NewStorage creates a new SQLite storage instance
func (*Storage) AcknowledgeMessage ¶
func (s *Storage) AcknowledgeMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
AcknowledgeMessage marks a message as completed
func (*Storage) ClaimMessage ¶
func (s *Storage) ClaimMessage(ctx context.Context, queueName string, workerId string, attemptId string) (*messagepb.Message, error)
ClaimMessage claims the next available message from a queue
func (*Storage) CreateQueue ¶
CreateQueue creates a new queue
func (*Storage) CreateSchedule ¶
func (*Storage) DeleteDLQMessage ¶
DeleteDLQMessage permanently deletes a message from the DLQ
func (*Storage) DeleteQueue ¶
DeleteQueue deletes a queue
func (*Storage) DeleteSchedule ¶
DeleteSchedule deletes a schedule
func (*Storage) EnqueueMessage ¶
func (*Storage) ExtendMessageLease ¶
func (s *Storage) ExtendMessageLease(ctx context.Context, queueName string, messageId string, attemptId string, extensionMs int64) error
ExtendMessageLease extends the lease on a message
func (*Storage) FindExpiredMessages ¶
func (*Storage) GetDLQMessages ¶
func (*Storage) GetQueueMetadata ¶
func (s *Storage) GetQueueMetadata(ctx context.Context, name string) (*queuepb.QueueMetadata, error)
GetQueueMetadata retrieves queue metadata by name
func (*Storage) GetSchedule ¶
GetSchedule retrieves a schedule by ID
func (*Storage) GetScheduleHistory ¶
func (s *Storage) GetScheduleHistory(ctx context.Context, scheduleId string, limit int64) (*schedulepb.ScheduleHistory, error)
GetScheduleHistory returns the execution history for a schedule
func (*Storage) HeartbeatMessage ¶
func (s *Storage) HeartbeatMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
HeartbeatMessage updates the heartbeat for a message
func (*Storage) ListQueues ¶
ListQueues returns all queues
func (*Storage) ListSchedules ¶
func (s *Storage) ListSchedules(ctx context.Context, queueName string) ([]*schedulepb.Schedule, error)
ListSchedules returns schedules for a queue
func (*Storage) NackMessage ¶
func (s *Storage) NackMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
NackMessage marks a message as failed
func (*Storage) PauseSchedule ¶
PauseSchedule pauses a schedule
func (*Storage) PeekMessages ¶
func (s *Storage) PeekMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)
PeekMessages retrieves messages without claiming them
func (*Storage) ReclaimExpiredMessage ¶
func (s *Storage) ReclaimExpiredMessage(ctx context.Context, queueName string, message *messagepb.Message) error
ReclaimExpiredMessage moves an expired message either back to pending or to the DLQ
func (*Storage) RecordScheduleExecution ¶
func (s *Storage) RecordScheduleExecution(ctx context.Context, scheduleId string, messageId string, executionTime int64) error
RecordScheduleExecution records a schedule execution
func (*Storage) ResumeSchedule ¶
ResumeSchedule resumes a paused schedule