Documentation
¶
Overview ¶
Package event owns the EventKey registry, RawEvent, APIClient, and dedup filter.
Index ¶
- Constants
- func RegisterKey(def KeyDefinition)
- func ResetRegistryForTest()
- func SanitizeAppID(appID string) string
- func UnregisterKeyForTest(key string)
- type APIClient
- type DedupFilter
- type KeyDefinition
- type ParamDef
- type ParamType
- type ParamValue
- type ProcessFunc
- type RawEvent
- type SchemaDef
- type SchemaSpec
Constants ¶
const ( DefaultBufferSize = 100 MaxBufferSize = 1000 )
Variables ¶
This section is empty.
Functions ¶
func RegisterKey ¶
func RegisterKey(def KeyDefinition)
RegisterKey panics on duplicate Key, empty EventType, or schema/process contract violations.
func ResetRegistryForTest ¶
func ResetRegistryForTest()
func SanitizeAppID ¶
SanitizeAppID replaces ".." / path separators / NUL with "_" to guard filepath.Join; empty/dot-only collapses to "_".
func UnregisterKeyForTest ¶
func UnregisterKeyForTest(key string)
UnregisterKeyForTest removes one key — use this (not Reset) in tests with synthetic keys alongside production keys to keep -count=N reruns idempotent.
Types ¶
type APIClient ¶
type APIClient interface {
CallAPI(ctx context.Context, method, path string, body interface{}) (json.RawMessage, error)
}
APIClient: identity is opaque so business code can't bypass pre-flight checks.
type DedupFilter ¶
type DedupFilter struct {
// contains filtered or unexported fields
}
DedupFilter: seen map is sole authority; ring only bounds map size via overflow eviction.
func NewDedupFilter ¶
func NewDedupFilter() *DedupFilter
func NewDedupFilterWithSize ¶
func NewDedupFilterWithSize(ringSize int, ttl time.Duration) *DedupFilter
func (*DedupFilter) IsDuplicate ¶
func (d *DedupFilter) IsDuplicate(eventID string) bool
type KeyDefinition ¶
type KeyDefinition struct {
Key string `json:"key"`
DisplayName string `json:"display_name,omitempty"`
Description string `json:"description,omitempty"`
EventType string `json:"event_type"`
Params []ParamDef `json:"params,omitempty"`
Schema SchemaDef `json:"schema"`
// NormalizeParams canonicalizes param values BEFORE fingerprint compute,
// PreConsume, Match, and Process. Mutates the params map in place.
// May call OAPI; runs once per consumer at startup.
//
// Use cases: resolve aliases ("me" -> real email, a name -> an ID),
// trim whitespace. On error, consume fails (no retry); caller gets the
// wrapped error.
//
// Default nil = no normalization, params pass through unchanged.
NormalizeParams func(ctx context.Context, rt APIClient, params map[string]string) error `json:"-"`
// Process required when Schema.Custom is Processed output; must be nil when Native is used.
//
// Convention: returning (nil, nil) signals "drop this event" — the
// consumer loop will skip writing it to sink and not advance the
// emitted counter. Useful for async filtering (e.g. fetch metadata,
// drop if folder doesn't match). For sync filters that don't need
// OAPI, use Match instead.
Process func(ctx context.Context, rt APIClient, raw *RawEvent, params map[string]string) (json.RawMessage, error) `json:"-"`
// Match is a synchronous payload filter run on every received event
// BEFORE Process. Return false to drop the event without further work.
//
// Signature deliberately omits ctx/rt to physically enforce "no OAPI
// calls in Match". For filters that need a metadata fetch first, use
// Process and return nil to drop.
//
// Default nil = accept all events.
Match func(raw *RawEvent, params map[string]string) bool `json:"-"`
// PreConsume runs once per (EventKey, SubscriptionID) when this consumer
// is first for that scope. Returns a cleanup function that the framework
// invokes when this consumer is the last for its scope.
//
// The cleanup's error return is honored: on nil the framework prints
// "[event] cleanup done."; on non-nil it prints a WARN with an
// idempotency note.
PreConsume func(ctx context.Context, rt APIClient, params map[string]string) (cleanup func() error, err error) `json:"-"`
Scopes []string `json:"scopes,omitempty"`
// AuthTypes: whitelist of identities the EventKey accepts. Empty = no identity required.
AuthTypes []string `json:"auth_types,omitempty"`
RequiredConsoleEvents []string `json:"required_console_events,omitempty"`
BufferSize int `json:"buffer_size,omitempty"`
Workers int `json:"workers,omitempty"`
}
func Lookup ¶
func Lookup(key string) (*KeyDefinition, bool)
type ParamDef ¶
type ParamDef struct {
Name string `json:"name"`
Type ParamType `json:"type"`
Required bool `json:"required"`
Default string `json:"default,omitempty"`
Description string `json:"description"`
Values []ParamValue `json:"values,omitempty"`
// SubscriptionKey marks this param as part of the subscription identity.
// Two consumers of the same EventKey but different values for any
// SubscriptionKey-marked param are treated as DISTINCT subscriptions:
// PreConsume runs once per (EventKey, SubscriptionID), cleanup runs once per
// (EventKey, SubscriptionID).
//
// CONTRACT: only mark a param SubscriptionKey if the EventKey's server-side
// subscribe/unsubscribe API is itself scoped to that resource. Lark keys the
// subscription record by (app, user, event_type) and overwrites it rather
// than reference-counting, so for a non-per-resource API the cleanup of one
// resource's last consumer unsubscribes the shared record and silently cuts
// off every other resource sharing that event_type.
//
// Default false = the param is a filter / formatting / metadata param
// and does not affect subscription identity.
SubscriptionKey bool `json:"subscription_key,omitempty"`
}
type ParamValue ¶
ParamValue.Desc is mandatory so AI consumers can decide which value to pick.
type ProcessFunc ¶
type RawEvent ¶
type RawEvent struct {
EventID string `json:"event_id"`
EventType string `json:"event_type"`
SourceTime string `json:"source_time,omitempty"`
Payload json.RawMessage `json:"payload"`
Timestamp time.Time `json:"timestamp"`
}
RawEvent: SourceTime is upstream create_time; Timestamp is local source observation time.
type SchemaDef ¶
type SchemaDef struct {
Native *SchemaSpec `json:"native,omitempty"`
Custom *SchemaSpec `json:"custom,omitempty"`
FieldOverrides map[string]schemas.FieldMeta `json:"field_overrides,omitempty"`
}
SchemaDef: exactly one of Native or Custom must be set. Native auto-wraps the SDK type in the V2 envelope; Custom passes through verbatim.
type SchemaSpec ¶
type SchemaSpec struct {
Type reflect.Type `json:"-"`
Raw json.RawMessage `json:"raw,omitempty"`
}
SchemaSpec: exactly one of Type or Raw.
Directories
¶
| Path | Synopsis |
|---|---|
|
Package bus implements the per-AppID event-bus daemon; lifecycle is driven by consumer presence (idle timeout) and explicit shutdown.
|
Package bus implements the per-AppID event-bus daemon; lifecycle is driven by consumer presence (idle timeout) and explicit shutdown. |
|
Package busctl is the wire-level control client for the event bus daemon.
|
Package busctl is the wire-level control client for the event bus daemon. |
|
Package busdiscover enumerates live bus daemons via per-AppID PID files protected by a process-lifetime advisory lock.
|
Package busdiscover enumerates live bus daemons via per-AppID PID files protected by a process-lifetime advisory lock. |
|
Package consume drives the consume-side half of the events pipeline.
|
Package consume drives the consume-side half of the events pipeline. |
|
Package protocol defines the newline-delimited JSON wire format used over IPC.
|
Package protocol defines the newline-delimited JSON wire format used over IPC. |
|
Package schemas derives JSON Schema fragments from Go types via reflection.
|
Package schemas derives JSON Schema fragments from Go types via reflection. |
|
Package source is a pluggable event source abstraction (separate package to keep business registrations free of SDK transitive deps).
|
Package source is a pluggable event source abstraction (separate package to keep business registrations free of SDK transitive deps). |
|
Package testutil holds test-only helpers shared across event subsystem tests.
|
Package testutil holds test-only helpers shared across event subsystem tests. |
|
Package transport: Unix sockets on POSIX, named pipes on Windows.
|
Package transport: Unix sockets on POSIX, named pipes on Windows. |