manager

package
v0.0.0-...-b9d6d93

This package is not in the latest version of its module.
Published: Jul 30, 2025 License: Apache-2.0 Imports: 10 Imported by: 0

README

TaskManager Storage Refactoring

This package contains the refactored TaskManager that uses a Storage interface for persistence instead of in-memory maps.

Overview

The TaskManager has been refactored to support multiple storage backends through the Storage interface. This allows you to choose between in-memory storage for development/testing and persistent database storage for production.

Storage Interface

The Storage interface defines methods for:

  • Messages: Store, retrieve, and delete protocol messages
  • Conversations: Manage conversation history and access tracking
  • Tasks: Store and manage cancellable tasks
  • Push Notifications: Handle push notification configurations
  • Cleanup: Manage expired conversation cleanup

Storage Implementations

1. MemoryStorage

An in-memory implementation suitable for:

  • Development and testing
  • Single-instance deployments
  • Scenarios where persistence is not required

storageOpts := DefaultStorageOptions()
manager, err := NewTaskManagerWithMemoryStorage(processor, storageOpts)

2. GormStorage

A GORM-based implementation that supports:

  • SQLite, PostgreSQL, MySQL, and other GORM-supported databases
  • Persistent storage across restarts
  • Concurrent access from multiple instances
  • Proper transaction handling

db, err := gorm.Open(sqlite.Open("taskmanager.db"), &gorm.Config{})
if err != nil {
    return err
}

storageOpts := DefaultStorageOptions()
manager, err := NewTaskManagerWithGormStorage(processor, db, storageOpts)

Key Features

Automatic Migration

The GormStorage implementation automatically creates the required database tables:

  • a2a_messages: Stores protocol messages
  • a2a_conversations: Tracks conversation history and access times
  • a2a_tasks: Stores task information (simplified, without context.CancelFunc)
  • a2a_push_notifications: Stores push notification configurations

History Management

Both storage implementations respect the MaxHistoryLength setting to limit conversation history size and automatically clean up old messages.
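
The trimming behaviour described above can be sketched roughly as follows. This is only an illustration: trimHistory is a hypothetical helper, not a function of this package, and treating a non-positive limit as "no limit" is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// trimHistory keeps only the newest max message IDs and reports the ones
// that fell out of the window so the caller can delete those messages.
// Hypothetical helper: the package's real trimming logic is not shown here.
// Assumption: max <= 0 means "no limit".
func trimHistory(messageIDs []string, max int) (kept, dropped []string) {
	if max <= 0 || len(messageIDs) <= max {
		return messageIDs, nil
	}
	cut := len(messageIDs) - max
	return messageIDs[cut:], messageIDs[:cut]
}

func main() {
	ids := []string{"m1", "m2", "m3", "m4", "m5"}
	kept, dropped := trimHistory(ids, 3)
	fmt.Println("kept:", kept)       // the three newest IDs
	fmt.Println("dropped:", dropped) // the IDs whose messages can be cleaned up
}
```

Returning the dropped IDs separately lets a storage backend delete the corresponding messages in the same step that shortens the history.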

Concurrent Access

  • MemoryStorage uses read-write mutexes for thread safety
  • GormStorage leverages database transactions for consistency
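
As a rough illustration of the read-write mutex approach mentioned for MemoryStorage, the sketch below guards a map with sync.RWMutex. The messageStore type and its methods are hypothetical stand-ins, not the package's actual fields or API.

```go
package main

import (
	"fmt"
	"sync"
)

// messageStore is a hypothetical stand-in for a map held by an in-memory
// storage implementation; it is not the real MemoryStorage type.
type messageStore struct {
	mu       sync.RWMutex
	messages map[string]string // message ID -> payload
}

func newMessageStore() *messageStore {
	return &messageStore{messages: make(map[string]string)}
}

// Store takes the write lock because it mutates the map.
func (s *messageStore) Store(id, payload string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.messages[id] = payload
}

// Get takes only the read lock, so concurrent readers do not block each other.
func (s *messageStore) Get(id string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	payload, ok := s.messages[id]
	return payload, ok
}

func main() {
	store := newMessageStore()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			store.Store(fmt.Sprintf("msg-%d", n), "hello")
		}(i)
	}
	wg.Wait()
	payload, ok := store.Get("msg-3")
	fmt.Println(payload, ok)
}
```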

Error Handling

Storage operations return errors, and the TaskManager methods propagate them to the caller.

Configuration Options

StorageOptions

type StorageOptions struct {
    MaxHistoryLength int  // Maximum number of messages per conversation
}

ManagerOptions

type ManagerOptions struct {
    EnableCleanup   bool          // Enable automatic cleanup of expired conversations
    CleanupInterval time.Duration // How often to run cleanup
    ConversationTTL time.Duration // Time after which conversations expire
}
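
To make the relationship between ConversationTTL and expiry concrete, here is a minimal sketch of how a cleanup pass might decide which conversations have expired. The function names and sample data are hypothetical, and the real cleanup logic may differ (for example in whether the comparison is strict).

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// expiredConversations returns the IDs whose last access is older than ttl,
// measured against now. Hypothetical helper, not part of the package.
func expiredConversations(lastAccess map[string]time.Time, now time.Time, ttl time.Duration) []string {
	var expired []string
	for id, accessed := range lastAccess {
		if now.Sub(accessed) > ttl {
			expired = append(expired, id)
		}
	}
	sort.Strings(expired) // map iteration order is random; sort for stable output
	return expired
}

// demoExpired runs the check on fixed sample data with a one-hour TTL.
func demoExpired() []string {
	now := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
	lastAccess := map[string]time.Time{
		"ctx-old":   now.Add(-2 * time.Hour),
		"ctx-fresh": now.Add(-5 * time.Minute),
	}
	return expiredConversations(lastAccess, now, time.Hour)
}

func main() {
	fmt.Println(demoExpired())
}
```

A periodic job driven by CleanupInterval could run a check like this and then delete each returned conversation.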

Usage Examples

See example.go for complete examples of using both storage implementations.

Migration Notes

From Original Implementation

The original TaskManager used in-memory maps directly. When migrating:

  1. Replace taskmanager.NewMemoryTaskManager() calls with NewTaskManagerWithMemoryStorage()
  2. Add storage configuration options
  3. Handle storage-related errors in your code

Database Schema

The GORM implementation stores tasks in a simplified format since context.CancelFunc cannot be serialized. When tasks are retrieved, new cancellation contexts are created.
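
The idea of attaching a fresh cancellation context to a task loaded from the database might look roughly like this. The taskRecord and liveTask types are hypothetical and only illustrate the pattern; they are not the package's actual schema or types.

```go
package main

import (
	"context"
	"fmt"
)

// taskRecord is a hypothetical persisted row: plain data only, because a
// context.CancelFunc cannot be written to a database.
type taskRecord struct {
	ID    string
	State string
}

// liveTask pairs a record with runtime-only cancellation state.
type liveTask struct {
	Record taskRecord
	ctx    context.Context
	cancel context.CancelFunc
}

// rehydrate attaches a fresh cancellation context to a loaded record.
func rehydrate(rec taskRecord) *liveTask {
	ctx, cancel := context.WithCancel(context.Background())
	return &liveTask{Record: rec, ctx: ctx, cancel: cancel}
}

// Cancel cancels the context created at load time.
func (t *liveTask) Cancel() { t.cancel() }

// Cancelled reports whether Cancel has been called on this instance.
func (t *liveTask) Cancelled() bool { return t.ctx.Err() != nil }

func main() {
	task := rehydrate(taskRecord{ID: "task-1", State: "working"})
	fmt.Println(task.Cancelled()) // false
	task.Cancel()
	fmt.Println(task.Cancelled()) // true
}
```

One consequence of this pattern is that a context created at load time is not connected to work started before the task was persisted, so cancelling it cannot by itself stop that earlier work.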

Performance Considerations

  • MemoryStorage: Fast read/write operations, but limited by available RAM
  • GormStorage: Slower than MemoryStorage because every operation involves database I/O, but supports much larger datasets and persistence

Future Enhancements

Potential improvements could include:

  • Redis-based storage implementation
  • Distributed storage with consensus mechanisms
  • Configurable serialization formats
  • Optimized batch operations for high-throughput scenarios

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConversationHistory

type ConversationHistory struct {
	MessageIDs     []string
	LastAccessTime time.Time
}

type MemoryCancellableTask

type MemoryCancellableTask struct {
	// contains filtered or unexported fields
}

MemoryCancellableTask is a task that can be cancelled

func NewCancellableTask

func NewCancellableTask(task protocol.Task) *MemoryCancellableTask

NewCancellableTask creates a new cancellable task

func (*MemoryCancellableTask) Cancel

func (t *MemoryCancellableTask) Cancel()

Cancel cancels the task

func (*MemoryCancellableTask) Task

func (t *MemoryCancellableTask) Task() *protocol.Task

Task returns the task

type MemoryStorage

type MemoryStorage struct {
	// contains filtered or unexported fields
}

func NewMemoryStorage

func NewMemoryStorage(options StorageOptions) *MemoryStorage

NewMemoryStorage creates a new in-memory storage implementation

func (*MemoryStorage) CleanupExpiredConversations

func (s *MemoryStorage) CleanupExpiredConversations(maxAge time.Duration) (int, error)

Cleanup operations

func (*MemoryStorage) DeleteConversation

func (s *MemoryStorage) DeleteConversation(contextID string) error

func (*MemoryStorage) DeleteMessage

func (s *MemoryStorage) DeleteMessage(messageID string) error

func (*MemoryStorage) DeletePushNotification

func (s *MemoryStorage) DeletePushNotification(taskID string) error

func (*MemoryStorage) DeleteTask

func (s *MemoryStorage) DeleteTask(taskID string) error

func (*MemoryStorage) GetConversation

func (s *MemoryStorage) GetConversation(contextID string) (*ConversationHistory, error)

func (*MemoryStorage) GetConversationStats

func (s *MemoryStorage) GetConversationStats() (map[string]interface{}, error)

func (*MemoryStorage) GetExpiredConversations

func (s *MemoryStorage) GetExpiredConversations(maxAge time.Duration) ([]string, error)

func (*MemoryStorage) GetMessage

func (s *MemoryStorage) GetMessage(messageID string) (protocol.Message, error)

func (*MemoryStorage) GetMessages

func (s *MemoryStorage) GetMessages(messageIDs []string) ([]protocol.Message, error)

func (*MemoryStorage) GetPushNotification

func (s *MemoryStorage) GetPushNotification(taskID string) (protocol.TaskPushNotificationConfig, error)

func (*MemoryStorage) GetTask

func (s *MemoryStorage) GetTask(taskID string) (*MemoryCancellableTask, error)

func (*MemoryStorage) StoreConversation

func (s *MemoryStorage) StoreConversation(contextID string, history *ConversationHistory) error

Conversation operations

func (*MemoryStorage) StoreMessage

func (s *MemoryStorage) StoreMessage(message protocol.Message) error

Message operations

func (*MemoryStorage) StorePushNotification

func (s *MemoryStorage) StorePushNotification(taskID string, config protocol.TaskPushNotificationConfig) error

Push notification operations

func (*MemoryStorage) StoreTask

func (s *MemoryStorage) StoreTask(taskID string, task *MemoryCancellableTask) error

Task operations

func (*MemoryStorage) TaskExists

func (s *MemoryStorage) TaskExists(taskID string) bool

func (*MemoryStorage) UpdateConversationAccess

func (s *MemoryStorage) UpdateConversationAccess(contextID string, timestamp time.Time) error

type Storage

type Storage interface {
	// Message operations
	StoreMessage(message protocol.Message) error
	GetMessage(messageID string) (protocol.Message, error)
	// List messages by context ID, if limit is -1, return all messages
	ListMessagesByContextID(contextID string, limit int) ([]protocol.Message, error)

	// Task operations
	StoreTask(taskID string, task *MemoryCancellableTask) error
	GetTask(taskID string) (*MemoryCancellableTask, error)
	TaskExists(taskID string) bool

	// Push notification operations
	StorePushNotification(taskID string, config protocol.TaskPushNotificationConfig) error
	GetPushNotification(taskID string) (protocol.TaskPushNotificationConfig, error)
}

Storage defines the interface for persisting task manager data
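
The limit parameter of ListMessagesByContextID could be handled along the lines of the sketch below. Only the "-1 returns all messages" rule comes from the interface comment; keeping the newest entries for a positive limit, and treating other negative values like -1, are assumptions made for illustration. applyLimit is a hypothetical helper.

```go
package main

import "fmt"

// applyLimit trims a chronologically ordered message list to at most limit
// entries. Hypothetical helper, not part of the package.
// Documented: limit == -1 returns all messages.
// Assumed: other negative values behave like -1, and a positive limit keeps
// the newest entries.
func applyLimit(messages []string, limit int) []string {
	if limit < 0 || limit >= len(messages) {
		return messages
	}
	return messages[len(messages)-limit:]
}

func main() {
	history := []string{"m1", "m2", "m3", "m4"}
	fmt.Println(applyLimit(history, -1)) // all four messages
	fmt.Println(applyLimit(history, 2))  // the two newest under the stated assumption
}
```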

func NewStorage

func NewStorage(dbClient database.Client) Storage

type StorageOptions

type StorageOptions struct {
	MaxHistoryLength int
}

StorageOptions contains configuration options for storage implementations

type TaskManager

type TaskManager struct {

	// Processor is the user-provided message Processor
	Processor taskmanager.MessageProcessor

	// Storage handles data persistence
	Storage Storage

	// Subscribers stores the task subscribers
	// key: taskID, value: TaskSubscriber list
	// supports all event types: Message, Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent
	Subscribers map[string][]*TaskSubscriber
	// contains filtered or unexported fields
}

TaskManager is the implementation of the TaskManager interface

func NewTaskManager

func NewTaskManager(processor taskmanager.MessageProcessor, storage Storage) (*TaskManager, error)

NewTaskManager creates a new TaskManager instance

func (*TaskManager) OnCancelTask

func (m *TaskManager) OnCancelTask(ctx context.Context, params protocol.TaskIDParams) (*protocol.Task, error)

OnCancelTask handles the tasks/cancel request

func (*TaskManager) OnGetTask

func (m *TaskManager) OnGetTask(ctx context.Context, params protocol.TaskQueryParams) (*protocol.Task, error)

OnGetTask handles the tasks/get request

func (*TaskManager) OnPushNotificationGet

func (m *TaskManager) OnPushNotificationGet(
	ctx context.Context,
	params protocol.TaskIDParams,
) (*protocol.TaskPushNotificationConfig, error)

OnPushNotificationGet handles tasks/pushNotificationConfig/get requests

func (*TaskManager) OnPushNotificationSet

OnPushNotificationSet handles tasks/pushNotificationConfig/set requests

func (*TaskManager) OnResubscribe

func (m *TaskManager) OnResubscribe(
	ctx context.Context,
	params protocol.TaskIDParams,
) (<-chan protocol.StreamingMessageEvent, error)

OnResubscribe handles tasks/resubscribe requests

func (*TaskManager) OnSendMessage

func (m *TaskManager) OnSendMessage(
	ctx context.Context,
	request protocol.SendMessageParams,
) (*protocol.MessageResult, error)

OnSendMessage handles the message/send request

func (*TaskManager) OnSendMessageStream

func (m *TaskManager) OnSendMessageStream(
	ctx context.Context,
	request protocol.SendMessageParams,
) (<-chan protocol.StreamingMessageEvent, error)

OnSendMessageStream handles message/stream requests

func (*TaskManager) OnSendTask

func (m *TaskManager) OnSendTask(ctx context.Context, request protocol.SendTaskParams) (*protocol.Task, error)

OnSendTask is a deprecated method with an empty implementation

func (*TaskManager) OnSendTaskSubscribe

func (m *TaskManager) OnSendTaskSubscribe(ctx context.Context, request protocol.SendTaskParams) (<-chan protocol.TaskEvent, error)

OnSendTaskSubscribe is a deprecated method with an empty implementation

type TaskSubscriber

type TaskSubscriber struct {
	// contains filtered or unexported fields
}

TaskSubscriber is a subscriber for a task

func NewTaskSubscriber

func NewTaskSubscriber(taskID string, length int) *TaskSubscriber

NewTaskSubscriber creates a new task subscriber with specified buffer length

func (*TaskSubscriber) Channel

func (s *TaskSubscriber) Channel() <-chan protocol.StreamingMessageEvent

Channel returns the channel of the task subscriber

func (*TaskSubscriber) Close

func (s *TaskSubscriber) Close()

Close closes the task subscriber

func (*TaskSubscriber) Closed

func (s *TaskSubscriber) Closed() bool

Closed returns true if the task subscriber is closed

func (*TaskSubscriber) GetLastAccessTime

func (s *TaskSubscriber) GetLastAccessTime() time.Time

GetLastAccessTime returns the last access time

func (*TaskSubscriber) Send

Send sends an event to the task subscriber
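
A subscriber with a fixed buffer length, as described for TaskSubscriber above, can be sketched with a buffered channel. The subscriber type below is a hypothetical simplification that carries strings instead of protocol.StreamingMessageEvent. Returning an error when the buffer is full or the subscriber is closed is an assumption, since the real Send signature is not shown in this listing.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// subscriber is a hypothetical, simplified stand-in for TaskSubscriber.
type subscriber struct {
	mu     sync.Mutex
	ch     chan string
	closed bool
}

// newSubscriber creates a subscriber whose channel buffers length events.
func newSubscriber(length int) *subscriber {
	return &subscriber{ch: make(chan string, length)}
}

// Send enqueues an event without blocking. Assumed behaviour: it fails when
// the subscriber is closed or its buffer is full.
func (s *subscriber) Send(event string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errors.New("subscriber closed")
	}
	select {
	case s.ch <- event:
		return nil
	default:
		return errors.New("subscriber buffer full")
	}
}

// Close closes the channel once; repeated calls are no-ops.
func (s *subscriber) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.closed {
		s.closed = true
		close(s.ch)
	}
}

// Closed reports whether Close has been called.
func (s *subscriber) Closed() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.closed
}

// Channel exposes the receive-only side of the buffer.
func (s *subscriber) Channel() <-chan string { return s.ch }

func main() {
	sub := newSubscriber(1)
	fmt.Println(sub.Send("first"))  // fits in the buffer
	fmt.Println(sub.Send("second")) // rejected: buffer of length 1 is full
	sub.Close()
	for ev := range sub.Channel() { // drains buffered events, then stops
		fmt.Println(ev)
	}
}
```

Holding the mutex across both the closed check and the channel send avoids sending on a channel that another goroutine has just closed.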
