queue

package
v0.0.0-...-293aeef
Warning

This package is not in the latest version of its module.
Published: Mar 31, 2025 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type LinkedListMessageQueuePool

type LinkedListMessageQueuePool struct {
	// contains filtered or unexported fields
}

func (*LinkedListMessageQueuePool) Add

func (*LinkedListMessageQueuePool) Create

func (mqp *LinkedListMessageQueuePool) Create() error

Creates message queues and populates the pool.

A new message queue is added to the pool each time this method is invoked on a message queue pool instance.

func (*LinkedListMessageQueuePool) Delete

Deletes the specified message queue from the pool if it is present. Otherwise the state of the pool does not change.

Consider: the pool currently identifies a message queue by its pointer. In the future, a unique ID per message queue may be needed if the pool is to scale in a distributed manner, because different computers in a queue swarm do not share the same address space.
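The ID-based identification mentioned in the note above could look roughly like the following. This is only a sketch, not the package's implementation: the `queue` type, its `id` field, and the `idPool` type with its `add`, `remove`, and `len` methods are all hypothetical names introduced for illustration, and the use of `container/list` is an assumption suggested by the "LinkedList" in the type name.

```go
package main

import (
	"container/list"
	"fmt"
)

// queue is a hypothetical stand-in for MessageQueue; the real type has
// unexported fields that this documentation does not show.
type queue struct {
	id string // assumed unique identifier, not part of the documented API
}

// idPool keeps queues in a linked list and indexes them by ID, so removal
// does not depend on pointer identity.
type idPool struct {
	queues *list.List
	byID   map[string]*list.Element
}

func newIDPool() *idPool {
	return &idPool{queues: list.New(), byID: make(map[string]*list.Element)}
}

// add appends q to the pool and reports an error if its ID is already present.
func (p *idPool) add(q *queue) error {
	if _, ok := p.byID[q.id]; ok {
		return fmt.Errorf("queue %q already in pool", q.id)
	}
	p.byID[q.id] = p.queues.PushBack(q)
	return nil
}

// remove deletes the queue with the given ID if it is present and otherwise
// leaves the pool unchanged.
func (p *idPool) remove(id string) {
	if e, ok := p.byID[id]; ok {
		p.queues.Remove(e)
		delete(p.byID, id)
	}
}

func (p *idPool) len() int { return p.queues.Len() }

func main() {
	p := newIDPool()
	_ = p.add(&queue{id: "q1"})
	_ = p.add(&queue{id: "q2"})
	p.remove("q1")
	p.remove("missing") // no-op: the ID is not in the pool
	fmt.Println(p.len()) // prints: 1
}
```

Keying on a string ID rather than a pointer means the same identifier stays meaningful across processes, which is the concern the note raises.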

func (*LinkedListMessageQueuePool) Get

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

func NewMemoryStore

func NewMemoryStore(topicName string) *MemoryStore

func (*MemoryStore) Extract

func (s *MemoryStore) Extract(offset int) ([]byte, error)

func (*MemoryStore) ExtractLatest

func (s *MemoryStore) ExtractLatest() ([]byte, error)

func (*MemoryStore) Insert

func (s *MemoryStore) Insert(b []byte) (int, error)
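The three method signatures above suggest an append-only, offset-addressed store. The sketch below shows one way such a store might behave. It is an assumption, not the actual MemoryStore: the `sliceStore` type is hypothetical, and it assumes that Insert returns the zero-based offset of the new entry and that Extract reads an entry without removing it, neither of which the documentation confirms.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// sliceStore is a hypothetical in-memory store backed by a slice.
type sliceStore struct {
	mu   sync.RWMutex
	msgs [][]byte
}

// Insert appends a copy of b and returns its offset (assumed zero-based).
func (s *sliceStore) Insert(b []byte) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Copy so that later changes to the caller's slice do not alter stored data.
	s.msgs = append(s.msgs, append([]byte(nil), b...))
	return len(s.msgs) - 1, nil
}

// Extract returns the entry at offset without removing it (an assumption).
func (s *sliceStore) Extract(offset int) ([]byte, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if offset < 0 || offset >= len(s.msgs) {
		return nil, errors.New("offset out of range")
	}
	return s.msgs[offset], nil
}

// ExtractLatest returns the most recently inserted entry.
func (s *sliceStore) ExtractLatest() ([]byte, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if len(s.msgs) == 0 {
		return nil, errors.New("store is empty")
	}
	return s.msgs[len(s.msgs)-1], nil
}

func main() {
	s := &sliceStore{}
	first, _ := s.Insert([]byte("a"))
	second, _ := s.Insert([]byte("b"))
	latest, _ := s.ExtractLatest()
	fmt.Println(first, second, string(latest)) // prints: 0 1 b
}
```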

type MemoryStoreFactory

type MemoryStoreFactory struct {
	*StoreFactoryConfig
}

func NewMemoryStoreFactory

func NewMemoryStoreFactory(config *StoreFactoryConfig) *MemoryStoreFactory

func (*MemoryStoreFactory) Produce

func (msf *MemoryStoreFactory) Produce(topicName string) Store

type MemoryStoreFactoryConfig

type MemoryStoreFactoryConfig struct {
}

type MessageQueue

type MessageQueue struct {
	*MessageQueueConfig
	// contains filtered or unexported fields
}

func NewMessageQueue

func NewMessageQueue(config *MessageQueueConfig) (*MessageQueue, error)

func (*MessageQueue) GetStore

func (mq *MessageQueue) GetStore(topicName string) (Store, error)

Gets the store for the given topic name. If no store exists for that topic, a new one is created, so a store is guaranteed to be returned unless an error occurs while checking for an existing store or creating a new one.

Returns a nil store when an error occurs.
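The get-or-create behavior described for GetStore can be sketched with a map guarded by a mutex. Everything here is hypothetical: the `store` and `registry` types, the `getStore` method, and the choice to treat an empty topic name as an error are assumptions for illustration, not the package's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// store is a hypothetical placeholder for a per-topic store.
type store struct{ topic string }

// registry maps topic names to stores.
type registry struct {
	mu     sync.Mutex
	stores map[string]*store
}

func newRegistry() *registry {
	return &registry{stores: make(map[string]*store)}
}

// getStore returns the existing store for topic, creating one if needed.
// On error it returns a nil store, as the documentation describes.
func (r *registry) getStore(topic string) (*store, error) {
	if topic == "" {
		// Assumption: an empty topic name is rejected here.
		return nil, errors.New("empty topic name")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if s, ok := r.stores[topic]; ok {
		return s, nil
	}
	s := &store{topic: topic}
	r.stores[topic] = s
	return s, nil
}

func main() {
	r := newRegistry()
	a, _ := r.getStore("orders")
	b, _ := r.getStore("orders")
	_, err := r.getStore("")
	fmt.Println(a == b, err != nil) // prints: true true
}
```

Holding the lock across both the lookup and the creation avoids two callers creating separate stores for the same topic.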

func (*MessageQueue) IsFull

func (mq *MessageQueue) IsFull() bool

Reports whether the message queue is completely full.

func (*MessageQueue) PublishMessage

func (mq *MessageQueue) PublishMessage(message transport.Message) (int, error)

Publishes the message to the topic named in its `Message.Topic` field.

If that topic does not exist, this function creates it and then publishes the data into it.

Returns the offset of the message inside the topic store as the first value.

Returns an error if the topic name is an empty string or the data cannot be inserted into the topic store. When an error is returned, the offset is -1.
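The contract above (create the topic on demand, return the offset, return -1 with an error on failure) can be illustrated with a small self-contained sketch. The `message` type with its `Data` field, the `miniQueue` type, and the `publish` method are hypothetical; only the `Topic` field name comes from the documentation, and the real `transport.Message` may look different.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for transport.Message.
type message struct {
	Topic string
	Data  []byte // assumed payload field
}

// miniQueue stores each topic's messages in a slice keyed by topic name.
type miniQueue struct {
	topics map[string][][]byte
}

func newMiniQueue() *miniQueue {
	return &miniQueue{topics: make(map[string][][]byte)}
}

// publish appends the message to its topic, creating the topic if it does
// not exist, and returns the message's offset within that topic.
// On error the offset is -1.
func (q *miniQueue) publish(m message) (int, error) {
	if m.Topic == "" {
		return -1, errors.New("topic name is empty")
	}
	// Appending to a missing map entry starts a new slice, which creates
	// the topic implicitly.
	q.topics[m.Topic] = append(q.topics[m.Topic], m.Data)
	return len(q.topics[m.Topic]) - 1, nil
}

func main() {
	q := newMiniQueue()
	first, _ := q.publish(message{Topic: "orders", Data: []byte("a")})
	second, _ := q.publish(message{Topic: "orders", Data: []byte("b")})
	bad, err := q.publish(message{Data: []byte("c")})
	fmt.Println(first, second, bad, err != nil) // prints: 0 1 -1 true
}
```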

type MessageQueueConfig

type MessageQueueConfig struct {
	// contains filtered or unexported fields
}

type MessageQueueIdentifier

type MessageQueueIdentifier interface{}

type MessageQueuePool

type MessageQueuePool interface {
	// Get is guaranteed to provide a new message queue whenever it
	// is invoked on a pool.
	Get() (*MessageQueue, error)

	// Creates message queues and populates the pool.
	//
	// A new message queue is added to the pool each time this
	// method is invoked on a message queue pool instance.
	Create() error

	Add(*MessageQueue) error

	// Deletes the specified message queue from the pool if it is
	// present. Otherwise the state of the pool does not change.
	//
	// Consider: the pool currently identifies a message queue by its
	// pointer. In the future, a unique ID per message queue may be needed
	// if the pool is to scale in a distributed manner, because different
	// computers in a queue swarm do not share the same address space.
	Remove(*MessageQueue) error
}

type Store

type Store interface {
	Insert([]byte) (int, error)
	Extract(int) ([]byte, error)
	ExtractLatest() ([]byte, error)
}

type StoreFactory

type StoreFactory interface {
	Produce(string) Store
}

type StoreFactoryConfig

type StoreFactoryConfig struct {
}

type TopicIdentifier

type TopicIdentifier string
