waypoint

package
v0.3.2 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 3, 2024 License: MIT Imports: 3 Imported by: 0

Documentation

Overview

Package waypoint implements an opinionated scheme for coordinating limited concurrency over a variably-sized set of cooperating callers. This is accomplished primarily through the provided types Worker and Waypoint.

A Worker represents a cooperating unit of work (most often a goroutine) intended for concurrent execution along with other Workers. The lifecycle of each Worker passes through three states: Waiting, Active, and Finished (although Waiting Workers are never visible to the caller).

A Waypoint acts as a coordination point over a group of Workers by ensuring only a set number may be in the Active state at any given time. The number of Workers that a Waypoint allows in the Active state is referred to as its capacity. Workers wishing to become Active while a Waypoint is at (or above) its set capacity are blocked until one or more currently Active Workers are transitioned to the Finished state.

As an analogy, you could think of a Waypoint as similar to a doorman at a busy nightclub whose job it is to prevent the club from becoming too crowded (and possibly unsafe) by ensuring there are never too many people in the building at the same time.
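The capacity rule can be illustrated without the package itself. The following is a minimal sketch using only the standard library: a buffered channel plays the doorman, and an atomic counter records the peak number of goroutines running at once. The helper runLimited is hypothetical and this is not how waypoint is implemented; it only demonstrates the invariant that the number of Active workers never exceeds capacity.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited starts n goroutines but lets at most capacity of them run
// at once. It returns the highest concurrency level it observed.
// (Hypothetical helper for illustration; not part of package waypoint.)
func runLimited(n, capacity int) int64 {
	slots := make(chan struct{}, capacity) // one token per Active worker
	var wg sync.WaitGroup
	var active, peak int64

	for i := 0; i < n; i++ {
		slots <- struct{}{} // blocks while capacity workers are Active
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-slots }() // Finished: release the slot

			cur := atomic.AddInt64(&active, 1)
			// Record the peak concurrency seen so far.
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			time.Sleep(time.Millisecond) // simulate work
			atomic.AddInt64(&active, -1)
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	// The observed peak never exceeds the capacity of 5.
	fmt.Println(runLimited(20, 5) <= 5)
}
```

Unlike this fixed-size channel, a Waypoint's capacity can be changed after creation, which is why the package exposes its own types rather than a plain semaphore.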

Here's an (obscenely contrived) example showing how to use a Waypoint:

func ProcessStuff(ctx context.Context, inch <-chan *Stuff, outch chan<- *Stuff) error {
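  // Create a Waypoint with capacity for 5 concurrent Workers.
  wp := waypoint.New(5)

  // Deferred calls run last-in, first-out: on return we first close the
  // Waypoint and wait for its in-flight Workers to finish, and only then
  // close outch, so no goroutine can send on a closed channel.
  defer close(outch)
  defer func() { <-wp.Done() }()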

  for stuff := range inch {
    // As each "stuff" arrives from "inch" we create a new Worker by
    // calling the Waypoint's Wait method. This call will immediately
    // return an Active Worker until we reach the Waypoint's capacity
    // of 5. Subsequent calls will block until one or more of the
    // Active Workers is moved to the Finished state thus exposing
    // more capacity.
    a, err := wp.Wait(ctx)
    if err != nil {
      return err
    }

    // Next, we create a new goroutine to process our stuff and then
    // send it to outch. On return, this function calls a.Done() to move
    // this Worker into the Finished state, which frees capacity and
    // unblocks a call to Wait that is currently blocked, if any.
    //
    // When used in this way, there will never be more than 5 of these
    // goroutines at any given time (unless, of course, you alter the
    // Waypoint's capacity).
    go func(stuff *Stuff) {
      defer a.Done()
      //
      // Do something with the stuff, then send it to 'outch'.
      //
      outch <- stuff
    }(stuff)
  }

  return nil
}

For an even better use case, see:

"github.com/go-sage/synctools/pkg/pipeline"

A Waypoint's capacity is not a fixed value; it may be adjusted over time to allow for dynamic scaling. When capacity is reduced, Active Workers are unaffected, but as each transitions to the Finished state, Waiting Workers remain blocked until the number of Active Workers drops below the new capacity value. Conversely, if capacity is increased, enough Waiting Workers immediately unblock to consume the newly available capacity.

Note that a Waypoint is not a queue; the order in which Workers become unblocked is unrelated to the order in which they were created.

Constants

const (
	// Waiting Workers are those that are ready to do work but are not yet
	// allowed to do so because their associated Waypoint is at or above its
	// set capacity.
	Waiting = State("Waiting")

	// Active Workers are those that are currently consuming capacity from
	// their associated Waypoint.
	Active = State("Active")

	// Finished Workers are those who have completed their work and are no
	// longer consuming capacity from their associated Waypoint.
	Finished = State("Finished")
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Metrics

type Metrics struct {
	Timestamp  time.Time     // Time these metrics were gathered
	Capacity   int           // Waypoint's current capacity
	Waiting    int           // Current number of waiting Workers
	Active     int           // Current number of active Workers
	Finished   int           // Current number of finished Workers
	WaitTime   time.Duration // Total accumulated Wait time
	ActiveTime time.Duration // Total accumulated Active time
}

Metrics represents point-in-time metrics for a Waypoint.
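As a rough illustration of how these fields might be read together, the sketch below derives two simple indicators from a Metrics value. To stay self-contained it declares a local struct that mirrors the documented fields; real code would use the package's own type. The helpers utilization and backlog are hypothetical and not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// Metrics mirrors the documented fields of waypoint.Metrics so that this
// sketch compiles on its own; real code would use the package's type.
type Metrics struct {
	Timestamp  time.Time
	Capacity   int
	Waiting    int
	Active     int
	Finished   int
	WaitTime   time.Duration
	ActiveTime time.Duration
}

// utilization reports the fraction of capacity in use (hypothetical helper).
// It returns 0 when capacity is zero to avoid dividing by zero.
func utilization(m Metrics) float64 {
	if m.Capacity <= 0 {
		return 0
	}
	return float64(m.Active) / float64(m.Capacity)
}

// backlog reports whether any callers are currently blocked
// (hypothetical helper).
func backlog(m Metrics) bool { return m.Waiting > 0 }

func main() {
	m := Metrics{Timestamp: time.Now(), Capacity: 5, Active: 4, Waiting: 2}
	fmt.Printf("utilization=%.2f backlog=%v\n", utilization(m), backlog(m))
}
```

Because reducing capacity does not affect Workers that are already Active, a utilization above 1 is possible for a while after capacity has been lowered.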

type State

type State string

State is the type used to populate a Worker's State field.

type Waypoint

type Waypoint struct {
	// contains filtered or unexported fields
}

A Waypoint is a coordination point that ensures only a set number of Workers are allowed to do work concurrently.

func New

func New(capacity int) *Waypoint

New returns a new Waypoint initialized to the provided capacity.

func (*Waypoint) Done

func (w *Waypoint) Done() <-chan struct{}

Done marks the receiver as closed, preventing any new Workers from being added through its Wait method. Currently Active Workers are allowed to continue, and currently Waiting Workers will become Active when/if capacity becomes available. However, since capacity cannot be altered on a closed Waypoint, if Done is called with zero capacity, all Waiting Workers will be abandoned and will never become Active.

The returned channel will be closed once all actionable Workers have reached the Finished state.
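The close-then-drain behavior described above can be sketched with the standard library alone. The gate type below is a hypothetical stand-in, not the package's implementation: it refuses new entrants once closed and closes a channel when the last in-flight worker leaves. The errClosed value is an assumption made for the sketch; the documentation does not say what Wait returns on a closed Waypoint.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed is an assumed error value used only by this sketch.
var errClosed = errors.New("gate closed")

// gate is a hypothetical stand-in showing the close-then-drain pattern.
type gate struct {
	mu      sync.Mutex
	closed  bool
	active  int
	drained chan struct{}
}

func newGate() *gate { return &gate{drained: make(chan struct{})} }

// enter registers a new worker unless the gate has been closed.
func (g *gate) enter() error {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.closed {
		return errClosed
	}
	g.active++
	return nil
}

// leave marks one worker as finished. If the gate is closed and this was
// the last worker, it signals that the gate has drained.
func (g *gate) leave() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.active--
	if g.closed && g.active == 0 {
		close(g.drained)
	}
}

// done closes the gate and returns a channel that is closed once every
// registered worker has left. Calling it again returns the same channel.
func (g *gate) done() <-chan struct{} {
	g.mu.Lock()
	defer g.mu.Unlock()
	if !g.closed {
		g.closed = true
		if g.active == 0 {
			close(g.drained)
		}
	}
	return g.drained
}

func main() {
	g := newGate()
	for i := 0; i < 3; i++ {
		if err := g.enter(); err != nil {
			panic(err)
		}
		go func() {
			defer g.leave()
			// ... do some work ...
		}()
	}
	<-g.done()                              // wait for all three to finish
	fmt.Println(g.enter() == errClosed)     // new entrants are refused
}
```

Receiving from the returned channel is what lets a caller block until outstanding work has finished, for example before closing an output channel.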

func (*Waypoint) Metrics

func (w *Waypoint) Metrics() Metrics

Metrics returns a point-in-time Metrics value for the receiver.

func (*Waypoint) Resize

func (w *Waypoint) Resize(newcap int) int

Resize sets the receiver's capacity to newcap, returning the previous capacity value. A value of -1 is returned if a) the receiver is nil, b) newcap is less than zero, or c) the receiver has been closed.

This method may be called at any time on a non-closed Waypoint having any number of Workers in any State.

If the receiver's capacity is increased by this call, the receiver will activate Waiting Workers so that the new capacity is consumed. If capacity is reduced, Workers that finish will not cause Waiting Workers to be activated until the number of Active Workers drops below the new capacity level.

Note that setting capacity to zero will allow currently Active Workers to complete but will not allow Waiting workers to become Active until such time that capacity is increased. Also, since this method cannot be called on a closed Waypoint, setting capacity to zero then closing the Waypoint will abandon all Waiting Workers.
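To make the resizing rules concrete, here is a minimal sketch of a resizable limit built on sync.Cond from the standard library. The limiter type and its methods are hypothetical and are not the package's implementation; the sketch only models the documented behavior that raising capacity wakes waiters, that zero capacity blocks all waiters, and that a negative newcap yields -1. It does not model closing.

```go
package main

import (
	"fmt"
	"sync"
)

// limiter is a hypothetical resizable concurrency limit.
type limiter struct {
	mu       sync.Mutex
	cond     *sync.Cond
	capacity int
	active   int
}

func newLimiter(capacity int) *limiter {
	l := &limiter{capacity: capacity}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// acquire blocks while the limiter is at or above capacity.
func (l *limiter) acquire() {
	l.mu.Lock()
	defer l.mu.Unlock()
	for l.active >= l.capacity {
		l.cond.Wait()
	}
	l.active++
}

// release frees one slot and wakes waiters so they re-check capacity.
func (l *limiter) release() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.active--
	l.cond.Broadcast()
}

// resize sets a new capacity and returns the previous one, or -1 if
// newcap is negative. Waiters are woken to re-check the new limit.
func (l *limiter) resize(newcap int) int {
	if newcap < 0 {
		return -1
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	old := l.capacity
	l.capacity = newcap
	l.cond.Broadcast()
	return old
}

func main() {
	l := newLimiter(0) // zero capacity: every acquire blocks
	started := make(chan struct{})
	go func() {
		l.acquire()
		close(started)
	}()
	fmt.Println(l.resize(1))  // 0: the previous capacity
	<-started                 // the waiter proceeds once capacity is raised
	fmt.Println(l.resize(-1)) // -1: negative capacity is rejected
}
```

Because acquire re-checks the condition in a loop, lowering capacity never interrupts holders of a slot; it only keeps waiters blocked until enough holders have released.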

func (*Waypoint) Wait

func (w *Waypoint) Wait(ctx context.Context) (*Worker, error)

Wait returns an Active *Worker ready to do some work. If the receiver has available capacity, Wait returns immediately, otherwise it blocks until capacity is made available. If the provided context is canceled or times out while waiting, a nil *Worker is returned along with the error value returned by ctx.Err().

type Worker

type Worker struct {
	ID    uint64
	State State
	// contains filtered or unexported fields
}

A Worker represents an individual unit of work (most often a goroutine) issued by an associated Waypoint. Each Worker has a unique ID and exists in one of three states: Waiting, Active, or Finished.

Note that, since each Worker maintains a reference to the Waypoint that created it, no Worker should be copied once it has been created.

func (*Worker) Done

func (w *Worker) Done()

Done is called to transition the receiver to the Finished state. If this drops the associated Waypoint below its set, non-zero, capacity -- and the Waypoint has not yet been closed -- a Worker from the associated Waypoint's pool of Waiting Workers will be moved to the Active state to begin work.
