uploads

package
v2.0.42 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2025 License: MIT Imports: 44 Imported by: 0

Documentation

Overview

Package uploads 提供文件上传相关的功能实现

Package uploads 提供文件上传相关的功能实现

Package uploads 提供了文件上传相关的功能

Package uploads 实现文件上传相关功能

Package uploads 提供文件上传相关的功能实现

Index

Constants

View Source
const (
	// 上传临时文件前缀
	ProcessSegmentPrefix = "segment_process_" // 处理过程中的分片临时文件前缀
	SegmentDataPrefix    = "segment_data_"    // 分片数据临时文件前缀
)
View Source
const (
	TempFileData     = "data"     // 数据分片临时文件
	TempFileParity   = "parity"   // 校验分片临时文件
	TempFileProcess  = "process"  // 处理过程临时文件
	TempFileCompress = "compress" // 压缩临时文件
)

临时文件类型常量

View Source
const (
	MaxBlockSize = 1024 * 1024 * 100 // 最大块大小,100MB
	ConnTimeout  = 60 * time.Second  // 连接超时时间
)
View Source
const (
	MaxSessions = 3 // 允许的最大并发会话数
)

Variables

View Source
var (
	// StreamSendingToNetworkProtocol 定义了发送任务到网络的协议标识符
	StreamSendingToNetworkProtocol = fmt.Sprintf("defs@stream/sending/network/%s", version)
	// StreamForwardToNetworkProtocol 定义了转发任务到网络的协议标识符
	StreamForwardToNetworkProtocol = fmt.Sprintf("defs@stream/forward/network/%s", version)
)

AllowedTopics 定义了系统支持的所有主题列表

Functions

func CalculateShards

func CalculateShards(size int64, opt *fscfg.Options) (int64, int64, error)

CalculateShards 根据文件大小和存储选项计算数据分片和奇偶校验分片的数量 参数:

  • size: int64 文件大小,单位为字节
  • opt: *fscfg.Options 存储选项,包含存储模式和其他参数

返回值:

  • int64: 数据分片数
  • int64: 奇偶校验分片数
  • error: 如果发生错误,返回错误信息

func CleanupSegmentTempFiles added in v2.0.28

func CleanupSegmentTempFiles() error

CleanupSegmentTempFiles 清理所有临时文件(慎用!) 返回值:

  • error: 清理失败错误

func CleanupTaskSegmentFiles added in v2.0.32

func CleanupTaskSegmentFiles(taskID string) error

CleanupTaskSegmentFiles 清理特定任务的所有临时文件 参数:

  • taskID: 任务ID

返回值:

  • error: 清理失败错误

func CleanupUploadsTempDir added in v2.0.32

func CleanupUploadsTempDir() error

CleanupUploadsTempDir 清理上传文件的临时目录 返回值:

  • error: 如果清理失败则返回错误

func CreateFileAssetRecord

func CreateFileAssetRecord(db *badgerhold.Store, fileRecord *pb.UploadFileRecord) error

CreateFileAssetRecord 创建一个新的文件资产记录 参数:

  • db: *badgerhold.Store 数据库实例
  • fileRecord: *pb.UploadFileRecord 上传文件记录信息

返回值:

  • error: 如果创建过程中发生错误,返回错误信息;否则返回 nil

func CreateProcessTempFile added in v2.0.32

func CreateProcessTempFile(segmentID string) (string, error)

CreateProcessTempFile 创建处理分片的临时文件 参数:

  • segmentID: 分片ID

返回值:

  • string: 临时文件路径
  • error: 如果创建失败则返回错误

func CreateSegmentDataTempFile added in v2.0.32

func CreateSegmentDataTempFile(segmentID string) (string, error)

CreateSegmentDataTempFile 创建分片数据的临时文件 参数:

  • segmentID: 分片ID

返回值:

  • string: 临时文件路径
  • error: 如果创建失败则返回错误

func CreateUploadFileRecord

func CreateUploadFileRecord(
	db *badgerhold.Store,
	taskID string,
	fileID string,
	name string,
	fileMeta *pb.FileMeta,
	fileSecurity *pb.FileSecurity,
	status pb.UploadStatus,
) error

CreateUploadFileRecord 创建上传文件记录并保存到数据库 该方法用于初始化一个新的文件上传记录,并将其保存到持久化存储中

参数:

  • db: 上传文件数据库存储接口
  • taskID: 上传任务的唯一标识
  • fileID: 文件的唯一标识
  • name: 文件名称/路径
  • fileMeta: 文件的元数据信息
  • fileSecurity: 文件的安全相关信息
  • status: 文件上传状态

返回值:

  • error: 如果创建或存储过程中发生错误则返回相应错误,否则返回 nil

func CreateUploadSegmentRecord

func CreateUploadSegmentRecord(
	db *badgerhold.Store,
	taskID string,
	segmentID string,
	segmentIndex int64,
	size int64,
	checksum uint32,
	readKey string,
	isRsCodes bool,
	status pb.SegmentUploadStatus,
) error

CreateUploadSegmentRecord 创建上传分片记录 参数:

  • db: *badgerhold.Store 数据库实例
  • taskID: string 任务ID
  • segmentID: string 分片ID
  • segmentIndex: int64 分片索引
  • size: int64 分片大小
  • checksum: uint32 CRC32校验和
  • readKey: string 临时文件读取标识
  • isRsCodes: bool 是否为纠删码分片
  • status: SegmentUploadStatus 分片状态

返回值:

  • error: 如果创建过程中发生错误,返回错误信息

func CreateUploadsTempDir added in v2.0.32

func CreateUploadsTempDir() error

CreateUploadsTempDir 创建上传文件的临时目录 返回值:

  • error: 如果创建失败则返回错误

func GetUploadProgress

func GetUploadProgress(db *badgerhold.Store, taskID string) (*bitset.BitSet, error)

GetUploadProgress 获取上传任务的进度信息 参数:

  • db: *badgerhold.Store 数据库存储接口
  • taskID: string 上传任务的唯一标识

返回值:

  • *bitset.BitSet: 表示上传进度的位图
  • error: 如果获取过程中发生错误则返回相应错误,否则返回 nil

func GetUploadsTempDir added in v2.0.32

func GetUploadsTempDir() string

GetUploadsTempDir 返回上传文件的临时目录 返回值:

  • string: 上传文件的临时目录路径

func HandleDeleteFileSegmentRequestPubSub

func HandleDeleteFileSegmentRequestPubSub(
	ctx context.Context,
	opt *fscfg.Options,
	db *database.DB,
	fs afero.Afero,
	nps *pubsub.NodePubSub,
	upload *UploadManager,
	res *pubsub.Message,
) error

HandleDeleteFileSegmentRequestPubSub 处理删除文件切片请求主题 参数:

  • ctx: context.Context 上下文对象,用于控制请求的生命周期
  • opt: *fscfg.Options 配置选项,包含系统配置信息
  • db: *database.DB 数据库实例,用于数据持久化
  • fs: afero.Afero 文件系统实例,用于文件操作
  • nps: *pubsub.NodePubSub 发布订阅系统,用于消息通信
  • upload: *UploadManager 上传管理器实例,处理上传相关逻辑
  • res: *pubsub.Message 接收到的消息,包含请求数据

返回值:

  • error 返回处理过程中的错误信息

功能:

  • 处理删除文件切片信息的请求
  • 解析请求数据并删除对应的文件切片存储记录

func InitializeUploadManager

func InitializeUploadManager(lc fx.Lifecycle, input InitializeUploadManagerInput) error

InitializeUploadManager 初始化 UploadManager 并设置相关的生命周期钩子 参数:

  • lc: fx.Lifecycle 用于管理应用生命周期的对象
  • input: InitializeUploadManagerInput 包含初始化 UploadManager 所需的输入参数

返回值:

  • error 如果初始化过程中发生错误,则返回相应的错误信息

func NewFileMeta

func NewFileMeta(f *os.File) (*pb.FileMeta, error)

NewFileMeta 创建并初始化一个新的 FileMeta 实例,提供文件的基本元数据信息 参数:

  • file: *os.File 文件对象,用于读取文件信息

返回值:

  • *pb.FileMeta: 新创建的 FileMeta 实例,包含文件的基本元数据
  • error: 如果发生错误,返回错误信息

func NewFileSecurity

func NewFileSecurity(fileID string, privKey *ecdsa.PrivateKey, secret []byte) (*pb.FileSecurity, error)

NewFileSecurity 创建并初始化一个新的FileSecurity实例,封装了文件的安全和权限相关的信息 参数:

  • fileID: string 文件ID
  • privKey: *ecdsa.PrivateKey 私钥
  • secret: []byte 需要共享的秘密

返回:

  • *pb.FileSecurity: 包含文件安全信息的结构体
  • error: 错误信息

func NewFileSegment

func NewFileSegment(db *badgerhold.Store, taskID string, fileID string, file *os.File, pk []byte, dataShards, parityShards int64) error

NewFileSegment 创建并初始化一个新的 FileSegment 实例 参数:

  • db: 数据库实例
  • taskID: 任务ID
  • fileID: 文件ID
  • file: 原始文件
  • pk: 公钥
  • dataShards: 数据分片数
  • parityShards: 奇偶校验分片数

返回值:

  • error: 如果处理过程中发生错误,返回错误信息

func NewUploadFile

func NewUploadFile(opt *fscfg.Options, db *database.DB, scheme *shamir.ShamirScheme,
	name string,
	ownerPriv *ecdsa.PrivateKey,
	onSegmentsReady func(taskID string),
	taskStatus *SegmentStatus,
	errChan chan error,
) (*pb.UploadOperationInfo, error)

NewUploadFile 创建并初始化一个新的 UploadFile 实例 参数:

  • opt: *fscfg.Options 存储选项
  • db: *database.DB 数据库实例
  • scheme: *shamir.ShamirScheme Shamir 秘密共享方案
  • name: string 文件名
  • ownerPriv: *ecdsa.PrivateKey 文件所有者的私钥
  • onSegmentsReady: func(taskID string) 完成回调函数

返回值:

  • *pb.UploadOperationInfo: 上传操作信息
  • error: 错误信息

func QueryFileAssets

func QueryFileAssets(db *badgerhold.Store, pubkeyHash []byte, start, pageSize int, query string, options ...database.QueryOption) ([]*pb.FileAssetRecord, uint64, int, int, error)

QueryFileAssets 查询文件资产记录 参数:

  • db: BadgerDB存储实例,用于数据持久化
  • pubkeyHash: 所有者的公钥哈希,用于权限验证
  • start: 起始记录索引
  • pageSize: 每页的最大记录数
  • query: 查询条件字符串
  • options: 额外的查询选项,用于设置查询条件

返回值:

  • []*pb.FileAssetRecord: 查询到的文件资产记录切片
  • uint64: 符合查询条件的总记录数
  • int: 当前页数
  • int: 每页的最大记录数
  • error: 如果查询过程中发生错误,返回错误信息

func RegisterUploadPubsubProtocol

func RegisterUploadPubsubProtocol(lc fx.Lifecycle, input RegisterPubsubProtocolInput)

RegisterUploadPubsubProtocol 注册所有上传相关的PubSub协议处理器 参数:

  • lc: fx.Lifecycle 应用生命周期管理器
  • input: RegisterPubsubProtocolInput 注册所需的输入参数

返回值:

  • error: 如果注册过程中出现错误,返回相应的错误信息

func RegisterUploadStreamProtocol

func RegisterUploadStreamProtocol(lc fx.Lifecycle, input RegisterStreamProtocolInput)

RegisterUploadStreamProtocol 注册上传流协议 参数:

  • lc: fx.Lifecycle 类型,用于管理组件的生命周期
  • input: RegisterStreamProtocolInput 类型,包含注册所需的所有依赖项

返回值: 无

func RequestDeleteFileSegmentPubSub

func RequestDeleteFileSegmentPubSub(
	ctx context.Context,
	nps *pubsub.NodePubSub,
	fileID string,
	pubkeyHash []byte,
) error

RequestDeleteFileSegmentPubSub 发送删除文件片段请求 参数:

  • ctx: 上下文,用于控制请求的生命周期
  • nps: 发布订阅系统,用于节点之间的消息传递
  • fileID: 文件唯一标识
  • pubkeyHash: 所有者的公钥哈希

返回值:

  • error: 如果请求过程中出现错误,返回相应的错误信息

func UpdateSegmentUploadInfo

func UpdateSegmentUploadInfo(
	db *badgerhold.Store,
	taskID string,
	index int64,
	status pb.SegmentUploadStatus,
) error

UpdateSegmentUploadInfo 更新文件片段的上传信息 该方法用于更新文件片段的上传状态、节点ID和上传时间

参数:

  • db: *badgerhold.Store 数据库存储接口
  • taskID: string 上传任务的唯一标识
  • index: int64 片段索引
  • status: pb.SegmentUploadStatus 片段上传状态

返回值:

  • error: 如果更新过程中发生错误则返回相应错误,否则返回 nil

func UpdateUploadFileHashTable

func UpdateUploadFileHashTable(db *badgerhold.Store, taskID string, sliceTable map[int64]*pb.HashTable) error

UpdateUploadFileHashTable 更新上传文件的哈希表 该方法仅在文件处于编码状态时更新哈希表,并将状态改为待上传 参数:

  • db: *badgerhold.Store 数据库存储接口
  • taskID: string 上传任务的唯一标识
  • sliceTable: map[int64]*pb.HashTable 分片哈希表

返回值:

  • error: 如果更新过程中发生错误则返回相应错误,否则返回 nil

func UpdateUploadFileStatus

func UpdateUploadFileStatus(db *badgerhold.Store, taskID string, status pb.UploadStatus) error

UpdateUploadFileStatus 更新上传文件的状态 参数:

  • db: *badgerhold.Store 数据库存储接口
  • taskID: string 上传任务的唯一标识
  • status: pb.UploadStatus 新的文件状态

返回值:

  • error: 如果更新过程中发生错误则返回相应错误,否则返回 nil

Types

type CompressContext added in v2.0.28

type CompressContext struct {
	// contains filtered or unexported fields
}

CompressContext 压缩上下文

func (*CompressContext) Reset added in v2.0.28

func (cc *CompressContext) Reset()

Reset 重置压缩上下文

type ForwardQueueManager added in v2.0.34

type ForwardQueueManager struct {
	// contains filtered or unexported fields
}

ForwardQueueManager 转发队列管理器

func NewForwardQueueManager added in v2.0.34

func NewForwardQueueManager(ctx context.Context, workerCount int, db *database.DB,
	routingTable *kbucket.RoutingTable, host host.Host) *ForwardQueueManager

NewForwardQueueManager 创建转发队列管理器

func (*ForwardQueueManager) GetStats added in v2.0.34

func (fqm *ForwardQueueManager) GetStats() map[string]interface{}

GetStats 获取转发队列的统计信息

func (*ForwardQueueManager) Submit added in v2.0.34

func (fqm *ForwardQueueManager) Submit(payload *pb.FileSegmentStorage)

Submit 提交转发任务

type ForwardTask added in v2.0.34

type ForwardTask struct {
	SegmentID  string                 // 分片ID
	Payload    *pb.FileSegmentStorage // 存储原始 payload(不包含内容)
	RetryCount int                    // 重试计数
	Timestamp  time.Time              // 创建时间戳
}

ForwardTask 文件片段转发任务

type InitializeUploadManagerInput

type InitializeUploadManagerInput struct {
	fx.In

	Upload *UploadManager
}

InitializeUploadManagerInput 定义了初始化 UploadManager 所需的输入参数

type MemoryMonitor added in v2.0.28

type MemoryMonitor struct {
	// contains filtered or unexported fields
}

MemoryMonitor 内存监控器

func NewMemoryMonitor added in v2.0.28

func NewMemoryMonitor() *MemoryMonitor

NewMemoryMonitor 创建新的内存监控器

func (*MemoryMonitor) AcquireToken added in v2.0.28

func (mm *MemoryMonitor) AcquireToken()

AcquireToken 获取并发令牌

func (*MemoryMonitor) CheckMemory added in v2.0.28

func (mm *MemoryMonitor) CheckMemory()

CheckMemory 检查内存使用情况

func (*MemoryMonitor) Global added in v2.0.28

func (mm *MemoryMonitor) Global() *MemoryMonitor

Global 获取全局内存监控器实例

func (*MemoryMonitor) ReleaseToken added in v2.0.28

func (mm *MemoryMonitor) ReleaseToken()

ReleaseToken 释放并发令牌

type NetworkError added in v2.0.28

type NetworkError struct {
	// contains filtered or unexported fields
}

NetworkError 自定义网络错误结构

func (*NetworkError) Error added in v2.0.28

func (e *NetworkError) Error() string

type NetworkErrorType added in v2.0.28

type NetworkErrorType int

NetworkErrorType 定义网络错误类型

const (
	// 临时性错误,可以重试
	TempError NetworkErrorType = iota
	// 永久性错误,不应重试
	PermanentError
	// 致命错误,需要终止任务
	FatalError
)

type NewUploadManagerInput

type NewUploadManagerInput struct {
	fx.In

	Ctx          context.Context       // 全局上下文,用于管理整个应用的生命周期和取消操作
	Opt          *fscfg.Options        // 文件存储选项配置,包含各种系统设置和参数
	DB           *database.DB          // 持久化存储,用于本地数据的存储和检索
	FS           afero.Afero           // 文件文件系统接口,提供跨平台的文件操作能力统接口
	Host         host.Host             // libp2p网络主机实例
	RoutingTable *kbucket.RoutingTable // 路由表,用于管理对等节点和路由
	NPS          *pubsub.NodePubSub    // 发布订阅系统,用于节点之间的消息传递
}

NewUploadManagerInput 定义了创建 UploadManager 所需的输入参数

type NewUploadManagerOutput

type NewUploadManagerOutput struct {
	fx.Out

	Upload *UploadManager // 上传管理器,用于处理和管理文件上传任务,包括任务调度、状态跟踪等
}

NewUploadManagerOutput 定义了 NewUploadManager 函数的输出

func NewUploadManager

func NewUploadManager(lc fx.Lifecycle, input NewUploadManagerInput) (out NewUploadManagerOutput, err error)

NewUploadManager 创建并初始化一个新的 UploadManager 实例 参数:

  • lc: fx.Lifecycle 用于管理应用生命周期的对象
  • input: NewUploadManagerInput 包含创建 UploadManager 所需的输入参数

返回值:

  • out: NewUploadManagerOutput 包含创建的 UploadManager 实例
  • err: error 如果创建过程中发生错误,则返回相应的错误信息

type ProcessFunc added in v2.0.28

type ProcessFunc func([]byte) ([]byte, error)

ProcessFunc 定义数据处理函数类型

type ProcessPipeline added in v2.0.28

type ProcessPipeline struct {
	// contains filtered or unexported fields
}

ProcessPipeline 处理管道

func (*ProcessPipeline) Process added in v2.0.28

func (p *ProcessPipeline) Process() error

Process 流式处理

type ProtocolHandler added in v2.0.32

type ProtocolHandler struct {
	// contains filtered or unexported fields
}

ProtocolHandler 替代原来的StreamUtils,使用defsproto.Handler处理网络通信

func NewProtocolHandler added in v2.0.32

func NewProtocolHandler(conn net.Conn) *ProtocolHandler

NewProtocolHandler 创建一个新的协议处理器 参数:

  • conn: 网络连接

返回值:

  • *ProtocolHandler: 新的协议处理器实例

func (*ProtocolHandler) Close added in v2.0.32

func (p *ProtocolHandler) Close() error

Close 关闭处理器

func (*ProtocolHandler) ReadResponse added in v2.0.32

func (p *ProtocolHandler) ReadResponse() error

ReadResponse 读取响应 参数:

  • s: StreamUtils实例

返回值:

  • error: 如果读取失败,返回相应的错误信息

func (*ProtocolHandler) WriteSegmentData added in v2.0.32

func (p *ProtocolHandler) WriteSegmentData(payload *pb.FileSegmentStorage) error

WriteSegmentData 写入分片数据 参数:

  • payload: 分片数据

返回值:

  • error: 如果写入失败,返回相应的错误信息

type RegisterPubsubProtocolInput

type RegisterPubsubProtocolInput struct {
	fx.In

	Ctx    context.Context    // 全局上下文,用于管理整个应用的生命周期
	Opt    *fscfg.Options     // 文件存储配置选项
	DB     *database.DB       // 本地数据存储实例
	FS     afero.Afero        // 文件系统接口
	Host   host.Host          // libp2p网络主机实例
	NPS    *pubsub.NodePubSub // 发布订阅系统
	Upload *UploadManager     // 上传管理器实例
}

RegisterPubsubProtocolInput 定义了注册PubsubProtocol所需的输入参数

type RegisterStreamProtocolInput

type RegisterStreamProtocolInput struct {
	fx.In

	Ctx          context.Context       // 全局上下文,用于管理整个应用的生命周期和取消操作
	Opt          *fscfg.Options        // 文件存储选项配置,包含各种系统设置和参数
	DB           *database.DB          // 持久化存储,用于本地数据的存储和检索
	FS           afero.Afero           // 文件系统接口,提供跨平台的文件操作能力
	Host         host.Host             // libp2p网络主机实例
	RoutingTable *kbucket.RoutingTable // 路由表,用于管理对等节点和路由
	NPS          *pubsub.NodePubSub    // 发布订阅系统,用于节点之间的消息传递
	Upload       *UploadManager        // 上传管理器,用于处理和管理文件上传任务,包括任务调度、状态跟踪等
}

RegisterStreamProtocolInput 定义了注册流协议所需的输入参数

type ResourcePool added in v2.0.28

type ResourcePool struct {
	// contains filtered or unexported fields
}

ResourcePool 资源池管理器

func Global added in v2.0.28

func Global() *ResourcePool

Global 获取全局资源池实例 返回值:

  • *ResourcePool: 全局资源池实例

func NewResourcePool added in v2.0.28

func NewResourcePool() *ResourcePool

NewResourcePool 创建新的资源池 返回值:

  • *ResourcePool: 资源池实例

func (*ResourcePool) GetCompressBuffer added in v2.0.28

func (rp *ResourcePool) GetCompressBuffer() *bytes.Buffer

GetCompressBuffer 获取压缩缓冲区 返回值:

  • *bytes.Buffer: 压缩缓冲区

func (*ResourcePool) GetCompressContext added in v2.0.28

func (rp *ResourcePool) GetCompressContext() *CompressContext

GetCompressContext 获取压缩上下文 返回值:

  • *CompressContext: 压缩上下文

func (*ResourcePool) GetLargeBuffer added in v2.0.28

func (rp *ResourcePool) GetLargeBuffer() []byte

GetLargeBuffer 获取大缓冲区 返回值:

  • []byte: 大缓冲区

func (*ResourcePool) GetReader added in v2.0.28

func (rp *ResourcePool) GetReader(r io.Reader) *bufio.Reader

GetReader 获取带缓冲的Reader 参数:

  • r: io.Reader 读取器

返回值:

  • *bufio.Reader: 带缓冲的读取器

func (*ResourcePool) GetStreamBuffer added in v2.0.28

func (rp *ResourcePool) GetStreamBuffer() []byte

GetStreamBuffer 获取流处理缓冲区 返回值:

  • []byte: 流处理缓冲区

func (*ResourcePool) GetWriter added in v2.0.28

func (rp *ResourcePool) GetWriter(w io.Writer) *bufio.Writer

GetWriter 获取带缓冲的Writer 参数:

  • w: io.Writer 写入器

返回值:

  • *bufio.Writer: 带缓冲的写入器

func (*ResourcePool) PutCompressBuffer added in v2.0.28

func (rp *ResourcePool) PutCompressBuffer(buf *bytes.Buffer)

PutCompressBuffer 归还压缩缓冲区 参数:

  • buf: *bytes.Buffer 压缩缓冲区

func (*ResourcePool) PutCompressContext added in v2.0.28

func (rp *ResourcePool) PutCompressContext(ctx *CompressContext)

PutCompressContext 归还压缩上下文 参数:

  • ctx: *CompressContext 压缩上下文

func (*ResourcePool) PutLargeBuffer added in v2.0.28

func (rp *ResourcePool) PutLargeBuffer(buf []byte)

PutLargeBuffer 归还大缓冲区 参数:

  • buf: []byte 大缓冲区

func (*ResourcePool) PutReader added in v2.0.28

func (rp *ResourcePool) PutReader(r *bufio.Reader)

PutReader 归还Reader 参数:

  • r: *bufio.Reader 带缓冲的读取器

func (*ResourcePool) PutStreamBuffer added in v2.0.28

func (rp *ResourcePool) PutStreamBuffer(buf []byte)

PutStreamBuffer 归还流处理缓冲区 参数:

  • buf: []byte 流处理缓冲区

func (*ResourcePool) PutWriter added in v2.0.28

func (rp *ResourcePool) PutWriter(w *bufio.Writer)

PutWriter 归还Writer 参数:

  • w: *bufio.Writer 带缓冲的写入器

func (*ResourcePool) ResetPools added in v2.0.28

func (p *ResourcePool) ResetPools()

ResetPools 重置所有资源池

type SafeHashTableMap added in v2.0.28

type SafeHashTableMap struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

SafeHashTableMap 线程安全的哈希表映射

func NewSafeHashTableMap added in v2.0.28

func NewSafeHashTableMap() *SafeHashTableMap

NewSafeHashTableMap 创建新的线程安全的哈希表映射 返回值:

  • *SafeHashTableMap: 新的哈希表映射

func (*SafeHashTableMap) Get added in v2.0.28

func (m *SafeHashTableMap) Get(index int64) (*pb.HashTable, bool)

Get 获取哈希表映射中的值 参数:

  • index: 索引

返回值:

  • *pb.HashTable: 哈希表
  • bool: 是否存在

func (*SafeHashTableMap) Set added in v2.0.28

func (m *SafeHashTableMap) Set(index int64, table *pb.HashTable)

Set 设置哈希表映射中的值 参数:

  • index: 索引
  • table: 哈希表

func (*SafeHashTableMap) ToMap added in v2.0.28

func (m *SafeHashTableMap) ToMap() map[int64]*pb.HashTable

ToMap 将哈希表映射转换为map 返回值:

  • map[int64]*pb.HashTable: 哈希表映射

type SegmentDataMessage added in v2.0.32

type SegmentDataMessage struct {
	Payload *pb.FileSegmentStorage
}

SegmentDataMessage 表示要发送的分片数据

func (*SegmentDataMessage) GetSegmentId added in v2.0.32

func (m *SegmentDataMessage) GetSegmentId() string

GetSegmentId 获取分片ID

func (*SegmentDataMessage) Marshal added in v2.0.32

func (m *SegmentDataMessage) Marshal() ([]byte, error)

Marshal 将消息序列化为字节数组

func (*SegmentDataMessage) Unmarshal added in v2.0.32

func (m *SegmentDataMessage) Unmarshal(data []byte) error

Unmarshal 从字节数组反序列化消息

type SegmentMessage added in v2.0.32

type SegmentMessage interface {
	defsproto.Message
	GetSegmentId() string
}

定义用于文件分片传输的消息类型

type SegmentResponseMessage added in v2.0.32

type SegmentResponseMessage struct {
	Success bool
	Message string
	Error   string
}

SegmentResponseMessage 表示分片传输的响应

func (*SegmentResponseMessage) GetSegmentId added in v2.0.32

func (m *SegmentResponseMessage) GetSegmentId() string

GetSegmentId 实现SegmentMessage接口

func (*SegmentResponseMessage) Marshal added in v2.0.32

func (m *SegmentResponseMessage) Marshal() ([]byte, error)

Marshal 将响应消息序列化为字节数组

func (*SegmentResponseMessage) Unmarshal added in v2.0.32

func (m *SegmentResponseMessage) Unmarshal(data []byte) error

Unmarshal 从字节数组反序列化响应消息

type SegmentStatus

type SegmentStatus struct {
	// contains filtered or unexported fields
}

SegmentStatus 用于跟踪文件是否准备就绪的状态

func NewSegmentStatus

func NewSegmentStatus(locker sync.Locker) *SegmentStatus

NewSegmentStatus 初始化并返回一个 SegmentStatus 实例 参数:

  • locker: sync.Locker 用于同步的互斥锁

返回值:

  • *SegmentStatus: 初始化后的 SegmentStatus 实例

func (*SegmentStatus) GetState

func (s *SegmentStatus) GetState() bool

GetState 获取当前状态 返回值:

  • bool: 当前的状态值

func (*SegmentStatus) SetState

func (s *SegmentStatus) SetState(state bool)

SetState 设置状态,并通知所有等待的goroutine 参数:

  • state: bool 要设置的新状态

func (*SegmentStatus) WaitForSpecificState

func (s *SegmentStatus) WaitForSpecificState(targetState bool)

WaitForSpecificState 阻塞当前goroutine,直到达到指定状态 参数:

  • targetState: bool 要等待的目标状态

func (*SegmentStatus) WaitForStateChange

func (s *SegmentStatus) WaitForStateChange()

WaitForStateChange 阻塞当前goroutine,直到状态发生变化

type ShardProcessor added in v2.0.28

type ShardProcessor struct {
	sync.RWMutex // 读写锁
	// contains filtered or unexported fields
}

ShardProcessor 分片处理器

func NewShardProcessor added in v2.0.28

func NewShardProcessor() *ShardProcessor

NewShardProcessor 创建分片处理器 返回值:

  • *ShardProcessor: 分片处理器实例

func (*ShardProcessor) GetChunk added in v2.0.28

func (sp *ShardProcessor) GetChunk() []byte

GetChunk 获取处理块 返回值:

  • []byte: 处理块

