notifications

package
v0.0.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2026 | License: MIT | Imports: 5 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors for cursor operations. Each is a distinct errors.New value
// whose message carries the "notifications: " prefix.
// NOTE(review): whether the service or store wrap these before returning is
// not visible here; matching with errors.Is is the safe choice — confirm.
var (
	// ErrCursorNotFound reports that no durable notification cursor matched the key.
	ErrCursorNotFound = errors.New("notifications: cursor not found")
	// ErrInvalidCursor reports invalid cursor identity or cursor payload.
	ErrInvalidCursor = errors.New("notifications: invalid cursor")
	// ErrNonMonotonicCursor reports a cursor advance that would move backward or fork delivery metadata.
	ErrNonMonotonicCursor = errors.New("notifications: non-monotonic cursor advance")
	// ErrResetReasonRequired reports a reset request without an explicit recovery reason.
	ErrResetReasonRequired = errors.New("notifications: reset reason required")
)

Functions

This section is empty.

Types

type AdvanceCursor

// AdvanceCursor is the request payload that records a confirmed delivery
// position for one cursor.
//
// Key selects the cursor; LastSequence and LastDeliveredAt carry the position
// being confirmed. DeliveryID is optional and is omitted from JSON when empty.
// NOTE(review): Now is presumably the request timestamp that Normalize fills
// from its fallbackNow argument when left zero — confirm against the
// implementation.
type AdvanceCursor struct {
	Key             CursorKey `json:"key"`
	LastSequence    int64     `json:"last_sequence"`
	LastDeliveredAt time.Time `json:"last_delivered_at"`
	DeliveryID      string    `json:"delivery_id,omitempty"`
	Now             time.Time `json:"now"`
}

AdvanceCursor records a confirmed delivery position.

func (AdvanceCursor) Normalize

func (a AdvanceCursor) Normalize(fallbackNow time.Time) (AdvanceCursor, error)

Normalize validates an advance request and fills missing timestamps.

type Cursor

// Cursor is the stored record of the latest confirmed delivery position for
// one consumer, identified by Key.
//
// LastDeliveryID and LastError are optional and are omitted from JSON when
// empty; the remaining fields are always serialized.
// NOTE(review): LastError presumably holds the most recent diagnostic written
// via RecordCursorError, and UpdatedAt the time of the last write — confirm.
type Cursor struct {
	Key             CursorKey `json:"key"`
	LastSequence    int64     `json:"last_sequence"`
	LastDeliveryID  string    `json:"last_delivery_id,omitempty"`
	LastDeliveredAt time.Time `json:"last_delivered_at"`
	LastError       string    `json:"last_error,omitempty"`
	UpdatedAt       time.Time `json:"updated_at"`
}

Cursor stores the latest confirmed delivery position for one consumer.

type CursorError

// CursorError is the request payload that attaches a diagnostic to a cursor
// without advancing its delivery progress.
//
// Key selects the cursor and LastError carries the diagnostic text.
// NOTE(review): the package describes the diagnostic as "bounded"; the actual
// size limit and whether an empty LastError is rejected are not visible here —
// confirm in Normalize.
// NOTE(review): Now is presumably filled from Normalize's fallbackNow when
// left zero — confirm.
type CursorError struct {
	Key       CursorKey `json:"key"`
	LastError string    `json:"last_error"`
	Now       time.Time `json:"now"`
}

CursorError records a bounded diagnostic without advancing delivery progress.

func (CursorError) Normalize

func (e CursorError) Normalize(fallbackNow time.Time) (CursorError, error)

Normalize validates an error report and fills missing timestamps.

type CursorKey

// CursorKey is the identity of one durable delivery cursor, made up of a
// consumer, a stream name and a subject.
//
// All three fields are always serialized to JSON (none use omitempty).
// NOTE(review): Normalize trims and validates the key, but which fields are
// mandatory is not visible here — confirm before relying on partial keys.
type CursorKey struct {
	ConsumerID string `json:"consumer_id"`
	StreamName string `json:"stream_name"`
	SubjectID  string `json:"subject_id"`
}

CursorKey identifies one durable delivery cursor.

func (CursorKey) Normalize

func (k CursorKey) Normalize() (CursorKey, error)

Normalize trims and validates cursor identity.

type CursorQuery

// CursorQuery describes the filters used when listing cursors for
// diagnostics.
//
// Every field is optional and is omitted from JSON when it holds its zero
// value.
// NOTE(review): an empty filter presumably matches any value and Limit
// presumably caps the number of results; defaults and maximums are not
// visible here — confirm against the store implementation.
type CursorQuery struct {
	ConsumerID string `json:"consumer_id,omitempty"`
	StreamName string `json:"stream_name,omitempty"`
	SubjectID  string `json:"subject_id,omitempty"`
	Limit      int    `json:"limit,omitempty"`
}

CursorQuery filters cursor diagnostics.

func (CursorQuery) Normalize

func (q CursorQuery) Normalize() CursorQuery

Normalize trims query filters.

type CursorStore

// CursorStore is the persistence contract for confirmed notification delivery
// progress. Every method takes a context as its first argument and returns
// an error; all but ListCursors return the affected Cursor, while ListCursors
// returns a slice.
// NOTE(review): the sentinel errors each method may return (for example
// ErrCursorNotFound from GetCursor) are not specified here — confirm with the
// concrete store.
type CursorStore interface {
	// GetCursor looks up the cursor identified by key.
	GetCursor(ctx context.Context, key CursorKey) (Cursor, error)
	// ListCursors returns the cursors selected by query.
	ListCursors(ctx context.Context, query CursorQuery) ([]Cursor, error)
	// AdvanceCursor applies a confirmed delivery position update.
	AdvanceCursor(ctx context.Context, update AdvanceCursor) (Cursor, error)
	// ResetCursor applies an explicit reset request.
	ResetCursor(ctx context.Context, reset ResetCursor) (Cursor, error)
	// RecordCursorError stores a diagnostic report for a cursor.
	RecordCursorError(ctx context.Context, report CursorError) (Cursor, error)
}

CursorStore persists confirmed notification delivery progress.

type ResetCursor

// ResetCursor is the request payload that rewinds or repairs one cursor after
// an explicit recovery decision.
//
// Key selects the cursor; LastSequence, LastDeliveryID and LastDeliveredAt
// carry the position to set. LastDeliveryID is omitted from JSON when empty.
// Reason is the explicit recovery reason; the package defines
// ErrResetReasonRequired for requests that lack one.
// NOTE(review): Now is presumably filled from Normalize's fallbackNow when
// left zero — confirm.
type ResetCursor struct {
	Key             CursorKey `json:"key"`
	LastSequence    int64     `json:"last_sequence"`
	LastDeliveryID  string    `json:"last_delivery_id,omitempty"`
	LastDeliveredAt time.Time `json:"last_delivered_at"`
	Reason          string    `json:"reason"`
	Now             time.Time `json:"now"`
}

ResetCursor rewinds or repairs one cursor after an explicit recovery decision.

func (ResetCursor) Normalize

func (r ResetCursor) Normalize(fallbackNow time.Time) (ResetCursor, error)

Normalize validates a reset request and fills missing timestamps.

type Service

// Service is the entry point for cursor operations: it validates requests and
// then delegates persistence to a CursorStore. Its fields are unexported, so
// construct it with NewService.
// NOTE(review): behavior of the zero value or of a nil store is not visible
// here — confirm before using a Service not built by NewService.
type Service struct {
	// contains filtered or unexported fields
}

Service validates cursor requests before delegating persistence to the store.

func NewService

func NewService(store CursorStore) *Service

NewService creates a notification cursor service.

func (*Service) Advance

func (s *Service) Advance(ctx context.Context, update AdvanceCursor) (Cursor, error)

Advance records a monotonic confirmed delivery position.

func (*Service) Get

func (s *Service) Get(ctx context.Context, key CursorKey) (Cursor, error)

Get returns one durable cursor.

func (*Service) List

func (s *Service) List(ctx context.Context, query CursorQuery) ([]Cursor, error)

List returns durable cursors matching the query.

func (*Service) RecordError

func (s *Service) RecordError(ctx context.Context, report CursorError) (Cursor, error)

RecordError stores a diagnostic without moving the confirmed delivery sequence.

func (*Service) Reset

func (s *Service) Reset(ctx context.Context, reset ResetCursor) (Cursor, error)

Reset repairs one cursor after an explicit recovery decision.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL