task

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SLICE_STATUS_STARTED = iota
	SLICE_STATUS_FAILED
	SLICE_STATUS_WAITING_FOR_SP
	SLICE_STATUS_REPLACED
	SLICE_STATUS_FINISHED

	MAXSLICE              = 50 // max number of slices that can upload concurrently for a single file
	UPLOAD_TIMER_INTERVAL = 10 // seconds
	MAX_UPLOAD_RETRY      = 5
	UPLOAD_WAIT_TIMEOUT   = 60 // in seconds
	STATE_NOT_STARTED     = 0
	STATE_RUNNING         = 1
	STATE_DONE            = 2
	STATE_PAUSED          = 3
)
View Source
const LOCAL_REQID string = "local"
View Source
const TRANSFER_TASK_TIMEOUT_THRESHOLD = 180 // in seconds

Variables

View Source
var (
	// File related maps
	// DownloadTaskMap PP passway download task map   make(map[string]*DownloadTask)
	DownloadTaskMap = utils.NewAutoCleanMap(1 * time.Hour)
	// DownloadFileMap P download info map  make(map[string]*protos.RspFileStorageInfo)
	DownloadFileMap = utils.NewAutoCleanMap(1 * time.Hour)
	// DownloadSpeedOfProgress
	DownloadSpeedOfProgress = &sync.Map{}

	// Slice related maps
	// DownloadSliceTaskMap resource node download slice task map
	DownloadSliceTaskMap = utils.NewAutoCleanMap(1 * time.Hour)
	// SliceSessionMap key: slice reqid, value: file-reqid
	SliceSessionMap = &sync.Map{} //
	// DownloadSliceProgress sliceTaskId + sliceHash + reqId : downloaded size
	DownloadSliceProgress = utils.NewAutoCleanMap(1 * time.Hour)
	// DownloadEncryptedSlices stores the partially downloaded encrypted slices, indexed by the slice hash.
	// This is used because slices can only be decrypted after being fully downloaded
	DownloadEncryptedSlices = &sync.Map{}
)
View Source
var (
	UploadErrNoUploadTask = errors.New("no upload task found for the file")
	UploadErrFatalError   = errors.New("upload task stops on unresolvable error")
	UploadErrMaxRetries   = errors.New("upload task stops on too many retries")
	UploadFinished        = errors.New("upload task stops on success")
	TaskTimer             = clock.NewClock()
)
View Source
var UploadFileTaskMap = &sync.Map{} // map[string]*UploadFileTask

UploadFileTaskMap Map of file upload tasks that are in progress.

View Source
var UploadProgressMap = &sync.Map{} // map[string]*UploadProgress

UploadProgressMap Map of the progress for ongoing uploads

View Source
var UploadTaskIdMap = utils.NewAutoCleanMap(1 * time.Hour) // map[fileHash]taskId  Store the task ID (from SP) for each upload so that getFileStatus knows which taskId corresponds to a fileHash

Functions

func AddDownloadTask

func AddDownloadTask(target *protos.RspFileStorageInfo)

func AddTransferTask added in v0.5.0

func AddTransferTask(taskId, sliceHash string, tTask TransferTask)

func CancelDownloadTask added in v0.5.0

func CancelDownloadTask(fileHash string)

func CheckDownloadOver

func CheckDownloadOver(ctx context.Context, fileHash string) (bool, float32)

CheckDownloadOver check download finished

func CheckDownloadTask added in v0.8.0

func CheckDownloadTask(fileHash, walletAddress, fileReqId string) bool

func CheckRemoteDownloadOver added in v0.9.0

func CheckRemoteDownloadOver(ctx context.Context, fileHash, fileReqId string)

func CheckTransfer

func CheckTransfer(target *protos.NoticeFileSliceBackup) bool

CheckTransfer check whether can transfer todo:

func CleanDownloadFileAndConnMap added in v0.5.0

func CleanDownloadFileAndConnMap(ctx context.Context, fileHash, fileReqId string)

func CleanDownloadTask

func CleanDownloadTask(ctx context.Context, fileHash, sliceHash, walletAddress, fileReqId string)

func CleanTransferTask added in v0.5.0

func CleanTransferTask(taskId, sliceHash string)

func CleanTransferTaskByTaskSliceUID added in v0.11.0

func CleanTransferTaskByTaskSliceUID(taskSliceUID string)

func DeleteDownloadTask

func DeleteDownloadTask(fileHash, walletAddress, fileReqId string)

func DoneDownload

func DoneDownload(ctx context.Context, fileHash, fileName, savePath string)

DoneDownload

func DownloadProgress

func DownloadProgress(ctx context.Context, fileHash, fileReqId string, size uint64)

func DownloadResult added in v0.11.0

func DownloadResult(ctx context.Context, filehash string, success bool, reason string)

func GetOngoingTransferTaskCnt added in v0.10.0

func GetOngoingTransferTaskCnt() int

func GetTimeoutTransfer added in v0.11.0

func GetTimeoutTransfer() []string

func GetTransferSliceData

func GetTransferSliceData(taskId, sliceHash string) (int64, [][]byte)

func SaveBackuptFile added in v0.10.0

func SaveBackuptFile(target *protos.ReqBackupFileSlice) error

func SaveDownloadFile

func SaveDownloadFile(ctx context.Context, target *protos.RspDownloadSlice, fInfo *protos.RspFileStorageInfo) error

func SaveTransferData

func SaveTransferData(target *protos.RspTransferDownload) (bool, error)

func SaveUploadFile

func SaveUploadFile(target *protos.ReqUploadFileSlice) error

func SetDownloadResultToRpc added in v0.11.0

func SetDownloadResultToRpc(fileHash string, result bool)

func StopRepeatedUploadTaskJob added in v0.11.9

func StopRepeatedUploadTaskJob(fileHash string)

func SubscribeDownloadResult added in v0.11.0

func SubscribeDownloadResult(key string) chan bool

SubscribeDownloadResult when download is done, notification is set to subscribers

func UnsubscribeDownloadResult added in v0.11.0

func UnsubscribeDownloadResult(key string)

UnsubscribeDownloadResult

Types

type DownloadSP

type DownloadSP struct {
	RawSize        int64
	TotalSize      int64
	DownloadedSize int64
}

DownloadSP download progress

type DownloadSliceData

type DownloadSliceData struct {
	Data    [][]byte
	FileCrc uint32
	RawSize uint64
}

type DownloadTask

type DownloadTask struct {
	TaskId        string // file task id
	WalletAddress string
	FileHash      string
	VisitCer      string

	FailedSlice   map[string]bool
	SuccessSlice  map[string]bool
	FailedPPNodes map[string]*protos.PPBaseInfo
	SliceCount    int
	// contains filtered or unexported fields
}

DownloadTask signal task convert sliceHash list to map

func GetDownloadTask added in v0.7.0

func GetDownloadTask(key string) (*DownloadTask, bool)

func GetDownloadTaskWithSliceReqId added in v0.9.0

func GetDownloadTaskWithSliceReqId(fileHash, walletAddress, sliceReqId string) (*DownloadTask, bool)

func (*DownloadTask) AddFailedSlice added in v0.7.0

func (task *DownloadTask) AddFailedSlice(sliceHash string)

func (*DownloadTask) DeleteSliceInfo added in v0.11.0

func (task *DownloadTask) DeleteSliceInfo(sliceHash string)

func (*DownloadTask) GetNumberOfSliceInfo added in v0.11.0

func (task *DownloadTask) GetNumberOfSliceInfo() int

func (*DownloadTask) GetSliceInfo added in v0.11.0

func (task *DownloadTask) GetSliceInfo(sliceHash string) (*protos.DownloadSliceInfo, bool)

func (*DownloadTask) NeedRetry added in v0.7.0

func (task *DownloadTask) NeedRetry() (needRetry bool)

func (*DownloadTask) RefreshTask added in v0.7.0

func (task *DownloadTask) RefreshTask(target *protos.RspFileStorageInfo)

func (*DownloadTask) SetSliceSuccess added in v0.7.0

func (task *DownloadTask) SetSliceSuccess(sliceHash string)

type SliceWithStatus added in v0.9.0

type SliceWithStatus struct {
	Error error

	Status   int
	CostTime int64
	// contains filtered or unexported fields
}

SliceWithStatus wraps a SliceHashAddr, and it provides extra states for upload/backup task.

type SlicesPerDestination added in v0.9.0

type SlicesPerDestination struct {
	// contains filtered or unexported fields
}

type TransferTask added in v0.5.0

type TransferTask struct {
	TaskId             string
	IsReceiver         bool
	DeleteOrigin       bool
	PpInfo             *protos.PPBaseInfo
	SliceStorageInfo   *protos.SliceStorageInfo
	FileHash           string
	SliceNum           uint64
	ReceiverP2pAddress string
	SpP2pAddress       string
	AlreadySize        uint64
	LastTouchTime      int64
}

func AddAlreadySizeToTransferTask added in v0.11.0

func AddAlreadySizeToTransferTask(taskId, sliceHash string, alreadySizeDelta uint64) (tTask TransferTask, ok bool)

func GetTransferTask added in v0.5.0

func GetTransferTask(taskId, sliceHash string) (tTask TransferTask, ok bool)

func GetTransferTaskByTaskSliceUID added in v0.11.0

func GetTransferTaskByTaskSliceUID(taskSliceUID string) (tTask TransferTask, ok bool)

type UploadFileTask added in v0.9.0

type UploadFileTask struct {
	// contains filtered or unexported fields
}

UploadFileTask represents a file upload task that is in progress

func CreateBackupFileTask added in v0.10.0

func CreateBackupFileTask(target *protos.RspBackupStatus, fn func(ctx context.Context, fileHash string)) *UploadFileTask

func CreateUploadFileTask added in v0.9.0

func CreateUploadFileTask(target *protos.RspUploadFile, fn func(ctx context.Context, fileHash string)) *UploadFileTask

func (*UploadFileTask) CanRetry added in v0.9.0

func (u *UploadFileTask) CanRetry() bool

func (*UploadFileTask) Continue added in v0.11.9

func (u *UploadFileTask) Continue()

func (*UploadFileTask) GetExcludedDestinations added in v0.9.0

func (u *UploadFileTask) GetExcludedDestinations() []*protos.PPBaseInfo

func (*UploadFileTask) GetLastTouch added in v0.11.9

func (u *UploadFileTask) GetLastTouch() time.Time

func (*UploadFileTask) GetState added in v0.11.9

func (u *UploadFileTask) GetState() int

func (*UploadFileTask) GetUploadFileHash added in v0.11.9

func (u *UploadFileTask) GetUploadFileHash() string

func (*UploadFileTask) GetUploadProgress added in v0.11.9

func (u *UploadFileTask) GetUploadProgress() float32

func (*UploadFileTask) GetUploadSpP2pAddress added in v0.11.9

func (u *UploadFileTask) GetUploadSpP2pAddress() string

func (*UploadFileTask) GetUploadTaskId added in v0.11.9

func (u *UploadFileTask) GetUploadTaskId() string

func (*UploadFileTask) GetUploadType added in v0.11.9

func (u *UploadFileTask) GetUploadType() protos.UploadType

func (*UploadFileTask) IsFatal added in v0.9.0

func (u *UploadFileTask) IsFatal() error

func (*UploadFileTask) IsFinished added in v0.9.0

func (u *UploadFileTask) IsFinished() bool

func (*UploadFileTask) NextDestination added in v0.9.0

func (u *UploadFileTask) NextDestination() *SlicesPerDestination

func (*UploadFileTask) Pause added in v0.11.9

func (u *UploadFileTask) Pause()

func (*UploadFileTask) SetFatalError added in v0.11.9

func (u *UploadFileTask) SetFatalError(err error)

func (*UploadFileTask) SetRspUploadFile added in v0.11.9

func (u *UploadFileTask) SetRspUploadFile(rspUploadFile *protos.RspUploadFile)

func (*UploadFileTask) SetScheduledJob added in v0.11.9

func (u *UploadFileTask) SetScheduledJob(fn func())

func (*UploadFileTask) SetState added in v0.11.9

func (u *UploadFileTask) SetState(state int)

func (*UploadFileTask) SetUploadSliceStatus added in v0.11.9

func (u *UploadFileTask) SetUploadSliceStatus(sliceHash string, status int) error

func (*UploadFileTask) SignalNewDestinations added in v0.9.0

func (u *UploadFileTask) SignalNewDestinations(ctx context.Context)

func (*UploadFileTask) SliceFailuresToReport added in v0.9.0

func (u *UploadFileTask) SliceFailuresToReport() ([]*protos.SliceHashAddr, []bool)

SliceFailuresToReport returns the list of slices that will require a new destination, and a boolean list of the same length indicating which slices actually failed

func (*UploadFileTask) Touch added in v0.11.9

func (u *UploadFileTask) Touch()

func (*UploadFileTask) UpdateRetryCount added in v0.11.9

func (u *UploadFileTask) UpdateRetryCount()

func (*UploadFileTask) UpdateSliceDestinationsForRetry added in v0.11.9

func (u *UploadFileTask) UpdateSliceDestinationsForRetry(newDestinations []*protos.SliceHashAddr)

func (*UploadFileTask) UploadToDestination added in v0.11.9

func (u *UploadFileTask) UploadToDestination(ctx context.Context, fn func(ctx context.Context, tk *UploadSliceTask) error)

type UploadProgress added in v0.9.0

type UploadProgress struct {
	Total     int64
	HasUpload int64
}

UploadProgress represents the progress for an ongoing upload

type UploadSliceTask

type UploadSliceTask struct {
	RspUploadFile *protos.RspUploadFile
	RspBackupFile *protos.RspBackupStatus
	SliceNumber   uint64
	SliceHash     string
	Type          protos.UploadType
	Data          []byte
}

UploadSliceTask represents a slice upload task that is in progress

func CreateUploadSliceTask added in v0.9.0

func CreateUploadSliceTask(ctx context.Context, slice *SliceWithStatus, uploadTask *UploadFileTask) (*UploadSliceTask, error)

func GetReuploadSliceTask added in v0.8.0

func GetReuploadSliceTask(ctx context.Context, slice *SliceWithStatus, ppInfo *protos.PPBaseInfo, uploadTask *UploadFileTask) (*UploadSliceTask, error)

type VideoCacheTask added in v0.6.0

type VideoCacheTask struct {
	Slices     []*protos.DownloadSliceInfo
	FileHash   string
	DownloadCh chan bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL