Documentation
¶
Overview ¶
Package projector is the runnable Pattern B reference implementation from doc/search-integration.md. It tails a Murmur counter table via DynamoDB Streams and projects bucket transitions into an OpenSearch index.
Architecture (see doc/search-integration.md "Pattern B" for the full treatment):
Murmur counter pipeline → DynamoDB → DDB Streams → THIS Lambda → OpenSearch
The projector decides per-record:
- decode old + new image → log10 buckets via pkg/projection
- if bucket changed: emit OpenSearch partial-update for the popularity_bucket field
- otherwise: drop (this is the whole point — index update rate is logarithmic in counter magnitude, not linear in event rate)
Hysteresis-band wrapping (also from pkg/projection) is configurable: for documents oscillating around a boundary, set a 10–20% band to suppress flap reindexing.
The Lambda is wired against Murmur's pkg/exec/lambda/dynamodbstreams runtime — same retry/dedup/BatchItemFailures shape, just a different downstream "store" (the OpenSearch projector instead of an aggregating MergeUpdate).
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrIndexClientRequired = errors.New("projector: IndexClient is required")
ErrIndexClientRequired is returned when constructing without a client.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// Index is the OpenSearch index name (the projector writes
// partial-updates with the popularity_bucket field).
Index string
// Field is the document field name to update with the bucket. Default
// "popularity_bucket". Override when projecting multiple counters into
// the same index ("post_likes_bucket", "post_views_bucket", ...).
Field string
// Hysteresis, when > 0, wraps the bucket function with the named
// hysteresis band to suppress oscillation around bucket boundaries.
// Typical values: 0.05–0.20. See pkg/projection.HysteresisBucket.
//
// Hysteresis requires the projector to remember the previous bucket
// per entity. We don't currently persist this state across Lambda
// invocations — for the prototype, the prev-bucket is derived from
// the OldImage's value (so the band suppresses flap WITHIN a single
// invocation but not across them). For full hysteresis spanning
// invocations, persist (entity, prev-bucket) in a small DDB table
// and look it up in the projector. Marked as future work.
Hysteresis float64
}
Config is the projector's deployment-time configuration. Pin these via environment variables in main.
type IndexClient ¶
type IndexClient interface {
UpdateDoc(ctx context.Context, index, docID string, fields map[string]any) error
}
IndexClient abstracts the OpenSearch UpdateDoc surface so the projector is testable without the live cluster. Production: opensearchapi.Client. Tests: a fake that records every Update call.
type Projector ¶
type Projector struct {
// contains filtered or unexported fields
}
Projector is the per-record decision engine. Held as a struct so the Lambda main can wire its dependencies once and reuse them across invocations.
func New ¶
func New(cfg Config, client IndexClient) *Projector
New constructs a Projector. The bucket function is fixed to projection.LogBucket — the right choice for popularity-counter projection, and the canonical pattern in doc/search-integration.md. For non-popularity counters with a different shape (linear bands, manual breakpoints), wire the projector against pkg/projection.LinearBucket or ManualBucket directly via NewWithBucket.
func NewWithBucket ¶
func NewWithBucket(cfg Config, client IndexClient, bf projection.BucketFn) *Projector
NewWithBucket constructs a Projector with a caller-supplied bucket function. Use when LogBucket isn't the right shape for your counter.
func (*Projector) Handle ¶
Handle processes one DDB Streams record. Returns nil if the record was handled (whether or not it triggered an index update); returns an error only when the OpenSearch update itself failed and Lambda should add the record to BatchItemFailures so it gets redelivered.
func (*Projector) HandleEvent ¶
func (p *Projector) HandleEvent(ctx context.Context, evt events.DynamoDBEvent) []events.DynamoDBBatchItemFailure
HandleEvent is the convenience handler for a full SQS-style batch. Returns the BatchItemFailures slice for Lambda's response shape.
type Stats ¶
type Stats struct {
Decoded atomic.Int64 // records successfully decoded
Skipped atomic.Int64 // bucket unchanged → no reindex
Indexed atomic.Int64 // bucket changed → reindex emitted
IndexErrors atomic.Int64 // OpenSearch update returned a non-2xx
DecodeErrs atomic.Int64 // missing pk / bad attribute / etc.
}
Stats reports projector activity. The Lambda runtime can publish these to CloudWatch via metrics.Recorder; tests assert on them directly.
func (*Stats) MarshalJSON ¶
MarshalStats returns a JSON snapshot of the projector counters. Useful for the Lambda warmup path or as a debug endpoint when running locally.