shared

package
v0.0.0-...-edb1025 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultTableName is the shared physical outbox table used by relay and cdc.
	DefaultTableName = "xevent_outbox_records"
)

Variables

View Source
var ErrUnsupportedDialect = errors.New("xevent outbox shared gorm dialect is unsupported")

ErrUnsupportedDialect indicates the configured SQL dialect is not supported.

Functions

func DBRecordToDebeziumRecord

func DBRecordToDebeziumRecord(record DBRecord) debezium.Record

DBRecordToDebeziumRecord converts a DBRecord back to a debezium Record.

func DBRecordToRelayRecord

func DBRecordToRelayRecord(record DBRecord) outbox.Record

DBRecordToRelayRecord converts a DBRecord back to a relay Record.

func IsRelayCutoverEligible

func IsRelayCutoverEligible(record DBRecord, now time.Time) bool

IsRelayCutoverEligible reports whether a relay record is eligible for cutover to the CDC path at the given time.

func NormalizeContext

func NormalizeContext(ctx context.Context) context.Context

NormalizeContext returns context.Background when ctx is nil, otherwise returns ctx unchanged.

func NormalizeRelayClaimRequest

func NormalizeRelayClaimRequest(req outbox.ClaimRequest) (outbox.ClaimRequest, error)

NormalizeRelayClaimRequest exposes relay request normalization to package-local tests.

func NormalizeRelayFailRequest

func NormalizeRelayFailRequest(req outbox.FailRequest) (outbox.FailRequest, error)

NormalizeRelayFailRequest exposes relay request normalization to package-local tests.

func NormalizeRelayMarkSentRequest

func NormalizeRelayMarkSentRequest(req outbox.MarkSentRequest) (outbox.MarkSentRequest, error)

NormalizeRelayMarkSentRequest exposes relay request normalization to package-local tests.

func NormalizeRelayRetryRequest

func NormalizeRelayRetryRequest(req outbox.RetryRequest) (outbox.RetryRequest, error)

NormalizeRelayRetryRequest exposes relay request normalization to package-local tests.

func NormalizeTime

func NormalizeTime(value time.Time, fallback func() time.Time) time.Time

NormalizeTime returns fallback when value is zero, otherwise returns value normalised to UTC.

Types

type CutoverRequest

type CutoverRequest struct {
	// Now is the reference time for eligibility checks.
	Now time.Time
	// BatchSize limits how many records are migrated in one transaction.
	BatchSize int
}

CutoverRequest controls one relay backlog cutover run.

type DBRecord

type DBRecord struct {
	// Identity: auto-incremented primary key and logical identifiers.
	ID            uint64  `gorm:"primaryKey;autoIncrement;index:idx_xevent_outbox_mode_status_partition_available_id,priority:5"`
	MessageID     string  `gorm:"column:message_id;size:36;not null;default:''"`
	Mode          Mode    `` /* 153-byte string literal not displayed */
	HandoffFromID *uint64 `gorm:"uniqueIndex:idx_xevent_outbox_handoff_from_id"`

	// Event metadata: describes the domain event carried by this row.
	EventType    string `gorm:"size:255;not null"`
	EventID      string `gorm:"size:255;not null;default:''"`
	PartitionKey string `gorm:"size:255;not null;default:'';index:idx_xevent_outbox_mode_status_partition_available_id,priority:3"`
	Payload      []byte `gorm:"not null"`
	Topic        string `gorm:"size:255;not null;default:''"`

	// Timing: controls when the record becomes eligible for processing.
	AvailableAt time.Time `gorm:"not null;index:idx_xevent_outbox_mode_status_partition_available_id,priority:4"`

	// Status and retry tracking: tracks the lifecycle and failure state.
	Status    string `gorm:"type:varchar(16);not null;default:'';index:idx_xevent_outbox_mode_status_partition_available_id,priority:2"`
	Attempts  int    `gorm:"not null;default:0"`
	LastError string `gorm:"type:text"`

	// Claim tracking: which owner is currently processing this record and until when.
	ClaimOwner string `gorm:"size:255;not null;default:''"`
	ClaimUntil *time.Time

	// Timestamps: lifecycle milestones.
	SentAt    *time.Time
	CreatedAt time.Time `gorm:"not null;index:idx_xevent_outbox_mode_created_at,priority:2"`
	UpdatedAt time.Time `gorm:"not null"`
}

DBRecord is the shared physical outbox row.

func CloneDBRecord

func CloneDBRecord(record DBRecord) DBRecord

CloneDBRecord returns a deep copy of record with all slice and pointer fields duplicated so that mutations to the original do not affect the clone.

func DebeziumRecordToDBRecord

func DebeziumRecordToDBRecord(record debezium.Record, now time.Time) (DBRecord, error)

DebeziumRecordToDBRecord converts a debezium Record into a DBRecord with validation. It returns an error if EventType or Topic is empty.

func PrepareCutoverDebeziumRecord

func PrepareCutoverDebeziumRecord(record DBRecord, now time.Time) (DBRecord, error)

PrepareCutoverDebeziumRecord builds a CDC DBRecord from a relay DBRecord for handoff during cutover. The resulting record references the source relay row via HandoffFromID.

func RelayRecordToDBRecord

func RelayRecordToDBRecord(record outbox.Record, now time.Time) DBRecord

RelayRecordToDBRecord converts a relay Record into a DBRecord, applying defaults for MessageID, Mode, Status, and normalising all timestamps to UTC.

func (DBRecord) TableName

func (DBRecord) TableName() string

TableName returns the default shared table name.

type Dialect

type Dialect string

Dialect selects the SQL strategy used by GORMStore.

const (
	// DialectStandard uses portable SQL without dialect-specific locking.
	DialectStandard Dialect = "standard"
	// DialectMySQL uses MySQL-specific claim SQL with FOR UPDATE SKIP LOCKED.
	DialectMySQL Dialect = "mysql"
	// DialectPostgres uses PostgreSQL-specific claim SQL with RETURNING.
	DialectPostgres Dialect = "postgres"
)

func DetectDialect

func DetectDialect(db *gorm.DB) Dialect

DetectDialect inspects the GORM Dialector name and returns the matching Dialect constant, defaulting to DialectStandard.

func NormalizeDialect

func NormalizeDialect(dialect Dialect) (Dialect, error)

NormalizeDialect lowercases and trims the dialect value, returning an error for unsupported values.

func ResolveDialect

func ResolveDialect(db *gorm.DB, configured Dialect) (Dialect, error)

ResolveDialect returns the normalised dialect when configured is non-empty, otherwise auto-detects from db.

type GORMStore

type GORMStore struct {
	// contains filtered or unexported fields
}

GORMStore persists shared outbox rows through GORM.

func NewGORMStore

func NewGORMStore(cfg GORMStoreConfig) (*GORMStore, error)

NewGORMStore creates a configured shared GORM-backed outbox store.

func (*GORMStore) AppendDebezium

func (s *GORMStore) AppendDebezium(ctx context.Context, record *debezium.Record) error

AppendDebezium inserts one CDC outbox row and updates record with the database-assigned values.

func (*GORMStore) AppendRelay

func (s *GORMStore) AppendRelay(ctx context.Context, record *outbox.Record) error

AppendRelay inserts one relay outbox row and updates record with the database-assigned values.

func (*GORMStore) ClaimRelay

func (s *GORMStore) ClaimRelay(ctx context.Context, req outbox.ClaimRequest) ([]outbox.Record, error)

ClaimRelay reserves one batch of eligible relay records ordered by available_at then id, using the dialect-appropriate locking strategy.

func (*GORMStore) ClaimRelayMySQLForTest

func (s *GORMStore) ClaimRelayMySQLForTest(
	tx *gorm.DB,
	req outbox.ClaimRequest,
	claimUntil time.Time,
) ([]DBRecord, error)

ClaimRelayMySQLForTest exposes the MySQL-specific relay claim path to package-local tests.

func (*GORMStore) ClaimRelayPostgresForTest

func (s *GORMStore) ClaimRelayPostgresForTest(
	tx *gorm.DB,
	req outbox.ClaimRequest,
	claimUntil time.Time,
) ([]DBRecord, error)

ClaimRelayPostgresForTest exposes the PostgreSQL-specific relay claim path to package-local tests.

func (*GORMStore) ClaimSelectedRelayRecordsForTest

func (s *GORMStore) ClaimSelectedRelayRecordsForTest(
	tx *gorm.DB,
	ids []uint64,
	req outbox.ClaimRequest,
	claimUntil time.Time,
) ([]DBRecord, error)

ClaimSelectedRelayRecordsForTest exposes the optimistic claim update helper to package-local tests.

func (*GORMStore) CutoverRelayBacklog

func (s *GORMStore) CutoverRelayBacklog(ctx context.Context, req CutoverRequest) (int64, error)

CutoverRelayBacklog migrates eligible relay rows to CDC rows in batches, marking each source relay row as handed off. It returns the total number of rows migrated.

func (*GORMStore) DeleteDebeziumBefore

func (s *GORMStore) DeleteDebeziumBefore(ctx context.Context, cutoff time.Time, limit int) (int64, error)

DeleteDebeziumBefore deletes up to limit CDC rows whose created_at is older than cutoff, returning the number of rows deleted.

func (*GORMStore) MarkRelayFailed

func (s *GORMStore) MarkRelayFailed(ctx context.Context, req outbox.FailRequest) error

MarkRelayFailed marks one claimed relay record as permanently failed.

func (*GORMStore) MarkRelaySent

func (s *GORMStore) MarkRelaySent(ctx context.Context, req outbox.MarkSentRequest) error

MarkRelaySent marks one claimed relay record as sent.

func (*GORMStore) RetryRelay

func (s *GORMStore) RetryRelay(ctx context.Context, req outbox.RetryRequest) error

RetryRelay requeues one claimed relay record for a later attempt.

func (*GORMStore) SelectClaimCandidatesForTest

func (s *GORMStore) SelectClaimCandidatesForTest(
	tx *gorm.DB,
	sqlText string,
	args ...any,
) ([]DBRecord, error)

SelectClaimCandidatesForTest exposes the raw candidate query helper to package-local tests.

type GORMStoreConfig

type GORMStoreConfig struct {
	// DB is the GORM database handle used for all operations.
	DB *gorm.DB
	// TableName overrides the default outbox table name when non-empty.
	TableName string
	// Dialect selects the SQL strategy; auto-detected when empty.
	Dialect Dialect
	// SessionFromContext optionally extracts a per-request GORM session from ctx.
	SessionFromContext func(context.Context) *gorm.DB
}

GORMStoreConfig configures the shared GORM-backed store implementation.

type Mode

type Mode string

Mode identifies which publish strategy owns one persisted row.

const (
	// ModeRelay indicates the row is managed by the relay publish path.
	ModeRelay Mode = "relay"
	// ModeCDC indicates the row is managed by the CDC (Debezium) publish path.
	ModeCDC Mode = "cdc"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL