Documentation
¶
Overview ¶
The sync package contains the distributed coordination and choreography facility of Testground.
The sync service is lightweight, and uses Redis recipes to implement coordination primitives like barriers, signalling, and pubsub. Additional primitives like locks, semaphores, etc. are in scope, and may be added in the future.
Constructing sync.Clients ¶
To use the sync service, test plan writers must create a sync.Client via the sync.NewBoundClient constructor, passing a context that governs the lifetime of the sync.Client, as well as a runtime.RunEnv to bind to. All sync operations will be automatically scoped/namespaced to the runtime.RunEnv.
Infrastructure services, such as sidecar instances, can create generic sync.Clients via the sync.NewGenericClient constructor. Such clients are not bound/constrained to a runtime.RunEnv, and instead are required to pass in runtime.RunParams in the context.Context to all operations. See WithRunParams for more info.
Recommendations for test plan writers ¶
All constructors and methods on sync.Client have Must* versions, which panic if an error occurs. Using these methods in combination with runtime.Invoke is safe, as the runner captures panics and records them as test crashes. The resulting code will be less pedantic.
We have added sugar methods that compose basic primitives into frequently used katas, such as client.PublishSubscribe, client.SignalAndWait, client.PublishAndWait, etc. These katas also have Must* variations. We encourage developers to adopt them in order to streamline their code.
Garbage collection ¶
The sync service is decentralised: it has no centralised actor, dispatcher, or coordinator that supervises the lifetime of a test. All participants in a test hit Redis directly, using its operations to implement the sync primitives. As a result, keys from past runs can accumulate.
Sync clients can participate in collaborative garbage collection by enabling background GC:
client.EnableBackgroundGC(ch) // see method godoc for info on ch
GC uses SCAN and OBJECT IDLETIME operations to find keys to purge, and its configuration is controlled by the GC* variables.
In the standard testground architecture, only sidecar processes are participate in GC:
Index ¶
- Constants
- Variables
- func GetRunParams(ctx context.Context) *runtime.RunParams
- func WithRunParams(ctx context.Context, rp *runtime.RunParams) context.Context
- type Barrier
- type Client
- func MustBoundClient(ctx context.Context, runenv *runtime.RunEnv) *Client
- func MustGenericClient(ctx context.Context, log *zap.SugaredLogger) *Client
- func NewBoundClient(ctx context.Context, runenv *runtime.RunEnv) (*Client, error)
- func NewGenericClient(ctx context.Context, log *zap.SugaredLogger) (*Client, error)
- func (c *Client) Barrier(ctx context.Context, state State, target int) (*Barrier, error)
- func (c *Client) Close() error
- func (c *Client) EnableBackgroundGC(notifyCh chan error)
- func (c *Client) MustBarrier(ctx context.Context, state State, required int) *Barrier
- func (c *Client) MustPublish(ctx context.Context, topic *Topic, payload interface{}) (seq int64)
- func (c *Client) MustPublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, ...) (seq int64)
- func (c *Client) MustPublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription)
- func (c *Client) MustSignalAndWait(ctx context.Context, state State, target int) (seq int64)
- func (c *Client) MustSignalEntry(ctx context.Context, state State) (current int64)
- func (c *Client) MustSubscribe(ctx context.Context, topic *Topic, ch interface{}) (sub *Subscription)
- func (c *Client) Publish(ctx context.Context, topic *Topic, payload interface{}) (seq int64, err error)
- func (c *Client) PublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, ...) (seq int64, err error)
- func (c *Client) PublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription, err error)
- func (c *Client) RunGC() error
- func (c *Client) SignalAndWait(ctx context.Context, state State, target int) (seq int64, err error)
- func (c *Client) SignalEntry(ctx context.Context, state State) (after int64, err error)
- func (c *Client) Subscribe(ctx context.Context, topic *Topic, ch interface{}) (*Subscription, error)
- type State
- type Subscription
- type Topic
Constants ¶
const ( RedisPayloadKey = "p" EnvRedisHost = "REDIS_HOST" EnvRedisPort = "REDIS_PORT" RedisHostname = "testground-redis" HostHostname = "host.docker.internal" )
const ( StateStopped = 0 StateInterrupting = 1 StateRunning = 2 )
Variables ¶
var DefaultRedisOpts = redis.Options{ MinIdleConns: 2, PoolSize: 5, PoolTimeout: 3 * time.Minute, MaxRetries: 30, MinRetryBackoff: 1 * time.Second, MaxRetryBackoff: 3 * time.Second, DialTimeout: 10 * time.Second, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, IdleCheckFrequency: 30 * time.Second, MaxConnAge: 2 * time.Minute, }
var ErrNoRunParameters = fmt.Errorf("no run parameters provided")
ErrNoRunParameters is returned by the generic client when an unbound context is passed in. See WithRunParams to bind RunParams to the context.
var GCFrequency = 30 * time.Minute
GCFrequency is the frequency at which periodic GC runs, if enabled.
var GCLastAccessThreshold = 30 * time.Minute
GCLastAccessThreshold specifies the minimum amount of time that should've elapsed since a Redis key was last accessed to be pruned by garbage collection.
Functions ¶
func GetRunParams ¶
GetRunParams extracts the RunParams from a context, previously set by calling WithRunParams.
Types ¶
type Barrier ¶
type Barrier struct { C chan error // contains filtered or unexported fields }
Barrier represents a barrier over a State. A Barrier is a synchronisation checkpoint that will fire once the `target` number of entries on that state have been registered.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func MustBoundClient ¶
MustBoundClient creates a new bound client by calling NewBoundClient, and panicking if it errors.
func MustGenericClient ¶
func MustGenericClient(ctx context.Context, log *zap.SugaredLogger) *Client
MustGenericClient creates a new generic client by calling NewGenericClient, and panicking if it errors.
func NewBoundClient ¶
NewBoundClient returns a new sync Client that is bound to the provided RunEnv. All operations will be automatically scoped to the keyspace of that run.
The context passed in here will govern the lifecycle of the client. Cancelling it will cancel all ongoing operations. However, for a clean closure, the user should call Close().
For test plans, a suitable context to pass here is the background context.
func NewGenericClient ¶
NewGenericClient returns a new sync Client that is bound to no RunEnv. It is intended to be used by testground services like the sidecar.
All operations expect to find the RunParams of the run to scope its actions inside the supplied context.Context. Call WithRunParams to bind the appropriate RunParams.
The context passed in here will govern the lifecycle of the client. Cancelling it will cancel all ongoing operations. However, for a clean closure, the user should call Close().
A suitable context to pass here is the background context of the main process.
func (*Client) Barrier ¶
Barrier sets a barrier on the supplied State that fires when it reaches its target value (or higher).
The caller should monitor the channel C returned inside the Barrier object. If the barrier is satisfied, the value sent will be nil.
When the context fires, the context's error will be propagated instead. The same will occur if the Client's context fires.
If an internal error occurs,
The returned Barrier object contains a channel (C) that fires when the barrier reaches its target, is cancelled, or fails.
The Barrier channel is owned by the Client, and by no means should the caller close it. It is safe to use a non-cancellable context here, like the background context. No cancellation is needed unless you want to stop the process early.
func (*Client) Close ¶
Close closes this client, cancels ongoing operations, and releases resources.
func (*Client) EnableBackgroundGC ¶
EnableBackgroundGC enables a background process to perform periodic GC. It ticks once when called, then with GCFrequency frequency.
An optional notifyCh can be passed in to be notified everytime GC runs, with the result of each run. This is mostly used for testing.
func (*Client) MustBarrier ¶
MustBarrier calls Barrier, panicking if it errors.
Suitable for shorthanding in test plans.
func (*Client) MustPublish ¶
MustPublish calls Publish, panicking if it errors.
Suitable for shorthanding in test plans.
func (*Client) MustPublishAndWait ¶
func (c *Client) MustPublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, target int) (seq int64)
MustPublishAndWait calls PublishAndWait, panicking if it errors.
Suitable for shorthanding in test plans.
func (*Client) MustPublishSubscribe ¶
func (c *Client) MustPublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription)
MustPublishSubscribe calls PublishSubscribe, panicking if it errors.
Suitable for shorthanding in test plans.
func (*Client) MustSignalAndWait ¶
MustSignalAndWait calls SignalAndWait, panicking if it errors.
Suitable for shorthanding in test plans.
func (*Client) MustSignalEntry ¶
MustSignalEntry calls SignalEntry, panicking if it errors.
Suitable for shorthanding in test plans.
func (*Client) MustSubscribe ¶
func (c *Client) MustSubscribe(ctx context.Context, topic *Topic, ch interface{}) (sub *Subscription)
MustSubscribe calls Subscribe, panicking if it errors.
Suitable for shorthanding in test plans.
func (*Client) Publish ¶
func (c *Client) Publish(ctx context.Context, topic *Topic, payload interface{}) (seq int64, err error)
Publish publishes an item on the supplied topic. The payload type must match the payload type on the Topic; otherwise Publish will error.
This method returns synchronously, once the item has been published successfully, returning the sequence number of the new item in the ordered topic, or an error if one ocurred, starting with 1 (for the first item).
If error is non-nil, the sequence number must be disregarded.
func (*Client) PublishAndWait ¶
func (c *Client) PublishAndWait(ctx context.Context, topic *Topic, payload interface{}, state State, target int) (seq int64, err error)
PublishAndWait composes Publish and a Barrier. It first publishes the provided payload to the specified topic, then awaits for a barrier on the supplied state to reach the indicated target.
If any operation fails, PublishAndWait short-circuits and returns a non-nil error and a negative sequence. If Publish succeeds, but the Barrier fails, the seq number will be greater than zero.
func (*Client) PublishSubscribe ¶
func (c *Client) PublishSubscribe(ctx context.Context, topic *Topic, payload interface{}, ch interface{}) (seq int64, sub *Subscription, err error)
PublishSubscribe publishes the payload on the supplied Topic, then subscribes to it, sending paylods to the supplied channel.
If any operation fails, PublishSubscribe short-circuits and returns a non-nil error and a negative sequence. If Publish succeeds, but Subscribe fails, the seq number will be greater than zero, but the returned Subscription will be nil, and the error, non-nil.
func (*Client) RunGC ¶
RunGC runs a round of GC. GC consists of paging through the Redis database with SCAN, fetching the last access time of all keys via a pipelined OBJECT IDLETIME, and deleting the keys that have been idle for greater or equal to GCLastAccessThreshold.
func (*Client) SignalAndWait ¶
SignalAndWait composes SignalEntry and Barrier, signalling entry on the supplied state, and then awaiting until the required value has been reached.
The returned error will be nil if the barrier was met successfully, or non-nil if the context expired, or some other error ocurred.
func (*Client) SignalEntry ¶
SignalEntry increments the state counter by one, returning the value of the new value of the counter, or an error if the operation fails.
func (*Client) Subscribe ¶
func (c *Client) Subscribe(ctx context.Context, topic *Topic, ch interface{}) (*Subscription, error)
Subscribe subscribes to a topic, consuming ordered, typed elements from index 0, and sending them to channel ch.
The supplied channel must be buffered, and its type must be a value or pointer type matching the topic type. If these conditions are unmet, this method will error immediately.
The caller must consume from this channel promptly; failure to do so will backpressure the Client's subscription event loop.
type State ¶
type State string
State represents a state in a distributed state machine, identified by a unique string within the test case.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription represents a receive channel for data being published in a Topic.
func (*Subscription) Done ¶
func (s *Subscription) Done() <-chan error
type Topic ¶
type Topic struct {
// contains filtered or unexported fields
}
Topic represents a meeting place for test instances to exchange arbitrary data.