Documentation ¶
Index ¶
- Constants
- Variables
- func AddNode(id string) (e error)
- func AddOrUpdateUnSafeExpandNode(gid, md5 string, nodes []GroupNode) (e error)
- func AddOrUpdateUnSafeFile(gid, md5 string) (e error)
- func AddP2PFile(md5, src_node string, size uint64, times int, add_no_source_file bool) (task_id int64, e error)
- func AddTaskNode(task_id uint64, nids []string) (e error)
- func CalculateExpandNodeTimeout(state int8) (timeout int64)
- func CalculateFileSize(size uint64) (file_size uint32)
- func CanDownloadFile(nid, md5 string) (yes bool, e error)
- func CheckFileOssExist(md5 string) (ossExist int, e error)
- func DeleteFile(md5 string) (e error)
- func DeleteNode(id string) (e error)
- func DeleteTaskNodeByTask(id uint64) (e error)
- func DeleteUnSafeFile(gid, md5 string) (e error)
- func DeleteUnSafeFileExpandNode(gid, node, md5 string) (e error)
- func DoUpdateGroupFirstExpandVer(gid string, old_finish_ver uint64) (e error)
- func Download(md5 string) (nodes []Peer, group *Group, sources []Peer, e error)
- func DownloadMore(md5 string, usedGroups []string) (nodes []Peer, group *Group, sources []Peer, e error)
- func ExpandFinished(id uint64, state int8) (e error)
- func ExpandGroupToPerfectSize(gid string) (e error)
- func GenPiece(gid, nid, md5 string) (e error)
- func GetAvailableNode(num int) (node []string, e error)
- func GetChecksum(md5 string) (checksum string, e error)
- func GetExpandTask(gid, nid, md5 string) (nodes []string, file *GroupFile, group *Group, e error)
- func GetExpandTaskById(id uint64) (nodes []string, file *GroupFile, group *Group, exNode *ExpandNode, e error)
- func GetExpandTaskLevel(gid string, ver uint64) (level int8, e error)
- func GetHasUnSafeFileNode(gid, md5 string, num uint32, ex_nids []string) (nids []string, e error)
- func GetUnSafeExpandTaskById(id uint64) (exNode *UnSafeExpandNode, file *GroupFile, group *Group, e error)
- func IncrGroupFileVer(gid, md5 string) (e error)
- func Init(ds IDataSource, lg *log.MLogger, open_check bool) (e error)
- func InvalidFile(nid, gid, md5 string) (e error)
- func IsAvailable(md5 string) (ok bool, e error)
- func IsExists(md5 string) (exist bool, e error)
- func IsExistsMore(md5s []string) (m map[string]bool, e error)
- func MinFileSize(nums map[uint32]uint64) (file_size uint32)
- func P2PExpandFinished(id uint64, state int8) (e error)
- func RestartInitExpandNodeState(node string) (e error)
- func UnSafeExpandFinished(id uint64, state int) (e error)
- func UpdateChecksum(md5, checksum string) (e error)
- func UpdateNode(node *Node, groupVersions map[string]uint64, tasks []uint64, is_super int) (groups []NodeGroupDetail, exNodes []ExpandNode, e error)
- func UpdateNode2(node *Node, groupVersions map[string]uint64, tasks []uint64, is_super int) (returnGroups []NodeGroupDetail, exNodes []ExpandNode, deleteGids []string, ...)
- func UpdateNodeWeight(nids []string) (e error)
- type ConfigSet
- func (cs *ConfigSet) FlushConfigValue() (err error)
- func (cs *ConfigSet) GetFloat64Value(key string) (value float64, err error)
- func (cs *ConfigSet) GetInt64Value(key string) (value int64, err error)
- func (cs *ConfigSet) GetStringValue(key string) (value string, err error)
- func (cs *ConfigSet) InitConfigValue()
- type DataSource
- func (ds *DataSource) AddFileToGroup(gid string, g *Group, file *GroupFile, fileVer uint64) (e error)
- func (ds *DataSource) FetchExpandTasks(nid string, num int) (exnodes []ExpandNode, e error)
- func (ds *DataSource) FillEmptyGroupFile(gid string) (e error)
- func (ds *DataSource) GetNoFileNodes(gid, md5 string) (nodes []string, e error)
- func (ds *DataSource) GetNodesByIds(ids []string) (nodes []NodeDetail, e error)
- func (ds *DataSource) GetOnlineNodesByIds(ids []string, min_update_tm int64) (peers []Peer, e error)
- func (ds *DataSource) GetOnlineSourceFileNodes(md5 string, num int) (peers []Peer, e error)
- func (ds *DataSource) IsFileExists(md5 string) (ok bool, e error)
- func (ds *DataSource) IsMoreFileExists(md5s []string) (m map[string]bool, e error)
- func (ds *DataSource) IsP2PFileExists(md5 string) (ok bool, file *GroupFile, e error)
- func (ds *DataSource) UpdateExpandNodeState(gid, nid, md5 string, state int8) (e error)
- func (ds *DataSource) UpdateExpandNodeTimeout(id uint64) (e error)
- func (ds *DataSource) UpdateGroupSize(isAdd bool, group *Group, size uint64) (e error)
- type ExpandNode
- type File
- type Group
- func (group *Group) AddFile(md5 string, src_node string, size uint64) (e error)
- func (group *Group) AddP2PFile(md5 string, size uint64, src_node string, fileVer uint64) (e error)
- func (group *Group) DeleteFile(file *GroupFile) (e error)
- func (group *Group) ExpandNodes(num uint32, active_groups int8, nid string) (e error)
- func (group *Group) ExpandNodesToGroup(nodes map[string]bool) (e error)
- func (group *Group) ExpandNodesToPerfectSize(active_groups int8, nid string) (e error)
- type GroupFile
- type GroupNode
- type GroupPieceInfo
- type IDataSource
- type Node
- type NodeDetail
- type NodeGroupDetail
- type Peer
- type TaskNode
- type UnSafeExpandNode
Constants ¶
const ( UNSAFE_EXPAND_STATE_INIT = 0 UNSAFE_EXPAND_STATE_FINISHED = 1 )
危险任务状态 0-初始化 1-完成
const ( GROUPFILE_TYPE_SPRAND_FIRST = 0 GROUPFILE_TYPE_NEW_ADD = 1 )
分组文件状态 0-完成首次扩散 1- 新增文件
const ADD_FILE_COUNT_PART = 16
添加文件节点数量标准
const ADD_FILE_TEST_TIME = 1
添加文件时递归尝试次数
const ADD_P2P_FILE_CONFIG_KEY = "add_p2p_file"
addp2pfile概率key值
const ALL, NORMAL, DELETED int = 2, 1, 0
const CHECKER_CREATEGROUP_LAST_TM string = "create_group_last_tm"
const CHECKER_EXPAND_NODE_PREFIX string = "expand_prefix_"
const CHECKER_EXPAND_TASK_TIME string = "checker_expand_task_time"
const CHECKER_GEN_PIECETM_PRIFIX string = "gen_piece_"
const CHECKER_GROUP_EXPAND string = "group_expand"
检测各分组在线节点数量,并扩张在线数量不足的分组
const CHECKER_GROUP_FILE_NEW_ADD_TIMEOUT string = "new_gf_timeout"
检测 group_file 中不活跃节点(上次 AddP2pFile时间距离现在超过1周但还未完成的文件)
const CHECKER_NODE_ONLINETM_CHECKER string = "node_online_checker"
const CHECKER_ONLINE_NODE = "checker_node_online"
检测在线节点
const CHECKER_ONLINE_NODE_MIN = 5
const CHECKER_TASK_PROCESS_SLOW_MIN = 10
检测卡住任务间隔(分钟)
const CHECKER_TIMEOUT_LAST_TM string = "checker_last_tm"
p2p自启动检测服务执行时间戳
const CHECK_GROUP_EXPAND_TIME int = 5
分组扩张检测时间间隔(分钟)
const CON_CONFIG_KEY = "con"
con设置key值
const CREATE_NEWGROUP_BALANCE_RATIO int = 20
创建新分组标准,当分组总量剩余百分比时,则创建新分组
const DEFAULT_ADD_P2P_FILE int64 = 0
addp2pfile概率默认值
const DEFAULT_CON int64 = 20
节点能承受的最大下载并发数con默认值
const DEFAULT_DELEGATES_NODE_SPEED int64 = 500 * 1024
代理上行速度限制默认值(字节)
const DEFAULT_GEN_PIECE_LEVEL int64 = 0
genPieceLevel默认值
const DEFAULT_MAX_HOUR int64 = 8
max_hour默认值
const DEFAULT_OSS_SPLIT_SIZE int64 = 5 * 1024 * 1024
oss_split_size默认值
const DEFAULT_P2P_UPSPEED_LIMIT int64 = 1048576
p2p系统节点上行速度限制
const DEFAULT_SECOND_EXPAND_SPEED int64 = 300 * 1024
二次扩算任务上行速度限制默认值(字节)
const DEFAULT_TRANS_NODE int64 = 1
node直传默认值
const DELEGATES_MIN_SPEED_CONFIG_KEY string = "delegate_min_speed"
代理上行速度限制Key值
const EXPAND_GROUP_ADDRATIO uint32 = 16
组扩散时,节点扩散界限 g.SafePiece+g.MinPiece/16
const EXPAND_MAX_FAIL_TIMES_EACH_NODE uint32 = 1
const EXPAND_STATE_FAILED int8 = 4
const EXPAND_STATE_FINISHED int8 = 3
const EXPAND_STATE_INIT int8 = 0
const EXPAND_STATE_NOTIFIED int8 = 1
const EXPAND_STATE_STARTED int8 = 2
const EXPAND_TASK_DELETE_TIME int64 = 5 * 24 * 3600
扩散任务删除时间
const EXPAND_TASK_FINISH_COUNT_PART = 32
扩散完成判断标准
const EXPAND_TRANS_TYPE_BOTH int8 = 2
const EXPAND_TRANS_TYPE_NODE int8 = 1
const EXPAND_TRANS_TYPE_OSS int8 = 0
扩散传输文件方式
const FIRST_EXPAND_FINISH_NUM int = 160
首次扩散完成节点数
const GEN_PIECE_LEVEL_CONFIG_KEY = "gen_piece_level"
genPieceLevel设置的key值
const GID_LEN uint = 32
分组ID的长度
const GROUP_FILE_NEW_ADD_DIFF_TIME int = 10
const GROUP_NODE_CAPACITY uint64 = 2 * 1024 * 1024 * 1024
分组在每个节点上所占用的空间(字节)
const MAX_EXPAND_NODE_NUM uint8 = 1
并行生成碎片的最大节点数量
const MAX_EXPAND_TAKS_FAIL_NUMS uint8 = 3
扩散任务最大失败次数
const MAX_EXPAND_TASK_NUM int = 30
每次分配给节点扩充任务的数量上限
const MAX_FILE_SIZE uint64 = 50 * 1024 * 1024 * 1024
文件尺寸上限
const MAX_HOUR_CONFIG_KEY string = "max_hour"
max_hour设置key值
const MAX_NODE_EXPANDTASK_CNT uint32 = 100
节点最大的分配任务数
const NEW_NODE_ACTIVE_GROUP_COUNT int8 = 0
新节点所属未满分组最大数量
const NEW_NODE_CREATE_GROUP_COUNT = 208
新节点创建分组数量阈值(节点的数量)
const NODE_CHECKDELETE_TM int64 = 6
检测超时检点删除间隔时间(小时)
const NODE_DELETE_TIMEOUT int64 = 30
节点超时则将其从p2p系统移除(天)
const NODE_EXPAND_GROUP_VALID_TIME int64 = 120
组扩散节点选取节点有效时间(2个汇报周期)
const NODE_EXPAND_MIN_ONLINE_CNT int = 144
扩散组节点在线时长配置,需要符合online_cnt数
const NODE_MAX_ACTIVE_GROUPS int8 = 11
节点所属的未满分组的最大数量
const NODE_MIN_ACTIVE_GROUPS int8 = 9
const NODE_OCCUPY_PERCENT int8 = 50
节点空间占用比例(%)
const NODE_ONLINETM_INTERVAL_TM int = 1
节点在线检测间隔(小时)
const NODE_TAKS_MAX_NUM int8 = 100
node所能同时执行的接收任务数
const NODE_VALID_AFTER_REGTM int64 = 7 * 86400
节点老化时长
const NODE_VALID_TIME int64 = 600
节点的有效期(秒数)
const NORMAL_NODE_OCCUPY_PERCENT int8 = 1
普通节点(非超级硬盘)空间占用比例(%)
const ONLINE, OFFLINE int = 1, 0
const OSS_SPLIT_SIZE_CONFIG_KEY = "oss_split_size"
oss_split_size设置key值
const P2P_DOWNLOAD_CACHE = "download_cache"
p2p节点开启下载piece缓存
const P2P_MERGE_PIECE = "merge_piece"
p2p节点开启小文件合并
const P2P_UPSPEED_LIMIT_KEY = "p2p_upspeed_limit"
p2p系统节点上行速度限制key值
const PIECE_MIN_NUM uint32 = 32
const PIECE_PERFECT_NUM uint32 = 64
const PIECE_SAFE_NUM uint32 = 48
const PIECE_SIZE uint32 = 1024
const SPREAD_MIN_SPEED_CONFIG_KEY string = "sprand_min_speed"
二次扩散任务上行速度限制Key值
const TASK_PROCESS_SLOW_TM = 3600
定义任务卡住的超时用时
const TRANS_NODE_CONFIG_KEY = "trans_node"
node直传的key值 1 BOTH 0 OSS
const UNSAFE_EXPAND_DEL_ALIVE = 0
const UNSAFE_EXPAND_DEL_DELETE = 1
危险任务删除标志
const UPDATE_CONFIG_MAP string = "update_config_map"
更新ConfigMap
const UPDATE_CONFIG_MAP_TIME int = 60
更新ConfigMap时间间隔(秒)
const YES, NO int = 1, 0
Variables ¶
var GROUP_CONFIG []GroupPieceInfo = []GroupPieceInfo{{1024, 32, 48, 64}, {1024, 64, 96, 128}, {1024, 128, 160, 208}}
var P2pGetLockTimeOut int64 = 1 //所有使用同步锁的地方,超过该值还未获取到时,这直接放弃,业务需要根据实际情况来处理(单位秒)
var P2pLockExpireSec int64 = 5 //同步锁到期时间5秒(单位秒)
Functions ¶
func AddOrUpdateUnSafeExpandNode ¶
添加unsafe_expand_node
func AddP2PFile ¶
func AddP2PFile(md5, src_node string, size uint64, times int, add_no_source_file bool) (task_id int64, e error)
新版逻辑独立添加文件逻辑,通过查找符合条件的节点,然后确定分组,然后生成任务,并返回
func CalculateFileSize ¶
func CanDownloadFile ¶
节点是否有权下载此文件。节点所属的分组中必须含有此文件,或者节点自身就含有此文件。
func DeleteFile ¶
func DeleteNode ¶
func DeleteUnSafeFileExpandNode ¶
删除危险文件任务
func DoUpdateGroupFirstExpandVer ¶
修改分组中完成首次扩散文件的版本号
func Download ¶
获取文件的下载节点
参数:
md5: 文件的md5
返回值:
nodes: 可用的节点列表 group: 这些节点所在的分组信息 sources: 源文件所在的节点(都是在线的)
func DownloadMore ¶
func DownloadMore(md5 string, usedGroups []string) (nodes []Peer, group *Group, sources []Peer, e error)
获取更多文件的下载节点,排除掉已经用过的分组
参数:
md5: 文件的md5 usedGroups: 已经用过的分组
返回值:
nodes: 可用的节点列表,最多返回拼回原始数据所需要最小节点数的1倍,客户端如果还是下载不成功, 则认为下载失败。 group: 节点所属分组信息 sources: 源文件所在的节点(都是在线的)
func GetAvailableNode ¶
获取可以添加文件的节点(获取当前可以添加组的节点)
func GetChecksum ¶
func GetExpandTaskById ¶
func GetExpandTaskById(id uint64) (nodes []string, file *GroupFile, group *Group, exNode *ExpandNode, e error)
获取扩充节点任务的相关信息
参数:
id: Id
返回值:
nodes: 需要碎片的节点ID
func GetExpandTaskLevel ¶
根据条件确定任务优先级
func GetHasUnSafeFileNode ¶
func GetUnSafeExpandTaskById ¶
func GetUnSafeExpandTaskById(id uint64) (exNode *UnSafeExpandNode, file *GroupFile, group *Group, e error)
获取危险任务
参数:
id: Id
返回值:
func InvalidFile ¶
无效文件汇报,该文件无法生成piece 将会从p2p系统中删除,移入问题文件表
func MinFileSize ¶
func RestartInitExpandNodeState ¶
重置重启节点的任务状态
func UpdateChecksum ¶
func UpdateNode ¶
func UpdateNode(node *Node, groupVersions map[string]uint64, tasks []uint64, is_super int) (groups []NodeGroupDetail, exNodes []ExpandNode, e error)
更新节点及其分组信息
参数:
node: 节点信息 groupVersions: 节点所属分组的文件同步版本号 tasks: 正在扩散的任务的心跳
返回值:
groups: 节点所属分组的最新信息 exNodes: 需要此节点执行扩充任务的文件列表
func UpdateNode2 ¶
func UpdateNode2(node *Node, groupVersions map[string]uint64, tasks []uint64, is_super int) (returnGroups []NodeGroupDetail, exNodes []ExpandNode, deleteGids []string, e error)
更新节点及其分组信息
参数:
node: 节点信息 groupVersions: 节点所属分组的文件同步版本号 tasks: 正在扩散的任务的心跳
返回值:
groups: 节点所属分组的最新信息 exNodes: 需要此节点执行扩充任务的文件列表
Types ¶
type ConfigSet ¶
ConfigSet set
var ConfigMap *ConfigSet
func (*ConfigSet) FlushConfigValue ¶
func (*ConfigSet) GetFloat64Value ¶
func (*ConfigSet) GetInt64Value ¶
func (*ConfigSet) GetStringValue ¶
func (*ConfigSet) InitConfigValue ¶
func (cs *ConfigSet) InitConfigValue()
type DataSource ¶
type DataSource struct { //数据接口的实现 Raw IDataSource }
func (*DataSource) AddFileToGroup ¶
func (*DataSource) FetchExpandTasks ¶
func (ds *DataSource) FetchExpandTasks(nid string, num int) (exnodes []ExpandNode, e error)
取出还未通知节点的任务列表
参数:
nid: 节点ID num: 最多返回的个数
返回值:
exNodes: 任务列表
func (*DataSource) FillEmptyGroupFile ¶
func (ds *DataSource) FillEmptyGroupFile(gid string) (e error)
func (*DataSource) GetNoFileNodes ¶
func (ds *DataSource) GetNoFileNodes(gid, md5 string) (nodes []string, e error)
获取分组中没有该文件碎片的节点
参数:
gid: 分组ID md5: 文件的md5
返回值:
nodes: 没有该文件的节点列表
func (*DataSource) GetNodesByIds ¶
func (ds *DataSource) GetNodesByIds(ids []string) (nodes []NodeDetail, e error)
批量获取节点ids 参数: ids:节点ids 返回值: exNodes: 节点列表
func (*DataSource) GetOnlineNodesByIds ¶
func (ds *DataSource) GetOnlineNodesByIds(ids []string, min_update_tm int64) (peers []Peer, e error)
func (*DataSource) GetOnlineSourceFileNodes ¶
func (ds *DataSource) GetOnlineSourceFileNodes(md5 string, num int) (peers []Peer, e error)
随机获取源文件所在的节点列表 参数: md5: 文件的md5 num: 需要的数量 返回值: peers: 拥有原始文件的节点ID列表
func (*DataSource) IsFileExists ¶
func (ds *DataSource) IsFileExists(md5 string) (ok bool, e error)
func (*DataSource) IsMoreFileExists ¶
func (ds *DataSource) IsMoreFileExists(md5s []string) (m map[string]bool, e error)
func (*DataSource) IsP2PFileExists ¶
func (ds *DataSource) IsP2PFileExists(md5 string) (ok bool, file *GroupFile, e error)
判断是否需要添加该文件
func (*DataSource) UpdateExpandNodeState ¶
func (ds *DataSource) UpdateExpandNodeState(gid, nid, md5 string, state int8) (e error)
更新任务状态 参数: gid: 分组ID nid: 节点ID md5: 文件 state: 任务状态 increment_failed_times: 是否增加失败次数 返回值: exNodes: 任务列表
func (*DataSource) UpdateExpandNodeTimeout ¶
func (ds *DataSource) UpdateExpandNodeTimeout(id uint64) (e error)
更新任务超时时间 参数: gid: 分组ID nid: 节点ID md5: 文件 返回值: exNodes: 任务列表
func (*DataSource) UpdateGroupSize ¶
func (ds *DataSource) UpdateGroupSize(isAdd bool, group *Group, size uint64) (e error)
更新分组大小
参数:
isAdd: 是否是添加文件,false表示是删除文件 gid: 分组的ID size: 文件大小
返回值:
type ExpandNode ¶
type ExpandNode struct { ID uint64 `json:"id"` Group string `json:"group"` Node string `json:"node"` MD5 string `json:"md5"` State int8 `json:"state"` Tm int64 `json:"tm"` //创建时间,秒数 Timeout int64 `json:"timeout"` //超时时间,秒数 FailedTimes uint32 `json:"failed_times"` //扩散失败次数 Size uint64 `json:"size"` //文件大小 Level int8 `json:"level"` //任务优先级 Ver uint64 `json:"ver"` //文件在组中的版本 }
func (*ExpandNode) IsFinished ¶
func (en *ExpandNode) IsFinished() bool
type Group ¶
type Group struct { ID string `json:"id"` Size uint64 `json:"size"` FileSize uint32 `json:"file_size"` //分组文件大小范围,1: 1-10M,10: 10-100M, 100: 100-1000M,以此类推 PieceSize uint32 `json:"piece_size"` //碎片大小 MinPieces uint32 `json:"min_pieces"` //最小碎片数(能恢复出原始数据的最小碎片数) SafePieces uint32 `json:"safe_pieces"` //安全碎片数(小于该数量就会出发再扩散流程) PerfectPieces uint32 `json:"perfect_pieces"` //再扩散后要达到的碎片数 FirstFinishVer uint64 `json:"first_finish_ver"` //组中首次扩散完成的文件版本 DeletedVer uint64 `json:"deleted_ver"` //组中删除的文件版本号 }
客户端汇报上来的节点信息
func CheckNewNodeAndCreateGroup ¶
检测新加入节点数量并创建分组
func CreateGroup ¶
func GetNodeAvailableGroup ¶
type GroupFile ¶
type GroupFile struct { File `json:"file"` Ver uint64 `json:"ver"` State int `json:"state"` //1-NORMAL,0-DELETED Group string `json:"group"` //文件所在组 Type int `json:"type"` //文件类型 0-首次扩散 1-新增 AddVer uint64 `json:"add_ver"` // 新增文件版本 LastAddTm uint64 `json:"last_add_tm"` // 新增文件版本 SrcNode string `json:"src_node"` // 文件源节点 }
分组文件信息
func ListUpdatedFiles ¶
获取分组中版本号大于ver的文件列表
参数:
gid: 分组ID ver: 起始版本号(不包括本版本号) num: 获取文件数量
返回值:
files: 文件列表
type GroupNode ¶
type GroupPieceInfo ¶
type IDataSource ¶
type IDataSource interface { /* 自增ID */ AtomicIncrID(key string) (uint64, error) /* 获取自增ID,如果key不存在,返回0 */ GetIncrID(key string) (uint64, error) /* 获取节点表已完成超时检查的时间点(秒数)。 参数: 返回值: tm: 上次检查到的时间点(秒数) */ GetTimeoutNodeCheckedTime() (tm int64, e error) /* 更新节点表已完成超时检查的时间点(秒数)。 参数: tm: 新的时间点(秒数) 返回值: */ UpdateTimeoutNodeCheckedTime(tm int64) (e error) /* 获取state状态下文件在各分组的详情 参数: md5: 文件的md5 state: 文件的状态,ALL/NORMAL/DELETED 返回值: files: 文件分组详情,key-分组ID */ GetFileGroups(md5 string, state int) (files map[string]GroupFile, e error) //获取新增超时文件 GetNewAddTimeOutGroupFile(tm int64, num int) (files []GroupFile, e error) /* 随机获取源文件所在的节点列表 参数: md5: 文件的md5 num: 需要的数量 返回值: ids: 拥有原始文件的节点ID列表 */ GetSourceFileNodes(md5 string, num int) (ids []string, e error) /* 节点自身是否有这个文件 参数: md5: 文件的md5 nid: 节点ID 返回值: yes: 是否存在 */ IsNodeHasFile(nid string, md5 string) (yes bool, e error) /* 获取文件在系统中的数量 参数: md5: 文件的md5 返回值: count: 数量 */ GetSourceFileCount(md5 string) (count int, e error) /* 根据节点ID筛选其中在线的,并返回Peer信息 参数: ids: 候选节点 timeout: 超时的时间点 返回值: peers: 当前在线的节点信息 */ GetOnlinePeers(ids []string, timeout int64) (peers []Peer, e error) /* 获取文件详情 参数: md5: 文件的md5 state: 状态 返回值: file: 文件详情,nil-在e==nil时表示未找到 */ GetFileByMd5AndState(md5 string, state int) (files []GroupFile, e error) /* 获取分组文件详情 参数: gid: 分组ID md5: 文件的md5 返回值: file: 文件详情,nil-在e==nil时表示未找到 */ GetGroupFile(gid, md5 string) (file *GroupFile, e error) /* 获取版本号大于ver的文件详情 参数: gid: 分组ID ver: 起始版本号 num: 要获取的数量 返回值: files: 文件分组详情 */ ListUpdatedFiles(gid string, ver uint64, num int, tp int) (files []GroupFile, e error) /* 根据分组中的文件计算分组的大小,要剔除掉已删除的文件 */ CalculateGroupSize(gid string) (e error) /* 获取包含该文件的分组数量,不包括文件状态是已删除的分组 */ GetFileGroupsCount(md5 string) (count int, e error) /* 获取包含该文件的分组数量,不包括文件状态是已删除的分组 */ GetMoreFileGroupsCount(md5s []string) (m map[string]bool, e error) /* 向分组添加文件 参数: gid: 分组ID file: 文件详情 返回值: */ AddFileToGroup(gid string, file *GroupFile) (e error) UpdateGroupFile(gid string, file *GroupFile) (e error) UpdateGroupFileTpAndVer(gid, md5 string, ver uint64) (e error) DeleteGroupFile(gid string, md5 string) (e error) //添加文件版本 IncrFileVer(gid string, md5 string, ver uint64) (e error) /* 获取超时的节点 参数: from: 起始时间(秒数) to: 截止时间(秒数) num: 最多获取多少个节点 返回值: nodes: 超时节点列表 */ GetTimeoutNodes(from int64, to int64, num int) (nodes []NodeDetail, e error) AddNode(node *NodeDetail) (e error) DeleteNode(id string) (e error) IsNodeExist(nid string) (exist bool, e error) UpdateNode(node *NodeDetail) (e error) UpdateNodeWeight(nid string, weight float64) (e error) /* 获取剩余空间足够的在线节点,注册时间也要超过一定时间 参数: groupCapacity: 分组的容量(字节) updateTm: 上次更新时间必须晚于此时刻 regTm: 注册时间必须早于此时间点 num: 需要的节点数量 返回值: nodes: 节点ID列表 */ GetAvailableNodes(groupCapacity uint64, updateTm, regTm int64, offset, num uint32, active_groups int8, online_cnt int) (nodes []string, e error) GetAvailableNodesCount(groupCapacity uint64, updateTm, regTm int64, activ_groups int8, online_cnt int) (num uint32, e error) //获取最近更新且符合添加到组的节点 GetAvailableNode(groupCapacity uint64, updateTm, regTm int64, online_cnt, num int) (nodes []string, e error) /* 获取占用空间最小的分组 参数: groupCapacity: 分组的容量(字节) fileSize: 分组文件大小范围 返回值: group: 分组详情 */ GetAvailableGroup(fileSize uint32) (group *Group, e error) GetGroup(gid string) (group *Group, e error) GetAllGroup() (groups map[string]Group, e error) AddGroup(group *Group) (e error) UpdateGroupSize(group *Group, filesize int64) (e error) /* 获取每种文件尺寸范围的可用分组数量 参数: groupCapacity: 分组的容量(字节) 返回值: groups: 分组数量,key-file_size,value-数量 */ GetActiveGroupsCount(groupCapacity uint64) (groups map[uint32]uint32, e error) /* 获取每种文件尺寸范围的可用分组剩余空间 参数: groupCapacity: 分组的容量(字节) 返回值: groups: 分组剩余空间,key-file_size,value-字节 */ GetActiveGroupsLeftSpace(groupCapacity uint64) (groups map[uint32]uint64, e error) /* 向分组添加节点 参数: gid: 分组ID node: 节点信息 返回值: */ AddNodeToGroup(gid string, node *GroupNode) (e error) UpdateGroupNode(gid string, node *GroupNode, isVerChange bool) (e error) DeleteGroupNode(gid, nid string) (e error) /* 获取分组中节点版本号>=ver的在线节点 */ GetFileNodes(gid string, ver uint64) (nodes []Peer, e error) /* 获取分组中节点版本号<ver的所有节点,包括不在线的 */ GetNoFileNodes(gid string, ver uint64) (nodes []string, e error) /* 获取分组中所有节点的MD5值,包括不在线的 */ GetAllFileNodes(gid string) (nodes []string, e error) GetGroupNodes(gid string) (nodes []GroupNode, e error) /* 随机获取分组中一个在线的节点 */ GetRandomGroupNode(gid string) (node *GroupNode, e error) /* 获取节点详情 参数: nid: 节点ID 返回值: detail: 节点详情,nil-在e==nil时表示未找到 */ GetNodeDetail(nid string) (detail *NodeDetail, e error) /* 获取节点所属的group集合 参数: nid: 节点ID */ GetNodeGroups(nid string) (groups []Group, e error) /* 随机获取 一个可用分组 */ GetRandomNodeGroup(nid string) (group Group, e error) /* 获取节点所属的group数量 参数: nid: 节点ID */ GetNodeGroupCount(nid string) (num uint32, e error) /* 获取节点所在分组详情 参数: nid: 节点ID */ GetNodeGroupDetail(nid string) (groups []NodeGroupDetail, e error) GetNodeGroupState(nid string) (groups map[string]GroupNode, e error) /* 获取分组中在线节点的数量 参数: gid: 分组ID */ GetGroupOnlineNodesCount(gid string) (num uint32, e error) /* 获取UPNP可用的节点列表,按节点更新时间升序排列返回 */ GetUPNPAvailableNodes(num int, updateTm int64) (nodes []Peer, e error) AddToInvalidFile(nid, gid, md5 string, tm int64) (e error) UpdateChecksum(md5, checksum string) (e error) /* 获取文件的checksum 参数: md5: 文件的md5 返回值: checksum: 校验和,"" - 在e==nil时表示未找到 */ GetChecksum(md5 string) (checksum string, e error) IncrementActiveGroups(nid string) (e error) /* 获取当前正在扩充的节点列表 参数: gid: 分组ID md5: 文件的md5 返回值: exNodes: 正在扩充节点列表 */ GetValidExpandNodes(gid, md5 string) (exNodes []ExpandNode, e error) /* 获取扩充节点信息 参数: gid: 分组ID nid: 节点ID md5: 文件的md5 返回值: exNode: 扩充节点详情,nil - 在e==nil时表示未找到 */ GetExpandNode(gid, nid, md5 string) (exNode *ExpandNode, e error) /* 获取扩充节点信息 参数: id: ID 返回值: exNode: 扩充节点详情,nil - 在e==nil时表示未找到 */ GetExpandNodeById(id uint64) (exNode *ExpandNode, e error) /* 获取某个节点的扩充任务列表 参数: nid: 节点ID state: 任务状态 num: 返回的数量上限 返回值: exNodes: 扩充任务列表 */ GetExpandTasks(nid string, state int8, num int) (exNodes []ExpandNode, e error) /* 添加或更新扩充节点信息 参数: exNodes: 扩充节点详情 返回值: */ AddOrUpdateExpandNode(exNode *ExpandNode) (task_id int64, e error) /* 更新扩充节点状态 参数: gid: 扩充节点所在分组 nid: 扩充节点ID md5: 要扩充的文件 state: 扩充节点状态 返回值: */ UpdateExpandNodeState(gid, nid, md5 string, state int8, timouet int64, increment_failed_times bool) (e error) /* 批量更新扩充节点状态 参数: nid: 扩充节点ID state: 扩充节点状态 返回值: */ UpdateExpandNodesState(nid string, state int8, timouet int64) (e error) /* 更新扩充节点超时时间 参数: gid: 扩充节点所在分组 nid: 扩充节点ID md5: 要扩充的文件 返回值: */ UpdateExpandNodeTimeout(id uint64, timouet int64) (e error) /* 删除扩充节点 参数: gid: 扩充节点所在分组 nid: 扩充节点ID md5: 要扩充的文件 返回值: */ DeleteExpandNode(gid, nid, md5 string) (e error) /* 获取分组文件总失败次数 参数: gid: 扩充节点所在分组 md5: 要扩充的文件 返回值: times: 总失败次数 */ GetExpandTaskTotalFailedTimes(gid, md5 string) (times uint32, e error) /* 根据node获取该节点等待做的任务数 参数: node:节点id 返回值: cnt: 任务数 */ GetExpandTaskCount(node string) (cnt uint32, e error) /* 获取扩散任务表中已完成超时检查的时间点(秒数)。 参数: 返回值: tm: 上次检查到的时间点(秒数) */ GetTimeoutExpandTaskCheckedTime() (tm, id int64, e error) /* 更新扩散表已完成超时检查的时间点(秒数)。 参数: tm: 新的时间点(秒数) 返回值: */ UpdateTimeoutExpandTaskCheckedTime(tm, id int64) (e error) /* 获取超时扩展任务 参数: from: 起始时间(秒数) to: 截止时间(秒数) num: 最多获取多少个节点 返回值: tasks: 超时节点列表 */ GetTimeoutExpandTask(from int64, to int64, lastId int64, num int) (nodes []ExpandNode, e error) /* 根据ID批量获取节点 参数: gid: 扩充节点所在分组 md5: 要扩充的文件 返回值: times: 总失败次数 */ GetNodesByIds(ids []string) (nodes []NodeDetail, e error) /* 添加任务节点关系表 */ AddTaskNode(task_id uint64, nids []string) (e error) /* 根据扩散任务id删除所有任务节点值 */ DeleteTaskNodeByTask(id uint64) (e error) DeleteExpandNodeById(id uint64) (e error) DeleteExpandNodeByMd5(md5 string) (e error) /* 根据超时时间删除任务记录 参数: tm:查询超时时间 */ DeleteExpandNodeByTimeOut(tm uint64) (e error) /* 参数: gid: 组id nid: 节点id 返回值: ver:版本 */ GetGroupFileVer(gid, nid string) (ver uint64, e error) /* 参数: gid: 组id nid: 节点id ver: 版本 num: 数量 返回值: files 返回需要文件数组 */ GetGroupFileByVer(gid, nid string, ver uint64, num int) (files []GroupFile, e error) /* 获取某组中已经同步某文件的节点数 参数: gid: 组id ver: 版本 返回值: cnt: 符合条件的节点数 */ GetFileNodesCountByVer(gid string, ver uint64) (cnt uint32, e error) /* 获取超时并可以删除的节点 参数: tm:时间 num:个数 返回值: 获取节点列表 */ GetCanDelTimeoutNodes(tm uint64, num int) (nodes []string, e error) /* 检测是否完成了首次扩散 参数: gid:组id ver:文件版本 返回值: 返回bool */ CheckIsFinishFirstExpand(gid string, ver uint64) (finish bool, e error) /* 根据文件版本和节点状态,获取符合条件的节点数 参数: gid:组id ver:文件版本 state: 节点状态 返回值: 返回num */ GetNodeCountByVerAndState(gid string, ver uint64, state int) (num uint32, e error) /* checker文件中各种携程检测是否启动依据,需要获取改时间戳 */ GetAtomicLastCheckerTm(key string) (tm int64, e error) /* checker文件中各种携程检测是否启动依据,需要更新改时间戳 */ SetAtomicGetLastCheckerTm(key string, tm int64, expire_second int) (e error) /* 修改组的首次扩散完成的版本号 */ UpdateGroupFirstFinishVer(gid string, ver uint64) (e error) /* 获取某个组首次扩散完成的文件版本 */ GetGroupFirstFinishExpandVer(gid string) (finish_ver uint64, e error) /* 获取节点的在线时长 */ GetNodeOnlineTm(nids []string) (node_online_map map[string]int, e error) /* 分页获取节点id */ GetAllNode(begin string) (nodes []string, e error) /* 更新节点在线时长 */ UpdateNodeOnlineCnt(nodeMap map[string]int) (e error) /* 获取某个节点的危险扩充任务列表 参数: nid: 节点ID state: 任务状态 num: 返回的数量上限 返回值: exNodes: 扩充任务列表 */ GetUnSafeExpandTasks(nid string, state int8, num int) (exNodes []UnSafeExpandNode, e error) /* 修改危险任务状态 */ UpdateUnSafeExpandNodeState(id uint64, state int) (e error) /* 获取危险上传信息 参数: id: ID 返回值: exNode: 扩充节点详情,nil - 在e==nil时表示未找到 */ GetUnSafeExpandNodeById(id uint64) (exNode *UnSafeExpandNode, e error) //添加文件到unsafe_group_file中 AddOrUpdateUnSafeFile(gid, md5 string) (e error) /* 批量获取危险扩散节点 参数: exNodes : 任务数组 */ AddOrUpdateUnSafeExpandNodes(exNodes []UnSafeExpandNode) (e error) /* 删除危险文件 参数: gid: md5: */ DeleteUnSafeFile(gid, md5 string) (e error) /* 删除危险文件ExpandNode */ DeleteUnSafeFileExpandNode(gid, node, md5 string) (e error) /* 获取危险文件任务集合 */ GetUnSafeFileExpandNode() (exNodes []UnSafeExpandNode, e error) /* 获取拥有某文件危险 piece 的节点 */ GetHasUnSafeFileNode(gid, md5 string, num uint32, ex_nids []string) (nids []string, e error) /* 更改groupfile的state和add_ver */ UpdateGroupFileStateAndAddVer(gid string, md5 string, state int, add_ver uint64) (e error) /* 获取各组在线节点或离线节点数量 */ GetGroupNodeCountByState(state int) (countMap map[string]int, e error) /* 从config表中获取配置表 */ GetMapFromConfig(configMap map[interface{}]interface{}) (e error) /* 获取active_group<=0的节点 */ GetNodesAGZero(groupCapacity uint64, updateTm, regTm int64, active_groups int8, online_cnt int) (nodes map[string]bool, e error) /* 获取新加入没有任何分组的节点 */ GetNewNodes(groupCapacity uint64, updateTm, regTm int64, online_cnt int) (nodes map[string]bool, e error) /* 获取任务卡主的组和节点 */ GetGroupNodesTaskProcessSlow(nowTm int64) (groupNodesMap map[string]GroupNode, e error) /* 重置未超时且未完成、未失败的任务节点的状态 */ SetExpandNodeStateFailed(node string) (e error) /** 分布式锁,通过redis实现 */ GetLock(db int, key string, expireSec int64, timeout int64) (getLock bool) /** 释放锁 */ UnLock(db int, key string) (e error) }
type NodeDetail ¶
type NodeDetail struct { Peer `json:"peer"` TotalSpace uint64 `json:"total_space"` //魔盒的总存储空间 LeftP2pSpace int64 `json:"left_p2p_space"` //P2P剩余空闲空间(total_space*percent/100-各分组容量之和) Percent int8 `json:"percent"` //节点空间占用比例 UpdateTm int64 `json:"update_tm"` //上次活跃时间 RegTm int64 `json:"reg_tm"` //节点注册时间 ActiveGroups int `json:"activ_groups"` //还未满的分组数量 OnlineTm int64 `json:"online_tm"` //上次在线时间 Weight float64 `json:"weight"` //节点权重 OnlineCount int `json:"online_cnt"` //节点在线时间计数 UpSpeed int64 `json:"up_speed"` //上行带宽字节 Upload int64 `json:"upload"` //上传速度 Download int64 `json:"download"` //下载速度 }
func (*NodeDetail) GetWeight ¶
func (detail *NodeDetail) GetWeight() (weight float64)
func (*NodeDetail) Update ¶
func (detail *NodeDetail) Update(node *Node) (e error)
更新节点详情
参数:
node: 节点基本信息 groups: 节点所在的分组列表
type NodeGroupDetail ¶
type Peer ¶
type Peer struct { ID string `json:"id"` IP string `json:"ip"` Port int32 `json:"port"` UPNPIP string `json:"upnp_ip"` UPNPPort int32 `json:"upnp_port"` NATType int8 `json:"nat_type"` UPNPAvailable int8 `json:"upnp_available"` }
func GetOnlineNodesByIds ¶
func (*Peer) FillUPNPAvailable ¶
func (p *Peer) FillUPNPAvailable()
type UnSafeExpandNode ¶
type UnSafeExpandNode struct { ID uint64 `json:"id"` Group string `json:"group"` Node string `json:"node"` MD5 string `json:"md5"` State int8 `json:"state"` Tm int64 `json:"tm"` //创建时间,秒数 }
危险文件上传任务对象
func GetUnSafeExpandTasks ¶
func GetUnSafeExpandTasks(nid string, state int8, num int) (exNodes []UnSafeExpandNode, e error)
func GetUnSafeFileExpandNode ¶
func GetUnSafeFileExpandNode() (expandNodes []UnSafeExpandNode, e error)
删除危险文件任务
func (*UnSafeExpandNode) IsFinished ¶
func (en *UnSafeExpandNode) IsFinished() bool