Documentation
¶
Index ¶
- func NewDatabaseQueue(cfg DatabaseQueueConfig) (queue.Queue, error)
- type DatabaseQueue
- func (q *DatabaseQueue) AddressedPush(priority uint64, groupId int64, address string, work queue.Work) error
- func (q *DatabaseQueue) Delete(permit permit.Permit) error
- func (q *DatabaseQueue) Extend(permit permit.Permit) error
- func (q *DatabaseQueue) Get(maxPriority uint64, maxPriorityChan chan uint64, ...) (*queue.QueueWork, error)
- func (q *DatabaseQueue) IsAddressInQueue(address string) (bool, error)
- func (q *DatabaseQueue) Name() string
- func (q *DatabaseQueue) Peek(filter func(work *queue.QueueWork) (bool, error), types ...uint64) ([]queue.QueueWork, error)
- func (q *DatabaseQueue) PollAddress(address string) <-chan error
- func (q *DatabaseQueue) Push(priority uint64, groupId int64, work queue.Work) error
- func (q *DatabaseQueue) RecordFailure(address string, failure error) error
- func (q *DatabaseQueue) SubscribeOne(dataType uint8, matcher broadcaster.Matcher) <-chan listener.Notification
- func (q *DatabaseQueue) Unsubscribe(ch <-chan listener.Notification)
- func (q *DatabaseQueue) WithDbTx(tx interface{}) queue.Queue
- type DatabaseQueueConfig
- type DefaultGroupQueue
- func (q *DefaultGroupQueue) BaseQueueName() string
- func (q *DefaultGroupQueue) Group() groups.GroupQueueJob
- func (q *DefaultGroupQueue) Push(priority uint64, work queue.Work) error
- func (q *DefaultGroupQueue) SetEndWork(work interface{}, endWorkType uint8) error
- func (q *DefaultGroupQueue) Start() error
- type DefaultQueueGroupFactory
- type GroupQueueFactory
- type QueueGroupFactoryConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewDatabaseQueue ¶
func NewDatabaseQueue(cfg DatabaseQueueConfig) (queue.Queue, error)
Types ¶
type DatabaseQueue ¶
type DatabaseQueue struct {
// contains filtered or unexported fields
}
func (*DatabaseQueue) AddressedPush ¶
func (*DatabaseQueue) Get ¶
func (q *DatabaseQueue) Get(maxPriority uint64, maxPriorityChan chan uint64, types queue.QueueSupportedTypes, stop chan bool) (*queue.QueueWork, error)
Get attempts to get a job from the queue. It blocks until a job is found and returned. Parameters:
- maxPriority uint64 - get only jobs with priority <= this value.
- maxPriorityChan chan uint64 - while blocking (waiting for a job) pass a new maxPriority value to this channel if the capacity changes. For example, we may be blocking (waiting for work) with a maximum priority of 2; when other in-progress work completes, we may suddenly have capacity for work with a maximum priority of 4. In this case, we'd pass a `4` on this channel to notify the store that we can change our polling query to allow a new maximum priority.
- work - not an input parameter; the work is returned via a pointer (the `*queue.QueueWork` result)
Returns:
- Permit - A permit (uint64) for doing the work
- Address - The work's address
- error - An error (if error occurs); nil if successful
func (*DatabaseQueue) IsAddressInQueue ¶
func (q *DatabaseQueue) IsAddressInQueue(address string) (bool, error)
func (*DatabaseQueue) Name ¶
func (q *DatabaseQueue) Name() string
func (*DatabaseQueue) PollAddress ¶
func (q *DatabaseQueue) PollAddress(address string) <-chan error
func (*DatabaseQueue) RecordFailure ¶
func (q *DatabaseQueue) RecordFailure(address string, failure error) error
func (*DatabaseQueue) SubscribeOne ¶
func (q *DatabaseQueue) SubscribeOne(dataType uint8, matcher broadcaster.Matcher) <-chan listener.Notification
SubscribeOne returns a new output channel that will receive one and only one broadcast event when the provided Matcher returns `true`. When an event matches the Matcher, the event is passed over the output channel and the channel is immediately unsubscribed. You should still call `Unsubscribe` with the channel in case an event is never received.
func (*DatabaseQueue) Unsubscribe ¶
func (q *DatabaseQueue) Unsubscribe(ch <-chan listener.Notification)
Unsubscribe removes a channel from receiving broadcast events. That channel is closed as a consequence of unsubscribing.
func (*DatabaseQueue) WithDbTx ¶
func (q *DatabaseQueue) WithDbTx(tx interface{}) queue.Queue
type DatabaseQueueConfig ¶
type DatabaseQueueConfig struct { QueueName string NotifyTypeWorkReady uint8 NotifyTypeWorkComplete uint8 NotifyTypeChunk uint8 ChunkMatcher dbqueuetypes.DatabaseQueueChunkMatcher CarrierFactory metrics.CarrierFactory QueueStore dbqueuetypes.QueueStore QueueMsgsChan <-chan listener.Notification WorkMsgsChan <-chan listener.Notification ChunkMsgsChan <-chan listener.Notification StopChan chan bool JobLifecycleWrapper metrics.JobLifecycleWrapper Metrics metrics.Metrics }
type DefaultGroupQueue ¶
type DefaultGroupQueue struct { BaseQueue queue.Queue GroupQueue queue.Queue // contains filtered or unexported fields }
func (*DefaultGroupQueue) BaseQueueName ¶
func (q *DefaultGroupQueue) BaseQueueName() string
func (*DefaultGroupQueue) Group ¶
func (q *DefaultGroupQueue) Group() groups.GroupQueueJob
func (*DefaultGroupQueue) Push ¶
func (q *DefaultGroupQueue) Push(priority uint64, work queue.Work) error
func (*DefaultGroupQueue) SetEndWork ¶
func (q *DefaultGroupQueue) SetEndWork(work interface{}, endWorkType uint8) error
func (*DefaultGroupQueue) Start ¶
func (q *DefaultGroupQueue) Start() error
type DefaultQueueGroupFactory ¶
type DefaultQueueGroupFactory struct {
// contains filtered or unexported fields
}
func NewQueueGroupFactory ¶
func NewQueueGroupFactory(cfg QueueGroupFactoryConfig) *DefaultQueueGroupFactory
func (*DefaultQueueGroupFactory) GetGroup ¶
func (qf *DefaultQueueGroupFactory) GetGroup(g groups.GroupQueueJob) groups.GroupQueue
func (*DefaultQueueGroupFactory) NewGroup ¶
func (qf *DefaultQueueGroupFactory) NewGroup(job groups.GroupQueueJob) (q groups.GroupQueue, err error)
type GroupQueueFactory ¶
type GroupQueueFactory interface { // NewGroup returns a new GroupQueue. NewGroup(job groups.GroupQueueJob) (groups.GroupQueue, error) // GetGroup wraps an existing QueueGroup in the GroupQueue interface GetGroup(group groups.GroupQueueJob) groups.GroupQueue }
Click to show internal directories.
Click to hide internal directories.