storage

package
v0.0.0-...-8cff0bb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2025 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RedisFieldVersion    = "_version"
	RedisFieldData       = "_data"
	RedisFieldDeleted    = "_deleted"
	RedisFieldVersionTag = "_version_tag"
)
View Source
const (
	// Prefix for Redis list keys
	RedisListKeyPrefix  = "datahub:queue:"
	RedisIndexKeyPrefix = "datahub:index:"
	// Number of Redis list shards - fixed value to ensure consistent hashing
	RedisListShardCount = 64
)

Variables

This section is empty.

Functions

func GenQueueKey

func GenQueueKey(key string) string

func GenQueueKeyByShardId

func GenQueueKeyByShardId(shardID uint32) string

func GenerateIndexKey

func GenerateIndexKey(key string) string

func GenerateKey

func GenerateKey(pattern, uuid string) string

generateKey creates a unique Redis key for an entity

func ParseKey

func ParseKey(key string) (string, string)

Types

type MongoClient

type MongoClient struct {
	// contains filtered or unexported fields
}

MongoClient provides access to MongoDB for persistent storage

func NewMongoClient

func NewMongoClient(cfg config.MongoDBConfig) (*MongoClient, error)

NewMongoClient creates a new MongoDB client

func (*MongoClient) Close

func (m *MongoClient) Close() error

Close disconnects from MongoDB

func (*MongoClient) Delete

func (m *MongoClient) Delete(ctx context.Context, id primitive.ObjectID, collection string) error

Delete removes an entity from MongoDB

func (*MongoClient) Exists

func (m *MongoClient) Exists(ctx context.Context, id primitive.ObjectID, collection string) (bool, error)

Exists checks if a document exists in MongoDB

func (*MongoClient) Find

func (m *MongoClient) Find(ctx context.Context, collection string, filter bson.M, result any, opts ...*options.FindOptions) error

Find finds entities in MongoDB based on a filter

func (*MongoClient) Get

func (m *MongoClient) Get(ctx context.Context, id string, entity model.Entity) error

Get retrieves an entity from MongoDB

func (*MongoClient) GetCollection

func (m *MongoClient) GetCollection(name string) *mongo.Collection

GetCollection gets a MongoDB collection

func (*MongoClient) Insert

func (m *MongoClient) Insert(ctx context.Context, id primitive.ObjectID, collection string, kvs map[string]any) error

Insert inserts a new entity into MongoDB TODO:要考虑Mongo doc写入问题:

1: 数据源问题,是从redis中获取的数据?
2: version问题,是否需要递增

func (*MongoClient) Update

func (m *MongoClient) Update(ctx context.Context, id primitive.ObjectID, collection, versionTag string, version int64, kvs map[string]any) error

Update updates an existing entity in MongoDB with optimistic locking TODO:要考虑Mongo doc写入问题:

1: 数据源问题,是从redis中获取的数据?
2: version问题,是否需要递增

func (*MongoClient) Upsert

func (m *MongoClient) Upsert(ctx context.Context, id primitive.ObjectID, collection, versionTag string, version int64, kvs map[string]any) error

Upsert inserts a new document or updates an existing one in MongoDB based on version control Logic: 1. If document with _id doesn't exist, directly insert it 2. If document exists, update only if the new version > existing version, otherwise reject

type RedisClient

type RedisClient struct {
	// contains filtered or unexported fields
}

RedisClient provides access to Redis for caching

func NewRedisClient

func NewRedisClient(config config.RedisConfig, cacheExpiry time.Duration, serializer *serialize.Serializer) (*RedisClient, error)

NewRedisClient creates a new Redis client

func (*RedisClient) Close

func (r *RedisClient) Close() error

Close closes the Redis connection

func (*RedisClient) Decode

func (r *RedisClient) Decode(kvs map[string]string) (model.EntityInfo, error)

func (*RedisClient) Delete

func (r *RedisClient) Delete(ctx context.Context, id, collection string) error

TODO: TxPipeline 是否能满足跟Set操作满足数据修改原子性需求?需不需要乐观锁替代?

func (*RedisClient) Exists

func (r *RedisClient) Exists(ctx context.Context, id, collection string) (bool, error)

HExists 命令有明确的返回值语义:

当字段存在时返回 1(在Go-Redis中转换为 true)
当字段不存在时返回 0(在Go-Redis中转换为 false)
当哈希键不存在时也返回 0(在Go-Redis中转换为 false)
只有在连接问题、命令语法错误或其他Redis错误时才会返回错误。缺少的键或字段不被视为错误状态,而是通过返回布尔值 false 来表示。

func (*RedisClient) Find

func (r *RedisClient) Find(key string) (model.EntityInfo, error)

TODO: 需要对错误类型进行处理,调用者要根据错误类型进行处理,判断是否进行重试

func (*RedisClient) Get

func (r *RedisClient) Get(key string, entity model.Entity) error

Get retrieves an entity from Redis

func (*RedisClient) GetClient

func (r *RedisClient) GetClient() redis.UniversalClient

func (*RedisClient) HMSetNX

func (r *RedisClient) HMSetNX(key string, args []any) (bool, error)

func (*RedisClient) PipeHSetWithExpire

func (r *RedisClient) PipeHSetWithExpire(ctx context.Context, tx *redis.Tx, key, indexKey string, kvs map[string]any) error

func (*RedisClient) RPush

func (r *RedisClient) RPush(key, value string) error

RPush 将数据推入Redis列表

func (*RedisClient) Set

func (r *RedisClient) Set(key string, entity model.Entity) error

Set stores an entity in Redis with optimistic locking 如果version为1,则表示新创建的实体,否则表示更新实体 保证 version >= 1 如果version <= 0,则表示强制写入Redis,不进行版本控制

func (*RedisClient) SetIfNotExists

func (r *RedisClient) SetIfNotExists(key string, entity model.Entity) (bool, error)

SetIfNotExists 只在key不存在时写入数据 返回值: 是否写入成功, 错误信息

func (*RedisClient) Unmarshal

func (r *RedisClient) Unmarshal(kvs map[string]string, entity model.Entity) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL