p2p_storage

package
v0.0.0-...-1ec2636 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2019 License: GPL-3.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	UNSAFE_EXPAND_STATE_INIT     = 0
	UNSAFE_EXPAND_STATE_FINISHED = 1
)

危险任务状态 0-初始化 1-完成

View Source
const (
	GROUPFILE_TYPE_SPRAND_FIRST = 0
	GROUPFILE_TYPE_NEW_ADD      = 1
)

分组文件状态 0-完成首次扩散 1- 新增文件

View Source
const ADD_FILE_COUNT_PART = 16

添加文件节点数量标准

View Source
const ADD_FILE_TEST_TIME = 1

添加文件时递归尝试次数

View Source
const ADD_P2P_FILE_CONFIG_KEY = "add_p2p_file"

addp2pfile概率key值

View Source
const ALL, NORMAL, DELETED int = 2, 1, 0
View Source
const CHECKER_CREATEGROUP_LAST_TM string = "create_group_last_tm"
View Source
const CHECKER_EXPAND_NODE_PREFIX string = "expand_prefix_"
View Source
const CHECKER_EXPAND_TASK_TIME string = "checker_expand_task_time"
View Source
const CHECKER_GEN_PIECETM_PRIFIX string = "gen_piece_"
View Source
const CHECKER_GROUP_EXPAND string = "group_expand"

检测各分组在线节点数量,并扩张在线数量不足的分组

View Source
const CHECKER_GROUP_FILE_NEW_ADD_TIMEOUT string = "new_gf_timeout"

检测 group_file 中不活跃节点(上次 AddP2pFile时间距离现在超过1周但还未完成的文件)

View Source
const CHECKER_NODE_ONLINETM_CHECKER string = "node_online_checker"
View Source
const CHECKER_ONLINE_NODE = "checker_node_online"

检测在线节点

View Source
const CHECKER_ONLINE_NODE_MIN = 5
View Source
const CHECKER_TASK_PROCESS_SLOW_MIN = 10

检测卡住任务间隔(分钟)

View Source
const CHECKER_TIMEOUT_LAST_TM string = "checker_last_tm"

p2p自启动检测服务执行时间戳

View Source
const CHECK_GROUP_EXPAND_TIME int = 5

分组扩张检测时间间隔(分钟)

View Source
const CON_CONFIG_KEY = "con"

con设置key值

View Source
const CREATE_NEWGROUP_BALANCE_RATIO int = 20

创建新分组标准,当分组总量剩余百分比时,则创建新分组

View Source
const DEFAULT_ADD_P2P_FILE int64 = 0

addp2pfile概率默认值

View Source
const DEFAULT_CON int64 = 20

节点能承受的最大下载并发数con默认值

View Source
const DEFAULT_DELEGATES_NODE_SPEED int64 = 500 * 1024

代理上行速度限制默认值(字节)

View Source
const DEFAULT_GEN_PIECE_LEVEL int64 = 0

genPieceLevel默认值

View Source
const DEFAULT_MAX_HOUR int64 = 8

max_hour默认值

View Source
const DEFAULT_OSS_SPLIT_SIZE int64 = 5 * 1024 * 1024

oss_split_size默认值

View Source
const DEFAULT_P2P_UPSPEED_LIMIT int64 = 1048576

p2p系统节点上行速度限制

View Source
const DEFAULT_SECOND_EXPAND_SPEED int64 = 300 * 1024

二次扩算任务上行速度限制默认值(字节)

View Source
const DEFAULT_TRANS_NODE int64 = 1

node直传默认值

View Source
const DELEGATES_MIN_SPEED_CONFIG_KEY string = "delegate_min_speed"

代理上行速度限制Key值

View Source
const EXPAND_GROUP_ADDRATIO uint32 = 16

组扩散时,节点扩散界限 g.SafePiece+g.MinPiece/16

View Source
const EXPAND_MAX_FAIL_TIMES_EACH_NODE uint32 = 1
View Source
const EXPAND_STATE_FAILED int8 = 4
View Source
const EXPAND_STATE_FINISHED int8 = 3
View Source
const EXPAND_STATE_INIT int8 = 0
View Source
const EXPAND_STATE_NOTIFIED int8 = 1
View Source
const EXPAND_STATE_STARTED int8 = 2
View Source
const EXPAND_TASK_DELETE_TIME int64 = 5 * 24 * 3600

扩散任务删除时间

View Source
const EXPAND_TASK_FINISH_COUNT_PART = 32

扩散完成判断标准

View Source
const EXPAND_TRANS_TYPE_BOTH int8 = 2
View Source
const EXPAND_TRANS_TYPE_NODE int8 = 1
View Source
const EXPAND_TRANS_TYPE_OSS int8 = 0

扩散传输文件方式

View Source
const FIRST_EXPAND_FINISH_NUM int = 160

首次扩散完成节点数

View Source
const GEN_PIECE_LEVEL_CONFIG_KEY = "gen_piece_level"

genPieceLevel设置的key值

View Source
const GID_LEN uint = 32

分组ID的长度

View Source
const GROUP_FILE_NEW_ADD_DIFF_TIME int = 10
View Source
const GROUP_NODE_CAPACITY uint64 = 2 * 1024 * 1024 * 1024

分组在每个节点上所占用的空间(字节)

View Source
const MAX_EXPAND_NODE_NUM uint8 = 1

并行生成碎片的最大节点数量

View Source
const MAX_EXPAND_TAKS_FAIL_NUMS uint8 = 3

扩散任务最大失败次数

View Source
const MAX_EXPAND_TASK_NUM int = 30

每次分配给节点扩充任务的数量上限

View Source
const MAX_FILE_SIZE uint64 = 50 * 1024 * 1024 * 1024

文件尺寸上限

View Source
const MAX_HOUR_CONFIG_KEY string = "max_hour"

max_hour设置key值

View Source
const MAX_NODE_EXPANDTASK_CNT uint32 = 100

节点最大的分配任务数

View Source
const NEW_NODE_ACTIVE_GROUP_COUNT int8 = 0

新节点所属未满分组最大数量

View Source
const NEW_NODE_CREATE_GROUP_COUNT = 208

新节点创建分组数量阈值(节点的数量)

View Source
const NODE_CHECKDELETE_TM int64 = 6

检测超时检点删除间隔时间(小时)

View Source
const NODE_DELETE_TIMEOUT int64 = 30

节点超时则将其从p2p系统移除(天)

View Source
const NODE_EXPAND_GROUP_VALID_TIME int64 = 120

组扩散节点选取节点有效时间(2个汇报周期)

View Source
const NODE_EXPAND_MIN_ONLINE_CNT int = 144

扩散组节点在线时长配置,需要符合online_cnt数

View Source
const NODE_MAX_ACTIVE_GROUPS int8 = 11

节点所属的未满分组的最大数量

View Source
const NODE_MIN_ACTIVE_GROUPS int8 = 9
View Source
const NODE_OCCUPY_PERCENT int8 = 50

节点空间占用比例(%)

View Source
const NODE_ONLINETM_INTERVAL_TM int = 1

节点在线检测间隔(小时)

View Source
const NODE_TAKS_MAX_NUM int8 = 100

node所能同时执行的接收任务数

View Source
const NODE_VALID_AFTER_REGTM int64 = 7 * 86400

节点老化时长

View Source
const NODE_VALID_TIME int64 = 600

节点的有效期(秒数)

View Source
const NORMAL_NODE_OCCUPY_PERCENT int8 = 1

普通节点(非超级硬盘)空间占用比例(%)

View Source
const ONLINE, OFFLINE int = 1, 0
View Source
const OSS_SPLIT_SIZE_CONFIG_KEY = "oss_split_size"

oss_split_size设置key值

View Source
const P2P_DOWNLOAD_CACHE = "download_cache"

p2p节点开启下载piece缓存

View Source
const P2P_MERGE_PIECE = "merge_piece"

p2p节点开启小文件合并

View Source
const P2P_UPSPEED_LIMIT_KEY = "p2p_upspeed_limit"

p2p系统节点上行速度限制key值

View Source
const PIECE_MIN_NUM uint32 = 32
View Source
const PIECE_PERFECT_NUM uint32 = 64
View Source
const PIECE_SAFE_NUM uint32 = 48
View Source
const PIECE_SIZE uint32 = 1024
View Source
const SPREAD_MIN_SPEED_CONFIG_KEY string = "sprand_min_speed"

二次扩散任务上行速度限制Key值

View Source
const TASK_PROCESS_SLOW_TM = 3600

定义任务卡住的超时用时

View Source
const TRANS_NODE_CONFIG_KEY = "trans_node"

node直传的key值 1 BOTH 0 OSS

View Source
const UNSAFE_EXPAND_DEL_ALIVE = 0
View Source
const UNSAFE_EXPAND_DEL_DELETE = 1

危险任务删除标志

View Source
const UPDATE_CONFIG_MAP string = "update_config_map"

更新ConfigMap

View Source
const UPDATE_CONFIG_MAP_TIME int = 60

更新ConfigMap时间间隔(秒)

View Source
const YES, NO int = 1, 0

Variables

View Source
var GROUP_CONFIG []GroupPieceInfo = []GroupPieceInfo{{1024, 32, 48, 64}, {1024, 64, 96, 128}, {1024, 128, 160, 208}}
View Source
var P2pGetLockTimeOut int64 = 1 //所有使用同步锁的地方,超过该值还未获取到时,这直接放弃,业务需要根据实际情况来处理(单位秒)
View Source
var P2pLockExpireSec int64 = 5 //同步锁到期时间5秒(单位秒)

Functions

func AddNode

func AddNode(id string) (e error)

func AddOrUpdateUnSafeExpandNode

func AddOrUpdateUnSafeExpandNode(gid, md5 string, nodes []GroupNode) (e error)

添加unsafe_expand_node

func AddOrUpdateUnSafeFile

func AddOrUpdateUnSafeFile(gid, md5 string) (e error)

添加unsafe_file

func AddP2PFile

func AddP2PFile(md5, src_node string, size uint64, times int, add_no_source_file bool) (task_id int64, e error)

新版逻辑独立添加文件逻辑,通过查找符合条件的节点,然后确定分组,然后生成任务,并返回

func AddTaskNode

func AddTaskNode(task_id uint64, nids []string) (e error)

添加节点任务

func CalculateExpandNodeTimeout

func CalculateExpandNodeTimeout(state int8) (timeout int64)

func CalculateFileSize

func CalculateFileSize(size uint64) (file_size uint32)

func CanDownloadFile

func CanDownloadFile(nid, md5 string) (yes bool, e error)

节点是否有权下载此文件。节点所属的分组中必须含有此文件,或者节点自身就含有此文件。

func CheckFileOssExist

func CheckFileOssExist(md5 string) (ossExist int, e error)

根据某任务删除某节点全部数据

func DeleteFile

func DeleteFile(md5 string) (e error)

func DeleteNode

func DeleteNode(id string) (e error)

func DeleteTaskNodeByTask

func DeleteTaskNodeByTask(id uint64) (e error)

根据某任务删除某节点全部数据

func DeleteUnSafeFile

func DeleteUnSafeFile(gid, md5 string) (e error)

删除危险文件

func DeleteUnSafeFileExpandNode

func DeleteUnSafeFileExpandNode(gid, node, md5 string) (e error)

删除危险文件任务

func DoUpdateGroupFirstExpandVer

func DoUpdateGroupFirstExpandVer(gid string, old_finish_ver uint64) (e error)

修改分组中完成首次扩散文件的版本号

func Download

func Download(md5 string) (nodes []Peer, group *Group, sources []Peer, e error)

获取文件的下载节点

参数:

md5: 文件的md5

返回值:

nodes: 可用的节点列表
group: 这些节点所在的分组信息
sources: 源文件所在的节点(都是在线的)

func DownloadMore

func DownloadMore(md5 string, usedGroups []string) (nodes []Peer, group *Group, sources []Peer, e error)

获取更多文件的下载节点,排除掉已经用过的分组

参数:

md5: 文件的md5
usedGroups: 已经用过的分组

返回值:

nodes: 可用的节点列表,最多返回拼回原始数据所需要最小节点数的1倍,客户端如果还是下载不成功,
	   则认为下载失败。
group: 节点所属分组信息
sources: 源文件所在的节点(都是在线的)

func ExpandFinished

func ExpandFinished(id uint64, state int8) (e error)

扩散结束

参数:

id:12321
state: 0-失败,1-成功

返回值:

func ExpandGroupToPerfectSize

func ExpandGroupToPerfectSize(gid string) (e error)

func GenPiece

func GenPiece(gid, nid, md5 string) (e error)
请求生成碎片

参数:

gid: 分组ID
nid: 节点ID
md5: 文件md5

返回值:

func GetAvailableNode

func GetAvailableNode(num int) (node []string, e error)

获取可以添加文件的节点(获取当前可以添加组的节点)

func GetChecksum

func GetChecksum(md5 string) (checksum string, e error)

func GetExpandTask

func GetExpandTask(gid, nid, md5 string) (nodes []string, file *GroupFile, group *Group, e error)

获取扩充节点任务的相关信息

参数:

gid: 分组ID
nid: 节点ID
md5: 文件md5

返回值:

nodes: 需要碎片的节点ID

func GetExpandTaskById

func GetExpandTaskById(id uint64) (nodes []string, file *GroupFile, group *Group, exNode *ExpandNode, e error)

获取扩充节点任务的相关信息

参数:

id: Id

返回值:

nodes: 需要碎片的节点ID

func GetExpandTaskLevel

func GetExpandTaskLevel(gid string, ver uint64) (level int8, e error)

根据条件确定任务优先级

func GetHasUnSafeFileNode

func GetHasUnSafeFileNode(gid, md5 string, num uint32, ex_nids []string) (nids []string, e error)

func GetUnSafeExpandTaskById

func GetUnSafeExpandTaskById(id uint64) (exNode *UnSafeExpandNode, file *GroupFile, group *Group, e error)

获取危险任务

参数:

id: Id

返回值:

func IncrGroupFileVer

func IncrGroupFileVer(gid, md5 string) (e error)

修改文件版本号,修改版本前需要查看该版本(节点历史最高版本)是否有足够节点,没有才需要添加版本

参数:

md5: 文件md5

func Init

func Init(ds IDataSource, lg *log.MLogger, open_check bool) (e error)

func InvalidFile

func InvalidFile(nid, gid, md5 string) (e error)

无效文件汇报,该文件无法生成piece 将会从p2p系统中删除,移入问题文件表

func IsAvailable

func IsAvailable(md5 string) (ok bool, e error)

文件是否可用

参数:

md5: 文件的md5

func IsExists

func IsExists(md5 string) (exist bool, e error)

文件是否存在

参数:

md5: 文件的md5

func IsExistsMore

func IsExistsMore(md5s []string) (m map[string]bool, e error)

批量检测文件是否存在

参数:

md5: 文件的md5

func MinFileSize

func MinFileSize(nums map[uint32]uint64) (file_size uint32)

func P2PExpandFinished

func P2PExpandFinished(id uint64, state int8) (e error)

扩散结束

参数:

id:12321
state: 0-失败,1-成功

返回值:

func RestartInitExpandNodeState

func RestartInitExpandNodeState(node string) (e error)

重置重启节点的任务状态

func UnSafeExpandFinished

func UnSafeExpandFinished(id uint64, state int) (e error)

扩散结束

参数:

id:12321
state: 0-失败,1-成功

返回值:

func UpdateChecksum

func UpdateChecksum(md5, checksum string) (e error)

func UpdateNode

func UpdateNode(node *Node, groupVersions map[string]uint64, tasks []uint64, is_super int) (groups []NodeGroupDetail, exNodes []ExpandNode, e error)

更新节点及其分组信息

参数:

node: 节点信息
groupVersions: 节点所属分组的文件同步版本号
tasks: 正在扩散的任务的心跳

返回值:

groups: 节点所属分组的最新信息
exNodes: 需要此节点执行扩充任务的文件列表

func UpdateNode2

func UpdateNode2(node *Node, groupVersions map[string]uint64, tasks []uint64, is_super int) (returnGroups []NodeGroupDetail, exNodes []ExpandNode, deleteGids []string, e error)

更新节点及其分组信息

参数:

node: 节点信息
groupVersions: 节点所属分组的文件同步版本号
tasks: 正在扩散的任务的心跳

返回值:

groups: 节点所属分组的最新信息
exNodes: 需要此节点执行扩充任务的文件列表

func UpdateNodeWeight

func UpdateNodeWeight(nids []string) (e error)

创建分组或者扩容后需要修改节点权重

Types

type ConfigSet

type ConfigSet struct {
	ConfigValue *safe_map.SafeMap
}

ConfigSet set

var ConfigMap *ConfigSet

func NewConfigSet

func NewConfigSet() *ConfigSet

NewConfigSet 新建一个ConfigSet

func (*ConfigSet) FlushConfigValue

func (cs *ConfigSet) FlushConfigValue() (err error)

func (*ConfigSet) GetFloat64Value

func (cs *ConfigSet) GetFloat64Value(key string) (value float64, err error)

func (*ConfigSet) GetInt64Value

func (cs *ConfigSet) GetInt64Value(key string) (value int64, err error)

func (*ConfigSet) GetStringValue

func (cs *ConfigSet) GetStringValue(key string) (value string, err error)

func (*ConfigSet) InitConfigValue

func (cs *ConfigSet) InitConfigValue()

type DataSource

type DataSource struct {
	//数据接口的实现
	Raw IDataSource
}

func (*DataSource) AddFileToGroup

func (ds *DataSource) AddFileToGroup(gid string, g *Group, file *GroupFile, fileVer uint64) (e error)

func (*DataSource) FetchExpandTasks

func (ds *DataSource) FetchExpandTasks(nid string, num int) (exnodes []ExpandNode, e error)

取出还未通知节点的任务列表

参数:

nid: 节点ID
num: 最多返回的个数

返回值:

exNodes: 任务列表

func (*DataSource) FillEmptyGroupFile

func (ds *DataSource) FillEmptyGroupFile(gid string) (e error)

func (*DataSource) GetNoFileNodes

func (ds *DataSource) GetNoFileNodes(gid, md5 string) (nodes []string, e error)

获取分组中没有该文件碎片的节点

参数:

gid: 分组ID
md5: 文件的md5

返回值:

nodes: 没有该文件的节点列表

func (*DataSource) GetNodesByIds

func (ds *DataSource) GetNodesByIds(ids []string) (nodes []NodeDetail, e error)
   批量获取节点ids

   参数:
		ids:节点ids
   返回值:
   		exNodes: 节点列表

func (*DataSource) GetOnlineNodesByIds

func (ds *DataSource) GetOnlineNodesByIds(ids []string, min_update_tm int64) (peers []Peer, e error)

func (*DataSource) GetOnlineSourceFileNodes

func (ds *DataSource) GetOnlineSourceFileNodes(md5 string, num int) (peers []Peer, e error)
   随机获取源文件所在的节点列表

   参数:
   		md5: 文件的md5
		num: 需要的数量
   返回值:
   		peers: 拥有原始文件的节点ID列表

func (*DataSource) IsFileExists

func (ds *DataSource) IsFileExists(md5 string) (ok bool, e error)

func (*DataSource) IsMoreFileExists

func (ds *DataSource) IsMoreFileExists(md5s []string) (m map[string]bool, e error)

func (*DataSource) IsP2PFileExists

func (ds *DataSource) IsP2PFileExists(md5 string) (ok bool, file *GroupFile, e error)

判断是否需要添加该文件

func (*DataSource) UpdateExpandNodeState

func (ds *DataSource) UpdateExpandNodeState(gid, nid, md5 string, state int8) (e error)
   更新任务状态

   参数:
   		gid: 分组ID
   		nid: 节点ID
		md5: 文件
   		state: 任务状态
		increment_failed_times: 是否增加失败次数
   返回值:
   		exNodes: 任务列表

func (*DataSource) UpdateExpandNodeTimeout

func (ds *DataSource) UpdateExpandNodeTimeout(id uint64) (e error)
   更新任务超时时间

   参数:
   		gid: 分组ID
   		nid: 节点ID
		md5: 文件
   返回值:
   		exNodes: 任务列表

func (*DataSource) UpdateGroupSize

func (ds *DataSource) UpdateGroupSize(isAdd bool, group *Group, size uint64) (e error)

更新分组大小

参数:

isAdd: 是否是添加文件,false表示是删除文件
gid: 分组的ID
size: 文件大小

返回值:

type ExpandNode

type ExpandNode struct {
	ID          uint64 `json:"id"`
	Group       string `json:"group"`
	Node        string `json:"node"`
	MD5         string `json:"md5"`
	State       int8   `json:"state"`
	Tm          int64  `json:"tm"`           //创建时间,秒数
	Timeout     int64  `json:"timeout"`      //超时时间,秒数
	FailedTimes uint32 `json:"failed_times"` //扩散失败次数
	Size        uint64 `json:"size"`         //文件大小
	Level       int8   `json:"level"`        //任务优先级
	Ver         uint64 `json:"ver"`          //文件在组中的版本
}

func (*ExpandNode) IsFinished

func (en *ExpandNode) IsFinished() bool

type File

type File struct {
	MD5  string `json:"md5"`
	Size uint64 `json:"size"`
}

文件信息

type Group

type Group struct {
	ID             string `json:"id"`
	Size           uint64 `json:"size"`
	FileSize       uint32 `json:"file_size"`        //分组文件大小范围,1: 1-10M,10: 10-100M, 100: 100-1000M,以此类推
	PieceSize      uint32 `json:"piece_size"`       //碎片大小
	MinPieces      uint32 `json:"min_pieces"`       //最小碎片数(能恢复出原始数据的最小碎片数)
	SafePieces     uint32 `json:"safe_pieces"`      //安全碎片数(小于该数量就会出发再扩散流程)
	PerfectPieces  uint32 `json:"perfect_pieces"`   //再扩散后要达到的碎片数
	FirstFinishVer uint64 `json:"first_finish_ver"` //组中首次扩散完成的文件版本
	DeletedVer     uint64 `json:"deleted_ver"`      //组中删除的文件版本号
}

客户端汇报上来的节点信息

func CheckNewNodeAndCreateGroup

func CheckNewNodeAndCreateGroup(tar_node string) (group *Group, e error)

检测新加入节点数量并创建分组

func CreateGroup

func CreateGroup() (group *Group, e error)

func GetNodeAvailableGroup

func GetNodeAvailableGroup(node string) (group *Group, e error)

func (*Group) AddFile

func (group *Group) AddFile(md5 string, src_node string, size uint64) (e error)

向分组中添加文件

参数:

md5: 文件md5
size: 文件大小

func (*Group) AddP2PFile

func (group *Group) AddP2PFile(md5 string, size uint64, src_node string, fileVer uint64) (e error)

向分组中添加新增文件

参数:

md5: 文件md5
size: 文件大小

func (*Group) DeleteFile

func (group *Group) DeleteFile(file *GroupFile) (e error)

删除分组中文件

参数:

md5: 文件md5

func (*Group) ExpandNodes

func (group *Group) ExpandNodes(num uint32, active_groups int8, nid string) (e error)

扩充分组中的节点

参数:

num: 要扩充的节点数量

func (*Group) ExpandNodesToGroup

func (group *Group) ExpandNodesToGroup(nodes map[string]bool) (e error)

将节点扩充至分组

参数:

nodes: 要扩充至分组的节点(须保证不与组内现有节点重复)

func (*Group) ExpandNodesToPerfectSize

func (group *Group) ExpandNodesToPerfectSize(active_groups int8, nid string) (e error)

扩充分组中的节点到PerfectSize

参数:

type GroupFile

type GroupFile struct {
	File      `json:"file"`
	Ver       uint64 `json:"ver"`
	State     int    `json:"state"`       //1-NORMAL,0-DELETED
	Group     string `json:"group"`       //文件所在组
	Type      int    `json:"type"`        //文件类型 0-首次扩散 1-新增
	AddVer    uint64 `json:"add_ver"`     // 新增文件版本
	LastAddTm uint64 `json:"last_add_tm"` // 新增文件版本
	SrcNode   string `json:"src_node"`    // 文件源节点
}

分组文件信息

func ListUpdatedFiles

func ListUpdatedFiles(gid string, ver uint64, num int, tp int) (files []GroupFile, e error)

获取分组中版本号大于ver的文件列表

参数:

gid: 分组ID
ver: 起始版本号(不包括本版本号)
num: 获取文件数量

返回值:

files: 文件列表

func (*GroupFile) IsNewAdd

func (gf *GroupFile) IsNewAdd() (ok bool)

func (*GroupFile) ToString

func (gf *GroupFile) ToString() (val string)

type GroupNode

type GroupNode struct {
	Node   string `json:"node"`
	Ver    uint64 `json:"ver"`
	State  int    `json:"state"` //ONLINE/OFFLINE
	MaxVer uint64 `json:"max_ver"`
}

func GetGroupNodes

func GetGroupNodes(gid string) ([]GroupNode, error)

获取分组节点

type GroupPieceInfo

type GroupPieceInfo struct {
	PieceSize     uint32 `json:"piece_size"`     //碎片大小
	MinPieces     uint32 `json:"min_pieces"`     //最小碎片数(能恢复出原始数据的最小碎片数)
	SafePieces    uint32 `json:"safe_pieces"`    //安全碎片数(小于该数量就会出发再扩散流程)
	PerfectPieces uint32 `json:"perfect_pieces"` //再扩散后要达到的碎片数
}

type IDataSource

type IDataSource interface {
	/*
		自增ID
	*/
	AtomicIncrID(key string) (uint64, error)
	/*
		获取自增ID,如果key不存在,返回0
	*/
	GetIncrID(key string) (uint64, error)
	/*
	   获取节点表已完成超时检查的时间点(秒数)。

	   参数:
	   返回值:
	   		tm: 上次检查到的时间点(秒数)
	*/
	GetTimeoutNodeCheckedTime() (tm int64, e error)
	/*
	   更新节点表已完成超时检查的时间点(秒数)。

	   参数:
	   		tm: 新的时间点(秒数)
	   返回值:
	*/
	UpdateTimeoutNodeCheckedTime(tm int64) (e error)
	/*
	   获取state状态下文件在各分组的详情

	   参数:
	   		md5: 文件的md5
	   		state: 文件的状态,ALL/NORMAL/DELETED
	   返回值:
	   		files: 文件分组详情,key-分组ID
	*/
	GetFileGroups(md5 string, state int) (files map[string]GroupFile, e error)
	//获取新增超时文件
	GetNewAddTimeOutGroupFile(tm int64, num int) (files []GroupFile, e error)
	/*
		   随机获取源文件所在的节点列表

		   参数:
		   		md5: 文件的md5
				num: 需要的数量
		   返回值:
		   		ids: 拥有原始文件的节点ID列表
	*/
	GetSourceFileNodes(md5 string, num int) (ids []string, e error)
	/*
		   节点自身是否有这个文件

		   参数:
		   		md5: 文件的md5
				nid: 节点ID
		   返回值:
		   		yes: 是否存在
	*/
	IsNodeHasFile(nid string, md5 string) (yes bool, e error)
	/*
	   获取文件在系统中的数量

	   参数:
	   		md5: 文件的md5
	   返回值:
	   		count: 数量
	*/
	GetSourceFileCount(md5 string) (count int, e error)
	/*
		   根据节点ID筛选其中在线的,并返回Peer信息

		   参数:
		   		ids: 候选节点
				timeout: 超时的时间点
		   返回值:
		   		peers: 当前在线的节点信息
	*/
	GetOnlinePeers(ids []string, timeout int64) (peers []Peer, e error)
	/*
		   获取文件详情

		   参数:
		   		md5: 文件的md5
				state: 状态
		   返回值:
		   		file: 文件详情,nil-在e==nil时表示未找到
	*/
	GetFileByMd5AndState(md5 string, state int) (files []GroupFile, e error)
	/*
	   获取分组文件详情

	   参数:
	   		gid: 分组ID
	   		md5: 文件的md5
	   返回值:
	   		file: 文件详情,nil-在e==nil时表示未找到
	*/
	GetGroupFile(gid, md5 string) (file *GroupFile, e error)
	/*
		   获取版本号大于ver的文件详情

		   参数:
		   		gid: 分组ID
		   		ver: 起始版本号
				num: 要获取的数量
		   返回值:
		   		files: 文件分组详情
	*/
	ListUpdatedFiles(gid string, ver uint64, num int, tp int) (files []GroupFile, e error)
	/*
		根据分组中的文件计算分组的大小,要剔除掉已删除的文件
	*/
	CalculateGroupSize(gid string) (e error)
	/*
		获取包含该文件的分组数量,不包括文件状态是已删除的分组
	*/
	GetFileGroupsCount(md5 string) (count int, e error)

	/*
		获取包含该文件的分组数量,不包括文件状态是已删除的分组
	*/
	GetMoreFileGroupsCount(md5s []string) (m map[string]bool, e error)

	/*
		向分组添加文件

		参数:
			gid: 分组ID
			file: 文件详情
		返回值:
	*/
	AddFileToGroup(gid string, file *GroupFile) (e error)
	UpdateGroupFile(gid string, file *GroupFile) (e error)
	UpdateGroupFileTpAndVer(gid, md5 string, ver uint64) (e error)
	DeleteGroupFile(gid string, md5 string) (e error)
	//添加文件版本
	IncrFileVer(gid string, md5 string, ver uint64) (e error)

	/*
	   获取超时的节点

	   参数:
	   		from: 起始时间(秒数)
	   		to: 截止时间(秒数)
	   		num: 最多获取多少个节点
	   返回值:
	   		nodes: 超时节点列表
	*/
	GetTimeoutNodes(from int64, to int64, num int) (nodes []NodeDetail, e error)
	AddNode(node *NodeDetail) (e error)
	DeleteNode(id string) (e error)
	IsNodeExist(nid string) (exist bool, e error)
	UpdateNode(node *NodeDetail) (e error)
	UpdateNodeWeight(nid string, weight float64) (e error)
	/*
		获取剩余空间足够的在线节点,注册时间也要超过一定时间

		参数:
			groupCapacity: 分组的容量(字节)
			updateTm: 上次更新时间必须晚于此时刻
			regTm: 注册时间必须早于此时间点
			num: 需要的节点数量
		返回值:
			nodes: 节点ID列表
	*/
	GetAvailableNodes(groupCapacity uint64, updateTm, regTm int64, offset, num uint32, active_groups int8, online_cnt int) (nodes []string, e error)
	GetAvailableNodesCount(groupCapacity uint64, updateTm, regTm int64, activ_groups int8, online_cnt int) (num uint32, e error)
	//获取最近更新且符合添加到组的节点
	GetAvailableNode(groupCapacity uint64, updateTm, regTm int64, online_cnt, num int) (nodes []string, e error)
	/*
		获取占用空间最小的分组

		参数:
			groupCapacity: 分组的容量(字节)
			fileSize: 分组文件大小范围
		返回值:
			group: 分组详情
	*/
	GetAvailableGroup(fileSize uint32) (group *Group, e error)
	GetGroup(gid string) (group *Group, e error)
	GetAllGroup() (groups map[string]Group, e error)
	AddGroup(group *Group) (e error)
	UpdateGroupSize(group *Group, filesize int64) (e error)
	/*
		获取每种文件尺寸范围的可用分组数量

		参数:
			groupCapacity: 分组的容量(字节)
		返回值:
			groups: 分组数量,key-file_size,value-数量
	*/
	GetActiveGroupsCount(groupCapacity uint64) (groups map[uint32]uint32, e error)
	/*
		获取每种文件尺寸范围的可用分组剩余空间

		参数:
			groupCapacity: 分组的容量(字节)
		返回值:
			groups: 分组剩余空间,key-file_size,value-字节
	*/
	GetActiveGroupsLeftSpace(groupCapacity uint64) (groups map[uint32]uint64, e error)
	/*
		向分组添加节点

		参数:
			gid: 分组ID
			node: 节点信息
		返回值:
	*/
	AddNodeToGroup(gid string, node *GroupNode) (e error)
	UpdateGroupNode(gid string, node *GroupNode, isVerChange bool) (e error)
	DeleteGroupNode(gid, nid string) (e error)
	/*
		获取分组中节点版本号>=ver的在线节点
	*/
	GetFileNodes(gid string, ver uint64) (nodes []Peer, e error)
	/*
		获取分组中节点版本号<ver的所有节点,包括不在线的
	*/
	GetNoFileNodes(gid string, ver uint64) (nodes []string, e error)
	/*
		获取分组中所有节点的MD5值,包括不在线的
	*/
	GetAllFileNodes(gid string) (nodes []string, e error)
	GetGroupNodes(gid string) (nodes []GroupNode, e error)
	/*
		随机获取分组中一个在线的节点
	*/
	GetRandomGroupNode(gid string) (node *GroupNode, e error)
	/*
	   获取节点详情

	   参数:
	   		nid: 节点ID
	   返回值:
	   		detail: 节点详情,nil-在e==nil时表示未找到
	*/
	GetNodeDetail(nid string) (detail *NodeDetail, e error)
	/*
		获取节点所属的group集合

		参数:
			nid: 节点ID
	*/
	GetNodeGroups(nid string) (groups []Group, e error)

	/*
		随机获取 一个可用分组
	*/
	GetRandomNodeGroup(nid string) (group Group, e error)

	/*
		获取节点所属的group数量

		参数:
			nid: 节点ID
	*/
	GetNodeGroupCount(nid string) (num uint32, e error)
	/*
		获取节点所在分组详情

		参数:
			nid: 节点ID
	*/
	GetNodeGroupDetail(nid string) (groups []NodeGroupDetail, e error)
	GetNodeGroupState(nid string) (groups map[string]GroupNode, e error)
	/*
		获取分组中在线节点的数量

		参数:
			gid: 分组ID
	*/
	GetGroupOnlineNodesCount(gid string) (num uint32, e error)
	/*
		获取UPNP可用的节点列表,按节点更新时间升序排列返回
	*/
	GetUPNPAvailableNodes(num int, updateTm int64) (nodes []Peer, e error)

	AddToInvalidFile(nid, gid, md5 string, tm int64) (e error)

	UpdateChecksum(md5, checksum string) (e error)
	/*
			获取文件的checksum

		   参数:
		   		md5: 文件的md5
		   返回值:
		   		checksum: 校验和,"" - 在e==nil时表示未找到
	*/
	GetChecksum(md5 string) (checksum string, e error)

	IncrementActiveGroups(nid string) (e error)

	/*
			获取当前正在扩充的节点列表

		   参数:
		   		gid: 分组ID
		   		md5: 文件的md5
		   返回值:
		   		exNodes: 正在扩充节点列表
	*/
	GetValidExpandNodes(gid, md5 string) (exNodes []ExpandNode, e error)
	/*
			获取扩充节点信息

		   参数:
		   		gid: 分组ID
				nid: 节点ID
		   		md5: 文件的md5
		   返回值:
		   		exNode: 扩充节点详情,nil - 在e==nil时表示未找到
	*/
	GetExpandNode(gid, nid, md5 string) (exNode *ExpandNode, e error)

	/*
			获取扩充节点信息

		   参数:
		   		id: ID
		   返回值:
		   		exNode: 扩充节点详情,nil - 在e==nil时表示未找到
	*/
	GetExpandNodeById(id uint64) (exNode *ExpandNode, e error)

	/*
			获取某个节点的扩充任务列表

		   参数:
				nid: 节点ID
				state: 任务状态
				num: 返回的数量上限
		   返回值:
		   		exNodes: 扩充任务列表
	*/
	GetExpandTasks(nid string, state int8, num int) (exNodes []ExpandNode, e error)

	/*
			添加或更新扩充节点信息

		   参数:
		   		exNodes: 扩充节点详情
		   返回值:
	*/
	AddOrUpdateExpandNode(exNode *ExpandNode) (task_id int64, e error)

	/*
			更新扩充节点状态

		   参数:
		   		gid: 扩充节点所在分组
		   		nid: 扩充节点ID
				md5: 要扩充的文件
				state: 扩充节点状态
		   返回值:
	*/
	UpdateExpandNodeState(gid, nid, md5 string, state int8, timouet int64, increment_failed_times bool) (e error)

	/*
			批量更新扩充节点状态

		   参数:
		   		nid: 扩充节点ID
				state: 扩充节点状态
		   返回值:
	*/
	UpdateExpandNodesState(nid string, state int8, timouet int64) (e error)
	/*
			更新扩充节点超时时间

		   参数:
		   		gid: 扩充节点所在分组
		   		nid: 扩充节点ID
				md5: 要扩充的文件
		   返回值:
	*/
	UpdateExpandNodeTimeout(id uint64, timouet int64) (e error)
	/*
			删除扩充节点

		   参数:
		   		gid: 扩充节点所在分组
		   		nid: 扩充节点ID
				md5: 要扩充的文件
		   返回值:
	*/
	DeleteExpandNode(gid, nid, md5 string) (e error)

	/*
			获取分组文件总失败次数

			参数:
				gid: 扩充节点所在分组
				md5: 要扩充的文件
		 	返回值:
				times: 总失败次数
	*/
	GetExpandTaskTotalFailedTimes(gid, md5 string) (times uint32, e error)

	/*
		根据node获取该节点等待做的任务数

		参数:
			node:节点id
		返回值:
			cnt: 任务数
	*/
	GetExpandTaskCount(node string) (cnt uint32, e error)

	/*
	   获取扩散任务表中已完成超时检查的时间点(秒数)。

	   参数:
	   返回值:
	   		tm: 上次检查到的时间点(秒数)
	*/
	GetTimeoutExpandTaskCheckedTime() (tm, id int64, e error)

	/*
	   更新扩散表已完成超时检查的时间点(秒数)。

	   参数:
	   		tm: 新的时间点(秒数)
	   返回值:
	*/
	UpdateTimeoutExpandTaskCheckedTime(tm, id int64) (e error)

	/*
	   获取超时扩展任务

	   参数:
	   		from: 起始时间(秒数)
	   		to: 截止时间(秒数)
	   		num: 最多获取多少个节点
	   返回值:
	   		tasks: 超时节点列表
	*/
	GetTimeoutExpandTask(from int64, to int64, lastId int64, num int) (nodes []ExpandNode, e error)
	/*
		根据ID批量获取节点

			参数:
				gid: 扩充节点所在分组
				md5: 要扩充的文件
		 	返回值:
				times: 总失败次数
	*/
	GetNodesByIds(ids []string) (nodes []NodeDetail, e error)

	/*
		添加任务节点关系表
	*/
	AddTaskNode(task_id uint64, nids []string) (e error)

	/*
		根据扩散任务id删除所有任务节点值
	*/
	DeleteTaskNodeByTask(id uint64) (e error)

	DeleteExpandNodeById(id uint64) (e error)
	DeleteExpandNodeByMd5(md5 string) (e error)

	/*
		根据超时时间删除任务记录

		参数:
			tm:查询超时时间
	*/
	DeleteExpandNodeByTimeOut(tm uint64) (e error)

	/*
		   参数:
		   		gid: 组id
				nid: 节点id
		   返回值:
		   		ver:版本
	*/
	GetGroupFileVer(gid, nid string) (ver uint64, e error)

	/*
		   参数:
		   		gid: 组id
				nid: 节点id
				ver: 版本
				num: 数量
		   返回值:
		   	 	files 返回需要文件数组
	*/
	GetGroupFileByVer(gid, nid string, ver uint64, num int) (files []GroupFile, e error)

	/*
		获取某组中已经同步某文件的节点数
			   参数:
				   		gid: 组id
						ver: 版本
			   返回值:
				   		cnt:  符合条件的节点数
	*/
	GetFileNodesCountByVer(gid string, ver uint64) (cnt uint32, e error)

	/*
		获取超时并可以删除的节点
		参数:
				tm:时间
				num:个数
		返回值:
				获取节点列表
	*/
	GetCanDelTimeoutNodes(tm uint64, num int) (nodes []string, e error)

	/*
		检测是否完成了首次扩散
		参数:
			gid:组id
			ver:文件版本
		返回值:
		    返回bool
	*/
	CheckIsFinishFirstExpand(gid string, ver uint64) (finish bool, e error)

	/*
		根据文件版本和节点状态,获取符合条件的节点数
		参数:
			gid:组id
			ver:文件版本
			state: 节点状态
		返回值:
		    返回num
	*/
	GetNodeCountByVerAndState(gid string, ver uint64, state int) (num uint32, e error)

	/*
		checker文件中各种携程检测是否启动依据,需要获取改时间戳
	*/
	GetAtomicLastCheckerTm(key string) (tm int64, e error)

	/*
		checker文件中各种携程检测是否启动依据,需要更新改时间戳
	*/
	SetAtomicGetLastCheckerTm(key string, tm int64, expire_second int) (e error)

	/*
		修改组的首次扩散完成的版本号
	*/
	UpdateGroupFirstFinishVer(gid string, ver uint64) (e error)

	/*
		获取某个组首次扩散完成的文件版本
	*/
	GetGroupFirstFinishExpandVer(gid string) (finish_ver uint64, e error)

	/*
		获取节点的在线时长
	*/
	GetNodeOnlineTm(nids []string) (node_online_map map[string]int, e error)

	/*
		分页获取节点id
	*/
	GetAllNode(begin string) (nodes []string, e error)

	/*
		更新节点在线时长
	*/
	UpdateNodeOnlineCnt(nodeMap map[string]int) (e error)

	/*
			获取某个节点的危险扩充任务列表

		   参数:
				nid: 节点ID
				state: 任务状态
				num: 返回的数量上限
		   返回值:
		   		exNodes: 扩充任务列表
	*/
	GetUnSafeExpandTasks(nid string, state int8, num int) (exNodes []UnSafeExpandNode, e error)

	/*
		修改危险任务状态
	*/
	UpdateUnSafeExpandNodeState(id uint64, state int) (e error)

	/*
			获取危险上传信息

		   参数:
		   		id: ID
		   返回值:
		   		exNode: 扩充节点详情,nil - 在e==nil时表示未找到
	*/
	GetUnSafeExpandNodeById(id uint64) (exNode *UnSafeExpandNode, e error)

	//添加文件到unsafe_group_file中
	AddOrUpdateUnSafeFile(gid, md5 string) (e error)

	/*
		批量获取危险扩散节点

		参数:
			exNodes : 任务数组
	*/
	AddOrUpdateUnSafeExpandNodes(exNodes []UnSafeExpandNode) (e error)

	/*
		删除危险文件

		参数:
			gid:
			md5:
	*/
	DeleteUnSafeFile(gid, md5 string) (e error)

	/*
	 删除危险文件ExpandNode

	*/
	DeleteUnSafeFileExpandNode(gid, node, md5 string) (e error)

	/*
		获取危险文件任务集合
	*/
	GetUnSafeFileExpandNode() (exNodes []UnSafeExpandNode, e error)

	/*
		获取拥有某文件危险 piece 的节点
	*/
	GetHasUnSafeFileNode(gid, md5 string, num uint32, ex_nids []string) (nids []string, e error)

	/*
		更改groupfile的state和add_ver
	*/
	UpdateGroupFileStateAndAddVer(gid string, md5 string, state int, add_ver uint64) (e error)
	/*
		获取各组在线节点或离线节点数量
	*/
	GetGroupNodeCountByState(state int) (countMap map[string]int, e error)
	/*
		从config表中获取配置表
	*/
	GetMapFromConfig(configMap map[interface{}]interface{}) (e error)
	/*
		获取active_group<=0的节点
	*/
	GetNodesAGZero(groupCapacity uint64, updateTm, regTm int64, active_groups int8, online_cnt int) (nodes map[string]bool, e error)
	/*
		获取新加入没有任何分组的节点
	*/
	GetNewNodes(groupCapacity uint64, updateTm, regTm int64, online_cnt int) (nodes map[string]bool, e error)
	/*
		获取任务卡主的组和节点
	*/
	GetGroupNodesTaskProcessSlow(nowTm int64) (groupNodesMap map[string]GroupNode, e error)

	/*
		重置未超时且未完成、未失败的任务节点的状态
	*/
	SetExpandNodeStateFailed(node string) (e error)

	/**
	分布式锁,通过redis实现
	*/
	GetLock(db int, key string, expireSec int64, timeout int64) (getLock bool)

	/**
	释放锁
	*/
	UnLock(db int, key string) (e error)
}

type Node

type Node struct {
	Peer       `json:"peer"`
	TotalSpace uint64 `json:"total_space"` //魔盒的总存储空间
	LeftSpace  int64  `json:"left_space"`  //剩余空闲空间
	State      int    `json:"state"`       // 超级硬盘状态
	UpSpeed    int64  `json:"up_speed"`    //上行带宽字节
	Upload     int64  `json:"upload"`      //上传速度
	Download   int64  `json:"download"`    //下载速度
}

type NodeDetail

type NodeDetail struct {
	Peer         `json:"peer"`
	TotalSpace   uint64  `json:"total_space"`    //魔盒的总存储空间
	LeftP2pSpace int64   `json:"left_p2p_space"` //P2P剩余空闲空间(total_space*percent/100-各分组容量之和)
	Percent      int8    `json:"percent"`        //节点空间占用比例
	UpdateTm     int64   `json:"update_tm"`      //上次活跃时间
	RegTm        int64   `json:"reg_tm"`         //节点注册时间
	ActiveGroups int     `json:"activ_groups"`   //还未满的分组数量
	OnlineTm     int64   `json:"online_tm"`      //上次在线时间
	Weight       float64 `json:"weight"`         //节点权重
	OnlineCount  int     `json:"online_cnt"`     //节点在线时间计数
	UpSpeed      int64   `json:"up_speed"`       //上行带宽字节
	Upload       int64   `json:"upload"`         //上传速度
	Download     int64   `json:"download"`       //下载速度

}

func GetNodesByIds

func GetNodesByIds(ids []string) (nodes []NodeDetail, e error)

批量获取节点

func (*NodeDetail) GetWeight

func (detail *NodeDetail) GetWeight() (weight float64)

func (*NodeDetail) Update

func (detail *NodeDetail) Update(node *Node) (e error)

更新节点详情

参数:

node: 节点基本信息
groups: 节点所在的分组列表

type NodeGroupDetail

type NodeGroupDetail struct {
	Group   `json:"group"`
	FileVer uint64 `json:"file_ver"` //分组当前版本号
	NodeVer uint64 `json:"node_ver"` //节点当前版本号
	State   int    `json:"state"`    //ONLINE/OFFLINE
	MaxVer  uint64 `json:"max_ver"`  //组所在节点历史最大版本
	AddVer  uint64 `json:"add_ver"`  // 分组当前最新 add_ver 版本
}

type Peer

type Peer struct {
	ID            string `json:"id"`
	IP            string `json:"ip"`
	Port          int32  `json:"port"`
	UPNPIP        string `json:"upnp_ip"`
	UPNPPort      int32  `json:"upnp_port"`
	NATType       int8   `json:"nat_type"`
	UPNPAvailable int8   `json:"upnp_available"`
}

func GetDelegates

func GetDelegates(num int) (peers []Peer, e error)

获取可以用作代理的节点

参数: 返回值:

nodes: 可用的节点列表
	   则认为下载失败。

func GetOnlineNodesByIds

func GetOnlineNodesByIds(ids []string, min_update_tm int64) (peers []Peer, e error)

func (*Peer) FillUPNPAvailable

func (p *Peer) FillUPNPAvailable()

type TaskNode

type TaskNode struct {
	ID   uint64 `json:"id"`
	Node string `json:"node"`
	Tm   int64  `json:"tm"`
}

任务节点关系表

type UnSafeExpandNode

type UnSafeExpandNode struct {
	ID    uint64 `json:"id"`
	Group string `json:"group"`
	Node  string `json:"node"`
	MD5   string `json:"md5"`
	State int8   `json:"state"`
	Tm    int64  `json:"tm"` //创建时间,秒数
}

危险文件上传任务对象

func GetUnSafeExpandTasks

func GetUnSafeExpandTasks(nid string, state int8, num int) (exNodes []UnSafeExpandNode, e error)

func GetUnSafeFileExpandNode

func GetUnSafeFileExpandNode() (expandNodes []UnSafeExpandNode, e error)

删除危险文件任务

func (*UnSafeExpandNode) IsFinished

func (en *UnSafeExpandNode) IsFinished() bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL