database

package
v2.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2024 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewDatabaseQueue

func NewDatabaseQueue(cfg DatabaseQueueConfig) (queue.Queue, error)

Types

type DatabaseQueue

type DatabaseQueue struct {
	// contains filtered or unexported fields
}

func (*DatabaseQueue) AddressedPush

func (q *DatabaseQueue) AddressedPush(priority uint64, groupId int64, address string, work queue.Work) error

func (*DatabaseQueue) Delete

func (q *DatabaseQueue) Delete(permit permit.Permit) error

func (*DatabaseQueue) Extend

func (q *DatabaseQueue) Extend(permit permit.Permit) error

func (*DatabaseQueue) Get

func (q *DatabaseQueue) Get(maxPriority uint64, maxPriorityChan chan uint64, types queue.QueueSupportedTypes, stop chan bool) (*queue.QueueWork, error)

Get attempts to get a job from the queue. Blocks until a job is found and returned Parameters:

  • maxPriority uint64 - get only jobs with priority <= this value.
  • maxPriorityChan chan uint64 - while blocking (waiting for a job) pass a new maxPriority value to this channel if the capacity changes. For example, we may be blocking (waiting for work) with a maximum priority of 2; when other in-progress work completes, we may suddenly have capacity for work with a maximum priority of 4. In this case, we'd pass a `4` on this channel to notify the store that we can change our polling query to allow a new maximum priority.
  • work - returns work via a pointer

Returns:

  • Permit - A permit (uint64) for doing the work
  • Address - The work's address
  • error - An error (if error occurs); nil if successful

func (*DatabaseQueue) IsAddressInQueue

func (q *DatabaseQueue) IsAddressInQueue(address string) (bool, error)

func (*DatabaseQueue) Name

func (q *DatabaseQueue) Name() string

func (*DatabaseQueue) Peek

func (q *DatabaseQueue) Peek(filter func(work *queue.QueueWork) (bool, error), types ...uint64) ([]queue.QueueWork, error)

func (*DatabaseQueue) PollAddress

func (q *DatabaseQueue) PollAddress(address string) <-chan error

func (*DatabaseQueue) Push

func (q *DatabaseQueue) Push(priority uint64, groupId int64, work queue.Work) error

func (*DatabaseQueue) RecordFailure

func (q *DatabaseQueue) RecordFailure(address string, failure error) error

func (*DatabaseQueue) SubscribeOne

func (q *DatabaseQueue) SubscribeOne(dataType uint8, matcher broadcaster.Matcher) <-chan listener.Notification

SubscribeOne returns a new output channel that will receive one and only one broadcast event when the provided Matcher returns `true`. When an event matches the Matcher, the event is passed over the output channel and the channel is immediately unsubscribed. You should still call `Unsubscribe` with the channel in case an event is never received.

func (*DatabaseQueue) Unsubscribe

func (q *DatabaseQueue) Unsubscribe(ch <-chan listener.Notification)

Unsubscribe removes a channel from receiving broadcast events. That channel is closed as a consequence of unsubscribing.

func (*DatabaseQueue) WithDbTx

func (q *DatabaseQueue) WithDbTx(tx interface{}) queue.Queue

type DatabaseQueueConfig

type DatabaseQueueConfig struct {
	QueueName              string
	NotifyTypeWorkReady    uint8
	NotifyTypeWorkComplete uint8
	NotifyTypeChunk        uint8
	ChunkMatcher           dbqueuetypes.DatabaseQueueChunkMatcher
	CarrierFactory         metrics.CarrierFactory
	QueueStore             dbqueuetypes.QueueStore
	QueueMsgsChan          <-chan listener.Notification
	WorkMsgsChan           <-chan listener.Notification
	ChunkMsgsChan          <-chan listener.Notification
	StopChan               chan bool
	JobLifecycleWrapper    metrics.JobLifecycleWrapper
	Metrics                metrics.Metrics
}

type DefaultGroupQueue

type DefaultGroupQueue struct {
	BaseQueue  queue.Queue
	GroupQueue queue.Queue
	// contains filtered or unexported fields
}

func (*DefaultGroupQueue) BaseQueueName

func (q *DefaultGroupQueue) BaseQueueName() string

func (*DefaultGroupQueue) Group

func (*DefaultGroupQueue) Push

func (q *DefaultGroupQueue) Push(priority uint64, work queue.Work) error

func (*DefaultGroupQueue) SetEndWork

func (q *DefaultGroupQueue) SetEndWork(work interface{}, endWorkType uint8) error

func (*DefaultGroupQueue) Start

func (q *DefaultGroupQueue) Start() error

type DefaultQueueGroupFactory

type DefaultQueueGroupFactory struct {
	// contains filtered or unexported fields
}

func (*DefaultQueueGroupFactory) GetGroup

func (*DefaultQueueGroupFactory) NewGroup

type GroupQueueFactory

type GroupQueueFactory interface {
	// NewGroup returns a new GroupQueue.
	NewGroup(job groups.GroupQueueJob) (groups.GroupQueue, error)

	// GetGroup wraps an existing QueueGroup in the GroupQueue interface
	GetGroup(group groups.GroupQueueJob) groups.GroupQueue
}

type QueueGroupFactoryConfig

type QueueGroupFactoryConfig struct {
	BaseQueue  queue.Queue
	GroupQueue queue.Queue
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL