events

package
v0.0.0-...-e9fa201 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package events provides S3 event notification infrastructure.

Community edition:

- Events are queued via the taskqueue but not delivered.
- Provides the Emitter API for S3 handlers to call.

Enterprise edition:

- EventHandler processes queued events.
- Redis, Kafka, and Webhook publishers deliver events.

Index

Constants

This section is empty.

Variables

View Source
// Prometheus collectors for the events subsystem. Every collector below is
// created with Namespace "zapfs" and Subsystem "events", so the exposed
// series are named zapfs_events_<Name>.
//
// NOTE(review): this block only constructs the collectors; registration with
// a Prometheus registry is not visible here — confirm where it happens.
var (
	// EventsEmittedTotal counts emitted S3 events, partitioned by the
	// "event_type" label.
	EventsEmittedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "zapfs",
		Subsystem: "events",
		Name:      "emitted_total",
		Help:      "Total number of S3 events emitted",
	}, []string{"event_type"}) // event_type: "s3:ObjectCreated:Put", etc.

	// EventsDroppedTotal counts S3 events dropped because the emitter is
	// disabled. It is a plain Counter and carries no labels.
	EventsDroppedTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "zapfs",
		Subsystem: "events",
		Name:      "dropped_total",
		Help:      "Total number of S3 events dropped (emitter disabled)",
	})

	// EventsErrorsTotal counts event emission errors, partitioned by the
	// "error_type" label.
	EventsErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "zapfs",
		Subsystem: "events",
		Name:      "errors_total",
		Help:      "Total number of event emission errors",
	}, []string{"error_type"}) // error_type: "marshal", "enqueue"

	// EventsDeliveredTotal counts events delivered to publishers, partitioned
	// by the "publisher" label (Enterprise).
	EventsDeliveredTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "zapfs",
		Subsystem: "events",
		Name:      "delivered_total",
		Help:      "Total number of S3 events delivered to publishers",
	}, []string{"publisher"}) // publisher: "redis", "kafka", "webhook"

	// EventsDeliveryErrorsTotal counts event delivery errors, partitioned by
	// the "publisher" label (Enterprise).
	EventsDeliveryErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "zapfs",
		Subsystem: "events",
		Name:      "delivery_errors_total",
		Help:      "Total number of event delivery errors",
	}, []string{"publisher"}) // publisher: "redis", "kafka", "webhook"

	// EventsDeliveryDuration is a histogram of event delivery latency,
	// partitioned by the "publisher" label (Enterprise). Per the metric name
	// the observed unit is seconds; the explicit bucket upper bounds range
	// from 0.001 (1ms) to 5 (5s).
	EventsDeliveryDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "zapfs",
		Subsystem: "events",
		Name:      "delivery_duration_seconds",
		Help:      "Time spent delivering events to publishers",
		Buckets:   []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5},
	}, []string{"publisher"}) // publisher: "redis", "kafka", "webhook"

	// EventsQueueDepth is an unlabelled gauge holding the current number of
	// events pending delivery.
	EventsQueueDepth = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "zapfs",
		Subsystem: "events",
		Name:      "queue_depth",
		Help:      "Current number of events pending delivery",
	})
)

Functions

func MatchesEventType

func MatchesEventType(pattern EventType, eventName string) bool

MatchesEventType checks if an event name matches an event type pattern. Supports wildcard matching (e.g., "s3:ObjectCreated:*" matches "s3:ObjectCreated:Put").

func MatchesFilterRules

func MatchesFilterRules(key string, prefix, suffix string) bool

MatchesFilterRules checks if an object key matches the filter rules. Returns true if no rules are specified or if the key matches all rules.

Types

type Config

// Config holds event notification configuration.
//
// Fields are decoded through their mapstructure tags ("enabled", "redis",
// "kafka", "webhook"). The three publisher sections are Enterprise-only.
type Config struct {
	// Enabled controls whether event emission is active.
	// When false, Emitter.Emit() is a no-op.
	Enabled bool `mapstructure:"enabled"`

	// Redis publisher configuration (Enterprise)
	Redis RedisConfig `mapstructure:"redis"`

	// Kafka publisher configuration (Enterprise)
	Kafka KafkaConfig `mapstructure:"kafka"`

	// Webhook publisher configuration (Enterprise)
	Webhook WebhookConfig `mapstructure:"webhook"`
}

Config holds event notification configuration.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with default values.

func (*Config) HasPublishers

func (c *Config) HasPublishers() bool

HasPublishers returns true if at least one publisher is enabled.

func (*Config) Validate

func (c *Config) Validate()

Validate checks the config and applies defaults for invalid values.

type Emitter

// Emitter queues S3 events for async delivery via the taskqueue.
//
// In community edition, events are queued but the EventHandler stub simply
// drops them. In enterprise edition, EventHandler delivers events to
// configured destinations (Redis, Kafka, Webhook).
//
// Construct one with NewEmitter, or with NoopEmitter when event
// notifications are disabled.
type Emitter struct {
	// contains filtered or unexported fields
}

Emitter queues S3 events for async delivery via the taskqueue.

In community edition, events are queued but the EventHandler stub simply drops them. In enterprise edition, EventHandler delivers events to configured destinations (Redis, Kafka, Webhook).

func NewEmitter

func NewEmitter(cfg EmitterConfig) *Emitter

NewEmitter creates an event emitter.

func NoopEmitter

func NoopEmitter() *Emitter

NoopEmitter returns an emitter that drops all events. Use this when event notifications are disabled.

func (*Emitter) Emit

func (e *Emitter) Emit(ctx context.Context, payload *taskqueue.EventPayload)

Emit queues an S3 event for delivery. Returns immediately; delivery is async via the taskqueue.

If the emitter is disabled or the queue is nil, events are silently dropped. Errors are logged but not returned to avoid blocking S3 operations.

func (*Emitter) EmitObjectCreated

func (e *Emitter) EmitObjectCreated(ctx context.Context, eventType EventType, bucket, key string, size int64, etag, versionID, ownerID, requestID, sourceIP string)

EmitObjectCreated emits an object creation event.

func (*Emitter) EmitObjectRemoved

func (e *Emitter) EmitObjectRemoved(ctx context.Context, eventType EventType, bucket, key, versionID, ownerID, requestID, sourceIP string)

EmitObjectRemoved emits an object deletion event.

func (*Emitter) EmitObjectTagging

func (e *Emitter) EmitObjectTagging(ctx context.Context, eventType EventType, bucket, key, versionID, ownerID, requestID, sourceIP string)

EmitObjectTagging emits an object tagging event.

func (*Emitter) IsEnabled

func (e *Emitter) IsEnabled() bool

IsEnabled returns whether the emitter is enabled.

func (*Emitter) Stats

func (e *Emitter) Stats() EmitterStats

Stats returns emitter statistics. Note: Detailed metrics are available via Prometheus at /metrics endpoint.

type EmitterConfig

// EmitterConfig configures the event emitter. It is the argument passed to
// NewEmitter.
type EmitterConfig struct {
	// Queue is the taskqueue for persisting events.
	// If nil, events are silently dropped.
	Queue taskqueue.Queue

	// Enabled controls whether events are queued.
	// If false, Emit() is a no-op.
	Enabled bool

	// Region is the region identifier for events.
	Region string
}

EmitterConfig configures the event emitter.

type EmitterStats

// EmitterStats contains emitter status information, as returned by
// Emitter.Stats. Detailed metrics are exposed via Prometheus
// (zapfs_events_*).
type EmitterStats struct {
	// Enabled is serialized under the JSON key "enabled".
	// Presumably mirrors Emitter.IsEnabled — confirm against Stats().
	Enabled bool   `json:"enabled"`
	// Region is serialized under the JSON key "region" and is omitted from
	// the output when empty.
	Region  string `json:"region,omitempty"`
}

EmitterStats contains emitter status information. Detailed metrics are exposed via Prometheus (zapfs_events_*).

type EventType

// EventType categorizes S3 events.
//
// Values are S3 event names such as "s3:ObjectCreated:Put". Values ending in
// ":*" are wildcard patterns; use MatchesEventType to test a concrete event
// name against a pattern.
type EventType string

EventType categorizes S3 events.

// S3 event type constants.
// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html
//
// Constants whose value ends in ":*" are wildcard patterns covering every
// event in that family; the remaining constants name one concrete event.
const (
	// Object created events
	EventObjectCreated               EventType = "s3:ObjectCreated:*"
	EventObjectCreatedPut            EventType = "s3:ObjectCreated:Put"
	EventObjectCreatedPost           EventType = "s3:ObjectCreated:Post"
	EventObjectCreatedCopy           EventType = "s3:ObjectCreated:Copy"
	EventObjectCreatedCompleteUpload EventType = "s3:ObjectCreated:CompleteMultipartUpload"

	// Object removed events
	EventObjectRemoved             EventType = "s3:ObjectRemoved:*"
	EventObjectRemovedDelete       EventType = "s3:ObjectRemoved:Delete"
	EventObjectRemovedDeleteMarker EventType = "s3:ObjectRemoved:DeleteMarkerCreated"

	// Object restore events (Enterprise - Glacier-like)
	EventObjectRestore          EventType = "s3:ObjectRestore:*"
	EventObjectRestorePost      EventType = "s3:ObjectRestore:Post"
	EventObjectRestoreCompleted EventType = "s3:ObjectRestore:Completed"
	EventObjectRestoreDelete    EventType = "s3:ObjectRestore:Delete"

	// Object tagging events
	EventObjectTagging       EventType = "s3:ObjectTagging:*"
	EventObjectTaggingPut    EventType = "s3:ObjectTagging:Put"
	EventObjectTaggingDelete EventType = "s3:ObjectTagging:Delete"

	// Object ACL events
	// (single concrete event; no wildcard constant is declared for this family)
	EventObjectACLPut EventType = "s3:ObjectAcl:Put"

	// Lifecycle events
	// EventLifecycleTransition has no sub-type suffix and no wildcard form.
	EventLifecycleExpiration             EventType = "s3:LifecycleExpiration:*"
	EventLifecycleExpirationDelete       EventType = "s3:LifecycleExpiration:Delete"
	EventLifecycleExpirationDeleteMarker EventType = "s3:LifecycleExpiration:DeleteMarkerCreated"
	EventLifecycleTransition             EventType = "s3:LifecycleTransition"

	// Replication events (Enterprise)
	// NOTE(review): "s3:Replication:OperationCompletedReplication" does not
	// appear to be among the event types in the AWS page linked above —
	// confirm the name is intentional (e.g. kept for compatibility).
	EventReplication                EventType = "s3:Replication:*"
	EventReplicationCompleted       EventType = "s3:Replication:OperationCompletedReplication"
	EventReplicationFailed          EventType = "s3:Replication:OperationFailedReplication"
	EventReplicationMissedThreshold EventType = "s3:Replication:OperationMissedThreshold"
)

S3 event type constants. See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html

type KafkaConfig

// KafkaConfig holds Kafka publisher settings (the "kafka" section of Config).
//
// NOTE(review): the defaults quoted in the field comments are documented
// values; where they are applied (presumably DefaultConfig or
// Config.Validate) is not visible here — confirm.
type KafkaConfig struct {
	// Enabled activates the Kafka publisher.
	Enabled bool `mapstructure:"enabled"`

	// Brokers is the list of Kafka broker addresses.
	Brokers []string `mapstructure:"brokers"`

	// Topic is the Kafka topic for events (default: "s3-events").
	Topic string `mapstructure:"topic"`

	// RequiredAcks: 0=none, 1=leader, -1=all (default: 1).
	RequiredAcks int `mapstructure:"required_acks"`

	// Compression: "none", "gzip", "snappy", "lz4", "zstd" (default: "snappy").
	Compression string `mapstructure:"compression"`

	// BatchSize is the maximum messages per batch (default: 100).
	BatchSize int `mapstructure:"batch_size"`

	// BatchTimeout is the maximum time to wait for a batch (default: 1s).
	BatchTimeout time.Duration `mapstructure:"batch_timeout"`
}

KafkaConfig holds Kafka publisher settings.

type RedisConfig

// RedisConfig holds Redis publisher settings (the "redis" section of Config).
//
// NOTE(review): the defaults quoted in the field comments are documented
// values; where they are applied (presumably DefaultConfig or
// Config.Validate) is not visible here — confirm.
type RedisConfig struct {
	// Enabled activates the Redis publisher.
	Enabled bool `mapstructure:"enabled"`

	// Addr is the Redis server address (e.g., "localhost:6379").
	Addr string `mapstructure:"addr"`

	// Password for Redis authentication (optional).
	// This is a credential: take care not to log or expose the struct verbatim.
	Password string `mapstructure:"password"`

	// DB is the Redis database number (default: 0).
	DB int `mapstructure:"db"`

	// Channel is the channel prefix for publishing events.
	// Events are published to "{channel}:{bucket}" (default: "s3:events").
	Channel string `mapstructure:"channel"`

	// PoolSize is the maximum number of connections (default: 10).
	PoolSize int `mapstructure:"pool_size"`
}

RedisConfig holds Redis publisher settings.

type S3BucketEntity

// S3BucketEntity contains bucket information. It is the "bucket" member of
// S3Entity.
type S3BucketEntity struct {
	// Name is serialized under the JSON key "name".
	Name          string              `json:"name"`
	// OwnerIdentity is serialized under the JSON key "ownerIdentity".
	OwnerIdentity S3BucketOwnerEntity `json:"ownerIdentity"`
	// ARN is serialized under the JSON key "arn".
	ARN           string              `json:"arn"`
}

S3BucketEntity contains bucket information.

type S3BucketOwnerEntity

// S3BucketOwnerEntity contains bucket owner information. It is the
// "ownerIdentity" member of S3BucketEntity.
type S3BucketOwnerEntity struct {
	// PrincipalID is serialized under the JSON key "principalId".
	PrincipalID string `json:"principalId"`
}

S3BucketOwnerEntity contains bucket owner information.

type S3Entity

// S3Entity contains the S3-specific event data. It is the "s3" member of
// S3EventRecord.
type S3Entity struct {
	// SchemaVersion is serialized under the JSON key "s3SchemaVersion".
	SchemaVersion   string         `json:"s3SchemaVersion"`
	// ConfigurationID is serialized under the JSON key "configurationId".
	// Presumably filled from the configID argument of BuildS3Event — confirm.
	ConfigurationID string         `json:"configurationId"`
	// Bucket describes the bucket the event refers to.
	Bucket          S3BucketEntity `json:"bucket"`
	// Object describes the object the event refers to.
	Object          S3ObjectEntity `json:"object"`
}

S3Entity contains the S3-specific event data.

type S3Event

// S3Event represents an S3 event notification in AWS format.
// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
//
// Use BuildS3Event to create one from a taskqueue.EventPayload.
type S3Event struct {
	// Records is serialized under the capitalized JSON key "Records".
	Records []S3EventRecord `json:"Records"`
}

S3Event represents an S3 event notification in AWS format. See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html

func BuildS3Event

func BuildS3Event(payload *taskqueue.EventPayload, region, configID string) *S3Event

BuildS3Event converts an EventPayload to an S3Event for delivery.

type S3EventRecord

// S3EventRecord represents a single event within an S3 notification. It is
// the element type of S3Event.Records.
type S3EventRecord struct {
	EventVersion string    `json:"eventVersion"`
	EventSource  string    `json:"eventSource"`
	AWSRegion    string    `json:"awsRegion"`
	// EventTime is a time.Time, which encoding/json marshals as an RFC 3339
	// timestamp string.
	EventTime    time.Time `json:"eventTime"`
	// EventName is a plain string rather than an EventType.
	EventName    string    `json:"eventName"`

	// Nested sections, each serialized under its own JSON key.
	UserIdentity      S3UserIdentity      `json:"userIdentity"`
	RequestParameters S3RequestParameters `json:"requestParameters"`
	ResponseElements  S3ResponseElements  `json:"responseElements"`
	S3                S3Entity            `json:"s3"`
}

S3EventRecord represents a single event within an S3 notification.

type S3ObjectEntity

// S3ObjectEntity contains object information. It is the "object" member of
// S3Entity.
type S3ObjectEntity struct {
	// Key is serialized under the JSON key "key".
	Key       string `json:"key"`
	// Size is serialized under the JSON key "size".
	// Presumably the object size in bytes — confirm against BuildS3Event.
	Size      int64  `json:"size"`
	// ETag is serialized under the JSON key "eTag".
	ETag      string `json:"eTag"`
	// VersionID is omitted from the JSON output when empty.
	VersionID string `json:"versionId,omitempty"`
	// Sequencer is serialized under the JSON key "sequencer".
	// NOTE(review): how this value is generated is not visible here — confirm
	// it supports ordering events for the same key as in the AWS format.
	Sequencer string `json:"sequencer"`
}

S3ObjectEntity contains object information.

type S3RequestParameters

// S3RequestParameters contains request metadata. It is the
// "requestParameters" member of S3EventRecord.
type S3RequestParameters struct {
	// SourceIPAddress is serialized under the JSON key "sourceIPAddress".
	SourceIPAddress string `json:"sourceIPAddress"`
}

S3RequestParameters contains request metadata.

type S3ResponseElements

// S3ResponseElements contains response metadata. It is the
// "responseElements" member of S3EventRecord. The JSON keys are
// header-style names rather than camelCase.
type S3ResponseElements struct {
	// RequestID is serialized under the JSON key "x-amz-request-id".
	RequestID string `json:"x-amz-request-id"`
	// HostID is serialized under the JSON key "x-amz-id-2".
	HostID    string `json:"x-amz-id-2"`
}

S3ResponseElements contains response metadata.

type S3UserIdentity

// S3UserIdentity identifies the user who made the request. It is the
// "userIdentity" member of S3EventRecord.
type S3UserIdentity struct {
	// PrincipalID is serialized under the JSON key "principalId".
	PrincipalID string `json:"principalId"`
}

S3UserIdentity identifies the user who made the request.

type WebhookConfig

// WebhookConfig holds Webhook publisher settings (the "webhook" section of
// Config).
//
// NOTE(review): this struct has no endpoint URL field; where webhook
// destinations are configured is not visible here — confirm. The defaults
// quoted in the field comments are documented values; where they are applied
// (presumably DefaultConfig or Config.Validate) is likewise not visible.
type WebhookConfig struct {
	// Enabled activates the Webhook publisher.
	Enabled bool `mapstructure:"enabled"`

	// Timeout for HTTP requests (default: 30s).
	Timeout time.Duration `mapstructure:"timeout"`

	// MaxRetries for failed deliveries (default: 3).
	MaxRetries int `mapstructure:"max_retries"`

	// RetryDelay between retry attempts (default: 1s).
	RetryDelay time.Duration `mapstructure:"retry_delay"`

	// UserAgent for HTTP requests (default: "ZapFS/1.0").
	UserAgent string `mapstructure:"user_agent"`
}

WebhookConfig holds Webhook publisher settings.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL