db

package
v0.0.0-...-7bd6b6b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package db provides database access for MCP Notify.

Package db provides database access for MCP Notify.

Package db provides database access for MCP Notify.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Cache

type Cache interface {
	Close() error
	Ping(ctx context.Context) error

	// Generic key-value operations
	Get(ctx context.Context, key string) ([]byte, error)
	Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
	Delete(ctx context.Context, key string) error

	// Snapshot caching
	GetCachedSnapshot(ctx context.Context) (*types.Snapshot, error)
	SetCachedSnapshot(ctx context.Context, snapshot *types.Snapshot, ttl time.Duration) error

	// Rate limiting
	IncrementRateLimit(ctx context.Context, key string, window time.Duration) (int64, error)
}

Cache defines the interface for caching operations.

type Database

type Database interface {
	// Connection management
	Close() error
	Ping(ctx context.Context) error
	Migrate(ctx context.Context) error

	// Snapshots
	SaveSnapshot(ctx context.Context, snapshot *types.Snapshot) error
	GetLatestSnapshot(ctx context.Context) (*types.Snapshot, error)
	GetSnapshotByID(ctx context.Context, id uuid.UUID) (*types.Snapshot, error)
	GetSnapshotAt(ctx context.Context, timestamp time.Time) (*types.Snapshot, error)
	DeleteOldSnapshots(ctx context.Context, olderThan time.Time) error

	// Changes
	SaveChange(ctx context.Context, change *types.Change) error
	GetChangeByID(ctx context.Context, id uuid.UUID) (*types.Change, error)
	GetChangesSince(ctx context.Context, since time.Time, limit int) ([]types.Change, error)
	GetChangesForServer(ctx context.Context, serverName string, limit int) ([]types.Change, error)
	GetChangeCountSince(ctx context.Context, since time.Time) (int, error)

	// Subscriptions
	CreateSubscription(ctx context.Context, sub *types.Subscription) error
	GetSubscriptionByID(ctx context.Context, id uuid.UUID) (*types.Subscription, error)
	GetSubscriptionByAPIKey(ctx context.Context, apiKeyHash string) (*types.Subscription, error)
	GetActiveSubscriptions(ctx context.Context) ([]types.Subscription, error)
	UpdateSubscription(ctx context.Context, sub *types.Subscription) error
	DeleteSubscription(ctx context.Context, id uuid.UUID) error
	ListSubscriptions(ctx context.Context, limit, offset int) ([]types.Subscription, int, error)

	// Channels
	CreateChannel(ctx context.Context, channel *types.Channel) error
	GetChannelByID(ctx context.Context, id uuid.UUID) (*types.Channel, error)
	GetChannelsForSubscription(ctx context.Context, subscriptionID uuid.UUID) ([]types.Channel, error)
	UpdateChannel(ctx context.Context, channel *types.Channel) error
	DeleteChannel(ctx context.Context, id uuid.UUID) error

	// Notifications
	SaveNotification(ctx context.Context, notification *types.Notification) error
	UpdateNotification(ctx context.Context, notification *types.Notification) error
	GetPendingNotifications(ctx context.Context, limit int) ([]types.Notification, error)
	GetNotificationsForSubscription(ctx context.Context, subscriptionID uuid.UUID, limit int) ([]types.Notification, error)

	// Stats
	GetStats(ctx context.Context) (*types.StatsResponse, error)
}

Database defines the interface for database operations.

type NullCache

type NullCache struct{}

NullCache is a no-op cache implementation for when Redis is not available.

func NewNullCache

func NewNullCache() *NullCache

NewNullCache creates a new null cache that does nothing.

func (*NullCache) Close

func (c *NullCache) Close() error

Close does nothing.

func (*NullCache) Delete

func (c *NullCache) Delete(ctx context.Context, key string) error

Delete does nothing.

func (*NullCache) Get

func (c *NullCache) Get(ctx context.Context, key string) ([]byte, error)

Get always returns nil (cache miss).

func (*NullCache) GetCachedSnapshot

func (c *NullCache) GetCachedSnapshot(ctx context.Context) (*types.Snapshot, error)

GetCachedSnapshot always returns nil (cache miss).

func (*NullCache) IncrementRateLimit

func (c *NullCache) IncrementRateLimit(ctx context.Context, key string, window time.Duration) (int64, error)

IncrementRateLimit always returns 0 (no rate limiting).

func (*NullCache) Ping

func (c *NullCache) Ping(ctx context.Context) error

Ping always succeeds.

func (*NullCache) Set

func (c *NullCache) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error

Set does nothing.

func (*NullCache) SetCachedSnapshot

func (c *NullCache) SetCachedSnapshot(ctx context.Context, snapshot *types.Snapshot, ttl time.Duration) error

SetCachedSnapshot does nothing.

type PostgresDB

type PostgresDB struct {
	// contains filtered or unexported fields
}

PostgresDB implements the Database interface using PostgreSQL.

func New

New creates a new PostgreSQL database connection.

func (*PostgresDB) Close

func (db *PostgresDB) Close() error

Close closes the database connection pool.

func (*PostgresDB) CreateChannel

func (db *PostgresDB) CreateChannel(ctx context.Context, channel *types.Channel) error

CreateChannel creates a new notification channel.

func (*PostgresDB) CreateSubscription

func (db *PostgresDB) CreateSubscription(ctx context.Context, sub *types.Subscription) error

func (*PostgresDB) DeleteChannel

func (db *PostgresDB) DeleteChannel(ctx context.Context, id uuid.UUID) error

DeleteChannel deletes a channel by ID.

func (*PostgresDB) DeleteOldSnapshots

func (db *PostgresDB) DeleteOldSnapshots(ctx context.Context, olderThan time.Time) error

DeleteOldSnapshots removes snapshots older than the given time.

func (*PostgresDB) DeleteSubscription

func (db *PostgresDB) DeleteSubscription(ctx context.Context, id uuid.UUID) error

DeleteSubscription deletes a subscription by ID.

func (*PostgresDB) GetActiveSubscriptions

func (db *PostgresDB) GetActiveSubscriptions(ctx context.Context) ([]types.Subscription, error)

GetActiveSubscriptions retrieves all active subscriptions with their channels.

func (*PostgresDB) GetChangeByID

func (db *PostgresDB) GetChangeByID(ctx context.Context, id uuid.UUID) (*types.Change, error)

GetChangeByID retrieves a change by ID.

func (*PostgresDB) GetChangeCountSince

func (db *PostgresDB) GetChangeCountSince(ctx context.Context, since time.Time) (int, error)

GetChangeCountSince returns the count of changes since the given timestamp.

func (*PostgresDB) GetChangesForServer

func (db *PostgresDB) GetChangesForServer(ctx context.Context, serverName string, limit int) ([]types.Change, error)

GetChangesForServer retrieves changes for a specific server.

func (*PostgresDB) GetChangesSince

func (db *PostgresDB) GetChangesSince(ctx context.Context, since time.Time, limit int) ([]types.Change, error)

GetChangesSince retrieves changes since the given timestamp.

func (*PostgresDB) GetChannelByID

func (db *PostgresDB) GetChannelByID(ctx context.Context, id uuid.UUID) (*types.Channel, error)

GetChannelByID retrieves a channel by ID.

func (*PostgresDB) GetChannelsForSubscription

func (db *PostgresDB) GetChannelsForSubscription(ctx context.Context, subscriptionID uuid.UUID) ([]types.Channel, error)

GetChannelsForSubscription retrieves all channels for a subscription.

func (*PostgresDB) GetLatestSnapshot

func (db *PostgresDB) GetLatestSnapshot(ctx context.Context) (*types.Snapshot, error)

GetLatestSnapshot retrieves the most recent snapshot.

func (*PostgresDB) GetNotificationsForSubscription

func (db *PostgresDB) GetNotificationsForSubscription(ctx context.Context, subscriptionID uuid.UUID, limit int) ([]types.Notification, error)

GetNotificationsForSubscription retrieves notification history for a subscription.

func (*PostgresDB) GetPendingNotifications

func (db *PostgresDB) GetPendingNotifications(ctx context.Context, limit int) ([]types.Notification, error)

GetPendingNotifications retrieves pending notifications for retry processing.

func (*PostgresDB) GetSnapshotAt

func (db *PostgresDB) GetSnapshotAt(ctx context.Context, timestamp time.Time) (*types.Snapshot, error)

GetSnapshotAt retrieves the snapshot closest to the given timestamp.

func (*PostgresDB) GetSnapshotByID

func (db *PostgresDB) GetSnapshotByID(ctx context.Context, id uuid.UUID) (*types.Snapshot, error)

GetSnapshotByID retrieves a snapshot by ID.

func (*PostgresDB) GetStats

func (db *PostgresDB) GetStats(ctx context.Context) (*types.StatsResponse, error)

GetStats returns aggregate statistics.

func (*PostgresDB) GetSubscriptionByAPIKey

func (db *PostgresDB) GetSubscriptionByAPIKey(ctx context.Context, apiKeyHash string) (*types.Subscription, error)

GetSubscriptionByAPIKey retrieves a subscription by hashed API key.

func (*PostgresDB) GetSubscriptionByID

func (db *PostgresDB) GetSubscriptionByID(ctx context.Context, id uuid.UUID) (*types.Subscription, error)

GetSubscriptionByID retrieves a subscription by ID with its channels.

func (*PostgresDB) ListSubscriptions

func (db *PostgresDB) ListSubscriptions(ctx context.Context, limit, offset int) ([]types.Subscription, int, error)

ListSubscriptions returns a paginated list of subscriptions with total count.

func (*PostgresDB) Migrate

func (db *PostgresDB) Migrate(ctx context.Context) error

Migrate runs database migrations.

func (*PostgresDB) Ping

func (db *PostgresDB) Ping(ctx context.Context) error

Ping checks if the database is reachable.

func (*PostgresDB) SaveChange

func (db *PostgresDB) SaveChange(ctx context.Context, change *types.Change) error

SaveChange saves a change to the database.

func (*PostgresDB) SaveNotification

func (db *PostgresDB) SaveNotification(ctx context.Context, notification *types.Notification) error

SaveNotification saves a notification record.

func (*PostgresDB) SaveSnapshot

func (db *PostgresDB) SaveSnapshot(ctx context.Context, snapshot *types.Snapshot) error

SaveSnapshot saves a snapshot to the database.

func (*PostgresDB) UpdateChannel

func (db *PostgresDB) UpdateChannel(ctx context.Context, channel *types.Channel) error

UpdateChannel updates a channel including stats fields.

func (*PostgresDB) UpdateNotification

func (db *PostgresDB) UpdateNotification(ctx context.Context, notification *types.Notification) error

UpdateNotification updates a notification's status, attempts, and error.

func (*PostgresDB) UpdateSubscription

func (db *PostgresDB) UpdateSubscription(ctx context.Context, sub *types.Subscription) error

UpdateSubscription updates a subscription's fields.

type RedisCache

type RedisCache struct {
	// contains filtered or unexported fields
}

RedisCache implements the Cache interface using Redis.

func NewRedisCache

func NewRedisCache(ctx context.Context, cfg config.RedisConfig) (*RedisCache, error)

NewRedisCache creates a new Redis cache connection.

func (*RedisCache) Close

func (c *RedisCache) Close() error

Close closes the Redis connection.

func (*RedisCache) Delete

func (c *RedisCache) Delete(ctx context.Context, key string) error

Delete removes a key from the cache.

func (*RedisCache) Exists

func (c *RedisCache) Exists(ctx context.Context, key string) (bool, error)

Exists checks if a key exists in the cache.

func (*RedisCache) Expire

func (c *RedisCache) Expire(ctx context.Context, key string, ttl time.Duration) error

Expire sets the TTL on an existing key.

func (*RedisCache) FlushAll

func (c *RedisCache) FlushAll(ctx context.Context) error

FlushAll removes all keys from the cache (use with caution!).

func (*RedisCache) Get

func (c *RedisCache) Get(ctx context.Context, key string) ([]byte, error)

Get retrieves a value from the cache.

func (*RedisCache) GetCachedSnapshot

func (c *RedisCache) GetCachedSnapshot(ctx context.Context) (*types.Snapshot, error)

GetCachedSnapshot retrieves the cached snapshot from Redis.

func (*RedisCache) GetRateLimitCount

func (c *RedisCache) GetRateLimitCount(ctx context.Context, key string) (int64, error)

GetRateLimitCount returns the current count for a rate limit key without incrementing.

func (*RedisCache) IncrementRateLimit

func (c *RedisCache) IncrementRateLimit(ctx context.Context, key string, window time.Duration) (int64, error)

IncrementRateLimit increments a rate limit counter and returns the new count. The counter will expire after the given window duration. Uses a sliding window approach with Redis INCR and EXPIRE.

func (*RedisCache) Ping

func (c *RedisCache) Ping(ctx context.Context) error

Ping checks if Redis is reachable.

func (*RedisCache) ResetRateLimit

func (c *RedisCache) ResetRateLimit(ctx context.Context, key string) error

ResetRateLimit resets the rate limit counter for a key.

func (*RedisCache) Set

func (c *RedisCache) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error

Set stores a value in the cache with a TTL.

func (*RedisCache) SetCachedSnapshot

func (c *RedisCache) SetCachedSnapshot(ctx context.Context, snapshot *types.Snapshot, ttl time.Duration) error

SetCachedSnapshot caches a snapshot in Redis.

func (*RedisCache) SetWithNX

func (c *RedisCache) SetWithNX(ctx context.Context, key string, value []byte, ttl time.Duration) (bool, error)

SetWithNX sets a value only if the key doesn't exist (for distributed locking).

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL