tsync

package
v1.0.7
Published: Apr 18, 2019 License: Apache-2.0 Imports: 7 Imported by: 5

Documentation

Constants

const (
	// SpinPrioritySuspend should be used for spinning loops that are expected
	// to wait for very long periods of time. The loop will sleep for 1 second
	// after 100 iterations.
	SpinPrioritySuspend = SpinPriority(iota)

	// SpinPriorityLow should be used for spinning loops that are expected to
	// spin for a rather long time before being able to exit.
	// After 100 loops the caller waits for 100 milliseconds.
	SpinPriorityLow = SpinPriority(iota)

	// SpinPriorityMedium should be used for spinning loops that are expected to
	// spin for a short amount of time before being able to exit.
	// After 100 loops the caller waits for 1 millisecond.
	SpinPriorityMedium = SpinPriority(iota)

	// SpinPriorityHigh should be used for spinning loops that are expected to
	// almost never spin.
	// After 100 loops the caller waits for 10 microseconds.
	SpinPriorityHigh = SpinPriority(iota)

	// SpinPriorityRealtime should be used for loops that need to run as fast
	// as possible. After 100 loops the Go scheduler is triggered.
	SpinPriorityRealtime = SpinPriority(iota)
)
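
The back-off behavior described in the constant comments can be sketched with the standard library alone. This is a minimal illustration, not the tsync Spinner: the type `spinSketch`, its fields, and the constant `spinLimit` are hypothetical names, and only the "100 iterations, then wait" rule is taken from the comments above.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// spinLimit mirrors the "after 100 loops" rule from the constant comments.
const spinLimit = 100

// spinSketch is a hypothetical stand-in for a spinner, for illustration only.
type spinSketch struct {
	count      int           // iterations since the last back-off
	suspendFor time.Duration // 0 means: yield to the scheduler instead of sleeping
	suspends   int           // number of back-offs so far (kept only for the demo)
}

// yield counts one loop iteration and backs off once spinLimit is reached.
func (s *spinSketch) yield() {
	s.count++
	if s.count < spinLimit {
		return
	}
	s.count = 0
	s.suspends++
	if s.suspendFor == 0 {
		runtime.Gosched() // comparable to the "realtime" description
		return
	}
	time.Sleep(s.suspendFor)
}

func main() {
	// 10 microseconds is the delay the comments give for SpinPriorityHigh.
	spin := spinSketch{suspendFor: 10 * time.Microsecond}
	for i := 0; i < 250; i++ {
		spin.yield()
	}
	fmt.Println("back-offs:", spin.suspends)
}
```

Longer delays trade wake-up latency for lower CPU usage, which is why the comments tie each priority to how long the loop is expected to spin.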

Variables

This section is empty.

Functions

func AbortAfter

func AbortAfter(t time.Duration, routine func()) bool

func Fanout

func Fanout(in interface{}, out ...interface{})

Fanout receives from the given in channel and forwards the data to the first non-blocking out channel. Fanout returns when in has been closed.
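
One way to forward values between channels passed as interface{} is `reflect.Select`. The sketch below illustrates the behavior described above under that assumption; `fanoutSketch` is a hypothetical name, and the source does not state how tsync implements Fanout.

```go
package main

import (
	"fmt"
	"reflect"
)

// fanoutSketch forwards every value received from in to whichever out channel
// can accept it first. It returns once in has been closed.
func fanoutSketch(in interface{}, out ...interface{}) {
	inChan := reflect.ValueOf(in)
	cases := make([]reflect.SelectCase, len(out))
	for i, o := range out {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectSend, Chan: reflect.ValueOf(o)}
	}
	for {
		value, ok := inChan.Recv()
		if !ok {
			return // in has been closed
		}
		for i := range cases {
			cases[i].Send = value
		}
		reflect.Select(cases) // blocks until one out channel accepts the value
	}
}

func main() {
	in := make(chan int)
	a, b := make(chan int, 4), make(chan int, 4)
	go func() {
		for i := 0; i < 4; i++ {
			in <- i
		}
		close(in)
	}()
	fanoutSketch(in, a, b)
	fmt.Println("forwarded:", len(a)+len(b))
}
```

When several out channels are ready at once, `reflect.Select` picks one pseudo-randomly, so the distribution between `a` and `b` is not deterministic.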

func Funnel

func Funnel(out interface{}, in ...interface{})

Funnel receives from the first non-blocking in channel and forwards it to the given out channel. Funnel returns when all in channels have been closed.
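
The opposite direction can be sketched the same way with receive cases. This is an assumption-based illustration rather than the tsync code: `funnelSketch` is a hypothetical name, and closed in channels are simply removed from the select set until none remain.

```go
package main

import (
	"fmt"
	"reflect"
)

// funnelSketch forwards values from whichever in channel is ready to out.
// It returns once every in channel has been closed and drained.
func funnelSketch(out interface{}, in ...interface{}) {
	outChan := reflect.ValueOf(out)
	cases := make([]reflect.SelectCase, 0, len(in))
	for _, c := range in {
		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c)})
	}
	for len(cases) > 0 {
		idx, value, ok := reflect.Select(cases)
		if !ok {
			// The selected channel is closed and empty: stop listening on it.
			cases = append(cases[:idx], cases[idx+1:]...)
			continue
		}
		outChan.Send(value)
	}
}

func main() {
	a, b := make(chan string, 2), make(chan string, 2)
	out := make(chan string, 4)
	a <- "a1"
	a <- "a2"
	b <- "b1"
	close(a)
	close(b)
	funnelSketch(out, a, b)
	fmt.Println("collected:", len(out))
}
```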

func Turnout

func Turnout(in []interface{}, out []interface{})

Turnout multiplexes data between the list of in and out channels. The data of the first non-blocking in channel will be forwarded to the first non-blocking out channel. Turnout returns when all in channels have been closed.

Types

type Fuse

type Fuse struct {
	// contains filtered or unexported fields
}

Fuse is a local circuit breaker implementation that is meant to be used to manage the state of a given resource between different threads of execution (consumer/producer). If the resource is not available, the fuse is "burned". Components may then wait on that fuse and are woken as soon as the resource becomes available again (the fuse is "activated" again).

func NewFuse

func NewFuse() *Fuse

NewFuse creates a new Fuse and returns it. A new fuse is always active.

func (*Fuse) Activate

func (fuse *Fuse) Activate()

Activate sets the fuse back to the "running" state. An already active fuse cannot be activated again (call is ignored).

func (*Fuse) Burn

func (fuse *Fuse) Burn()

Burn sets the fuse back to the "inactive" state. An already burned fuse cannot be burned again (call is ignored).

func (Fuse) IsBurned

func (fuse Fuse) IsBurned() bool

IsBurned returns true if the fuse is in the "inactive" state.

func (Fuse) Wait

func (fuse Fuse) Wait()

Wait blocks until the fuse enters the active state. Multiple goroutines may wait on the same fuse.
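
The burn/activate/wait cycle can be illustrated with `sync.Cond` from the standard library. This is a minimal sketch under that assumption, not the tsync implementation; `fuseSketch` and its lowercase methods are hypothetical names chosen to mirror the documented API.

```go
package main

import (
	"fmt"
	"sync"
)

// fuseSketch mimics the documented states: active (default) and burned.
type fuseSketch struct {
	mu     sync.Mutex
	cond   *sync.Cond
	burned bool
}

func newFuseSketch() *fuseSketch {
	f := &fuseSketch{}
	f.cond = sync.NewCond(&f.mu)
	return f
}

// burn marks the resource as unavailable.
func (f *fuseSketch) burn() {
	f.mu.Lock()
	f.burned = true
	f.mu.Unlock()
}

// activate marks the resource as available and wakes all waiters.
func (f *fuseSketch) activate() {
	f.mu.Lock()
	f.burned = false
	f.mu.Unlock()
	f.cond.Broadcast()
}

func (f *fuseSketch) isBurned() bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.burned
}

// wait blocks while the fuse is burned and returns at once if it is active.
func (f *fuseSketch) wait() {
	f.mu.Lock()
	for f.burned {
		f.cond.Wait()
	}
	f.mu.Unlock()
}

func main() {
	fuse := newFuseSketch()
	fuse.burn() // e.g. a producer lost its connection

	done := make(chan struct{})
	go func() {
		fuse.wait() // a consumer pauses until the resource is back
		close(done)
	}()

	fuse.activate()
	<-done
	fmt.Println("burned:", fuse.isBurned())
}
```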

type LimitError

type LimitError terrors.SimpleError

LimitError is returned when a data structure has reached its limit.

func (LimitError) Error

func (err LimitError) Error() string

type LockedError

type LockedError terrors.SimpleError

LockedError is returned when an item has been encountered as locked

func (LockedError) Error

func (err LockedError) Error() string

type Mutex

type Mutex struct {
	// contains filtered or unexported fields
}

Mutex is a lightweight, spinner based mutex implementation, extending the standard go mutex by the possibility to query the mutex' state and by adding a TryLock function.

func NewMutex

func NewMutex(priority SpinPriority) *Mutex

NewMutex creates a new mutex with the given spin priority used during Lock.

func (*Mutex) IsLocked

func (m *Mutex) IsLocked() bool

IsLocked returns the state of this mutex. The result of this function might change directly after call so it should only be used in situations where this fact is not considered problematic.

func (*Mutex) Lock

func (m *Mutex) Lock()

Lock blocks (spins) until the lock becomes available.

func (*Mutex) TryLock

func (m *Mutex) TryLock() bool

TryLock tries to acquire a lock and returns true if it succeeds. This function does not block.

func (*Mutex) Unlock

func (m *Mutex) Unlock()

Unlock unblocks one routine waiting on lock.
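
A spinner-based mutex with a queryable state and a non-blocking TryLock can be sketched with a single atomic flag. The following is an illustration only: `spinMutexSketch` is a hypothetical type, and it yields to the scheduler on every failed attempt instead of using a configurable spin priority.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// spinMutexSketch holds 0 when unlocked and 1 when locked.
type spinMutexSketch struct {
	state int32
}

// tryLock acquires the lock if it is free and never blocks.
func (m *spinMutexSketch) tryLock() bool {
	return atomic.CompareAndSwapInt32(&m.state, 0, 1)
}

// lock spins until tryLock succeeds.
func (m *spinMutexSketch) lock() {
	for !m.tryLock() {
		runtime.Gosched()
	}
}

// unlock releases the lock.
func (m *spinMutexSketch) unlock() {
	atomic.StoreInt32(&m.state, 0)
}

// isLocked reports a snapshot that may be outdated as soon as it returns.
func (m *spinMutexSketch) isLocked() bool {
	return atomic.LoadInt32(&m.state) == 1
}

func main() {
	var m spinMutexSketch
	var wg sync.WaitGroup
	counter := 0
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				m.lock()
				counter++ // protected by the spin lock
				m.unlock()
			}
		}()
	}
	wg.Wait()
	fmt.Println("counter:", counter, "locked:", m.isLocked())
}
```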

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue implements a multi-producer, multi-consumer, fair, lockfree queue. Please note that while this queue is faster than a chan interface{} it is not faster than a channel of a specific type due to the overhead that interface{} brings.

How it works: Given a queue of capacity 7, we have two write indexes "p" and "n" for process and next.

  reader n/p
  v
 [ |1|2|3| | | | ]
    ^     ^
    p     n

Here threads 1, 2 and 3 are writing to slots 1, 2 and 3. When 3 is done writing it has to wait for p to move to slot 3. Same for slot 2. When 1 is done writing it immediately moves p to slot 2, making it possible for 2 to continue. Same for slot 3. While p is still on 1 the "reader n" can be fetched but Pop will wait until p has moved on. If "reader n" is done, "reader p" is moved in similar fashion as p for writes. This implements a FIFO queue for writes and reads and makes sure that no incomplete reads or overwrites occur.
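
The write side of the scheme above can be sketched as follows. This is a simplified illustration, not the tsync Queue: `writeRingSketch` is a hypothetical type, slots are numbered from 0, there is no wrap-around and no reader, and pushing more items than there are slots panics.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// writeRingSketch models only the two write indexes from the description.
type writeRingSketch struct {
	slots   []interface{}
	next    uint32 // "n": the next slot a writer may reserve
	process uint32 // "p": every slot below this index is fully written
}

// push reserves a slot, writes to it, and then publishes it in slot order.
func (r *writeRingSketch) push(item interface{}) {
	slot := atomic.AddUint32(&r.next, 1) - 1 // reserve by moving n
	r.slots[slot] = item

	// Wait until all earlier writers have published their slots.
	for atomic.LoadUint32(&r.process) != slot {
		runtime.Gosched()
	}
	atomic.StoreUint32(&r.process, slot+1) // move p, unblocking the next writer
}

// committed returns the slots that are safe to read.
func (r *writeRingSketch) committed() []interface{} {
	return r.slots[:atomic.LoadUint32(&r.process)]
}

func main() {
	ring := writeRingSketch{slots: make([]interface{}, 7)}
	var wg sync.WaitGroup
	for id := 1; id <= 3; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			ring.push(id)
		}(id)
	}
	wg.Wait()
	fmt.Println("committed items:", len(ring.committed()))
}
```

Because p only advances in slot order, a reader that stops at p never observes a slot that has been reserved but not yet written.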

func NewQueue

func NewQueue(capacity uint32) Queue

NewQueue creates a new queue with medium spinning priority

func NewQueueWithSpinner

func NewQueueWithSpinner(capacity uint32, spinner Spinner) Queue

NewQueueWithSpinner creates a new queue that uses the given spinner, which determines the spinning priority.

func (*Queue) Close

func (q *Queue) Close()

Close blocks the queue from write access. Once a closed queue has been emptied, Pop() returns nil instead of blocking.

func (*Queue) IsClosed

func (q *Queue) IsClosed() bool

IsClosed returns true if Close() has been called.

func (*Queue) IsDrained

func (q *Queue) IsDrained() bool

IsDrained combines IsClosed and IsEmpty.

func (*Queue) IsEmpty

func (q *Queue) IsEmpty() bool

IsEmpty returns true if there is no item in the queue to be processed. Please note that this state is extremely volatile unless IsClosed returned true.

func (*Queue) Pop

func (q *Queue) Pop() interface{}

Pop removes an item from the queue. This call may block if the queue is empty. If the queue is drained Pop() will not block and return nil.

func (*Queue) Push

func (q *Queue) Push(item interface{}) error

Push adds an item to the queue. This call may block if the queue is full. An error is returned when the queue is locked.

func (*Queue) Reopen

func (q *Queue) Reopen()

Reopen unblocks the queue to allow write access again.

type SpinPriority

type SpinPriority uint32

SpinPriority is used for Spinner priority enum values

type Spinner

type Spinner struct {
	// contains filtered or unexported fields
}

Spinner is a helper struct for spinning loops.

func NewCustomSpinner

func NewCustomSpinner(suspendFor time.Duration) Spinner

NewCustomSpinner creates a new spinner with a custom delay.

func NewSpinner

func NewSpinner(priority SpinPriority) Spinner

NewSpinner creates a new helper for spinning loops

func (*Spinner) Reset

func (spin *Spinner) Reset()

Reset sets the internal counter back to 0

func (*Spinner) Yield

func (spin *Spinner) Yield()

Yield should be called in spinning loops and will assure correct spin/wait/schedule behavior according to the set priority.

type Stack

type Stack struct {
	// contains filtered or unexported fields
}

Stack implements a simple, growing, lockfree stack. The main idea is to use the sign bit of the head index as a mutex. If the index is negative, the stack is locked so we need to spin. If the index is non-negative the stack is unlocked and we can write or read.

func NewStack

func NewStack(size int) Stack

NewStack creates a new stack with the given initial size. The given size will also be used as grow size. SpinPriorityMedium is used to initialize the spinner.

func NewStackWithGrowSize

func NewStackWithGrowSize(size, grow int) Stack

NewStackWithGrowSize creates a new stack with a custom grow size. SpinPriorityMedium is used to initialize the spinner.

func NewStackWithSpinner

func NewStackWithSpinner(size int, spin Spinner) Stack

NewStackWithSpinner creates a new stack with a custom spinner. The given size will also be used as grow size.

func NewStackWithSpinnerAndGrowSize

func NewStackWithSpinnerAndGrowSize(size, grow int, spin Spinner) Stack

NewStackWithSpinnerAndGrowSize creates a new stack with a custom initial size, grow size and spinner.

func (*Stack) Len

func (s *Stack) Len() int

Len returns the number of elements on the stack. Please note that this value can be highly unreliable in multithreaded environments as it is only a snapshot of the state at call time.

func (*Stack) Pop

func (s *Stack) Pop() (interface{}, error)

Pop retrieves the topmost element from the stack. A LimitError is returned when the stack is empty.

func (*Stack) Push

func (s *Stack) Push(v interface{}) error

Push adds an element to the top of the stack. When the stack's capacity is reached the storage grows as defined during construction. If the stack reaches 2^31 elements it is considered full and will return a LimitError.
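
The idea of using the sign of the head index as a lock can be sketched like this. It is an illustration under stated assumptions, not the tsync Stack: `signStackSketch` is a hypothetical type, the locked state is encoded as `-h-1`, growth is delegated to `append`, and `pop` reports emptiness with a bool instead of a LimitError.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// signStackSketch stores the element count in head.
// A negative head means another goroutine currently owns the stack.
type signStackSketch struct {
	head int32
	data []interface{}
}

// acquire spins until head is non-negative, flips it negative,
// and returns the element count seen at that moment.
func (s *signStackSketch) acquire() int32 {
	for {
		h := atomic.LoadInt32(&s.head)
		if h >= 0 && atomic.CompareAndSwapInt32(&s.head, h, -h-1) {
			return h
		}
		runtime.Gosched()
	}
}

// push appends v and releases the stack by storing the new count.
func (s *signStackSketch) push(v interface{}) {
	h := s.acquire()
	s.data = append(s.data[:h], v)
	atomic.StoreInt32(&s.head, h+1)
}

// pop removes the topmost element; ok is false if the stack is empty.
func (s *signStackSketch) pop() (v interface{}, ok bool) {
	h := s.acquire()
	if h == 0 {
		atomic.StoreInt32(&s.head, 0) // release without changes
		return nil, false
	}
	v = s.data[h-1]
	atomic.StoreInt32(&s.head, h-1)
	return v, true
}

func main() {
	var s signStackSketch
	s.push(1)
	s.push(2)
	s.push(3)
	top, _ := s.pop()
	fmt.Println("top:", top)
}
```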

type TimeoutError

type TimeoutError terrors.SimpleError

TimeoutError is returned when a function returned because of a timeout

func (TimeoutError) Error

func (err TimeoutError) Error() string

type WaitGroup

type WaitGroup struct {
	// contains filtered or unexported fields
}

WaitGroup is a replacement for sync.WaitGroup that allows the internal counter to go negative. This version allows a missed Done to be recovered but will make it a lot harder to detect missing Done or Add calls. Use only where needed.

func (*WaitGroup) Active

func (wg *WaitGroup) Active() bool

Active returns true if the counter is > 0

func (*WaitGroup) Add

func (wg *WaitGroup) Add(delta int)

Add increments the waitgroup counter by the given value. Delta may be negative.

func (*WaitGroup) Done

func (wg *WaitGroup) Done()

Done is the shorthand version for Add(-1)

func (*WaitGroup) Inc

func (wg *WaitGroup) Inc()

Inc is the shorthand version for Add(1)

func (*WaitGroup) IncWhenDone

func (wg *WaitGroup) IncWhenDone()

IncWhenDone waits until the counter is exactly 0 and then increments it.

func (*WaitGroup) Reset

func (wg *WaitGroup) Reset()

Reset sets the counter to 0

func (*WaitGroup) Wait

func (wg *WaitGroup) Wait()

Wait blocks until the counter is 0 or less.

func (*WaitGroup) WaitFor

func (wg *WaitGroup) WaitFor(timeout time.Duration) bool

WaitFor blocks until the counter is 0 or less. If the block takes longer than the given timeout, WaitFor will return false. If timeout is 0, Wait is called.
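
A counter that may go negative, combined with a timed wait, can be sketched with an atomic integer. This is not the tsync WaitGroup: `counterGroupSketch` and its methods are hypothetical names, waiting is done by polling, and a timeout of 0 is treated as "wait without a deadline" to match the description above.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
	"time"
)

// counterGroupSketch is a wait group whose counter is allowed to drop below 0.
type counterGroupSketch struct {
	counter int32
}

// add changes the counter by delta, which may be negative.
func (wg *counterGroupSketch) add(delta int) {
	atomic.AddInt32(&wg.counter, int32(delta))
}

// done is shorthand for add(-1).
func (wg *counterGroupSketch) done() {
	wg.add(-1)
}

// active reports whether the counter is greater than 0.
func (wg *counterGroupSketch) active() bool {
	return atomic.LoadInt32(&wg.counter) > 0
}

// waitFor polls until the counter is 0 or less and returns true,
// or returns false once a non-zero timeout has elapsed.
func (wg *counterGroupSketch) waitFor(timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for wg.active() {
		if timeout > 0 && time.Now().After(deadline) {
			return false
		}
		runtime.Gosched()
	}
	return true
}

func main() {
	var wg counterGroupSketch
	wg.add(1)
	wg.done()
	wg.done() // one Done too many: the counter is now -1 and nothing panics
	fmt.Println("finished:", wg.waitFor(10*time.Millisecond))

	wg.add(2) // counter is 1 again, so the wait below times out
	fmt.Println("finished:", wg.waitFor(10*time.Millisecond))
}
```

The second wait shows the trade-off mentioned above: the surplus Done silently cancels one of the two added units, so a missing or extra call is not reported anywhere.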
