06-event-streaming

command
v0.0.0-...-da59b90
Published: Sep 3, 2025 License: MIT Imports: 7 Imported by: 0

README

Event Streaming Memory Example

This example uses Redis Streams for event sourcing, audit trails, and debugging AI agent behavior.

🎯 What This Example Shows

  • Event sourcing with Redis Streams
  • Real-time event processing
  • Consumer groups for distributed processing
  • Event replay and time travel
  • Audit trail generation
  • Analytics and debugging

📋 Features

  • ✅ Event sourcing - Immutable event log
  • ✅ Real-time streaming - Process events as they happen
  • ✅ Consumer groups - Distributed processing
  • ✅ Event replay - Time travel through history
  • ✅ Audit trails - Complete activity log
  • ✅ Analytics - Event statistics and patterns

🚀 Quick Start

Prerequisites
  1. Redis server (5.0+ for Streams; 6.2+ for XTRIM MINID, used for time-based trimming)
  2. Environment variables:
# Required
export REDIS_URL="localhost:6379"

# Optional
export REDIS_PASSWORD=""
Run the Example
go run main.go

💻 Architecture Overview

    Event Occurs
         ↓
    Publish to Stream
         ↓
    ┌────────────┐
    │Redis Stream│
    └─────┬──────┘
          ↓
    ┌─────┴──────┬──────────┬─────────┐
    ↓            ↓          ↓         ↓
Analytics    Audit Log   Debugger   Replay
Consumer     Consumer    Consumer   System

📡 Event Types

const (
    EventMessageAdded    = "message.added"
    EventSessionStarted  = "session.started"
    EventSessionEnded    = "session.ended"
    EventSummaryCreated  = "summary.created"
    EventSearchPerformed = "search.performed"
    EventMemoryCleared   = "memory.cleared"
    EventErrorOccurred   = "error.occurred"
)

🔄 Event Flow

Publishing Events
// Every action generates an event
event := Event{
    Type:      EventMessageAdded,
    SessionID: "session-123",
    UserID:    "user-456",
    Timestamp: time.Now(),
    Data: map[string]interface{}{
        "message_id": "msg-789",
        "role":       "user",
        "tokens":     150,
    },
}

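// Publish to stream (rdb is assumed to be a go-redis client)
id, err := rdb.XAdd(ctx, &redis.XAddArgs{
    Stream: "memory:events",
    Values: eventToMap(event),
}).Result()
if err != nil {
    log.Printf("publish failed: %v", err)
}
log.Printf("published event %s", id) // id is the entry ID Redis generated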
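The snippet above calls eventToMap without showing it. A minimal sketch of what such a helper might look like follows, using only the standard library. The Event struct is inferred from the publishing example, and the field names ("type", "session_id", and so on) are assumptions, not the names main.go necessarily uses. Unlike the call above, this version also returns an error so that a payload that cannot be encoded is not silently dropped.

```go
package main

import (
	"encoding/json"
	"time"
)

// Event mirrors the fields used in the publishing example.
type Event struct {
	Type      string
	SessionID string
	UserID    string
	Timestamp time.Time
	Data      map[string]interface{}
}

// eventToMap flattens an Event into top-level fields for a stream entry.
// The key names are illustrative assumptions.
func eventToMap(e Event) (map[string]interface{}, error) {
	// Stream entries hold flat field/value pairs, so the nested
	// payload is stored as a single JSON string.
	data, err := json.Marshal(e.Data)
	if err != nil {
		return nil, err
	}
	return map[string]interface{}{
		"type":       e.Type,
		"session_id": e.SessionID,
		"user_id":    e.UserID,
		"timestamp":  e.Timestamp.UTC().Format(time.RFC3339Nano),
		"data":       string(data),
	}, nil
}
```

Keeping the payload as one JSON field means consumers need a matching decode step, but it avoids flattening arbitrary nested data into stream fields.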
Consuming Events
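// Real-time consumption (rdb is assumed to be a go-redis client)
lastID := "$" // start with events published after this point
for {
    streams, err := rdb.XRead(ctx, &redis.XReadArgs{
        Streams: []string{"memory:events", lastID},
        Block:   1 * time.Second,
    }).Result()
    if err == redis.Nil {
        continue // block timed out with no new events
    }
    if err != nil {
        log.Printf("XREAD failed: %v", err)
        break
    }

    for _, stream := range streams {
        for _, msg := range stream.Messages {
            processEvent(msg)
            lastID = msg.ID // resume after the last event seen
        }
    }
}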

👥 Consumer Groups

Multiple Consumers
Stream: memory:events
    │
    ├─→ analytics-group
    │     ├─→ analytics-consumer-1
    │     └─→ analytics-consumer-2
    │
    ├─→ audit-group
    │     └─→ audit-consumer-1
    │
    └─→ debug-group
          └─→ debug-consumer-1
Group Benefits
  • Parallel processing - Multiple consumers per group
  • At-least-once delivery - Acknowledgment required
  • Load balancing - Automatic work distribution
  • Fault tolerance - Reassign pending messages
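
The acknowledgment and reassignment behaviour listed above can be illustrated with a toy in-memory model of a group's pending list. This is not how Redis implements consumer groups, and the type and method names are invented for illustration. In Redis the corresponding commands are XREADGROUP (deliver), XACK (acknowledge) and XCLAIM (reassign).

```go
package main

// group is a toy stand-in for one consumer group's delivery state.
type group struct {
	next    int               // index of the next undelivered entry
	pending map[string]string // entry ID -> consumer that currently owns it
}

func newGroup() *group {
	return &group{pending: map[string]string{}}
}

// deliver hands the next undelivered entry to consumer and records it
// as pending. Each entry goes to exactly one consumer in the group.
func (g *group) deliver(stream []string, consumer string) (string, bool) {
	if g.next >= len(stream) {
		return "", false
	}
	id := stream[g.next]
	g.next++
	g.pending[id] = consumer
	return id, true
}

// ack removes an entry from the pending list once it has been processed.
func (g *group) ack(id string) {
	delete(g.pending, id)
}

// claim reassigns a still-pending entry to another consumer, for example
// after its original owner crashed before acknowledging it.
func (g *group) claim(id, consumer string) bool {
	if _, ok := g.pending[id]; !ok {
		return false
	}
	g.pending[id] = consumer
	return true
}
```

An entry that is delivered but never acknowledged stays pending, which is what makes delivery at-least-once: after a claim, the new owner may process an event the old owner had already partly handled.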

🕐 Event Replay

Time-Based Replay
// Replay events from last hour (rdb is assumed to be a go-redis client)
oneHourAgo := time.Now().Add(-1 * time.Hour)
msgs, err := rdb.XRange(ctx, "memory:events",
    fmt.Sprintf("%d", oneHourAgo.UnixMilli()), "+").Result()
if err != nil {
    log.Fatalf("XRANGE failed: %v", err)
}

// Process historical events
for _, msg := range msgs {
    replayEvent(msg)
}
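Time-based replay works because auto-generated stream IDs have the form `<milliseconds>-<sequence>`, so a bare millisecond timestamp is a valid range bound. The helpers below are a small sketch of converting in both directions; the function names are hypothetical, and the conversion only makes sense for IDs that Redis generated itself rather than custom IDs.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// startID returns a range bound covering every entry created at or after t.
// A bound without a sequence part is treated as "<ms>-0" for a range start.
func startID(t time.Time) string {
	return strconv.FormatInt(t.UnixMilli(), 10)
}

// timeFromID extracts the creation time from an ID such as "1700000000000-3".
func timeFromID(id string) (time.Time, error) {
	msPart, _, _ := strings.Cut(id, "-")
	ms, err := strconv.ParseInt(msPart, 10, 64)
	if err != nil {
		return time.Time{}, fmt.Errorf("malformed stream ID %q: %w", id, err)
	}
	return time.UnixMilli(ms), nil
}
```

timeFromID is also handy when printing a timeline, since the entry ID carries a timestamp even if the event payload does not.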
Session Reconstruction
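// Rebuild complete session from events.
// rdb is assumed to be a go-redis client; mapToEvent is a hypothetical
// inverse of eventToMap that decodes a stream entry's fields.
func reconstructSession(ctx context.Context, sessionID string) (Session, error) {
    msgs, err := rdb.XRange(ctx, "memory:events", "-", "+").Result()
    if err != nil {
        return Session{}, err
    }

    session := Session{}
    for _, msg := range msgs {
        event := mapToEvent(msg.Values)
        if event.SessionID == sessionID {
            applyEventToSession(&session, event)
        }
    }
    return session, nil
}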

📊 Event Analytics

Real-time Metrics
type EventMetrics struct {
    TotalEvents        int64
    EventsPerSecond    float64
    AverageLatency     time.Duration
    ErrorRate          float64
    ActiveSessions     int
    TopEventTypes      map[string]int
}
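
As a rough sketch of how some of these fields could be filled in, the function below aggregates a batch of events observed during a time window. The summarize function and the trimmed-down Event type are assumptions for illustration. AverageLatency is left at zero because it cannot be derived from event types alone, and TopEventTypes here simply counts every type rather than keeping only the top few.

```go
package main

import "time"

// Event carries only the fields this sketch needs.
type Event struct {
	Type      string
	SessionID string
}

// EventMetrics repeats the struct above so the sketch compiles on its own.
type EventMetrics struct {
	TotalEvents     int64
	EventsPerSecond float64
	AverageLatency  time.Duration // not computed in this sketch
	ErrorRate       float64
	ActiveSessions  int
	TopEventTypes   map[string]int
}

// summarize aggregates the events observed during window.
func summarize(events []Event, window time.Duration) EventMetrics {
	m := EventMetrics{TopEventTypes: map[string]int{}}
	open := map[string]bool{} // sessions started but not yet ended
	for _, e := range events {
		m.TotalEvents++
		m.TopEventTypes[e.Type]++
		switch e.Type {
		case "session.started":
			open[e.SessionID] = true
		case "session.ended":
			delete(open, e.SessionID)
		}
	}
	m.ActiveSessions = len(open)
	if m.TotalEvents > 0 {
		errors := m.TopEventTypes["error.occurred"]
		m.ErrorRate = float64(errors) / float64(m.TotalEvents)
	}
	if window > 0 {
		m.EventsPerSecond = float64(m.TotalEvents) / window.Seconds()
	}
	return m
}
```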
Pattern Detection
  • Identify usage patterns
  • Detect anomalies
  • Track performance trends
  • Monitor error rates

🔍 Debugging Features

Event Inspector
// Find all events for a user
func getUserEvents(userID string) []Event {
    // Query stream for user's events
    // Useful for debugging user issues
}

// Find error events
func getErrorEvents(timeRange TimeRange) []Event {
    // Filter for error events
    // Helps identify problems
}
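The two stubs above leave the filtering logic out. One possible shape for it, assuming the events have already been read from the stream and decoded into a slice, is a generic filter plus small predicates. The names filterEvents, byUser and isError are hypothetical and not taken from main.go.

```go
package main

// Event carries only the fields this sketch needs.
type Event struct {
	Type      string
	SessionID string
	UserID    string
}

// filterEvents returns the events for which keep reports true,
// preserving their original (stream) order.
func filterEvents(events []Event, keep func(Event) bool) []Event {
	var out []Event
	for _, e := range events {
		if keep(e) {
			out = append(out, e)
		}
	}
	return out
}

// byUser builds a predicate matching one user's events.
func byUser(userID string) func(Event) bool {
	return func(e Event) bool { return e.UserID == userID }
}

// isError matches error events.
func isError(e Event) bool {
	return e.Type == "error.occurred"
}
```

Filtering client-side like this scans every entry in the chosen range, so on a large stream it is worth narrowing the range by time first.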
Session Timeline
Session: abc-123
10:00:00 session.started
10:00:05 message.added (user)
10:00:07 message.added (assistant)
10:00:45 search.performed
10:01:20 summary.created
10:15:00 session.ended
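
A timeline in this layout could be rendered from decoded events roughly as follows. This is a sketch: formatTimeline is a hypothetical name, the events are assumed to be in stream order already, and the role is assumed to live under a "role" key in the event data as in the publishing example.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// Event carries only the fields this sketch needs.
type Event struct {
	Type      string
	Timestamp time.Time
	Data      map[string]interface{}
}

// formatTimeline renders one line per event: time, type, optional role.
func formatTimeline(sessionID string, events []Event) string {
	var b strings.Builder
	fmt.Fprintf(&b, "Session: %s\n", sessionID)
	for _, e := range events {
		fmt.Fprintf(&b, "%s %s", e.Timestamp.Format("15:04:05"), e.Type)
		// Only message events carry a role; others print without it.
		if role, ok := e.Data["role"].(string); ok {
			fmt.Fprintf(&b, " (%s)", role)
		}
		b.WriteByte('\n')
	}
	return b.String()
}
```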

🎯 Use Cases

1. Audit Trail
  • Complete activity log
  • Compliance requirements
  • Security monitoring
2. Debugging
  • Reproduce issues
  • Trace execution flow
  • Performance analysis
3. Analytics
  • Usage patterns
  • User behavior
  • System metrics
4. Recovery
  • Rebuild state from events
  • Disaster recovery
  • Data migration

🔧 Configuration

type EventStreamConfig struct {
    // Stream settings
    StreamName      string
    MaxLength       int64  // Limit stream size
    ApproxMaxLength bool   // Use approximate trimming
    
    // Consumer settings
    ConsumerGroup   string
    ConsumerName    string
    BatchSize       int
    BlockTimeout    time.Duration
    
    // Retention
    RetentionPeriod time.Duration
    CompactOldEvents bool
}

📈 Performance Considerations

Stream Size Management
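// Trim old events: keep at most the newest 10,000 entries
// (rdb is assumed to be a go-redis client)
rdb.XTrimMaxLen(ctx, "memory:events", 10000)

// Time-based trimming: drop entries older than one week (requires Redis 6.2+)
oneWeekAgo := fmt.Sprintf("%d", time.Now().Add(-7*24*time.Hour).UnixMilli())
rdb.XTrimMinID(ctx, "memory:events", oneWeekAgo)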
Throughput
Events/Second   CPU Usage   Memory    Recommendation
< 100           Low         < 10MB    Single consumer
100-1000        Medium      < 100MB   2-3 consumers
1000-10000      High        < 1GB     Consumer group
> 10000         Very High   > 1GB     Multiple Redis instances

🚨 Common Patterns

Event Sourcing Pattern
// State = fold(events)
currentState := initialState
for _, event := range events {
    currentState = applyEvent(currentState, event)
}
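To make the fold concrete, here is a small self-contained sketch using the event types listed earlier. The Session fields and the applyEvent rules are assumptions chosen for illustration, not the state model from main.go. Note that the token count is read as an int, which holds for in-memory events; values read back from Redis arrive as strings and would need parsing first.

```go
package main

// Event carries only the fields this sketch needs.
type Event struct {
	Type string
	Data map[string]interface{}
}

// Session is an illustrative state derived purely from events.
type Session struct {
	Active   bool
	Messages int
	Tokens   int
}

// applyEvent returns the state that results from applying one event.
// It takes and returns values, so earlier states are never mutated.
func applyEvent(s Session, e Event) Session {
	switch e.Type {
	case "session.started":
		s.Active = true
	case "session.ended":
		s.Active = false
	case "message.added":
		s.Messages++
		if n, ok := e.Data["tokens"].(int); ok {
			s.Tokens += n
		}
	case "memory.cleared":
		s.Messages, s.Tokens = 0, 0
	}
	return s
}

// fold replays events in order on top of an initial state.
func fold(initial Session, events []Event) Session {
	state := initial
	for _, e := range events {
		state = applyEvent(state, e)
	}
	return state
}
```

Because applyEvent is a pure function, replaying the same events always yields the same state, which is what makes replay and recovery from the stream possible.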
CQRS Pattern
// Commands write events
publishEvent(CommandExecuted{...})

// Queries read projections
projection := readProjection()

💡 Best Practices

  1. Immutable events - Never modify, only append
  2. Event versioning - Handle schema evolution
  3. Idempotency - Handle duplicate events
  4. Compression - Compact old events
  5. Monitoring - Track lag and throughput
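
Idempotency (item 3) matters because at-least-once delivery can hand the same event to a consumer more than once. One simple approach, sketched below, is to remember the IDs of entries that were processed successfully. The Deduper type is hypothetical, and its in-memory set grows without bound; a real consumer would more likely bound it or persist it alongside the data it protects.

```go
package main

// Deduper remembers processed entry IDs so that redelivered
// events are handled only once.
type Deduper struct {
	seen map[string]struct{}
}

func NewDeduper() *Deduper {
	return &Deduper{seen: map[string]struct{}{}}
}

// Process runs handle for the entry unless it was already processed.
// It reports whether handle ran successfully this time.
func (d *Deduper) Process(id string, handle func() error) (bool, error) {
	if _, dup := d.seen[id]; dup {
		return false, nil // duplicate delivery, skip
	}
	if err := handle(); err != nil {
		// Not marked as seen, so a later redelivery can retry.
		return false, err
	}
	d.seen[id] = struct{}{}
	return true, nil
}
```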

📄 Full Code

See main.go for the complete implementation.
