taskstore

Version: v0.0.0-...-e85c719
Published: May 10, 2018 License: Apache-2.0 Imports: 10 Imported by: 1

README

Task Store

by Chris Monson

A simple, no-frills place to put tasks with safe coordination semantics.

tl;dr:

go get -d -t entrogo.com/taskstore
go install entrogo.com/taskstore/service/server
go install entrogo.com/taskstore/service/client

Of course, you can always clone the entire repository at http://github.com/shiblon/taskstore and hack on it yourself. If that floats your boat, patches and pull requests are always welcome.

The TaskStore is fault-tolerant transactional task maintenance software. Basically, it allows you to have multiple workers creating and consuming small chunks of data with some hard guarantees on consistency. This enables you, for example, to create things like a MapReduce framework: input data is transformed into map tasks, map workers consume tasks so long as there are any to consume, they then produce tasks corresponding to their map output, and reduce workers pick up those tasks and emit output data. Central to all of this is a process flow that is based on the production and consumption of tasks, all done in a fault-tolerant way that maintains consistency through multiple parallel accessors, crashes, and evictions.

It has been designed to be as modular and simple as possible, only implementing what is needed to fulfill the needed guarantees. This simplicity not only makes it easy to reason about so that it can be used correctly, but also makes it easier to maintain while preserving correctness. It is just a transactional task store, and does not implement anything else like a message service, nor does it depend on one. A sample server process has been included, as has a sample client library. These are by no means the only way in which the TaskStore can be used, but they do give an idea of how one might use it, and they can be used as is for many needs.

Introduction

The TaskStore is divided into three basic pieces:

  • TaskStore: basic in-process task manipulation.
  • Service: exposes an API over the network for task manipulation - uses the TaskStore.
  • Client: library that exposes Go calls to use the service.

The TaskStore is further divided into task management and journaling. The journaler is an interface that you can implement and pass into the TaskStore upon creation, allowing you to implement something different than what comes with the TaskStore. A basic append-only logging journaler is included.

The TaskStore Library

The TaskStore library can be used to open a persistent store of tasks and then manipulate them safely in a single process. The basic design is single-threaded to keep it very simple and easy to reason about and work with. There is only ever one reader/writer, with one exception that will be discussed in the section on journaling.

The basic idea of the TaskStore is that you can have multiple task "groups". These groups are simply names, and can be used in any way you choose. You might choose to use a group as a way of specifying task "type", or as a way of specifying a particular part of a pipeline. It's up to you. An example of how to use the TaskStore to make a MapReduce is given in taskstore_test.go (and can be viewed in godoc as an "Example").

Within these groups, tasks can be manipulated in a way that has some guarantees:

  • No currently-owned task can be updated or deleted by any other process.
  • No operation succeeds if any of the depended-upon tasks are missing.

The way these characteristics are enforced is through immutability and atomicity. Every Task is immutable. An update to an existing task atomically deletes the old Task (and thus its globally-unique ID) and creates a new one with the new specifications (new data, new ownership, or new "available time" when ownership expires). Thus the existence of a task ID is a guarantee that it is exactly the task that you think it is, and a missing task ID means that another process was responsible for doing something with it. A task is either there or it is not. It is never in an unexpected state.
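As a rough illustration of the replace-on-update idea, here is a minimal sketch of a toy in-memory store. It is not the TaskStore implementation: the toyStore type, its add and update methods, and errMissing are hypothetical names invented for this example. It only shows that an update retires the old ID, issues a larger one, and fails as a whole when any referenced ID is missing.

```go
package main

import (
	"errors"
	"fmt"
)

// toyStore is a hypothetical, single-threaded stand-in for the task store.
// It maps task IDs to data and never reuses an ID.
type toyStore struct {
	lastID int64
	data   map[int64]string
}

var errMissing = errors.New("task not present")

// add stores d under a fresh, strictly larger ID.
func (s *toyStore) add(d string) int64 {
	s.lastID++
	s.data[s.lastID] = d
	return s.lastID
}

// update replaces a task: the old ID is deleted and a new ID is issued.
// It changes nothing if id or any depended-upon ID is missing.
func (s *toyStore) update(id int64, d string, depends ...int64) (int64, error) {
	for _, need := range append([]int64{id}, depends...) {
		if _, ok := s.data[need]; !ok {
			return 0, errMissing
		}
	}
	delete(s.data, id)
	return s.add(d), nil
}

func main() {
	s := &toyStore{data: make(map[int64]string)}
	first := s.add("v1")
	second, _ := s.update(first, "v2")
	_, err := s.update(first, "v3") // the old ID no longer exists
	fmt.Println(second > first, err)
}
```

A caller that still holds the first ID learns from the error that some other change happened in the meantime, which is the property the guarantees rely on.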

Because a change in "available time" actually creates a new task, this means that "Claim"ing a task creates a new task ID with an available time in the future and deletes the old ID. Thus the old task is gone and therefore cannot be claimed by any other process, and the new task has a future available time and an assigned owner, also rendering it unclaimable by another process.

Some failure scenarios can help demonstrate the utility of this simple approach.

Task Abandoned

Frequently in parallel processing environments, a process busy doing a task will die. This can happen for various reasons, including eviction, data corruption, or process bugs. When it happens to a process that owns a task, that task is now orphaned for a period of time. Eventually, however, the ownership will expire on that task, and another process will be able to claim it.

Because claiming a task actually creates a new task with a new availability time, the old process will not be able to claim the same task.

Task Delayed

If a process is working, but ends up being delayed or is simply very slow, the task might expire while it is holding it. If this occurs, that task is now available to be picked up by another process, which may decide to start working on it. This means that two processes are working on the same task, but only the second will be able to commit the work because the task held by the first is no longer in the store (it disappeared when the second process claimed it). Thus, the first process, when attempting to delete the task or otherwise modify it to indicate that it is finished with it, will get an error that the task is no longer available and can abandon the work that it has completed.
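The delayed-worker scenario can be walked through with a small toy model. This is only a sketch under simplifying assumptions: the lease and toyStore types, the claim and finish methods, and the integer clock are all hypothetical and do not reflect how the TaskStore is actually written.

```go
package main

import (
	"errors"
	"fmt"
)

// lease is a toy record of which worker holds a task ID and until when.
type lease struct {
	owner   int32
	expires int64
}

// toyStore is a hypothetical stand-in for the task store.
type toyStore struct {
	nextID int64
	leases map[int64]lease
}

var errGone = errors.New("task no longer available")

// claim deletes the old ID and issues a fresh one, but only if the old
// lease has expired by time now.
func (s *toyStore) claim(id int64, owner int32, now, duration int64) (int64, error) {
	l, ok := s.leases[id]
	if !ok || l.expires > now {
		return 0, errGone
	}
	delete(s.leases, id)
	s.nextID++
	s.leases[s.nextID] = lease{owner: owner, expires: now + duration}
	return s.nextID, nil
}

// finish deletes a task, which works only if that exact ID is still
// present and held by the caller.
func (s *toyStore) finish(id int64, owner int32) error {
	l, ok := s.leases[id]
	if !ok || l.owner != owner {
		return errGone
	}
	delete(s.leases, id)
	return nil
}

func main() {
	s := &toyStore{nextID: 1, leases: map[int64]lease{1: {}}}
	slow, _ := s.claim(1, 100, 0, 10)     // worker 100 claims at t=0 for 10 units
	fast, _ := s.claim(slow, 200, 11, 10) // the lease expired, so worker 200 claims at t=11
	fmt.Println(s.finish(slow, 100))      // the slow worker's ID is gone
	fmt.Println(s.finish(fast, 200))      // the second worker commits
}
```

The slow worker needs no special notification: the failed delete is the signal to abandon its result.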

Task Store Restarted

There are two approaches to running a task store: opportunistic journaling and strict journaling.

In the opportunistic scenario, requests involving tasks are fulfilled immediately from RAM, and the corresponding update is flushed to disk as soon as possible afterward. If a task store dies between sending out the task and flushing the update to disk, a worker will have confirmation that it, e.g., owns a task, but the task store will have no record of that task when it starts up again. If the work done for a task is idempotent (there is no harm in repeating work), then this is safe. The client can finish what it is doing, and its later attempt to update or delete the corresponding task will fail because the task won't be there (or, if another process has since created a task with a similar ID, it won't be owned by this worker).

With strict journaling things are even more straightforward: no task requests are fulfilled until after the task update is flushed to disk, so no work will be done on a task that the task store does not durably remember. If the task store crashes between flushing to disk and fulfilling the request, that simply means that a task exists that nobody will be able to claim until its expiry: an annoying but safe situation.

More on Journaling

When the TaskStore is opened, it is given a journal to which it can send transactions and through which it can initiate a snapshot. Commonly a journal just accesses a filesystem, but other approaches are also allowed, e.g., a database.

As mentioned above, the TaskStore has two journaling modes: strict and opportunistic. The opportunistic variant works by spawning a concurrent routine that listens for transactions on a channel and sends them to the journal when they arrive. The strict method simply writes transactions in the main thread before returning a result to the caller.
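The difference between the two modes can be sketched with a channel and a goroutine. This is an illustrative model only, not the package's code: memJournal, strictCommit, opportunisticCommit, and startWriter are hypothetical names, and the "state" map stands in for the in-memory store.

```go
package main

import (
	"fmt"
	"sync"
)

// memJournal is a hypothetical journal that records transactions in memory.
type memJournal struct {
	mu      sync.Mutex
	entries []string
}

func (j *memJournal) record(tx string) {
	j.mu.Lock()
	defer j.mu.Unlock()
	j.entries = append(j.entries, tx)
}

func (j *memJournal) size() int {
	j.mu.Lock()
	defer j.mu.Unlock()
	return len(j.entries)
}

// strictCommit journals the transaction before applying it to memory.
func strictCommit(j *memJournal, state map[string]bool, tx string) {
	j.record(tx)
	state[tx] = true
}

// opportunisticCommit applies the transaction to memory first and hands it
// to a background writer, so the caller does not wait for the journal.
func opportunisticCommit(ch chan<- string, state map[string]bool, tx string) {
	state[tx] = true
	ch <- tx
}

// startWriter spawns a goroutine that drains ch into the journal. The
// returned channel is closed once ch is closed and fully drained.
func startWriter(j *memJournal, ch <-chan string) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for tx := range ch {
			j.record(tx)
		}
	}()
	return done
}

func main() {
	j := &memJournal{}
	state := make(map[string]bool)
	strictCommit(j, state, "add 1")

	ch := make(chan string, 16)
	done := startWriter(j, ch)
	opportunisticCommit(ch, state, "add 2")
	close(ch)
	<-done
	fmt.Println(j.size(), len(state))
}
```

In the opportunistic path a crash between the memory update and the background write loses the transaction, which is why that mode suits idempotent work.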

Even in the case of opportunistic journaling, all access to the in-memory store is serialized, with the exception of producing periodic snapshots.

A snapshot is produced when the number of recent transactions reaches some threshold, enabling start-up times to be reduced in the event of a failure. During a snapshot, the operation of the task store can continue normally, as careful attention has been given to allowing this exception to serialized access.

Anatomy of a Task

What is a Task? A Task is a piece of data with

  • A unique identifier,
  • A group name,
  • An available time,
  • An owner, and
  • User-specified data.

Task ID

The unique ID is always globally unique and new tasks always have a larger ID number than old tasks. There are never any other tasks in the store with the same ID, and once used, it will never be used again.

Group Name

Tasks are organized into "groups" with free-form names. Groups serve to partition tasks in useful ways. For instance, one can claim a new, random task from a particular group knowing only the name of that group. One can also list the tasks, owned or not, within a group. This can serve as a way of determining whether there is any of a particular kind of work left to be done. For example, in a mapreduce scenario, a "map" group might contain all of the map tasks that are not yet completed, and a "reduce" group might contain all of the unfinished reduce tasks.

Groups spring into existence when tasks are created within them, and they disappear when they are empty.

Available Time

The "available time" (AT) of a task is the time in milliseconds since the Epoch when a task can be claimed or owned by anyone. If the time is in the future, then the task is not available: some process owns it and is ostensibly busy working on it. In fact, "claim"ing a task involves setting its owner ID and advancing its AT to a future time (and, of course, creating a new ID for it, as these operations create a new task).
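The availability rule amounts to a single comparison. The helpers below are a sketch with hypothetical names (millis, isAvailable, claimUntil); treating a task whose AT equals the current instant as available is an assumption, since the text only says that a future AT means unavailable.

```go
package main

import (
	"fmt"
	"time"
)

// millis converts a time to milliseconds since the Epoch.
func millis(t time.Time) int64 {
	return t.UnixNano() / int64(time.Millisecond)
}

// isAvailable reports whether a task with available time atMillis can be
// claimed at nowMillis. An AT in the future means the task is owned.
func isAvailable(atMillis, nowMillis int64) bool {
	return atMillis <= nowMillis
}

// claimUntil returns the AT a claim made at nowMillis would carry if it
// lasts for duration d.
func claimUntil(nowMillis int64, d time.Duration) int64 {
	return nowMillis + int64(d/time.Millisecond)
}

func main() {
	now := millis(time.Now())
	claimed := claimUntil(now, time.Minute)
	fmt.Println(isAvailable(now, now), isAvailable(claimed, now))
}
```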

Owner ID

Each client of the task store is required to provide a unique ID that represents it. This is often achieved by obtaining a random number. The client library, for example, works in precisely this way: when the library loads, it creates a unique ID, and any uses of that library will share it. This gives each process its own (probably) unique identifier. The task store does not assign identifiers to clients.
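One way to obtain such an ID is to read a few bytes from the operating system's random source. This is a sketch, not the client library's code: newOwnerID is a hypothetical helper, and masking the sign bit so the ID is non-negative is a choice made here rather than a documented requirement.

```go
package main

import (
	"crypto/rand"
	"encoding/binary"
	"fmt"
)

// newOwnerID draws a random 31-bit non-negative ID from crypto/rand.
func newOwnerID() (int32, error) {
	var b [4]byte
	if _, err := rand.Read(b[:]); err != nil {
		return 0, err
	}
	// Clear the top bit so the value fits in a non-negative int32.
	return int32(binary.BigEndian.Uint32(b[:]) & 0x7fffffff), nil
}

func main() {
	id, err := newOwnerID()
	if err != nil {
		fmt.Println("could not generate owner ID:", err)
		return
	}
	fmt.Println("owner ID:", id)
}
```

Generating the ID once at startup and reusing it for every request matches the behavior described for the client library.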

Data

Each task can optionally contain a slice of bytes, the data that describes the task. Sometimes a task requires no explanation; its presence is sufficient. Most of the time, tasks will require some data. A map task, for example, will probably want to describe the file from which it is to read the data, and perhaps the offsets within that file. Because it is only a byte slice, the data is not interpreted in any way by the task store. It is simply passed around as an opaque entity.

Task Service

Users of the task store are of course welcome to create their own service using the taskstore library as the underlying mechanism for storage and maintenance of tasks. Even so, a taskstore service is included with this distribution that exposes the operations of the taskstore over HTTP using a simple RESTful API. Among other things, this approach assumes that all task data is a string, not a slice of bytes.

All communication with the HTTP service is done via JSON.

The retrieval entry points of the API are described here:

  • /groups: GET a list of non-empty groups
  • /task/<ID>: GET the given task, specified by numeric ID
  • /tasks/<ID>[,<ID>,...]: GET a list of tasks, by comma-separated ID list
  • /group/<Group>?limit=1&owned=true: GET the tasks for the specified group, optionally limiting the number returned and allowing owned tasks to be included with unowned tasks. Default is all unowned tasks.

In addition to retrieving, the store can be updated (including task claiming) by POSTing to

  • /update: add, update, or delete tasks
  • /claim: claim a task from a group

All of the above are described in their own section below.

/groups

Issue a GET request to /groups to return a JSON list of all non-empty group names in the task store. It may not be perfectly current.

/task/<ID>

Issue a GET request to /task/<ID> to obtain information about the task of the given ID. The ID is numeric (only digits), e.g., /task/523623. The returned JSON has the following format:

  {"id": 523623,
   "group": "map",
   "data": "/path/to/file:1024:2048",
   "timespec": 1388552400000,
   "ownerid": 2624}

The time is in milliseconds since the Epoch. The data is free-form, whatever was put into the data field when the task was last updated or created.
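A client might decode this JSON with encoding/json. The serviceTask struct and decodeTasks helper below are hypothetical names for this sketch; the field tags follow the JSON shown above. The same struct can decode the list-returning endpoints, where a nonexistent task is reported as null and therefore becomes a nil pointer.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// serviceTask mirrors the JSON task format of the HTTP service.
type serviceTask struct {
	ID       int64  `json:"id"`
	Group    string `json:"group"`
	Data     string `json:"data"`
	TimeSpec int64  `json:"timespec"` // milliseconds since the Epoch
	OwnerID  int32  `json:"ownerid"`
}

// decodeTasks parses a JSON list of tasks. JSON null entries become nil.
func decodeTasks(body []byte) ([]*serviceTask, error) {
	var tasks []*serviceTask
	if err := json.Unmarshal(body, &tasks); err != nil {
		return nil, err
	}
	return tasks, nil
}

func main() {
	body := []byte(`[{"id": 523623, "group": "map",
		"data": "/path/to/file:1024:2048",
		"timespec": 1388552400000, "ownerid": 2624}, null]`)
	tasks, err := decodeTasks(body)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	for i, task := range tasks {
		if task == nil {
			fmt.Printf("entry %d: no such task\n", i)
			continue
		}
		fmt.Printf("entry %d: task %d in group %q\n", i, task.ID, task.Group)
	}
}
```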

/tasks/<ID>[,<ID>,...]

This is like /task/<ID> except that you can specify a comma-separated list of IDs and a list of task JSON structures will be returned, e.g., issuing a GET to /tasks/523623,523624 would produce the following (provided that the tasks exist, otherwise nonexistent tasks return null):

  [{"id": 523623,
    "group": "map",
    "data": "/path/to/file:1024:2048",
    "timespec": 1388552400000,
    "ownerid": 2354},
   {"id": 523624,
    "group": "map",
    "data": "/path/to/file:2048:3096",
    "timespec": 1388552401000,
    "ownerid": 2354}]

/group/<Group>

To obtain a list of tasks from a specified group, issue a GET request to /group/<Group> where <Group> is the name of the group of interest.

Additionally the parameters limit and owned may be specified:

  • limit: numeric, return no more than the amount specified.
  • owned: boolean (any of 0, 1, "yes", "no", "true", "false"), determines whether to allow owned tasks to be returned.

The default is no limit and only unowned tasks.

The result is the same as for /tasks/...: a list of tasks from the task store.
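Building the request URL can be done with net/url. In this sketch groupURL is a hypothetical helper and the base address is a placeholder; whether the server expects the group name to be percent-escaped is an assumption.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// groupURL builds the GET URL for listing a group's tasks. A limit of 0
// or less leaves the parameter out, which the service treats as no limit.
func groupURL(base, group string, limit int, owned bool) string {
	q := url.Values{}
	if limit > 0 {
		q.Set("limit", strconv.Itoa(limit))
	}
	if owned {
		q.Set("owned", "true")
	}
	u := base + "/group/" + url.PathEscape(group)
	if enc := q.Encode(); enc != "" {
		u += "?" + enc
	}
	return u
}

func main() {
	// The host and port are placeholders for wherever the service runs.
	fmt.Println(groupURL("http://localhost:8080", "map", 1, true))
	fmt.Println(groupURL("http://localhost:8080", "map", 0, false))
}
```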

/update

To update tasks (including add, modify, or delete), issue a POST request to /update with the request information in a JSON object, the structure of which is exemplified here:

  {"clientid": 1643,
   "adds": [{"group": "map",
             "data": "/path/to/file:1024:2048"},
            {"group": "map",
             "data": "/path/to/file:2048:3096"}],
   "updates": [{"id": 14,
                "data": "some data for this task",
                "timespec": -60000}],
   "deletes": [16, 18, 20],
   "depends": [12, 13]}

The clientid should be a unique numeric ID for this particular requesting process. The included client library that uses this service assigns the ID randomly using the crypto/rand library. This is the suggested approach. Alternatively, a service could be designed to assign client IDs, but cryptographically strong random number generators are decentralized and convenient.

The adds list contains partial task information (the ID is omitted, for example, because it will be assigned by the task store, and is ignored if included). The group is the only thing that must be specified. All other fields have sensible defaults:

  • data defaults to the empty string
  • timespec defaults to "right now" in milliseconds since the Epoch (making the task unowned, since its time is already in the past).

The updates list contains the ID of the task and need not contain any other fields. If it does not, then the task will be changed (deleted and a new one created) with no data and the timespec set to the present time. The group is never changed. If you want to move a task from one group to another, you must delete the old one and add a new one with the same data.

You can renew ownership of a task by specifying a future timespec. Alternatively, if you specify a negative timespec, the new timespec will be the present time plus the absolute value of what you specify (e.g., -60000 means "set to one minute from right now", whereas 60000 means "set to 60 seconds after the Epoch", which was a long time ago). This also works for adds.
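The timespec rule can be written as a small function. This is a sketch of the described behavior, not the service's code: resolveTimespec is a hypothetical name, and treating 0 the same as an omitted timespec (meaning "right now") is an assumption based on the stated default.

```go
package main

import "fmt"

// resolveTimespec turns a requested timespec into an absolute time in
// milliseconds since the Epoch. Zero or negative values are relative to
// nowMillis; positive values are taken as absolute.
func resolveTimespec(ts, nowMillis int64) int64 {
	if ts <= 0 {
		return nowMillis - ts // subtracting a negative adds its magnitude
	}
	return ts
}

func main() {
	now := int64(1388552400000)
	fmt.Println(resolveTimespec(-60000, now)) // one minute after now
	fmt.Println(resolveTimespec(0, now))      // now
	fmt.Println(resolveTimespec(60000, now))  // 60 seconds after the Epoch
}
```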

The deletes list is nothing more than a list of integer task IDs to delete.

The depends list allows you to specify that the transaction cannot complete unless the given task IDs are present. None of those task IDs will be changed in any way, but the other tasks cannot be changed, added, or deleted unless the depends list is all currently in the task store.

This does not only mean that the conceptual tasks exist, but also that whatever tasks you specify are unmodified, since a task ID changes when the task is updated by any process.

An update either completely succeeds or completely fails. If any of the tasks specified by ID are not present (updates, deletes, or depends), then the transaction will return an error indicating what happened.

If it succeeds, then it returns all of the new tasks in a single JSON list in the order specified: adds first, followed by updates. This way you can track the way that the ID changes across updates, and can do things like renew your claim on a changed task later on.

The JSON returned is an object with two fields: tasks and error. The tasks member is a list of tasks just like that returned by a GET request on /tasks, and the error member is a JSON object potentially containing several lists:

  • changes: a list of task IDs that failed because they were not present to be modified.
  • deletes: a list of task IDs that failed because they were not present to be deleted.
  • depends: a list of task IDs that were not present and thus kept other changes or deletions from happening.
  • owned: a list of task IDs that could not be changed because they were owned by another client.
  • bugs: a list of errors indicating that some unexpected conditions occurred.
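The request and response shapes described in this section might be modeled in Go as follows. All type and function names here are hypothetical, the element type of bugs is not specified in the text (so it is kept as raw JSON), and the sample failure response in main is made up purely to exercise the decoder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskSpec is a partial task as used in the adds and updates lists.
type taskSpec struct {
	ID       int64  `json:"id,omitempty"`
	Group    string `json:"group,omitempty"`
	Data     string `json:"data,omitempty"`
	TimeSpec int64  `json:"timespec,omitempty"`
}

// updateRequest is the body POSTed to /update.
type updateRequest struct {
	ClientID int32      `json:"clientid"`
	Adds     []taskSpec `json:"adds,omitempty"`
	Updates  []taskSpec `json:"updates,omitempty"`
	Deletes  []int64    `json:"deletes,omitempty"`
	Depends  []int64    `json:"depends,omitempty"`
}

// updateError holds the per-category failure lists.
type updateError struct {
	Changes []int64           `json:"changes"`
	Deletes []int64           `json:"deletes"`
	Depends []int64           `json:"depends"`
	Owned   []int64           `json:"owned"`
	Bugs    []json.RawMessage `json:"bugs"`
}

// updateResponse has the two documented fields, tasks and error.
type updateResponse struct {
	Tasks []json.RawMessage `json:"tasks"`
	Error *updateError      `json:"error"`
}

func encodeUpdate(req updateRequest) ([]byte, error) {
	return json.Marshal(req)
}

func decodeUpdateResponse(body []byte) (updateResponse, error) {
	var resp updateResponse
	err := json.Unmarshal(body, &resp)
	return resp, err
}

func main() {
	body, err := encodeUpdate(updateRequest{
		ClientID: 1643,
		Adds:     []taskSpec{{Group: "map", Data: "/path/to/file:1024:2048"}},
		Updates:  []taskSpec{{ID: 14, Data: "some data for this task", TimeSpec: -60000}},
		Deletes:  []int64{16, 18, 20},
		Depends:  []int64{12, 13},
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(body))

	// A made-up failure response, not captured from a real server.
	resp, err := decodeUpdateResponse([]byte(`{"tasks": null, "error": {"depends": [12]}}`))
	if err == nil && resp.Error != nil {
		fmt.Println("missing dependencies:", resp.Error.Depends)
	}
}
```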

/claim

To claim a task, POST a JSON object to /claim with the following layout:

  {"clientid": 1532,
   "group": "map",
   "duration": 60000,
   "depends": [14, 10]}

Like an update, a claim cannot succeed unless the dependent task IDs are present in the task store. When a claim does succeed, you will be given a random task from the specified group name, and will have ownership that extends duration milliseconds into the future.

The result is the same as with /update: either a list containing the claimed task, or a list of errors indicating why it failed.

It is good practice to claim a task for a relatively short time and refresh the claim periodically if work is progressing properly. This is done by issuing an update to the task with a future (or negative) timespec as described in the section for /update.
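A claim followed by a periodic renewal might produce request bodies like the ones built below. This is a sketch with hypothetical type and function names. Note that the renewal resends the task's data, because an update that omits data replaces the task with one that has none, and that each successful renewal returns a new task ID that must be used for the next one.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// claimRequest is the body POSTed to /claim.
type claimRequest struct {
	ClientID int32   `json:"clientid"`
	Group    string  `json:"group"`
	Duration int64   `json:"duration"` // milliseconds
	Depends  []int64 `json:"depends,omitempty"`
}

// taskUpdate is one entry of the updates list sent to /update.
type taskUpdate struct {
	ID       int64  `json:"id"`
	Data     string `json:"data"`
	TimeSpec int64  `json:"timespec"`
}

// renewRequest is an /update body that only carries updates.
type renewRequest struct {
	ClientID int32        `json:"clientid"`
	Updates  []taskUpdate `json:"updates"`
}

// claimBody builds the JSON for claiming a task from group for duration d.
func claimBody(clientID int32, group string, d time.Duration) ([]byte, error) {
	return json.Marshal(claimRequest{
		ClientID: clientID,
		Group:    group,
		Duration: int64(d / time.Millisecond),
	})
}

// renewalBody builds the JSON that extends ownership of taskID by d from
// the present, using a negative (relative) timespec.
func renewalBody(clientID int32, taskID int64, data string, d time.Duration) ([]byte, error) {
	return json.Marshal(renewRequest{
		ClientID: clientID,
		Updates: []taskUpdate{{
			ID:       taskID,
			Data:     data,
			TimeSpec: -int64(d / time.Millisecond),
		}},
	})
}

func main() {
	claim, _ := claimBody(1532, "map", time.Minute)
	fmt.Println(string(claim))
	// Task ID 21 is a placeholder for whatever ID the claim returned.
	renew, _ := renewalBody(1532, 21, "/path/to/file:1024:2048", time.Minute)
	fmt.Println(string(renew))
}
```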

Documentation

Overview

Package taskstore implements a library for a simple task store. This provides abstractions for creating a simple task store process that manages data in memory and on disk. It can be used to implement a full-fledged task queue, but it is only the core storage piece. It does not, in particular, implement any networking.

Constants

This section is empty.

Variables

var (
	ErrAlreadySnapshotting = errors.New("already snapshotting")
	ErrJournalClosed       = errors.New("journal closed")
	ErrAlreadyClosed       = errors.New("already closed")
)

Functions

func Now

func Now() int64

Now returns the current time in nanoseconds since the UTC epoch. This is the standard Go time granularity, so it works in all functions needing time without being multiplied by a time.Duration constant.
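Because the value is in nanoseconds, durations can be added to it after a plain integer conversion. The sketch below uses hypothetical stand-ins (nowNanos, deadlineAfter) and assumes that Now behaves like time.Now().UnixNano(), which the description suggests but the source shown here does not confirm.

```go
package main

import (
	"fmt"
	"time"
)

// nowNanos is a stand-in for a Now-style function: nanoseconds since the Epoch.
func nowNanos() int64 {
	return time.Now().UnixNano()
}

// deadlineAfter adds a duration to a nanosecond timestamp. No unit
// conversion is needed because time.Duration also counts nanoseconds.
func deadlineAfter(start int64, d time.Duration) int64 {
	return start + int64(d)
}

func main() {
	start := nowNanos()
	deadline := deadlineAfter(start, 10*time.Second)
	fmt.Println(time.Duration(deadline - start))
}
```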

Types

type Task

type Task struct {
	ID      int64  `json:"id"`
	OwnerID int32  `json:"ownerid"`
	Group   string `json:"group"`

	// The "Available Time": nanoseconds from the Epoch (UTC) when this task
	// becomes available. When used in requests, a value <= 0 is subtracted
	// from "right now" to generate a positive time value. Thus, 0 becomes
	// "now", and -time.Second (-1e9) becomes "1 second from now".
	AT int64 `json:"at"`

	// Data holds the data for this task.
	// If you want raw bytes, you'll need to encode them
	// somehow.
	Data []byte `json:"data"`
}

Task is the atomic task unit. It contains a unique ID, an owner ID, a group, and an availability time (AT, in nanoseconds since the Epoch, as described in the field comment above). The data is user-defined and can be basically anything.

0 (or less) is an invalid ID, and is used to indicate "please assign". A negative AT means "delete this task".

func NewTask

func NewTask(group string, data []byte) *Task

NewTask creates a new task in the given group, holding the given data.

func (*Task) Copy

func (t *Task) Copy() *Task

Copy performs a shallow copy of this task.

func (*Task) Key

func (t *Task) Key() int64

Key returns the ID, to satisfy the keyheap.Item interface. This allows tasks to be found and removed from the middle of the heap.

func (*Task) Priority

func (t *Task) Priority() int64

Priority returns an integer that can be used for heap ordering. In this case it's just the available time (ordering is from smallest to largest, or from oldest to newest).

func (*Task) String

func (t *Task) String() string

String formats this task into a nice string value.

type TaskStore

type TaskStore struct {
	// contains filtered or unexported fields
}

TaskStore maintains the tasks.

Example
// The task store is only the storage portion of a task queue. If you wish
// to implement a service, you can easily do so using the primitives
// provided. This example should give an idea of how you might do that.

// To create a task store, specify a journal implementation. A real
// implementation should use a lock of some kind to guarantee exclusivity.
jr, err := journal.OpenDiskLog("/tmp/taskjournal")
if err != nil {
	panic(fmt.Sprintf("could not create journal: %v", err))
}

// Then create the task store itself. You can create a "strict" store,
// which requires that all transactions be flushed to the journal before
// being committed to memory (and results returned), or "opportunistic",
// which commits to memory and returns while letting journaling happen in
// the background. If task execution is idempotent and it is always obvious
// when to retry, you can get a speed benefit from opportunistic
// journaling.
store, err := OpenStrict(jr)
if err != nil {
	fmt.Printf("error opening taskstore: %v\n", err)
	return
}
defer store.Close()

// To put a task into the store, call Update with the "add" parameter:
add := []*Task{
	NewTask("groupname", []byte("task info, any string")),
}

// Every user of the task store needs a unique "OwnerID". When implementing
// this as a service, the client library would likely assign this at
// startup, so each process gets its own (and cannot change it). This is
// one example of how to create an Owner ID.
clientID := int32(rand.Int() ^ os.Getpid())

// Request an update. Here you can add, modify, and delete multiple tasks
// simultaneously. You can also specify a set of task IDs that must be
// present (but will not be modified) for this operation to succeed.
results, err := store.Update(clientID, add, nil, nil, nil)

if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

// If successful, "results" will contain all of the newly-created tasks.
// Note that even a task modification is really a task creation: it deletes
// the old task and creates a new task with a new ID. IDs are guaranteed to
// increase monotonically.
fmt.Println(results)
Output:

Example (MapReduce)

ExampleTaskStore_mapReduce tests the taskstore by setting up a fake pipeline and working it for a while, just to make sure that things don't really hang up.

// We test the taskstore by creating a simple mapreduce pipeline.
// This produces a word frequency histogram for the text below by doing the
// following:
//
// - The lines of text create tasks, one for each line.
// - Map goroutines consume those tasks, producing reduce groups.
// - When all mapping is finished, one reduce task per group is created.
// - Reduce goroutines consume reduce tasks, indicating which group to pull tasks from.
// - They hold onto their reduce token, and so long as they own it, they
// 	 perform reduce tasks and, when finished, push results into the result group.
// - The results are finally read into a histogram.

type Data struct {
	Key   string
	Count int
}

lines := []string{
	"The fundamental approach to parallel computing in a mapreduce environment",
	"is to think of computation as a multi-stage process, with a communication",
	"step in the middle. Input data is consumed in chunks by mappers. These",
	"mappers produce key/value pairs from their own data, and they are designed",
	"to do their work in isolation. Their computation does not depend on the",
	"computation of any of their peers. These key/value outputs are then grouped",
	"by key, and the reduce phase begins. All values corresponding to a",
	"particular key are processed together, producing a single summary output",
	"for that key. One example of a mapreduce is word counting. The input",
	"is a set of documents, the mappers produce word/count pairs, and the",
	"reducers compute the sum of all counts for each word, producing a word",
	"frequency histogram.",
}

numMappers := 3
numReducers := 3
maxSleepTime := 500 * int64(time.Millisecond)

mainID := rand.Int31()

// Create a taskstore backed by a fake in-memory journal.
fs := journal.NewMemFS("/myfs")
jr, err := journal.OpenDiskLogInjectFS("/myfs", fs)
if err != nil {
	panic(fmt.Sprintf("failed to create journal: %v", err))
}
store, err := OpenStrict(jr)
if err != nil {
	fmt.Printf("error opening task store: %v\n", err)
	return
}
defer store.Close()

// And add all of the input lines.
toAdd := make([]*Task, len(lines))
for i, line := range lines {
	toAdd[i] = NewTask("map", []byte(line))
}

// Do the actual update.
_, err = store.Update(mainID, toAdd, nil, nil, nil)
if err != nil {
	panic(fmt.Sprintf("could not create task: %v", err))
}

// Start mapper workers.
for i := 0; i < numMappers; i++ {
	go func() {
		mapperID := rand.Int31()
		for {
			// Get a task for ten seconds.
			maptask, err := store.Claim(mapperID, "map", int64(10*time.Second), nil)
			if err != nil {
				panic(fmt.Sprintf("error retrieving tasks: %v", err))
			}
			if maptask == nil {
				time.Sleep(time.Duration(maxSleepTime))
				continue
			}
			// Now we have a map task. Split the data into words and emit reduce tasks for them.
			// The data is just a line from the text file.
			words := strings.Split(string(maptask.Data), " ")
			wm := make(map[string]int)
			for _, word := range words {
				word = strings.ToLower(word)
				word = strings.TrimSuffix(word, ".")
				word = strings.TrimSuffix(word, ",")
				wm[strings.ToLower(word)]++
			}
			// One task per word, each in its own group (the word's group)
			// This could just as easily be something in the filesystem,
			// and the reduce tasks would just point to them, but we're
			// using the task store because our data is small and because
			// we can.
			reduceTasks := make([]*Task, 0)
			for word, count := range wm {
				group := fmt.Sprintf("reduceword %s", word)
				reduceTasks = append(reduceTasks, NewTask(group, []byte(fmt.Sprintf("%d", count))))
			}
			delTasks := []int64{maptask.ID}
			_, err = store.Update(mapperID, reduceTasks, nil, delTasks, nil)
			if err != nil {
				panic(fmt.Sprintf("mapper failed: %v", err))
			}
		}
	}()
}

// Just wait for all map tasks to be deleted.
for {
	tasks := store.ListGroup("map", 1, true)
	if len(tasks) == 0 {
		break
	}
	time.Sleep(time.Duration(rand.Int63n(maxSleepTime) + 1))
}

// Now do reductions. To do this we list all of the reduceword groups and
// create a task for each, then we start the reducers.
//
// Note that there are almost certainly better ways to do this, but this is
// simple and good for demonstration purposes.
//
// Why create a task? Because tasks, unlike groups, can be exclusively
// owned and used as dependencies in updates.
groups := store.Groups()
reduceTasks := make([]*Task, 0, len(groups))
for _, g := range groups {
	if !strings.HasPrefix(g, "reduceword ") {
		continue
	}
	// Add the group name as a reduce task. A reducer will pick it up and
	// consume all tasks in the group.
	reduceTasks = append(reduceTasks, NewTask("reduce", []byte(g)))
}

_, err = store.Update(mainID, reduceTasks, nil, nil, nil)
if err != nil {
	panic(fmt.Sprintf("failed to create reduce tasks: %v", err))
}

// Finally start the reducers.
for i := 0; i < numReducers; i++ {
	go func() {
		reducerID := rand.Int31()
		for {
			grouptask, err := store.Claim(reducerID, "reduce", int64(30*time.Second), nil)
			if err != nil {
				panic(fmt.Sprintf("failed to get reduce task: %v", err))
			}
			if grouptask == nil {
				time.Sleep(time.Duration(maxSleepTime))
				continue
			}
			gtdata := string(grouptask.Data)
			word := strings.SplitN(gtdata, " ", 2)[1]

			// No need to claim all of these tasks, just list them - the
			// main task is enough for claims, since we'll depend on it
			// before deleting these guys.
			tasks := store.ListGroup(gtdata, 0, true)
			delTasks := make([]int64, len(tasks)+1)
			sum := 0
			for i, task := range tasks {
				delTasks[i] = task.ID
				val, err := strconv.Atoi(string(task.Data))
				if err != nil {
					fmt.Printf("oops - weird value in task: %v\n", task)
					continue
				}
				sum += val
			}
			delTasks[len(delTasks)-1] = grouptask.ID
			outputTask := NewTask("output", []byte(fmt.Sprintf("%04d %s", sum, word)))

			// Now we delete all of the reduce tasks, including the one
			// that we own that points to the group, and add an output
			// task.
			_, err = store.Update(reducerID, []*Task{outputTask}, nil, delTasks, nil)
			if err != nil {
				panic(fmt.Sprintf("failed to delete reduce tasks and create output: %v", err))
			}

			// No need to signal anything - we just deleted the reduce
			// task. The main process can look for no tasks remaining.
		}
	}()
}

// Just look for all reduce tasks to be finished.
for {
	tasks := store.ListGroup("reduce", 1, true)
	if len(tasks) == 0 {
		break
	}
	time.Sleep(time.Duration(rand.Int63n(maxSleepTime) + 1))
}

// And now we have the finished output in the task store.
outputTasks := store.ListGroup("output", 0, false)
freqs := make([]string, len(outputTasks))
for i, t := range outputTasks {
	freqs[i] = string(t.Data)
}
sort.Sort(sort.Reverse(sort.StringSlice(freqs)))

for i, f := range freqs {
	if i >= 10 {
		break
	}
	fmt.Println(f)
}
Output:


0008 the
0008 a
0006 of
0004 to
0004 their
0004 is
0004 in
0003 word
0003 mappers
0003 key

Example (Tasks)

ExampleTaskStore_tasks demonstrates the use of getting tasks by id.


Output:

func OpenOpportunistic

func OpenOpportunistic(journaler journal.Interface) (*TaskStore, error)

OpenOpportunistic returns a new TaskStore instance. This store will be opportunistically journaled, meaning that it is possible to update, delete, or create a task, get confirmation of it occurring, crash, and find that recently committed tasks are lost. If task execution is idempotent, this is safe, and is much faster, as it writes to disk when it gets a chance.

func OpenStrict

func OpenStrict(journaler journal.Interface) (*TaskStore, error)

OpenStrict returns a TaskStore with journaling done synchronously instead of opportunistically. This means that, in the event of a crash, the full task state will be recoverable and nothing will be lost that appeared to be committed. Use this if you don't mind slower mutations and really need committed tasks to stay committed under all circumstances. In particular, if task execution is not idempotent, this is the right one to use.

func (*TaskStore) AllTasks

func (t *TaskStore) AllTasks() []*Task

AllTasks returns a slice of every task in the store, sorted by ID. This can be an expensive operation, as it blocks all access while it copies the list of tasks, so don't do it at all when you care deeply about availability.

func (*TaskStore) Claim

func (t *TaskStore) Claim(owner int32, group string, duration int64, depends []int64) (*Task, error)

Claim attempts to find one random unowned task in the specified group and set the ownership to the specified owner. If successful, the newly-owned task is returned with its AT set to now + duration (in nanoseconds).

func (*TaskStore) Close

func (t *TaskStore) Close() error

Close shuts down the taskstore gracefully, finalizing the journal, etc.

func (*TaskStore) Groups

func (t *TaskStore) Groups() []string

Groups returns a list of all of the groups known to this task store.

func (*TaskStore) IsOpen

func (t *TaskStore) IsOpen() bool

func (*TaskStore) IsStrict

func (t *TaskStore) IsStrict() bool

func (*TaskStore) LatestTaskID

func (t *TaskStore) LatestTaskID() int64

LatestTaskID returns the most recently-assigned task ID.

func (*TaskStore) ListGroup

func (t *TaskStore) ListGroup(name string, limit int, allowOwned bool) []*Task

ListGroup tries to find tasks for the given group name. The number of tasks returned will be no more than the specified limit. A limit of 0 or less indicates that all possible tasks should be returned. If allowOwned is true, then even tasks with AT in the future that are owned by other clients will be returned.
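The limit and allowOwned rules can be sketched with a small model. Everything here is hypothetical: `task` and `listGroup` are stand-ins written for the illustration, and "owned" is modeled simply as AT being later than the current time, which is an assumption drawn from the description above.

```go
package main

import "fmt"

// task is a hypothetical stand-in holding only what the model needs.
type task struct {
	ID int64
	AT int64 // availability time in nanoseconds
}

// listGroup models the documented rules: skip owned tasks unless
// allowOwned is set, and stop at limit when limit is positive.
func listGroup(tasks []task, now int64, limit int, allowOwned bool) []task {
	var out []task
	for _, t := range tasks {
		if !allowOwned && t.AT > now {
			continue // owned: AT is still in the future
		}
		if limit > 0 && len(out) >= limit {
			break
		}
		out = append(out, t)
	}
	return out
}

func main() {
	tasks := []task{{ID: 1, AT: 0}, {ID: 2, AT: 100}, {ID: 3, AT: 0}}
	fmt.Println(len(listGroup(tasks, 50, 0, false))) // 2: task 2 is owned
	fmt.Println(len(listGroup(tasks, 50, 0, true)))  // 3: owned tasks included
	fmt.Println(len(listGroup(tasks, 50, 1, true)))  // 1: capped by limit
}
```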

func (*TaskStore) NumTasks

func (t *TaskStore) NumTasks() int

NumTasks returns the number of tasks being managed by this store.

func (*TaskStore) Snapshot

func (t *TaskStore) Snapshot() error

Snapshot tries to force a snapshot to start immediately. It only fails if there is already one in progress.

func (*TaskStore) Snapshotting

func (t *TaskStore) Snapshotting() bool

Snapshotting indicates whether snapshotting is in progress.

func (*TaskStore) String

func (t *TaskStore) String() string

String formats this as a string. Shows minimal information like group names.

func (*TaskStore) Tasks

func (t *TaskStore) Tasks(ids []int64) []*Task

Tasks attempts to retrieve particular tasks from the store, specified by ID. The returned slice of tasks will be of the same size as the requested IDs, and some of them may be nil (if the requested task does not exist).
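Since the result is positional, callers need to check each entry for nil before dereferencing it. A minimal sketch of that check follows; the `Task` type is a local stand-in with only an ID field, and `missingIDs` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Task is a local stand-in for the package's Task type.
type Task struct {
	ID int64
}

// missingIDs returns the requested IDs whose result entry is nil.
// It relies on the documented guarantee that tasks[i] corresponds
// to ids[i]; extra entries on either side are ignored.
func missingIDs(ids []int64, tasks []*Task) []int64 {
	var missing []int64
	for i, t := range tasks {
		if i >= len(ids) {
			break
		}
		if t == nil {
			missing = append(missing, ids[i])
		}
	}
	return missing
}

func main() {
	ids := []int64{1, 2, 3}
	// Shaped like a result in which task 2 does not exist.
	result := []*Task{{ID: 1}, nil, {ID: 3}}
	fmt.Println(missingIDs(ids, result)) // [2]
}
```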

func (*TaskStore) Update

func (t *TaskStore) Update(owner int32, add, change []*Task, del, dep []int64) ([]*Task, error)

Update makes changes to the task store. The owner is the ID of the requester, and tasks to be added, changed, and deleted can be specified. If dep is specified, it is a list of task IDs that must be present for the update to succeed. On success, the returned slice of tasks will contain the concatenation of newly added tasks and changed tasks, in order (e.g., [add0, add1, add2, change0, change1, change2]). On failure, an error of type UpdateError will be returned with details about the types of errors and the IDs that caused them.
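The ordering guarantee means the returned slice can be split back into its two parts using the length of the add argument. The sketch below shows one way to do that; `Task` is a local stand-in and `splitResult` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Task is a local stand-in for the package's Task type.
type Task struct {
	ID int64
}

// splitResult separates a successful Update result into added and
// changed tasks, given how many tasks were passed in the add argument.
// ok is false if numAdded does not fit the result.
func splitResult(result []*Task, numAdded int) (added, changed []*Task, ok bool) {
	if numAdded < 0 || numAdded > len(result) {
		return nil, nil, false
	}
	return result[:numAdded], result[numAdded:], true
}

func main() {
	// Shaped like [add0, add1, change0].
	result := []*Task{{ID: 10}, {ID: 11}, {ID: 3}}
	added, changed, ok := splitResult(result, 2)
	fmt.Println(len(added), len(changed), ok) // 2 1 true
}
```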

type UpdateError

type UpdateError struct {
	// Changes contains the list of tasks that were not present and could thus not be changed.
	Changes []int64

	// Deletes contains the list of IDs that could not be deleted.
	Deletes []int64

	// Depends contains the list of IDs that were not present and caused the update to fail.
	Depends []int64

	// Owned contains the list of IDs that were owned by another client and could not be changed.
	Owned []int64

	// Bugs contains a list of errors representing caller precondition failures (bad inputs).
	Bugs []error
}

UpdateError describes why an Update call failed. Each field lists the task IDs (or, for Bugs, the errors) associated with one kind of failure. All fields are nil when empty.

func (UpdateError) Error

func (ue UpdateError) Error() string

Error returns an error string (and satisfies the error interface).

func (UpdateError) HasBugs

func (ue UpdateError) HasBugs() bool

func (UpdateError) HasDependencyErrors

func (ue UpdateError) HasDependencyErrors() bool

func (UpdateError) HasErrors

func (ue UpdateError) HasErrors() bool

Directories

Path Synopsis
Package journal is an implementation and interface specification for an append-only journal with rotations.
Package keyheap implements a library for a simple heap that allows peeking and popping from the middle based on a Key() in the stored interface.
service
client
Package client implements a client for the HTTP taskstore service.
protocol
Definitions of protocol structures.
taskserver
A RESTful HTTP-based task service that uses the taskstore.
