events

package
v1.2.0 Latest
Published: May 11, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Constants

const (
	HookEventFeatureReviewReady = "feature_review_ready"
	HookEventPostMergeClosure   = "post_merge_closure"
	HookEventBlockedStale       = "blocked_stale"
)

v1 hook event type constants. Deferred types (stash_overflow, shipment_ready) require lifecycle hook points from 007-DL and are not included in v1.

Variables

This section is empty.

Functions

func AckHookEvents

func AckHookEvents(ctx context.Context, cs *CheckpointStore, consumerID string, seq int64) error

AckHookEvents advances the consumer's checkpoint to seq, confirming that all events up to and including seq have been processed. This is a thin wrapper around CheckpointStore.SaveCheckpoint.

func CreateCheckpoint

func CreateCheckpoint(_ context.Context, checkpointDir string, stateDump string) (string, error)

CreateCheckpoint writes a timestamped state dump to the checkpoints directory. If the state dump contains a V1 schema (schema_version=1), it is parsed and validated before writing. Missing created_at, updated_at, and status fields are auto-populated. Legacy (non-V1) dumps are written as-is with atomic writes.

func LogPathForItem

func LogPathForItem(logsDir, itemID string) string

LogPathForItem returns the JSONL log path for a work item ID.

func ResolveCheckpoint added in v1.1.0

func ResolveCheckpoint(_ context.Context, checkpointDir, filename string) error

ResolveCheckpoint marks a checkpoint as resolved (idempotent).

func SaveMemory

func SaveMemory(_ context.Context, memoriesPath string, key string, summary string) error

SaveMemory persists a key-value pair to memories.json via atomic read-modify-write. A process-level mutex prevents lost updates from concurrent callers.
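
The read-modify-write cycle described above can be sketched as follows. This is a minimal illustration, not the package's implementation: saveMemorySketch and concurrentSaves are hypothetical names, and the flat string-to-string JSON layout of memories.json is an assumption.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"sync"
)

// memMu serializes read-modify-write cycles within this process.
var memMu sync.Mutex

// saveMemorySketch is a hypothetical stand-in for SaveMemory. It assumes the
// file holds a flat JSON object of string keys to string summaries.
func saveMemorySketch(path, key, summary string) error {
	memMu.Lock()
	defer memMu.Unlock()

	memories := map[string]string{}
	data, err := os.ReadFile(path)
	switch {
	case err == nil:
		if err := json.Unmarshal(data, &memories); err != nil {
			return fmt.Errorf("decode %s: %w", path, err)
		}
	case !errors.Is(err, fs.ErrNotExist):
		return err
	}
	if memories == nil { // a literal JSON null resets the map to nil
		memories = map[string]string{}
	}
	memories[key] = summary

	out, err := json.MarshalIndent(memories, "", "  ")
	if err != nil {
		return err
	}
	// Write a temp file in the same directory, then rename it over the target
	// so readers never observe a partially written file.
	tmp, err := os.CreateTemp(filepath.Dir(path), "memories-*.tmp")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // fails harmlessly once the rename succeeded
	if _, err := tmp.Write(out); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), path)
}

// concurrentSaves runs n writers against a fresh temp directory and returns
// how many distinct keys ended up in the file.
func concurrentSaves(n int) (int, error) {
	dir, err := os.MkdirTemp("", "memories")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "memories.json")

	var wg sync.WaitGroup
	errs := make([]error, n)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			errs[i] = saveMemorySketch(path, fmt.Sprintf("key-%d", i), "summary")
		}(i)
	}
	wg.Wait()
	if err := errors.Join(errs...); err != nil {
		return 0, err
	}
	data, err := os.ReadFile(path)
	if err != nil {
		return 0, err
	}
	var got map[string]string
	if err := json.Unmarshal(data, &got); err != nil {
		return 0, err
	}
	return len(got), nil
}

func main() {
	n, err := concurrentSaves(20)
	fmt.Println(n, err)
}
```

Because the mutex covers the whole load, update, and rename sequence, no writer can overwrite another writer's key. The lock is process-local, so it offers no protection against a second process editing the same file.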

func ValidateCheckpoint added in v1.1.0

func ValidateCheckpoint(cp *CheckpointV1) error

ValidateCheckpoint validates a CheckpointV1 struct against its validator tags.

Types

type CheckpointContext added in v1.1.0

type CheckpointContext struct {
	ShipmentID string   `json:"shipment_id,omitempty"`
	FeatureID  string   `json:"feature_id,omitempty"`
	TaskIDs    []string `json:"task_ids,omitempty"`
	Branch     string   `json:"branch,omitempty"`
}

CheckpointContext holds shipment/feature/branch context for the checkpoint.

type CheckpointFilter added in v1.1.0

type CheckpointFilter struct {
	Agent      string        `json:"agent,omitempty"`
	Status     string        `json:"status,omitempty"`
	ShipmentID string        `json:"shipment_id,omitempty"`
	FeatureID  string        `json:"feature_id,omitempty"`
	MaxAge     time.Duration `json:"max_age,omitempty"`
}

CheckpointFilter constrains which checkpoints are returned by ListCheckpoints.
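
How each filter field is applied is not spelled out here. One plausible reading, sketched below, treats empty fields as "match anything" and MaxAge as an upper bound on the checkpoint's age measured from created_at. The filter and summary types are local copies, and matches, selectNames, and demoNames are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// filter and summary are local copies of the relevant fields.
type filter struct {
	Agent, Status, ShipmentID, FeatureID string
	MaxAge                               time.Duration
}

type summary struct {
	Filename, Agent, Status, ShipmentID, FeatureID string
	CreatedAt                                      time.Time
}

// matches assumes that zero-valued filter fields do not constrain the result
// and that MaxAge is compared against the age since CreatedAt.
func matches(f filter, s summary, now time.Time) bool {
	switch {
	case f.Agent != "" && f.Agent != s.Agent:
		return false
	case f.Status != "" && f.Status != s.Status:
		return false
	case f.ShipmentID != "" && f.ShipmentID != s.ShipmentID:
		return false
	case f.FeatureID != "" && f.FeatureID != s.FeatureID:
		return false
	case f.MaxAge > 0 && now.Sub(s.CreatedAt) > f.MaxAge:
		return false
	}
	return true
}

// selectNames returns the filenames of all summaries that pass the filter.
func selectNames(f filter, all []summary, now time.Time) []string {
	var names []string
	for _, s := range all {
		if matches(f, s, now) {
			names = append(names, s.Filename)
		}
	}
	return names
}

// demoNames filters four made-up summaries for active "ship" checkpoints
// created within the last 24 hours.
func demoNames() []string {
	now := time.Date(2026, 5, 11, 12, 0, 0, 0, time.UTC)
	all := []summary{
		{Filename: "a.json", Agent: "ship", Status: "active", CreatedAt: now.Add(-1 * time.Hour)},
		{Filename: "b.json", Agent: "ship", Status: "resolved", CreatedAt: now.Add(-2 * time.Hour)},
		{Filename: "c.json", Agent: "stage", Status: "active", CreatedAt: now.Add(-30 * time.Minute)},
		{Filename: "d.json", Agent: "ship", Status: "active", CreatedAt: now.Add(-72 * time.Hour)},
	}
	f := filter{Agent: "ship", Status: "active", MaxAge: 24 * time.Hour}
	return selectNames(f, all, now)
}

func main() {
	fmt.Println(demoNames()) // [a.json]
}
```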

type CheckpointProgress added in v1.1.0

type CheckpointProgress struct {
	TasksCompleted []string `json:"tasks_completed,omitempty"`
	TasksRemaining []string `json:"tasks_remaining,omitempty"`
	FilesModified  []string `json:"files_modified,omitempty"`
	Decisions      []string `json:"decisions,omitempty"`
}

CheckpointProgress tracks task completion state within a checkpoint.

type CheckpointStore

type CheckpointStore struct {
	// contains filtered or unexported fields
}

CheckpointStore persists per-consumer acknowledgement positions as JSON files. Files are stored under .backlogit/runtime/hooks/{consumer_id}.checkpoint.json. The runtime/hooks directory is ephemeral (gitignored). If the directory or files are deleted, consumers restart from seq=0 (idempotent processing assumed).

func NewCheckpointStore

func NewCheckpointStore(backlogitDir string) *CheckpointStore

NewCheckpointStore creates a checkpoint store rooted at backlogitDir/runtime/hooks/.

func (*CheckpointStore) LoadCheckpoint

func (s *CheckpointStore) LoadCheckpoint(consumerID string) (int64, error)

LoadCheckpoint returns the last acknowledged sequence number for a consumer. Returns 0 and no error if no checkpoint file exists (consumer starts from the beginning).

func (*CheckpointStore) SaveCheckpoint

func (s *CheckpointStore) SaveCheckpoint(consumerID string, seq int64) error

SaveCheckpoint atomically persists the ack position for a consumer using a temp-file-then-rename pattern to prevent partial state. Returns an error wrapping ErrValidation if seq is strictly less than the current checkpoint (monotonic enforcement: ack positions must never go backward). Saving the same seq twice (idempotent ack) is allowed. Per-consumer in-process locking prevents concurrent read-modify-write races.
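
The monotonic rule and the temp-file-then-rename write can be illustrated with a small standalone sketch. The names loadAck, saveAck, ackDemo, and errValidation are hypothetical, the {"seq": N} file shape is an assumption, and the per-consumer locking is left out for brevity.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// errValidation is a local stand-in for the package's ErrValidation.
var errValidation = errors.New("validation error")

// ackFile is an assumed on-disk shape; the real file layout is not documented.
type ackFile struct {
	Seq int64 `json:"seq"`
}

// loadAck returns 0 when no checkpoint file exists yet.
func loadAck(path string) (int64, error) {
	data, err := os.ReadFile(path)
	if errors.Is(err, fs.ErrNotExist) {
		return 0, nil
	}
	if err != nil {
		return 0, err
	}
	var f ackFile
	if err := json.Unmarshal(data, &f); err != nil {
		return 0, err
	}
	return f.Seq, nil
}

// saveAck rejects backward moves, then writes via temp file and rename.
func saveAck(path string, seq int64) error {
	cur, err := loadAck(path)
	if err != nil {
		return err
	}
	if seq < cur {
		return fmt.Errorf("%w: seq %d is behind checkpoint %d", errValidation, seq, cur)
	}
	data, err := json.Marshal(ackFile{Seq: seq})
	if err != nil {
		return err
	}
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		return err
	}
	return os.Rename(tmp, path)
}

// ackDemo saves 5 twice, attempts 3, and returns the surviving position.
func ackDemo() (int64, error) {
	dir, err := os.MkdirTemp("", "hooks")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "worker.checkpoint.json")

	for _, seq := range []int64{5, 5} { // the repeated ack is accepted
		if err := saveAck(path, seq); err != nil {
			return 0, err
		}
	}
	if err := saveAck(path, 3); !errors.Is(err, errValidation) {
		return 0, fmt.Errorf("expected a validation error, got %v", err)
	}
	return loadAck(path)
}

func main() {
	seq, err := ackDemo()
	fmt.Println(seq, err)
}
```

Callers can detect the rejected case with errors.Is, since the error wraps the sentinel rather than replacing it.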

type CheckpointSummary added in v1.1.0

type CheckpointSummary struct {
	Filename      string    `json:"filename"`
	Agent         string    `json:"agent"`
	SessionID     string    `json:"session_id"`
	Phase         string    `json:"phase"`
	Status        string    `json:"status"`
	CreatedAt     time.Time `json:"created_at"`
	ShipmentID    string    `json:"shipment_id,omitempty"`
	FeatureID     string    `json:"feature_id,omitempty"`
	ResumeHint    string    `json:"resume_hint,omitempty"`
	ValidationErr string    `json:"validation_error,omitempty"`
	// Quarantined is true when the file was physically moved to the quarantine
	// directory due to a parse failure. ValidationErr may also be set for
	// schema validation failures that do NOT quarantine the file.
	Quarantined bool `json:"quarantined,omitempty"`
}

CheckpointSummary is a lightweight view of a checkpoint for list results.

func ListCheckpoints added in v1.1.0

func ListCheckpoints(_ context.Context, checkpointDir string, filter CheckpointFilter) ([]CheckpointSummary, error)

ListCheckpoints returns checkpoint summaries from checkpointDir, applying the optional filter. Unparseable files are moved to .backlogit/quarantine/checkpoints/ and still appear in the result, with ValidationErr populated and Quarantined set to true.

type CheckpointV1 added in v1.1.0

type CheckpointV1 struct {
	SchemaVersion int                 `json:"schema_version" validate:"eq=1"`
	Agent         string              `json:"agent" validate:"required,oneof=ship stage"`
	SessionID     string              `json:"session_id" validate:"required"`
	Phase         string              `json:"phase" validate:"required"`
	Status        string              `json:"status" validate:"required,oneof=active resolved"`
	CreatedAt     time.Time           `json:"created_at" validate:"required"`
	UpdatedAt     time.Time           `json:"updated_at" validate:"required"`
	Context       CheckpointContext   `json:"context"`
	Progress      *CheckpointProgress `json:"progress,omitempty"`
	ResumeHint    string              `json:"resume_hint,omitempty"`
}

CheckpointV1 is the canonical schema for agent session checkpoints.
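
As an illustration of the schema, the sketch below decodes a sample document and applies the rules from the validate tags by hand. It assumes the tags carry their usual meaning (eq, required, oneof); the checkpoint type is a trimmed local copy, parseAndCheck is a hypothetical helper rather than the package's ParseCheckpoint, and the sample values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// checkpoint mirrors the scalar fields of CheckpointV1; Context and Progress
// are left out to keep the sketch short.
type checkpoint struct {
	SchemaVersion int       `json:"schema_version"`
	Agent         string    `json:"agent"`
	SessionID     string    `json:"session_id"`
	Phase         string    `json:"phase"`
	Status        string    `json:"status"`
	CreatedAt     time.Time `json:"created_at"`
	UpdatedAt     time.Time `json:"updated_at"`
	ResumeHint    string    `json:"resume_hint,omitempty"`
}

func oneOf(v string, allowed ...string) bool {
	for _, a := range allowed {
		if v == a {
			return true
		}
	}
	return false
}

// check applies by hand the rules that the validate tags express.
func check(cp checkpoint) error {
	switch {
	case cp.SchemaVersion != 1:
		return fmt.Errorf("schema_version must be 1, got %d", cp.SchemaVersion)
	case !oneOf(cp.Agent, "ship", "stage"):
		return fmt.Errorf("agent must be ship or stage, got %q", cp.Agent)
	case cp.SessionID == "":
		return fmt.Errorf("session_id is required")
	case cp.Phase == "":
		return fmt.Errorf("phase is required")
	case !oneOf(cp.Status, "active", "resolved"):
		return fmt.Errorf("status must be active or resolved, got %q", cp.Status)
	case cp.CreatedAt.IsZero() || cp.UpdatedAt.IsZero():
		return fmt.Errorf("created_at and updated_at are required")
	}
	return nil
}

// parseAndCheck decodes JSON bytes and validates the result.
func parseAndCheck(data []byte) (checkpoint, error) {
	var cp checkpoint
	if err := json.Unmarshal(data, &cp); err != nil {
		return checkpoint{}, fmt.Errorf("parse checkpoint: %w", err)
	}
	if err := check(cp); err != nil {
		return checkpoint{}, err
	}
	return cp, nil
}

// sample uses made-up identifiers; only the field names come from the schema.
const sample = `{
  "schema_version": 1,
  "agent": "ship",
  "session_id": "sess-01",
  "phase": "implement",
  "status": "active",
  "created_at": "2026-05-11T09:00:00Z",
  "updated_at": "2026-05-11T09:30:00Z",
  "resume_hint": "continue with the remaining tasks"
}`

func main() {
	cp, err := parseAndCheck([]byte(sample))
	fmt.Println(cp.Agent, cp.Status, err)

	_, err = parseAndCheck([]byte(`{"schema_version": 2}`))
	fmt.Println(err)
}
```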

func GetCheckpoint added in v1.1.0

func GetCheckpoint(_ context.Context, checkpointDir, filename string) (*CheckpointV1, error)

GetCheckpoint reads and validates a specific checkpoint file. It returns ErrCheckpointNotFound if the file does not exist and ErrCheckpointInvalid if validation fails.

func ParseCheckpoint added in v1.1.0

func ParseCheckpoint(data []byte) (*CheckpointV1, error)

ParseCheckpoint decodes JSON bytes into a CheckpointV1 struct.

type CleanupResult added in v1.1.0

type CleanupResult struct {
	ArchivedCount int      `json:"archived_count"`
	ArchivedFiles []string `json:"archived_files"`
	SkippedCount  int      `json:"skipped_count"`
	Errors        []string `json:"errors,omitempty"`
}

CleanupResult reports the outcome of a checkpoint cleanup operation.

func CleanupCheckpoints added in v1.1.0

func CleanupCheckpoints(_ context.Context, checkpointDir string, retentionDays int) (CleanupResult, error)

CleanupCheckpoints archives resolved and stale checkpoints. retentionDays must be > 0.

type DerivedSignalProvider

type DerivedSignalProvider interface {
	DerivedSignals(ctx context.Context) ([]HookEvent, error)
}

DerivedSignalProvider computes ephemeral signals at poll time. The concrete implementation queries the SQLite index; this interface keeps the reader independent of database access. A nil provider is valid and causes derived signals to be skipped.
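
A provider does not have to be backed by a database. The sketch below implements the interface with an in-memory list and shows one way a caller might treat a nil provider. The types are local copies; staleItems, collect, and signalCount are hypothetical, as are the "item_id" payload key and the choice of blocked_stale as a derived signal.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Local copies of the types shown in this package's documentation.
type HookEvent struct {
	Seq       int64          `json:"seq"`
	Timestamp time.Time      `json:"timestamp"`
	EventType string         `json:"event_type"`
	Payload   map[string]any `json:"payload"`
}

type DerivedSignalProvider interface {
	DerivedSignals(ctx context.Context) ([]HookEvent, error)
}

// staleItems is a hypothetical provider that reports a fixed list of item IDs.
type staleItems struct{ ids []string }

func (s staleItems) DerivedSignals(ctx context.Context) ([]HookEvent, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	now := time.Now().UTC()
	out := make([]HookEvent, 0, len(s.ids))
	for _, id := range s.ids {
		out = append(out, HookEvent{
			Seq:       0, // derived signals are not part of the ack stream
			Timestamp: now,
			EventType: "blocked_stale",
			Payload:   map[string]any{"item_id": id},
		})
	}
	return out, nil
}

// collect treats a nil provider as "no derived signals".
func collect(ctx context.Context, p DerivedSignalProvider) ([]HookEvent, error) {
	if p == nil {
		return nil, nil
	}
	return p.DerivedSignals(ctx)
}

// signalCount is a small convenience wrapper used by the demo.
func signalCount(p DerivedSignalProvider) (int, error) {
	sigs, err := collect(context.Background(), p)
	return len(sigs), err
}

func main() {
	fmt.Println(signalCount(nil))
	fmt.Println(signalCount(staleItems{ids: []string{"T-1", "T-2"}}))
}
```

Note that the nil check only catches a nil interface value. A nil pointer of a concrete type stored in the interface would pass the check, so a value receiver is used here.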

type Event

type Event struct {
	Timestamp time.Time      `json:"timestamp"`
	Actor     string         `json:"actor"`
	ItemID    string         `json:"item_id"`
	EventType string         `json:"event_type"`
	Delta     map[string]any `json:"delta"`
	CommitSHA string         `json:"commit_sha,omitempty"`
}

Event represents a state change or comment in the event stream.

func ReadAllEvents

func ReadAllEvents(_ context.Context, logsDir string, itemID string) ([]Event, error)

ReadAllEvents reads all events from a work item's JSONL log file.

func TailEvents

func TailEvents(_ context.Context, logsDir string, itemID string, limit int) ([]Event, error)

TailEvents reads the most recent events for a specific item from its JSONL log file.
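
Reading the tail of a JSONL log can be sketched as below. The code reads from an io.Reader instead of a file so that it stays self-contained; tailEvents and tailOf are hypothetical helpers, the event types in the sample are made up, and treating limit <= 0 as "no limit" is an assumption.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"time"
)

// Event is a local copy of the documented type.
type Event struct {
	Timestamp time.Time      `json:"timestamp"`
	Actor     string         `json:"actor"`
	ItemID    string         `json:"item_id"`
	EventType string         `json:"event_type"`
	Delta     map[string]any `json:"delta"`
	CommitSHA string         `json:"commit_sha,omitempty"`
}

// tailEvents decodes one event per line and keeps the last limit entries.
// Lines longer than bufio.Scanner's default buffer would need a larger one.
func tailEvents(r io.Reader, limit int) ([]Event, error) {
	var events []Event
	sc := bufio.NewScanner(r)
	for n := 1; sc.Scan(); n++ {
		line := bytes.TrimSpace(sc.Bytes())
		if len(line) == 0 {
			continue // tolerate blank lines
		}
		var ev Event
		if err := json.Unmarshal(line, &ev); err != nil {
			return nil, fmt.Errorf("line %d: %w", n, err)
		}
		events = append(events, ev)
	}
	if err := sc.Err(); err != nil {
		return nil, err
	}
	if limit > 0 && len(events) > limit {
		events = events[len(events)-limit:]
	}
	return events, nil
}

// sampleLog holds three made-up events for one work item.
const sampleLog = `{"timestamp":"2026-05-11T09:00:00Z","actor":"alice","item_id":"T-1","event_type":"created","delta":{}}
{"timestamp":"2026-05-11T09:05:00Z","actor":"alice","item_id":"T-1","event_type":"status_changed","delta":{"status":"active"}}
{"timestamp":"2026-05-11T09:10:00Z","actor":"bob","item_id":"T-1","event_type":"comment","delta":{"text":"lgtm"}}
`

// tailOf is a convenience wrapper for string input.
func tailOf(log string, limit int) ([]Event, error) {
	return tailEvents(strings.NewReader(log), limit)
}

func main() {
	events, err := tailOf(sampleLog, 2)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, ev := range events {
		fmt.Println(ev.Actor, ev.EventType)
	}
}
```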

type EventWriter

type EventWriter struct {
	// contains filtered or unexported fields
}

EventWriter provides goroutine-safe append-only writes to per-item JSONL log files.

func NewEventWriter

func NewEventWriter(logsDir string) *EventWriter

NewEventWriter creates an event writer for the given logs directory.

func (*EventWriter) AppendEvent

func (w *EventWriter) AppendEvent(_ context.Context, event Event) error

AppendEvent marshals and appends an event to the item's JSONL log file.

type HookEvent

type HookEvent struct {
	Seq       int64          `json:"seq"`
	Timestamp time.Time      `json:"timestamp"`
	EventType string         `json:"event_type"`
	Payload   map[string]any `json:"payload"`
}

HookEvent represents a signal emitted to the agent hook event queue.

type HookEventWriter

type HookEventWriter struct {
	// contains filtered or unexported fields
}

HookEventWriter provides goroutine-safe, cross-process-safe sequenced appends to the shared hook event queue at .backlogit/hooks_queue.jsonl. Sequence numbers are allocated under a cross-process lock on every append.

func NewHookEventWriter

func NewHookEventWriter(backlogitDir string) *HookEventWriter

NewHookEventWriter creates a writer targeting hooks_queue.jsonl under backlogitDir.

func (*HookEventWriter) AppendHookEvent

func (w *HookEventWriter) AppendHookEvent(ctx context.Context, event HookEvent) (int64, error)

AppendHookEvent appends a sequenced hook event to the queue. The sequence counter is determined by scanning existing events and incremented under a combined in-process mutex and cross-process sidecar lock to prevent duplicate sequence numbers across concurrent writers. Returns the assigned monotonic sequence number.
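
The scan-then-increment allocation can be sketched as follows. Only the in-process mutex is shown; the cross-process sidecar lock is omitted because its mechanism is not documented here. queueWriter, lastSeq, appendEvent, and appendConcurrently are hypothetical names, and HookEvent is trimmed to two fields.

```go
package main

import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"sync"
)

// HookEvent is trimmed to the fields this sketch needs.
type HookEvent struct {
	Seq       int64  `json:"seq"`
	EventType string `json:"event_type"`
}

// queueWriter guards a JSONL queue file with an in-process mutex.
type queueWriter struct {
	mu   sync.Mutex
	path string
}

// lastSeq scans the queue and returns the highest seq (0 if the file is missing).
func lastSeq(path string) (int64, error) {
	f, err := os.Open(path)
	if errors.Is(err, fs.ErrNotExist) {
		return 0, nil
	}
	if err != nil {
		return 0, err
	}
	defer f.Close()

	var highest int64
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		if len(sc.Bytes()) == 0 {
			continue
		}
		var ev HookEvent
		if err := json.Unmarshal(sc.Bytes(), &ev); err != nil {
			return 0, err
		}
		if ev.Seq > highest {
			highest = ev.Seq
		}
	}
	return highest, sc.Err()
}

// appendEvent assigns the next seq and appends the event while holding the lock,
// so the scan and the write form one critical section.
func (w *queueWriter) appendEvent(ev HookEvent) (int64, error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	last, err := lastSeq(w.path)
	if err != nil {
		return 0, err
	}
	ev.Seq = last + 1
	line, err := json.Marshal(ev)
	if err != nil {
		return 0, err
	}
	f, err := os.OpenFile(w.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return 0, err
	}
	defer f.Close()
	if _, err := f.Write(append(line, '\n')); err != nil {
		return 0, err
	}
	return ev.Seq, nil
}

// appendConcurrently appends n events from n goroutines and returns the
// sequence numbers that were handed out.
func appendConcurrently(n int) ([]int64, error) {
	dir, err := os.MkdirTemp("", "queue")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	w := &queueWriter{path: filepath.Join(dir, "hooks_queue.jsonl")}

	seqs := make([]int64, n)
	errs := make([]error, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			seqs[i], errs[i] = w.appendEvent(HookEvent{EventType: "feature_review_ready"})
		}(i)
	}
	wg.Wait()
	return seqs, errors.Join(errs...)
}

func main() {
	seqs, err := appendConcurrently(8)
	fmt.Println(len(seqs), err)
}
```

Rescanning the file on every append costs time proportional to the queue length. In exchange, the writer keeps no counter state that could drift from what is on disk.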

func (*HookEventWriter) ReadHookEvents

func (w *HookEventWriter) ReadHookEvents(_ context.Context) ([]HookEvent, error)

ReadHookEvents reads all events from the queue file in append order. Returns nil if the queue file does not exist.

type PollResult

type PollResult struct {
	// Events contains sequenced, ackable events from the queue with seq strictly
	// greater than the consumer's current checkpoint. These events should be
	// processed and then acked via AckHookEvents.
	Events []HookEvent

	// DerivedSignals contains ephemeral, non-ackable signals computed at poll time.
	// Seq is always 0 for derived signals; they must not be passed to AckHookEvents.
	DerivedSignals []HookEvent
}

PollResult separates ackable queued events from ephemeral derived signals.

func PollHookEvents

func PollHookEvents(
	ctx context.Context,
	w *HookEventWriter,
	cs *CheckpointStore,
	consumerID string,
	provider DerivedSignalProvider,
) (*PollResult, error)

PollHookEvents returns new events since the consumer's last checkpoint plus any derived signals from the provider. Events are filtered to those with seq strictly greater than the consumer's checkpoint. Derived signals always carry Seq=0 and are excluded from the ack stream. If provider is nil, DerivedSignals is empty.
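
The poll-then-ack cycle can be modelled in memory to make the filtering rule concrete. In this sketch, poll and nextCheckpoint are hypothetical helpers, the queue is a plain slice, and a function value stands in for the provider; the real function reads from the queue file and the checkpoint store.

```go
package main

import "fmt"

// Trimmed local copies of the documented types.
type HookEvent struct {
	Seq       int64
	EventType string
}

type PollResult struct {
	Events         []HookEvent
	DerivedSignals []HookEvent
}

// poll keeps queued events with seq strictly greater than checkpoint and
// attaches derived signals with Seq forced to 0. A nil derived func is skipped.
func poll(queue []HookEvent, checkpoint int64, derived func() []HookEvent) PollResult {
	var res PollResult
	for _, ev := range queue {
		if ev.Seq > checkpoint {
			res.Events = append(res.Events, ev)
		}
	}
	if derived != nil {
		for _, sig := range derived() {
			sig.Seq = 0
			res.DerivedSignals = append(res.DerivedSignals, sig)
		}
	}
	return res
}

// nextCheckpoint returns the highest seq among the ackable events.
// Derived signals are ignored because they must never be acked.
func nextCheckpoint(checkpoint int64, res PollResult) int64 {
	for _, ev := range res.Events {
		if ev.Seq > checkpoint {
			checkpoint = ev.Seq
		}
	}
	return checkpoint
}

func main() {
	queue := []HookEvent{
		{Seq: 1, EventType: "feature_review_ready"},
		{Seq: 2, EventType: "post_merge_closure"},
		{Seq: 3, EventType: "feature_review_ready"},
	}
	derived := func() []HookEvent {
		return []HookEvent{{EventType: "blocked_stale"}}
	}

	checkpoint := int64(1) // the consumer has already acked seq 1
	res := poll(queue, checkpoint, derived)
	fmt.Println(len(res.Events), len(res.DerivedSignals)) // 2 1

	checkpoint = nextCheckpoint(checkpoint, res)
	fmt.Println(checkpoint) // 3

	// A second poll finds nothing new in the queue.
	fmt.Println(len(poll(queue, checkpoint, nil).Events)) // 0
}
```

With the real API, the value computed by nextCheckpoint corresponds to the seq a consumer would pass to AckHookEvents after processing the batch.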

type TelemetryEntry

type TelemetryEntry struct {
	Timestamp time.Time      `json:"timestamp"`
	EventType string         `json:"event_type"`
	Payload   map[string]any `json:"payload"`
}

TelemetryEntry represents an agent telemetry log entry.

type TelemetryWriter

type TelemetryWriter struct {
	// contains filtered or unexported fields
}

TelemetryWriter provides goroutine-safe writes to telemetry.jsonl.

func NewTelemetryWriter

func NewTelemetryWriter(path string) *TelemetryWriter

NewTelemetryWriter creates a telemetry writer.

func (*TelemetryWriter) LogTelemetry

func (w *TelemetryWriter) LogTelemetry(_ context.Context, entry TelemetryEntry) error

LogTelemetry appends a telemetry entry to telemetry.jsonl.
