event

package
v1.0.53
Published: Jun 12, 2026 | License: MIT | Imports: 9 | Imported by: 0

Documentation

Overview

Package event owns the EventKey registry, RawEvent, APIClient, and dedup filter.

Constants

const (
	DefaultBufferSize = 100
	MaxBufferSize     = 1000
)

Functions

func RegisterKey

func RegisterKey(def KeyDefinition)

RegisterKey panics on duplicate Key, empty EventType, or schema/process contract violations.

func ResetRegistryForTest

func ResetRegistryForTest()

func SanitizeAppID

func SanitizeAppID(appID string) string

SanitizeAppID replaces "..", path separators, and NUL bytes with "_" so that the result is safe to pass to filepath.Join; an empty or dot-only appID collapses to "_".
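
The behaviour described above can be approximated with the standard library. The following is a minimal sketch, not the package's actual implementation: the lowercase `sanitizeAppID` helper and the `run` directory are hypothetical, and the real function may order or combine its replacements differently.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// sanitizeAppID is a hypothetical re-implementation of the documented
// behaviour; the package's real SanitizeAppID may differ in detail.
func sanitizeAppID(appID string) string {
	// Empty or dot-only input ("", ".", "..", "...") collapses to "_".
	if strings.Trim(appID, ".") == "" {
		return "_"
	}
	// Replace "..", both path separator styles, and NUL with "_".
	r := strings.NewReplacer("..", "_", "/", "_", `\`, "_", "\x00", "_")
	return r.Replace(appID)
}

func main() {
	for _, id := range []string{"cli_123", "../etc/passwd", "..", ""} {
		safe := sanitizeAppID(id)
		// The sanitized value can no longer climb out of the base directory.
		fmt.Printf("%q -> %q -> %s\n", id, safe, filepath.Join("run", safe))
	}
}
```

Checking for dot-only input before replacing keeps "." and ".." from surviving as path components.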

func UnregisterKeyForTest

func UnregisterKeyForTest(key string)

UnregisterKeyForTest removes a single key. Use it instead of ResetRegistryForTest in tests that register synthetic keys alongside production keys, so that -count=N reruns stay idempotent.

Types

type APIClient

type APIClient interface {
	CallAPI(ctx context.Context, method, path string, body interface{}) (json.RawMessage, error)
}

APIClient keeps the calling identity opaque so that business code cannot bypass pre-flight checks.

type DedupFilter

type DedupFilter struct {
	// contains filtered or unexported fields
}

DedupFilter: the seen map is the sole authority on whether an event ID is a duplicate; the ring only bounds the map's size by evicting entries on overflow.

func NewDedupFilter

func NewDedupFilter() *DedupFilter

func NewDedupFilterWithSize

func NewDedupFilterWithSize(ringSize int, ttl time.Duration) *DedupFilter

func (*DedupFilter) IsDuplicate

func (d *DedupFilter) IsDuplicate(eventID string) bool
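
The split between the seen map and the ring can be illustrated with a small model. This is a sketch under stated assumptions, not the package's code: `dedupSketch` is hypothetical, it ignores the ttl parameter that NewDedupFilterWithSize accepts, and it is not safe for concurrent use.

```go
package main

import "fmt"

// dedupSketch is a hypothetical model of a map-plus-ring dedup filter.
// The map answers "have I seen this ID?"; the ring only remembers insertion
// order so that the oldest ID can be evicted once the ring is full.
type dedupSketch struct {
	seen   map[string]struct{}
	ring   []string
	next   int // slot that the next new ID will occupy
	filled int // number of occupied slots
}

func newDedupSketch(ringSize int) *dedupSketch {
	if ringSize < 1 {
		ringSize = 1
	}
	return &dedupSketch{
		seen: make(map[string]struct{}, ringSize),
		ring: make([]string, ringSize),
	}
}

// isDuplicate reports whether id was already seen, recording it if not.
func (d *dedupSketch) isDuplicate(id string) bool {
	if _, ok := d.seen[id]; ok {
		return true
	}
	if d.filled == len(d.ring) {
		// Ring overflow: forget the oldest ID so the map stays bounded.
		delete(d.seen, d.ring[d.next])
	} else {
		d.filled++
	}
	d.ring[d.next] = id
	d.seen[id] = struct{}{}
	d.next = (d.next + 1) % len(d.ring)
	return false
}

func main() {
	d := newDedupSketch(2)
	for _, id := range []string{"a", "a", "b", "c", "a"} {
		fmt.Printf("%s duplicate=%v\n", id, d.isDuplicate(id))
	}
}
```

In this model an ID that has been evicted from the ring is treated as new again, which is the trade-off a bounded filter makes.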

type KeyDefinition

type KeyDefinition struct {
	Key         string `json:"key"`
	DisplayName string `json:"display_name,omitempty"`
	Description string `json:"description,omitempty"`
	EventType   string `json:"event_type"`

	Params []ParamDef `json:"params,omitempty"`

	Schema SchemaDef `json:"schema"`

	// NormalizeParams canonicalizes param values BEFORE fingerprint compute,
	// PreConsume, Match, and Process. Mutates the params map in place.
	// May call OAPI; runs once per consumer at startup.
	//
	// Use cases: resolve aliases ("me" -> real email, a name -> an ID),
	// trim whitespace. On error, consume fails (no retry); caller gets the
	// wrapped error.
	//
	// Default nil = no normalization, params pass through unchanged.
	NormalizeParams func(ctx context.Context, rt APIClient, params map[string]string) error `json:"-"`

	// Process is required when Schema.Custom describes processed output; it
	// must be nil when Schema.Native is used.
	//
	// Convention: returning (nil, nil) signals "drop this event": the
	// consumer loop skips writing it to the sink and does not advance the
	// emitted counter. Useful for async filtering (e.g. fetch metadata,
	// drop if folder doesn't match). For sync filters that don't need
	// OAPI, use Match instead.
	Process func(ctx context.Context, rt APIClient, raw *RawEvent, params map[string]string) (json.RawMessage, error) `json:"-"`

	// Match is a synchronous payload filter run on every received event
	// BEFORE Process. Return false to drop the event without further work.
	//
	// Signature deliberately omits ctx/rt to physically enforce "no OAPI
	// calls in Match". For filters that need a metadata fetch first, use
	// Process and return (nil, nil) to drop.
	//
	// Default nil = accept all events.
	Match func(raw *RawEvent, params map[string]string) bool `json:"-"`

	// PreConsume runs once per (EventKey, SubscriptionID) when this consumer
	// is first for that scope. Returns a cleanup function that the framework
	// invokes when this consumer is the last for its scope.
	//
	// The cleanup's error return is honored: on nil the framework prints
	// "[event] cleanup done."; on non-nil it prints a WARN with an
	// idempotency note.
	PreConsume func(ctx context.Context, rt APIClient, params map[string]string) (cleanup func() error, err error) `json:"-"`

	Scopes []string `json:"scopes,omitempty"`

	// AuthTypes: whitelist of identities the EventKey accepts. Empty = no identity required.
	AuthTypes []string `json:"auth_types,omitempty"`

	RequiredConsoleEvents []string `json:"required_console_events,omitempty"`

	BufferSize int `json:"buffer_size,omitempty"`
	Workers    int `json:"workers,omitempty"`
}

func ListAll

func ListAll() []*KeyDefinition

ListAll returns all KeyDefinitions sorted by Key.

func Lookup

func Lookup(key string) (*KeyDefinition, bool)
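
The registry contract documented for RegisterKey, Lookup, and ListAll (panic on a duplicate Key or empty EventType, listing sorted by Key) can be sketched as follows. The `registry` and `keyDef` types are hypothetical and heavily reduced; the real registry is package-level state and also validates the schema/process contract.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// keyDef is a reduced, hypothetical stand-in for KeyDefinition.
type keyDef struct {
	Key       string
	EventType string
}

type registry struct {
	mu   sync.RWMutex
	defs map[string]*keyDef
}

func newRegistry() *registry {
	return &registry{defs: make(map[string]*keyDef)}
}

// register panics on an empty EventType or a duplicate Key.
func (r *registry) register(def keyDef) {
	if def.EventType == "" {
		panic(fmt.Sprintf("event: key %q has an empty EventType", def.Key))
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, dup := r.defs[def.Key]; dup {
		panic(fmt.Sprintf("event: duplicate key %q", def.Key))
	}
	r.defs[def.Key] = &def
}

func (r *registry) lookup(key string) (*keyDef, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	d, ok := r.defs[key]
	return d, ok
}

// listAll returns every definition sorted by Key.
func (r *registry) listAll() []*keyDef {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]*keyDef, 0, len(r.defs))
	for _, d := range r.defs {
		out = append(out, d)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Key < out[j].Key })
	return out
}

func main() {
	r := newRegistry()
	r.register(keyDef{Key: "b.example", EventType: "hypothetical.b_v1"})
	r.register(keyDef{Key: "a.example", EventType: "hypothetical.a_v1"})
	for _, d := range r.listAll() {
		fmt.Println(d.Key, d.EventType)
	}
}
```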

type ParamDef

type ParamDef struct {
	Name        string       `json:"name"`
	Type        ParamType    `json:"type"`
	Required    bool         `json:"required"`
	Default     string       `json:"default,omitempty"`
	Description string       `json:"description"`
	Values      []ParamValue `json:"values,omitempty"`

	// SubscriptionKey marks this param as part of the subscription identity.
	// Two consumers of the same EventKey but different values for any
	// SubscriptionKey-marked param are treated as DISTINCT subscriptions:
	// PreConsume runs once per (EventKey, SubscriptionID), cleanup runs once per
	// (EventKey, SubscriptionID).
	//
	// CONTRACT: only mark a param SubscriptionKey if the EventKey's server-side
	// subscribe/unsubscribe API is itself scoped to that resource. Lark keys the
	// subscription record by (app, user, event_type) and overwrites it rather
	// than reference-counting, so for a non-per-resource API the cleanup of one
	// resource's last consumer unsubscribes the shared record and silently cuts
	// off every other resource sharing that event_type.
	//
	// Default false = the param is a filter / formatting / metadata param
	// and does not affect subscription identity.
	SubscriptionKey bool `json:"subscription_key,omitempty"`
}
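
To make the SubscriptionKey semantics concrete, the sketch below derives a subscription identity from only the marked params and reference-counts consumers per (EventKey, SubscriptionID). Everything here is an assumption for illustration: the documentation does not say how the framework computes SubscriptionID, and `subscriptionID`, `scopes`, `acquire`, and `release` are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// paramDef is a reduced, hypothetical stand-in for ParamDef.
type paramDef struct {
	Name            string
	SubscriptionKey bool
}

// subscriptionID joins the SubscriptionKey-marked params in a stable order.
// Unmarked params (filters, formatting) do not contribute.
func subscriptionID(defs []paramDef, params map[string]string) string {
	var parts []string
	for _, d := range defs {
		if d.SubscriptionKey {
			parts = append(parts, d.Name+"="+params[d.Name])
		}
	}
	sort.Strings(parts)
	return strings.Join(parts, "&")
}

// scopes counts live consumers per (EventKey, SubscriptionID).
type scopes struct{ refs map[string]int }

// acquire reports whether the caller is the first consumer for the scope,
// which is the point at which PreConsume would run.
func (s *scopes) acquire(eventKey, subID string) bool {
	k := eventKey + "|" + subID
	s.refs[k]++
	return s.refs[k] == 1
}

// release reports whether the caller was the last consumer for the scope,
// which is the point at which the cleanup function would run.
func (s *scopes) release(eventKey, subID string) bool {
	k := eventKey + "|" + subID
	if s.refs[k] == 0 {
		return false
	}
	s.refs[k]--
	if s.refs[k] == 0 {
		delete(s.refs, k)
		return true
	}
	return false
}

func main() {
	defs := []paramDef{{Name: "resource_id", SubscriptionKey: true}, {Name: "format"}}
	a := subscriptionID(defs, map[string]string{"resource_id": "r1", "format": "json"})
	b := subscriptionID(defs, map[string]string{"resource_id": "r1", "format": "text"})
	c := subscriptionID(defs, map[string]string{"resource_id": "r2", "format": "json"})
	fmt.Println(a == b, a == c)

	s := &scopes{refs: map[string]int{}}
	fmt.Println(s.acquire("hypothetical.key", a), s.acquire("hypothetical.key", b))
	fmt.Println(s.release("hypothetical.key", a), s.release("hypothetical.key", b))
}
```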

type ParamType

type ParamType string
const (
	ParamString ParamType = "string"
	ParamEnum   ParamType = "enum"
	ParamMulti  ParamType = "multi"
	ParamBool   ParamType = "bool"
	ParamInt    ParamType = "int"
)

type ParamValue

type ParamValue struct {
	Value string `json:"value"`
	Desc  string `json:"desc"`
}

ParamValue.Desc is mandatory so AI consumers can decide which value to pick.

type ProcessFunc

type ProcessFunc = func(ctx context.Context, rt APIClient, raw *RawEvent, params map[string]string) (json.RawMessage, error)

type RawEvent

type RawEvent struct {
	EventID    string          `json:"event_id"`
	EventType  string          `json:"event_type"`
	SourceTime string          `json:"source_time,omitempty"`
	Payload    json.RawMessage `json:"payload"`
	Timestamp  time.Time       `json:"timestamp"`
}

RawEvent: SourceTime is the upstream create_time; Timestamp is the time at which the local source observed the event.

type SchemaDef

type SchemaDef struct {
	Native         *SchemaSpec                  `json:"native,omitempty"`
	Custom         *SchemaSpec                  `json:"custom,omitempty"`
	FieldOverrides map[string]schemas.FieldMeta `json:"field_overrides,omitempty"`
}

SchemaDef: exactly one of Native or Custom must be set. Native auto-wraps the SDK type in the V2 envelope; Custom passes through verbatim.

type SchemaSpec

type SchemaSpec struct {
	Type reflect.Type    `json:"-"`
	Raw  json.RawMessage `json:"raw,omitempty"`
}

SchemaSpec: exactly one of Type or Raw must be set.

Directories

Path         Synopsis
bus          Package bus implements the per-AppID event-bus daemon; lifecycle is driven by consumer presence (idle timeout) and explicit shutdown.
busctl       Package busctl is the wire-level control client for the event bus daemon.
busdiscover  Package busdiscover enumerates live bus daemons via per-AppID PID files protected by a process-lifetime advisory lock.
consume      Package consume drives the consume-side half of the events pipeline.
protocol     Package protocol defines the newline-delimited JSON wire format used over IPC.
schemas      Package schemas derives JSON Schema fragments from Go types via reflection.
source       Package source is a pluggable event source abstraction (separate package to keep business registrations free of SDK transitive deps).
testutil     Package testutil holds test-only helpers shared across event subsystem tests.
transport    Package transport: Unix sockets on POSIX, named pipes on Windows.
