Documentation ¶
Index ¶
- Constants
- Variables
- func DBRecordToDebeziumRecord(record DBRecord) debezium.Record
- func DBRecordToRelayRecord(record DBRecord) outbox.Record
- func IsRelayCutoverEligible(record DBRecord, now time.Time) bool
- func NormalizeContext(ctx context.Context) context.Context
- func NormalizeRelayClaimRequest(req outbox.ClaimRequest) (outbox.ClaimRequest, error)
- func NormalizeRelayFailRequest(req outbox.FailRequest) (outbox.FailRequest, error)
- func NormalizeRelayMarkSentRequest(req outbox.MarkSentRequest) (outbox.MarkSentRequest, error)
- func NormalizeRelayRetryRequest(req outbox.RetryRequest) (outbox.RetryRequest, error)
- func NormalizeTime(value time.Time, fallback func() time.Time) time.Time
- type CutoverRequest
- type DBRecord
- type Dialect
- type GORMStore
- func (s *GORMStore) AppendDebezium(ctx context.Context, record *debezium.Record) error
- func (s *GORMStore) AppendRelay(ctx context.Context, record *outbox.Record) error
- func (s *GORMStore) ClaimRelay(ctx context.Context, req outbox.ClaimRequest) ([]outbox.Record, error)
- func (s *GORMStore) ClaimRelayMySQLForTest(tx *gorm.DB, req outbox.ClaimRequest, claimUntil time.Time) ([]DBRecord, error)
- func (s *GORMStore) ClaimRelayPostgresForTest(tx *gorm.DB, req outbox.ClaimRequest, claimUntil time.Time) ([]DBRecord, error)
- func (s *GORMStore) ClaimSelectedRelayRecordsForTest(tx *gorm.DB, ids []uint64, req outbox.ClaimRequest, claimUntil time.Time) ([]DBRecord, error)
- func (s *GORMStore) CutoverRelayBacklog(ctx context.Context, req CutoverRequest) (int64, error)
- func (s *GORMStore) DeleteDebeziumBefore(ctx context.Context, cutoff time.Time, limit int) (int64, error)
- func (s *GORMStore) MarkRelayFailed(ctx context.Context, req outbox.FailRequest) error
- func (s *GORMStore) MarkRelaySent(ctx context.Context, req outbox.MarkSentRequest) error
- func (s *GORMStore) RetryRelay(ctx context.Context, req outbox.RetryRequest) error
- func (s *GORMStore) SelectClaimCandidatesForTest(tx *gorm.DB, sqlText string, args ...any) ([]DBRecord, error)
- type GORMStoreConfig
- type Mode
Constants ¶
const (
// DefaultTableName is the shared physical outbox table used by relay and cdc.
DefaultTableName = "xevent_outbox_records"
)
Variables ¶
var ErrUnsupportedDialect = errors.New("xevent outbox shared gorm dialect is unsupported")
ErrUnsupportedDialect indicates the configured SQL dialect is not supported.
Functions ¶
func DBRecordToDebeziumRecord ¶
func DBRecordToDebeziumRecord(record DBRecord) debezium.Record
DBRecordToDebeziumRecord converts a DBRecord back to a debezium Record.
func DBRecordToRelayRecord ¶
func DBRecordToRelayRecord(record DBRecord) outbox.Record
DBRecordToRelayRecord converts a DBRecord back to a relay Record.
func IsRelayCutoverEligible ¶
func IsRelayCutoverEligible(record DBRecord, now time.Time) bool
IsRelayCutoverEligible reports whether a relay record is eligible for cutover to the CDC path at the given time.
func NormalizeContext ¶
func NormalizeContext(ctx context.Context) context.Context
NormalizeContext returns context.Background when ctx is nil, otherwise returns ctx unchanged.
func NormalizeRelayClaimRequest ¶
func NormalizeRelayClaimRequest(req outbox.ClaimRequest) (outbox.ClaimRequest, error)
NormalizeRelayClaimRequest exposes normalization of relay claim requests to package-local tests.
func NormalizeRelayFailRequest ¶
func NormalizeRelayFailRequest(req outbox.FailRequest) (outbox.FailRequest, error)
NormalizeRelayFailRequest exposes normalization of relay fail requests to package-local tests.
func NormalizeRelayMarkSentRequest ¶
func NormalizeRelayMarkSentRequest(req outbox.MarkSentRequest) (outbox.MarkSentRequest, error)
NormalizeRelayMarkSentRequest exposes normalization of relay mark-sent requests to package-local tests.
func NormalizeRelayRetryRequest ¶
func NormalizeRelayRetryRequest(req outbox.RetryRequest) (outbox.RetryRequest, error)
NormalizeRelayRetryRequest exposes normalization of relay retry requests to package-local tests.
Types ¶
type CutoverRequest ¶
type CutoverRequest struct {
// Now is the reference time for eligibility checks.
Now time.Time
// BatchSize limits how many records are migrated in one transaction.
BatchSize int
}
CutoverRequest controls one relay backlog cutover run.
type DBRecord ¶
type DBRecord struct {
// Identity: auto-incremented primary key and logical identifiers.
ID uint64 `gorm:"primaryKey;autoIncrement;index:idx_xevent_outbox_mode_status_partition_available_id,priority:5"`
MessageID string `gorm:"column:message_id;size:36;not null;default:''"`
Mode Mode `` /* 153-byte string literal not displayed */
HandoffFromID *uint64 `gorm:"uniqueIndex:idx_xevent_outbox_handoff_from_id"`
// Event metadata: describes the domain event carried by this row.
EventType string `gorm:"size:255;not null"`
EventID string `gorm:"size:255;not null;default:''"`
PartitionKey string `gorm:"size:255;not null;default:'';index:idx_xevent_outbox_mode_status_partition_available_id,priority:3"`
Payload []byte `gorm:"not null"`
Topic string `gorm:"size:255;not null;default:''"`
// Timing: controls when the record becomes eligible for processing.
AvailableAt time.Time `gorm:"not null;index:idx_xevent_outbox_mode_status_partition_available_id,priority:4"`
// Status and retry tracking: tracks the lifecycle and failure state.
Status string `gorm:"type:varchar(16);not null;default:'';index:idx_xevent_outbox_mode_status_partition_available_id,priority:2"`
Attempts int `gorm:"not null;default:0"`
LastError string `gorm:"type:text"`
// Claim tracking: which owner is currently processing this record and until when.
ClaimOwner string `gorm:"size:255;not null;default:''"`
ClaimUntil *time.Time
// Timestamps: lifecycle milestones.
SentAt *time.Time
CreatedAt time.Time `gorm:"not null;index:idx_xevent_outbox_mode_created_at,priority:2"`
UpdatedAt time.Time `gorm:"not null"`
}
DBRecord is the shared physical outbox row.
func CloneDBRecord ¶
CloneDBRecord returns a deep copy of record with all slice and pointer fields duplicated so that mutations to the original do not affect the clone.
func DebeziumRecordToDBRecord ¶
DebeziumRecordToDBRecord converts a debezium Record into a DBRecord with validation. It returns an error if EventType or Topic is empty.
func PrepareCutoverDebeziumRecord ¶
PrepareCutoverDebeziumRecord builds a CDC DBRecord from a relay DBRecord for handoff during cutover. The resulting record references the source relay row via HandoffFromID.
func RelayRecordToDBRecord ¶
RelayRecordToDBRecord converts a relay Record into a DBRecord, applying defaults for MessageID, Mode, Status, and normalising all timestamps to UTC.
type Dialect ¶
type Dialect string
Dialect selects the SQL strategy used by GORMStore.
const (
// DialectStandard uses portable SQL without dialect-specific locking.
DialectStandard Dialect = "standard"
// DialectMySQL uses MySQL-specific claim SQL with FOR UPDATE SKIP LOCKED.
DialectMySQL Dialect = "mysql"
// DialectPostgres uses PostgreSQL-specific claim SQL with RETURNING.
DialectPostgres Dialect = "postgres"
)
func DetectDialect ¶
DetectDialect inspects the GORM Dialector name and returns the matching Dialect constant, defaulting to DialectStandard.
func NormalizeDialect ¶
NormalizeDialect lowercases and trims the dialect value, returning an error for unsupported values.
type GORMStore ¶
type GORMStore struct {
// contains filtered or unexported fields
}
GORMStore persists shared outbox rows through GORM.
func NewGORMStore ¶
func NewGORMStore(cfg GORMStoreConfig) (*GORMStore, error)
NewGORMStore creates a configured shared GORM-backed outbox store.
func (*GORMStore) AppendDebezium ¶
func (s *GORMStore) AppendDebezium(ctx context.Context, record *debezium.Record) error
AppendDebezium inserts one CDC outbox row and updates record with the database-assigned values.
func (*GORMStore) AppendRelay ¶
func (s *GORMStore) AppendRelay(ctx context.Context, record *outbox.Record) error
AppendRelay inserts one relay outbox row and updates record with the database-assigned values.
func (*GORMStore) ClaimRelay ¶
func (s *GORMStore) ClaimRelay(ctx context.Context, req outbox.ClaimRequest) ([]outbox.Record, error)
ClaimRelay reserves one batch of eligible relay records ordered by available_at then id, using the dialect-appropriate locking strategy.
func (*GORMStore) ClaimRelayMySQLForTest ¶
func (s *GORMStore) ClaimRelayMySQLForTest(tx *gorm.DB, req outbox.ClaimRequest, claimUntil time.Time) ([]DBRecord, error)
ClaimRelayMySQLForTest exposes the MySQL-specific relay claim path to package-local tests.
func (*GORMStore) ClaimRelayPostgresForTest ¶
func (s *GORMStore) ClaimRelayPostgresForTest(tx *gorm.DB, req outbox.ClaimRequest, claimUntil time.Time) ([]DBRecord, error)
ClaimRelayPostgresForTest exposes the PostgreSQL-specific relay claim path to package-local tests.
func (*GORMStore) ClaimSelectedRelayRecordsForTest ¶
func (s *GORMStore) ClaimSelectedRelayRecordsForTest(tx *gorm.DB, ids []uint64, req outbox.ClaimRequest, claimUntil time.Time) ([]DBRecord, error)
ClaimSelectedRelayRecordsForTest exposes the optimistic claim update helper to package-local tests.
func (*GORMStore) CutoverRelayBacklog ¶
func (s *GORMStore) CutoverRelayBacklog(ctx context.Context, req CutoverRequest) (int64, error)
CutoverRelayBacklog migrates eligible relay rows to CDC rows in batches, marking each source relay row as handed off. It returns the total number of rows migrated.
func (*GORMStore) DeleteDebeziumBefore ¶
func (s *GORMStore) DeleteDebeziumBefore(ctx context.Context, cutoff time.Time, limit int) (int64, error)
DeleteDebeziumBefore deletes up to limit CDC rows whose created_at is older than cutoff, returning the number of rows deleted.
func (*GORMStore) MarkRelayFailed ¶
func (s *GORMStore) MarkRelayFailed(ctx context.Context, req outbox.FailRequest) error
MarkRelayFailed marks one claimed relay record as permanently failed.
func (*GORMStore) MarkRelaySent ¶
func (s *GORMStore) MarkRelaySent(ctx context.Context, req outbox.MarkSentRequest) error
MarkRelaySent marks one claimed relay record as sent.
func (*GORMStore) RetryRelay ¶
func (s *GORMStore) RetryRelay(ctx context.Context, req outbox.RetryRequest) error
RetryRelay requeues one claimed relay record for a later attempt.
type GORMStoreConfig ¶
type GORMStoreConfig struct {
// DB is the GORM database handle used for all operations.
DB *gorm.DB
// TableName overrides the default outbox table name when non-empty.
TableName string
// Dialect selects the SQL strategy; auto-detected when empty.
Dialect Dialect
// SessionFromContext optionally extracts a per-request GORM session from ctx.
SessionFromContext func(context.Context) *gorm.DB
}
GORMStoreConfig configures the shared GORM-backed store implementation.