Documentation
¶
Index ¶
Constants ¶
const (
	// Queue names
	QueueEventProcessing = "events"
	QueueWebhookDelivery = "webhooks"
	QueueBatchJobs       = "batch_jobs"
	QueueDefault         = river.QueueDefault
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BatchJobArgs ¶ added in v0.8.0
BatchJobArgs represents a batch job processing task (event repush or delivery retry). The batch data (item IDs, type) is stored in the batch_jobs table; this job just carries the reference.
func (BatchJobArgs) InsertOpts ¶ added in v0.8.0
func (BatchJobArgs) InsertOpts() river.InsertOpts
func (BatchJobArgs) Kind ¶ added in v0.8.0
func (BatchJobArgs) Kind() string
Kind returns the job kind for the River queue.
type BatchJobWorker ¶ added in v0.8.0
type BatchJobWorker struct {
river.WorkerDefaults[BatchJobArgs]
// contains filtered or unexported fields
}
BatchJobWorker processes batch jobs (event re-push and delivery retry). It reads item IDs from the batch_jobs row and dispatches each one.
func NewBatchJobWorker ¶ added in v0.8.0
func NewBatchJobWorker(webhookRepo store.RepositoryInterface, jobInserter JobInserter) *BatchJobWorker
NewBatchJobWorker creates a new batch job worker.
func (*BatchJobWorker) Work ¶ added in v0.8.0
func (w *BatchJobWorker) Work(ctx context.Context, job *river.Job[BatchJobArgs]) error
Work processes a batch job by dispatching each item based on job_type.
type EventArgs ¶
type EventArgs struct {
TenantID string `json:"tenant_id"`
EventID string `json:"event_id"`
Namespace string `json:"namespace"`
Event string `json:"event"`
TTLSeconds int64 `json:"ttl_seconds"`
Metadata map[string]string `json:"metadata"`
Labels map[string]string `json:"labels"`
CreatedAt time.Time `json:"created_at"`
}
EventArgs represents an event processing job. It contains only essential identifiers; the payload is stored in the database.
func (EventArgs) InsertOpts ¶
func (EventArgs) InsertOpts() river.InsertOpts
type EventProcessingWorker ¶
type EventProcessingWorker struct {
river.WorkerDefaults[EventArgs]
// contains filtered or unexported fields
}
EventProcessingWorker processes events and triggers webhook deliveries
func NewEventProcessingWorker ¶
func NewEventProcessingWorker(webhookRepo store.RepositoryInterface, jobInserter JobInserter) *EventProcessingWorker
NewEventProcessingWorker creates a new event processing worker that uses the given repository and job inserter.
type JobInserter ¶
type JobInserter interface {
Insert(ctx context.Context, args river.JobArgs) (*rivertype.JobInsertResult, error)
BatchInsert(ctx context.Context, args []river.JobArgs) ([]*rivertype.JobInsertResult, error)
}
JobInserter defines the interface for inserting jobs into the queue, either one at a time (Insert) or in batches (BatchInsert).
type JobInserterWithTracing ¶
type JobInserterWithTracing struct {
JobInserter
// contains filtered or unexported fields
}
JobInserterWithTracing implements the JobInserter interface, instrumented with OpenTelemetry spans.
func NewJobInserterWithTracing ¶
func NewJobInserterWithTracing(base JobInserter, instance string, spanDecorator ...func(span trace.Span, params, results map[string]any)) JobInserterWithTracing
NewJobInserterWithTracing returns JobInserterWithTracing
func (JobInserterWithTracing) BatchInsert ¶
func (_d JobInserterWithTracing) BatchInsert(ctx context.Context, args []river.JobArgs) (jpa1 []*rivertype.JobInsertResult, err error)
BatchInsert implements JobInserter
func (JobInserterWithTracing) Insert ¶
func (_d JobInserterWithTracing) Insert(ctx context.Context, args river.JobArgs) (jp1 *rivertype.JobInsertResult, err error)
Insert implements JobInserter
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager handles River queue management.
func NewManager ¶
func NewManager(ctx context.Context, webhookRepo store.RepositoryInterface, cryptoSvc *crypto.Service, dbPool *pgxpool.Pool, clientConfig *client.Config) (*Manager, error)
NewManager creates a new queue manager
func (*Manager) GetJobInserter ¶
func (m *Manager) GetJobInserter() JobInserter
type WebhookArgs ¶
type WebhookArgs struct {
TenantID string `json:"tenant_id"`
DeliveryID string `json:"delivery_id"`
WebhookID string `json:"webhook_id"`
SubscriptionID string `json:"subscription_id"`
EventID string `json:"event_id"`
ExpiresAt time.Time `json:"expires_at"`
Namespace string `json:"namespace"`
MaxAttempts int `json:"max_attempts"`
}
WebhookArgs represents a webhook delivery job. It contains only essential identifiers; the webhook config and event payload are retrieved from the database.
func (WebhookArgs) InsertOpts ¶
func (w WebhookArgs) InsertOpts() river.InsertOpts
func (WebhookArgs) Kind ¶
func (WebhookArgs) Kind() string
Kind returns the job kind for the River queue.
type WebhookWorker ¶
type WebhookWorker struct {
river.WorkerDefaults[WebhookArgs]
// contains filtered or unexported fields
}
WebhookWorker handles webhook delivery jobs
func NewWebhookWorker ¶
func NewWebhookWorker(webhookRepo store.RepositoryInterface, cryptoSvc *crypto.Service, clientConfig *client.Config) *WebhookWorker
NewWebhookWorker creates a new webhook worker
func (*WebhookWorker) Work ¶
func (w *WebhookWorker) Work(ctx context.Context, job *river.Job[WebhookArgs]) error
Work processes the webhook delivery job