worker

package
v0.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2026 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TypeEmailSend      = "email:send"
	TypeCampaignStart  = "campaign:start"
	TypeCampaignBatch  = "campaign:batch"
	TypeInboundProcess = "inbound:process"

	QueueTransactional = "transactional"
	QueueBulk          = "bulk"
	QueueLow           = "low"
)

Variables

This section is empty.

Functions

func ChainErrorHandlers added in v0.6.0

func ChainErrorHandlers(handlers ...asynq.ErrorHandler) asynq.ErrorHandler

ChainErrorHandlers returns an asynq.ErrorHandler that invokes each handler in order. Each inner handler is expected to guard on task type and no-op otherwise.

func NewCampaignBatchTask

func NewCampaignBatchTask(campaignID uint, opts ...asynq.Option) (*asynq.Task, error)

func NewCampaignStartTask

func NewCampaignStartTask(campaignID uint, opts ...asynq.Option) (*asynq.Task, error)

func NewEmailSendTask

func NewEmailSendTask(emailID uint, opts ...asynq.Option) (*asynq.Task, error)

NewEmailSendTask creates an Asynq task to send an email.

func NewInboundProcessTask added in v0.6.0

func NewInboundProcessTask(id uint, opts ...asynq.Option) (*asynq.Task, error)

NewInboundProcessTask creates an Asynq task to dispatch an inbound email's email.inbound webhook to subscribers.

func SignInboundAttachmentToken added in v0.6.0

func SignInboundAttachmentToken(key []byte, uuid string, idx int) string

SignInboundAttachmentToken creates an HMAC-signed token authorizing access to a specific inbound attachment.

func VerifyInboundAttachmentToken added in v0.6.0

func VerifyInboundAttachmentToken(key []byte, uuid string, idx int, token string) bool

VerifyInboundAttachmentToken checks whether the provided token is valid for the given (uuid, idx) pair.

Types

type CampaignPayload

type CampaignPayload struct {
	CampaignID uint `json:"campaign_id"`
}

type CampaignProcessor

type CampaignProcessor struct {
	// contains filtered or unexported fields
}

CampaignProcessor handles campaign:start and campaign:batch tasks.

func (*CampaignProcessor) HandleCampaignBatch

func (p *CampaignProcessor) HandleCampaignBatch(_ context.Context, t *asynq.Task) error

HandleCampaignBatch processes a campaign:batch task. It loads a batch of pending messages, creates Email records, and enqueues them.

func (*CampaignProcessor) HandleCampaignStart

func (p *CampaignProcessor) HandleCampaignStart(_ context.Context, t *asynq.Task) error

HandleCampaignStart processes a campaign:start task.

type DailyReportHandler added in v0.5.1

type DailyReportHandler struct {
	// contains filtered or unexported fields
}

DailyReportHandler processes daily report tasks from the Asynq queue.

func NewDailyReportHandler added in v0.5.1

func NewDailyReportHandler(
	notifier *notification.Service,
	analyticsRepo *repositories.AnalyticsRepository,
	bounceRepo *repositories.BounceRepository,
) *DailyReportHandler

NewDailyReportHandler creates a new daily report task handler.

func (*DailyReportHandler) ProcessTask added in v0.5.1

func (h *DailyReportHandler) ProcessTask(_ context.Context, t *asynq.Task) error

ProcessTask handles a daily report task for a single user.

type EmailSendHandler

type EmailSendHandler struct {
	// contains filtered or unexported fields
}

EmailSendHandler processes email:send tasks from the Asynq queue.

func (*EmailSendHandler) OnFailed

func (h *EmailSendHandler) OnFailed(fn func())

OnFailed sets a callback invoked after each permanently failed email send.

func (*EmailSendHandler) OnSent

func (h *EmailSendHandler) OnSent(fn func())

OnSent sets a callback invoked after each successful email send.

func (*EmailSendHandler) ProcessTask

func (h *EmailSendHandler) ProcessTask(ctx context.Context, t *asynq.Task) error

ProcessTask handles an email:send task.

func (*EmailSendHandler) SetBlobStore

func (h *EmailSendHandler) SetBlobStore(bs blob.Store)

SetBlobStore sets the blob storage backend for fetching attachment content.

func (*EmailSendHandler) SetCampaignMessageRepo added in v0.5.0

func (h *EmailSendHandler) SetCampaignMessageRepo(r *repositories.CampaignMessageRepository)

SetCampaignMessageRepo sets the campaign message repository so that campaign message statuses are updated when emails are sent or fail.

func (*EmailSendHandler) SetCampaignRepo added in v0.6.0

func (h *EmailSendHandler) SetCampaignRepo(r *repositories.CampaignRepository)

SetCampaignRepo lets the handler consult the campaign's live status before sending. When set, already-queued emails for cancelled/paused campaigns are dropped instead of dispatched.

func (*EmailSendHandler) SetStamper added in v0.6.0

func (h *EmailSendHandler) SetStamper(s *email.Stamper)

SetStamper enables X-Mailer/X-Posta-* header stamping and optional X-Posta-Signature HMAC. Nil disables stamping.

type EmailSendPayload

type EmailSendPayload struct {
	EmailID uint `json:"email_id"`
}

type ExhaustedErrorHandler

type ExhaustedErrorHandler struct {
	// contains filtered or unexported fields
}

ExhaustedErrorHandler marks emails as permanently failed when Asynq exhausts all retries. It implements asynq.ErrorHandler.

func NewExhaustedErrorHandler

func NewExhaustedErrorHandler(emailRepo *repositories.EmailRepository, dispatcher *webhook.Dispatcher, onFailed func()) *ExhaustedErrorHandler

func (*ExhaustedErrorHandler) HandleError

func (e *ExhaustedErrorHandler) HandleError(_ context.Context, t *asynq.Task, err error)

type InboundAttachmentView added in v0.6.0

type InboundAttachmentView struct {
	Filename    string `json:"filename"`
	ContentType string `json:"content_type"`
	Size        int64  `json:"size"`
	URL         string `json:"url,omitempty"`
}

InboundAttachmentView is the attachment shape included in the email.inbound webhook. No raw content — only metadata plus a signed download URL (when configured).

type InboundExhaustedErrorHandler added in v0.6.0

type InboundExhaustedErrorHandler struct {
	// contains filtered or unexported fields
}

InboundExhaustedErrorHandler marks inbound emails permanently failed once Asynq exhausts retries for the inbound:process task.

func NewInboundExhaustedErrorHandler added in v0.6.0

func NewInboundExhaustedErrorHandler(repo *repositories.InboundEmailRepository, onFailed func()) *InboundExhaustedErrorHandler

func (*InboundExhaustedErrorHandler) HandleError added in v0.6.0

func (e *InboundExhaustedErrorHandler) HandleError(_ context.Context, t *asynq.Task, err error)

type InboundProcessHandler added in v0.6.0

type InboundProcessHandler struct {
	// contains filtered or unexported fields
}

InboundProcessHandler processes inbound:process tasks — builds the inbound webhook payload and dispatches it via the webhook dispatcher.

func NewInboundProcessHandler added in v0.6.0

func NewInboundProcessHandler(
	repo *repositories.InboundEmailRepository,
	dispatcher *webhook.Dispatcher,
	baseURL string,
	hmacKey []byte,
) *InboundProcessHandler

func (*InboundProcessHandler) OnFailed added in v0.6.0

func (h *InboundProcessHandler) OnFailed(fn func())

OnFailed sets a callback invoked after a permanently failed inbound forward.

func (*InboundProcessHandler) OnForwarded added in v0.6.0

func (h *InboundProcessHandler) OnForwarded(fn func())

OnForwarded sets a callback invoked after a successful inbound forward.

func (*InboundProcessHandler) ProcessTask added in v0.6.0

func (h *InboundProcessHandler) ProcessTask(_ context.Context, t *asynq.Task) error

type InboundProcessPayload added in v0.6.0

type InboundProcessPayload struct {
	InboundEmailID uint `json:"inbound_email_id"`
}

type InboundWebhookPayload added in v0.6.0

type InboundWebhookPayload struct {
	Event       string                  `json:"event"`
	Timestamp   string                  `json:"timestamp"`
	InboundID   string                  `json:"inbound_id"`
	From        string                  `json:"from"`
	To          []string                `json:"to"`
	Subject     string                  `json:"subject"`
	TextBody    string                  `json:"text_body,omitempty"`
	HTMLBody    string                  `json:"html_body,omitempty"`
	Headers     map[string]string       `json:"headers,omitempty"`
	Attachments []InboundAttachmentView `json:"attachments,omitempty"`
	Size        int64                   `json:"size"`
	MessageID   string                  `json:"message_id,omitempty"`
	Source      string                  `json:"source"`
	ReceivedAt  string                  `json:"received_at"`
}

InboundWebhookPayload is the body posted to subscribers of email.inbound.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer enqueues email tasks into Redis via Asynq.

func NewProducer

func NewProducer(redisAddr, redisPassword string, maxRetries int) *Producer

func (*Producer) Close

func (p *Producer) Close() error

Close closes the underlying Asynq client.

func (*Producer) EnqueueCampaignBatch

func (p *Producer) EnqueueCampaignBatch(campaignID uint, delay time.Duration) error

EnqueueCampaignBatch enqueues a campaign batch processing task. A per-run random suffix keeps successive batches distinct while still letting us dedupe the *first* kick-off from Send (callers that want dedupe pass delay=0 after transitioning status atomically).

func (*Producer) EnqueueCampaignStart

func (p *Producer) EnqueueCampaignStart(campaignID uint) error

EnqueueCampaignStart enqueues a campaign start task (fan-out to subscribers). The TaskID is derived from the campaign ID so double-clicks and retries collapse into a single Redis entry instead of fanning out twice.

func (*Producer) EnqueueEmailSend

func (p *Producer) EnqueueEmailSend(emailID uint, queue string) error

EnqueueEmailSend enqueues an email for background delivery.

func (*Producer) EnqueueEmailSendAt

func (p *Producer) EnqueueEmailSendAt(emailID uint, queue string, sendAt time.Time) error

EnqueueEmailSendAt enqueues an email for delivery at the specified time.

func (*Producer) EnqueueInboundProcess added in v0.6.0

func (p *Producer) EnqueueInboundProcess(inboundEmailID uint) error

EnqueueInboundProcess enqueues an inbound-process task that dispatches the email.inbound webhook for a received message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL