outbox

package module
v0.0.0-...-3770506 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2025 License: MIT Imports: 11 Imported by: 0

README

vertikon

mcp vertikon

Documentation

Overview

Package outbox implements a "Universal" Transactional Outbox for the Vertikon MCP ecosystem. Features: - Standardized event envelope (idempotency, trace/correlation, schema version, producer, privacy flags) - Native tenant-aware topic partitioning - DLQ integration with automatic promotion after max retries - Cleanup jobs for both main and DLQ tables - OpenTelemetry hooks - Helper APIs to save events atomically (within the same DB transaction as the business write)

Index

Constants

View Source
const CreateOutboxTablesSQL = `` /* 2182-byte string literal not displayed */

Variables

This section is empty.

Functions

func SaveGenericEvent

func SaveGenericEvent(ctx context.Context, tx *sql.Tx, store OutboxStore, opts SaveEventOptions) error

Types

type Cleaner

type Cleaner struct {
	// contains filtered or unexported fields
}

Cleaner is a small job to purge old rows from outbox and DLQ.

func NewCleaner

func NewCleaner(store OutboxStore, outboxTTL, dlqTTL, interval time.Duration) *Cleaner

func (*Cleaner) Start

func (c *Cleaner) Start(ctx context.Context) error

type CreateLeadRequest

type CreateLeadRequest struct {
	TenantID string
	Source   string
}

type Envelope

type Envelope struct {
	Event          string                 `json:"event"`           // e.g., "lead.created.v1"
	SchemaVersion  int                    `json:"schema_version"`  // semantic version for payload schema (usually matches event suffix vN)
	IdempotencyKey string                 `json:"idempotency_key"` // unique per side-effect to allow at-least-once consumers to dedupe
	TraceID        string                 `json:"trace_id"`        // W3C trace id, copied from context span if present
	CorrelationID  string                 `json:"correlation_id"`  // ties multiple events of the same business flow (e.g., lead_id)
	TenantID       string                 `json:"tenant_id"`       // multi-tenant isolation
	OccurredAt     time.Time              `json:"occurred_at"`     // when the business action actually happened
	Producer       string                 `json:"producer"`        // e.g., "mcp-integracao@1.2.3"
	Privacy        PrivacyFlags           `json:"privacy"`         // PII flags and consent hints
	Payload        json.RawMessage        `json:"payload"`         // actual event body
	Metadata       map[string]interface{} `json:"metadata,omitempty"`
}

Envelope is the standard Vertikon event envelope that rides on the bus.

type EventBus

type EventBus interface {
	// Publish pushes the envelope to a topic/subject.
	Publish(ctx context.Context, topic string, env *Envelope) error
}

EventBus is the minimal abstraction over the transport (NATS, Kafka, etc.).

type EventRow

type EventRow struct {
	ID             string          `db:"id"`
	TenantID       string          `db:"tenant_id"`
	AggregateID    string          `db:"aggregate_id"`
	AggregateType  string          `db:"aggregate_type"`
	EventType      string          `db:"event_type"`
	EventVersion   int             `db:"event_version"`
	Payload        json.RawMessage `db:"payload"`
	Metadata       json.RawMessage `db:"metadata"`
	IdempotencyKey string          `db:"idempotency_key"`
	TraceID        string          `db:"trace_id"`
	CorrelationID  string          `db:"correlation_id"`
	CreatedAt      time.Time       `db:"created_at"`
	ProcessedAt    *time.Time      `db:"processed_at"`
	PublishedAt    *time.Time      `db:"published_at"`
	RetryCount     int             `db:"retry_count"`
	LastError      *string         `db:"last_error"`
	ScheduledFor   time.Time       `db:"scheduled_for"`
	PII            bool            `db:"pii"`
	PIIFields      json.RawMessage `db:"pii_fields"` // []string
	Consent        json.RawMessage `db:"consent"`    // any
	Producer       string          `db:"producer"`
}

EventRow represents a row in the outbox table.

type Lead

type Lead struct {
	ID       string
	TenantID string
	Source   string
	UTM      map[string]string
	Consent  any
}

Example domain helper mirroring your previous code (tokenized PII).

func (*Lead) GetTokenizedPII

func (l *Lead) GetTokenizedPII() map[string]string

type LeadHandler

type LeadHandler struct {
	DB    *sql.DB
	Store OutboxStore
}

LeadHandler demonstrates atomic write + outbox save.

func (*LeadHandler) CreateLead

func (h *LeadHandler) CreateLead(ctx context.Context, req *CreateLeadRequest) (*Lead, error)

type OutboxStore

type OutboxStore interface {
	SaveWithTx(ctx context.Context, tx *sql.Tx, row *EventRow) error
	GetUnprocessed(ctx context.Context, limit int) ([]*EventRow, error)
	MarkAsPublished(ctx context.Context, eventID string) error
	MarkAsFailed(ctx context.Context, eventID string, err error, next time.Time) error
	MoveToDLQ(ctx context.Context, eventID string, reason string) error
	CleanupOld(ctx context.Context, olderThan time.Duration) error
	CleanupOldDLQ(ctx context.Context, olderThan time.Duration) error
}

OutboxStore abstracts DB operations (implemented for Postgres below).

type PostgresOutboxStore

type PostgresOutboxStore struct {
	// contains filtered or unexported fields
}

func NewPostgresOutboxStore

func NewPostgresOutboxStore(db *sql.DB) *PostgresOutboxStore

func (*PostgresOutboxStore) CleanupOld

func (s *PostgresOutboxStore) CleanupOld(ctx context.Context, olderThan time.Duration) error

func (*PostgresOutboxStore) CleanupOldDLQ

func (s *PostgresOutboxStore) CleanupOldDLQ(ctx context.Context, olderThan time.Duration) error

func (*PostgresOutboxStore) GetUnprocessed

func (s *PostgresOutboxStore) GetUnprocessed(ctx context.Context, limit int) ([]*EventRow, error)

func (*PostgresOutboxStore) MarkAsFailed

func (s *PostgresOutboxStore) MarkAsFailed(ctx context.Context, eventID string, fail error, next time.Time) error

func (*PostgresOutboxStore) MarkAsPublished

func (s *PostgresOutboxStore) MarkAsPublished(ctx context.Context, eventID string) error

func (*PostgresOutboxStore) MoveToDLQ

func (s *PostgresOutboxStore) MoveToDLQ(ctx context.Context, eventID string, reason string) error

func (*PostgresOutboxStore) SaveWithTx

func (s *PostgresOutboxStore) SaveWithTx(ctx context.Context, tx *sql.Tx, row *EventRow) error

type PrivacyFlags

type PrivacyFlags struct {
	PII     bool     `json:"pii"`               // whether payload contains PII (tokenized or raw)
	Fields  []string `json:"fields,omitempty"`  // which fields carry PII (semantic labels, not raw names)
	Consent any      `json:"consent,omitempty"` // optional consent snapshot (true/false/object)
}

PrivacyFlags captures privacy-related details for compliance flows.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(store OutboxStore, bus EventBus, opts ...PublisherOption) *Publisher

func (*Publisher) Start

func (p *Publisher) Start(ctx context.Context) error

Start runs the outbox pump loop until context is canceled.

type PublisherOption

type PublisherOption func(*Publisher)

func WithBackoff

func WithBackoff(base time.Duration) PublisherOption

func WithInterval

func WithInterval(d time.Duration) PublisherOption

func WithMaxRetries

func WithMaxRetries(n int) PublisherOption

func WithMaxWorkers

func WithMaxWorkers(n int) PublisherOption

func WithProducerName

func WithProducerName(name string) PublisherOption

func WithTopicFunc

func WithTopicFunc(f func(tenantID, eventType string) string) PublisherOption

type SaveEventOptions

type SaveEventOptions struct {
	TenantID       string
	AggregateID    string
	AggregateType  string
	EventType      string // e.g., "lead.created.v1"
	EventVersion   int
	Payload        any
	Metadata       map[string]any
	IdempotencyKey string
	CorrelationID  string
	PII            bool
	PIIFields      []string
	Consent        any
	Producer       string
	ScheduledFor   time.Time // optional; defaults to now
}

SaveGenericEvent persists an outbox event atomically inside the given transaction.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL