Package dsset implements a particular flavor of datastore-backed set.

Due to its internal structure, it requires some maintenance on behalf of the caller to periodically cleanup removed items (aka tombstones).

Items added to the set should have unique IDs, at least for the duration of some configurable time interval, as defined by TombstonesDelay property. It means removed items can't be added back to the set right away (the set will think they are already there). This is required to make 'Add' operation idempotent.

TombstonesDelay is assumed to be much larger than time scale of all "fast" processes in the system, in particular all List+Pop processes. For example, if List+Pop is expected to take 1 min, TombstonesDelay should be >> 1 min (e.g. 5 min). Setting TombstonesDelay to very large value is harmful though, since it may slow down 'List' and 'Pop' (by allowing more garbage that will have to be filtered out).

Properties (where N is current size of the set):

* Batch 'Add' with configurable QPS limit, O(1) performance.
* Transactional consistent 'Pop' (1 QPS limit), O(N) performance.
* Non-transactional consistent 'List' (1 QPS limit), O(N) performance.
* Popped items can't be re-added until their tombstones expire.

These properties make dsset suitable for multiple producers, single consumer queues, where order of items is not important, each item has a unique identifier, and the queue size is small.

Structurally dsset consists of N+1 entity groups:

* N separate entity groups that contain N shards of the set.
* 1 entity group (with a configurable root) that holds tombstones.

It is safe to increase number of shards at any time. Decreasing number of shards is dangerous (but can be done with some more coding).

More shards make:

* Add() less contentious (so it can support more QPS).
* List() and CleanupGarbage() slower and more expensive.
* Pop() is not affected by number of shards.



This section is empty.


This section is empty.


func CleanupGarbage

func CleanupGarbage(c context.Context, cleanup ...Garbage) error

CleanupGarbage deletes entities used to store items under given tombstones.

This is datastore's MultiDelete RPC in disguise. Touches many entity groups. Must be called outside of transactions. Idempotent.

Can handle tombstones from multiple different sets at once. This is preferred over calling 'CleanupGarbage' multiple times (once per set), since it collapses multiple datastore RPCs into one.

This MUST be called before tombstones returned by 'List' are removed in 'Pop'. Failure to do so will make items reappear in the set.

Returns only transient errors. There's no way to know which items were removed and which weren't in case of an error.


type Garbage

type Garbage []*tombstone

Garbage is a list of tombstones to cleanup.

func FinishPop

func FinishPop(ctx context.Context, ops ...*PopOp) (tombs Garbage, err error)

FinishPop completes one or more pop operations (for different sets) by submitting changes to datastore.

Must be called within the same transaction that called BeginPop.

It returns a list of tombstones for popped items. The storage used by the items can be reclaimed right away by calling 'CleanupGarbage'. It is fine not to do so, 'List' will eventually return all tombstones that need cleaning anyway. Calling 'CleanupGarbage' as best effort is still beneficial though, since it will reduce the amount of garbage in the set.

Returns only transient errors.

type Item

type Item struct {
	ID    string // unique in time identifier of the item
	Value []byte // arbitrary value (<1 MB, but preferably much smaller)

Item is what's stored in the set.

type Listing

type Listing struct {
	Items   []Item  // all items in the set, in arbitrary order
	Garbage Garbage // tombstones that can be cleaned up now
	// contains filtered or unexported fields

Listing is returned by 'List' call.

It contains actual listing of items in the set, as well as a bunch of service information used by other operations ('CleanupGarbage' and 'Pop') to keep the set in a garbage-free and consistent state.

The only way to construct a correct Listing is to call 'List' method.

See comments for Set struct and List method for more info.

type PopOp

type PopOp struct {
	// contains filtered or unexported fields

PopOp is an in-progress 'Pop' operation.

See BeginPop.

func (*PopOp) CanPop

func (p *PopOp) CanPop(id string) bool

CanPop returns true if the given item can be popped from the set.

Returns false if this item has been popped before (perhaps in another transaction), or it's not in the the listing passed to BeginPop.

func (*PopOp) Pop

func (p *PopOp) Pop(id string) bool

Pop removed the item from the set and returns true if it was there.

Returns false if this item has been popped before (perhaps in another transaction), or it's not in the the listing passed to BeginPop.

type Set

type Set struct {
	ID              string         // global ID, used to construct datastore keys
	ShardCount      int            // number of entity groups to use for storage
	TombstonesRoot  *datastore.Key // tombstones entity parent key
	TombstonesDelay time.Duration  // how long to keep tombstones in the set

Set holds a set of Items and uses tombstones to achieve idempotency of Add.

Producers just call Add(...).

The consumer must run more elaborate algorithm that ensures atomicity of 'Pop' and takes care of cleaning up of the garbage. This requires a mix of transactional and non-transactional actions:

listing, err := set.List(ctx)
if err != nil {
  return err

if err := dsset.CleanupGarbage(ctx, listing.Garbage); err != nil {
  return err

... Fetch any additional info associated with 'listing.Items' ...

var garbage dsset.Garbage
err = datastore.RunInTransaction(ctx, func(ctx context.Context) error {
  op, err := set.BeginPop(ctx, listing)
  if err != nil {
    return err
  for _, itm := range listing.items {
    if op.Pop(item.ID) {
      // The item was indeed in the set and we've just removed it!
    } else {
      // Some other transaction has popped it already.
  garbage, err = dsset.FinishPop(ctx, op)
  return err
}, nil)
if err == nil {
  dsset.CleanupGarbage(ctx, garbage)  // best-effort cleanup
return err

func (*Set) Add

func (s *Set) Add(c context.Context, items []Item) error

Add idempotently adds a bunch of items to the set.

If items with given keys are already in the set, or have been deleted recently, they won't be re-added. No error is returned in this case. When retrying the call like that, the caller is responsible to pass exact same Item.Value, otherwise 'List' may return random variant of the added item.

Writes to some single entity group (not known in advance). If called outside of a transaction and the call fails, may add only some subset of items. Running inside a transaction makes this operation atomic.

Returns only transient errors.

func (*Set) BeginPop

func (s *Set) BeginPop(c context.Context, listing *Listing) (*PopOp, error)

BeginPop initiates 'Pop' operation.

Pop operation is used to transactionally remove items from the set, as well as cleanup old tombstones. It must be finished with 'dsset.FinishPop', even if no items have been popped: the internal state still can change in this case, since 'BeginPop' cleans up old tombstones. Even more, it is necessary to do 'Pop' if listing contains non-empty set of tombstones (regardless of whether the caller wants to actually pop any items from the set). This is part of the required set maintenance.

Requires a transaction. Modifies TombstonesRoot entity group (and only it).

Returns only transient errors. Such errors usually mean that the entire pop sequence ('List' + 'Pop') should be retried.

func (*Set) List

func (s *Set) List(c context.Context) (*Listing, error)

List returns all items that are currently in the set (in arbitrary order), as well as a set of tombstones that points to items that were previously popped and can be cleaned up now.

Must be called outside of transactions (panics otherwise). Reads many entity groups, including TombstonesRoot one.

The set of tombstones to cleanup should be passed to 'CleanupGarbage', and later to 'BeginPop' (via Listing), in that order. Not doing so will lead to accumulation of a garbage in the set that will slow down 'List' and 'Pop'.

Returns only transient errors.

Source Files