nsync

package
v0.1.20 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2024 License: BSD-3-Clause Imports: 5 Imported by: 4

Documentation

Overview

The nsync package provides a mutex Mu and a Mesa-style condition variable CV.

The nsync primitives differ from those in sync in that nsync provides timed/cancellable wait on CV, and try-lock on Mu; CV's wait primitives take the mutex as an explicit argument to remind the reader that they have a side effect on the mutex; the zero value CV can be used without further initialization; and Mu forbids a lock acquired by one thread to be released by another.

As well as Mu and CV being usable with one another, an nsync,Mu can be used with a sync.Cond, and an nsync.CV can be used with a sync.Mutex.

Index

Examples

Constants

View Source
const (
	OK        = iota // Neither expired nor cancelled.
	Expired   = iota // absDeadline expired.
	Cancelled = iota // cancelChan was closed.
)

Values returned by CV.WaitWithDeadline().

Variables

View Source
var NoDeadline time.Time

NoDeadline represents a time in the far future---a deadline that will not expire.

Functions

This section is empty.

Types

type CV

type CV struct {
	// contains filtered or unexported fields
}

A CV is a condition variable in the style of Mesa, Java, POSIX, and Go's sync.Cond. It allows a thread to wait for a condition on state protected by a mutex, and to proceed with the mutex held and the condition true.

When compared with sync.Cond: (a) CV adds WaitWithDeadline() which allows timeouts and cancellation, (b) the mutex is an explicit argument of the wait calls to remind the reader that they have a side-effect on the mutex, and (c) (as a result of (b)), a zero-valued CV is a valid CV with no enqueued waiters, so there is no need of a call to construct a CV.

Usage:

After making the desired predicate true, call:

cv.Signal() // If at most one thread can make use of the predicate becoming true.

or

cv.Broadcast() // If multiple threads can make use of the predicate becoming true.

To wait for a predicate with no deadline (assuming cv.Broadcast() is called whenever the predicate becomes true):

mu.Lock()
for !some_predicate_protected_by_mu { // the for-loop is required.
        cv.Wait(&mu)
}
// predicate is now true
mu.Unlock()

To wait for a predicate with a deadline (assuming cv.Broadcast() is called whenever the predicate becomes true):

mu.Lock()
for !some_predicate_protected_by_mu && cv.WaitWithDeadline(&mu, absDeadline, cancelChan) == nsync.OK {
}
if some_predicate_protected_by_mu { // predicate is true
} else { // predicate is false, and deadline expired, or cancelChan was closed.
}
mu.Unlock()

or, if the predicate is complex and you wish to write it just once and inline, you could use the following instead of the for-loop above:

mu.Lock()
var predIsTrue bool
for outcome := OK; ; outcome = cv.WaitWithDeadline(&mu, absDeadline, cancelChan) {
        if predIsTrue = some_predicate_protected_by_mu; predIsTrue || outcome != nsync.OK {
                break
        }
}
if predIsTrue { // predicate is true
} else { // predicate is false, and deadline expired, or cancelChan was closed.
}
mu.Unlock()

As the examples show, Mesa-style condition variables require that waits use a loop that tests the predicate anew after each wait. It may be surprising that these are preferred over the precise wakeups offered by the condition variables in Hoare monitors. Imprecise wakeups make more efficient use of the critical section, because threads can enter it while a woken thread is still emerging from the scheduler, which may take thousands of cycles. Further, they make the programme easier to read and debug by making the predicate explicit locally at the wait, where the predicate is about to be assumed; the reader does not have to infer the predicate by examining all the places where wakeups may occur.

func (*CV) Broadcast

func (cv *CV) Broadcast()

Broadcast() wakes all threads currently enqueued on *cv.

func (*CV) Signal

func (cv *CV) Signal()

Signal() wakes at least one thread currently enqueued on *cv.

func (*CV) Wait

func (cv *CV) Wait(mu sync.Locker)

Wait() atomically releases "mu" and blocks the caller on *cv. It waits until it is awakened by a call to Signal() or Broadcast(), or a spurious wakeup. It then reacquires "mu", and returns. It is equivalent to a call to WaitWithDeadline() with absDeadline==NoDeadline, and a nil cancelChan. It should be used in a loop, as with all standard Mesa-style condition variables. See examples above.

Example

ExampleMuWait() demonstrates the use of nsync.Mu's Wait() via a priority queue of strings. See the routine RemoveWithDeadline(), above.

// Example use of Mu.Wait():  A priority queue of strings whose
// RemoveWithDeadline() operation has a deadline.

package main

import (
	"container/heap"
	"fmt"
	"time"

	"v.io/x/lib/nsync"
)

// ---------------------------------------

// A priQueue implements heap.Interface and holds strings.
type priQueue []string

func (pq priQueue) Len() int               { return len(pq) }
func (pq priQueue) Less(i int, j int) bool { return pq[i] < pq[j] }
func (pq priQueue) Swap(i int, j int)      { pq[i], pq[j] = pq[j], pq[i] }
func (pq *priQueue) Push(x interface{})    { *pq = append(*pq, x.(string)) }
func (pq *priQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	s := old[n-1]
	*pq = old[0 : n-1]
	return s
}

// ---------------------------------------

// A StringPriorityQueue is a priority queue of strings, which emits the
// lexicographically least string available.
type StringPriorityQueue struct {
	nonEmpty nsync.CV // signalled when heap becomes non-empty
	mu       nsync.Mu // protects priQueue
	heap     priQueue
}

// Add() adds "s" to the queue *q.
func (q *StringPriorityQueue) Add(s string) {
	q.mu.Lock()
	if q.heap.Len() == 0 {
		q.nonEmpty.Broadcast()
	}
	heap.Push(&q.heap, s)
	q.mu.Unlock()
}

// RemoveWithDeadline() waits until queue *q is non-empty, then removes a string from its
// beginning, and returns it with true; or if absDeadline is reached before the
// queue becomes non-empty, returns the empty string and false.
func (q *StringPriorityQueue) RemoveWithDeadline(absDeadline time.Time) (s string, ok bool) {
	q.mu.Lock()
	for q.heap.Len() == 0 && q.nonEmpty.WaitWithDeadline(&q.mu, absDeadline, nil) == nsync.OK {
	}
	if q.heap.Len() != 0 {
		s = heap.Pop(&q.heap).(string)
		ok = true
	}
	q.mu.Unlock()
	return s, ok
}

// ---------------------------------------

// removeAndPrint() removes the first item from *q and outputs it on stdout,
// or outputs "timeout: <delay>" if no value can be found before "delay" elapses.
func removeAndPrint(q *StringPriorityQueue, delay time.Duration) {
	if s, ok := q.RemoveWithDeadline(time.Now().Add(delay)); ok {
		fmt.Printf("%s\n", s)
	} else {
		fmt.Printf("timeout %v\n", delay)
	}
}

// ExampleMuWait() demonstrates the use of nsync.Mu's Wait() via a priority queue of strings.
// See the routine RemoveWithDeadline(), above.
func main() {
	var q StringPriorityQueue
	ch := make(chan bool)

	go func() {
		q.Add("one")
		q.Add("two")
		q.Add("three")
		close(ch)
		time.Sleep(500 * time.Millisecond)
		q.Add("four")
		time.Sleep(500 * time.Millisecond)
		q.Add("five")
	}()

	// delay while "one", "two" and "three" are queued, but not yet "four"
	<-ch

	removeAndPrint(&q, 1*time.Second)        // should get "one"
	removeAndPrint(&q, 1*time.Second)        // should get "three" (it's lexicographically less than "two")
	removeAndPrint(&q, 1*time.Second)        // should get "two"
	removeAndPrint(&q, 100*time.Millisecond) // should time out because 1.1 < 0.5*3
	removeAndPrint(&q, 1*time.Second)        // should get "four"
	removeAndPrint(&q, 100*time.Millisecond) // should time out because 0.1 < 0.5
	removeAndPrint(&q, 1*time.Second)        // should get "five"
	removeAndPrint(&q, 1*time.Second)        // should time out because there's no more to fetch
}
Output:

one
three
two
timeout 100ms
four
timeout 100ms
five
timeout 1s

func (*CV) WaitWithDeadline

func (cv *CV) WaitWithDeadline(mu sync.Locker, absDeadline time.Time, cancelChan <-chan struct{}) (outcome int)

WaitWithDeadline() atomically releases "mu" and blocks the calling thread on *cv. It then waits until awakened by a call to Signal() or Broadcast() (or a spurious wakeup), or by the time reaching absDeadline, or by cancelChan being closed. In all cases, it reacquires "mu", and returns the reason for the call returned (OK, Expired, or Cancelled). Use absDeadline==nsync.NoDeadline for no deadline, and cancelChan==nil for no cancellation. WaitWithDeadline() should be used in a loop, as with all Mesa-style condition variables. See examples above.

There are two reasons for using an absolute deadline, rather than a relative timeout---these are why pthread_cond_timedwait() also uses an absolute deadline. First, condition variable waits have to be used in a loop; with an absolute time, the deadline does not have to be recomputed on each iteration. Second, in most real programmes, some activity (such as an RPC to a server, or when guaranteeing response time in a UI), there is a deadline imposed by the specification or the caller/user; relative delays can shift arbitrarily with scheduling delays, and so after multiple waits might extend beyond the expected deadline. Relative delays tend to be more convenient mostly in tests and trivial examples than they are in real programmes. nolint: gocyclo

type Mu

type Mu struct {
	// contains filtered or unexported fields
}

A Mu is a mutex. Its zero value is valid, and unlocked. It is similar to sync.Mutex, but implements TryLock().

A Mu can be "free", or held by a single thread (aka goroutine). A thread that acquires it should eventually release it. It is not legal to acquire a Mu in one thread and release it in another.

Example usage, where p.mu is an nsync.Mu protecting the invariant p.a+p.b==0

p.mu.Lock()
// The current thread now has exclusive access to p.a and p.b; invariant assumed true.
p.a++
p.b-- // restore invariant p.a+p.b==0 before releasing p.mu
p.mu.Unlock()

func (*Mu) AssertHeld

func (mu *Mu) AssertHeld()

AssertHeld() panics if *mu is not held.

func (*Mu) Lock

func (mu *Mu) Lock()

Lock() blocks until *mu is free and then acquires it.

func (*Mu) TryLock

func (mu *Mu) TryLock() bool

TryLock() attempts to acquire *mu without blocking, and returns whether it is successful. It returns true with high probability if *mu was free on entry.

func (*Mu) Unlock

func (mu *Mu) Unlock()

Unlock() unlocks *mu, and wakes a waiter if there is one.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL