taskqueue

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 5, 2018 License: MIT Imports: 1 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrAttemptToChangeStatusOfTaskInProcessing 是当在 OverwritingFor 中设置了 TaskStatusProcessing 时返回的错误
	ErrAttemptToChangeStatusOfTaskInProcessing = errors.New("taskqueue: Change status of tasks in prcoessing is not allowed")

	// ErrAttemptToChangeStatusOfTaskToProcessing 是当尝试将任务状态手动设置为处理中时返回的错误
	ErrAttemptToChangeStatusOfTaskToProcessing = errors.New("taskqueue: Change status of tasks to prcoessing is not allowed")

	// ErrInvaildPriority 是当优先级非正数时返回的错误
	ErrInvaildPriority = errors.New("taskqueue: Priority must > 0")

	// ErrTaskExists 是当任务本身就存在(不一定在等待队列中)时返回的错误
	ErrTaskExists = errors.New("taskqueue: Task already exists")

	// ErrNoTasksInPending 是没有可返回的等待中的任务时返回的错误
	ErrNoTasksInPending = errors.New(`taskqueue: No task in the pending queue`)
)

Functions

This section is empty.

Types

type EnqueueOptions

type EnqueueOptions struct {
	// Priority 是优先级
	Priority int
	// ToHead 代表是否插入到队列相应优先级的最前方
	ToHead bool

	// Frozen 如果为真, 任务插入时即为冻结状态
	Frozen bool

	// MaxAttempts 是最多尝试的次数, 如果为 0 则被设置为默认值, 如果为负数则不限
	MaxAttempts int
}

EnqueueOptions 是插入任务时的选项

type ProcessResult

type ProcessResult int

ProcessResult 是任务处理的结果

const (
	// ProcessResultSuccessful 代表任务处理成功
	// 任务之后会标记为已完成
	ProcessResultSuccessful ProcessResult = iota
	// ProcessResultFailed 代表任务失败了, 但是可以根据机会重试
	ProcessResultFailed
	// ProcessResultRetry 代表任务应该重试, 不计为失败
	ProcessResultRetry
	// ProcessResultGivenUp 代表放弃任务
	ProcessResultGivenUp
	// ProcessResultShouldBeExcluded 代表移除任务
	ProcessResultShouldBeExcluded
	// ProcessResultShouldBeFrozen 代表任务应被冻结
	ProcessResultShouldBeFrozen
)

type Refresher

type Refresher func(status TaskStatus, id int, taskType string, taskData []byte) TaskStatus

Refresher 用于刷新任务状态

type TaskQueue

type TaskQueue interface {
	//Enqueue(prefix, data []byte) (id int, err error)
	EnqueueWithOptions(taskType string, taskData []byte, opts EnqueueOptions) (id int, err error)

	//BatchUnfreezing(filter func(prefix, data []byte) bool) (err error)
	BatchRefreshStatuses(appliedStatuses TaskStatusSet, appliedTaskTypes []string, filter Refresher) (err error)

	DequeueForProcess() (id int, taskType string, taskData []byte, err error)
	ReportProcessResult(id int, result ProcessResult) (err error)

	Close() (err error)
}

TaskQueue 是会由爬虫使用的队列的接口 队列插入的规则: * 不存在的任务直接插入 * 存在且等待/冻结的任务, 会修改优先级相关状态及, 并依新旧尝试次数的差值修改相关状态

type TaskStatus

type TaskStatus uint8

TaskStatus 是任务在 task queue 中的状态 小于等于 0 的值代表已尝试的次数

const (
	// TaskStatusPending 代表任务正在等待执行
	TaskStatusPending TaskStatus = 1
	// TaskStatusFrozen 代表任务已被冻结, 在被解冻之前不会被取出队列
	TaskStatusFrozen TaskStatus = 1 << 1

	// TaskStatusProcessing 代表任务正在被处理
	TaskStatusProcessing TaskStatus = 1 << 2

	// TaskStatusGivenUp 代表超过尝试次数而放弃的任务
	TaskStatusGivenUp TaskStatus = 1 << 3

	// TaskStatusExcluded 代表任务被除外
	TaskStatusExcluded TaskStatus = 1 << 6
	// TaskStatusFinished 代表任务已完成
	TaskStatusFinished TaskStatus = 1 << 7
)

func (TaskStatus) String

func (s TaskStatus) String() (name string)

type TaskStatusSet

type TaskStatusSet TaskStatus

TaskStatusSet 是任务状态的集合

func (TaskStatusSet) GetStatuses

func (set TaskStatusSet) GetStatuses() (ss []TaskStatus)

GetStatuses 返回 TaskStatusSet 中的所有状态

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL