Documentation
¶
Index ¶
- Constants
- func ChainErrorHandlers(handlers ...asynq.ErrorHandler) asynq.ErrorHandler
- func NewCampaignBatchTask(campaignID uint, opts ...asynq.Option) (*asynq.Task, error)
- func NewCampaignStartTask(campaignID uint, opts ...asynq.Option) (*asynq.Task, error)
- func NewEmailSendTask(emailID uint, opts ...asynq.Option) (*asynq.Task, error)
- func NewInboundProcessTask(id uint, opts ...asynq.Option) (*asynq.Task, error)
- func SignInboundAttachmentToken(key []byte, uuid string, idx int) string
- func VerifyInboundAttachmentToken(key []byte, uuid string, idx int, token string) bool
- type CampaignPayload
- type CampaignProcessor
- type DailyReportHandler
- type EmailSendHandler
- func (h *EmailSendHandler) OnFailed(fn func())
- func (h *EmailSendHandler) OnSent(fn func())
- func (h *EmailSendHandler) ProcessTask(ctx context.Context, t *asynq.Task) error
- func (h *EmailSendHandler) SetBlobStore(bs blob.Store)
- func (h *EmailSendHandler) SetCampaignMessageRepo(r *repositories.CampaignMessageRepository)
- func (h *EmailSendHandler) SetCampaignRepo(r *repositories.CampaignRepository)
- func (h *EmailSendHandler) SetStamper(s *email.Stamper)
- type EmailSendPayload
- type ExhaustedErrorHandler
- type InboundAttachmentView
- type InboundExhaustedErrorHandler
- type InboundProcessHandler
- type InboundProcessPayload
- type InboundWebhookPayload
- type Producer
- func (p *Producer) Close() error
- func (p *Producer) EnqueueCampaignBatch(campaignID uint, delay time.Duration) error
- func (p *Producer) EnqueueCampaignStart(campaignID uint) error
- func (p *Producer) EnqueueEmailSend(emailID uint, queue string) error
- func (p *Producer) EnqueueEmailSendAt(emailID uint, queue string, sendAt time.Time) error
- func (p *Producer) EnqueueInboundProcess(inboundEmailID uint) error
Constants ¶
const (
	TypeEmailSend      = "email:send"
	TypeCampaignStart  = "campaign:start"
	TypeCampaignBatch  = "campaign:batch"
	TypeInboundProcess = "inbound:process"
	QueueTransactional = "transactional"
	QueueBulk          = "bulk"
	QueueLow           = "low"
)
Variables ¶
This section is empty.
Functions ¶
func ChainErrorHandlers ¶ added in v0.6.0
func ChainErrorHandlers(handlers ...asynq.ErrorHandler) asynq.ErrorHandler
ChainErrorHandlers returns an asynq.ErrorHandler that invokes each handler in order. Each inner handler is expected to check the task type itself and do nothing for task types it does not handle.
func NewCampaignBatchTask ¶
func NewCampaignStartTask ¶
func NewEmailSendTask ¶
NewEmailSendTask creates an Asynq task to send an email.
func NewInboundProcessTask ¶ added in v0.6.0
NewInboundProcessTask creates an Asynq task to dispatch an inbound email's email.inbound webhook to subscribers.
func SignInboundAttachmentToken ¶ added in v0.6.0
SignInboundAttachmentToken creates an HMAC-signed token authorizing access to a specific inbound attachment.
Types ¶
type CampaignPayload ¶
type CampaignPayload struct {
CampaignID uint `json:"campaign_id"`
}
type CampaignProcessor ¶
type CampaignProcessor struct {
// contains filtered or unexported fields
}
CampaignProcessor handles campaign:start and campaign:batch tasks.
func NewCampaignProcessor ¶
func NewCampaignProcessor(
	campaignRepo *repositories.CampaignRepository,
	messageRepo *repositories.CampaignMessageRepository,
	listRepo *repositories.SubscriberListRepository,
	subscriberRepo *repositories.SubscriberRepository,
	emailRepo *repositories.EmailRepository,
	templateRepo *repositories.TemplateRepository,
	versionRepo *repositories.TemplateVersionRepository,
	localizationRepo *repositories.TemplateLocalizationRepository,
	trackingService *tracking.Service,
	producer *Producer,
	dispatcher *webhook.Dispatcher,
) *CampaignProcessor
func (*CampaignProcessor) HandleCampaignBatch ¶
HandleCampaignBatch processes a campaign:batch task. It loads a batch of pending messages, creates Email records, and enqueues them.
func (*CampaignProcessor) HandleCampaignStart ¶
HandleCampaignStart processes a campaign:start task.
type DailyReportHandler ¶ added in v0.5.1
type DailyReportHandler struct {
// contains filtered or unexported fields
}
DailyReportHandler processes daily report tasks from the Asynq queue.
func NewDailyReportHandler ¶ added in v0.5.1
func NewDailyReportHandler(
	notifier *notification.Service,
	analyticsRepo *repositories.AnalyticsRepository,
	bounceRepo *repositories.BounceRepository,
) *DailyReportHandler
NewDailyReportHandler creates a new daily report task handler.
func (*DailyReportHandler) ProcessTask ¶ added in v0.5.1
ProcessTask handles a daily report task for a single user.
type EmailSendHandler ¶
type EmailSendHandler struct {
// contains filtered or unexported fields
}
EmailSendHandler processes email:send tasks from the Asynq queue.
func NewEmailSendHandler ¶
func NewEmailSendHandler(
	emailRepo *repositories.EmailRepository,
	smtpRepo *repositories.SMTPRepository,
	serverRepo *repositories.ServerRepository,
	domainRepo *repositories.DomainRepository,
	contactRepo *repositories.ContactRepository,
	dispatcher *webhook.Dispatcher,
) *EmailSendHandler
func (*EmailSendHandler) OnFailed ¶
func (h *EmailSendHandler) OnFailed(fn func())
OnFailed sets a callback invoked after each permanently failed email send.
func (*EmailSendHandler) OnSent ¶
func (h *EmailSendHandler) OnSent(fn func())
OnSent sets a callback invoked after each successful email send.
func (*EmailSendHandler) ProcessTask ¶
ProcessTask handles an email:send task.
func (*EmailSendHandler) SetBlobStore ¶
func (h *EmailSendHandler) SetBlobStore(bs blob.Store)
SetBlobStore sets the blob storage backend for fetching attachment content.
func (*EmailSendHandler) SetCampaignMessageRepo ¶ added in v0.5.0
func (h *EmailSendHandler) SetCampaignMessageRepo(r *repositories.CampaignMessageRepository)
SetCampaignMessageRepo sets the campaign message repository so that campaign message statuses are updated when emails are sent or fail.
func (*EmailSendHandler) SetCampaignRepo ¶ added in v0.6.0
func (h *EmailSendHandler) SetCampaignRepo(r *repositories.CampaignRepository)
SetCampaignRepo lets the handler consult the campaign's live status before sending. When set, already-queued emails for cancelled/paused campaigns are dropped instead of dispatched.
func (*EmailSendHandler) SetStamper ¶ added in v0.6.0
func (h *EmailSendHandler) SetStamper(s *email.Stamper)
SetStamper enables X-Mailer/X-Posta-* header stamping and optional X-Posta-Signature HMAC. Passing nil disables stamping.
type EmailSendPayload ¶
type EmailSendPayload struct {
EmailID uint `json:"email_id"`
}
type ExhaustedErrorHandler ¶
type ExhaustedErrorHandler struct {
// contains filtered or unexported fields
}
ExhaustedErrorHandler marks emails as permanently failed when Asynq exhausts all retries. It implements asynq.ErrorHandler.
func NewExhaustedErrorHandler ¶
func NewExhaustedErrorHandler(emailRepo *repositories.EmailRepository, dispatcher *webhook.Dispatcher, onFailed func()) *ExhaustedErrorHandler
func (*ExhaustedErrorHandler) HandleError ¶
type InboundAttachmentView ¶ added in v0.6.0
type InboundAttachmentView struct {
Filename string `json:"filename"`
ContentType string `json:"content_type"`
Size int64 `json:"size"`
URL string `json:"url,omitempty"`
}
InboundAttachmentView is the attachment shape included in the email.inbound webhook. No raw content — only metadata plus a signed download URL (when configured).
type InboundExhaustedErrorHandler ¶ added in v0.6.0
type InboundExhaustedErrorHandler struct {
// contains filtered or unexported fields
}
InboundExhaustedErrorHandler marks inbound emails permanently failed once Asynq exhausts retries for the inbound:process task.
func NewInboundExhaustedErrorHandler ¶ added in v0.6.0
func NewInboundExhaustedErrorHandler(repo *repositories.InboundEmailRepository, onFailed func()) *InboundExhaustedErrorHandler
func (*InboundExhaustedErrorHandler) HandleError ¶ added in v0.6.0
type InboundProcessHandler ¶ added in v0.6.0
type InboundProcessHandler struct {
// contains filtered or unexported fields
}
InboundProcessHandler processes inbound:process tasks — builds the inbound webhook payload and dispatches it via the webhook dispatcher.
func NewInboundProcessHandler ¶ added in v0.6.0
func NewInboundProcessHandler(
	repo *repositories.InboundEmailRepository,
	dispatcher *webhook.Dispatcher,
	baseURL string,
	hmacKey []byte,
) *InboundProcessHandler
func (*InboundProcessHandler) OnFailed ¶ added in v0.6.0
func (h *InboundProcessHandler) OnFailed(fn func())
OnFailed sets a callback invoked after a permanently failed inbound forward.
func (*InboundProcessHandler) OnForwarded ¶ added in v0.6.0
func (h *InboundProcessHandler) OnForwarded(fn func())
OnForwarded sets a callback invoked after a successful inbound forward.
func (*InboundProcessHandler) ProcessTask ¶ added in v0.6.0
type InboundProcessPayload ¶ added in v0.6.0
type InboundProcessPayload struct {
InboundEmailID uint `json:"inbound_email_id"`
}
type InboundWebhookPayload ¶ added in v0.6.0
type InboundWebhookPayload struct {
Event string `json:"event"`
Timestamp string `json:"timestamp"`
InboundID string `json:"inbound_id"`
From string `json:"from"`
To []string `json:"to"`
Subject string `json:"subject"`
TextBody string `json:"text_body,omitempty"`
HTMLBody string `json:"html_body,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
Attachments []InboundAttachmentView `json:"attachments,omitempty"`
Size int64 `json:"size"`
MessageID string `json:"message_id,omitempty"`
Source string `json:"source"`
ReceivedAt string `json:"received_at"`
}
InboundWebhookPayload is the body posted to subscribers of email.inbound.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer enqueues email, campaign, and inbound-process tasks into Redis via Asynq.
func NewProducer ¶
func (*Producer) EnqueueCampaignBatch ¶
EnqueueCampaignBatch enqueues a campaign batch processing task. A per-run random suffix keeps successive batches distinct while still letting us dedupe the *first* kick-off from Send (callers that want dedupe pass delay=0 after transitioning status atomically).
func (*Producer) EnqueueCampaignStart ¶
EnqueueCampaignStart enqueues a campaign start task (fan-out to subscribers). The TaskID is derived from the campaign ID so double-clicks and retries collapse into a single Redis entry instead of fanning out twice.
func (*Producer) EnqueueEmailSend ¶
EnqueueEmailSend enqueues an email for background delivery.
func (*Producer) EnqueueEmailSendAt ¶
EnqueueEmailSendAt enqueues an email for delivery at the specified time.
func (*Producer) EnqueueInboundProcess ¶ added in v0.6.0
EnqueueInboundProcess enqueues an inbound-process task that dispatches the email.inbound webhook for a received message.