mqueue

package
v1.31.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2021 License: MIT Imports: 17 Imported by: 0

README

A simple message queue based on Postgres. It is meant for sending messages more reliably, with retry on failure, and it tolerates long failure durations such as Discord being down.

Documentation

Index

Constants

View Source
// DBSchema is an exported string constant; its 266-byte literal is elided in
// this view. NOTE(review): presumably the database schema used by the
// queue — confirm against the source file.
const DBSchema = `` /* 266-byte string literal not displayed */

Variables

This section is empty.

Functions

func QueueMessage added in v1.4.7

func QueueMessage(elem *QueuedElement) error

QueueMessage queues a message in the message queue

func RegisterPlugin added in v1.4.1

func RegisterPlugin()

RegisterPlugin registers the mqueue plugin into the plugin system and also initializes it

func RegisterSource

func RegisterSource(name string, source PluginWithSourceDisabler)

RegisterSource registers an mqueue source under the given name; the source is used for error handling.

Types

type DiscordProcessor added in v1.28.0

// DiscordProcessor is a field-less struct whose ProcessItem method has the
// signature required by the ItemProcessor interface.
type DiscordProcessor struct {
}

func (*DiscordProcessor) ProcessItem added in v1.28.0

func (d *DiscordProcessor) ProcessItem(resp chan *workResult, wi *workItem)

type ItemProcessor added in v1.28.0

// ItemProcessor is the processor type accepted by NewServer. DiscordProcessor
// provides a method with this signature.
type ItemProcessor interface {
	// ProcessItem receives a work item and a channel of work results.
	// NOTE(review): presumably the outcome of processing wi is reported on
	// resp — confirm against the implementation.
	ProcessItem(resp chan *workResult, wi *workItem)
}

type MqueueServer added in v1.28.0

// MqueueServer is a worker that processes mqueue items for the current shards
// on the process. It primarily uses pubsub, but it initializes the list by
// checking the sorted list.
type MqueueServer struct {
	// PushWork is a channel of work items.
	// NOTE(review): presumably new items are handed to the server here — confirm.
	PushWork chan *workItem

	// Stop is a channel carrying a *sync.WaitGroup.
	// NOTE(review): presumably used to request shutdown, with the wait group
	// signalled once the server has stopped — confirm.
	Stop chan *sync.WaitGroup
	// contains filtered or unexported fields
}

MqueueServer is a worker that processes mqueue items for the current shards on the process. It primarily uses pubsub, but it initializes the list by checking the sorted list.

func NewServer added in v1.28.0

func NewServer(backend Storage, processor ItemProcessor) *MqueueServer

func (*MqueueServer) Run added in v1.28.0

func (m *MqueueServer) Run()

type Plugin added in v1.4.1

// Plugin represents the mqueue plugin.
type Plugin struct {
	// contains filtered or unexported fields
}

Plugin represents the mqueue plugin

func (*Plugin) BotInit added in v1.4.1

func (p *Plugin) BotInit()

func (*Plugin) LateBotInit added in v1.11.5

func (p *Plugin) LateBotInit()

LateBotInit implements bot.LateBotInitHandler

func (*Plugin) PluginInfo added in v1.17.0

func (p *Plugin) PluginInfo() *common.PluginInfo

PluginInfo implements common.Plugin

func (*Plugin) RunBackgroundWorker added in v1.28.0

func (p *Plugin) RunBackgroundWorker()

RunBackgroundWorker implements backgroundworkers.BackgroundWorkerPlugin

func (*Plugin) StopBackgroundWorker added in v1.28.0

func (p *Plugin) StopBackgroundWorker(wg *sync.WaitGroup)

StopBackgroundWorker implements backgroundworkers.BackgroundWorkerPlugin

func (*Plugin) StopBot added in v1.4.1

func (p *Plugin) StopBot(wg *sync.WaitGroup)

StopBot implements bot.BotStopperHandler

type PluginWithSourceDisabler added in v1.20.14

// PluginWithSourceDisabler is the interface a source passed to RegisterSource
// must implement.
type PluginWithSourceDisabler interface {
	// DisableFeed receives a queued element together with an error.
	// NOTE(review): presumably called so the source can disable the feed that
	// produced elem after err occurred — confirm.
	DisableFeed(elem *QueuedElement, err error)
}

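PluginWithSourceDisabler is the interface that a source registered through RegisterSource must implement; its DisableFeed method receives the queued element and the associated error.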

type PluginWithWebhookAvatar added in v1.16.2

// PluginWithWebhookAvatar can be implemented by plugins for custom avatars.
type PluginWithWebhookAvatar interface {
	// WebhookAvatar returns the avatar as a string.
	// NOTE(review): the expected format (URL, data URI, ...) is not visible
	// here — confirm.
	WebhookAvatar() string
}

PluginWithWebhookAvatar can be implemented by plugins for custom avatars

type Producer added in v1.28.0

// Producer exposes a QueueMessage method for queueing elements in the message
// queue; it has no exported fields.
type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) QueueMessage added in v1.28.0

func (p *Producer) QueueMessage(elem *QueuedElement) error

QueueMessage queues a message in the message queue

type QueuedElement

// QueuedElement represents a queued message.
type QueuedElement struct {
	// The channel to send the message in (JSON key "Channel").
	ChannelID int64 `json:"Channel"`
	// JSON key "Guild". Presumably the guild that ChannelID belongs to — TODO confirm.
	GuildID   int64 `json:"Guild"`

	// NOTE(review): presumably the queue-assigned identifier of this element — confirm.
	ID int64

	// Where this feed originated from, responsible for handling discord specific errors
	Source string
	// Could be stuff like reddit feed element id, youtube feed element id and so on
	// (JSON key "SourceID").
	SourceItemID string `json:"SourceID"`

	// The actual message as a simple string; omitted from the JSON when empty.
	MessageStr string `json:",omitempty"`

	// The actual message as an embed; omitted from the JSON when nil.
	MessageEmbed *discordgo.MessageEmbed `json:",omitempty"`

	// NOTE(review): presumably UseWebhook selects delivery through a webhook and
	// WebhookUsername is the name shown when doing so — confirm.
	UseWebhook      bool
	WebhookUsername string

	// Allowed-mentions settings for the message (JSON key "allowed_mentions").
	AllowedMentions discordgo.AllowedMentions `json:"allowed_mentions"`

	// When the queue grows, the feeds with the highest priority get sent first
	Priority int

	// NOTE(review): presumably the time the element was queued — confirm.
	CreatedAt time.Time
}

QueuedElement represents a queued message

type RedisBackend added in v1.28.0

// RedisBackend is constructed by NewRedisBackend from a *radix.Pool. Its
// GetFullQueue, AppendItem, DelItem and NextID methods match the signatures
// of the Storage interface.
type RedisBackend struct {
	// contains filtered or unexported fields
}

func NewRedisBackend added in v1.29.0

func NewRedisBackend(pool *radix.Pool) *RedisBackend

func (*RedisBackend) AppendItem added in v1.28.0

func (rb *RedisBackend) AppendItem(elem *QueuedElement) error

func (*RedisBackend) DelItem added in v1.28.0

func (rb *RedisBackend) DelItem(item *workItem) error

func (*RedisBackend) GetFullQueue added in v1.28.0

func (rb *RedisBackend) GetFullQueue() ([]*workItem, error)

func (*RedisBackend) NextID added in v1.28.0

func (rb *RedisBackend) NextID() (next int64, err error)

type RedisPushServer added in v1.28.0

// RedisPushServer has no exported fields or methods in this documentation.
// NOTE(review): its role is not visible here — confirm against the source.
type RedisPushServer struct {
	// contains filtered or unexported fields
}

type Storage added in v1.28.0

// Storage is the backend type accepted by NewServer. RedisBackend provides
// methods with these signatures.
type Storage interface {
	// GetFullQueue returns a slice of work items or an error.
	// NOTE(review): presumably every item currently in the queue — confirm.
	GetFullQueue() ([]*workItem, error)
	// AppendItem takes a queued element and returns an error on failure.
	// NOTE(review): presumably adds elem to the end of the queue — confirm.
	AppendItem(elem *QueuedElement) error
	// DelItem takes a work item and returns an error on failure.
	// NOTE(review): presumably removes elem from the queue — confirm.
	DelItem(elem *workItem) error
	// NextID returns an int64 ID or an error.
	// NOTE(review): presumably the next unused element ID — confirm.
	NextID() (int64, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL