chatapps

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2026 License: MIT Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ParseModeNone     = base.ParseModeNone
	ParseModeMarkdown = base.ParseModeMarkdown
	ParseModeHTML     = base.ParseModeHTML
)

Variables

View Source
var (
	// MessagesAggregatedTotal counts total messages aggregated
	MessagesAggregatedTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "hotplex_aggregator_messages_aggregated_total",
			Help: "Total number of messages that have been aggregated",
		},
		[]string{"event_type", "platform"},
	)

	// MessagesFlushedTotal counts total flushes by reason
	MessagesFlushedTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "hotplex_aggregator_messages_flushed_total",
			Help: "Total number of times the buffer was flushed",
		},
		[]string{"event_type", "platform", "reason"},
	)

	// MessagesDroppedTotal counts dropped messages
	MessagesDroppedTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "hotplex_aggregator_messages_dropped_total",
			Help: "Total number of messages dropped due to buffer overflow",
		},
		[]string{"event_type", "platform", "reason"},
	)

	// BufferSizeGauge tracks current buffer size per platform
	BufferSizeGauge = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "hotplex_aggregator_buffer_size",
			Help: "Current number of messages in the buffer per platform",
		},
		[]string{"platform"},
	)

	// BufferDurationHistogram tracks time from first message to flush
	BufferDurationHistogram = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "hotplex_aggregator_buffer_duration_seconds",
			Help:    "Time in seconds from first message arrival to buffer flush",
			Buckets: []float64{0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0},
		},
		[]string{"platform"},
	)

	// MessageSizeHistogram tracks message size distribution
	MessageSizeHistogram = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "hotplex_aggregator_message_size_bytes",
			Help:    "Size distribution of aggregated messages in bytes",
			Buckets: []float64{64, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768},
		},
		[]string{"event_type", "platform"},
	)
)
View Source
var ErrQueueFull = &QueueError{Message: "queue is full"}

Functions

func GenerateInteractionID added in v0.12.0

func GenerateInteractionID(sessionID string, timestamp time.Time) string

GenerateInteractionID generates a unique interaction ID.

func GetExtraChunks added in v0.12.0

func GetExtraChunks(msg *base.ChatMessage) []string

GetExtraChunks returns any extra chunks stored in message metadata. Returns nil if no extra chunks exist.

func IsRetryableError

func IsRetryableError(err error) bool

IsRetryableError classifies errors as retryable or non-retryable

func RetryWithBackoff

func RetryWithBackoff(ctx context.Context, config RetryConfig, fn func() error) error

Types

type AdapterManager

type AdapterManager struct {
	// contains filtered or unexported fields
}

func NewAdapterManager

func NewAdapterManager(logger *slog.Logger) *AdapterManager

func Setup added in v0.11.0

func Setup(ctx context.Context, logger *slog.Logger) (http.Handler, *AdapterManager, error)

Setup initializes all enabled ChatApps and their dedicated Engines. It returns an http.Handler that handles all webhook routes.

func (*AdapterManager) GetAdapter

func (m *AdapterManager) GetAdapter(platform string) (ChatAdapter, bool)

func (*AdapterManager) Handler added in v0.11.0

func (m *AdapterManager) Handler() http.Handler

Handler returns an http.Handler with all adapter webhooks mounted This is a convenience method when you don't need gorilla/mux

func (*AdapterManager) ListPlatforms

func (m *AdapterManager) ListPlatforms() []string

func (*AdapterManager) Register

func (m *AdapterManager) Register(adapter ChatAdapter) error

func (*AdapterManager) RegisterEngine added in v0.11.0

func (m *AdapterManager) RegisterEngine(eng *engine.Engine)

func (*AdapterManager) RegisterRoutes added in v0.11.0

func (m *AdapterManager) RegisterRoutes(router *mux.Router)

RegisterRoutes registers all adapter webhooks to a unified router Path format: /webhook/{platform} (e.g., /webhook/telegram, /webhook/discord)

func (*AdapterManager) SendMessage

func (m *AdapterManager) SendMessage(ctx context.Context, platform, sessionID string, msg *ChatMessage) error

SendMessage sends a message to a specific platform

func (*AdapterManager) StartAll

func (m *AdapterManager) StartAll(ctx context.Context) error

func (*AdapterManager) StopAll

func (m *AdapterManager) StopAll() error

func (*AdapterManager) Unregister

func (m *AdapterManager) Unregister(platform string) error

type AdapterMetrics

type AdapterMetrics struct {
	MessagesReceived atomic.Int64
	MessagesSent     atomic.Int64
	MessagesFailed   atomic.Int64
	LastMessageAt    atomic.Int64
	Uptime           atomic.Int64
}

func NewAdapterMetrics

func NewAdapterMetrics() *AdapterMetrics

func (*AdapterMetrics) GetStats

func (m *AdapterMetrics) GetStats() map[string]int64

func (*AdapterMetrics) RecordFailure

func (m *AdapterMetrics) RecordFailure()

func (*AdapterMetrics) RecordReceive

func (m *AdapterMetrics) RecordReceive()

func (*AdapterMetrics) RecordSend

func (m *AdapterMetrics) RecordSend()

type AggregatedMessageSender added in v0.12.0

type AggregatedMessageSender interface {
	SendAggregatedMessage(ctx context.Context, msg *base.ChatMessage) error
}

AggregatedMessageSender sends aggregated messages

type Attachment

type Attachment = base.Attachment

type ChatAdapter

type ChatAdapter = base.ChatAdapter

type ChatMessage

type ChatMessage = base.ChatMessage

func NewChatMessage added in v0.11.0

func NewChatMessage(platform, sessionID, userID, content string) *ChatMessage

type ChunkInfo added in v0.12.0

type ChunkInfo struct {
	TotalChunks  int
	CurrentChunk int
	ThreadTS     string
	ChannelID    string
}

ChunkInfo holds metadata about chunked messages.

func GetChunkInfo added in v0.12.0

func GetChunkInfo(msg *base.ChatMessage) *ChunkInfo

GetChunkInfo returns the ChunkInfo from message metadata.

type ChunkProcessor added in v0.12.0

type ChunkProcessor struct {
	// contains filtered or unexported fields
}

ChunkProcessor splits long messages into chunks that fit within platform limits. It uses the Slack chunker for Markdown-aware splitting.

func NewChunkProcessor added in v0.12.0

func NewChunkProcessor(logger *slog.Logger, opts ChunkProcessorOptions) *ChunkProcessor

NewChunkProcessor creates a new ChunkProcessor.

func (*ChunkProcessor) Name added in v0.12.0

func (p *ChunkProcessor) Name() string

Name returns the processor name.

func (*ChunkProcessor) Order added in v0.12.0

func (p *ChunkProcessor) Order() int

Order returns the processor order (runs after format conversion).

func (*ChunkProcessor) Process added in v0.12.0

func (p *ChunkProcessor) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)

Process splits the message into chunks if it exceeds the character limit. Returns either a single message or a slice of messages.

type ChunkProcessorOptions added in v0.12.0

type ChunkProcessorOptions struct {
	MaxChars int // Maximum characters per chunk (default: 4000 for Slack)
}

ChunkProcessorOptions configures the ChunkProcessor.

type ConfigLoader

type ConfigLoader struct {
	// contains filtered or unexported fields
}

func NewConfigLoader

func NewConfigLoader(configDir string, logger *slog.Logger) (*ConfigLoader, error)

func (*ConfigLoader) Close added in v0.11.0

func (c *ConfigLoader) Close() error

Close stops all hot reload watchers and releases resources.

func (*ConfigLoader) GetConfig

func (c *ConfigLoader) GetConfig(platform string) *PlatformConfig

func (*ConfigLoader) GetOptions added in v0.11.0

func (c *ConfigLoader) GetOptions(platform string) map[string]any

func (*ConfigLoader) GetSystemPrompt

func (c *ConfigLoader) GetSystemPrompt(platform string) string

func (*ConfigLoader) GetTaskInstructions

func (c *ConfigLoader) GetTaskInstructions(platform string) string

func (*ConfigLoader) HasPlatform

func (c *ConfigLoader) HasPlatform(platform string) bool

func (*ConfigLoader) Load

func (c *ConfigLoader) Load(configDir string) error

func (*ConfigLoader) Platforms

func (c *ConfigLoader) Platforms() []string

func (*ConfigLoader) StartHotReload added in v0.11.0

func (c *ConfigLoader) StartHotReload(ctx context.Context, configDir string, onReload func(platform string, cfg *PlatformConfig)) error

StartHotReload starts watching all config files for changes and automatically reloads them. The onReload callback is called with the updated PlatformConfig for each platform.

type DiscordEmbed

type DiscordEmbed struct {
	Title       string                 `json:"title,omitempty"`
	Description string                 `json:"description,omitempty"`
	URL         string                 `json:"url,omitempty"`
	Color       int                    `json:"color,omitempty"`
	Fields      []DiscordEmbedField    `json:"fields,omitempty"`
	Footer      *DiscordEmbedFooter    `json:"footer,omitempty"`
	Thumbnail   *DiscordEmbedThumbnail `json:"thumbnail,omitempty"`
	Image       *DiscordEmbedImage     `json:"image,omitempty"`
	Timestamp   string                 `json:"timestamp,omitempty"`
}

type DiscordEmbedField

type DiscordEmbedField struct {
	Name   string `json:"name"`
	Value  string `json:"value"`
	Inline bool   `json:"inline,omitempty"`
}

type DiscordEmbedFooter

type DiscordEmbedFooter struct {
	Text    string `json:"text"`
	IconURL string `json:"icon_url,omitempty"`
}

type DiscordEmbedImage

type DiscordEmbedImage struct {
	URL string `json:"url"`
}

type DiscordEmbedThumbnail

type DiscordEmbedThumbnail struct {
	URL string `json:"url"`
}

type EngineConfig added in v0.11.0

type EngineConfig struct {
	Timeout     time.Duration `yaml:"timeout"`
	IdleTimeout time.Duration `yaml:"idle_timeout"`
	WorkDir     string        `yaml:"work_dir"`
}

type EngineHolder

type EngineHolder struct {
	// contains filtered or unexported fields
}

EngineHolder holds the Engine instance and configuration for ChatApps integration

func NewEngineHolder

func NewEngineHolder(opts EngineHolderOptions) (*EngineHolder, error)

NewEngineHolder creates a new EngineHolder with the given options

func (*EngineHolder) GetAdapterManager

func (h *EngineHolder) GetAdapterManager() *AdapterManager

GetAdapterManager returns the AdapterManager for sending messages

func (*EngineHolder) GetEngine

func (h *EngineHolder) GetEngine() *engine.Engine

GetEngine returns the underlying Engine instance

type EngineHolderOptions

type EngineHolderOptions struct {
	Logger           *slog.Logger
	Adapters         *AdapterManager
	Timeout          time.Duration
	IdleTimeout      time.Duration
	Namespace        string
	PermissionMode   string
	AllowedTools     []string
	DisallowedTools  []string
	DefaultWorkDir   string
	DefaultTaskInstr string
}

EngineHolderOptions configures the EngineHolder

type EngineMessageHandler

type EngineMessageHandler struct {
	// contains filtered or unexported fields
}

EngineMessageHandler implements MessageHandler and integrates with Engine

func NewEngineMessageHandler

func NewEngineMessageHandler(engine *engine.Engine, adapters *AdapterManager, opts ...EngineMessageHandlerOption) *EngineMessageHandler

NewEngineMessageHandler creates a new EngineMessageHandler

func (*EngineMessageHandler) Handle

func (h *EngineMessageHandler) Handle(ctx context.Context, msg *ChatMessage) error

Handle implements EngineMessageHandler

type EngineMessageHandlerOption

type EngineMessageHandlerOption func(*EngineMessageHandler)

EngineMessageHandlerOption configures the EngineMessageHandler

func WithConfigLoader

func WithConfigLoader(loader *ConfigLoader) EngineMessageHandlerOption

func WithLogger

func WithLogger(logger *slog.Logger) EngineMessageHandlerOption

func WithTaskInstrFn

func WithTaskInstrFn(fn func(sessionID string) string) EngineMessageHandlerOption

func WithWorkDirFn

func WithWorkDirFn(fn func(sessionID string) string) EngineMessageHandlerOption

type EventConfig added in v0.12.0

type EventConfig struct {
	Aggregate    bool // Whether to aggregate messages of this type
	SameTypeOnly bool // Only aggregate with same event type
	Immediate    bool // Send immediately, skip aggregation
	UseUpdate    bool // Use chat.update for streaming updates
}

EventConfig defines aggregation behavior for specific event types

type FormatConversionProcessor added in v0.12.0

type FormatConversionProcessor struct {
	// contains filtered or unexported fields
}

FormatConversionProcessor converts message content to platform-specific formats

func NewFormatConversionProcessor added in v0.12.0

func NewFormatConversionProcessor(logger *slog.Logger) *FormatConversionProcessor

NewFormatConversionProcessor creates a new FormatConversionProcessor

func (*FormatConversionProcessor) Name added in v0.12.0

Name returns the processor name

func (*FormatConversionProcessor) Order added in v0.12.0

func (p *FormatConversionProcessor) Order() int

Order returns the processor order

func (*FormatConversionProcessor) Process added in v0.12.0

Process converts message content based on platform

type HealthCheck

type HealthCheck struct {
	Name      string
	Status    string
	LastCheck time.Time
	LastError string
	Interval  time.Duration
	CheckFunc func() error
}

type HealthChecker

type HealthChecker struct {
	// contains filtered or unexported fields
}

func NewHealthChecker

func NewHealthChecker(interval time.Duration) *HealthChecker

func (*HealthChecker) GetStatus

func (h *HealthChecker) GetStatus() map[string]HealthCheck

func (*HealthChecker) Register

func (h *HealthChecker) Register(check HealthCheck)

func (*HealthChecker) Start

func (h *HealthChecker) Start()

func (*HealthChecker) Stop

func (h *HealthChecker) Stop()

type InlineKeyboardButton

type InlineKeyboardButton struct {
	Text         string `json:"text"`
	URL          string `json:"url,omitempty"`
	CallbackData string `json:"callback_data,omitempty"`
}

type InlineKeyboardMarkup

type InlineKeyboardMarkup struct {
	InlineKeyboard [][]InlineKeyboardButton `json:"inline_keyboard"`
}

type InteractionCallback added in v0.12.0

type InteractionCallback func(interaction *PendingInteraction) error

InteractionCallback is a function that handles an interaction callback.

type InteractionManager added in v0.12.0

type InteractionManager struct {
	// contains filtered or unexported fields
}

InteractionManager manages pending interactions for Slack interactive components.

func NewInteractionManager added in v0.12.0

func NewInteractionManager(logger *slog.Logger, opts InteractionManagerOptions) *InteractionManager

NewInteractionManager creates a new InteractionManager.

func (*InteractionManager) Complete added in v0.12.0

func (m *InteractionManager) Complete(id string, response *InteractionResponse) error

Complete marks an interaction as completed with a response.

func (*InteractionManager) Count added in v0.12.0

func (m *InteractionManager) Count() int

Count returns the number of pending interactions.

func (*InteractionManager) Delete added in v0.12.0

func (m *InteractionManager) Delete(id string)

Delete removes a pending interaction.

func (*InteractionManager) Expire added in v0.12.0

func (m *InteractionManager) Expire(id string) error

Expire marks an interaction as expired.

func (*InteractionManager) Get added in v0.12.0

Get retrieves a pending interaction by ID. Returns nil if not found or expired.

func (*InteractionManager) GetBySession added in v0.12.0

func (m *InteractionManager) GetBySession(sessionID string) []*PendingInteraction

GetBySession retrieves all pending interactions for a session.

func (*InteractionManager) HandleCallback added in v0.12.0

func (m *InteractionManager) HandleCallback(interactionID, userID, actionID, callbackData string) error

HandleCallback processes an interaction callback. It looks up the interaction, calls the callback, and removes the interaction.

func (*InteractionManager) PendingCount added in v0.12.0

func (m *InteractionManager) PendingCount() int

PendingCount returns the number of pending (non-expired) interactions.

func (*InteractionManager) Stop added in v0.12.0

func (m *InteractionManager) Stop()

Stop stops the cleanup goroutine.

func (*InteractionManager) Store added in v0.12.0

func (m *InteractionManager) Store(interaction *PendingInteraction) string

Store adds a new pending interaction and returns its ID.

func (*InteractionManager) TotalCount added in v0.12.0

func (m *InteractionManager) TotalCount() int

TotalCount returns the total number of interactions (including expired).

type InteractionManagerOptions added in v0.12.0

type InteractionManagerOptions struct {
	CleanupInterval time.Duration // How often to run cleanup (default: 1 min)
	TTL             time.Duration // How long to keep pending interactions (default: 10 min)
}

InteractionManagerOptions configures the InteractionManager.

type InteractionResponse added in v0.12.0

type InteractionResponse struct {
	ActionID    string    `json:"action_id"`
	Value       string    `json:"value"`
	UserID      string    `json:"user_id"`
	RespondedAt time.Time `json:"responded_at"`
}

InteractionResponse represents the user's response to an interaction.

type InteractionStatus added in v0.12.0

type InteractionStatus string

InteractionStatus is the status of a pending interaction.

const (
	InteractionStatusPending   InteractionStatus = "pending"
	InteractionStatusCompleted InteractionStatus = "completed"
	InteractionStatusExpired   InteractionStatus = "expired"
	InteractionStatusCancelled InteractionStatus = "cancelled"
)

type InteractionType added in v0.12.0

type InteractionType string

InteractionType defines the type of interactive message.

const (
	// InteractionTypePermission is for Claude Code permission requests (Issue #39).
	InteractionTypePermission InteractionType = "permission"
	// InteractionTypeApproval is for general approval requests (Issue #37).
	InteractionTypeApproval InteractionType = "approval"
	// InteractionTypeSelection is for selection/choice requests.
	InteractionTypeSelection InteractionType = "selection"
)

type LifecycleManager

type LifecycleManager struct {
	// contains filtered or unexported fields
}

func NewLifecycleManager

func NewLifecycleManager() *LifecycleManager

func (*LifecycleManager) OnStart

func (m *LifecycleManager) OnStart(hook func(ChatAdapter) error)

func (*LifecycleManager) OnStop

func (m *LifecycleManager) OnStop(hook func(ChatAdapter) error)

func (*LifecycleManager) RegisterAdapter

func (m *LifecycleManager) RegisterAdapter(adapter ChatAdapter, startPriority int)

func (*LifecycleManager) StartAll

func (m *LifecycleManager) StartAll(ctx context.Context) error

func (*LifecycleManager) StopAll

func (m *LifecycleManager) StopAll() error

type Logger

type Logger = slog.Logger

type MessageAggregatorProcessor added in v0.12.0

type MessageAggregatorProcessor struct {
	// contains filtered or unexported fields
}

MessageAggregatorProcessor aggregates multiple rapid messages into one

func NewMessageAggregatorProcessor added in v0.12.0

func NewMessageAggregatorProcessor(logger *slog.Logger, opts MessageAggregatorProcessorOptions) *MessageAggregatorProcessor

NewMessageAggregatorProcessor creates a new MessageAggregatorProcessor

func (*MessageAggregatorProcessor) Name added in v0.12.0

Name returns the processor name

func (*MessageAggregatorProcessor) Order added in v0.12.0

func (p *MessageAggregatorProcessor) Order() int

Order returns the processor order

func (*MessageAggregatorProcessor) Process added in v0.12.0

Process aggregates messages with event-type awareness

func (*MessageAggregatorProcessor) SetSender added in v0.12.0

SetSender sets the sender for flushing aggregated messages

func (*MessageAggregatorProcessor) Stop added in v0.12.0

func (p *MessageAggregatorProcessor) Stop()

Stop stops the aggregator and cleans up buffers

type MessageAggregatorProcessorOptions added in v0.12.0

type MessageAggregatorProcessorOptions struct {
	Window     time.Duration // Time window to wait for more messages
	MinContent int           // Minimum characters before sending immediately
	MaxMsgs    int           // Maximum messages in buffer (default: maxBufferMsgs)
	MaxBytes   int           // Maximum total bytes in buffer (default: maxBufferBytes)
}

MessageAggregatorProcessorOptions configures the aggregator

type MessageHandler

type MessageHandler = base.MessageHandler

type MessageProcessor added in v0.12.0

type MessageProcessor interface {
	// Process processes a message and returns the processed message
	// Can return the same message pointer if no modification needed
	// Can return a new message pointer if modification needed
	// Can return an error to stop processing
	Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)

	// Name returns the processor name for logging and debugging
	Name() string

	// Order returns the processor order in the chain (lower = earlier)
	Order() int
}

MessageProcessor defines the interface for processing messages before sending

type MessageQueue

type MessageQueue struct {
	// contains filtered or unexported fields
}

func NewMessageQueue

func NewMessageQueue(logger *slog.Logger, maxSize, dlqSize, workers int) *MessageQueue

func (*MessageQueue) AddToDLQ

func (q *MessageQueue) AddToDLQ(msg *QueuedMessage)

AddToDLQ stores a failed message to the Dead Letter Queue

func (*MessageQueue) DLQLen

func (q *MessageQueue) DLQLen() int

DLQLen returns the number of messages in the Dead Letter Queue

func (*MessageQueue) Dequeue

func (q *MessageQueue) Dequeue() (*QueuedMessage, bool)

func (*MessageQueue) Enqueue

func (q *MessageQueue) Enqueue(platform, sessionID string, msg *ChatMessage) error

func (*MessageQueue) GetDLQ

func (q *MessageQueue) GetDLQ() []*QueuedMessage

GetDLQ returns all messages in the Dead Letter Queue

func (*MessageQueue) Requeue

func (q *MessageQueue) Requeue(msg *QueuedMessage) error

Requeue adds a message back to the queue for retry

func (*MessageQueue) Size

func (q *MessageQueue) Size() int

func (*MessageQueue) Start

func (q *MessageQueue) Start(adapterGetter func(string) (ChatAdapter, bool), sendFn func(context.Context, string, string, *ChatMessage) error)

func (*MessageQueue) Stop

func (q *MessageQueue) Stop()

type ParseMode

type ParseMode = base.ParseMode

type PendingInteraction added in v0.12.0

type PendingInteraction struct {
	ID           string
	SessionID    string
	ChannelID    string
	MessageTS    string
	ActionID     string
	UserID       string
	CallbackData string
	Callback     InteractionCallback
	CreatedAt    time.Time
	ExpiresAt    time.Time
	Type         InteractionType
	ThreadTS     string
	Status       InteractionStatus
	Response     *InteractionResponse
	Metadata     map[string]any
}

PendingInteraction represents a pending interactive action (e.g., button click).

func CreatePendingInteraction added in v0.12.0

func CreatePendingInteraction(
	sessionID string,
	userID string,
	channelID string,
	interactionType InteractionType,
	metadata map[string]any,
	ttl time.Duration,
) *PendingInteraction

CreatePendingInteraction creates a new PendingInteraction with default values.

func (*PendingInteraction) IsExpired added in v0.12.0

func (p *PendingInteraction) IsExpired() bool

IsExpired returns true if the interaction has expired.

func (*PendingInteraction) TimeUntilExpiry added in v0.12.0

func (p *PendingInteraction) TimeUntilExpiry() time.Duration

TimeUntilExpiry returns the duration until the interaction expires.

type PlatformConfig

type PlatformConfig struct {
	Platform         string                  `yaml:"platform"`
	SystemPrompt     string                  `yaml:"system_prompt"`
	TaskInstructions string                  `yaml:"task_instructions"`
	Engine           EngineConfig            `yaml:"engine"`
	Provider         provider.ProviderConfig `yaml:"provider"`
	Options          map[string]any          `yaml:"options,omitempty"`
}

type ProcessorChain added in v0.12.0

type ProcessorChain struct {
	// contains filtered or unexported fields
}

ProcessorChain executes a chain of message processors

func NewDefaultProcessorChain added in v0.12.0

func NewDefaultProcessorChain(logger *slog.Logger) *ProcessorChain

NewDefaultProcessorChain creates a default processor chain with all standard processors

func NewProcessorChain added in v0.12.0

func NewProcessorChain(processors ...MessageProcessor) *ProcessorChain

NewProcessorChain creates a new processor chain with the given processors Processors are automatically sorted by Order()

