Documentation
¶
Overview ¶
Package db provides database access for MCP Notify.
Package db provides database access for MCP Notify.
Package db provides database access for MCP Notify.
Index ¶
- type Cache
- type Database
- type NullCache
- func (c *NullCache) Close() error
- func (c *NullCache) Delete(ctx context.Context, key string) error
- func (c *NullCache) Get(ctx context.Context, key string) ([]byte, error)
- func (c *NullCache) GetCachedSnapshot(ctx context.Context) (*types.Snapshot, error)
- func (c *NullCache) IncrementRateLimit(ctx context.Context, key string, window time.Duration) (int64, error)
- func (c *NullCache) Ping(ctx context.Context) error
- func (c *NullCache) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
- func (c *NullCache) SetCachedSnapshot(ctx context.Context, snapshot *types.Snapshot, ttl time.Duration) error
- type PostgresDB
- func (db *PostgresDB) Close() error
- func (db *PostgresDB) CreateChannel(ctx context.Context, channel *types.Channel) error
- func (db *PostgresDB) CreateSubscription(ctx context.Context, sub *types.Subscription) error
- func (db *PostgresDB) DeleteChannel(ctx context.Context, id uuid.UUID) error
- func (db *PostgresDB) DeleteOldSnapshots(ctx context.Context, olderThan time.Time) error
- func (db *PostgresDB) DeleteSubscription(ctx context.Context, id uuid.UUID) error
- func (db *PostgresDB) GetActiveSubscriptions(ctx context.Context) ([]types.Subscription, error)
- func (db *PostgresDB) GetChangeByID(ctx context.Context, id uuid.UUID) (*types.Change, error)
- func (db *PostgresDB) GetChangeCountSince(ctx context.Context, since time.Time) (int, error)
- func (db *PostgresDB) GetChangesForServer(ctx context.Context, serverName string, limit int) ([]types.Change, error)
- func (db *PostgresDB) GetChangesSince(ctx context.Context, since time.Time, limit int) ([]types.Change, error)
- func (db *PostgresDB) GetChannelByID(ctx context.Context, id uuid.UUID) (*types.Channel, error)
- func (db *PostgresDB) GetChannelsForSubscription(ctx context.Context, subscriptionID uuid.UUID) ([]types.Channel, error)
- func (db *PostgresDB) GetLatestSnapshot(ctx context.Context) (*types.Snapshot, error)
- func (db *PostgresDB) GetNotificationsForSubscription(ctx context.Context, subscriptionID uuid.UUID, limit int) ([]types.Notification, error)
- func (db *PostgresDB) GetPendingNotifications(ctx context.Context, limit int) ([]types.Notification, error)
- func (db *PostgresDB) GetSnapshotAt(ctx context.Context, timestamp time.Time) (*types.Snapshot, error)
- func (db *PostgresDB) GetSnapshotByID(ctx context.Context, id uuid.UUID) (*types.Snapshot, error)
- func (db *PostgresDB) GetStats(ctx context.Context) (*types.StatsResponse, error)
- func (db *PostgresDB) GetSubscriptionByAPIKey(ctx context.Context, apiKeyHash string) (*types.Subscription, error)
- func (db *PostgresDB) GetSubscriptionByID(ctx context.Context, id uuid.UUID) (*types.Subscription, error)
- func (db *PostgresDB) ListSubscriptions(ctx context.Context, limit, offset int) ([]types.Subscription, int, error)
- func (db *PostgresDB) Migrate(ctx context.Context) error
- func (db *PostgresDB) Ping(ctx context.Context) error
- func (db *PostgresDB) SaveChange(ctx context.Context, change *types.Change) error
- func (db *PostgresDB) SaveNotification(ctx context.Context, notification *types.Notification) error
- func (db *PostgresDB) SaveSnapshot(ctx context.Context, snapshot *types.Snapshot) error
- func (db *PostgresDB) UpdateChannel(ctx context.Context, channel *types.Channel) error
- func (db *PostgresDB) UpdateNotification(ctx context.Context, notification *types.Notification) error
- func (db *PostgresDB) UpdateSubscription(ctx context.Context, sub *types.Subscription) error
- type RedisCache
- func (c *RedisCache) Close() error
- func (c *RedisCache) Delete(ctx context.Context, key string) error
- func (c *RedisCache) Exists(ctx context.Context, key string) (bool, error)
- func (c *RedisCache) Expire(ctx context.Context, key string, ttl time.Duration) error
- func (c *RedisCache) FlushAll(ctx context.Context) error
- func (c *RedisCache) Get(ctx context.Context, key string) ([]byte, error)
- func (c *RedisCache) GetCachedSnapshot(ctx context.Context) (*types.Snapshot, error)
- func (c *RedisCache) GetRateLimitCount(ctx context.Context, key string) (int64, error)
- func (c *RedisCache) IncrementRateLimit(ctx context.Context, key string, window time.Duration) (int64, error)
- func (c *RedisCache) Ping(ctx context.Context) error
- func (c *RedisCache) ResetRateLimit(ctx context.Context, key string) error
- func (c *RedisCache) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
- func (c *RedisCache) SetCachedSnapshot(ctx context.Context, snapshot *types.Snapshot, ttl time.Duration) error
- func (c *RedisCache) SetWithNX(ctx context.Context, key string, value []byte, ttl time.Duration) (bool, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Cache ¶
type Cache interface {
Close() error
Ping(ctx context.Context) error
// Generic key-value operations
Get(ctx context.Context, key string) ([]byte, error)
Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
Delete(ctx context.Context, key string) error
// Snapshot caching
GetCachedSnapshot(ctx context.Context) (*types.Snapshot, error)
SetCachedSnapshot(ctx context.Context, snapshot *types.Snapshot, ttl time.Duration) error
// Rate limiting
IncrementRateLimit(ctx context.Context, key string, window time.Duration) (int64, error)
}
Cache defines the interface for caching operations.
type Database ¶
type Database interface {
// Connection management
Close() error
Ping(ctx context.Context) error
Migrate(ctx context.Context) error
// Snapshots
SaveSnapshot(ctx context.Context, snapshot *types.Snapshot) error
GetLatestSnapshot(ctx context.Context) (*types.Snapshot, error)
GetSnapshotByID(ctx context.Context, id uuid.UUID) (*types.Snapshot, error)
GetSnapshotAt(ctx context.Context, timestamp time.Time) (*types.Snapshot, error)
DeleteOldSnapshots(ctx context.Context, olderThan time.Time) error
// Changes
SaveChange(ctx context.Context, change *types.Change) error
GetChangeByID(ctx context.Context, id uuid.UUID) (*types.Change, error)
GetChangesSince(ctx context.Context, since time.Time, limit int) ([]types.Change, error)
GetChangesForServer(ctx context.Context, serverName string, limit int) ([]types.Change, error)
GetChangeCountSince(ctx context.Context, since time.Time) (int, error)
// Subscriptions
CreateSubscription(ctx context.Context, sub *types.Subscription) error
GetSubscriptionByID(ctx context.Context, id uuid.UUID) (*types.Subscription, error)
GetSubscriptionByAPIKey(ctx context.Context, apiKeyHash string) (*types.Subscription, error)
GetActiveSubscriptions(ctx context.Context) ([]types.Subscription, error)
UpdateSubscription(ctx context.Context, sub *types.Subscription) error
DeleteSubscription(ctx context.Context, id uuid.UUID) error
ListSubscriptions(ctx context.Context, limit, offset int) ([]types.Subscription, int, error)
// Channels
CreateChannel(ctx context.Context, channel *types.Channel) error
GetChannelByID(ctx context.Context, id uuid.UUID) (*types.Channel, error)
GetChannelsForSubscription(ctx context.Context, subscriptionID uuid.UUID) ([]types.Channel, error)
UpdateChannel(ctx context.Context, channel *types.Channel) error
DeleteChannel(ctx context.Context, id uuid.UUID) error
// Notifications
SaveNotification(ctx context.Context, notification *types.Notification) error
UpdateNotification(ctx context.Context, notification *types.Notification) error
GetPendingNotifications(ctx context.Context, limit int) ([]types.Notification, error)
GetNotificationsForSubscription(ctx context.Context, subscriptionID uuid.UUID, limit int) ([]types.Notification, error)
// Stats
GetStats(ctx context.Context) (*types.StatsResponse, error)
}
Database defines the interface for database operations.
type NullCache ¶
type NullCache struct{}
NullCache is a no-op cache implementation for when Redis is not available.
func NewNullCache ¶
func NewNullCache() *NullCache
NewNullCache creates a new null cache that does nothing.
func (*NullCache) GetCachedSnapshot ¶
GetCachedSnapshot always returns nil (cache miss).
func (*NullCache) IncrementRateLimit ¶
func (c *NullCache) IncrementRateLimit(ctx context.Context, key string, window time.Duration) (int64, error)
IncrementRateLimit always returns 0 (no rate limiting).
type PostgresDB ¶
type PostgresDB struct {
// contains filtered or unexported fields
}
PostgresDB implements the Database interface using PostgreSQL.
func New ¶
func New(ctx context.Context, cfg config.DatabaseConfig) (*PostgresDB, error)
New creates a new PostgreSQL database connection.
func (*PostgresDB) Close ¶
func (db *PostgresDB) Close() error
Close closes the database connection pool.
func (*PostgresDB) CreateChannel ¶
CreateChannel creates a new notification channel.
func (*PostgresDB) CreateSubscription ¶
func (db *PostgresDB) CreateSubscription(ctx context.Context, sub *types.Subscription) error
func (*PostgresDB) DeleteChannel ¶
DeleteChannel deletes a channel by ID.
func (*PostgresDB) DeleteOldSnapshots ¶
DeleteOldSnapshots removes snapshots older than the given time.
func (*PostgresDB) DeleteSubscription ¶
DeleteSubscription deletes a subscription by ID.
func (*PostgresDB) GetActiveSubscriptions ¶
func (db *PostgresDB) GetActiveSubscriptions(ctx context.Context) ([]types.Subscription, error)
GetActiveSubscriptions retrieves all active subscriptions with their channels.
func (*PostgresDB) GetChangeByID ¶
GetChangeByID retrieves a change by ID.
func (*PostgresDB) GetChangeCountSince ¶
GetChangeCountSince returns the count of changes since the given timestamp.
func (*PostgresDB) GetChangesForServer ¶
func (db *PostgresDB) GetChangesForServer(ctx context.Context, serverName string, limit int) ([]types.Change, error)
GetChangesForServer retrieves changes for a specific server.
func (*PostgresDB) GetChangesSince ¶
func (db *PostgresDB) GetChangesSince(ctx context.Context, since time.Time, limit int) ([]types.Change, error)
GetChangesSince retrieves changes since the given timestamp.
func (*PostgresDB) GetChannelByID ¶
GetChannelByID retrieves a channel by ID.
func (*PostgresDB) GetChannelsForSubscription ¶
func (db *PostgresDB) GetChannelsForSubscription(ctx context.Context, subscriptionID uuid.UUID) ([]types.Channel, error)
GetChannelsForSubscription retrieves all channels for a subscription.
func (*PostgresDB) GetLatestSnapshot ¶
GetLatestSnapshot retrieves the most recent snapshot.
func (*PostgresDB) GetNotificationsForSubscription ¶
func (db *PostgresDB) GetNotificationsForSubscription(ctx context.Context, subscriptionID uuid.UUID, limit int) ([]types.Notification, error)
GetNotificationsForSubscription retrieves notification history for a subscription.
func (*PostgresDB) GetPendingNotifications ¶
func (db *PostgresDB) GetPendingNotifications(ctx context.Context, limit int) ([]types.Notification, error)
GetPendingNotifications retrieves pending notifications for retry processing.
func (*PostgresDB) GetSnapshotAt ¶
func (db *PostgresDB) GetSnapshotAt(ctx context.Context, timestamp time.Time) (*types.Snapshot, error)
GetSnapshotAt retrieves the snapshot closest to the given timestamp.
func (*PostgresDB) GetSnapshotByID ¶
GetSnapshotByID retrieves a snapshot by ID.
func (*PostgresDB) GetStats ¶
func (db *PostgresDB) GetStats(ctx context.Context) (*types.StatsResponse, error)
GetStats returns aggregate statistics.
func (*PostgresDB) GetSubscriptionByAPIKey ¶
func (db *PostgresDB) GetSubscriptionByAPIKey(ctx context.Context, apiKeyHash string) (*types.Subscription, error)
GetSubscriptionByAPIKey retrieves a subscription by hashed API key.
func (*PostgresDB) GetSubscriptionByID ¶
func (db *PostgresDB) GetSubscriptionByID(ctx context.Context, id uuid.UUID) (*types.Subscription, error)
GetSubscriptionByID retrieves a subscription by ID with its channels.
func (*PostgresDB) ListSubscriptions ¶
func (db *PostgresDB) ListSubscriptions(ctx context.Context, limit, offset int) ([]types.Subscription, int, error)
ListSubscriptions returns a paginated list of subscriptions with total count.
func (*PostgresDB) Migrate ¶
func (db *PostgresDB) Migrate(ctx context.Context) error
Migrate runs database migrations.
func (*PostgresDB) Ping ¶
func (db *PostgresDB) Ping(ctx context.Context) error
Ping checks if the database is reachable.
func (*PostgresDB) SaveChange ¶
SaveChange saves a change to the database.
func (*PostgresDB) SaveNotification ¶
func (db *PostgresDB) SaveNotification(ctx context.Context, notification *types.Notification) error
SaveNotification saves a notification record.
func (*PostgresDB) SaveSnapshot ¶
SaveSnapshot saves a snapshot to the database.
func (*PostgresDB) UpdateChannel ¶
UpdateChannel updates a channel including stats fields.
func (*PostgresDB) UpdateNotification ¶
func (db *PostgresDB) UpdateNotification(ctx context.Context, notification *types.Notification) error
UpdateNotification updates a notification's status, attempts, and error.
func (*PostgresDB) UpdateSubscription ¶
func (db *PostgresDB) UpdateSubscription(ctx context.Context, sub *types.Subscription) error
UpdateSubscription updates a subscription's fields.
type RedisCache ¶
type RedisCache struct {
// contains filtered or unexported fields
}
RedisCache implements the Cache interface using Redis.
func NewRedisCache ¶
func NewRedisCache(ctx context.Context, cfg config.RedisConfig) (*RedisCache, error)
NewRedisCache creates a new Redis cache connection.
func (*RedisCache) Delete ¶
func (c *RedisCache) Delete(ctx context.Context, key string) error
Delete removes a key from the cache.
func (*RedisCache) FlushAll ¶
func (c *RedisCache) FlushAll(ctx context.Context) error
FlushAll removes all keys from the cache (use with caution!).
func (*RedisCache) GetCachedSnapshot ¶
GetCachedSnapshot retrieves the cached snapshot from Redis.
func (*RedisCache) GetRateLimitCount ¶
GetRateLimitCount returns the current count for a rate limit key without incrementing.
func (*RedisCache) IncrementRateLimit ¶
func (c *RedisCache) IncrementRateLimit(ctx context.Context, key string, window time.Duration) (int64, error)
IncrementRateLimit increments a rate limit counter and returns the new count. The counter will expire after the given window duration. Uses a sliding window approach with Redis INCR and EXPIRE.
func (*RedisCache) Ping ¶
func (c *RedisCache) Ping(ctx context.Context) error
Ping checks if Redis is reachable.
func (*RedisCache) ResetRateLimit ¶
func (c *RedisCache) ResetRateLimit(ctx context.Context, key string) error
ResetRateLimit resets the rate limit counter for a key.
func (*RedisCache) SetCachedSnapshot ¶
func (c *RedisCache) SetCachedSnapshot(ctx context.Context, snapshot *types.Snapshot, ttl time.Duration) error
SetCachedSnapshot caches a snapshot in Redis.