fileoutbox

package
v0.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2026 License: MPL-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package fileoutbox provides a dependency-free JSON Lines outbox adapter for runtime/contracts.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Decoder

type Decoder = contracts.EventDecoder

Decoder converts one persisted JSON payload back into the typed Go event value expected by runtime/contracts subscribers.

type Option

type Option func(*Store)

Option configures a Store.

func WithBatchSize

func WithBatchSize(size int) Option

WithBatchSize sets the maximum number of records returned by one worker batch. Non-positive values keep the default.

func WithDeadLetter

func WithDeadLetter(deadLetterPath string, maxAttempts int) Option

WithDeadLetter moves records to deadLetterPath after maxAttempts failed deliveries. Non-positive maxAttempts or an empty path disables dead-lettering.

func WithDecoder

func WithDecoder(eventType string, decoder Decoder) Option

WithDecoder registers a decoder for one stored event type.

func WithJSONDecoder

func WithJSONDecoder[T any](eventType string) Option

WithJSONDecoder registers a JSON decoder for one stored event type.

func WithJSONTypeDecoder

func WithJSONTypeDecoder[T any]() Option

WithJSONTypeDecoder registers a JSON decoder using the same Go type name stored by runtime/contracts when T is emitted.

type Record

type Record struct {
	ID            string                  `json:"id"`
	EventID       string                  `json:"eventId,omitempty"`
	TraceParent   string                  `json:"traceparent,omitempty"`
	StoredAt      time.Time               `json:"storedAt"`
	Category      contracts.EventCategory `json:"category"`
	Type          string                  `json:"type"`
	Value         json.RawMessage         `json:"value"`
	Attempts      int                     `json:"attempts,omitempty"`
	LastAttemptAt *time.Time              `json:"lastAttemptAt,omitempty"`
	LastError     string                  `json:"lastError,omitempty"`
}

Record is one durable outbox row stored as a JSON Lines object.

type SeenOption added in v0.5.0

type SeenOption func(*SeenStore)

SeenOption configures a SeenStore.

func WithSeenLimit added in v0.5.0

func WithSeenLimit(limit int) SeenOption

WithSeenLimit sets the maximum retained IDs. Non-positive values keep the default window.

type SeenRecord added in v0.5.0

type SeenRecord struct {
	ID     string    `json:"id"`
	SeenAt time.Time `json:"seenAt"`
}

SeenRecord is one file-backed deduplication entry.

type SeenStore added in v0.5.0

type SeenStore struct {
	// contains filtered or unexported fields
}

SeenStore records delivered event IDs in a JSON Lines file. It is intended for local single-binary apps that also use fileoutbox.

func NewSeenStore added in v0.5.0

func NewSeenStore(path string, options ...SeenOption) *SeenStore

NewSeenStore creates a file-backed seen store at path.

func (*SeenStore) MarkIfNew added in v0.5.0

func (store *SeenStore) MarkIfNew(ctx context.Context, id string) (bool, error)

MarkIfNew records id and reports whether it was not already present in the retained file window.

func (*SeenStore) MarkSeen added in v0.5.0

func (store *SeenStore) MarkSeen(ctx context.Context, id string) error

MarkSeen records id in the retained file window.

func (*SeenStore) Seen added in v0.5.0

func (store *SeenStore) Seen(ctx context.Context, id string) (bool, error)

Seen reports whether id is present in the retained file window.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store appends event envelopes to a JSON Lines file and can replay them as an EventSource. Ack removes delivered records; Nack records retry metadata and leaves records for later delivery.

Store synchronizes access within a single process only. Pointing multiple processes at the same outbox file is not supported and can lose or double-deliver records.

func New

func New(path string, options ...Option) *Store

New creates a file-backed outbox at path.

func (*Store) DeadLetterRecords

func (store *Store) DeadLetterRecords(ctx context.Context) ([]Record, error)

DeadLetterRecords returns records moved out of the pending outbox by the configured dead-letter policy.

func (*Store) ReceiveEventBatch

func (store *Store) ReceiveEventBatch(ctx context.Context) (contracts.EventBatch, error)

ReceiveEventBatch returns the next pending records as typed event envelopes. It returns contracts.ErrEventSourceClosed when the outbox is empty.

func (*Store) Records

func (store *Store) Records(ctx context.Context) ([]Record, error)

Records returns all currently pending outbox records.

func (*Store) StoreEvents

func (store *Store) StoreEvents(ctx context.Context, events []contracts.EventEnvelope) error

StoreEvents appends events to the outbox file.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL