Documentation
¶
Index ¶
- Constants
- func GenQueueKey(key string) string
- func GenQueueKeyByShardId(shardID uint32) string
- func GenerateIndexKey(key string) string
- func GenerateKey(pattern, uuid string) string
- func ParseKey(key string) (string, string)
- type MongoClient
- func (m *MongoClient) Close() error
- func (m *MongoClient) Delete(ctx context.Context, id primitive.ObjectID, collection string) error
- func (m *MongoClient) Exists(ctx context.Context, id primitive.ObjectID, collection string) (bool, error)
- func (m *MongoClient) Find(ctx context.Context, collection string, filter bson.M, result any, ...) error
- func (m *MongoClient) Get(ctx context.Context, id string, entity model.Entity) error
- func (m *MongoClient) GetCollection(name string) *mongo.Collection
- func (m *MongoClient) Insert(ctx context.Context, id primitive.ObjectID, collection string, ...) error
- func (m *MongoClient) Update(ctx context.Context, id primitive.ObjectID, collection, versionTag string, ...) error
- func (m *MongoClient) Upsert(ctx context.Context, id primitive.ObjectID, collection, versionTag string, ...) error
- type RedisClient
- func (r *RedisClient) Close() error
- func (r *RedisClient) Decode(kvs map[string]string) (model.EntityInfo, error)
- func (r *RedisClient) Delete(ctx context.Context, id, collection string) error
- func (r *RedisClient) Exists(ctx context.Context, id, collection string) (bool, error)
- func (r *RedisClient) Find(key string) (model.EntityInfo, error)
- func (r *RedisClient) Get(key string, entity model.Entity) error
- func (r *RedisClient) GetClient() redis.UniversalClient
- func (r *RedisClient) HMSetNX(key string, args []any) (bool, error)
- func (r *RedisClient) PipeHSetWithExpire(ctx context.Context, tx *redis.Tx, key, indexKey string, kvs map[string]any) error
- func (r *RedisClient) RPush(key, value string) error
- func (r *RedisClient) Set(key string, entity model.Entity) error
- func (r *RedisClient) SetIfNotExists(key string, entity model.Entity) (bool, error)
- func (r *RedisClient) Unmarshal(kvs map[string]string, entity model.Entity) error
Constants ¶
const (
	RedisFieldVersion    = "_version"
	RedisFieldData       = "_data"
	RedisFieldDeleted    = "_deleted"
	RedisFieldVersionTag = "_version_tag"
)
const (
	// Prefix for Redis list keys
	RedisListKeyPrefix  = "datahub:queue:"
	RedisIndexKeyPrefix = "datahub:index:"
	// Number of Redis list shards - fixed value to ensure consistent hashing
	RedisListShardCount = 64
)
Variables ¶
This section is empty.
Functions ¶
func GenQueueKey ¶
func GenQueueKeyByShardId ¶
func GenerateIndexKey ¶
func GenerateKey ¶
GenerateKey creates a unique Redis key for an entity.
Types ¶
type MongoClient ¶
type MongoClient struct {
// contains filtered or unexported fields
}
MongoClient provides access to MongoDB for persistent storage
func NewMongoClient ¶
func NewMongoClient(cfg config.MongoDBConfig) (*MongoClient, error)
NewMongoClient creates a new MongoDB client
func (*MongoClient) Exists ¶
func (m *MongoClient) Exists(ctx context.Context, id primitive.ObjectID, collection string) (bool, error)
Exists checks if a document exists in MongoDB
func (*MongoClient) Find ¶
func (m *MongoClient) Find(ctx context.Context, collection string, filter bson.M, result any, opts ...*options.FindOptions) error
Find finds entities in MongoDB based on a filter
func (*MongoClient) GetCollection ¶
func (m *MongoClient) GetCollection(name string) *mongo.Collection
GetCollection gets a MongoDB collection
func (*MongoClient) Insert ¶
func (m *MongoClient) Insert(ctx context.Context, id primitive.ObjectID, collection string, kvs map[string]any) error
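Insert inserts a new entity into MongoDB.
TODO: consider the following issues when writing Mongo documents: 1. Data source: is the data obtained from Redis? 2. Version: does the version need to be incremented?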
func (*MongoClient) Update ¶
func (m *MongoClient) Update(ctx context.Context, id primitive.ObjectID, collection, versionTag string, version int64, kvs map[string]any) error
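Update updates an existing entity in MongoDB with optimistic locking.
TODO: consider the following issues when writing Mongo documents: 1. Data source: is the data obtained from Redis? 2. Version: does the version need to be incremented?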
func (*MongoClient) Upsert ¶
func (m *MongoClient) Upsert(ctx context.Context, id primitive.ObjectID, collection, versionTag string, version int64, kvs map[string]any) error
Upsert inserts a new document or updates an existing one in MongoDB based on version control. Logic: 1. If a document with the given _id doesn't exist, insert it directly. 2. If the document exists, update it only if the new version > the existing version; otherwise reject the write.
type RedisClient ¶
type RedisClient struct {
// contains filtered or unexported fields
}
RedisClient provides access to Redis for caching
func NewRedisClient ¶
func NewRedisClient(config config.RedisConfig, cacheExpiry time.Duration, serializer *serialize.Serializer) (*RedisClient, error)
NewRedisClient creates a new Redis client
func (*RedisClient) Decode ¶
func (r *RedisClient) Decode(kvs map[string]string) (model.EntityInfo, error)
func (*RedisClient) Delete ¶
func (r *RedisClient) Delete(ctx context.Context, id, collection string) error
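TODO: Can TxPipeline provide the atomicity that data modifications need with respect to the Set operation, or should it be replaced with optimistic locking?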
func (*RedisClient) Exists ¶
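The HExists command has well-defined return-value semantics:
It returns 1 (converted to true by go-redis) when the field exists, 0 (converted to false by go-redis) when the field does not exist, and also 0 (converted to false by go-redis) when the hash key does not exist. An error is returned only for connection problems, command syntax errors, or other Redis errors. A missing key or field is not treated as an error condition; it is reported by returning the boolean false.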
func (*RedisClient) Find ¶
func (r *RedisClient) Find(key string) (model.EntityInfo, error)
TODO: Error types need to be distinguished so that callers can handle each kind of error appropriately and decide whether to retry.
func (*RedisClient) Get ¶
func (r *RedisClient) Get(key string, entity model.Entity) error
Get retrieves an entity from Redis
func (*RedisClient) GetClient ¶
func (r *RedisClient) GetClient() redis.UniversalClient
func (*RedisClient) PipeHSetWithExpire ¶
func (*RedisClient) Set ¶
func (r *RedisClient) Set(key string, entity model.Entity) error
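Set stores an entity in Redis with optimistic locking. A version of 1 indicates a newly created entity; any other version indicates an update to an existing entity. Version-controlled writes must use version >= 1; a version <= 0 forces the write to Redis with no version control.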
func (*RedisClient) SetIfNotExists ¶
SetIfNotExists writes the data only when the key does not already exist. Return values: whether the write succeeded, and any error encountered.