sync2

Published: Apr 24, 2024 License: MIT Imports: 15 Imported by: 62

Documentation

Overview

Package sync2 provides a set of functions and types for:

  • context-aware functionality that isn't present in the standard library,
  • offloading memory through the file system,
  • controlling the execution of tasks that can run repetitively, concurrently, or asynchronously.

Constants

This section is empty.

Variables

This section is empty.

Functions

func Concurrently

func Concurrently(fns ...func() error) []error

Concurrently runs fns concurrently and returns the non-nil errors.
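
For illustration, a minimal sketch (not from the package docs) of collecting failures from two concurrent tasks; the task bodies are made up.

package main

import (
	"errors"
	"fmt"

	"storj.io/common/sync2"
)

func main() {
	// Run two independent tasks concurrently; only the non-nil errors come back.
	errs := sync2.Concurrently(
		func() error { return nil },
		func() error { return errors.New("second task failed") },
	)
	fmt.Println("failures:", errs)
}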

func Copy

func Copy(ctx context.Context, dst io.Writer, src io.Reader) (written int64, err error)

Copy implements copying with cancellation.
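
A small sketch of cancellable copying, using an in-memory reader and writer purely for illustration.

package main

import (
	"bytes"
	"context"
	"fmt"
	"strings"
	"time"

	"storj.io/common/sync2"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	var dst bytes.Buffer
	src := strings.NewReader("hello, sync2")

	// The copy is aborted early if ctx is canceled before it completes.
	n, err := sync2.Copy(ctx, &dst, src)
	fmt.Println(n, err, dst.String())
}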

func Go

func Go(fns ...func()) (wait func())

Go runs fns concurrently and returns a func to wait for them to complete.

See also Concurrently and errs2.Group.
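
A minimal sketch of starting two functions with Go and waiting for both; the printed messages are illustrative.

package main

import (
	"fmt"

	"storj.io/common/sync2"
)

func main() {
	// Start both functions concurrently; wait() blocks until both return.
	wait := sync2.Go(
		func() { fmt.Println("first task") },
		func() { fmt.Println("second task") },
	)
	wait()
	fmt.Println("both done")
}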

func IsManuallyTriggeredCycle

func IsManuallyTriggeredCycle(ctx context.Context) bool

IsManuallyTriggeredCycle returns whether ctx comes from a context that was started due to a `Trigger` or `TriggerWait` call in Cycle.

func NewTeeFile

func NewTeeFile(readers int, tempdir string) ([]PipeReader, PipeWriter, error)

NewTeeFile returns a tee that uses the file system to offload memory.

func NewTeeInmemory

func NewTeeInmemory(readers int, blockSize int64) ([]PipeReader, PipeWriter, error)

NewTeeInmemory returns a tee that keeps the data in memory.
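
A hedged sketch of an in-memory tee: one writer, two readers that each see the full stream. The block size of 4096 and the EOF-on-Close behavior follow ordinary pipe semantics and are assumptions, not quoted from the docs.

package main

import (
	"fmt"
	"io"
	"sync"

	"storj.io/common/sync2"
)

func main() {
	// Two readers will each observe the full stream written by the single writer.
	readers, writer, err := sync2.NewTeeInmemory(2, 4096)
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	for i, r := range readers {
		wg.Add(1)
		go func(i int, r sync2.PipeReader) {
			defer wg.Done()
			defer func() { _ = r.Close() }()
			data, err := io.ReadAll(r)
			fmt.Println("reader", i, "got:", string(data), err)
		}(i, r)
	}

	_, _ = writer.Write([]byte("hello tee"))
	_ = writer.Close() // assumed: Close signals EOF to both readers, as with ordinary pipes
	wg.Wait()
}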

func Sleep

func Sleep(ctx context.Context, duration time.Duration) bool

Sleep implements sleeping with cancellation.
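
A small sketch of context-aware sleeping; the boolean result is assumed to report whether the full duration elapsed without cancellation (false here, since the context times out first).

package main

import (
	"context"
	"fmt"
	"time"

	"storj.io/common/sync2"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// Assumed: the result reports whether the full duration elapsed (false here; ctx times out first).
	completed := sync2.Sleep(ctx, time.Minute)
	fmt.Println("slept the full duration:", completed)
}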

func WithTimeout

func WithTimeout(timeout time.Duration, do, onTimeout func())

WithTimeout calls `do`; if the timeout is reached before `do` has finished, it calls `onTimeout` concurrently.

Once WithTimeout has returned, it is guaranteed not to call onTimeout.
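
An illustrative sketch where `do` is deliberately slower than the timeout, so `onTimeout` fires concurrently; the durations are arbitrary.

package main

import (
	"fmt"
	"time"

	"storj.io/common/sync2"
)

func main() {
	sync2.WithTimeout(10*time.Millisecond,
		func() { time.Sleep(50 * time.Millisecond) },        // do: deliberately slower than the timeout
		func() { fmt.Println("still running after 10ms") },  // onTimeout: called concurrently when the timeout fires
	)
	fmt.Println("do finished")
}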

Types

type Cooldown

type Cooldown struct {
	// contains filtered or unexported fields
}

Cooldown implements an event that can only occur once in a given timeframe.

Cooldown control methods panic after Close has been called and have no effect after Stop has been called.

Start or Run (only one of them, not both) must be called only once.

func NewCooldown

func NewCooldown(interval time.Duration) *Cooldown

NewCooldown creates a new cooldown with the specified interval.

func (*Cooldown) Close

func (cooldown *Cooldown) Close()

Close closes all resources associated with it.

It MUST NOT be called concurrently.

func (*Cooldown) Run

func (cooldown *Cooldown) Run(ctx context.Context, fn func(ctx context.Context) error) error

Run waits for a message on the trigger channel, then runs the specified function. Afterwards it will sleep for the cooldown duration and drain the trigger channel.

Run PANICS if it's called after Stop has been called.

func (*Cooldown) SetInterval

func (cooldown *Cooldown) SetInterval(interval time.Duration)

SetInterval allows changing the interval before starting.

func (*Cooldown) Start

func (cooldown *Cooldown) Start(ctx context.Context, group *errgroup.Group, fn func(ctx context.Context) error)

Start runs the specified function with an errgroup.

func (*Cooldown) Stop

func (cooldown *Cooldown) Stop()

Stop stops the cooldown permanently.

func (*Cooldown) Trigger

func (cooldown *Cooldown) Trigger()

Trigger attempts to run the cooldown function. If the timer has not expired, the function will not run.
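
A hedged sketch of driving a Cooldown with an errgroup: Start runs the callback on Trigger, at most once per interval. The shutdown order used here (cancel, wait, then Close) is an assumption, not quoted from the docs.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"

	"storj.io/common/sync2"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	group, gctx := errgroup.WithContext(ctx)

	cooldown := sync2.NewCooldown(time.Second)
	cooldown.Start(gctx, group, func(ctx context.Context) error {
		fmt.Println("triggered")
		return nil
	})

	// Within a single interval only the first Trigger runs the callback.
	cooldown.Trigger()
	cooldown.Trigger()

	time.Sleep(100 * time.Millisecond) // illustrative: give the callback time to run

	// Assumed shutdown order: cancel the context, wait for Run to exit, then Close.
	cancel()
	_ = group.Wait()
	cooldown.Close()
}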

type Cycle

type Cycle struct {
	// contains filtered or unexported fields
}

Cycle implements a controllable recurring event.

Cycle control methods panic after Close has been called and have no effect after Stop has been called.

Start or Run (only one of them, not both) must be called only once.

func NewCycle

func NewCycle(interval time.Duration) *Cycle

NewCycle creates a new cycle with the specified interval.

func (*Cycle) ChangeInterval

func (cycle *Cycle) ChangeInterval(interval time.Duration)

ChangeInterval allows changing the ticker interval after the cycle has started.

func (*Cycle) Close

func (cycle *Cycle) Close()

Close closes all resources associated with it.

It MUST NOT be called concurrently.

func (*Cycle) Pause

func (cycle *Cycle) Pause()

Pause pauses the cycle.

func (*Cycle) Restart

func (cycle *Cycle) Restart()

Restart restarts the ticker from 0.

func (*Cycle) Run

func (cycle *Cycle) Run(ctx context.Context, fn func(ctx context.Context) error) error

Run runs the specified function on an interval.

Every interval `fn` is started. When `fn` does not finish fast enough, some of those executions may be skipped.

Run PANICS if it's called after Stop has been called.

func (*Cycle) SetDelayStart

func (cycle *Cycle) SetDelayStart()

SetDelayStart makes Start/Run wait for one interval before the first trigger.

func (*Cycle) SetInterval

func (cycle *Cycle) SetInterval(interval time.Duration)

SetInterval allows changing the interval before starting.

func (*Cycle) Start

func (cycle *Cycle) Start(ctx context.Context, group *errgroup.Group, fn func(ctx context.Context) error)

Start runs the specified function with an errgroup.

func (*Cycle) Stop

func (cycle *Cycle) Stop()

Stop stops the cycle permanently.

func (*Cycle) Trigger

func (cycle *Cycle) Trigger()

Trigger ensures that the loop is run at least once. If it's currently running, it waits for the previous run to complete and then runs again.

func (*Cycle) TriggerWait

func (cycle *Cycle) TriggerWait()

TriggerWait ensures that the loop is run at least once and waits for that run to complete. If it's currently running, it waits for the previous run to complete and then runs again.
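
A hedged sketch of a Cycle running a periodic task, with one extra run forced via TriggerWait. As with Cooldown, the shutdown order (cancel, wait, then Close) is an assumption.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"

	"storj.io/common/sync2"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	group, gctx := errgroup.WithContext(ctx)

	cycle := sync2.NewCycle(10 * time.Millisecond)
	cycle.Start(gctx, group, func(ctx context.Context) error {
		fmt.Println("tick")
		return nil
	})

	time.Sleep(35 * time.Millisecond) // let a few intervals pass (illustrative)
	cycle.TriggerWait()               // force one extra run and wait for it to finish

	// Assumed shutdown order: cancel the context, wait for Run to exit, then Close.
	cancel()
	_ = group.Wait()
	cycle.Close()
}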

type Event

type Event struct {
	// contains filtered or unexported fields
}

Event allows signaling another goroutine that something has happened. This primitive is useful for signaling a single goroutine to update its internal state.

An Event doesn't need initialization. An Event must not be copied after first use.

func (*Event) Signal

func (event *Event) Signal()

Signal signals once. Signal guarantees that at least one goroutine is released from Wait or the next call to Wait. Multiple signals may be coalesced into a single wait release. In other words, N signals result in 1 to N releases from [Wait] or [Signaled].

func (*Event) Signaled

func (event *Event) Signaled() chan struct{}

Signaled returns a channel that is notified when a signal happens. Only one goroutine should call `Wait` or `Signaled`. The implementation allows concurrent calls; however, the exact behaviour is hard to reason about.

func (*Event) Wait

func (event *Event) Wait(ctx context.Context) bool

Wait waits for a signal. Only one goroutine should call [Wait] or [Signaled]. The implementation allows concurrent calls; however, the exact behaviour is hard to reason about.

It returns true when the wakeup was not caused by context cancellation.
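
A minimal sketch of the intended single-receiver pattern: one goroutine waits, another signals. The zero value is used directly, as the docs allow.

package main

import (
	"context"
	"fmt"

	"storj.io/common/sync2"
)

func main() {
	var event sync2.Event // zero value is ready to use, per the docs

	done := make(chan struct{})
	go func() {
		defer close(done)
		// Wait returns true because the wakeup comes from Signal, not cancellation.
		ok := event.Wait(context.Background())
		fmt.Println("woke up via signal:", ok)
	}()

	event.Signal()
	<-done
}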

type Fence

type Fence struct {
	// contains filtered or unexported fields
}

Fence allows waiting for something to happen.

func (*Fence) Done

func (fence *Fence) Done() chan struct{}

Done returns a channel that will be closed when the fence is released.

func (*Fence) Release

func (fence *Fence) Release()

Release releases everyone from Wait.

func (*Fence) Released

func (fence *Fence) Released() bool

Released returns whether the fence has been released.

func (*Fence) Wait

func (fence *Fence) Wait(ctx context.Context) bool

Wait waits for the fence to be released. It returns true when the fence was successfully released.
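
A small sketch of several goroutines blocking on a Fence until a single Release; using the zero value without a constructor is an assumption, mirroring Event.

package main

import (
	"context"
	"fmt"
	"sync"

	"storj.io/common/sync2"
)

func main() {
	var fence sync2.Fence // zero value assumed usable; no constructor is documented

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if fence.Wait(context.Background()) {
				fmt.Println("waiter", id, "released")
			}
		}(i)
	}

	fence.Release() // releases everyone currently or later waiting
	wg.Wait()
	fmt.Println("released:", fence.Released())
}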

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter implements concurrent goroutine limiting.

After calling Wait or Close, no new goroutines are allowed to start.

func NewLimiter

func NewLimiter(n int) *Limiter

NewLimiter creates a new limiter with limit set to n.

func (*Limiter) Close

func (limiter *Limiter) Close()

Close waits for all running goroutines to finish and disallows new goroutines to start.

func (*Limiter) Go

func (limiter *Limiter) Go(ctx context.Context, fn func()) bool

Go tries to start fn as a goroutine. When the limit is reached, it waits until fn can be run or the context is canceled.

func (*Limiter) Wait

func (limiter *Limiter) Wait()

Wait waits for all running goroutines to finish and disallows new goroutines to start.
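
A hedged sketch of limiting concurrency to two goroutines at a time; the work function and durations are illustrative.

package main

import (
	"context"
	"fmt"
	"time"

	"storj.io/common/sync2"
)

func main() {
	limiter := sync2.NewLimiter(2) // at most 2 goroutines run at once
	ctx := context.Background()

	for i := 0; i < 5; i++ {
		i := i
		started := limiter.Go(ctx, func() {
			fmt.Println("working on", i)
			time.Sleep(10 * time.Millisecond)
		})
		if !started {
			break // context canceled or limiter already closed
		}
	}

	limiter.Wait() // wait for all started goroutines; no new ones may start afterwards
	fmt.Println("done")
}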

type PipeReader

type PipeReader interface {
	io.ReadCloser
	CloseWithError(reason error) error
}

PipeReader allows closing the reader with an error.

type PipeWriter

type PipeWriter interface {
	io.WriteCloser
	CloseWithError(reason error) error
}

PipeWriter allows closing the writer with an error.

type ReadCache

type ReadCache = ReadCacheOf[any]

ReadCache is a backwards-compatible alias for ReadCacheOf[any].

type ReadCacheOf

type ReadCacheOf[T any] struct {
	// contains filtered or unexported fields
}

ReadCacheOf implements refreshing of state based on a refresh timeout, but also allows for stale reads up to a certain duration.

func NewReadCache

func NewReadCache[T any](refresh time.Duration, stale time.Duration, read func(ctx context.Context) (T, error)) (*ReadCacheOf[T], error)

NewReadCache returns a new ReadCacheOf.

func (*ReadCacheOf[T]) Get

func (cache *ReadCacheOf[T]) Get(ctx context.Context, now time.Time) (state T, err error)

Get fetches the latest state and refreshes when it's needed.

func (*ReadCacheOf[T]) Init

func (cache *ReadCacheOf[T]) Init(refresh time.Duration, stale time.Duration, read func(ctx context.Context) (T, error)) error

Init initializes the cache in place. This is only needed when NewReadCache was not used to create it.

func (*ReadCacheOf[T]) RefreshAndGet

func (cache *ReadCacheOf[T]) RefreshAndGet(ctx context.Context, now time.Time) (state T, err error)

RefreshAndGet refreshes the cache and returns the latest result.

func (*ReadCacheOf[T]) Run

func (cache *ReadCacheOf[T]) Run(ctx context.Context) error

Run starts the background process for the cache.

func (*ReadCacheOf[T]) Wait

func (cache *ReadCacheOf[T]) Wait(ctx context.Context) (state T, err error)

Wait waits for any pending refresh and returns the result.
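
A hedged sketch of a ReadCacheOf[string]; it assumes Run must be executing in the background for Get to be able to refresh, and the read function is a stand-in for a real lookup.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"

	"storj.io/common/sync2"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The read function would normally hit a database or remote service.
	cache, err := sync2.NewReadCache[string](time.Second, 10*time.Second,
		func(ctx context.Context) (string, error) {
			return time.Now().Format(time.RFC3339Nano), nil
		})
	if err != nil {
		panic(err)
	}

	group, gctx := errgroup.WithContext(ctx)
	group.Go(func() error { return cache.Run(gctx) }) // background refresher (assumed required for Get to refresh)

	state, err := cache.Get(ctx, time.Now())
	fmt.Println(state, err)

	cancel()
	_ = group.Wait()
}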

type ReceiverClosableChan

type ReceiverClosableChan[T any] struct {
	// contains filtered or unexported fields
}

ReceiverClosableChan is a channel with altered semantics from the go runtime channels. It is designed to work well in a many-producer, single-receiver environment, where the receiver consumes until it is shut down and must signal to many senders to stop sending.

func MakeReceiverClosableChan

func MakeReceiverClosableChan[T any](bufferSize int) *ReceiverClosableChan[T]

MakeReceiverClosableChan makes a new buffered channel of the given buffer size. A zero buffer size is currently undefined behavior.

func (*ReceiverClosableChan[T]) BlockingSend

func (c *ReceiverClosableChan[T]) BlockingSend(v T) (ok bool)

BlockingSend will send the value into the channel's buffer. If the buffer is full, BlockingSend will block. BlockingSend will fail and return false if StopReceiving is called.

func (*ReceiverClosableChan[T]) Receive

func (c *ReceiverClosableChan[T]) Receive(ctx context.Context) (v T, err error)

Receive returns the next request, until and unless ctx is canceled. Receive does not stop if there are no more requests and StopReceiving has been called, as it is expected that the caller of Receive is who called StopReceiving. The error is not nil if and only if the context was canceled.

func (*ReceiverClosableChan[T]) StopReceiving

func (c *ReceiverClosableChan[T]) StopReceiving() (drained []T)

StopReceiving will cause all currently blocked and future sends to return false. StopReceiving will return what remains in the queue.
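
A compact sketch of the many-producer, single-receiver flow: send a few values, receive one, then stop receiving and collect what remains. The values and buffer size are illustrative.

package main

import (
	"context"
	"fmt"

	"storj.io/common/sync2"
)

func main() {
	queue := sync2.MakeReceiverClosableChan[int](4)

	// Producers: BlockingSend returns false once StopReceiving has been called.
	for i := 0; i < 3; i++ {
		if !queue.BlockingSend(i) {
			fmt.Println("receiver already stopped")
		}
	}

	// The single receiver drains one item, then shuts the producers down.
	v, err := queue.Receive(context.Background())
	fmt.Println("received", v, err)

	remaining := queue.StopReceiving()
	fmt.Println("drained", remaining)
}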

type Semaphore

type Semaphore struct {
	// contains filtered or unexported fields
}

Semaphore implements a closable semaphore.

func NewSemaphore

func NewSemaphore(size int) *Semaphore

NewSemaphore creates a semaphore with the specified size.

func (*Semaphore) Close

func (sema *Semaphore) Close()

Close closes the semaphore, preventing further use.

func (*Semaphore) Init

func (sema *Semaphore) Init(size int)

Init initializes the semaphore to the specified size.

func (*Semaphore) Lock

func (sema *Semaphore) Lock() bool

Lock locks the semaphore.

func (*Semaphore) Unlock

func (sema *Semaphore) Unlock()

Unlock unlocks the semaphore.
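
A minimal sketch of the closable semaphore; that Lock returns false once Close has been called is an assumption based on its boolean result, not quoted from the docs.

package main

import (
	"fmt"

	"storj.io/common/sync2"
)

func main() {
	sema := sync2.NewSemaphore(2)

	// Lock reports whether the slot was acquired; false is assumed to mean
	// the semaphore has been closed.
	if sema.Lock() {
		fmt.Println("acquired")
		sema.Unlock()
	}

	sema.Close()
	fmt.Println("acquired after close:", sema.Lock())
}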

type SuccessThreshold

type SuccessThreshold struct {
	// contains filtered or unexported fields
}

SuccessThreshold tracks a task formed by a known number of concurrent subtasks. It notifies the caller when a given success threshold is reached, without interrupting the remaining tasks.

func NewSuccessThreshold

func NewSuccessThreshold(tasks int, successThreshold float64) (*SuccessThreshold, error)

NewSuccessThreshold creates a SuccessThreshold with the tasks number and successThreshold.

It returns an error if tasks is less than or equal to 1, or if successThreshold is less than or equal to 0 or greater than or equal to 1.

func (*SuccessThreshold) Failure

func (threshold *SuccessThreshold) Failure()

Failure tells the SuccessThreshold that one task was a failure.

func (*SuccessThreshold) FailureCount

func (threshold *SuccessThreshold) FailureCount() int

FailureCount returns the number of failures so far.

func (*SuccessThreshold) Success

func (threshold *SuccessThreshold) Success()

Success tells the SuccessThreshold that one task was successful.

func (*SuccessThreshold) SuccessCount

func (threshold *SuccessThreshold) SuccessCount() int

SuccessCount returns the number of successes so far.

func (*SuccessThreshold) Wait

func (threshold *SuccessThreshold) Wait(ctx context.Context)

Wait blocks the caller until the successThreshold is reached or all the tasks have finished.
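
A small sketch with five tasks and an 80% threshold, so Wait unblocks after four successes; which task fails is arbitrary.

package main

import (
	"context"
	"fmt"

	"storj.io/common/sync2"
)

func main() {
	const tasks = 5

	// Wait will unblock once 80% of the tasks (4 of 5) have reported success,
	// or once all of them have finished.
	threshold, err := sync2.NewSuccessThreshold(tasks, 0.8)
	if err != nil {
		panic(err)
	}

	for i := 0; i < tasks; i++ {
		go func(i int) {
			if i == 0 {
				threshold.Failure()
			} else {
				threshold.Success()
			}
		}(i)
	}

	threshold.Wait(context.Background())
	fmt.Println("successes so far:", threshold.SuccessCount())
}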

type Throttle

type Throttle struct {
	// contains filtered or unexported fields
}

Throttle implements two-sided throttling, between a consumer and producer.

Example
package main

import (
	"fmt"
	"io"
	"math/rand"
	"sync"
	"time"

	"storj.io/common/sync2"
)

func main() {
	throttle := sync2.NewThrottle()
	var wg sync.WaitGroup
	wg.Add(2) // the consumer and producer goroutines below both call wg.Done

	// consumer
	go func() {
		defer wg.Done()
		totalConsumed := int64(0)
		for {
			available, err := throttle.ConsumeOrWait(8)
			if err != nil {
				return
			}
			fmt.Println("- consuming ", available, " total=", totalConsumed)
			totalConsumed += available

			// do work for available amount
			time.Sleep(time.Duration(available) * time.Millisecond)
		}
	}()

	// producer
	go func() {
		defer wg.Done()

		step := int64(8)
		for total := int64(64); total >= 0; total -= step {
			err := throttle.ProduceAndWaitUntilBelow(step, step*3)
			if err != nil {
				return
			}

			fmt.Println("+ producing", step, " left=", total)
			time.Sleep(time.Duration(rand.Intn(8)) * time.Millisecond)
		}

		throttle.Fail(io.EOF)
	}()

	wg.Wait()

	fmt.Println("done", throttle.Err())
}
Output:

func NewThrottle

func NewThrottle() *Throttle

NewThrottle returns a new Throttle primitive.

func (*Throttle) Consume

func (throttle *Throttle) Consume(amount int64) error

Consume subtracts amount from the throttle.

func (*Throttle) ConsumeOrWait

func (throttle *Throttle) ConsumeOrWait(maxAmount int64) (int64, error)

ConsumeOrWait tries to consume at most maxAmount.

func (*Throttle) Err

func (throttle *Throttle) Err() error

Err returns the finishing error.

func (*Throttle) Fail

func (throttle *Throttle) Fail(err error)

Fail stops both the consumer and the producer.

func (*Throttle) Produce

func (throttle *Throttle) Produce(amount int64) error

Produce adds amount to the throttle.

func (*Throttle) ProduceAndWaitUntilBelow

func (throttle *Throttle) ProduceAndWaitUntilBelow(amount, limit int64) error

ProduceAndWaitUntilBelow adds amount to the throttle and waits until it's below the given threshold.

func (*Throttle) WaitUntilAbove

func (throttle *Throttle) WaitUntilAbove(limit int64) error

WaitUntilAbove waits until availability rises above limit.

func (*Throttle) WaitUntilBelow

func (throttle *Throttle) WaitUntilBelow(limit int64) error

WaitUntilBelow waits until availability drops below limit.

type WorkGroup

type WorkGroup struct {
	// contains filtered or unexported fields
}

WorkGroup implements a waitable and closable group of workers.

func (*WorkGroup) Close

func (group *WorkGroup) Close()

Close prevents new work from being started.

func (*WorkGroup) Done

func (group *WorkGroup) Done()

Done finishes a pending work item.

func (*WorkGroup) Go

func (group *WorkGroup) Go(fn func()) bool

Go starts fn and tracks its execution. It returns false when the WorkGroup has been closed.

func (*WorkGroup) Start

func (group *WorkGroup) Start() bool

Start returns true when work can be started.

func (*WorkGroup) Wait

func (group *WorkGroup) Wait()

Wait waits for all workers to finish.
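
A minimal sketch of a WorkGroup; using the zero value without a constructor is an assumption, since none is documented.

package main

import (
	"fmt"

	"storj.io/common/sync2"
)

func main() {
	var group sync2.WorkGroup // zero value assumed usable; no constructor is documented

	for i := 0; i < 3; i++ {
		i := i
		if !group.Go(func() { fmt.Println("work item", i) }) {
			fmt.Println("group already closed")
		}
	}

	group.Close() // no new work can start after this
	group.Wait()  // wait for the three items above to finish
}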

type Workplace

type Workplace struct {
	// contains filtered or unexported fields
}

Workplace allows controlling separate jobs that must not run concurrently.

func NewWorkPlace

func NewWorkPlace() *Workplace

NewWorkPlace creates a new work place.

func (*Workplace) Cancel

func (place *Workplace) Cancel()

Cancel cancels any active job and prevents new ones from being started. It does not wait for the active job to finish.

func (*Workplace) Done

func (place *Workplace) Done() <-chan struct{}

Done returns a channel for waiting for the current job to complete. If there's no active job, it returns a closed channel.

func (*Workplace) Start

func (place *Workplace) Start(root context.Context, jobTag interface{}, shouldCancel func(jobTag interface{}) bool, fn func(ctx context.Context)) (started bool)

Start tries to spawn a goroutine in the background. It returns false when it cannot cancel the previous work, the context is canceled, or the workplace itself has been canceled.
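
A hedged sketch of a Workplace with two competing jobs; the job tags are arbitrary, and the expectation that Start returns false when shouldCancel refuses follows from the description above.

package main

import (
	"context"
	"fmt"

	"storj.io/common/sync2"
)

func main() {
	place := sync2.NewWorkPlace()

	// First job: runs until its context is canceled.
	started := place.Start(context.Background(), "job-a",
		func(jobTag interface{}) bool { return false },
		func(ctx context.Context) { <-ctx.Done() })
	fmt.Println("first started:", started)

	// Second job: since shouldCancel refuses to cancel the active job,
	// Start is expected to return false while "job-a" is still running.
	again := place.Start(context.Background(), "job-b",
		func(jobTag interface{}) bool { return false },
		func(ctx context.Context) {})
	fmt.Println("second started:", again)

	place.Cancel() // cancel the active job without waiting for it
	<-place.Done() // wait for it to finish
}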

Directories

Path	Synopsis
mpscqueue	Package mpscqueue is a multi-producer, single-consumer queue.
race2	Package race2 exposes the race detector API so that some assembly code can be manually instrumented for the race detector.