func (*ShardProcessor) GetCompressContext added in v2.0.28

func (sp *ShardProcessor) GetCompressContext() *CompressContext

GetCompressContext 获取压缩上下文 返回值:

  • *CompressContext: 压缩上下文

func (*ShardProcessor) GetHashTables added in v2.0.28

func (sp *ShardProcessor) GetHashTables() map[int64]*pb.HashTable

GetHashTables 获取所有分片哈希表 返回值:

  • map[int64]*pb.HashTable: 分片哈希表

func (*ShardProcessor) PutChunk added in v2.0.28

func (sp *ShardProcessor) PutChunk(chunk []byte)

PutChunk 归还处理块 参数:

  • chunk: []byte 处理块

func (*ShardProcessor) PutCompressContext added in v2.0.28

func (sp *ShardProcessor) PutCompressContext(ctx *CompressContext)

PutCompressContext 归还压缩上下文 参数:

  • ctx: *CompressContext 压缩上下文

func (*ShardProcessor) UpdateHashTable added in v2.0.28

func (sp *ShardProcessor) UpdateHashTable(index int64, table *pb.HashTable)

UpdateHashTable 更新分片哈希表 参数:

  • index: int64 分片索引
  • table: *pb.HashTable 分片哈希表

type ShardWriter added in v2.0.28

type ShardWriter struct {
	// contains filtered or unexported fields
}

ShardWriter 提供分片数据的流式写入功能

func NewShardWriter added in v2.0.28

func NewShardWriter(file *os.File, index int) *ShardWriter

NewShardWriter 创建新的分片写入器 参数:

  • file: 目标文件
  • index: 分片索引

返回值:

  • *ShardWriter: 分片写入器

func (*ShardWriter) Close added in v2.0.28

func (sw *ShardWriter) Close() error

Close 关闭写入器 返回值:

  • error: 关闭错误

func (*ShardWriter) Write added in v2.0.28

func (sw *ShardWriter) Write(p []byte) (n int, err error)

Write 实现io.Writer接口,提供流式写入 参数:

  • p: 要写入的数据

返回值:

  • n: 写入的字节数
  • err: 写入错误

type StreamProcessor added in v2.0.28

type StreamProcessor struct {
	// contains filtered or unexported fields
}

StreamProcessor 提供流式数据处理

func NewStreamProcessor added in v2.0.28

func NewStreamProcessor(reader io.Reader, writer io.Writer, processors []ProcessFunc) *StreamProcessor

NewStreamProcessor 创建新的流式处理器 参数:

  • reader: 输入读取器
  • writer: 输出写入器
  • processors: 数据处理函数列表

返回值:

  • *StreamProcessor: 流式处理器

func (*StreamProcessor) Process added in v2.0.28

func (sp *StreamProcessor) Process() error

Process 执行流式处理 返回值:

  • error: 处理错误

type StreamProtocol

type StreamProtocol struct {
	// contains filtered or unexported fields
}

StreamProtocol 定义了流协议的结构体

type TempFileManager added in v2.0.28

type TempFileManager struct {
	// contains filtered or unexported fields
}

TempFileManager 临时文件管理器

func NewTempFileManager added in v2.0.28

func NewTempFileManager(delayCleanup bool, taskID string) *TempFileManager

NewTempFileManager 创建临时文件管理器 参数:

  • delayCleanup: 是否延迟清理
  • taskID: 任务ID,用于隔离不同上传任务的临时文件

返回值:

  • *TempFileManager: 临时文件管理器

func (*TempFileManager) CleanupFiles added in v2.0.28

func (tm *TempFileManager) CleanupFiles()

CleanupFiles 清理所有临时文件

func (*TempFileManager) CleanupFilesByType added in v2.0.28

func (tm *TempFileManager) CleanupFilesByType(fileType string)

CleanupFilesByType 根据文件类型清理临时文件 参数:

  • fileType: 文件类型

返回值:

  • error: 清理失败错误

func (*TempFileManager) CreateTempFile added in v2.0.28

func (tm *TempFileManager) CreateTempFile(fileType string, index int64) (*os.File, error)

CreateTempFile 创建临时文件 参数:

  • fileType: 文件类型
  • index: 索引

返回值:

  • *os.File: 临时文件
  • error: 创建失败错误

func (*TempFileManager) ForceCleanup added in v2.0.28

func (tm *TempFileManager) ForceCleanup()

ForceCleanup 强制清理所有临时文件

func (*TempFileManager) GetTaskTempDir added in v2.0.32

func (tm *TempFileManager) GetTaskTempDir() string

GetTaskTempDir 获取任务专用的临时目录 返回值:

  • string: 任务专用的临时目录

type Topic

type Topic int

Topic 定义了允许的主题类型

const (
	PubSubDeleteFileSegmentRequestTopic Topic = iota // 删除文件切片请求主题
)

定义主题类型常量

func (Topic) String

func (t Topic) String() string

String 将Topic转换为对应的字符串表示 返回值:

  • string: 主题对应的字符串

type UploadError added in v2.0.28

type UploadError struct {
	SegmentID string
	PeerID    peer.ID
	Operation string // 操作类型:如 "network", "verify", "process"
	Time      time.Time
	Err       error
}

添加错误处理上下文

func (*UploadError) Error added in v2.0.28

func (e *UploadError) Error() string

type UploadManager

type UploadManager struct {
	// contains filtered or unexported fields
}

UploadManager 管理所有上传任务,提供文件上传的统一入口和管理功能 它负责协调上传任务的执行,管理任务的生命周期,以及通知任务的状态更新和错误事件

func (*UploadManager) Cancel

func (m *UploadManager) Cancel() context.CancelFunc

Cancel 获取任务的取消函数 返回值:

  • context.CancelFunc: 任务的取消函数

func (*UploadManager) CancelUpload

func (m *UploadManager) CancelUpload(taskID string) error

CancelUpload 取消上传 参数:

  • taskID: 要取消的任务ID,用于标识具体的上传任务

返回值:

  • error: 如果取消过程中发生错误,返回错误信息;否则返回 nil

func (*UploadManager) Context

func (m *UploadManager) Context() context.Context

Context 获取任务的上下文 返回值:

  • context.Context: 任务的上下文对象

func (*UploadManager) DB

func (m *UploadManager) DB() *database.DB

DB 返回数据库存储 返回值:

  • *badgerhold.Store: 数据库存储

func (*UploadManager) DeleteUpload

func (m *UploadManager) DeleteUpload(taskID string) error

DeleteUpload 删除上传任务 参数:

  • taskID: 要删除的任务ID,用于标识具体的上传任务

返回值:

  • error: 如果删除过程中发生错误,返回错误信息;否则返回 nil

func (*UploadManager) EnsureChannelOpen added in v2.0.23

func (m *UploadManager) EnsureChannelOpen()

检查通道是否关闭,并在关闭时重新初始化

func (*UploadManager) ErrChan

func (m *UploadManager) ErrChan() <-chan error

ErrChan 返回错误通知通道 返回值:

  • <-chan error: 只读的通道,用于接收错误通知

func (*UploadManager) FS

func (m *UploadManager) FS() afero.Afero

FS 返回文件系统接口 返回值:

  • afero.Afero: 文件系统接口

func (*UploadManager) GetAllUploadFilesSummaries

func (m *UploadManager) GetAllUploadFilesSummaries() ([]*pb.UploadFilesSummaries, error)

GetAllUploadFilesSummaries 获取所有上传记录的概要信息 返回值:

  • []*pb.UploadFilesSummaries: 包含所有上传记录概要信息的切片,每个元素包含任务ID、文件名、大小等信息
  • error: 如果获取过程中发生错误,返回错误信息;否则返回 nil

func (*UploadManager) Host

func (m *UploadManager) Host() host.Host

Host 获取网络主机实例 返回值:

  • host.Host: 网络主机实例

func (*UploadManager) IsMaxConcurrencyReached

func (m *UploadManager) IsMaxConcurrencyReached() bool

IsMaxConcurrencyReached 检查是否达到上传允许的最大并发数 返回值:

  • bool: 如果达到最大并发数返回 true,否则返回 false

func (*UploadManager) LoadExistingTasks

func (m *UploadManager) LoadExistingTasks() error

LoadExistingTasks 从数据库加载现有的上传任务 返回值:

  • error: 如果加载过程中发生错误,返回相应的错误信息

func (*UploadManager) ManagerChannelEvents

func (m *UploadManager) ManagerChannelEvents()

ManagerChannelEvents 处理上传管理器的通道事件 此方法启动一个新的goroutine来持续监听和处理各种通道事件

func (*UploadManager) NewUpload

func (m *UploadManager) NewUpload(
	path string,
	ownerPriv *ecdsa.PrivateKey,
	immediate ...bool,
) (*pb.UploadOperationInfo, error)

NewUpload 创建一个新的上传任务 参数:

  • path: 文件路径,要上传的文件的完整路径
  • ownerPriv: 所有者的私钥,用于签名和权限验证
  • immediate: 是否立即执行上传(可选参数,默认为 false)

返回:

  • *pb.UploadOperationInfo: 上传操作信息,包含任务ID等信息
  • error: 错误信息,如果创建失败则返回错误原因

func (*UploadManager) NodePubSub

func (m *UploadManager) NodePubSub() *pubsub.NodePubSub

NodePubSub 返回发布订阅系统 返回值:

  • *pubsub.NodePubSub: 发布订阅系统

func (*UploadManager) NotifyError

func (m *UploadManager) NotifyError(err string, args ...interface{})

NotifyError 通知错误信息 参数:

  • m: 上传管理器对象
  • err: 要通知的错误信息
  • args: 格式化参数

func (*UploadManager) NotifyStatus

func (m *UploadManager) NotifyStatus(status *pb.UploadChan)

NotifyStatus 通知上传状态 参数:

  • m: 上传管理器对象
  • status: 要通知的状态信息

func (*UploadManager) Options

func (m *UploadManager) Options() *fscfg.Options

Options 返回文件存储选项配置 返回值:

  • *fscfg.Options: 文件存储选项配置

func (*UploadManager) PauseUpload

func (m *UploadManager) PauseUpload(taskID string) error

PauseUpload 暂停上传 参数:

  • taskID: 要暂停的任务ID,用于标识具体的上传任务

返回值:

  • error: 如果暂停过程中发生错误,返回错误信息;否则返回 nil

func (*UploadManager) ResumeUpload

func (m *UploadManager) ResumeUpload(taskID string) error

ResumeUpload 继续上传 参数:

  • taskID: 要继续上传的任务ID,用于标识具体的上传任务

返回值:

  • error: 如果继续上传过程中发生错误,返回相应的错误信息

func (*UploadManager) RoutingTable

func (m *UploadManager) RoutingTable() *kbucket.RoutingTable

RoutingTable 获取客户端实例 返回值:

  • *kbucket.RoutingTable : 路由表实例

func (*UploadManager) StatusChan

func (m *UploadManager) StatusChan() <-chan *pb.UploadChan

StatusChan 返回上传状态和进度通知通道 返回值:

  • <-chan *pb.UploadChan: 只读的通道,用于接收上传状态和进度通知

func (*UploadManager) TriggerForward

func (m *UploadManager) TriggerForward(payload *pb.FileSegmentStorage)

TriggerForward 触发转发操作 参数:

  • payload: *pb.FileSegmentStorage 要转发的文件段存储信息

func (*UploadManager) TriggerUpload

func (m *UploadManager) TriggerUpload(taskID string, checkNodesAndSend bool) error

TriggerUpload 触发上传操作 参数:

  • taskID: 要上传的任务ID,用于标识具体的上传任务

返回值:

  • error: 如果触发过程中发生错误,返回错误信息;否则返回 nil

type UploadTask

type UploadTask struct {
	// contains filtered or unexported fields
}

UploadTask 描述一个文件上传任务,包括文件信息和上传状态

func NewUploadTask

func NewUploadTask(ctx context.Context, opt *fscfg.Options, db *database.DB, fs afero.Afero,
	host host.Host, routingTable *kbucket.RoutingTable, nps *pubsub.NodePubSub,
	scheme *shamir.ShamirScheme, statusChan chan *pb.UploadChan, errChan chan error, taskID string,
) *UploadTask

NewUploadTask 创建并初始化一个新的文件上传任务实例 参数:

  • ctx: context.Context 用于管理任务生命周期的上下文
  • opt: *fscfg.Options 文件存储配置选项
  • db: *database.DB 数据库实例
  • fs: afero.Afero 文件系统接口
  • host: host.Host libp2p网络主机实例
  • routingTable: *kbucket.RoutingTable 路由表
  • nps: *pubsub.NodePubSub 发布订阅系统
  • scheme: *shamir.ShamirScheme Shamir秘密共享方案实例
  • statusChan: chan *pb.UploadChan 状态更新通道
  • taskID: string 任务唯一标识符

返回值:

  • *UploadTask: 创建的上传任务实例

func (*UploadTask) Cleanup

func (t *UploadTask) Cleanup()

Cleanup 清理任务资源

func (*UploadTask) Close

func (t *UploadTask) Close()

Close 关闭任务并释放资源

func (*UploadTask) Context

func (t *UploadTask) Context() context.Context

Context 获取任务的上下文 返回值:

  • context.Context: 任务的上下文对象

func (*UploadTask) DB

func (t *UploadTask) DB() *badgerhold.Store

DB 获取持久化存储 返回值:

  • *badgerhold.Store: 持久化存储实例

func (*UploadTask) FS

func (t *UploadTask) FS() afero.Afero

FS 返回文件系统接口 返回值:

  • afero.Afero: 文件系统接口

func (*UploadTask) ForceFileFinalize

func (t *UploadTask) ForceFileFinalize() error

ForceFileFinalize 强制触发文件完成处理 处理文件上传完成后的操作,如果通道已满则先清空再写入

func (*UploadTask) ForceNetworkTransfer

func (t *UploadTask) ForceNetworkTransfer(peerSegments map[peer.ID][]string) error

ForceNetworkTransfer 强制触发网络传输 向目标节点传输文件片段,支持重试机制

func (*UploadTask) ForceNodeDispatch

func (t *UploadTask) ForceNodeDispatch() error

ForceNodeDispatch 强制触发节点分发 以节点为单位从队列中读取文件片段进行分发,如果通道已满则先清空再写入

func (*UploadTask) ForceSegmentProcess

func (t *UploadTask) ForceSegmentProcess() error

ForceSegmentProcess 强制触发片段处理 将文件片段整合并写入队列,如果通道已满则先清空再写入

func (*UploadTask) ForceSegmentVerify

func (t *UploadTask) ForceSegmentVerify() error

ForceSegmentVerify 强制触发片段验证 验证已传输片段的完整性,如果通道已满则先清空再写入

func (*UploadTask) GetProgress

func (t *UploadTask) GetProgress() (int64, error)

GetProgress 计算并返回当前上传进度 返回值:

  • int64: 返回0-100之间的进度值
  • error: 如果获取进度失败,返回相应的错误信息

func (*UploadTask) GetShardCounts

func (t *UploadTask) GetShardCounts() (totalShards, parityShards int64, err error)

GetShardCounts 返回文件的总分片数和冗余分片数 该方法统计文件的分片信息,包括总分片数和冗余分片数

返回值:

  • totalShards: int64 总分片数
  • parityShards: int64 冗余分片数
  • error: 如果获取分片数失败,返回相应的错误信息

func (*UploadTask) GetTotalSize

func (t *UploadTask) GetTotalSize() (int64, error)

GetTotalSize 返回文件大小加上奇偶校验片段的总大小 该方法计算文件的总大小,包括原始文件和奇偶校验片段

返回值:

  • int64: 文件总大小(包括原始文件大小和奇偶校验片段大小)
  • error: 如果计算过程中发生错误,返回相应的错误信息

func (*UploadTask) Host

func (t *UploadTask) Host() host.Host

Host 获取网络主机实例 返回值:

  • host.Host: 网络主机实例

func (*UploadTask) NodePubSub

func (t *UploadTask) NodePubSub() *pubsub.NodePubSub

NodePubSub 获取存储网络 返回值:

  • *pubsub.NodePubSub: 发布订阅系统

func (*UploadTask) NotifySegmentStatus

func (t *UploadTask) NotifySegmentStatus(status *pb.UploadChan)

NotifySegmentStatus 通知片段状态更新 向外部通知文件片段的处理状态,超时后记录警告日志

func (*UploadTask) NotifyTaskError

func (t *UploadTask) NotifyTaskError(err error)

NotifyTaskError 通知任务错误 向外部通知任务执行过程中的错误,超时后记录警告日志

func (*UploadTask) Options

func (t *UploadTask) Options() *fscfg.Options

Options 获取文件存储选项配置 返回值:

  • *fscfg.Options: 文件存储选项配置

func (*UploadTask) RoutingTable

func (t *UploadTask) RoutingTable() *kbucket.RoutingTable

RoutingTable 获取端实例 返回值:

  • *kbucket.RoutingTable : 路由表实例

func (*UploadTask) Scheme

func (t *UploadTask) Scheme() *shamir.ShamirScheme

Scheme 返回 Shamir 秘密共享方案 返回值:

  • *shamir.ShamirScheme: Shamir 秘密共享方案

func (*UploadTask) SetStatus added in v2.0.28

func (t *UploadTask) SetStatus(status pb.UploadStatus) error

SetStatus 设置任务状态 参数:

  • status: pb.UploadStatus 新的任务状态

返回值:

  • error: 如果设置状态失败,返回相应的错误信息

func (*UploadTask) Start

func (t *UploadTask) Start() error

Start 开始或恢复上传任务 该方法会启动一个goroutine来处理上传任务的各种事件

返回值:

  • error: 如果开始上传过程中发生错误,返回相应的错误信息

func (*UploadTask) TaskID

func (t *UploadTask) TaskID() string

TaskID 获取任务ID 返回值:

  • string: 任务的唯一标识符

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL