Documentation
¶
Overview ¶
Package events provides S3 event notification infrastructure.
Community edition:
- Events are queued via the taskqueue but not delivered.
- Provides the Emitter API for S3 handlers to call.
Enterprise edition:
- EventHandler processes queued events.
- Redis, Kafka, and Webhook publishers deliver events.
Index ¶
- Variables
- func MatchesEventType(pattern EventType, eventName string) bool
- func MatchesFilterRules(key string, prefix, suffix string) bool
- type Config
- type Emitter
- func (e *Emitter) Emit(ctx context.Context, payload *taskqueue.EventPayload)
- func (e *Emitter) EmitObjectCreated(ctx context.Context, eventType EventType, bucket, key string, size int64, ...)
- func (e *Emitter) EmitObjectRemoved(ctx context.Context, eventType EventType, ...)
- func (e *Emitter) EmitObjectTagging(ctx context.Context, eventType EventType, ...)
- func (e *Emitter) IsEnabled() bool
- func (e *Emitter) Stats() EmitterStats
- type EmitterConfig
- type EmitterStats
- type EventType
- type KafkaConfig
- type RedisConfig
- type S3BucketEntity
- type S3BucketOwnerEntity
- type S3Entity
- type S3Event
- type S3EventRecord
- type S3ObjectEntity
- type S3RequestParameters
- type S3ResponseElements
- type S3UserIdentity
- type WebhookConfig
Constants ¶
This section is empty.
Variables ¶
var ( // EventsEmittedTotal tracks total events emitted by event type EventsEmittedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "zapfs", Subsystem: "events", Name: "emitted_total", Help: "Total number of S3 events emitted", }, []string{"event_type"}) // event_type: "s3:ObjectCreated:Put", etc. // EventsDroppedTotal tracks events dropped (emitter disabled) EventsDroppedTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "zapfs", Subsystem: "events", Name: "dropped_total", Help: "Total number of S3 events dropped (emitter disabled)", }) // EventsErrorsTotal tracks event emission errors EventsErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "zapfs", Subsystem: "events", Name: "errors_total", Help: "Total number of event emission errors", }, []string{"error_type"}) // error_type: "marshal", "enqueue" // EventsDeliveredTotal tracks events delivered by publisher type (Enterprise) EventsDeliveredTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "zapfs", Subsystem: "events", Name: "delivered_total", Help: "Total number of S3 events delivered to publishers", }, []string{"publisher"}) // publisher: "redis", "kafka", "webhook" // EventsDeliveryErrorsTotal tracks delivery errors by publisher (Enterprise) EventsDeliveryErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "zapfs", Subsystem: "events", Name: "delivery_errors_total", Help: "Total number of event delivery errors", }, []string{"publisher"}) // publisher: "redis", "kafka", "webhook" // EventsDeliveryDuration tracks event delivery latency by publisher (Enterprise) EventsDeliveryDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "zapfs", Subsystem: "events", Name: "delivery_duration_seconds", Help: "Time spent delivering events to publishers", Buckets: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5}, }, []string{"publisher"}) // publisher: "redis", "kafka", "webhook" // 
EventsQueueDepth tracks current event queue depth EventsQueueDepth = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "zapfs", Subsystem: "events", Name: "queue_depth", Help: "Current number of events pending delivery", }) )
Functions ¶
func MatchesEventType ¶
MatchesEventType checks if an event name matches an event type pattern. Supports wildcard matching (e.g., "s3:ObjectCreated:*" matches "s3:ObjectCreated:Put").
func MatchesFilterRules ¶
MatchesFilterRules checks if an object key matches the filter rules. Returns true if no rules are specified or if the key matches all rules.
Types ¶
type Config ¶
// Config holds event notification configuration: one master switch plus a
// settings section per publisher. The mapstructure tags name the
// configuration keys each field is decoded from.
type Config struct {
// Enabled controls whether event emission is active.
// When false, Emitter.Emit() is a no-op.
Enabled bool `mapstructure:"enabled"`
// Redis holds the Redis publisher settings (Enterprise edition).
Redis RedisConfig `mapstructure:"redis"`
// Kafka holds the Kafka publisher settings (Enterprise edition).
Kafka KafkaConfig `mapstructure:"kafka"`
// Webhook holds the Webhook publisher settings (Enterprise edition).
Webhook WebhookConfig `mapstructure:"webhook"`
}
Config holds event notification configuration.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config with default values.
func (*Config) HasPublishers ¶
HasPublishers returns true if at least one publisher is enabled.
type Emitter ¶
// Emitter queues S3 events for async delivery via the taskqueue.
// All of its state is unexported. NoopEmitter returns an instance that drops
// every event; other constructors are not visible in this view.
type Emitter struct {
// contains filtered or unexported fields
}
Emitter queues S3 events for async delivery via the taskqueue.
In the community edition, events are queued but the EventHandler stub simply drops them. In the enterprise edition, EventHandler delivers events to the configured destinations (Redis, Kafka, Webhook).
func NoopEmitter ¶
func NoopEmitter() *Emitter
NoopEmitter returns an emitter that drops all events. Use this when event notifications are disabled.
func (*Emitter) Emit ¶
func (e *Emitter) Emit(ctx context.Context, payload *taskqueue.EventPayload)
Emit queues an S3 event for delivery. Returns immediately; delivery is async via the taskqueue.
If the emitter is disabled or the queue is nil, events are silently dropped. Errors are logged but not returned to avoid blocking S3 operations.
func (*Emitter) EmitObjectCreated ¶
func (e *Emitter) EmitObjectCreated(ctx context.Context, eventType EventType, bucket, key string, size int64, etag, versionID, ownerID, requestID, sourceIP string)
EmitObjectCreated emits an object creation event.
func (*Emitter) EmitObjectRemoved ¶
func (e *Emitter) EmitObjectRemoved(ctx context.Context, eventType EventType, bucket, key, versionID, ownerID, requestID, sourceIP string)
EmitObjectRemoved emits an object deletion event.
func (*Emitter) EmitObjectTagging ¶
func (e *Emitter) EmitObjectTagging(ctx context.Context, eventType EventType, bucket, key, versionID, ownerID, requestID, sourceIP string)
EmitObjectTagging emits an object tagging event.
func (*Emitter) Stats ¶
func (e *Emitter) Stats() EmitterStats
Stats returns emitter statistics. Note: detailed metrics are available via Prometheus at the /metrics endpoint.
type EmitterConfig ¶
// EmitterConfig configures the event emitter.
type EmitterConfig struct {
// Queue is the taskqueue for persisting events.
// If nil, events are silently dropped.
Queue taskqueue.Queue
// Enabled controls whether events are queued.
// If false, Emit() is a no-op.
Enabled bool
// Region is the region identifier for events.
// NOTE(review): presumably this ends up in the awsRegion field of emitted
// records — confirm against BuildS3Event's callers.
Region string
}
EmitterConfig configures the event emitter.
type EmitterStats ¶
EmitterStats contains emitter status information. Detailed metrics are exposed via Prometheus (zapfs_events_*).
type EventType ¶
// EventType categorizes S3 events. It is a string type whose values are S3
// event names such as "s3:ObjectCreated:Put"; MatchesEventType accepts an
// EventType as the pattern to match an event name against.
type EventType string
EventType categorizes S3 events.
// S3 event type constants.
// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html
//
// Values ending in ":*" are wildcard patterns; MatchesEventType documents
// that, for example, "s3:ObjectCreated:*" matches "s3:ObjectCreated:Put".
const (
	// Object created events
	EventObjectCreated               EventType = "s3:ObjectCreated:*"
	EventObjectCreatedPut            EventType = "s3:ObjectCreated:Put"
	EventObjectCreatedPost           EventType = "s3:ObjectCreated:Post"
	EventObjectCreatedCopy           EventType = "s3:ObjectCreated:Copy"
	EventObjectCreatedCompleteUpload EventType = "s3:ObjectCreated:CompleteMultipartUpload"

	// Object removed events
	EventObjectRemoved             EventType = "s3:ObjectRemoved:*"
	EventObjectRemovedDelete       EventType = "s3:ObjectRemoved:Delete"
	EventObjectRemovedDeleteMarker EventType = "s3:ObjectRemoved:DeleteMarkerCreated"

	// Object restore events (Enterprise - Glacier-like)
	EventObjectRestore          EventType = "s3:ObjectRestore:*"
	EventObjectRestorePost      EventType = "s3:ObjectRestore:Post"
	EventObjectRestoreCompleted EventType = "s3:ObjectRestore:Completed"
	EventObjectRestoreDelete    EventType = "s3:ObjectRestore:Delete"

	// Object tagging events
	EventObjectTagging       EventType = "s3:ObjectTagging:*"
	EventObjectTaggingPut    EventType = "s3:ObjectTagging:Put"
	EventObjectTaggingDelete EventType = "s3:ObjectTagging:Delete"

	// Object ACL events (no wildcard constant is declared for this family)
	EventObjectACLPut EventType = "s3:ObjectAcl:Put"

	// Lifecycle events (the transition event name has no sub-type segment)
	EventLifecycleExpiration             EventType = "s3:LifecycleExpiration:*"
	EventLifecycleExpirationDelete       EventType = "s3:LifecycleExpiration:Delete"
	EventLifecycleExpirationDeleteMarker EventType = "s3:LifecycleExpiration:DeleteMarkerCreated"
	EventLifecycleTransition             EventType = "s3:LifecycleTransition"

	// Replication events (Enterprise)
	EventReplication                EventType = "s3:Replication:*"
	EventReplicationCompleted       EventType = "s3:Replication:OperationCompletedReplication"
	EventReplicationFailed          EventType = "s3:Replication:OperationFailedReplication"
	EventReplicationMissedThreshold EventType = "s3:Replication:OperationMissedThreshold"
)
S3 event type constants. See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html
type KafkaConfig ¶
// KafkaConfig holds Kafka publisher settings. The mapstructure tags name the
// configuration keys each field is decoded from. The defaults quoted below
// are the documented ones; where they are applied is not visible here.
type KafkaConfig struct {
// Enabled activates the Kafka publisher.
Enabled bool `mapstructure:"enabled"`
// Brokers is the list of Kafka broker addresses.
Brokers []string `mapstructure:"brokers"`
// Topic is the Kafka topic for events (default: "s3-events").
Topic string `mapstructure:"topic"`
// RequiredAcks selects the acknowledgement level: 0=none, 1=leader, -1=all
// (default: 1).
// NOTE(review): the Go zero value (0) means "none", which differs from the
// documented default of 1 — confirm that defaults are applied before use and
// that an explicit 0 remains expressible.
RequiredAcks int `mapstructure:"required_acks"`
// Compression is the codec name: "none", "gzip", "snappy", "lz4" or "zstd"
// (default: "snappy").
Compression string `mapstructure:"compression"`
// BatchSize is the maximum messages per batch (default: 100).
BatchSize int `mapstructure:"batch_size"`
// BatchTimeout is the maximum time to wait for a batch (default: 1s).
BatchTimeout time.Duration `mapstructure:"batch_timeout"`
}
KafkaConfig holds Kafka publisher settings.
type RedisConfig ¶
// RedisConfig holds Redis publisher settings. The mapstructure tags name the
// configuration keys each field is decoded from. The defaults quoted below
// are the documented ones; where they are applied is not visible here.
type RedisConfig struct {
// Enabled activates the Redis publisher.
Enabled bool `mapstructure:"enabled"`
// Addr is the Redis server address (e.g., "localhost:6379").
Addr string `mapstructure:"addr"`
// Password for Redis authentication (optional).
Password string `mapstructure:"password"`
// DB is the Redis database number (default: 0).
DB int `mapstructure:"db"`
// Channel is the channel prefix for publishing events (default: "s3:events").
// Events are published to "{channel}:{bucket}", so with the default prefix
// the channel name has the form "s3:events:{bucket}".
Channel string `mapstructure:"channel"`
// PoolSize is the maximum number of connections (default: 10).
PoolSize int `mapstructure:"pool_size"`
}
RedisConfig holds Redis publisher settings.
type S3BucketEntity ¶
// S3BucketEntity contains bucket information. It is the value of the "bucket"
// key inside S3Entity.
type S3BucketEntity struct {
// Name is serialized under the JSON key "name".
Name string `json:"name"`
// OwnerIdentity is serialized as a nested object under "ownerIdentity".
OwnerIdentity S3BucketOwnerEntity `json:"ownerIdentity"`
// ARN is serialized under the JSON key "arn".
ARN string `json:"arn"`
}
S3BucketEntity contains bucket information.
type S3BucketOwnerEntity ¶
// S3BucketOwnerEntity contains bucket owner information. It is the value of
// the "ownerIdentity" key inside S3BucketEntity.
type S3BucketOwnerEntity struct {
// PrincipalID is serialized under the JSON key "principalId".
PrincipalID string `json:"principalId"`
}
S3BucketOwnerEntity contains bucket owner information.
type S3Entity ¶
// S3Entity contains the S3-specific event data. It is the value of the "s3"
// key inside S3EventRecord.
type S3Entity struct {
// SchemaVersion is serialized under the JSON key "s3SchemaVersion".
SchemaVersion string `json:"s3SchemaVersion"`
// ConfigurationID is serialized under the JSON key "configurationId".
// NOTE(review): presumably filled from BuildS3Event's configID argument —
// confirm.
ConfigurationID string `json:"configurationId"`
// Bucket describes the bucket the event refers to.
Bucket S3BucketEntity `json:"bucket"`
// Object describes the object the event refers to.
Object S3ObjectEntity `json:"object"`
}
S3Entity contains the S3-specific event data.
type S3Event ¶
// S3Event represents an S3 event notification in AWS format.
// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
type S3Event struct {
// Records is serialized under the capitalized JSON key "Records". A nil
// slice is encoded by encoding/json as null rather than [].
Records []S3EventRecord `json:"Records"`
}
S3Event represents an S3 event notification in AWS format. See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
func BuildS3Event ¶
func BuildS3Event(payload *taskqueue.EventPayload, region, configID string) *S3Event
BuildS3Event converts an EventPayload to an S3Event for delivery.
type S3EventRecord ¶
// S3EventRecord represents a single event within an S3 notification. Each
// field is serialized under the JSON key given in its tag.
type S3EventRecord struct {
// EventVersion is serialized under "eventVersion".
EventVersion string `json:"eventVersion"`
// EventSource is serialized under "eventSource".
EventSource string `json:"eventSource"`
// AWSRegion is serialized under "awsRegion".
// NOTE(review): presumably filled from BuildS3Event's region argument —
// confirm.
AWSRegion string `json:"awsRegion"`
// EventTime is serialized under "eventTime"; encoding/json renders a
// time.Time as an RFC 3339 string.
EventTime time.Time `json:"eventTime"`
// EventName is serialized under "eventName".
EventName string `json:"eventName"`
// UserIdentity identifies the user who made the request.
UserIdentity S3UserIdentity `json:"userIdentity"`
// RequestParameters contains request metadata.
RequestParameters S3RequestParameters `json:"requestParameters"`
// ResponseElements contains response metadata.
ResponseElements S3ResponseElements `json:"responseElements"`
// S3 contains the S3-specific event data (bucket and object).
S3 S3Entity `json:"s3"`
}
S3EventRecord represents a single event within an S3 notification.
type S3ObjectEntity ¶
// S3ObjectEntity contains object information. It is the value of the "object"
// key inside S3Entity.
type S3ObjectEntity struct {
// Key is serialized under the JSON key "key".
Key string `json:"key"`
// Size is serialized under "size"; it has no omitempty, so a zero size is
// still emitted.
Size int64 `json:"size"`
// ETag is serialized under "eTag" (lower-case leading "e").
ETag string `json:"eTag"`
// VersionID is serialized under "versionId" and is left out of the JSON
// entirely when empty (omitempty).
VersionID string `json:"versionId,omitempty"`
// Sequencer is serialized under "sequencer".
Sequencer string `json:"sequencer"`
}
S3ObjectEntity contains object information.
type S3RequestParameters ¶
// S3RequestParameters contains request metadata. It is the value of the
// "requestParameters" key inside S3EventRecord.
type S3RequestParameters struct {
// SourceIPAddress is serialized under the JSON key "sourceIPAddress".
SourceIPAddress string `json:"sourceIPAddress"`
}
S3RequestParameters contains request metadata.
type S3ResponseElements ¶
// S3ResponseElements contains response metadata. It is the value of the
// "responseElements" key inside S3EventRecord. The JSON keys are the
// header-style names shown in the tags, not the Go field names.
type S3ResponseElements struct {
// RequestID is serialized under the JSON key "x-amz-request-id".
RequestID string `json:"x-amz-request-id"`
// HostID is serialized under the JSON key "x-amz-id-2".
HostID string `json:"x-amz-id-2"`
}
S3ResponseElements contains response metadata.
type S3UserIdentity ¶
// S3UserIdentity identifies the user who made the request. It is the value of
// the "userIdentity" key inside S3EventRecord.
type S3UserIdentity struct {
// PrincipalID is serialized under the JSON key "principalId".
PrincipalID string `json:"principalId"`
}
S3UserIdentity identifies the user who made the request.
type WebhookConfig ¶
// WebhookConfig holds Webhook publisher settings. The mapstructure tags name
// the configuration keys each field is decoded from. The defaults quoted
// below are the documented ones; where they are applied is not visible here.
//
// NOTE(review): no endpoint URL is configured in this struct; presumably the
// delivery target comes from elsewhere (e.g. per-bucket notification
// configuration) — confirm.
type WebhookConfig struct {
// Enabled activates the Webhook publisher.
Enabled bool `mapstructure:"enabled"`
// Timeout for HTTP requests (default: 30s).
Timeout time.Duration `mapstructure:"timeout"`
// MaxRetries for failed deliveries (default: 3).
MaxRetries int `mapstructure:"max_retries"`
// RetryDelay between retry attempts (default: 1s).
RetryDelay time.Duration `mapstructure:"retry_delay"`
// UserAgent for HTTP requests (default: "ZapFS/1.0").
UserAgent string `mapstructure:"user_agent"`
}
WebhookConfig holds Webhook publisher settings.