Package sync

v0.2.4
Published: Jul 27, 2020 | Licenses: Apache-2.0 , MIT | Module: github.com/testground/sdk-go

Overview

The sync package contains the distributed coordination and choreography facility of Testground.

The sync service is lightweight, and uses Redis recipes to implement coordination primitives like barriers, signalling, and pubsub. Additional primitives like locks, semaphores, etc. are in scope, and may be added in the future.

Constructing sync.Clients

To use the sync service, test plan writers must create a sync.DefaultClient via the sync.NewBoundClient constructor, passing a context that governs the lifetime of the sync.DefaultClient, as well as a runtime.RunEnv to bind to. All sync operations will be automatically scoped/namespaced to the runtime.RunEnv.

Infrastructure services, such as sidecar instances, can create generic sync.Clients via the sync.NewGenericClient constructor. Such clients are not bound/constrained to a runtime.RunEnv, and instead are required to pass in runtime.RunParams in the context.Context to all operations. See WithRunParams for more info.

Recommendations for test plan writers

All constructors and methods on sync.DefaultClient have Must* versions, which panic if an error occurs. Using these methods in combination with runtime.Invoke is safe, as the runner captures panics and records them as test crashes. The resulting code will be less pedantic.
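The Must* convention can be pictured with a small standalone sketch. The names below (signalEntry, mustSignalEntry, invoke) are hypothetical stand-ins, not the sdk's implementation; invoke only imitates a runner that recovers panics and reports them as crashes.

```go
package main

import (
	"errors"
	"fmt"
)

// signalEntry is a hypothetical fallible operation standing in for a sync call.
func signalEntry(fail bool) (int64, error) {
	if fail {
		return -1, errors.New("redis unavailable")
	}
	return 1, nil
}

// mustSignalEntry wraps signalEntry and panics on error, Must*-style.
func mustSignalEntry(fail bool) int64 {
	seq, err := signalEntry(fail)
	if err != nil {
		panic(err)
	}
	return seq
}

// invoke imitates a runner: it recovers a panic raised by the test case and
// converts it into an error describing the crash.
func invoke(testcase func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("test crashed: %v", r)
		}
	}()
	testcase()
	return nil
}

func main() {
	fmt.Println(invoke(func() { mustSignalEntry(false) })) // no crash
	fmt.Println(invoke(func() { mustSignalEntry(true) }))  // crash is captured
}
```

The test case body carries no error handling at all; failures surface once, at the boundary that recovers the panic.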

We have added sugar methods that compose basic primitives into frequently used katas, such as client.PublishSubscribe, client.SignalAndWait, client.PublishAndWait, etc. These katas also have Must* variations. We encourage developers to adopt them in order to streamline their code.

Garbage collection

The sync service is decentralised: it has no centralised actor, dispatcher, or coordinator that supervises the lifetime of a test. All participants in a test hit Redis directly, using its operations to implement the sync primitives. As a result, keys from past runs can accumulate.

Sync clients can participate in collaborative garbage collection by enabling background GC:

client.EnableBackgroundGC(ch) // see method godoc for info on ch

GC uses SCAN and OBJECT IDLETIME operations to find keys to purge, and its configuration is controlled by the GC* variables.

In the standard testground architecture, only sidecar processes participate in GC.

Constants

const (
	RedisPayloadKey = "p"

	EnvRedisHost = "REDIS_HOST"
	EnvRedisPort = "REDIS_PORT"
)
const (
	StateStopped      = 0
	StateInterrupting = 1
	StateRunning      = 2
)

Variables

var DefaultRedisOpts = redis.Options{
	MinIdleConns:       2,
	PoolSize:           5,
	PoolTimeout:        3 * time.Minute,
	MaxRetries:         30,
	MinRetryBackoff:    1 * time.Second,
	MaxRetryBackoff:    3 * time.Second,
	DialTimeout:        10 * time.Second,
	ReadTimeout:        10 * time.Second,
	WriteTimeout:       10 * time.Second,
	IdleCheckFrequency: 30 * time.Second,
	MaxConnAge:         2 * time.Minute,
}
var ErrNoRunParameters = fmt.Errorf("no run parameters provided")

ErrNoRunParameters is returned by the generic client when an unbound context is passed in. See WithRunParams to bind RunParams to the context.

var GCFrequency = 30 * time.Minute

GCFrequency is the frequency at which periodic GC runs, if enabled.

var GCLastAccessThreshold = 30 * time.Minute

GCLastAccessThreshold specifies the minimum amount of time that must have elapsed since a Redis key was last accessed for it to be pruned by garbage collection.

func GetRunParams

func GetRunParams(ctx context.Context) *runtime.RunParams

GetRunParams extracts the RunParams from a context, previously set by calling WithRunParams.

func NewInmemClient

func NewInmemClient() *inmemClient

NewInmemClient creates an in-memory sync client for testing.

func WithRunParams

func WithRunParams(ctx context.Context, rp *runtime.RunParams) context.Context

WithRunParams returns a context that embeds the supplied RunParams, such that it can be passed to a GenericClient.

type Barrier

type Barrier struct {
	C chan error
	// contains filtered or unexported fields
}

Barrier represents a barrier over a State. A Barrier is a synchronisation checkpoint that will fire once the `target` number of entries on that state have been registered.

type Client

type Client interface {
	io.Closer

	Publish(ctx context.Context, topic *Topic, payload interface{}) (seq int64, err error)
	Subscribe(ctx context.Context, topic *Topic, ch interface{}) (*Subscription, error)
	PublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, target int) (seq int64, err error)
	PublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription, err error)

	Barrier(ctx context.Context, state State, target int) (*Barrier, error)
	SignalEntry(ctx context.Context, state State) (after int64, err error)
	SignalAndWait(ctx context.Context, state State, target int) (seq int64, err error)

	MustBarrier(ctx context.Context, state State, target int) *Barrier
	MustSignalEntry(ctx context.Context, state State) int64
	MustSubscribe(ctx context.Context, topic *Topic, ch interface{}) *Subscription
	MustPublish(ctx context.Context, topic *Topic, payload interface{}) (seq int64)

	MustPublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, target int) (seq int64)
	MustPublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription)
	MustSignalAndWait(ctx context.Context, state State, target int) (seq int64)
}

type DefaultClient

type DefaultClient struct {
	// contains filtered or unexported fields
}

func MustBoundClient

func MustBoundClient(ctx context.Context, runenv *runtime.RunEnv) *DefaultClient

MustBoundClient creates a new bound client by calling NewBoundClient, and panicking if it errors.

func MustGenericClient

func MustGenericClient(ctx context.Context, log *zap.SugaredLogger) *DefaultClient

MustGenericClient creates a new generic client by calling NewGenericClient, and panicking if it errors.

func NewBoundClient

func NewBoundClient(ctx context.Context, runenv *runtime.RunEnv) (*DefaultClient, error)

NewBoundClient returns a new sync DefaultClient that is bound to the provided RunEnv. All operations will be automatically scoped to the keyspace of that run.

The context passed in here will govern the lifecycle of the client. Cancelling it will cancel all ongoing operations. However, for a clean closure, the user should call Close().

For test plans, a suitable context to pass here is the background context.

func NewGenericClient

func NewGenericClient(ctx context.Context, log *zap.SugaredLogger) (*DefaultClient, error)

NewGenericClient returns a new sync DefaultClient that is bound to no RunEnv. It is intended to be used by testground services like the sidecar.

All operations expect to find the RunParams of the run to scope its actions inside the supplied context.Context. Call WithRunParams to bind the appropriate RunParams.

The context passed in here will govern the lifecycle of the client. Cancelling it will cancel all ongoing operations. However, for a clean closure, the user should call Close().

A suitable context to pass here is the background context of the main process.

func (*DefaultClient) Barrier

func (c *DefaultClient) Barrier(ctx context.Context, state State, target int) (*Barrier, error)

Barrier sets a barrier on the supplied State that fires when it reaches its target value (or higher).

The caller should monitor the channel C returned inside the Barrier object. If the barrier is satisfied, the value sent will be nil.

When the context fires, the context's error will be propagated instead. The same will occur if the DefaultClient's context fires.

If an internal error occurs, that error will be sent on C instead.

The returned Barrier object contains a channel (C) that fires when the barrier reaches its target, is cancelled, or fails.

The Barrier channel is owned by the DefaultClient, and by no means should the caller close it. It is safe to use a non-cancellable context here, like the background context. No cancellation is needed unless you want to stop the process early.

func (*DefaultClient) Close

func (c *DefaultClient) Close() error

Close closes this client, cancels ongoing operations, and releases resources.

func (*DefaultClient) EnableBackgroundGC

func (c *DefaultClient) EnableBackgroundGC(notifyCh chan error)

EnableBackgroundGC enables a background process to perform periodic GC. It runs GC once when called, and then once every GCFrequency.

An optional notifyCh can be passed in to be notified every time GC runs, with the result of each run. This is mostly used for testing.

func (DefaultClient) MustBarrier

func (c DefaultClient) MustBarrier(ctx context.Context, state State, required int) *Barrier

MustBarrier calls Barrier, panicking if it errors.

Suitable for shorthanding in test plans.

func (DefaultClient) MustPublish

func (c DefaultClient) MustPublish(ctx context.Context, topic *Topic, payload interface{}) (seq int64)

MustPublish calls Publish, panicking if it errors.

Suitable for shorthanding in test plans.

func (DefaultClient) MustPublishAndWait

func (c DefaultClient) MustPublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, target int) (seq int64)

MustPublishAndWait calls PublishAndWait, panicking if it errors.

Suitable for shorthanding in test plans.

func (DefaultClient) MustPublishSubscribe

func (c DefaultClient) MustPublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription)

MustPublishSubscribe calls PublishSubscribe, panicking if it errors.

Suitable for shorthanding in test plans.

func (DefaultClient) MustSignalAndWait

func (c DefaultClient) MustSignalAndWait(ctx context.Context, state State, target int) (seq int64)

MustSignalAndWait calls SignalAndWait, panicking if it errors.

Suitable for shorthanding in test plans.

func (DefaultClient) MustSignalEntry

func (c DefaultClient) MustSignalEntry(ctx context.Context, state State) (current int64)

MustSignalEntry calls SignalEntry, panicking if it errors.

Suitable for shorthanding in test plans.

func (*DefaultClient) MustSubscribe

func (c *DefaultClient) MustSubscribe(ctx context.Context, topic *Topic, ch interface{}) (sub *Subscription)

MustSubscribe calls Subscribe, panicking if it errors.

Suitable for shorthanding in test plans.

func (*DefaultClient) Publish

func (c *DefaultClient) Publish(ctx context.Context, topic *Topic, payload interface{}) (seq int64, err error)

Publish publishes an item on the supplied topic. The payload type must match the payload type on the Topic; otherwise Publish will error.

This method returns synchronously once the item has been published successfully. It returns the sequence number of the new item in the ordered topic, starting with 1 for the first item, or an error if one occurred.

If error is non-nil, the sequence number must be disregarded.

func (DefaultClient) PublishAndWait

func (c DefaultClient) PublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, target int) (seq int64, err error)

PublishAndWait composes Publish and a Barrier. It first publishes the provided payload to the specified topic, then waits for a barrier on the supplied state to reach the indicated target.

If any operation fails, PublishAndWait short-circuits and returns a non-nil error and a negative sequence. If Publish succeeds, but the Barrier fails, the seq number will be greater than zero.

func (DefaultClient) PublishSubscribe

func (c DefaultClient) PublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription, err error)

PublishSubscribe publishes the payload on the supplied Topic, then subscribes to it, sending payloads to the supplied channel.

If any operation fails, PublishSubscribe short-circuits and returns a non-nil error and a negative sequence. If Publish succeeds, but Subscribe fails, the seq number will be greater than zero, but the returned Subscription will be nil, and the error, non-nil.

func (*DefaultClient) RunGC

func (c *DefaultClient) RunGC() error

RunGC runs a round of GC. GC consists of paging through the Redis database with SCAN, fetching the last access time of all keys via a pipelined OBJECT IDLETIME, and deleting the keys that have been idle for at least GCLastAccessThreshold.

func (DefaultClient) SignalAndWait

func (c DefaultClient) SignalAndWait(ctx context.Context, state State, target int) (seq int64, err error)

SignalAndWait composes SignalEntry and Barrier, signalling entry on the supplied state, and then waiting until the target value has been reached.

The returned error will be nil if the barrier was met successfully, or non-nil if the context expired or some other error occurred.

func (*DefaultClient) SignalEntry

func (c *DefaultClient) SignalEntry(ctx context.Context, state State) (after int64, err error)

SignalEntry increments the state counter by one, returning the new value of the counter, or an error if the operation fails.

func (*DefaultClient) Subscribe

func (c *DefaultClient) Subscribe(ctx context.Context, topic *Topic, ch interface{}) (*Subscription, error)

Subscribe subscribes to a topic, consuming ordered, typed elements from index 0, and sending them to channel ch.

The supplied channel must be buffered, and its type must be a value or pointer type matching the topic type. If these conditions are unmet, this method will error immediately.

The caller must consume from this channel promptly; failure to do so will backpressure the DefaultClient's subscription event loop.

type State

type State string

State represents a state in a distributed state machine, identified by a unique string within the test case.

func (State) Key

func (s State) Key(rp *runtime.RunParams) string

Key gets the Redis key for this State, contextualized to a set of RunParams.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription represents a receive channel for data being published in a Topic.

func (*Subscription) Done

func (s *Subscription) Done() <-chan error

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic represents a meeting place for test instances to exchange arbitrary data.

func NewTopic

func NewTopic(name string, typ interface{}) *Topic

NewTopic constructs a Topic with the provided name, and the type of the supplied value, derived via reflect.TypeOf, unless the supplied value is already a reflect.Type. This method does not retain the actual value from which the type is derived.

func (Topic) Key

func (t Topic) Key(rp *runtime.RunParams) string

Key gets the key for this Topic, contextualized to a set of RunParams.

Documentation was rendered with GOOS=linux and GOARCH=amd64.
