Documentation
¶
Index ¶
- Constants
- Variables
- func GetConnectionPool(dbConfig config.RelationalDatabaseConfig, migrationConf *MigrationConfig, ...) (*sql.DB, error)
- func GetDefaultCacheTTLDuration() time.Duration
- func SetupForDeliveryJobTestsWithOptions(options *DeliveryJobSetupOptions) []*data.Consumer
- func SetupMessageDependencyFixture(producerRepo ProducerRepository, channelRepo ChannelRepository, ...) (*data.Producer, *data.Channel, []*data.Consumer)
- func SetupPruneableMessageFixture(dataAccessor DataAccessor, channel *data.Channel, producer *data.Producer, ...) (*data.Message, []*data.DeliveryJob)
- type AppDBRepository
- type AppRepository
- type CacheItem
- type CachedChannelRepository
- func (repo *CachedChannelRepository) Close()
- func (repo *CachedChannelRepository) Get(channelID string) (*data.Channel, error)
- func (repo *CachedChannelRepository) GetList(page *data.Pagination) ([]*data.Channel, *data.Pagination, error)
- func (repo *CachedChannelRepository) Store(channel *data.Channel) (*data.Channel, error)
- type CachedConsumerRepository
- func (repo *CachedConsumerRepository) Close()
- func (repo *CachedConsumerRepository) Delete(consumer *data.Consumer) error
- func (repo *CachedConsumerRepository) Get(channelID string, consumerID string) (*data.Consumer, error)
- func (repo *CachedConsumerRepository) GetByID(id string) (*data.Consumer, error)
- func (repo *CachedConsumerRepository) GetList(channelID string, page *data.Pagination) ([]*data.Consumer, *data.Pagination, error)
- func (repo *CachedConsumerRepository) Store(consumer *data.Consumer) (*data.Consumer, error)
- type CachedProducerRepository
- func (repo *CachedProducerRepository) Close()
- func (repo *CachedProducerRepository) Get(producerID string) (*data.Producer, error)
- func (repo *CachedProducerRepository) GetList(page *data.Pagination) ([]*data.Producer, *data.Pagination, error)
- func (repo *CachedProducerRepository) Store(producer *data.Producer) (*data.Producer, error)
- type ChannelDBRepository
- type ChannelRepository
- type Channel_ID
- type ConsumerDBRepository
- func (consumerRepo *ConsumerDBRepository) Delete(consumer *data.Consumer) error
- func (consumerRepo *ConsumerDBRepository) Get(channelID string, consumerID string) (consumer *data.Consumer, err error)
- func (consumerRepo *ConsumerDBRepository) GetByID(id string) (consumer *data.Consumer, err error)
- func (consumerRepo *ConsumerDBRepository) GetList(channelID string, page *data.Pagination) ([]*data.Consumer, *data.Pagination, error)
- func (consumerRepo *ConsumerDBRepository) Store(consumer *data.Consumer) (*data.Consumer, error)
- type ConsumerRepository
- type Consumer_ID
- type ContextKey
- type DataAccessor
- type DeliveryJobDBRepository
- func (djRepo *DeliveryJobDBRepository) DeleteJobsForMessage(message *data.Message) error
- func (djRepo *DeliveryJobDBRepository) DispatchMessage(message *data.Message, deliveryJobs ...*data.DeliveryJob) (err error)
- func (djRepo *DeliveryJobDBRepository) GetByID(id string) (job *data.DeliveryJob, err error)
- func (djRepo *DeliveryJobDBRepository) GetJobStatusCountsGroupedByConsumer() (map[Channel_ID]map[Consumer_ID][]*data.StatusCount[data.JobStatus], error)
- func (djRepo *DeliveryJobDBRepository) GetJobsForConsumer(consumer *data.Consumer, jobStatus data.JobStatus, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error)
- func (djRepo *DeliveryJobDBRepository) GetJobsForMessage(message *data.Message, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error)
- func (djRepo *DeliveryJobDBRepository) GetJobsInflightSince(delta time.Duration) []*data.DeliveryJob
- func (djRepo *DeliveryJobDBRepository) GetJobsReadyForInflightSince(delta time.Duration, retryThreshold int) []*data.DeliveryJob
- func (djRepo *DeliveryJobDBRepository) GetPrioritizedJobsForConsumer(consumer *data.Consumer, jobStatus data.JobStatus, pageSize int) ([]*data.DeliveryJob, error)
- func (djRepo *DeliveryJobDBRepository) MarkDeadJobAsInflight(deliveryJob *data.DeliveryJob) (err error)
- func (djRepo *DeliveryJobDBRepository) MarkJobDead(deliveryJob *data.DeliveryJob) error
- func (djRepo *DeliveryJobDBRepository) MarkJobDelivered(deliveryJob *data.DeliveryJob) error
- func (djRepo *DeliveryJobDBRepository) MarkJobInflight(deliveryJob *data.DeliveryJob) error
- func (djRepo *DeliveryJobDBRepository) MarkJobRetry(deliveryJob *data.DeliveryJob, earliestDelta time.Duration) (err error)
- func (djRepo *DeliveryJobDBRepository) MarkQueuedJobAsDead(deliveryJob *data.DeliveryJob) error
- func (djRepo *DeliveryJobDBRepository) RequeueDeadJob(job *data.DeliveryJob) (err error)
- func (djRepo *DeliveryJobDBRepository) RequeueDeadJobsForConsumer(consumer *data.Consumer) (err error)
- type DeliveryJobRepository
- type DeliveryJobSetupOptions
- type LockDBRepository
- type LockRepository
- type MemoryCache
- type MessageDBRepository
- func (msgRepo *MessageDBRepository) Create(message *data.Message) (err error)
- func (msgRepo *MessageDBRepository) DeleteMessage(message *data.Message) error
- func (msgRepo *MessageDBRepository) DeleteMessagesAndJobs(ctx context.Context, messageIDs []string) error
- func (msgRepo *MessageDBRepository) Get(channelID string, messageID string) (*data.Message, error)
- func (msgRepo *MessageDBRepository) GetByID(id string) (*data.Message, error)
- func (msgRepo *MessageDBRepository) GetByIDs(ids []string) ([]*data.Message, error)
- func (msgRepo *MessageDBRepository) GetMessageStatusCountsByChannel(channelID string) ([]*data.StatusCount[data.MsgStatus], error)
- func (msgRepo *MessageDBRepository) GetMessagesForChannel(channelID string, page *data.Pagination, statusFilters ...data.MsgStatus) ([]*data.Message, *data.Pagination, error)
- func (msgRepo *MessageDBRepository) GetMessagesFromBeforeDurationThatAreCompletelyDelivered(delta time.Duration, absoluteMaxMessages int) []*data.Message
- func (msgRepo *MessageDBRepository) GetMessagesNotDispatchedForCertainPeriod(delta time.Duration) []*data.Message
- func (msgRepo *MessageDBRepository) SetDispatched(txContext context.Context, message *data.Message) error
- type MessageRepository
- type MigrationConfig
- type ProducerDBRepository
- type ProducerRepository
- type PseudoChannelRepository
- type PseudoConsumerRepository
- type PseudoProducerRepository
- type RelationalDBDataAccessor
- func (rdbmsDataAccessor *RelationalDBDataAccessor) Close()
- func (rdbmsDataAccessor *RelationalDBDataAccessor) GetAppRepository() AppRepository
- func (rdbmsDataAccessor *RelationalDBDataAccessor) GetChannelRepository() ChannelRepository
- func (rdbmsDataAccessor *RelationalDBDataAccessor) GetConsumerRepository() ConsumerRepository
- func (rdbmsDataAccessor *RelationalDBDataAccessor) GetDeliveryJobRepository() DeliveryJobRepository
- func (rdbmsDataAccessor *RelationalDBDataAccessor) GetLockRepository() LockRepository
- func (rdbmsDataAccessor *RelationalDBDataAccessor) GetMessageRepository() MessageRepository
- func (rdbmsDataAccessor *RelationalDBDataAccessor) GetProducerRepository() ProducerRepository
Constants ¶
const ( LIMIT_25_SUFFIX limitOption = " LIMIT 25" LIMIT_50_SUFFIX limitOption = " LIMIT 50" LIMIT_100_SUFFIX limitOption = " LIMIT 100" LIMIT_500_SUFFIX limitOption = " LIMIT 500" )
Variables ¶
var ( // ErrNoLock is returned when no lock is passed to try or release function ErrNoLock = errors.New("no lock provided") // ErrAlreadyLocked is returned when lock already exists in repo ErrAlreadyLocked = errors.New("lock already attained by someone else") )
var ( // ErrDuplicateMessageIDForChannel represents the case where a message with the same ID already exists for the channel ErrDuplicateMessageIDForChannel = errors.New("duplicate message id for channel") // ErrNoTxInContext represents the case where transaction is not passed in the context ErrNoTxInContext = errors.New("no tx value in content") )
var ( // ErrOptimisticAppInit represents the error when an optimistic update fails to start app init ErrOptimisticAppInit = errors.New(optimisticLockInitAppErrMsg) // ErrOptimisticAppComplete represents the error when app completion is attempted from a non-initializing state ErrOptimisticAppComplete = errors.New(optimisticLockCompleteAppErrMsg) // ErrAppInitializing is returned when app is being initialized by another thread. ErrAppInitializing = errors.New("app is in initializing") // ErrNoDataChangeFromInitialized is returned when initialization is attempted without any seed data change while app has been initialized ErrNoDataChangeFromInitialized = errors.New("no data change on initialized App") // ErrCompleteWhileNotBeingInitialized is returned when complete is called without being initialized ErrCompleteWhileNotBeingInitialized = errors.New("app not initializing to complete initializing") // ErrNoRowsUpdated is returned when an UPDATE query unexpectedly does not change any row ErrNoRowsUpdated = errors.New("no rows updated on UPDATE query") // ErrInvalidStateToSave is returned when a data model is not in a state in which it can be stored in the repo ErrInvalidStateToSave = errors.New("data model in invalid state to be stored") // ErrPaginationDeadlock is returned if both after and before are provided in pagination, or if pagination is nil ErrPaginationDeadlock = errors.New("can not decide on pagination direction! Both after and before provided or pagination is nil") )
var ( // ErrDBConnectionNeverInitialized is returned when NewDataAccessor failed to connect to the DB the first time it was called; in all subsequent calls the accessor will remain nil ErrDBConnectionNeverInitialized = errors.New("DB Connection never initialized") // RDBMSStorageInternalInjector is the injector for data storage related implementation RDBMSStorageInternalInjector = wire.NewSet(GetConnectionPool, GetDefaultCacheTTLDuration, NewLockRepository, NewAppRepository, NewProducerRepository, NewCachedProducerRepository, NewChannelRepository, NewCachedChannelRepository, NewConsumerRepository, NewCachedConsumerRepository, NewMessageRepository, NewDeliveryJobRepository, wire.Struct(new(RelationalDBDataAccessor), "db", "appRepository", "producerRepository", "channelRepository", "consumerRepository", "messageRepository", "deliveryJobRepository", "lockRepository"), wire.Bind(new(DataAccessor), new(*RelationalDBDataAccessor))) )
Functions ¶
func GetConnectionPool ¶
func GetConnectionPool(dbConfig config.RelationalDatabaseConfig, migrationConf *MigrationConfig, seedDataConfig config.SeedDataConfig) (*sql.DB, error)
GetConnectionPool Gets the DB Connection Pool for the App
func SetupForDeliveryJobTestsWithOptions ¶
func SetupForDeliveryJobTestsWithOptions(options *DeliveryJobSetupOptions) []*data.Consumer
func SetupMessageDependencyFixture ¶
func SetupMessageDependencyFixture(producerRepo ProducerRepository, channelRepo ChannelRepository, consumerRepo ConsumerRepository, consumerIDPrefix string) (*data.Producer, *data.Channel, []*data.Consumer)
Types ¶
type AppDBRepository ¶
type AppDBRepository struct {
// contains filtered or unexported fields
}
AppDBRepository is the repository to access App data
func (*AppDBRepository) CompleteAppInit ¶
func (appRep *AppDBRepository) CompleteAppInit() error
CompleteAppInit stores that App initialization completed; it will return error if app is not in initializing state before the update is made
func (*AppDBRepository) GetApp ¶
func (appRep *AppDBRepository) GetApp() (*data.App, error)
GetApp retrieves the App from storage, it will never return nil
func (*AppDBRepository) InitAppData ¶
func (appRep *AppDBRepository) InitAppData(seedData *config.SeedData) error
InitAppData initializes the App data, with status NotInitialized, if and only if none is present in the DB. It returns an error if insertion fails.
func (*AppDBRepository) StartAppInit ¶
func (appRep *AppDBRepository) StartAppInit(seedData *config.SeedData) error
StartAppInit stores state that App initialization started. It will return error if App is in Initializing state or if data hash is equal and app in initialized state
type AppRepository ¶
type AppRepository interface { GetApp() (*data.App, error) StartAppInit(data *config.SeedData) error CompleteAppInit() error }
AppRepository allows storage operation interaction for App
func NewAppRepository ¶
func NewAppRepository(db *sql.DB) AppRepository
NewAppRepository retrieves App Repository
type CacheItem ¶
type CacheItem[K comparable, V any] struct { Value V Expiration time.Time }
CacheItem represents a cached item with its expiration time.
type CachedChannelRepository ¶
type CachedChannelRepository struct {
// contains filtered or unexported fields
}
CachedChannelRepository is a decorator for ChannelRepository that caches channel data.
func (*CachedChannelRepository) Close ¶
func (repo *CachedChannelRepository) Close()
Close closes the underlying cache
func (*CachedChannelRepository) Get ¶
func (repo *CachedChannelRepository) Get(channelID string) (*data.Channel, error)
Get retrieves a channel by ID, first checking the cache.
func (*CachedChannelRepository) GetList ¶
func (repo *CachedChannelRepository) GetList(page *data.Pagination) ([]*data.Channel, *data.Pagination, error)
GetList retrieves the list of channels based on the pagination params supplied. It will return an error if both after and before are present at the same time.
type CachedConsumerRepository ¶
type CachedConsumerRepository struct {
// contains filtered or unexported fields
}
CachedConsumerRepository is a decorator for ConsumerRepository that caches consumer data.
func (*CachedConsumerRepository) Close ¶
func (repo *CachedConsumerRepository) Close()
func (*CachedConsumerRepository) Delete ¶
func (repo *CachedConsumerRepository) Delete(consumer *data.Consumer) error
Delete delegates deleting to the underlying repository and invalidates the cache.
func (*CachedConsumerRepository) Get ¶
func (repo *CachedConsumerRepository) Get(channelID string, consumerID string) (*data.Consumer, error)
Get retrieves a consumer by ID, first checking the cache.
func (*CachedConsumerRepository) GetByID ¶
func (repo *CachedConsumerRepository) GetByID(id string) (*data.Consumer, error)
GetByID retrieves a consumer by its ID from the cache or underlying repository
func (*CachedConsumerRepository) GetList ¶
func (repo *CachedConsumerRepository) GetList(channelID string, page *data.Pagination) ([]*data.Consumer, *data.Pagination, error)
GetList retrieves the list of consumers based on pagination params supplied. It will return an error if both after and before are present at the same time.
type CachedProducerRepository ¶
type CachedProducerRepository struct {
// contains filtered or unexported fields
}
CachedProducerRepository is a decorator for ProducerRepository that caches producer data.
func (*CachedProducerRepository) Close ¶
func (repo *CachedProducerRepository) Close()
Close closes the underlying cache.
func (*CachedProducerRepository) Get ¶
func (repo *CachedProducerRepository) Get(producerID string) (*data.Producer, error)
Get retrieves a producer by ID, first checking the cache.
func (*CachedProducerRepository) GetList ¶
func (repo *CachedProducerRepository) GetList(page *data.Pagination) ([]*data.Producer, *data.Pagination, error)
GetList retrieves the list of producers based on pagination params supplied. It delegates directly to the underlying repository as caching lists is more complex and requires invalidation strategies that are beyond the scope of this simple example.
type ChannelDBRepository ¶
type ChannelDBRepository struct {
// contains filtered or unexported fields
}
ChannelDBRepository channel repository implementation for RDBMS
func (*ChannelDBRepository) Get ¶
func (repo *ChannelDBRepository) Get(channelID string) (*data.Channel, error)
Get retrieves the channel with matching channel id
func (*ChannelDBRepository) GetList ¶
func (repo *ChannelDBRepository) GetList(page *data.Pagination) ([]*data.Channel, *data.Pagination, error)
GetList retrieves the list of channels based on the pagination params supplied. It will return an error if both after and before are present at the same time.
type ChannelRepository ¶
type ChannelRepository interface { Store(channel *data.Channel) (*data.Channel, error) Get(channelID string) (*data.Channel, error) GetList(page *data.Pagination) ([]*data.Channel, *data.Pagination, error) }
ChannelRepository allows storage operation interaction for Channel
func NewCachedChannelRepository ¶
func NewCachedChannelRepository(delegate PseudoChannelRepository, ttl time.Duration) ChannelRepository
NewCachedChannelRepository creates a new CachedChannelRepository.
type Channel_ID ¶
type Channel_ID string
type ConsumerDBRepository ¶
type ConsumerDBRepository struct {
// contains filtered or unexported fields
}
ConsumerDBRepository is the RDBMS implementation for ConsumerRepository
func (*ConsumerDBRepository) Delete ¶
func (consumerRepo *ConsumerDBRepository) Delete(consumer *data.Consumer) error
Delete deletes consumer from DB
func (*ConsumerDBRepository) Get ¶
func (consumerRepo *ConsumerDBRepository) Get(channelID string, consumerID string) (consumer *data.Consumer, err error)
Get retrieves the consumer with the given consumer ID in the given channel; it returns an error if either the consumer or the channel does not exist
func (*ConsumerDBRepository) GetByID ¶
func (consumerRepo *ConsumerDBRepository) GetByID(id string) (consumer *data.Consumer, err error)
GetByID retrieves a consumer by its ID
func (*ConsumerDBRepository) GetList ¶
func (consumerRepo *ConsumerDBRepository) GetList(channelID string, page *data.Pagination) ([]*data.Consumer, *data.Pagination, error)
GetList retrieves consumers for a specific channel; it returns an error if the channel does not exist
type ConsumerRepository ¶
type ConsumerRepository interface { Store(consumer *data.Consumer) (*data.Consumer, error) Delete(consumer *data.Consumer) error Get(channelID string, consumerID string) (*data.Consumer, error) GetList(channelID string, page *data.Pagination) ([]*data.Consumer, *data.Pagination, error) GetByID(id string) (*data.Consumer, error) }
ConsumerRepository allows storage operation interaction for Consumer
func NewCachedConsumerRepository ¶
func NewCachedConsumerRepository(delegate PseudoConsumerRepository, ttl time.Duration) ConsumerRepository
NewCachedConsumerRepository creates a new CachedConsumerRepository.
type Consumer_ID ¶
type Consumer_ID string
type DataAccessor ¶
type DataAccessor interface { GetAppRepository() AppRepository GetProducerRepository() ProducerRepository GetChannelRepository() ChannelRepository GetConsumerRepository() ConsumerRepository GetMessageRepository() MessageRepository GetDeliveryJobRepository() DeliveryJobRepository GetLockRepository() LockRepository Close() }
DataAccessor is the facade to all the data repository
func GetNewDataAccessor ¶
func GetNewDataAccessor(dbConfig config.RelationalDatabaseConfig, migrationConf *MigrationConfig, seedDataConfig config.SeedDataConfig) (DataAccessor, error)
GetNewDataAccessor provides the facade for accessing all the object repositories
type DeliveryJobDBRepository ¶
type DeliveryJobDBRepository struct {
// contains filtered or unexported fields
}
DeliveryJobDBRepository is the DeliveryJobRepository's RDBMS implementation
func (*DeliveryJobDBRepository) DeleteJobsForMessage ¶
func (djRepo *DeliveryJobDBRepository) DeleteJobsForMessage(message *data.Message) error
func (*DeliveryJobDBRepository) DispatchMessage ¶
func (djRepo *DeliveryJobDBRepository) DispatchMessage(message *data.Message, deliveryJobs ...*data.DeliveryJob) (err error)
DispatchMessage saves the delivery jobs and updates the message status in one atomic state
func (*DeliveryJobDBRepository) GetByID ¶
func (djRepo *DeliveryJobDBRepository) GetByID(id string) (job *data.DeliveryJob, err error)
GetByID loads the delivery job with specified id if it exists, else returns an error
func (*DeliveryJobDBRepository) GetJobStatusCountsGroupedByConsumer ¶
func (djRepo *DeliveryJobDBRepository) GetJobStatusCountsGroupedByConsumer() (map[Channel_ID]map[Consumer_ID][]*data.StatusCount[data.JobStatus], error)
func (*DeliveryJobDBRepository) GetJobsForConsumer ¶
func (djRepo *DeliveryJobDBRepository) GetJobsForConsumer(consumer *data.Consumer, jobStatus data.JobStatus, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error)
GetJobsForConsumer retrieves the DeliveryJobs created for delivery to a consumer, filtered by a specific status
func (*DeliveryJobDBRepository) GetJobsForMessage ¶
func (djRepo *DeliveryJobDBRepository) GetJobsForMessage(message *data.Message, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error)
GetJobsForMessage retrieves jobs created for a specific message
func (*DeliveryJobDBRepository) GetJobsInflightSince ¶
func (djRepo *DeliveryJobDBRepository) GetJobsInflightSince(delta time.Duration) []*data.DeliveryJob
GetJobsInflightSince retrieves jobs in inflight status since the delta duration
func (*DeliveryJobDBRepository) GetJobsReadyForInflightSince ¶
func (djRepo *DeliveryJobDBRepository) GetJobsReadyForInflightSince(delta time.Duration, retryThreshold int) []*data.DeliveryJob
GetJobsReadyForInflightSince retrieves jobs in queued status and earliestNextAttemptAt < `now`-delta
func (*DeliveryJobDBRepository) GetPrioritizedJobsForConsumer ¶
func (djRepo *DeliveryJobDBRepository) GetPrioritizedJobsForConsumer(consumer *data.Consumer, jobStatus data.JobStatus, pageSize int) ([]*data.DeliveryJob, error)
GetPrioritizedJobsForConsumer retrieves the DeliveryJobs created for delivery to a consumer, filtered by a specific status and ordered by message priority
func (*DeliveryJobDBRepository) MarkDeadJobAsInflight ¶
func (djRepo *DeliveryJobDBRepository) MarkDeadJobAsInflight(deliveryJob *data.DeliveryJob) (err error)
MarkDeadJobAsInflight increases the retry attempt count and sets the status of the job to Inflight if the job's current status is Dead in the object and DB; else returns error
func (*DeliveryJobDBRepository) MarkJobDead ¶
func (djRepo *DeliveryJobDBRepository) MarkJobDead(deliveryJob *data.DeliveryJob) error
MarkJobDead sets the status of the job to Dead if the job's current status is Inflight in the object and DB; else returns error
func (*DeliveryJobDBRepository) MarkJobDelivered ¶
func (djRepo *DeliveryJobDBRepository) MarkJobDelivered(deliveryJob *data.DeliveryJob) error
MarkJobDelivered sets the status of the job to Delivered if the job's current status is Inflight in the object and DB; else returns error
func (*DeliveryJobDBRepository) MarkJobInflight ¶
func (djRepo *DeliveryJobDBRepository) MarkJobInflight(deliveryJob *data.DeliveryJob) error
MarkJobInflight sets the status of the job to Inflight if job's current state in the object and DB is Queued; else returns error
func (*DeliveryJobDBRepository) MarkJobRetry ¶
func (djRepo *DeliveryJobDBRepository) MarkJobRetry(deliveryJob *data.DeliveryJob, earliestDelta time.Duration) (err error)
MarkJobRetry increases the retry attempt count and sets the status of the job to Queued if the job's current status is Inflight in the object and DB; else returns error
func (*DeliveryJobDBRepository) MarkQueuedJobAsDead ¶
func (djRepo *DeliveryJobDBRepository) MarkQueuedJobAsDead(deliveryJob *data.DeliveryJob) error
func (*DeliveryJobDBRepository) RequeueDeadJob ¶
func (djRepo *DeliveryJobDBRepository) RequeueDeadJob(job *data.DeliveryJob) (err error)
RequeueDeadJob queues up a dead job
func (*DeliveryJobDBRepository) RequeueDeadJobsForConsumer ¶
func (djRepo *DeliveryJobDBRepository) RequeueDeadJobsForConsumer(consumer *data.Consumer) (err error)
RequeueDeadJobsForConsumer queues up dead jobs for a specific consumer
type DeliveryJobRepository ¶
type DeliveryJobRepository interface { DispatchMessage(message *data.Message, deliveryJobs ...*data.DeliveryJob) error MarkJobInflight(deliveryJob *data.DeliveryJob) error MarkJobDelivered(deliveryJob *data.DeliveryJob) error MarkJobDead(deliveryJob *data.DeliveryJob) error MarkJobRetry(deliveryJob *data.DeliveryJob, earliestDelta time.Duration) error MarkQueuedJobAsDead(deliveryJob *data.DeliveryJob) error MarkDeadJobAsInflight(deliveryJob *data.DeliveryJob) error RequeueDeadJobsForConsumer(consumer *data.Consumer) error GetJobsForMessage(message *data.Message, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error) GetJobsForConsumer(consumer *data.Consumer, jobStatus data.JobStatus, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error) GetPrioritizedJobsForConsumer(consumer *data.Consumer, jobStatus data.JobStatus, pageSize int) ([]*data.DeliveryJob, error) GetByID(id string) (*data.DeliveryJob, error) GetJobsInflightSince(delta time.Duration) []*data.DeliveryJob GetJobsReadyForInflightSince(delta time.Duration, retryThreshold int) []*data.DeliveryJob DeleteJobsForMessage(message *data.Message) error GetJobStatusCountsGroupedByConsumer() (map[Channel_ID]map[Consumer_ID][]*data.StatusCount[data.JobStatus], error) RequeueDeadJob(job *data.DeliveryJob) (err error) }
DeliveryJobRepository allows storage operations over DeliveryJob
func NewDeliveryJobRepository ¶
func NewDeliveryJobRepository(db *sql.DB, msgRepo MessageRepository, consumerRepo ConsumerRepository) DeliveryJobRepository
NewDeliveryJobRepository creates a new instance of DeliveryJobRepository
type DeliveryJobSetupOptions ¶
type DeliveryJobSetupOptions struct { ConsumerCount int ConsumerIDPrefix string ConsumerRepo ConsumerRepository IgnoreSettingConsumers bool ConsumerChannel *data.Channel }
func (*DeliveryJobSetupOptions) GetConsumerChannel ¶
func (opt *DeliveryJobSetupOptions) GetConsumerChannel() *data.Channel
func (*DeliveryJobSetupOptions) GetConsumerCount ¶
func (opt *DeliveryJobSetupOptions) GetConsumerCount() int
func (*DeliveryJobSetupOptions) GetConsumerIDPrefix ¶
func (opt *DeliveryJobSetupOptions) GetConsumerIDPrefix() string
func (*DeliveryJobSetupOptions) GetConsumerRepo ¶
func (opt *DeliveryJobSetupOptions) GetConsumerRepo() ConsumerRepository
type LockDBRepository ¶
type LockDBRepository struct {
// contains filtered or unexported fields
}
LockDBRepository represents the RDBMS implementation of LockRepository
func (*LockDBRepository) ReleaseLock ¶
func (lockRepo *LockDBRepository) ReleaseLock(lock *data.Lock) (err error)
ReleaseLock tries to release the lock, will return error if no such lock or any error in releasing
func (*LockDBRepository) TimeoutLocks ¶
func (lockRepo *LockDBRepository) TimeoutLocks(threshold time.Duration) (err error)
TimeoutLocks will force release locks that are older than the duration specified from now. It returns an error if the DB call failed
type LockRepository ¶
type LockRepository interface { TryLock(lock *data.Lock) error ReleaseLock(lock *data.Lock) error TimeoutLocks(threshold time.Duration) error }
LockRepository allows storage operations over Lock
func NewLockRepository ¶
func NewLockRepository(db *sql.DB) LockRepository
NewLockRepository creates a new instance of LockRepository
type MemoryCache ¶
type MemoryCache[K comparable, V any] struct { // contains filtered or unexported fields }
MemoryCache is a generic in-memory cache with TTL support.
func NewMemoryCache ¶
func NewMemoryCache[K comparable, V any](ttl time.Duration) *MemoryCache[K, V]
NewMemoryCache creates a new MemoryCache instance. ttl: Time-to-live for cached items. 0 means no expiration.
func (*MemoryCache[K, V]) Close ¶
func (c *MemoryCache[K, V]) Close()
func (*MemoryCache[K, V]) Delete ¶
func (c *MemoryCache[K, V]) Delete(key K)
Delete removes an item from the cache.
func (*MemoryCache[K, V]) Get ¶
func (c *MemoryCache[K, V]) Get(key K) (V, bool)
Get retrieves an item from the cache. Returns the item and a boolean indicating if it was found.
func (*MemoryCache[K, V]) Set ¶
func (c *MemoryCache[K, V]) Set(key K, value V)
Set adds or updates an item in the cache.
type MessageDBRepository ¶
type MessageDBRepository struct {
// contains filtered or unexported fields
}
MessageDBRepository is the MessageRepository implementation
func (*MessageDBRepository) Create ¶
func (msgRepo *MessageDBRepository) Create(message *data.Message) (err error)
Create creates a new message if message.MessageID does not already exist; please ensure QuickFix is called before repo is called
func (*MessageDBRepository) DeleteMessage ¶
func (msgRepo *MessageDBRepository) DeleteMessage(message *data.Message) error
func (*MessageDBRepository) DeleteMessagesAndJobs ¶
func (msgRepo *MessageDBRepository) DeleteMessagesAndJobs(ctx context.Context, messageIDs []string) error
func (*MessageDBRepository) GetByID ¶
func (msgRepo *MessageDBRepository) GetByID(id string) (*data.Message, error)
GetByID retrieves a message by its ID
func (*MessageDBRepository) GetByIDs ¶
func (msgRepo *MessageDBRepository) GetByIDs(ids []string) ([]*data.Message, error)
GetByIDs retrieves messages by their IDs
func (*MessageDBRepository) GetMessageStatusCountsByChannel ¶
func (msgRepo *MessageDBRepository) GetMessageStatusCountsByChannel(channelID string) ([]*data.StatusCount[data.MsgStatus], error)
func (*MessageDBRepository) GetMessagesForChannel ¶
func (msgRepo *MessageDBRepository) GetMessagesForChannel(channelID string, page *data.Pagination, statusFilters ...data.MsgStatus) ([]*data.Message, *data.Pagination, error)
GetMessagesForChannel retrieves messages broadcasted to a specific channel
func (*MessageDBRepository) GetMessagesFromBeforeDurationThatAreCompletelyDelivered ¶
func (msgRepo *MessageDBRepository) GetMessagesFromBeforeDurationThatAreCompletelyDelivered(delta time.Duration, absoluteMaxMessages int) []*data.Message
GetMessagesFromBeforeDurationThatAreCompletelyDelivered retrieves messages for which every job is in delivered status and the message was created before the `delta` period. The number of messages returned will be less than `absoluteMaxMessages` + 100
func (*MessageDBRepository) GetMessagesNotDispatchedForCertainPeriod ¶
func (msgRepo *MessageDBRepository) GetMessagesNotDispatchedForCertainPeriod(delta time.Duration) []*data.Message
GetMessagesNotDispatchedForCertainPeriod retrieves messages that are still in acknowledged state even though the `delta` duration has elapsed.
func (*MessageDBRepository) SetDispatched ¶
func (msgRepo *MessageDBRepository) SetDispatched(txContext context.Context, message *data.Message) error
SetDispatched sets the status of the message to dispatched within the transaction passed via txContext
type MessageRepository ¶
type MessageRepository interface { Create(message *data.Message) error Get(channelID string, messageID string) (*data.Message, error) GetByID(id string) (*data.Message, error) GetByIDs(ids []string) ([]*data.Message, error) SetDispatched(txContext context.Context, message *data.Message) error GetMessagesNotDispatchedForCertainPeriod(delta time.Duration) []*data.Message GetMessagesForChannel(channelID string, page *data.Pagination, statusFilters ...data.MsgStatus) ([]*data.Message, *data.Pagination, error) GetMessagesFromBeforeDurationThatAreCompletelyDelivered(delta time.Duration, absoluteMaxMessages int) []*data.Message DeleteMessage(message *data.Message) error DeleteMessagesAndJobs(ctx context.Context, messageIDs []string) error GetMessageStatusCountsByChannel(channelID string) ([]*data.StatusCount[data.MsgStatus], error) }
MessageRepository allows storage operations over Message. SetDispatched does not accept TX directly to keep the API storage class independent
func NewMessageRepository ¶
func NewMessageRepository(db *sql.DB, channelRepo ChannelRepository, producerRepo ProducerRepository) MessageRepository
NewMessageRepository creates a new instance of MessageRepository
type MigrationConfig ¶
MigrationConfig represents the DB migration config
type ProducerDBRepository ¶
type ProducerDBRepository struct {
// contains filtered or unexported fields
}
ProducerDBRepository is the producer repository implementation for RDBMS
func (*ProducerDBRepository) Get ¶
func (repo *ProducerDBRepository) Get(producerID string) (*data.Producer, error)
Get retrieves the producer with matching producer id
func (*ProducerDBRepository) GetList ¶
func (repo *ProducerDBRepository) GetList(page *data.Pagination) ([]*data.Producer, *data.Pagination, error)
GetList retrieves the list of producers based on the pagination params supplied. It will return an error if both after and before are present at the same time.
type ProducerRepository ¶
type ProducerRepository interface { Store(producer *data.Producer) (*data.Producer, error) Get(producerID string) (*data.Producer, error) GetList(page *data.Pagination) ([]*data.Producer, *data.Pagination, error) }
ProducerRepository allows storage operation interaction for Producer
func NewCachedProducerRepository ¶
func NewCachedProducerRepository(delegate PseudoProducerRepository, ttl time.Duration) ProducerRepository
NewCachedProducerRepository creates a new CachedProducerRepository.
type PseudoChannelRepository ¶
type PseudoChannelRepository ChannelRepository
func NewChannelRepository ¶
func NewChannelRepository(db *sql.DB) PseudoChannelRepository
NewChannelRepository retrieves new instance of channel repository
type PseudoConsumerRepository ¶
type PseudoConsumerRepository ConsumerRepository
func NewConsumerRepository ¶
func NewConsumerRepository(db *sql.DB, channelRepo ChannelRepository) PseudoConsumerRepository
NewConsumerRepository initializes new consumer repository
type PseudoProducerRepository ¶
type PseudoProducerRepository ProducerRepository
func NewProducerRepository ¶
func NewProducerRepository(db *sql.DB) PseudoProducerRepository
NewProducerRepository returns a new producer repository
type RelationalDBDataAccessor ¶
type RelationalDBDataAccessor struct {
// contains filtered or unexported fields
}
RelationalDBDataAccessor represents the DataAccessor implementation for RDBMS
func (*RelationalDBDataAccessor) Close ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) Close()
Close closes the connection to DB
func (*RelationalDBDataAccessor) GetAppRepository ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) GetAppRepository() AppRepository
GetAppRepository returns the AppRepository to be used for App ops
func (*RelationalDBDataAccessor) GetChannelRepository ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) GetChannelRepository() ChannelRepository
GetChannelRepository returns the ChannelRepository to be used for Channel ops
func (*RelationalDBDataAccessor) GetConsumerRepository ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) GetConsumerRepository() ConsumerRepository
GetConsumerRepository returns the ConsumerRepository to be used for Consumer ops
func (*RelationalDBDataAccessor) GetDeliveryJobRepository ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) GetDeliveryJobRepository() DeliveryJobRepository
GetDeliveryJobRepository retrieves the DeliveryJobRepository to be used for DeliveryJob ops
func (*RelationalDBDataAccessor) GetLockRepository ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) GetLockRepository() LockRepository
GetLockRepository retrieves the LockRepository to be used for Lock ops
func (*RelationalDBDataAccessor) GetMessageRepository ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) GetMessageRepository() MessageRepository
GetMessageRepository retrieves the MessageRepository to be used for Message ops
func (*RelationalDBDataAccessor) GetProducerRepository ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) GetProducerRepository() ProducerRepository
GetProducerRepository returns the ProducerRepository to be used for Producer ops