etcdqueue

package
v0.0.0-...-ca65d48 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2018 License: MIT Imports: 13 Imported by: 0

README

Built on top of https://github.com/coreos/etcd.

See http://godoc.org/github.com/gyuho/dplearn/pkg/etcd-queue.

Documentation

Overview

Package etcdqueue implements queue service backed by etcd.

Index

Constants

View Source
const (
	// MaxWeight is the maximum value for item(job) weights.
	MaxWeight uint64 = 99999

	// MaxProgress is the progress value when the job is done!
	MaxProgress = 100
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Item

type Item struct {
	// Bucket is the name or job category for namespacing.
	// All keys will be prefixed with bucket name.
	Bucket string `json:"bucket"`

	// CreatedAt is timestamp of item creation.
	CreatedAt time.Time `json:"created_at"`

	// Key is autogenerated based on timestamps and bucket name.
	// It is stored as a key in etcd.
	Key string `json:"key"`

	// Value contains any data (e.g. encoded computation results).
	Value string `json:"value"`

	// Progress is the progress status value (range from 0 to 'etcdqueue.MaxProgress').
	Progress int `json:"progress"`

	// Canceled is true if the item(or job) is canceled.
	Canceled bool `json:"canceled"`

	// Error contains any error message. It's defined as string for
	// different language interpolation.
	Error string `json:"error"`

	// RequestID is used/generated by external service,
	// to help identify each item.
	RequestID string `json:"request_id"`
}

Item represents a job item in the queue. Key is stored as a key, with serialized JSON data as a value.

func CreateItem

func CreateItem(bucket string, weight uint64, value string) *Item

CreateItem creates an item with auto-generated ID of unix nano seconds. The maximum weight(priority) is 99999.

func (*Item) Equal

func (item1 *Item) Equal(item2 *Item) error

Equal compares two items with truncated CreatedAt field string, to handle modified timestamp string after serialization

type ItemWatcher

type ItemWatcher <-chan *Item

ItemWatcher is receive-only channel, used for broadcasting status updates.

type Op

type Op struct {
	// contains filtered or unexported fields
}

Op represents an operation that queue can execute.

type OpOption

type OpOption func(*Op)

OpOption configures queue operations.

func WithTTL

func WithTTL(dur time.Duration) OpOption

WithTTL configures TTL.

type Queue

type Queue interface {
	// Add adds an item to the queue.
	Add(ctx context.Context, it *Item, opts ...OpOption) error

	// Pop returns ItemWatcher that returns the first item in the queue.
	// It blocks until there is at least one item to return.
	Pop(ctx context.Context, bucket string) ItemWatcher

	// Stop stops the queue service and any embedded clients.
	Stop()

	// Client returns the client.
	Client() *clientv3.Client

	// ClientEndpoints returns the client endpoints.
	ClientEndpoints() []string
}

Queue is the queue service backed by etcd.

func NewEmbeddedQueue

func NewEmbeddedQueue(ctx context.Context, cport, pport int, dataDir string) (Queue, error)

NewEmbeddedQueue starts a new embedded etcd server. cport is the TCP port used for etcd client request serving. pport is for etcd peer traffic, and still needed even if it's a single-node cluster.

func NewQueue

func NewQueue(cli *clientv3.Client) (Queue, error)

NewQueue creates a new queue from given etcd client.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL