queue

package
v1.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2026 License: MIT Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Queue names
	QueueEventProcessing = "events"
	QueueWebhookDelivery = "webhooks"
	QueueBatchJobs       = "batch_jobs"
	QueueDefault         = river.QueueDefault
)

Variables

This section is empty.

Functions

func NewJobInserter

func NewJobInserter(client *river.Client[pgx.Tx]) *jobInserter

NewJobInserter creates a new JobInserter

Types

type BatchJobArgs added in v0.8.0

type BatchJobArgs struct {
	TenantID string `json:"tenant_id"`
	BatchID  string `json:"batch_id"`
}

BatchJobArgs represents a batch job processing task (event repush or delivery retry). The batch data (item IDs, type) is stored in the batch_jobs table; this job just carries the reference.

func (BatchJobArgs) InsertOpts added in v0.8.0

func (BatchJobArgs) InsertOpts() river.InsertOpts

func (BatchJobArgs) Kind added in v0.8.0

func (BatchJobArgs) Kind() string

Kind returns the job kind for River queue

type BatchJobWorker added in v0.8.0

type BatchJobWorker struct {
	river.WorkerDefaults[BatchJobArgs]
	// contains filtered or unexported fields
}

BatchJobWorker processes batch jobs (event re-push and delivery retry). It reads item IDs from the batch_jobs row and dispatches each one.

func NewBatchJobWorker added in v0.8.0

func NewBatchJobWorker(webhookRepo store.RepositoryInterface, jobInserter JobInserter) *BatchJobWorker

NewBatchJobWorker creates a new batch job worker.

func (*BatchJobWorker) Work added in v0.8.0

Work processes a batch job by dispatching each item based on job_type.

type EventArgs

type EventArgs struct {
	TenantID   string            `json:"tenant_id"`
	EventID    string            `json:"event_id"`
	Namespace  string            `json:"namespace"`
	Event      string            `json:"event"`
	TTLSeconds int64             `json:"ttl_seconds"`
	Metadata   map[string]string `json:"metadata"`
	Labels     map[string]string `json:"labels"`
	CreatedAt  time.Time         `json:"created_at"`
}

EventArgs represents an event processing job Contains only essential identifiers - the payload is stored in the database

func (EventArgs) InsertOpts

func (EventArgs) InsertOpts() river.InsertOpts

func (EventArgs) Kind

func (EventArgs) Kind() string

Kind returns the job kind for River queue

type EventProcessingWorker

type EventProcessingWorker struct {
	river.WorkerDefaults[EventArgs]
	// contains filtered or unexported fields
}

EventProcessingWorker processes events and triggers webhook deliveries

func NewEventProcessingWorker

func NewEventProcessingWorker(webhookRepo store.RepositoryInterface, jobInserter JobInserter) *EventProcessingWorker

NewEventProcessingWorker creates a new event processing worker with a river client

func (*EventProcessingWorker) Work

Work processes an event and creates webhook delivery jobs

type JobInserter

type JobInserter interface {
	Insert(ctx context.Context, args river.JobArgs) (*rivertype.JobInsertResult, error)
	BatchInsert(ctx context.Context, args []river.JobArgs) ([]*rivertype.JobInsertResult, error)
}

QueueManagerInterface defines the interface for queue management.

type JobInserterWithTracing

type JobInserterWithTracing struct {
	JobInserter
	// contains filtered or unexported fields
}

JobInserterWithTracing implements JobInserter interface instrumented with open telemetry spans

func NewJobInserterWithTracing

func NewJobInserterWithTracing(base JobInserter, instance string, spanDecorator ...func(span trace.Span, params, results map[string]any)) JobInserterWithTracing

NewJobInserterWithTracing returns JobInserterWithTracing

func (JobInserterWithTracing) BatchInsert

func (_d JobInserterWithTracing) BatchInsert(ctx context.Context, args []river.JobArgs) (jpa1 []*rivertype.JobInsertResult, err error)

BatchInsert implements JobInserter

func (JobInserterWithTracing) Insert

Insert implements JobInserter

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager handles the River queue management

func NewManager

func NewManager(ctx context.Context, webhookRepo store.RepositoryInterface, cryptoSvc *crypto.Service, dbPool *pgxpool.Pool, clientConfig *client.Config) (*Manager, error)

NewManager creates a new queue manager

func (*Manager) GetJobInserter

func (m *Manager) GetJobInserter() JobInserter

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start starts the queue processing

func (*Manager) Stop

func (m *Manager) Stop(ctx context.Context) error

Stop stops the queue processing

type WebhookArgs

type WebhookArgs struct {
	TenantID       string    `json:"tenant_id"`
	DeliveryID     string    `json:"delivery_id"`
	WebhookID      string    `json:"webhook_id"`
	SubscriptionID string    `json:"subscription_id"`
	EventID        string    `json:"event_id"`
	ExpiresAt      time.Time `json:"expires_at"`
	Namespace      string    `json:"namespace"`
	MaxAttempts    int       `json:"max_attempts"`
}

WebhookArgs represents a webhook delivery job Contains only essential identifiers - webhook config and event payload retrieved from database

func (WebhookArgs) InsertOpts

func (w WebhookArgs) InsertOpts() river.InsertOpts

func (WebhookArgs) Kind

func (WebhookArgs) Kind() string

Kind returns the job kind for River queue

type WebhookWorker

type WebhookWorker struct {
	river.WorkerDefaults[WebhookArgs]
	// contains filtered or unexported fields
}

WebhookWorker handles webhook delivery jobs

func NewWebhookWorker

func NewWebhookWorker(webhookRepo store.RepositoryInterface, cryptoSvc *crypto.Service, clientConfig *client.Config) *WebhookWorker

NewWebhookWorker creates a new webhook worker

func (*WebhookWorker) Work

func (w *WebhookWorker) Work(ctx context.Context, job *river.Job[WebhookArgs]) error

Work processes the webhook delivery job

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL