concurrent

package
v0.13.2
Warning

This package is not in the latest version of its module.
Published: Jun 13, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WaitUntilSignal

func WaitUntilSignal(closers ...io.Closer)

Types

type BatchStreamOnce

type BatchStreamOnce[T any] struct {
	// contains filtered or unexported fields
}

func (*BatchStreamOnce[T]) OnComplete

func (b *BatchStreamOnce[T]) OnComplete(err error)

func (*BatchStreamOnce[T]) OnNext

func (b *BatchStreamOnce[T]) OnNext(t T) error

type Callback

type Callback[T any] interface {
	// OnComplete is invoked when the operation completes successfully with the result 't' of type T.
	OnComplete(t T)

	// OnCompleteError is invoked when the operation fails, providing an error 'err' indicating the failure reason.
	OnCompleteError(err error)
}

func NewOnce

func NewOnce[T any](onComplete func(t T), onError func(err error)) Callback[T]

NewOnce creates a new instance of Once with the provided success and error callbacks. It ensures that the callbacks are invoked only once, either for success or failure.

type ConditionContext

type ConditionContext interface {
	// Wait atomically unlocks the locker and suspends execution
	// of the calling goroutine. After later resuming execution,
	// Wait locks c.L before returning. Unlike in other systems,
	// Wait cannot return unless awoken by Broadcast or Signal,
	// or unless ctx is cancelled or its deadline expires.
	//
	// Because c.L is not locked when Wait first resumes, the caller
	// typically cannot assume that the condition is true when
	// Wait returns. Instead, the caller should Wait in a loop:
	//
	//	lock.Lock()
	//	for !condition() {
	//	    c.Wait(ctx)
	//	}
	//	... make use of condition ...
	//	lock.Unlock()
	Wait(ctx context.Context) error

	// Signal wakes one goroutine waiting on c, if there is any.
	//
	// It is allowed but not required for the caller to hold c.L
	// during the call.
	//
	// Signal() does not affect goroutine scheduling priority; if other goroutines
	// are attempting to lock c.L, they may be awoken before a "waiting" goroutine.
	Signal()

	// Broadcast wakes all goroutines waiting on c.
	//
	// It is allowed but not required for the caller to hold c.L
	// during the call.
	Broadcast()
}

ConditionContext implements a condition variable, a rendezvous point for goroutines waiting for or announcing the occurrence of an event.

This version of condition takes a `context.Context` in the `Wait()` method, to allow for timeouts and cancellations of the operation.

func NewConditionContext

func NewConditionContext(locker sync.Locker) ConditionContext

type Future

type Future[T any] interface {

	// Wait blocks until the future is either completed or failed.
	// Wait should be called only once.
	Wait(ctx context.Context) (T, error)

	// Complete signals that the operation has succeeded with the given result.
	Complete(result T)

	// Fail signals that the operation has failed with the given error.
	Fail(err error)
}

func NewFuture

func NewFuture[T any]() Future[T]

type Once

type Once[T any] struct {
	// contains filtered or unexported fields
}

func (*Once[T]) OnComplete

func (c *Once[T]) OnComplete(t T)

OnComplete is called to notify that the operation has completed successfully with the result 't'. It ensures that the 'OnComplete' callback is only called once.

func (*Once[T]) OnCompleteError

func (c *Once[T]) OnCompleteError(err error)

OnCompleteError is called to notify that the operation has failed with an error 'err'. It ensures that the 'OnCompleteError' callback is only called once.

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

func (*Stream[T]) OnComplete

func (s *Stream[T]) OnComplete(err error)

func (*Stream[T]) OnNext

func (s *Stream[T]) OnNext(element T) error

type StreamCallback

type StreamCallback[T any] interface {

	// OnNext is called whenever a new data item of type T is received from the stream.
	// It processes the received data item and returns an error if any issues occur during processing.
	// This method allows for custom logic to be applied to each individual data item in the stream.
	OnNext(t T) error

	// OnComplete is called when the stream has ended, either successfully or due to an error.
	// The 'err' parameter indicates the status of the stream completion.
	// If the stream ended successfully, 'err' will be nil. Otherwise,
	// it will contain the error that caused the stream to terminate.
	OnComplete(err error)
}

func NewBatchStreamOnce

func NewBatchStreamOnce[T any](
	maxBatchCount int,
	maxBatchBytes int,
	getBytes func(T) int,
	onFlush func(container []T) error,
	onComplete func(err error)) StreamCallback[T]

func NewStreamOnce

func NewStreamOnce[T any](callback StreamCallback[T]) StreamCallback[T]

func ReadFromStreamCallback

func ReadFromStreamCallback[T any](ch chan *entity.TWithError[T]) StreamCallback[T]

type WaitGroup

type WaitGroup interface {

	// Wait blocks until all parties in the group are done or any party has failed.
	// Wait should be called only once.
	Wait(ctx context.Context) error

	// Done signals that one party in the group is done.
	Done()

	// Fail signals that one party has failed in the operation.
	Fail(err error)
}

WaitGroup is similar to sync.WaitGroup but adds two capabilities:

  1. Returning an error if any operation fails
  2. Accepting a context to cancel the Wait

func NewWaitGroup

func NewWaitGroup(parties int) WaitGroup
