Documentation
¶
Index ¶
- Constants
- Variables
- func GenerateInteractionID(sessionID string, timestamp time.Time) string
- func GetExtraChunks(msg *base.ChatMessage) []string
- func IsRetryableError(err error) bool
- func RetryWithBackoff(ctx context.Context, config RetryConfig, fn func() error) error
- type AdapterManager
- func (m *AdapterManager) GetAdapter(platform string) (ChatAdapter, bool)
- func (m *AdapterManager) Handler() http.Handler
- func (m *AdapterManager) ListPlatforms() []string
- func (m *AdapterManager) Register(adapter ChatAdapter) error
- func (m *AdapterManager) RegisterEngine(eng *engine.Engine)
- func (m *AdapterManager) RegisterRoutes(router *mux.Router)
- func (m *AdapterManager) SendMessage(ctx context.Context, platform, sessionID string, msg *ChatMessage) error
- func (m *AdapterManager) StartAll(ctx context.Context) error
- func (m *AdapterManager) StopAll() error
- func (m *AdapterManager) Unregister(platform string) error
- type AdapterMetrics
- type AggregatedMessageSender
- type Attachment
- type ChatAdapter
- type ChatMessage
- type ChunkInfo
- type ChunkProcessor
- type ChunkProcessorOptions
- type ConfigLoader
- func (c *ConfigLoader) Close() error
- func (c *ConfigLoader) GetConfig(platform string) *PlatformConfig
- func (c *ConfigLoader) GetOptions(platform string) map[string]any
- func (c *ConfigLoader) GetSystemPrompt(platform string) string
- func (c *ConfigLoader) GetTaskInstructions(platform string) string
- func (c *ConfigLoader) HasPlatform(platform string) bool
- func (c *ConfigLoader) Load(configDir string) error
- func (c *ConfigLoader) Platforms() []string
- func (c *ConfigLoader) StartHotReload(ctx context.Context, configDir string, ...) error
- type DiscordEmbed
- type DiscordEmbedField
- type DiscordEmbedFooter
- type DiscordEmbedImage
- type DiscordEmbedThumbnail
- type EngineConfig
- type EngineHolder
- type EngineHolderOptions
- type EngineMessageHandler
- type EngineMessageHandlerOption
- type EventConfig
- type FormatConversionProcessor
- type HealthCheck
- type HealthChecker
- type InlineKeyboardButton
- type InlineKeyboardMarkup
- type InteractionCallback
- type InteractionManager
- func (m *InteractionManager) Complete(id string, response *InteractionResponse) error
- func (m *InteractionManager) Count() int
- func (m *InteractionManager) Delete(id string)
- func (m *InteractionManager) Expire(id string) error
- func (m *InteractionManager) Get(id string) (*PendingInteraction, bool)
- func (m *InteractionManager) GetBySession(sessionID string) []*PendingInteraction
- func (m *InteractionManager) HandleCallback(interactionID, userID, actionID, callbackData string) error
- func (m *InteractionManager) PendingCount() int
- func (m *InteractionManager) Stop()
- func (m *InteractionManager) Store(interaction *PendingInteraction) string
- func (m *InteractionManager) TotalCount() int
- type InteractionManagerOptions
- type InteractionResponse
- type InteractionStatus
- type InteractionType
- type LifecycleManager
- func (m *LifecycleManager) OnStart(hook func(ChatAdapter) error)
- func (m *LifecycleManager) OnStop(hook func(ChatAdapter) error)
- func (m *LifecycleManager) RegisterAdapter(adapter ChatAdapter, startPriority int)
- func (m *LifecycleManager) StartAll(ctx context.Context) error
- func (m *LifecycleManager) StopAll() error
- type Logger
- type MessageAggregatorProcessor
- func (p *MessageAggregatorProcessor) Name() string
- func (p *MessageAggregatorProcessor) Order() int
- func (p *MessageAggregatorProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
- func (p *MessageAggregatorProcessor) SetSender(sender AggregatedMessageSender)
- func (p *MessageAggregatorProcessor) Stop()
- type MessageAggregatorProcessorOptions
- type MessageHandler
- type MessageProcessor
- type MessageQueue
- func (q *MessageQueue) AddToDLQ(msg *QueuedMessage)
- func (q *MessageQueue) DLQLen() int
- func (q *MessageQueue) Dequeue() (*QueuedMessage, bool)
- func (q *MessageQueue) Enqueue(platform, sessionID string, msg *ChatMessage) error
- func (q *MessageQueue) GetDLQ() []*QueuedMessage
- func (q *MessageQueue) Requeue(msg *QueuedMessage) error
- func (q *MessageQueue) Size() int
- func (q *MessageQueue) Start(adapterGetter func(string) (ChatAdapter, bool), ...)
- func (q *MessageQueue) Stop()
- type ParseMode
- type PendingInteraction
- type PlatformConfig
- type ProcessorChain
- type ProcessorOrder
- type QueueError
- type QueuedMessage
- type RateLimitProcessor
- func (p *RateLimitProcessor) Cleanup()
- func (p *RateLimitProcessor) GetSessionStats(platform, sessionID string) (lastSend time.Time, exists bool)
- func (p *RateLimitProcessor) Name() string
- func (p *RateLimitProcessor) Order() int
- func (p *RateLimitProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
- type RateLimitProcessorOptions
- type RateLimiter
- type RetryConfig
- type RichContent
- type RichContentProcessor
- type SlackBlock
- type StreamAdapter
- type StreamCallback
- type StreamHandler
- type StreamState
- type ThreadInfo
- type ThreadProcessor
- func (p *ThreadProcessor) Delete(sessionID string)
- func (p *ThreadProcessor) GetThreadTS(sessionID string) string
- func (p *ThreadProcessor) Name() string
- func (p *ThreadProcessor) Order() int
- func (p *ThreadProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
- func (p *ThreadProcessor) SetThreadTS(sessionID, threadTS, channelID string)
- func (p *ThreadProcessor) Stop()
- type ThreadProcessorOptions
Constants ¶
const ( ParseModeNone = base.ParseModeNone ParseModeMarkdown = base.ParseModeMarkdown ParseModeHTML = base.ParseModeHTML )
Variables ¶
var ( // MessagesAggregatedTotal counts total messages aggregated MessagesAggregatedTotal = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "hotplex_aggregator_messages_aggregated_total", Help: "Total number of messages that have been aggregated", }, []string{"event_type", "platform"}, ) // MessagesFlushedTotal counts total flushes by reason MessagesFlushedTotal = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "hotplex_aggregator_messages_flushed_total", Help: "Total number of times the buffer was flushed", }, []string{"event_type", "platform", "reason"}, ) // MessagesDroppedTotal counts dropped messages MessagesDroppedTotal = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "hotplex_aggregator_messages_dropped_total", Help: "Total number of messages dropped due to buffer overflow", }, []string{"event_type", "platform", "reason"}, ) // BufferSizeGauge tracks current buffer size per platform BufferSizeGauge = promauto.NewGaugeVec( prometheus.GaugeOpts{ Name: "hotplex_aggregator_buffer_size", Help: "Current number of messages in the buffer per platform", }, []string{"platform"}, ) // BufferDurationHistogram tracks time from first message to flush BufferDurationHistogram = promauto.NewHistogramVec( prometheus.HistogramOpts{ Name: "hotplex_aggregator_buffer_duration_seconds", Help: "Time in seconds from first message arrival to buffer flush", Buckets: []float64{0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0}, }, []string{"platform"}, ) // MessageSizeHistogram tracks message size distribution MessageSizeHistogram = promauto.NewHistogramVec( prometheus.HistogramOpts{ Name: "hotplex_aggregator_message_size_bytes", Help: "Size distribution of aggregated messages in bytes", Buckets: []float64{64, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768}, }, []string{"event_type", "platform"}, ) )
var ErrQueueFull = &QueueError{Message: "queue is full"}
Functions ¶
func GenerateInteractionID ¶ added in v0.12.0
GenerateInteractionID generates a unique interaction ID.
func GetExtraChunks ¶ added in v0.12.0
func GetExtraChunks(msg *base.ChatMessage) []string
GetExtraChunks returns any extra chunks stored in message metadata. Returns nil if no extra chunks exist.
func IsRetryableError ¶
IsRetryableError classifies errors as retryable or non-retryable
func RetryWithBackoff ¶
func RetryWithBackoff(ctx context.Context, config RetryConfig, fn func() error) error
Types ¶
type AdapterManager ¶
type AdapterManager struct {
// contains filtered or unexported fields
}
func NewAdapterManager ¶
func NewAdapterManager(logger *slog.Logger) *AdapterManager
func Setup ¶ added in v0.11.0
Setup initializes all enabled ChatApps and their dedicated Engines. It returns an http.Handler that handles all webhook routes.
func (*AdapterManager) GetAdapter ¶
func (m *AdapterManager) GetAdapter(platform string) (ChatAdapter, bool)
func (*AdapterManager) Handler ¶ added in v0.11.0
func (m *AdapterManager) Handler() http.Handler
Handler returns an http.Handler with all adapter webhooks mounted This is a convenience method when you don't need gorilla/mux
func (*AdapterManager) ListPlatforms ¶
func (m *AdapterManager) ListPlatforms() []string
func (*AdapterManager) Register ¶
func (m *AdapterManager) Register(adapter ChatAdapter) error
func (*AdapterManager) RegisterEngine ¶ added in v0.11.0
func (m *AdapterManager) RegisterEngine(eng *engine.Engine)
func (*AdapterManager) RegisterRoutes ¶ added in v0.11.0
func (m *AdapterManager) RegisterRoutes(router *mux.Router)
RegisterRoutes registers all adapter webhooks to a unified router Path format: /webhook/{platform} (e.g., /webhook/telegram, /webhook/discord)
func (*AdapterManager) SendMessage ¶
func (m *AdapterManager) SendMessage(ctx context.Context, platform, sessionID string, msg *ChatMessage) error
SendMessage sends a message to a specific platform
func (*AdapterManager) StopAll ¶
func (m *AdapterManager) StopAll() error
func (*AdapterManager) Unregister ¶
func (m *AdapterManager) Unregister(platform string) error
type AdapterMetrics ¶
type AdapterMetrics struct {
MessagesReceived atomic.Int64
MessagesSent atomic.Int64
MessagesFailed atomic.Int64
LastMessageAt atomic.Int64
Uptime atomic.Int64
}
func NewAdapterMetrics ¶
func NewAdapterMetrics() *AdapterMetrics
func (*AdapterMetrics) GetStats ¶
func (m *AdapterMetrics) GetStats() map[string]int64
func (*AdapterMetrics) RecordFailure ¶
func (m *AdapterMetrics) RecordFailure()
func (*AdapterMetrics) RecordReceive ¶
func (m *AdapterMetrics) RecordReceive()
func (*AdapterMetrics) RecordSend ¶
func (m *AdapterMetrics) RecordSend()
type AggregatedMessageSender ¶ added in v0.12.0
type AggregatedMessageSender interface {
SendAggregatedMessage(ctx context.Context, msg *base.ChatMessage) error
}
AggregatedMessageSender sends aggregated messages
type Attachment ¶
type Attachment = base.Attachment
type ChatAdapter ¶
type ChatAdapter = base.ChatAdapter
type ChatMessage ¶
type ChatMessage = base.ChatMessage
func NewChatMessage ¶ added in v0.11.0
func NewChatMessage(platform, sessionID, userID, content string) *ChatMessage
type ChunkInfo ¶ added in v0.12.0
ChunkInfo holds metadata about chunked messages.
func GetChunkInfo ¶ added in v0.12.0
func GetChunkInfo(msg *base.ChatMessage) *ChunkInfo
GetChunkInfo returns the ChunkInfo from message metadata.
type ChunkProcessor ¶ added in v0.12.0
type ChunkProcessor struct {
// contains filtered or unexported fields
}
ChunkProcessor splits long messages into chunks that fit within platform limits. It uses the Slack chunker for Markdown-aware splitting.
func NewChunkProcessor ¶ added in v0.12.0
func NewChunkProcessor(logger *slog.Logger, opts ChunkProcessorOptions) *ChunkProcessor
NewChunkProcessor creates a new ChunkProcessor.
func (*ChunkProcessor) Name ¶ added in v0.12.0
func (p *ChunkProcessor) Name() string
Name returns the processor name.
func (*ChunkProcessor) Order ¶ added in v0.12.0
func (p *ChunkProcessor) Order() int
Order returns the processor order (runs after format conversion).
func (*ChunkProcessor) Process ¶ added in v0.12.0
func (p *ChunkProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
Process splits the message into chunks if it exceeds the character limit. Returns either a single message or a slice of messages.
type ChunkProcessorOptions ¶ added in v0.12.0
type ChunkProcessorOptions struct {
MaxChars int // Maximum characters per chunk (default: 4000 for Slack)
}
ChunkProcessorOptions configures the ChunkProcessor.
type ConfigLoader ¶
type ConfigLoader struct {
// contains filtered or unexported fields
}
func NewConfigLoader ¶
func NewConfigLoader(configDir string, logger *slog.Logger) (*ConfigLoader, error)
func (*ConfigLoader) Close ¶ added in v0.11.0
func (c *ConfigLoader) Close() error
Close stops all hot reload watchers and releases resources.
func (*ConfigLoader) GetConfig ¶
func (c *ConfigLoader) GetConfig(platform string) *PlatformConfig
func (*ConfigLoader) GetOptions ¶ added in v0.11.0
func (c *ConfigLoader) GetOptions(platform string) map[string]any
func (*ConfigLoader) GetSystemPrompt ¶
func (c *ConfigLoader) GetSystemPrompt(platform string) string
func (*ConfigLoader) GetTaskInstructions ¶
func (c *ConfigLoader) GetTaskInstructions(platform string) string
func (*ConfigLoader) HasPlatform ¶
func (c *ConfigLoader) HasPlatform(platform string) bool
func (*ConfigLoader) Load ¶
func (c *ConfigLoader) Load(configDir string) error
func (*ConfigLoader) Platforms ¶
func (c *ConfigLoader) Platforms() []string
func (*ConfigLoader) StartHotReload ¶ added in v0.11.0
func (c *ConfigLoader) StartHotReload(ctx context.Context, configDir string, onReload func(platform string, cfg *PlatformConfig)) error
StartHotReload starts watching all config files for changes and automatically reloads them. The onReload callback is called with the updated PlatformConfig for each platform.
type DiscordEmbed ¶
type DiscordEmbed struct {
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
URL string `json:"url,omitempty"`
Color int `json:"color,omitempty"`
Fields []DiscordEmbedField `json:"fields,omitempty"`
Thumbnail *DiscordEmbedThumbnail `json:"thumbnail,omitempty"`
Image *DiscordEmbedImage `json:"image,omitempty"`
Timestamp string `json:"timestamp,omitempty"`
}
type DiscordEmbedField ¶
type DiscordEmbedFooter ¶
type DiscordEmbedFooter struct {
}
type DiscordEmbedImage ¶
type DiscordEmbedImage struct {
URL string `json:"url"`
}
type DiscordEmbedThumbnail ¶
type DiscordEmbedThumbnail struct {
URL string `json:"url"`
}
type EngineConfig ¶ added in v0.11.0
type EngineHolder ¶
type EngineHolder struct {
// contains filtered or unexported fields
}
EngineHolder holds the Engine instance and configuration for ChatApps integration
func NewEngineHolder ¶
func NewEngineHolder(opts EngineHolderOptions) (*EngineHolder, error)
NewEngineHolder creates a new EngineHolder with the given options
func (*EngineHolder) GetAdapterManager ¶
func (h *EngineHolder) GetAdapterManager() *AdapterManager
GetAdapterManager returns the AdapterManager for sending messages
func (*EngineHolder) GetEngine ¶
func (h *EngineHolder) GetEngine() *engine.Engine
GetEngine returns the underlying Engine instance
type EngineHolderOptions ¶
type EngineHolderOptions struct {
Logger *slog.Logger
Adapters *AdapterManager
Timeout time.Duration
IdleTimeout time.Duration
Namespace string
PermissionMode string
AllowedTools []string
DisallowedTools []string
DefaultWorkDir string
DefaultTaskInstr string
}
EngineHolderOptions configures the EngineHolder
type EngineMessageHandler ¶
type EngineMessageHandler struct {
// contains filtered or unexported fields
}
EngineMessageHandler implements MessageHandler and integrates with Engine
func NewEngineMessageHandler ¶
func NewEngineMessageHandler(engine *engine.Engine, adapters *AdapterManager, opts ...EngineMessageHandlerOption) *EngineMessageHandler
NewEngineMessageHandler creates a new EngineMessageHandler
func (*EngineMessageHandler) Handle ¶
func (h *EngineMessageHandler) Handle(ctx context.Context, msg *ChatMessage) error
Handle implements EngineMessageHandler
type EngineMessageHandlerOption ¶
type EngineMessageHandlerOption func(*EngineMessageHandler)
EngineMessageHandlerOption configures the EngineMessageHandler
func WithConfigLoader ¶
func WithConfigLoader(loader *ConfigLoader) EngineMessageHandlerOption
func WithLogger ¶
func WithLogger(logger *slog.Logger) EngineMessageHandlerOption
func WithTaskInstrFn ¶
func WithTaskInstrFn(fn func(sessionID string) string) EngineMessageHandlerOption
func WithWorkDirFn ¶
func WithWorkDirFn(fn func(sessionID string) string) EngineMessageHandlerOption
type EventConfig ¶ added in v0.12.0
type EventConfig struct {
Aggregate bool // Whether to aggregate messages of this type
SameTypeOnly bool // Only aggregate with same event type
Immediate bool // Send immediately, skip aggregation
UseUpdate bool // Use chat.update for streaming updates
}
EventConfig defines aggregation behavior for specific event types
type FormatConversionProcessor ¶ added in v0.12.0
type FormatConversionProcessor struct {
// contains filtered or unexported fields
}
FormatConversionProcessor converts message content to platform-specific formats
func NewFormatConversionProcessor ¶ added in v0.12.0
func NewFormatConversionProcessor(logger *slog.Logger) *FormatConversionProcessor
NewFormatConversionProcessor creates a new FormatConversionProcessor
func (*FormatConversionProcessor) Name ¶ added in v0.12.0
func (p *FormatConversionProcessor) Name() string
Name returns the processor name
func (*FormatConversionProcessor) Order ¶ added in v0.12.0
func (p *FormatConversionProcessor) Order() int
Order returns the processor order
func (*FormatConversionProcessor) Process ¶ added in v0.12.0
func (p *FormatConversionProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
Process converts message content based on platform
type HealthCheck ¶
type HealthChecker ¶
type HealthChecker struct {
// contains filtered or unexported fields
}
func NewHealthChecker ¶
func NewHealthChecker(interval time.Duration) *HealthChecker
func (*HealthChecker) GetStatus ¶
func (h *HealthChecker) GetStatus() map[string]HealthCheck
func (*HealthChecker) Register ¶
func (h *HealthChecker) Register(check HealthCheck)
func (*HealthChecker) Start ¶
func (h *HealthChecker) Start()
func (*HealthChecker) Stop ¶
func (h *HealthChecker) Stop()
type InlineKeyboardButton ¶
type InlineKeyboardMarkup ¶
type InlineKeyboardMarkup struct {
InlineKeyboard [][]InlineKeyboardButton `json:"inline_keyboard"`
}
type InteractionCallback ¶ added in v0.12.0
type InteractionCallback func(interaction *PendingInteraction) error
InteractionCallback is a function that handles an interaction callback.
type InteractionManager ¶ added in v0.12.0
type InteractionManager struct {
// contains filtered or unexported fields
}
InteractionManager manages pending interactions for Slack interactive components.
func NewInteractionManager ¶ added in v0.12.0
func NewInteractionManager(logger *slog.Logger, opts InteractionManagerOptions) *InteractionManager
NewInteractionManager creates a new InteractionManager.
func (*InteractionManager) Complete ¶ added in v0.12.0
func (m *InteractionManager) Complete(id string, response *InteractionResponse) error
Complete marks an interaction as completed with a response.
func (*InteractionManager) Count ¶ added in v0.12.0
func (m *InteractionManager) Count() int
Count returns the number of pending interactions.
func (*InteractionManager) Delete ¶ added in v0.12.0
func (m *InteractionManager) Delete(id string)
Delete removes a pending interaction.
func (*InteractionManager) Expire ¶ added in v0.12.0
func (m *InteractionManager) Expire(id string) error
Expire marks an interaction as expired.
func (*InteractionManager) Get ¶ added in v0.12.0
func (m *InteractionManager) Get(id string) (*PendingInteraction, bool)
Get retrieves a pending interaction by ID. Returns nil if not found or expired.
func (*InteractionManager) GetBySession ¶ added in v0.12.0
func (m *InteractionManager) GetBySession(sessionID string) []*PendingInteraction
GetBySession retrieves all pending interactions for a session.
func (*InteractionManager) HandleCallback ¶ added in v0.12.0
func (m *InteractionManager) HandleCallback(interactionID, userID, actionID, callbackData string) error
HandleCallback processes an interaction callback. It looks up the interaction, calls the callback, and removes the interaction.
func (*InteractionManager) PendingCount ¶ added in v0.12.0
func (m *InteractionManager) PendingCount() int
PendingCount returns the number of pending (non-expired) interactions.
func (*InteractionManager) Stop ¶ added in v0.12.0
func (m *InteractionManager) Stop()
Stop stops the cleanup goroutine.
func (*InteractionManager) Store ¶ added in v0.12.0
func (m *InteractionManager) Store(interaction *PendingInteraction) string
Store adds a new pending interaction and returns its ID.
func (*InteractionManager) TotalCount ¶ added in v0.12.0
func (m *InteractionManager) TotalCount() int
TotalCount returns the total number of interactions (including expired).
type InteractionManagerOptions ¶ added in v0.12.0
type InteractionManagerOptions struct {
CleanupInterval time.Duration // How often to run cleanup (default: 1 min)
TTL time.Duration // How long to keep pending interactions (default: 10 min)
}
InteractionManagerOptions configures the InteractionManager.
type InteractionResponse ¶ added in v0.12.0
type InteractionResponse struct {
ActionID string `json:"action_id"`
Value string `json:"value"`
UserID string `json:"user_id"`
RespondedAt time.Time `json:"responded_at"`
}
InteractionResponse represents the user's response to an interaction.
type InteractionStatus ¶ added in v0.12.0
type InteractionStatus string
InteractionStatus is the status of a pending interaction.
const ( InteractionStatusPending InteractionStatus = "pending" InteractionStatusCompleted InteractionStatus = "completed" InteractionStatusExpired InteractionStatus = "expired" InteractionStatusCancelled InteractionStatus = "cancelled" )
type InteractionType ¶ added in v0.12.0
type InteractionType string
InteractionType defines the type of interactive message.
const ( // InteractionTypePermission is for Claude Code permission requests (Issue #39). InteractionTypePermission InteractionType = "permission" // InteractionTypeApproval is for general approval requests (Issue #37). InteractionTypeApproval InteractionType = "approval" // InteractionTypeSelection is for selection/choice requests. InteractionTypeSelection InteractionType = "selection" )
type LifecycleManager ¶
type LifecycleManager struct {
// contains filtered or unexported fields
}
func NewLifecycleManager ¶
func NewLifecycleManager() *LifecycleManager
func (*LifecycleManager) OnStart ¶
func (m *LifecycleManager) OnStart(hook func(ChatAdapter) error)
func (*LifecycleManager) OnStop ¶
func (m *LifecycleManager) OnStop(hook func(ChatAdapter) error)
func (*LifecycleManager) RegisterAdapter ¶
func (m *LifecycleManager) RegisterAdapter(adapter ChatAdapter, startPriority int)
func (*LifecycleManager) StopAll ¶
func (m *LifecycleManager) StopAll() error
type MessageAggregatorProcessor ¶ added in v0.12.0
type MessageAggregatorProcessor struct {
// contains filtered or unexported fields
}
MessageAggregatorProcessor aggregates multiple rapid messages into one
func NewMessageAggregatorProcessor ¶ added in v0.12.0
func NewMessageAggregatorProcessor(logger *slog.Logger, opts MessageAggregatorProcessorOptions) *MessageAggregatorProcessor
NewMessageAggregatorProcessor creates a new MessageAggregatorProcessor
func (*MessageAggregatorProcessor) Name ¶ added in v0.12.0
func (p *MessageAggregatorProcessor) Name() string
Name returns the processor name
func (*MessageAggregatorProcessor) Order ¶ added in v0.12.0
func (p *MessageAggregatorProcessor) Order() int
Order returns the processor order
func (*MessageAggregatorProcessor) Process ¶ added in v0.12.0
func (p *MessageAggregatorProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
Process aggregates messages with event-type awareness
func (*MessageAggregatorProcessor) SetSender ¶ added in v0.12.0
func (p *MessageAggregatorProcessor) SetSender(sender AggregatedMessageSender)
SetSender sets the sender for flushing aggregated messages
func (*MessageAggregatorProcessor) Stop ¶ added in v0.12.0
func (p *MessageAggregatorProcessor) Stop()
Stop stops the aggregator and cleans up buffers
type MessageAggregatorProcessorOptions ¶ added in v0.12.0
type MessageAggregatorProcessorOptions struct {
Window time.Duration // Time window to wait for more messages
MinContent int // Minimum characters before sending immediately
MaxMsgs int // Maximum messages in buffer (default: maxBufferMsgs)
MaxBytes int // Maximum total bytes in buffer (default: maxBufferBytes)
}
MessageAggregatorProcessorOptions configures the aggregator
type MessageHandler ¶
type MessageHandler = base.MessageHandler
type MessageProcessor ¶ added in v0.12.0
type MessageProcessor interface {
// Process processes a message and returns the processed message
// Can return the same message pointer if no modification needed
// Can return a new message pointer if modification needed
// Can return an error to stop processing
Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
// Name returns the processor name for logging and debugging
Name() string
// Order returns the processor order in the chain (lower = earlier)
Order() int
}
MessageProcessor defines the interface for processing messages before sending
type MessageQueue ¶
type MessageQueue struct {
// contains filtered or unexported fields
}
func NewMessageQueue ¶
func NewMessageQueue(logger *slog.Logger, maxSize, dlqSize, workers int) *MessageQueue
func (*MessageQueue) AddToDLQ ¶
func (q *MessageQueue) AddToDLQ(msg *QueuedMessage)
AddToDLQ stores a failed message to the Dead Letter Queue
func (*MessageQueue) DLQLen ¶
func (q *MessageQueue) DLQLen() int
DLQLen returns the number of messages in the Dead Letter Queue
func (*MessageQueue) Dequeue ¶
func (q *MessageQueue) Dequeue() (*QueuedMessage, bool)
func (*MessageQueue) Enqueue ¶
func (q *MessageQueue) Enqueue(platform, sessionID string, msg *ChatMessage) error
func (*MessageQueue) GetDLQ ¶
func (q *MessageQueue) GetDLQ() []*QueuedMessage
GetDLQ returns all messages in the Dead Letter Queue
func (*MessageQueue) Requeue ¶
func (q *MessageQueue) Requeue(msg *QueuedMessage) error
Requeue adds a message back to the queue for retry
func (*MessageQueue) Size ¶
func (q *MessageQueue) Size() int
func (*MessageQueue) Start ¶
func (q *MessageQueue) Start(adapterGetter func(string) (ChatAdapter, bool), sendFn func(context.Context, string, string, *ChatMessage) error)
func (*MessageQueue) Stop ¶
func (q *MessageQueue) Stop()
type PendingInteraction ¶ added in v0.12.0
type PendingInteraction struct {
ID string
SessionID string
ChannelID string
MessageTS string
ActionID string
UserID string
CallbackData string
Callback InteractionCallback
CreatedAt time.Time
ExpiresAt time.Time
Type InteractionType
ThreadTS string
Status InteractionStatus
Response *InteractionResponse
Metadata map[string]any
}
PendingInteraction represents a pending interactive action (e.g., button click).
func CreatePendingInteraction ¶ added in v0.12.0
func CreatePendingInteraction( sessionID string, userID string, channelID string, interactionType InteractionType, metadata map[string]any, ttl time.Duration, ) *PendingInteraction
CreatePendingInteraction creates a new PendingInteraction with default values.
func (*PendingInteraction) IsExpired ¶ added in v0.12.0
func (p *PendingInteraction) IsExpired() bool
IsExpired returns true if the interaction has expired.
func (*PendingInteraction) TimeUntilExpiry ¶ added in v0.12.0
func (p *PendingInteraction) TimeUntilExpiry() time.Duration
TimeUntilExpiry returns the duration until the interaction expires.
type PlatformConfig ¶
type PlatformConfig struct {
Platform string `yaml:"platform"`
SystemPrompt string `yaml:"system_prompt"`
TaskInstructions string `yaml:"task_instructions"`
Engine EngineConfig `yaml:"engine"`
Provider provider.ProviderConfig `yaml:"provider"`
Options map[string]any `yaml:"options,omitempty"`
}
type ProcessorChain ¶ added in v0.12.0
type ProcessorChain struct {
// contains filtered or unexported fields
}
ProcessorChain executes a chain of message processors
func NewDefaultProcessorChain ¶ added in v0.12.0
func NewDefaultProcessorChain(logger *slog.Logger) *ProcessorChain
NewDefaultProcessorChain creates a default processor chain with all standard processors
func NewProcessorChain ¶ added in v0.12.0
func NewProcessorChain(processors ...MessageProcessor) *ProcessorChain
NewProcessorChain creates a new processor chain with the given processors Processors are automatically sorted by Order()
func (*ProcessorChain) AddProcessor ¶ added in v0.12.0
func (c *ProcessorChain) AddProcessor(processor MessageProcessor)
AddProcessor adds a processor to the chain and re-sorts Note: This method is thread-safe but should preferably be called during initialization
func (*ProcessorChain) Process ¶ added in v0.12.0
func (c *ProcessorChain) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
Process executes all processors in order
func (*ProcessorChain) SetAggregatorSender ¶ added in v0.12.0
func (c *ProcessorChain) SetAggregatorSender(sender AggregatedMessageSender)
SetAggregatorSender sets the sender for MessageAggregatorProcessor if present
type ProcessorOrder ¶ added in v0.12.0
type ProcessorOrder int
ProcessorOrder defines standard processor ordering
const ( // OrderRateLimit should run first to prevent abuse OrderRateLimit ProcessorOrder = 10 // OrderThread manages thread_ts caching for message chunking OrderThread ProcessorOrder = 15 // OrderAggregation groups messages together OrderAggregation ProcessorOrder = 20 // OrderRichContent processes reactions, attachments, blocks OrderRichContent ProcessorOrder = 30 // OrderFormatConversion converts markdown to platform-specific format OrderFormatConversion ProcessorOrder = 40 // OrderChunk splits long messages for Slack API limits OrderChunk ProcessorOrder = 50 )
type QueueError ¶
type QueueError struct {
Message string
}
func (*QueueError) Error ¶
func (e *QueueError) Error() string
type QueuedMessage ¶
type RateLimitProcessor ¶ added in v0.12.0
type RateLimitProcessor struct {
// contains filtered or unexported fields
}
RateLimitProcessor implements rate limiting for message sending
func NewRateLimitProcessor ¶ added in v0.12.0
func NewRateLimitProcessor(logger *slog.Logger, opts RateLimitProcessorOptions) *RateLimitProcessor
NewRateLimitProcessor creates a new RateLimitProcessor
func (*RateLimitProcessor) Cleanup ¶ added in v0.12.0
func (p *RateLimitProcessor) Cleanup()
Cleanup removes old session rate limits
func (*RateLimitProcessor) GetSessionStats ¶ added in v0.12.0
func (p *RateLimitProcessor) GetSessionStats(platform, sessionID string) (lastSend time.Time, exists bool)
GetSessionStats returns rate limit stats for a session
func (*RateLimitProcessor) Name ¶ added in v0.12.0
func (p *RateLimitProcessor) Name() string
Name returns the processor name
func (*RateLimitProcessor) Order ¶ added in v0.12.0
func (p *RateLimitProcessor) Order() int
Order returns the processor order (should run first)
func (*RateLimitProcessor) Process ¶ added in v0.12.0
func (p *RateLimitProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
Process applies rate limiting to the message It will wait if necessary to enforce the minimum interval between messages
type RateLimitProcessorOptions ¶ added in v0.12.0
type RateLimitProcessorOptions struct {
MinInterval time.Duration // Minimum interval between messages
MaxBurst int // Maximum messages in burst window
BurstWindow time.Duration // Time window for burst calculation
}
RateLimitProcessorOptions configures the rate limit processor
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
func NewRateLimiter ¶
func NewRateLimiter(maxTokens float64, refillRate float64) *RateLimiter
func (*RateLimiter) Allow ¶
func (r *RateLimiter) Allow() bool
type RetryConfig ¶
type RichContent ¶
type RichContent = base.RichContent
type RichContentProcessor ¶ added in v0.12.0
type RichContentProcessor struct {
// contains filtered or unexported fields
}
RichContentProcessor processes RichContent (reactions, attachments, blocks) and converts them to platform-specific formats
func NewRichContentProcessor ¶ added in v0.12.0
func NewRichContentProcessor(logger *slog.Logger) *RichContentProcessor
NewRichContentProcessor creates a new RichContentProcessor
func (*RichContentProcessor) Name ¶ added in v0.12.0
func (p *RichContentProcessor) Name() string
Name returns the processor name
func (*RichContentProcessor) Order ¶ added in v0.12.0
func (p *RichContentProcessor) Order() int
Order returns the processor order
func (*RichContentProcessor) Process ¶ added in v0.12.0
func (p *RichContentProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
Process processes the message's RichContent
type SlackBlock ¶
type StreamAdapter ¶
type StreamAdapter interface {
ChatAdapter
SendStreamMessage(ctx context.Context, sessionID string, msg *ChatMessage) (StreamHandler, error)
UpdateMessage(ctx context.Context, sessionID, messageID string, msg *ChatMessage) error
}
type StreamCallback ¶
type StreamCallback struct {
// contains filtered or unexported fields
}
StreamCallback implements event.Callback to receive Engine events and forward to ChatApp
func NewStreamCallback ¶
func NewStreamCallback(ctx context.Context, sessionID, platform string, adapters *AdapterManager, logger *slog.Logger, metadata map[string]any) *StreamCallback
NewStreamCallback creates a new StreamCallback
func (*StreamCallback) Handle ¶
func (c *StreamCallback) Handle(eventType string, data any) error
Handle implements event.Callback
func (*StreamCallback) SendAggregatedMessage ¶ added in v0.12.0
func (c *StreamCallback) SendAggregatedMessage(ctx context.Context, msg *ChatMessage) error
SendAggregatedMessage implements AggregatedMessageSender interface This is called by the MessageAggregatorProcessor when timer flushes buffered messages
type StreamHandler ¶
type StreamState ¶ added in v0.12.0
type StreamState struct {
ChannelID string
MessageTS string
LastUpdated time.Time
// contains filtered or unexported fields
}
StreamState tracks the state for streaming updates
type ThreadInfo ¶ added in v0.12.0
ThreadInfo holds thread-related metadata for a session.
type ThreadProcessor ¶ added in v0.12.0
type ThreadProcessor struct {
// contains filtered or unexported fields
}
ThreadProcessor manages thread_ts caching for message chunking. It tracks the first message's timestamp for each session to associate subsequent chunked messages in the same thread.
func NewThreadProcessor ¶ added in v0.12.0
func NewThreadProcessor(logger *slog.Logger, opts ThreadProcessorOptions) *ThreadProcessor
NewThreadProcessor creates a new ThreadProcessor.
func (*ThreadProcessor) Delete ¶ added in v0.12.0
func (p *ThreadProcessor) Delete(sessionID string)
Delete removes thread info for a session.
func (*ThreadProcessor) GetThreadTS ¶ added in v0.12.0
func (p *ThreadProcessor) GetThreadTS(sessionID string) string
GetThreadTS returns the stored thread_ts for a session. Returns empty string if no thread info exists.
func (*ThreadProcessor) Name ¶ added in v0.12.0
func (p *ThreadProcessor) Name() string
Name returns the processor name.
func (*ThreadProcessor) Order ¶ added in v0.12.0
func (p *ThreadProcessor) Order() int
Order returns the processor order (runs after rate limit, before aggregation).
func (*ThreadProcessor) Process ¶ added in v0.12.0
func (p *ThreadProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)
Process manages thread_ts for the message. For the first message: stores thread_ts from metadata if present. For subsequent messages: attaches stored thread_ts to metadata.
func (*ThreadProcessor) SetThreadTS ¶ added in v0.12.0
func (p *ThreadProcessor) SetThreadTS(sessionID, threadTS, channelID string)
SetThreadTS explicitly sets the thread_ts for a session. This is useful when the first message response provides the thread_ts.
func (*ThreadProcessor) Stop ¶ added in v0.12.0
func (p *ThreadProcessor) Stop()
Stop stops the cleanup goroutine.