Documentation
¶
Overview ¶
Package outbox implements a "Universal" Transactional Outbox for the Vertikon MCP ecosystem. Features: - Standardized event envelope (idempotency, trace/correlation, schema version, producer, privacy flags) - Native tenant-aware topic partitioning - DLQ integration with automatic promotion after max retries - Cleanup jobs for both main and DLQ tables - OpenTelemetry hooks - Helper APIs to save events atomically (within the same DB transaction as the business write)
Index ¶
- Constants
- func SaveGenericEvent(ctx context.Context, tx *sql.Tx, store OutboxStore, opts SaveEventOptions) error
- type Cleaner
- type CreateLeadRequest
- type Envelope
- type EventBus
- type EventRow
- type Lead
- type LeadHandler
- type OutboxStore
- type PostgresOutboxStore
- func (s *PostgresOutboxStore) CleanupOld(ctx context.Context, olderThan time.Duration) error
- func (s *PostgresOutboxStore) CleanupOldDLQ(ctx context.Context, olderThan time.Duration) error
- func (s *PostgresOutboxStore) GetUnprocessed(ctx context.Context, limit int) ([]*EventRow, error)
- func (s *PostgresOutboxStore) MarkAsFailed(ctx context.Context, eventID string, fail error, next time.Time) error
- func (s *PostgresOutboxStore) MarkAsPublished(ctx context.Context, eventID string) error
- func (s *PostgresOutboxStore) MoveToDLQ(ctx context.Context, eventID string, reason string) error
- func (s *PostgresOutboxStore) SaveWithTx(ctx context.Context, tx *sql.Tx, row *EventRow) error
- type PrivacyFlags
- type Publisher
- type PublisherOption
- func WithBackoff(base time.Duration) PublisherOption
- func WithInterval(d time.Duration) PublisherOption
- func WithMaxRetries(n int) PublisherOption
- func WithMaxWorkers(n int) PublisherOption
- func WithProducerName(name string) PublisherOption
- func WithTopicFunc(f func(tenantID, eventType string) string) PublisherOption
- type SaveEventOptions
Constants ¶
const CreateOutboxTablesSQL = `` /* 2182-byte string literal not displayed */
Variables ¶
This section is empty.
Functions ¶
func SaveGenericEvent ¶
func SaveGenericEvent(ctx context.Context, tx *sql.Tx, store OutboxStore, opts SaveEventOptions) error
Types ¶
type Cleaner ¶
type Cleaner struct {
// contains filtered or unexported fields
}
Cleaner is a small job to purge old rows from outbox and DLQ.
func NewCleaner ¶
func NewCleaner(store OutboxStore, outboxTTL, dlqTTL, interval time.Duration) *Cleaner
type CreateLeadRequest ¶
type Envelope ¶
type Envelope struct {
Event string `json:"event"` // e.g., "lead.created.v1"
SchemaVersion int `json:"schema_version"` // semantic version for payload schema (usually matches event suffix vN)
IdempotencyKey string `json:"idempotency_key"` // unique per side-effect to allow at-least-once consumers to dedupe
TraceID string `json:"trace_id"` // W3C trace id, copied from context span if present
CorrelationID string `json:"correlation_id"` // ties multiple events of the same business flow (e.g., lead_id)
TenantID string `json:"tenant_id"` // multi-tenant isolation
OccurredAt time.Time `json:"occurred_at"` // when the business action actually happened
Producer string `json:"producer"` // e.g., "mcp-integracao@1.2.3"
Privacy PrivacyFlags `json:"privacy"` // PII flags and consent hints
Payload json.RawMessage `json:"payload"` // actual event body
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
Envelope is the standard Vertikon event envelope that rides on the bus.
type EventBus ¶
type EventBus interface {
// Publish pushes the envelope to a topic/subject.
Publish(ctx context.Context, topic string, env *Envelope) error
}
EventBus is the minimal abstraction over the transport (NATS, Kafka, etc.).
type EventRow ¶
type EventRow struct {
ID string `db:"id"`
TenantID string `db:"tenant_id"`
AggregateID string `db:"aggregate_id"`
AggregateType string `db:"aggregate_type"`
EventType string `db:"event_type"`
EventVersion int `db:"event_version"`
Payload json.RawMessage `db:"payload"`
Metadata json.RawMessage `db:"metadata"`
IdempotencyKey string `db:"idempotency_key"`
TraceID string `db:"trace_id"`
CorrelationID string `db:"correlation_id"`
CreatedAt time.Time `db:"created_at"`
ProcessedAt *time.Time `db:"processed_at"`
PublishedAt *time.Time `db:"published_at"`
RetryCount int `db:"retry_count"`
LastError *string `db:"last_error"`
ScheduledFor time.Time `db:"scheduled_for"`
PII bool `db:"pii"`
PIIFields json.RawMessage `db:"pii_fields"` // []string
Consent json.RawMessage `db:"consent"` // any
Producer string `db:"producer"`
}
EventRow represents a row in the outbox table.
type Lead ¶
Example domain helper mirroring your previous code (tokenized PII).
func (*Lead) GetTokenizedPII ¶
type LeadHandler ¶
type LeadHandler struct {
DB *sql.DB
Store OutboxStore
}
LeadHandler demonstrates atomic write + outbox save.
func (*LeadHandler) CreateLead ¶
func (h *LeadHandler) CreateLead(ctx context.Context, req *CreateLeadRequest) (*Lead, error)
type OutboxStore ¶
type OutboxStore interface {
SaveWithTx(ctx context.Context, tx *sql.Tx, row *EventRow) error
GetUnprocessed(ctx context.Context, limit int) ([]*EventRow, error)
MarkAsPublished(ctx context.Context, eventID string) error
MarkAsFailed(ctx context.Context, eventID string, err error, next time.Time) error
MoveToDLQ(ctx context.Context, eventID string, reason string) error
CleanupOld(ctx context.Context, olderThan time.Duration) error
CleanupOldDLQ(ctx context.Context, olderThan time.Duration) error
}
OutboxStore abstracts DB operations (implemented for Postgres below).
type PostgresOutboxStore ¶
type PostgresOutboxStore struct {
// contains filtered or unexported fields
}
func NewPostgresOutboxStore ¶
func NewPostgresOutboxStore(db *sql.DB) *PostgresOutboxStore
func (*PostgresOutboxStore) CleanupOld ¶
func (*PostgresOutboxStore) CleanupOldDLQ ¶
func (*PostgresOutboxStore) GetUnprocessed ¶
func (*PostgresOutboxStore) MarkAsFailed ¶
func (*PostgresOutboxStore) MarkAsPublished ¶
func (s *PostgresOutboxStore) MarkAsPublished(ctx context.Context, eventID string) error
func (*PostgresOutboxStore) SaveWithTx ¶
type PrivacyFlags ¶
type PrivacyFlags struct {
PII bool `json:"pii"` // whether payload contains PII (tokenized or raw)
Fields []string `json:"fields,omitempty"` // which fields carry PII (semantic labels, not raw names)
Consent any `json:"consent,omitempty"` // optional consent snapshot (true/false/object)
}
PrivacyFlags captures privacy-related details for compliance flows.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
func NewPublisher(store OutboxStore, bus EventBus, opts ...PublisherOption) *Publisher
type PublisherOption ¶
type PublisherOption func(*Publisher)
func WithBackoff ¶
func WithBackoff(base time.Duration) PublisherOption
func WithInterval ¶
func WithInterval(d time.Duration) PublisherOption
func WithMaxRetries ¶
func WithMaxRetries(n int) PublisherOption
func WithMaxWorkers ¶
func WithMaxWorkers(n int) PublisherOption
func WithProducerName ¶
func WithProducerName(name string) PublisherOption
func WithTopicFunc ¶
func WithTopicFunc(f func(tenantID, eventType string) string) PublisherOption
type SaveEventOptions ¶
type SaveEventOptions struct {
TenantID string
AggregateID string
AggregateType string
EventType string // e.g., "lead.created.v1"
EventVersion int
Payload any
Metadata map[string]any
IdempotencyKey string
CorrelationID string
PII bool
PIIFields []string
Consent any
Producer string
ScheduledFor time.Time // optional; defaults to now
}
SaveGenericEvent persists an outbox event atomically inside the given transaction.