func (*ProcessorChain) AddProcessor added in v0.12.0

func (c *ProcessorChain) AddProcessor(processor MessageProcessor)

AddProcessor adds a processor to the chain and re-sorts Note: This method is thread-safe but should preferably be called during initialization

func (*ProcessorChain) Process added in v0.12.0

func (c *ProcessorChain) Process(ctx context.Context, msg *base.ChatMessage) (*base.ChatMessage, error)

Process executes all processors in order

func (*ProcessorChain) SetAggregatorSender added in v0.12.0

func (c *ProcessorChain) SetAggregatorSender(sender AggregatedMessageSender)

SetAggregatorSender sets the sender for MessageAggregatorProcessor if present

type ProcessorOrder added in v0.12.0

type ProcessorOrder int

ProcessorOrder defines standard processor ordering

const (
	// OrderRateLimit should run first to prevent abuse
	OrderRateLimit ProcessorOrder = 10
	// OrderThread manages thread_ts caching for message chunking
	OrderThread ProcessorOrder = 15
	// OrderAggregation groups messages together
	OrderAggregation ProcessorOrder = 20
	// OrderRichContent processes reactions, attachments, blocks
	OrderRichContent ProcessorOrder = 30
	// OrderFormatConversion converts markdown to platform-specific format
	OrderFormatConversion ProcessorOrder = 40
	// OrderChunk splits long messages for Slack API limits
	OrderChunk ProcessorOrder = 50
)

type QueueError

type QueueError struct {
	Message string
}

func (*QueueError) Error

func (e *QueueError) Error() string

type QueuedMessage

type QueuedMessage struct {
	Platform  string
	SessionID string
	Message   *ChatMessage
	Retries   int
	CreatedAt time.Time
}

type RateLimitProcessor added in v0.12.0

type RateLimitProcessor struct {
	// contains filtered or unexported fields
}

RateLimitProcessor implements rate limiting for message sending

func NewRateLimitProcessor added in v0.12.0

func NewRateLimitProcessor(logger *slog.Logger, opts RateLimitProcessorOptions) *RateLimitProcessor

NewRateLimitProcessor creates a new RateLimitProcessor

func (*RateLimitProcessor) Cleanup added in v0.12.0

func (p *RateLimitProcessor) Cleanup()

Cleanup removes old session rate limits

func (*RateLimitProcessor) GetSessionStats added in v0.12.0

func (p *RateLimitProcessor) GetSessionStats(platform, sessionID string) (lastSend time.Time, exists bool)

GetSessionStats returns rate limit stats for a session

func (*RateLimitProcessor) Name added in v0.12.0

func (p *RateLimitProcessor) Name() string

Name returns the processor name

func (*RateLimitProcessor) Order added in v0.12.0

func (p *RateLimitProcessor) Order() int

Order returns the processor order (should run first)

func (*RateLimitProcessor) Process added in v0.12.0

Process applies rate limiting to the message It will wait if necessary to enforce the minimum interval between messages

type RateLimitProcessorOptions added in v0.12.0

type RateLimitProcessorOptions struct {
	MinInterval time.Duration // Minimum interval between messages
	MaxBurst    int           // Maximum messages in burst window
	BurstWindow time.Duration // Time window for burst calculation
}

RateLimitProcessorOptions configures the rate limit processor

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

func NewRateLimiter

func NewRateLimiter(maxTokens float64, refillRate float64) *RateLimiter

func (*RateLimiter) Allow

func (r *RateLimiter) Allow() bool

func (*RateLimiter) Wait

func (r *RateLimiter) Wait(ctx context.Context) error

type RetryConfig

type RetryConfig struct {
	MaxAttempts int
	BaseDelay   time.Duration
	MaxDelay    time.Duration
}

type RichContent

type RichContent = base.RichContent

type RichContentProcessor added in v0.12.0

type RichContentProcessor struct {
	// contains filtered or unexported fields
}

RichContentProcessor processes RichContent (reactions, attachments, blocks) and converts them to platform-specific formats

func NewRichContentProcessor added in v0.12.0

func NewRichContentProcessor(logger *slog.Logger) *RichContentProcessor

NewRichContentProcessor creates a new RichContentProcessor

func (*RichContentProcessor) Name added in v0.12.0

func (p *RichContentProcessor) Name() string

Name returns the processor name

func (*RichContentProcessor) Order added in v0.12.0

func (p *RichContentProcessor) Order() int

Order returns the processor order

func (*RichContentProcessor) Process added in v0.12.0

Process processes the message's RichContent

type SlackBlock

type SlackBlock map[string]any

type StreamAdapter

type StreamAdapter interface {
	ChatAdapter
	SendStreamMessage(ctx context.Context, sessionID string, msg *ChatMessage) (StreamHandler, error)
	UpdateMessage(ctx context.Context, sessionID, messageID string, msg *ChatMessage) error
}

type StreamCallback

type StreamCallback struct {
	// contains filtered or unexported fields
}

StreamCallback implements event.Callback to receive Engine events and forward to ChatApp

func NewStreamCallback

func NewStreamCallback(ctx context.Context, sessionID, platform string, adapters *AdapterManager, logger *slog.Logger, metadata map[string]any) *StreamCallback

NewStreamCallback creates a new StreamCallback

func (*StreamCallback) Handle

func (c *StreamCallback) Handle(eventType string, data any) error

Handle implements event.Callback

func (*StreamCallback) SendAggregatedMessage added in v0.12.0

func (c *StreamCallback) SendAggregatedMessage(ctx context.Context, msg *ChatMessage) error

SendAggregatedMessage implements AggregatedMessageSender interface This is called by the MessageAggregatorProcessor when timer flushes buffered messages

type StreamHandler

type StreamHandler func(ctx context.Context, sessionID string, chunk string, isFinal bool) error

type StreamState added in v0.12.0

type StreamState struct {
	ChannelID   string
	MessageTS   string
	LastUpdated time.Time
	// contains filtered or unexported fields
}

StreamState tracks the state for streaming updates

type ThreadInfo added in v0.12.0

type ThreadInfo struct {
	ThreadTS     string
	ChannelID    string
	LastActivity time.Time
}

ThreadInfo holds thread-related metadata for a session.

type ThreadProcessor added in v0.12.0

type ThreadProcessor struct {
	// contains filtered or unexported fields
}

ThreadProcessor manages thread_ts caching for message chunking. It tracks the first message's timestamp for each session to associate subsequent chunked messages in the same thread.

func NewThreadProcessor added in v0.12.0

func NewThreadProcessor(logger *slog.Logger, opts ThreadProcessorOptions) *ThreadProcessor

NewThreadProcessor creates a new ThreadProcessor.

func (*ThreadProcessor) Delete added in v0.12.0

func (p *ThreadProcessor) Delete(sessionID string)

Delete removes thread info for a session.

func (*ThreadProcessor) GetThreadTS added in v0.12.0

func (p *ThreadProcessor) GetThreadTS(sessionID string) string

GetThreadTS returns the stored thread_ts for a session. Returns empty string if no thread info exists.

func (*ThreadProcessor) Name added in v0.12.0

func (p *ThreadProcessor) Name() string

Name returns the processor name.

func (*ThreadProcessor) Order added in v0.12.0

func (p *ThreadProcessor) Order() int

Order returns the processor order (runs after rate limit, before aggregation).

func (*ThreadProcessor) Process added in v0.12.0

Process manages thread_ts for the message. For the first message: stores thread_ts from metadata if present. For subsequent messages: attaches stored thread_ts to metadata.

func (*ThreadProcessor) SetThreadTS added in v0.12.0

func (p *ThreadProcessor) SetThreadTS(sessionID, threadTS, channelID string)

SetThreadTS explicitly sets the thread_ts for a session. This is useful when the first message response provides the thread_ts.

func (*ThreadProcessor) Stop added in v0.12.0

func (p *ThreadProcessor) Stop()

Stop stops the cleanup goroutine.

type ThreadProcessorOptions added in v0.12.0

type ThreadProcessorOptions struct {
	CleanupInterval time.Duration // How often to run cleanup (default: 5 min)
	TTL             time.Duration // How long to keep thread info (default: 30 min)
}

ThreadProcessorOptions configures the ThreadProcessor.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL