parl

package module
v0.4.184 Latest
Published: Jun 11, 2024 License: ISC Imports: 31 Imported by: 14

README

Parl

Parl is a Go library for massive virtual parallelism in command-line utilities, system services and any executable code, released under the ISC License.

Harald Rudell is the right person to write Parl due to his:

  • — 5 years of virtually threaded Go
  • — 6 years of multi-threaded Java
  • — 8 years of ECMAScript with Node.js and React

“justice, peace and massive virtual parallelism”

© 2018–present Harald Rudell (https://haraldrudell.github.io/haraldrudell/)

How to use

import "github.com/haraldrudell/parl"

Documentation

Go Reference  Documentation in the Go Package Index
parl targets latest versions of macOS and stable Linux, as of 1/3/2023 macOS 13.1 Darwin xnu and Linux 5.15


Parl features: moderator, sqlite interface, package-selectable logging, debouncer, self-signed certificate authority, watchers, thread management, file system scan and operations, …

On March 16th, 2022, parl was open-sourced under an ISC License
Parl is about 25,000 lines of Go code with first line written on November 21, 2018

© 2018–present Harald Rudell harald.rudell@gmail.com (https://haraldrudell.github.io/haraldrudell/)

Documentation

Overview

Package parl handles inter-thread communication and controls parallelism

parl has sub-packages augmenting the Go standard library:

perrors pfs plog pnet pos pruntime psql pstrings
psyscall pterm ptime

parl has feature packages:

ev — handling of goroutine-based functions
goid — unique goroutine IDs
mains — functions for writing command-line utilities and services
parlca — self-signed certificate authority
progress — monitor work progress for a large number of threads
sqlite — local SQL database
threadprof — profiling and counters for what threads are doing
// statuser: thread hang detector
tracer — event lists by task rather than by time or thread

parl features per-writer thread-safe logging with topic and per-package output control:

Logging is to stderr except for the Out function. parl logging uses a comma separator for numbers. One argument is output as a string; two or more arguments are formatted as with Printf. The location matched against the regular expression is the full package path, an optional type receiver and the function name: “github.com/haraldrudell/mypackage.(*MyType).MyFunc”

Out(string, ...interface{}) — Standard output
Log(string, ...interface{}) — Always outputs to stderr
parl.D(string, ...interface{}) — Same as Log, intended for temporary use

Info(string, ...interface{}) — Informational progress messages
SetSilent(true) — removes Info output
IsSilent() — determines if Info printing applies

Debug(string, ...interface{}) — only prints for locations where SetDebug(true)
SetDebug(true) — Control Debug() globally, code location for all prints, long stack traces
SetRegexp(regExp string) (err error) — Regular expression controlling local Debug() printing
IsThisDebug() — Determines if debug is active for the executing function

Console(string, ...interface{}) — terminal interactivity output

parl.Recover() and parl.Recover2() thread recovery and mains.Executable.Recover() process recovery:

Threads can provide their errors via the perrors.ParlError thread-safe error store, plain error channels, parl.NBChan[error] or parl.ClosableChan[error]. parl.Recover and parl.Recover2 convert thread panic to error along with regular errors, annotating, retrieving and storing those errors and invoking error handling functions for them. mains.Recover is similar for the process.

func thread(errCh *parl.NBChan[error]) { // real-time non-blocking error channel
  defer errCh.Close() // non-blocking close effective on send complete
  var err error
  defer parl.Recover2("", &err, errCh.Send)
  errCh.Ch() <- err // non-blocking
  if err = someFunc(); err != nil {
    err = perrors.Errorf("someFunc: %w", err) // labels and attaches a stack
    return
…
func myThreadSafeThread(wg *sync.WaitGroup, errs *perrors.ParlError) { // ParlError: thread-safe error store
  defer wg.Done()
  var err error
  defer parl.Recover(parl.Annotation(), &err, errs.AddErrorProc)
…

parl package features:

atomic.Bool — Thread-safe boolean
Closer — Deferrable, panic-free channel close
ClosableChan — Initialization-free channel with observable deferrable panic-free close
Moderator — A ticketing system for limited parallelism
NBChan — A non-blocking channel with trillion-size dynamic buffer
SerialDo — Serialization of invocations
WaitGroup — Observable WaitGroup
Debouncer — Invocation debouncer, pre-generics
Sprintf — Supporting thousands separator

Parl is about 15,000 lines of Go code with first line written on November 21, 2018


Package pdebug provides a portable, parsed stack trace.

Index

Constants

const (
	// disables the debounce time
	//	- debounce time holds incoming items until
	//		debounce time elapses with no additional items
	//	- when disabled max delay defaults to 1 s and
	//		items are sent when maxDelay reached
	NoDebounceTime time.Duration = 0
	// disables debouncer max delay function
	//	- when debounce timer holds items, those items
	//		are sent when age reaches maxDelay
	//	- when debounce time disabled, defaults to 1 s.
	//		otherwise no default
	NoDebounceMaxDelay time.Duration = 0
)
const (
	AllowTermination   = true
	PreventTermination = false
)
const (
	// [OnceCh.IsWinner] loser threads do not wait
	NoOnceWait = true
	// [OnceCh.IsWinner] loser threads wait
	LoserWait = false
)
const (
	Rfc3339s  = "2006-01-02 15:04:05Z07:00"
	Rfc3339ms = "2006-01-02 15:04:05.000Z07:00"
	Rfc3339us = "2006-01-02 15:04:05.000000Z07:00"
	Rfc3339ns = "2006-01-02 15:04:05.000000000Z07:00"
)
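
The four constants above read as layout strings for the standard library's time.Time.Format: RFC 3339 with a space instead of "T" between date and time, at second, millisecond, microsecond and nanosecond precision. A minimal sketch of how such layouts behave, with the strings redeclared locally so that it runs without importing parl; the sample instant and the helper formatSample are illustrative assumptions:

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the layout strings listed above, so the sketch
// compiles without importing parl.
const (
	rfc3339s  = "2006-01-02 15:04:05Z07:00"
	rfc3339ms = "2006-01-02 15:04:05.000Z07:00"
	rfc3339us = "2006-01-02 15:04:05.000000Z07:00"
	rfc3339ns = "2006-01-02 15:04:05.000000000Z07:00"
)

// formatSample formats a fixed, made-up instant in a UTC-8 zone
// using the provided layout.
func formatSample(layout string) string {
	zone := time.FixedZone("UTC-8", -8*60*60)
	t := time.Date(2024, time.June, 11, 12, 34, 56, 123456789, zone)
	return t.Format(layout)
}

func main() {
	for _, layout := range []string{rfc3339s, rfc3339ms, rfc3339us, rfc3339ns} {
		fmt.Println(formatSample(layout))
	}
	// first line printed: 2024-06-11 12:34:56-08:00
}
```

Fractional seconds are truncated, not rounded, to the number of digits in the layout.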
const (
	SlowDefault slowType = iota
	SlowOwnThread
	SlowShutdownThread
)
const CloseAwaiter = false

AwaitableSlice.EmptyCh initialize: this invocation will wait for close-like state, do not activate EmptyCh awaitable

const (
	CloseChannelDrain = true
)
const EvCon = true

Awaitable.Close argument to Close meaning eventual consistency

  • may return before the channel is actually closed

const SendNonBlocking = true

with nonBlocking set to SendNonBlocking, ChannelSend will never block

Variables

var ErrEndCallbacks = EndCallbacks(errors.New("end callbacks error"))

ErrEndCallbacks indicates upon return from a callback function that no more callbacks are desired

  • ErrEndCallbacks does not indicate an error and should not be returned by any other function than a callback
  • check for ErrEndCallbacks using the errors.Is method with the parl.ErrEndCallbacks value
  • EndCallbacks creates an ErrEndCallbacks value based on another error
  • an ErrEndCallbacks type implements:
  • — an Is method returning true for errors implementing a EndCallbacks method
  • parl.ErrEndCallbacks additionally implements:
  • — a dummy EndCallbacks method

Usage:

func callback() (err error) {
  return parl.ErrEndCallbacks // signals that no more callbacks are desired
}

  // in the code invoking callback:
  if errors.Is(err, parl.ErrEndCallbacks) {
    err = nil
    …
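
The pattern above, a sentinel value that ends iteration without counting as a failure, can be sketched with the standard library alone. In this sketch errEndCallbacks is a hypothetical stand-in for parl.ErrEndCallbacks, and forEach and firstNegative are illustrative helpers, not part of parl:

```go
package main

import (
	"errors"
	"fmt"
)

// errEndCallbacks is a hypothetical stand-in for parl.ErrEndCallbacks.
var errEndCallbacks = errors.New("end callbacks")

// forEach invokes callback for each item until callback returns an error.
// The sentinel ends iteration without being reported as a failure.
func forEach(items []int, callback func(item int) error) (n int, err error) {
	for _, item := range items {
		n++
		if err = callback(item); err == nil {
			continue
		}
		if errors.Is(err, errEndCallbacks) {
			err = nil // not an error: the callback asked to stop
		}
		return
	}
	return
}

// firstNegative returns the first negative item and how many items were visited.
func firstNegative(items []int) (found, visited int, err error) {
	visited, err = forEach(items, func(item int) error {
		if item < 0 {
			found = item
			// a wrapped sentinel still matches errors.Is
			return fmt.Errorf("found %d: %w", item, errEndCallbacks)
		}
		return nil
	})
	return
}

func main() {
	fmt.Println(firstNegative([]int{3, 1, -7, 9})) // prints "-7 3 <nil>"
}
```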
var ErrNil = &nilValue{"value"}

ErrNil is used with errors.Is to detect that a panic or error value was caused by a value that must not be nil, such as a function argument to a new function, being nil

  • a nilValue type implements:
  • — a dummy NilValueError method
  • — an Is method returning true for errors implementing a NilValueError method

Usage:

if errors.Is(err, parl.ErrNil) { …

var ErrNotCancelContext = errors.New("context chain does not have CancelContext")

ErrNotCancelContext indicates that InvokeCancel was provided a context not a return value from NewCancelContext or NewCancelContextFunc.

Test for ErrNotCancelContext:

if errors.Is(err, parl.ErrNotCancelContext) …

var NoErrp *error

argument to Recover: no error aggregation

var NoopErrorSink = &noopErrorSink{}

for RecoverAnnotation when no error sink is present and nothing should be output to standard error

Functions

func AddNotifier added in v0.4.29

func AddNotifier(ctx context.Context, notifier NotifierFunc) (ctx2 context.Context)

AddNotifier adds a function that is invoked when any context is canceled

  • AddNotifier is typically invoked on the root context
  • any InvokeCancel in the context tree below the top AddNotifier invocation causes notification
  • invocation is immediately after context cancel completes
  • implemented by inserting a thread-safe slice value into the context chain
  • notifier receives a stack trace of the cancel invocation, typically beginning with parl.InvokeCancel
  • notifier should be thread-safe and not long running
  • typical usage is debug of unexpected context cancel

func AddNotifier1 added in v0.4.97

func AddNotifier1(ctx context.Context, notifier NotifierFunc) (ctx2 context.Context)

AddNotifier1 adds a function that is invoked when a child context is canceled

  • child contexts with their own AddNotifier1 are not detected
  • invocation is immediately after context cancel completes
  • implemented by inserting a value into the context chain
  • notifier receives a stack trace of the cancel invocation, typically beginning with parl.InvokeCancel
  • notifier should be thread-safe and not long running
  • typical usage is debug of unexpected context cancel

func Await added in v0.4.179

func Await(ch AwaitableCh)

Await is a deferrable function awaiting close of ch

func AwaitValue added in v0.4.180

func AwaitValue[T any](stream Source1[T]) (value T, hasValue bool)

AwaitValue awaits value or close, blocking until either event

  • hasValue true: value is valid, possibly the zero-value like a nil interface value
  • hasValue false: the stream is closable and closed
  • stream: an awaitable possibly closable source type like Source1
  • — stream’s DataWaitCh, Get and, if present, EmptyCh methods are used
  • — stream cannot be eg. AtomicError because it is not awaitable
  • AwaitValue wraps a 10-line read operation as a two-value expression

func AwaitableSliceString added in v0.4.179

func AwaitableSliceString[T any](a *AwaitableSlice[T]) (s string)

func CancelOnError added in v0.4.28

func CancelOnError(errp *error, ctx context.Context)

CancelOnError invokes InvokeCancel if errp has an error.

  • CancelOnError is deferrable and thread-safe.
  • ctx must have been returned by either NewCancelContext or NewCancelContextFunc.
  • errp == nil or *errp == nil means no error
  • ctx nil is panic
  • ctx not from NewCancelContext or NewCancelContextFunc is panic
  • thread-safe, idempotent

func ChannelSend added in v0.4.101

func ChannelSend[T any](ch chan<- T, value T, nonBlocking ...bool) (didSend, isNilChannel, isClosedChannel bool, err error)

ChannelSend is channel send without panics and possibly non-blocking

  • if nonBlocking is SendNonBlocking or true, channel send will be attempted but not block
  • didSend is true if value was successfully sent on ch
  • err is non-nil if a panic occurred or ch is nil
  • isNilChannel is true if the channel is nil, ie. send would block indefinitely
  • isClosedChannel is true if the panic was caused by ch being closed
  • there should be no panics other than from ch being closed
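
The behavior listed above can be approximated with a select statement and a deferred recover. This is a minimal stdlib-only sketch, not parl's implementation; trySend is a hypothetical name, and treating any recovered panic as a closed-channel panic is an assumption of the sketch:

```go
package main

import (
	"errors"
	"fmt"
)

// trySend is a hypothetical stand-in, not parl.ChannelSend itself.
// It sends value on ch without panicking and, if nonBlocking is true,
// without blocking.
func trySend[T any](ch chan<- T, value T, nonBlocking bool) (
	didSend, isNilChannel, isClosedChannel bool, err error,
) {
	if ch == nil {
		// a send on a nil channel would block forever
		isNilChannel = true
		err = errors.New("send on nil channel")
		return
	}
	defer func() {
		// assumption: the only expected panic is send on a closed channel
		if r := recover(); r != nil {
			didSend = false
			isClosedChannel = true
			err = fmt.Errorf("channel send: %v", r)
		}
	}()
	if nonBlocking {
		select {
		case ch <- value:
			didSend = true
		default: // no receiver or buffer space available right now
		}
		return
	}
	ch <- value // may block until a receiver is ready
	didSend = true
	return
}

func main() {
	buffered := make(chan int, 1)
	fmt.Println(trySend(buffered, 1, true)) // buffer has room
	fmt.Println(trySend(buffered, 2, true)) // buffer full: returns without blocking
	close(buffered)
	fmt.Println(trySend(buffered, 3, true)) // closed: panic converted to err
}
```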

func Close added in v0.4.15

func Close(closable io.Closer, errp *error)

Close closes an io.Closer object.

  • if errp is non-nil, panic values update it using errors.AppendError.
  • Close is thread-safe, panic-free and deferrable
  • type Closer interface { Close() error }

func CloseChannel added in v0.4.103

func CloseChannel[T any](ch chan T, errp *error, drainChannel ...bool) (
	isNilChannel, isCloseOfClosedChannel bool, n int, err error,
)

CloseChannel closes a channel

  • CloseChannel is thread-safe, deferrable and panic-free, handles closed-channel panic, nil-channel case and has channel drain feature
  • isNilChannel returns true if ch is nil. closing a nil channel would cause panic.
  • isCloseOfClosedChannel is true if close panicked due to the channel already being closed. A channel transferring data cannot be inspected for being closed
  • if errp is non-nil, panic values update it using errors.AppendError.
  • if drainChannel is parl.CloseChannelDrain, the channel is drained first. Note: closing a channel while a thread is blocked in channel send is a data race. If a thread is continuously sending items and drainChannel is true, CloseChannel will block indefinitely.
  • n returns the number of drained items.

func Closer added in v0.4.0

func Closer[T any](ch chan T, errp *error)

Closer is a deferrable function that closes a channel.

  • if errp is non-nil, panic values update it using errors.AppendError.
  • Closer is thread-safe, panic-free and deferrable

func CloserSend added in v0.4.15

func CloserSend[T any](ch chan<- T, errp *error)

CloserSend is a deferrable function that closes a send-channel.

  • if errp is non-nil, panic values update it using errors.AppendError.
  • CloserSend is thread-safe, panic-free and deferrable

func Console

func Console(format string, a ...interface{})

Console always prints to interactive standard out

  • intended for command-line interactivity.
  • if debug is enabled, code location is appended.

func Consolew added in v0.4.12

func Consolew(format string, a ...interface{})

Consolew always prints to standard out

  • intended for command-line interactivity.
  • Consolew does not ensure ending newline

func D

func D(format string, a ...interface{})

D always prints to stderr with code location. Thread-safe

  • D is meant for temporary output intended to be removed prior to check-in

func Debug

func Debug(format string, a ...interface{})

Debug outputs only if debug is configured globally or for the executing function

  • Debug outputs to standard error
  • code location is appended

func DeferredErrorSink added in v0.4.176

func DeferredErrorSink(errorSink ErrorSink1, errp *error)

DeferredErrorSink is a deferrable function that provides any error at *errp to errorSink

  • errp may be nil

func DeferredErrorSource added in v0.4.178

func DeferredErrorSource(errorSource ErrorSource1, errp *error)

DeferredErrorSource is a deferrable function that appends all errors in errorSource to errp

func EndCallbacks added in v0.4.46

func EndCallbacks(err error) (err2 error)

EndCallbacks creates an EndCallbacks error based on another error

func EnsureError

func EnsureError(panicValue any) (err error)

EnsureError interprets a panic value as an error

  • returned value is either nil or an error value with stack trace
  • the error is ensured to have stack trace

func GetD added in v0.4.26

func GetD(skipFrames int) (debug func(format string, a ...interface{}))

GetD returns a function value that always invokes logging

  • output to stderr
  • the caller frame is appended
  • D is meant for temporary output intended to be removed prior to check-in
  • the function value can be passed around or invoked later

func GetDebug added in v0.4.25

func GetDebug(skipFrames int) (debug func(format string, a ...interface{}))

GetDebug returns a function value that can be used to invoke logging

  • output to stderr if debug is enabled for the specified caller frame
  • the caller frame is appended
  • the function value can be passed around or invoked later

func HasCancel added in v0.4.29

func HasCancel(ctx context.Context) (hasCancel bool)

HasCancel returns whether ctx can be used with parl.InvokeCancel

func Info

func Info(format string, a ...interface{})

Info prints unless silence has been configured with SetSilent(true)

  • Info outputs to standard error
  • IsSilent() determines the state of silence
  • if debug is enabled, code location is appended

func InvokeCancel added in v0.4.28

func InvokeCancel(ctx context.Context)

InvokeCancel cancels the last CancelContext in ctx’s chain of contexts

  • ctx must have been returned by either NewCancelContext or NewCancelContextFunc
  • ctx nil is panic
  • ctx not from NewCancelContext or NewCancelContextFunc is panic
  • thread-safe, idempotent, deferrable

func InvokeIf added in v0.4.29

func InvokeIf[T comparable](tp *T, fn func())

InvokeIf is a deferrable function invoking its function argument when:

  • the pointer tp is non-nil and the function fn is non-nil
  • what tp points to is not a T zero-value

Usage:

someFlag := false
defer InvokeIf(&someFlag, someFunction)
…
someFlag = someValue

func IsClosed added in v0.4.180

func IsClosed[T any](closable Closable[T]) (isClosed bool)

IsClosed returns true if closable is closed or triggered

  • isClosed is a single boolean value usable with for or if
  • IsClosed wraps a 6-line read into a single-value boolean expression

func IsNil added in v0.4.36

func IsNil(v any) (isNil bool)

IsNil checks whether an interface value is truly nil

  • In Go, comparison of an interface value that has been assigned a concretely typed nil value yields unexpected results
  • (any)((*int)(nil)) == nil → false, where true is expected
  • IsNil((*int)(nil)) → true
  • as of go1.20.3, an interface value is 2 pointers,
  • — the first currently assigned type and
  • — the second currently assigned value

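
The typed-nil behavior described above can be reproduced and worked around with the reflect package. This is a sketch of one common approach, assuming reflection is acceptable; isNil is a local illustrative function and parl's implementation may differ:

```go
package main

import (
	"fmt"
	"reflect"
)

// isNil reports whether v is an untyped nil or holds a nil value
// of a nilable kind (channel, function, map, pointer, slice).
func isNil(v any) bool {
	if v == nil {
		return true // interface without type and value
	}
	rv := reflect.ValueOf(v)
	switch rv.Kind() {
	case reflect.Chan, reflect.Func, reflect.Map,
		reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
		return rv.IsNil() // type is assigned, value may still be nil
	}
	return false // ints, structs and so on are never nil
}

func main() {
	var p *int
	var v any = p
	fmt.Println(v == nil, isNil(v)) // prints "false true"
}
```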
func IsSilent

func IsSilent() (isSilent bool)

IsSilent returns true if Info does not print

func IsThisDebug

func IsThisDebug() bool

IsThisDebug returns whether the executing code location has debug logging enabled

  • true when -debug globally enabled using SetDebug(true)
  • true when the -verbose regexp set with SetRegexp matches
  • matched against pruntime.CodeLocation.FuncName
  • “github.com/haraldrudell/parl/mains.(*Executable).AddErr”

func IsThisDebugN added in v0.4.25

func IsThisDebugN(skipFrames int) (isDebug bool)

IsThisDebugN returns whether the specified stack frame has debug logging enabled. 0 means caller of IsThisDebugN.

  • true when -debug globally enabled using SetDebug(true)
  • true when the -verbose regexp set with SetRegexp matches

func Log

func Log(format string, a ...interface{})

Log always prints to standard error

  • if debug is enabled, code location is appended

func Logw added in v0.4.12

func Logw(format string, a ...interface{})

Logw always prints to standard error

  • Logw outputs without ensuring ending newline

func NewCancelContext added in v0.4.12

func NewCancelContext(ctx context.Context) (cancelCtx context.Context)

NewCancelContext creates a cancelable context without managing a CancelFunction value

  • NewCancelContext is like context.WithCancel but has the CancelFunc embedded.
  • after use, InvokeCancel must be invoked with cancelCtx as argument to release resources
  • for unexported code in context package to work, a separate type cannot be used

Usage:

ctx := NewCancelContext(context.Background())
…
InvokeCancel(ctx)
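
One way to embed a cancel function in a context, so that callers only pass the context around, is to store it as a context value. The sketch below illustrates that idea with the standard library; cancelKey, newCancelContext, invokeCancel and demo are hypothetical names, and unlike the documented InvokeCancel, this sketch returns an error instead of panicking for a context without a stored cancel function:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// cancelKey is the private key under which the cancel function is stored.
type cancelKey struct{}

var errNotCancelContext = errors.New("context chain does not have a cancel function")

// newCancelContext returns a cancelable context carrying its own CancelFunc.
func newCancelContext(parent context.Context) context.Context {
	ctx, cancel := context.WithCancel(parent)
	return context.WithValue(ctx, cancelKey{}, cancel)
}

// invokeCancel cancels the nearest context created by newCancelContext.
func invokeCancel(ctx context.Context) error {
	cancel, ok := ctx.Value(cancelKey{}).(context.CancelFunc)
	if !ok {
		return errNotCancelContext
	}
	cancel() // CancelFunc is idempotent and thread-safe
	return nil
}

// demo returns ctx.Err() before and after cancel, and the result
// of invoking cancel on a context that has no stored cancel function.
func demo() (before, after, plainErr error) {
	ctx := newCancelContext(context.Background())
	before = ctx.Err()
	_ = invokeCancel(ctx)
	after = ctx.Err()
	plainErr = invokeCancel(context.Background())
	return
}

func main() {
	before, after, plainErr := demo()
	fmt.Println(before)   // <nil>
	fmt.Println(after)    // context canceled
	fmt.Println(plainErr) // context chain does not have a cancel function
}
```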

func NewCancelContextFunc added in v0.4.26

func NewCancelContextFunc(ctx context.Context, cancel context.CancelFunc) (cancelCtx context.Context)

NewCancelContextFunc stores the cancel function cancel in the context ctx. the returned context can be provided to InvokeCancel to cancel the context.

func NilError added in v0.4.120

func NilError(valueName string) (err error)

NilError returns an error used with panic() indicating that a value that cannot be nil, such as a function argument to a new function, was nil

  • such panics typically indicate compile-time issues with code

Usage:

func NewX(xValue *int) (x *X) {
  if xValue == nil {
    panic(parl.NilError("xValue")) // “somePackage.NewX xValue cannot be nil”

func NilPanic added in v0.4.179

func NilPanic[T any](label string, value T)

NilPanic panics with NilError label if value is nil

func NoopPrintf added in v0.4.160

func NoopPrintf(format string, a ...any)

NoopPrintf is a parl.PrintfFunc that does not print anything

func OnCancel added in v0.4.12

func OnCancel(fn func(), ctx context.Context)

OnCancel invokes fn when work done on behalf of context ctx should be canceled

func OnDebug added in v0.4.114

func OnDebug(invokedIfDebug func() string)

OnDebug is similar to parl.Debug but arguments are only resolved when debug is true, ie. when arguments should actually be printed

  • the argument can be a function literal invoking parl.Sprintf

Usage:

var x int
parl.OnDebug(func() string { return parl.Sprintf("before: %d", x)})

func Out

func Out(format string, a ...interface{})

Out always prints to standard out

  • if debug is enabled, code location is appended

func Outw added in v0.4.12

func Outw(format string, a ...interface{})

Outw always prints to standard out without ensuring terminating newline

func PanicToErr added in v0.4.117

func PanicToErr(errp *error, isPanic ...*bool)

PanicToErr recovers active panic, aggregating errors in errp

  • PanicToErr does not provide enclosing function. For that, use RecoverErr: “recover from panic in pack.Func…”
  • errp cannot be nil
  • if isPanic is non-nil and active panic, it is set to true
  • sample error message, including message in the panic value and the code line causing the panic:
  • “recover from panic: message: “runtime error: invalid memory address or nil pointer dereference” at parl.panicFunction()-panic-to-err_test.go:96”
  • parl recover options:
  • RecoverErr: aggregates to error pointer with enclosing function location, optional panic flag
  • Recover: aggregates to error pointer with enclosing function location, optional single-invocation parl.ErrorSink
  • Recover2: aggregates to error pointer with enclosing function location, optional multiple-invocation parl.ErrorSink
  • RecoverAnnotation: aggregates to error pointer with fixed-string annotation, optional single-invocation parl.ErrorSink
  • PanicToErr: aggregates to error pointer with generic annotation, optional panic flag
  • — preferably: RecoverErr, Recover or Recover2 should be used to provide the package and function name of the enclosing function for the defer-statement that invoked recover
  • — PanicToErr and RecoverAnnotation cannot provide where in the stack trace recover was invoked

Usage:

func someFunc() (isPanic bool, err error) {
  defer parl.PanicToErr(&err, &isPanic)

func someGoroutine(g parl.Go) {
  var err error
  defer g.Register().Done(&err)
  defer parl.PanicToErr(&err)

func Recover

func Recover(deferredLocation func() DA, errp *error, errorSink ...ErrorSink1)

Recover recovers panic using deferred annotation

  • Recover creates a single aggregate error of *errp and any panic
  • if errorSink is present, it is invoked zero or one time with the aggregate error
  • if errorSink is absent, the error is logged to standard error
  • if errp is non-nil, it is updated with any aggregate error
  • parl recover options:
  • RecoverErr: aggregates to error pointer with enclosing function location, optional panic flag
  • Recover: aggregates to error pointer with enclosing function location, optional single-invocation parl.ErrorSink
  • Recover2: aggregates to error pointer with enclosing function location, optional multiple-invocation parl.ErrorSink
  • RecoverAnnotation: aggregates to error pointer with fixed-string annotation, optional single-invocation parl.ErrorSink
  • PanicToErr: aggregates to error pointer with generic annotation, optional panic flag
  • — preferably: RecoverErr, Recover or Recover2 should be used to provide the package and function name of the enclosing function for the defer-statement that invoked recover
  • — PanicToErr and RecoverAnnotation cannot provide where in the stack trace recover was invoked

Usage:

func someFunc() (err error) {
  defer parl.Recover(func() parl.DA { return parl.A() }, &err, parl.NoOnError)

func Recover2

func Recover2(deferredLocation func() DA, errp *error, errorSink ...ErrorSink1)

Recover2 recovers panic using deferred annotation

  • if errorSink is present, it is invoked zero, one or two times with any error in *errp and any panic
  • if errorSink is absent, errors are logged to standard error
  • if errp is non-nil:
  • — if *errp was nil, it is updated with any panic
  • — if *errp was non-nil, it is updated with any panic as an aggregate error
  • parl recover options:
  • RecoverErr: aggregates to error pointer with enclosing function location, optional panic flag
  • Recover: aggregates to error pointer with enclosing function location, optional single-invocation parl.ErrorSink
  • Recover2: aggregates to error pointer with enclosing function location, optional multiple-invocation parl.ErrorSink
  • RecoverAnnotation: aggregates to error pointer with fixed-string annotation, optional single-invocation parl.ErrorSink
  • PanicToErr: aggregates to error pointer with generic annotation, optional panic flag
  • — preferably: RecoverErr, Recover or Recover2 should be used to provide the package and function name of the enclosing function for the defer-statement that invoked recover
  • — PanicToErr and RecoverAnnotation cannot provide where in the stack trace recover was invoked

Usage:

func someFunc() (err error) {
  defer parl.Recover2(func() parl.DA { return parl.A() }, &err, parl.NoOnError)

func RecoverAnnotation added in v0.4.135

func RecoverAnnotation(annotation string, deferredLocation func() DA, errp *error, errorSink ...ErrorSink1)

RecoverAnnotation is like Recover but with fixed-string annotation

  • default annotation: “recover from panic:”
  • parl recover options:
  • RecoverErr: aggregates to error pointer with enclosing function location, optional panic flag
  • Recover: aggregates to error pointer with enclosing function location, optional single-invocation parl.ErrorSink
  • Recover2: aggregates to error pointer with enclosing function location, optional multiple-invocation parl.ErrorSink
  • RecoverAnnotation: aggregates to error pointer with fixed-string annotation, optional single-invocation parl.ErrorSink
  • PanicToErr: aggregates to error pointer with generic annotation, optional panic flag
  • — preferably: RecoverErr, Recover or Recover2 should be used to provide the package and function name of the enclosing function for the defer-statement that invoked recover
  • — PanicToErr and RecoverAnnotation cannot provide where in the stack trace recover was invoked

Usage:

func someFunc() (err error) {
  defer parl.RecoverAnnotation("property " + property, func() parl.DA { return parl.A() }, &err, parl.NoOnError)

func RecoverErr added in v0.4.117

func RecoverErr(deferredLocation func() DA, errp *error, isPanic ...*bool)

RecoverErr recovers panic using deferred annotation

  • signature is error pointer and a possible isPanic pointer
  • parl recover options:
  • RecoverErr: aggregates to error pointer with enclosing function location, optional panic flag
  • Recover: aggregates to error pointer with enclosing function location, optional single-invocation parl.ErrorSink
  • Recover2: aggregates to error pointer with enclosing function location, optional multiple-invocation parl.ErrorSink
  • RecoverAnnotation: aggregates to error pointer with fixed-string annotation, optional single-invocation parl.ErrorSink
  • PanicToErr: aggregates to error pointer with generic annotation, optional panic flag
  • — preferably: RecoverErr, Recover or Recover2 should be used to provide the package and function name of the enclosing function for the defer-statement that invoked recover
  • — PanicToErr and RecoverAnnotation cannot provide where in the stack trace recover was invoked

Usage:

func someFunc() (isPanic bool, err error) {
  defer parl.RecoverErr(func() parl.DA { return parl.A() }, &err, &isPanic)

func SetDebug

func SetDebug(debug bool)

SetDebug(true) makes Debug invocations everywhere produce output

  • other printouts have location appended
  • More selective debug printing can be achieved using SetRegexp that matches on function names.

func SetRegexp

func SetRegexp(regExp string) (err error)

SetRegexp defines a regular expression for function-level debug printing to stderr.

  • SetRegexp affects Debug() GetDebug() IsThisDebug() IsThisDebugN() functions.

Regular Expression

Regular expression is the RE2 syntax used by golang. command-line documentation: “go doc regexp/syntax”. The regular expression is matched against code location.

Code Location Format

Code location is the fully qualified function name for the executing code line being evaluated. This is a fully qualified golang package path, ".", a possible type name in parentheses ending with "." and the function name.

  • method with pointer receiver:
  • — "github.com/haraldrudell/parl/mains.(*Executable).AddErr"
  • — sample regexp: mains...Executable..AddErr
  • top-level function:
  • — "github.com/haraldrudell/parl/g0.NewGoGroup"
  • — sample regexp: g0.NewGoGroup

To obtain the fully qualified function name for a particular location:

parl.Log(pruntime.NewCodeLocation(0).String())
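
The sample expressions above work because an unescaped "." in RE2 matches any character, including "(", "*" and ")". A short stdlib sketch that checks the samples with the regexp package and obtains a function name from the Go runtime; funcName and matches are hypothetical helpers, and runtime.Caller is used here in place of pruntime.NewCodeLocation:

```go
package main

import (
	"fmt"
	"regexp"
	"runtime"
)

// funcName returns the fully qualified name of its caller
// as reported by the Go runtime.
func funcName() string {
	pc, _, _, ok := runtime.Caller(1)
	if !ok {
		return ""
	}
	return runtime.FuncForPC(pc).Name()
}

// matches reports whether the RE2 pattern matches anywhere in location.
func matches(pattern, location string) (bool, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return false, err
	}
	return re.MatchString(location), nil
}

func main() {
	fmt.Println(funcName()) // the name of the enclosing function
	method := "github.com/haraldrudell/parl/mains.(*Executable).AddErr"
	fmt.Println(matches(`mains...Executable..AddErr`, method)) // true <nil>
	fmt.Println(matches(`g0.NewGoGroup`, method))              // false <nil>
	// regexp.QuoteMeta escapes a location for an exact match
	fmt.Println(matches(regexp.QuoteMeta(method), method)) // true <nil>
}
```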

func SetSilent

func SetSilent(silent bool)

SetSilent(true) prevents Info() invocations from printing

func Short added in v0.4.26

func Short(tim ...time.Time) (s string)

Short provides a brief time-stamp in compact second-precision including time-zone.

  • sample: 060102_15:04:05-08
  • The timestamp does not contain space.
  • time zone is what is included in tim, typically time.Local
  • if tim is not specified, time.Now() in local time zone
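
The sample above corresponds to a time.Format layout. A minimal sketch that reproduces the sample, assuming the layout string "060102_15:04:05-07"; the layout actually used by parl is not shown on this page, and short and sampleTime are illustrative names:

```go
package main

import (
	"fmt"
	"time"
)

// shortLayout is an assumed layout reproducing the sample above:
// 2-digit year, month, day, underscore, time, 2-digit zone offset.
const shortLayout = "060102_15:04:05-07"

// short formats tim, or the current local time if tim is omitted.
func short(tim ...time.Time) string {
	t := time.Now()
	if len(tim) > 0 {
		t = tim[0]
	}
	return t.Format(shortLayout)
}

// sampleTime is Go's reference time shifted into a made-up UTC-8 zone.
func sampleTime() time.Time {
	zone := time.FixedZone("UTC-8", -8*60*60)
	return time.Date(2006, time.January, 2, 15, 4, 5, 0, zone)
}

func main() {
	fmt.Println(short(sampleTime())) // prints "060102_15:04:05-08"
	fmt.Println(short())             // current local time, no spaces
}
```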

func ShortMs added in v0.4.27

func ShortMs(tim ...time.Time) (s string)

ShortMs provides a brief time-stamp in compact millisecond precision including time-zone.

  • sample: 060102_15:04:05.123-08
  • The timestamp does not contain space.
  • time zone is what is included in tim, typically time.Local
  • if tim is not specified, time.Now() in local time zone

func ShortSpace added in v0.4.28

func ShortSpace(tim ...time.Time) (s string)

ShortSpace provides a brief time-stamp in compact second-precision including time-zone.

  • sample: 060102 15:04:05-08
  • date is 6-digit separated from time by a space
  • time zone is what is included in tim, typically time.Local
  • if tim is not specified, time.Now() in local time zone

func Sprintf added in v0.2.0

func Sprintf(format string, a ...any) string

Sprintf is a printer that supports comma in large numbers
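
The Go standard library's fmt package has no thousands-separator verb, which is the gap a printer like this fills. To illustrate the output format only, here is a stdlib sketch that groups the digits of an integer; commas is a hypothetical helper and says nothing about how parl.Sprintf is implemented:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// commas returns n in decimal with a comma between each group of three digits.
func commas(n int64) string {
	digits := strconv.FormatInt(n, 10)
	sign := ""
	if digits[0] == '-' {
		sign, digits = "-", digits[1:]
	}
	var b strings.Builder
	for i := 0; i < len(digits); i++ {
		// insert a separator when the remaining digit count is a multiple of 3
		if i > 0 && (len(digits)-i)%3 == 0 {
			b.WriteByte(',')
		}
		b.WriteByte(digits[i])
	}
	return sign + b.String()
}

func main() {
	fmt.Println(commas(1234567)) // prints "1,234,567"
	fmt.Println(commas(-1000))   // prints "-1,000"
	fmt.Println(commas(999))     // prints "999"
}
```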

func Uintptr added in v0.4.97

func Uintptr(v any) (p uintptr)

Uintptr returns v as a pointer

  • usable with fmt.Printf %x
  • if uintptr is not used, Printf may go off interpreting the value pointed to, depending on its type

Usage:

var p = &SomeStruct{}
parl.Log("p: 0x%x", parl.Uintptr(p))

Types

type AdbAdressProvider added in v0.4.0

type AdbAdressProvider interface {
	// AdbSocketAddress retrieves the tcp socket address used
	// by a near Adbette implementation
	AdbSocketAddress() (socketAddress AdbSocketAddress)
}

AdbAdressProvider retrieves the address from an adb server or device so that custom devices can be created

type AdbRequest added in v0.3.0

type AdbRequest string

AdbRequest is a string formatted as an adb request. AdbRequest is only required for implementations using the Adbette protocol implementation

type AdbResponseID added in v0.4.0

type AdbResponseID string

type AdbSocketAddress added in v0.4.0

type AdbSocketAddress string

AdbSocketAddress is a tcp socket address accessible to the local host. The format is two parts separated by a colon. The first part is an IP address or hostname. The second part is a numeric port number. The empty string "" represents "localhost:5037". If the port part is missing, such as "localhost", it implies port 5037. If the host part is missing, it implies "localhost". Note that localhost is valid both for IPv4 and IPv6.

type AdbSyncRequest added in v0.4.0

type AdbSyncRequest string

type Adbette added in v0.3.0

type Adbette interface {
	// SendReadOkay sends a request to a remote adb endpoint.
	// if anything else than OKAY is received back from the
	// remote endpoint, err is non-nil.
	SendReadOkay(request AdbRequest) (err error)
	// ReadString reads utf-8 text up to 64 KiB-1 in length
	ReadString() (s string, err error)
	// ConnectToDevice sends a forwarding request to an adb
	// server to connect to one of its devices
	ConnectToDevice(serial AndroidSerial) (err error)
	// Shell executes a shell command on a device connected to the adb server.
	//	- out is a combination of stderr and stdout.
	//	- The status code from an on-device command cannot be obtained
	Shell(command string) (out []byte, err error)
	// ShellStream executes a shell command on the device returning a readable socket
	ShellStream(command string) (conn io.ReadWriteCloser, err error)
	// TrackDevices orders a server to emit serial numbers as they become available
	TrackDevices() (err error)
	// Devices lists the currently online serials
	Devices() (serials []AndroidSerial, err error)
	// DeviceStati returns all available serials and their status
	// The two slices correspond and are of the same length
	DeviceStati() (serials []AndroidSerial, stati []AndroidStatus, err error)
	// Closer closes an adb connection, meant to be a deferred function
	Closer(errp *error)
	// SetSync sets the protocol mode of an adb device connection to sync
	SetSync() (err error)
	// LIST is a sync request that lists file system entries in a directory of an adb device
	LIST(remoteDir string, dentReceiver func(mode uint32, size uint32, time uint32, byts []byte) (err error)) (err error)
	// RECV fetches the contents of a file on an adb device
	RECV(remotePath string, blobReceiver func(data []byte) (err error)) (err error)
	// CancelError is a value that a LIST or RECV callback routine can return to
	// cancel further invocations
	CancelError() (cancelError error)

	SendBlob(syncRequest AdbSyncRequest, data []byte) (err error)
	ReadBlob() (byts []byte, err error)
	ReadResponseID() (responseID AdbResponseID, err error)
	ReadBytes(byts []byte) (err error)
	SendBytes(byts []byte) (err error)
}

Adbette is a minimal implementation of the adb Android debug bridge protocol. Adbette includes both adb server and Android device functions. Adbette is extensible in that additional protocol features are easily implemented without concern for protocol details. To shut down an Adbette and release its resources, invoke the Closer method.

type Adbetter added in v0.3.0

type Adbetter interface {
	NewConnection(address AdbSocketAddress, ctx context.Context) (conn Adbette, err error)
}

Adbetter is a factory instance for connections featuring Adbette. context is used for terminating a connection attempt. context is not retained in the connection.

type AddError deprecated added in v0.4.106

type AddError func(err error)

AddError is a function to submit non-fatal errors

Deprecated: should use github.com/haraldrudell/parl.ErrorSink possibly the error container github.com/haraldrudell/parl.ErrSlice

var NoAddErr AddError

absent parl.AddError argument

Deprecated: should use github.com/haraldrudell/parl.ErrorSink possibly the error container github.com/haraldrudell/parl.ErrSlice

type AggregatePriority added in v0.4.30

type AggregatePriority[V any, P constraints.Ordered] interface {
	// Aggregator returns the aggregator associated with this AggregatePriority
	Aggregator() (aggregator Aggregator[V, P])
	// Update caches the current priority from the aggregator
	Update()
	// CachedPriority returns the effective cached priority
	//	- CachedPriority is used by consumers of the AggregatingPriorityQueue
	CachedPriority() (priority P)
	// Index indicates insertion order
	//	- used for ordering elements of equal priority
	Index() (index int)
}

AggregatePriority caches the priority value from an aggregator for priority.

  • V is the value type used as a pointer
  • P is the priority type descending order, ie. Integer Floating-Point string

type AggregatingPriorityQueue added in v0.4.30

type AggregatingPriorityQueue[V any, P constraints.Ordered] interface {
	// Get retrieves a possible value container associated with valuep
	Get(valuep *V) (aggregator Aggregator[V, P], ok bool)
	// Put stores a new value container associated with valuep
	//   - valuep is assumed to not have a node in the queue
	Put(valuep *V, aggregator Aggregator[V, P])
	// Update re-prioritizes a value
	Update(valuep *V)
	// Clear empties the priority queue. The hashmap is left intact.
	Clear()
	// List returns the first n or default all values by priority
	List(n ...int) (aggregatorQueue []AggregatePriority[V, P])
}

AggregatingPriorityQueue uses cached priority obtained from Aggregators that operate on the values outside of the AggregatingPriorityQueue.

  • the Update method reprioritizes an updated value element

type Aggregator added in v0.4.30

type Aggregator[V any, P constraints.Ordered] interface {
	// Value returns the value object this Aggregator is associated with
	//	- the Value method is used by consumers of the AggregatingPriorityQueue
	Value() (valuep *V)
	// Aggregate aggregates and snapshots data values from the value object.
	//	- Aggregate is invoked outside of AggregatingPriorityQueue
	Aggregate()
	// Priority returns the current priority for the associated value
	//	- this priority is cached by AggregatePriority
	Priority() (priority P)
}

Aggregator aggregates, snapshots and assigns priority to an associated value.

  • V is the value type used as a pointer
  • V may be a thread-safe object whose values change in real-time
  • P is the priority type descending order, ie. Integer Floating-Point string

type AllSource added in v0.4.180

type AllSource[T any] interface {
	// Get GetSlice DataWaitCh
	Source[T]
	GetAll() (values []T)
}

Get GetSlice GetAll DataWaitCh

type AndroidSerial added in v0.3.0

type AndroidSerial string

AndroidSerial uniquely identifies an Android device.

  • has .String and .IsValid, is Ordered
  • typically a string of a dozen or so 8-bit characters consisting of lower and upper case a-zA-Z0-9

func NewAndroidSerial added in v0.4.113

func NewAndroidSerial(s string) (serial AndroidSerial)

NewAndroidSerial returns Android serial for s

  • typically a string of a dozen or so 8-bit characters consisting of lower and upper case a-zA-Z0-9

func (AndroidSerial) IsValid added in v0.4.106

func (a AndroidSerial) IsValid() (isValid bool)

IsValid() returns whether a contains a valid Android serial

func (AndroidSerial) String added in v0.4.106

func (a AndroidSerial) String() (s string)

type AndroidStatus added in v0.3.0

type AndroidStatus string

AndroidStatus indicates the current status of a device known to a Server or Serverette

  • AndroidStatus is a single word of ASCII-set characters
const AndroidOnline AndroidStatus = "device"

AndroidOnline is the Android device status that indicates an online device

  • can be checked using method [AndroidStatus.IsOnline]

func NewAndroidStatus added in v0.4.113

func NewAndroidStatus(s string) (status AndroidStatus)

NewAndroidStatus returns Android status of s

  • AndroidStatus is a single word of ASCII-set characters

func (AndroidStatus) IsOnline added in v0.4.113

func (a AndroidStatus) IsOnline() (isOnline bool)

IsOnline returns whether the Android status is device online, ie. ready for interactions

func (AndroidStatus) IsValid added in v0.4.113

func (a AndroidStatus) IsValid() (isValid bool)

IsValid returns whether a contains a valid Android device status

func (AndroidStatus) String added in v0.4.113

func (a AndroidStatus) String() (s string)

type AssignedPriority added in v0.4.30

type AssignedPriority[V any, P constraints.Ordered] interface {
	SetPriority(priority P)
}

AssignedPriority contains the assigned priority for a priority-queue element

  • V is the element value type whose pointer-value provides identity
  • P is the priority, a descending-ordered type: Integer Floating-Point string

type Atomic32 added in v0.4.143

type Atomic32[T uint32Types] struct {
	// contains filtered or unexported fields
}

Atomic32 is a generic 32-bit integer with atomic access

  • generic for named types of select signed and unsigned underlying integers
  • generic for select built-in integer types
  • includes ~int ~uint
  • excludes ~uint64 ~uintptr ~int64
  • for large values or excluded types, use Atomic64
  • generic version of atomic.Uint32
  • when using int or uint underlying type on a 64-bit platform, type-conversion data loss may occur for larger than 32-bit values
  • no performance impact compared to other atomics

func (*Atomic32[T]) Add added in v0.4.143

func (a *Atomic32[T]) Add(delta T) (new T)

Add atomically adds delta to a and returns the new value.

func (*Atomic32[T]) CompareAndSwap added in v0.4.143

func (a *Atomic32[T]) CompareAndSwap(old, new T) (swapped bool)

CompareAndSwap executes the compare-and-swap operation for a.

func (*Atomic32[T]) Load added in v0.4.143

func (a *Atomic32[T]) Load() (value T)

Load atomically loads and returns the value stored in a.

func (*Atomic32[T]) Store added in v0.4.143

func (a *Atomic32[T]) Store(val T)

Store atomically stores val into a.

func (*Atomic32[T]) Swap added in v0.4.143

func (a *Atomic32[T]) Swap(new T) (old T)

Swap atomically stores new into a and returns the previous value.

type Atomic64 added in v0.4.148

type Atomic64[T constraints.Integer] struct{ atomic.Uint64 }

Atomic64 is a generic 64-bit integer with atomic access

  • generic for named types of any underlying integer or any built-in integer type
  • generic version of atomic.Uint64
  • Atomic64[int] is atomic int for all platforms
  • go1.21.5 due to alignment using atomic.align64, Atomic64 must be based on atomic.Uint64
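The idea of a generic integer on top of atomic.Uint64 can be sketched as follows. This is a simplified illustration, not parl's source: the `atomic64` type and the `integer` constraint are assumptions, and only Load, Store and Add are shown.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// integer is an assumed constraint covering built-in and named integer types
type integer interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr
}

// atomic64 stores any integer type in a 64-bit atomic.
// Basing it on atomic.Uint64 gives 64-bit alignment on all platforms.
type atomic64[T integer] struct{ v atomic.Uint64 }

// Load converts the stored 64 bits back to T
func (a *atomic64[T]) Load() T { return T(a.v.Load()) }

// Store keeps val as its 64-bit two's complement representation
func (a *atomic64[T]) Store(val T) { a.v.Store(uint64(val)) }

// Add relies on unsigned wrap-around, which matches signed arithmetic
func (a *atomic64[T]) Add(delta T) T { return T(a.v.Add(uint64(delta))) }

func main() {
	var a atomic64[int]
	a.Store(-5)
	fmt.Println(a.Add(3)) // -2
	fmt.Println(a.Load()) // -2
}
```

Negative values survive the round trip because converting a signed value to uint64 sign-extends and converting back truncates.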

func (*Atomic64[T]) Add added in v0.4.148

func (a *Atomic64[T]) Add(delta T) (new T)

Add atomically adds delta to a and returns the new value.

func (*Atomic64[T]) CompareAndSwap added in v0.4.148

func (a *Atomic64[T]) CompareAndSwap(old, new T) (swapped bool)

CompareAndSwap executes the compare-and-swap operation for a.

func (*Atomic64[T]) Load added in v0.4.148

func (a *Atomic64[T]) Load() (value T)

Load atomically loads and returns the value stored in a.

func (*Atomic64[T]) Store added in v0.4.148

func (a *Atomic64[T]) Store(val T)

Store atomically stores val into a.

func (*Atomic64[T]) Swap added in v0.4.148

func (a *Atomic64[T]) Swap(new T) (old T)

Swap atomically stores new into a and returns the previous value.

type AtomicCounter added in v0.4.41

type AtomicCounter atomic.Uint64

AtomicCounter is a uint64 thread-safe counter

  • atomic.Uint64 added:
  • Inc Dec delegating to Add
  • Inc2 Dec2: preventing wrap-around CompareAndSwap mechanic
  • Set sets particular value
  • Value returns current value

func (*AtomicCounter) Add added in v0.4.43

func (a *AtomicCounter) Add(value uint64) (newValue uint64)

Add is add with wrap-around. Thread-Safe

func (*AtomicCounter) Dec added in v0.4.41

func (a *AtomicCounter) Dec() (value uint64)

Dec is decrement with wrap-around. Thread-Safe

func (*AtomicCounter) Dec2 added in v0.4.106

func (a *AtomicCounter) Dec2() (value uint64, didDec bool)

Dec2 is decrement with no wrap-around. Thread-Safe

  • at 0, decrements are ineffective

func (*AtomicCounter) Inc added in v0.4.41

func (a *AtomicCounter) Inc() (value uint64)

Inc increments with wrap-around. Thread-Safe

  • value is new value

func (*AtomicCounter) Inc2 added in v0.4.106

func (a *AtomicCounter) Inc2() (value uint64, didInc bool)

Inc2 is increment without wrap-around. Thread-Safe

  • at math.MaxUint64, increments are ineffective
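The no-wrap-around behavior of Dec2 and Inc2 can be obtained with a CompareAndSwap loop. The sketch below assumes that mechanic and uses a hypothetical `counter` type; the values returned when no change is made are also an assumption.

```go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

// counter is a hypothetical stand-in for an atomic uint64 counter
type counter struct{ v atomic.Uint64 }

// dec2 decrements unless the counter is already 0
func (c *counter) dec2() (value uint64, didDec bool) {
	for {
		old := c.v.Load()
		if old == 0 {
			return 0, false // decrementing would wrap around
		}
		if c.v.CompareAndSwap(old, old-1) {
			return old - 1, true
		}
		// another thread changed the value: retry
	}
}

// inc2 increments unless the counter is already at math.MaxUint64
func (c *counter) inc2() (value uint64, didInc bool) {
	for {
		old := c.v.Load()
		if old == math.MaxUint64 {
			return old, false // incrementing would wrap around
		}
		if c.v.CompareAndSwap(old, old+1) {
			return old + 1, true
		}
	}
}

func main() {
	var c counter
	fmt.Println(c.dec2()) // 0 false
	fmt.Println(c.inc2()) // 1 true
}
```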

func (*AtomicCounter) Set added in v0.4.46

func (a *AtomicCounter) Set(value uint64) (oldValue uint64)

Set sets a new aggregate value. Thread-Safe

func (*AtomicCounter) Value added in v0.4.41

func (a *AtomicCounter) Value() (value uint64)

Value returns current counter-value

type AtomicError added in v0.4.178

type AtomicError struct {
	// contains filtered or unexported fields
}

AtomicError is a thread-safe container for a single error value

  • AtomicError.AddError updates the error value
  • AtomicError.AddErrorSwap conditionally updates the error value
  • AtomicError.Error returns the current error value
  • AtomicError is not closable and holds only one updatable value
  • AtomicError is not awaitable or readable to empty
  • consecutive AtomicError.Error invocations return the same error value
  • initialization-free, not awaitable

func (*AtomicError) AddError added in v0.4.178

func (a *AtomicError) AddError(err error)

AddError is a function to submit non-fatal errors

  • if the container is not empty, err is appended to the current error value
  • values are received by [ErrorSource1.Error]

func (*AtomicError) AddErrorSwap added in v0.4.179

func (a *AtomicError) AddErrorSwap(oldErr, newErr error) (didSwap bool, otherErr error)

AddErrorSwap is thread-safe atomic swap of error values

  • oldErr: nil or an error returned by AtomicError.AddErrorSwap or this method’s otherErr
  • newErr: the error to store
  • didSwap true: newErr was stored either because of empty or matching oldErr
  • didSwap false: the error held by the container is different from oldErr and is returned in otherErr
  • AddErrorSwap writes newErr if:
  • — oldErr is nil and the error container is empty or
  • — oldErr matches the error held by the error container
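The swap rules listed above resemble a compare-and-swap on an error value. A minimal sketch under assumptions: `errorBox`, its `swap` method and the sample errors are hypothetical, the container is modeled as an atomic.Pointer[error], and errors are compared with `==`, which requires comparable error values.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errorBox is a hypothetical single-error container
type errorBox struct{ p atomic.Pointer[error] }

// sample errors for the demonstration
var (
	errFirst  = errors.New("first")
	errSecond = errors.New("second")
)

// swap stores newErr if the held error equals oldErr, where nil oldErr
// matches an empty container. Otherwise the held error is returned.
func (b *errorBox) swap(oldErr, newErr error) (didSwap bool, otherErr error) {
	for {
		cur := b.p.Load()
		var held error
		if cur != nil {
			held = *cur
		}
		if held != oldErr {
			return false, held
		}
		if b.p.CompareAndSwap(cur, &newErr) {
			return true, nil
		}
		// a concurrent update intervened: re-read and retry
	}
}

func main() {
	var b errorBox
	fmt.Println(b.swap(nil, errFirst))       // true <nil>
	fmt.Println(b.swap(nil, errSecond))      // false first
	fmt.Println(b.swap(errFirst, errSecond)) // true <nil>
}
```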

func (*AtomicError) Error added in v0.4.178

func (a *AtomicError) Error() (err error, hasValue bool)

Error returns the error value

  • hasValue true: err is non-nil
  • hasValue false: the error source is currently empty

type AtomicMax added in v0.4.41

type AtomicMax[T constraints.Integer] struct {
	// contains filtered or unexported fields
}

AtomicMax is a thread-safe max container

  • hasValue indicator true if a value was equal to or greater than threshold
  • optional threshold for minimum accepted max value
  • generic for any basic or named Integer type
  • negative values cause panic
  • can be used to track maximum time.Duration that should never be negative
  • if threshold is not used, initialization-free
  • wait-free CompareAndSwap mechanic

func NewAtomicMax added in v0.4.41

func NewAtomicMax[T constraints.Integer](threshold T) (atomicMax *AtomicMax[T])

NewAtomicMax returns a thread-safe max container

  • T any basic or named type with underlying type integer
  • negative values not allowed and cause panic
  • if threshold is not used, AtomicMax is initialization-free

func (*AtomicMax[T]) Max added in v0.4.41

func (m *AtomicMax[T]) Max() (value T, hasValue bool)

Max returns current max and value-present flag

  • hasValue true indicates that value reflects a Value invocation
  • hasValue false: value is zero-value
  • Thread-safe

func (*AtomicMax[T]) Max1 added in v0.4.44

func (m *AtomicMax[T]) Max1() (value T)

Max1 returns current maximum whether zero-value or set by Value

  • threshold is ignored
  • Thread-safe

func (*AtomicMax[T]) Value added in v0.4.41

func (m *AtomicMax[T]) Value(value T) (isNewMax bool)

Value updates the container with a possible max value

  • value cannot be negative, that is panic
  • isNewMax is true if:
  • — value is equal to or greater than any threshold and
  • — invocation recorded the first 0 or
  • — a new max
  • upon return, Max and Max1 are guaranteed to reflect the invocation
  • the return order of concurrent Value invocations is not guaranteed
  • Thread-safe
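A lock-free maximum can be tracked with a CompareAndSwap loop. The following sketch shows only that core mechanic: `maxTracker` and `concurrentMax` are hypothetical names, and the threshold, the hasValue flag and the negative-value panic described above are left out.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// maxTracker is a hypothetical, simplified max container for uint64
type maxTracker struct{ max atomic.Uint64 }

// value records candidate if it exceeds the current maximum
func (m *maxTracker) value(candidate uint64) (isNewMax bool) {
	for {
		cur := m.max.Load()
		if candidate <= cur {
			return false // not a new maximum
		}
		if m.max.CompareAndSwap(cur, candidate) {
			return true
		}
		// lost a race with another thread: compare against its value
	}
}

// concurrentMax submits 0..n-1 from n goroutines and returns the maximum
func concurrentMax(n int) uint64 {
	var m maxTracker
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(v uint64) {
			defer wg.Done()
			m.value(v)
		}(uint64(i))
	}
	wg.Wait()
	return m.max.Load()
}

func main() {
	fmt.Println(concurrentMax(100)) // 99
}
```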

type AtomicMin added in v0.4.43

type AtomicMin[T constraints.Integer] struct {
	// contains filtered or unexported fields
}

AtomicMin is a thread-safe container for a minimum value of any integer type

  • hasValue indicator
  • generic for any underlying Integer type
  • if type is signed, min may be negative
  • lock for first Value invocation
  • initialization-free

func (*AtomicMin[T]) Min added in v0.4.43

func (a *AtomicMin[T]) Min() (value T, hasValue bool)

Min returns current minimum value and a flag whether a value is present

  • Thread-safe

func (*AtomicMin[T]) Value added in v0.4.43

func (a *AtomicMin[T]) Value(value T) (isNewMin bool)

Value notes a new min-candidate

  • if not a new minimum, state is not changed
  • Thread-safe

type Awaitable added in v0.4.115

type Awaitable struct {
	// contains filtered or unexported fields
}

Awaitable is a semaphore allowing any number of threads to observe and await any number of events in parallel

  • Awaitable.Ch returns an awaitable channel closing on trig of awaitable. The initial channel state is open
  • Awaitable.Close triggers the awaitable, ie. closes the channel. Upon return, the channel is guaranteed to be closed
  • — with optional EvCon argument, Close is eventually consistent, ie. Close may return prior to channel actually closed for higher performance
  • Awaitable.IsClosed returns whether the awaitable is triggered, ie. if the channel is closed
  • initialization-free, one-to-many wait mechanic, synchronizes-before, observable
  • use of channel as mechanic allows consumers to await multiple events
  • Awaitable costs a lazy channel and pointer allocation
  • note: parl.CyclicAwaitable is re-armable, cyclic version
  • alternative low-blocking inter-thread mechanics are sync.WaitGroup and sync.RWMutex
  • — neither is observable and the consumer cannot await other events in parallel
  • — RWMutex cyclic use has inversion of control issues
  • — WaitGroup lacks control over waiting threads requiring cyclic use to employ a re-created pointer and value
  • — both are less performant for the managing thread
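The Ch, Close and IsClosed contract can be sketched with sync.Once and a lazily created channel. This is an illustration with a hypothetical `awaitable` type, not parl's implementation, which the text above describes as using atomics and an optional eventually consistent mode.

```go
package main

import (
	"fmt"
	"sync"
)

// awaitable is a hypothetical initialization-free closing-channel semaphore
type awaitable struct {
	initOnce  sync.Once
	closeOnce sync.Once
	ch        chan struct{}
}

// Ch lazily creates and returns the channel that closes on trigger
func (a *awaitable) Ch() <-chan struct{} {
	a.initOnce.Do(func() { a.ch = make(chan struct{}) })
	return a.ch
}

// Close triggers the awaitable. It is idempotent: only the first
// invocation closes the channel and returns didClose true.
func (a *awaitable) Close() (didClose bool) {
	a.closeOnce.Do(func() {
		a.Ch() // ensure the channel exists
		close(a.ch)
		didClose = true
	})
	return
}

// IsClosed polls the channel without blocking
func (a *awaitable) IsClosed() bool {
	select {
	case <-a.Ch():
		return true
	default:
		return false
	}
}

func main() {
	var a awaitable
	fmt.Println(a.IsClosed()) // false
	fmt.Println(a.Close())    // true
	fmt.Println(a.Close())    // false
	<-a.Ch()                  // returns immediately once triggered
	fmt.Println(a.IsClosed()) // true
}
```

Because the event is a channel close, any number of threads can wait on it, and each can combine it with other channels in a select.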

func (*Awaitable) Ch added in v0.4.115

func (a *Awaitable) Ch() (ch AwaitableCh)

Ch returns an awaitable channel. Thread-safe

func (*Awaitable) Close added in v0.4.115

func (a *Awaitable) Close(eventuallyConsistent ...bool) (didClose bool)

Close triggers awaitable by closing the channel

  • upon return, the channel is guaranteed to be closed
  • eventuallyConsistent EvCon: may return before the channel is actually closed for higher performance
  • idempotent, deferrable, panic-free, thread-safe

func (*Awaitable) IsClosed added in v0.4.115

func (a *Awaitable) IsClosed() (isClosed bool)

IsClosed inspects whether the awaitable has been triggered

  • on true return, it is guaranteed that the channel has been closed
  • Thread-safe

type AwaitableCh added in v0.4.115

type AwaitableCh <-chan struct{}

AwaitableCh is a one-to-many inter-thread wait-mechanic with happens-before

  • AwaitableCh implements a semaphore
  • implementation is a channel whose only allowed operation is channel receive
  • AwaitableCh transfers no data, instead channel close is the significant event

Usage:

<-ch // waits for event

select {
  case <-ch:
    hasHappened = true
  default:
    hasHappened = false
}

type AwaitableSlice added in v0.4.172

type AwaitableSlice[T any] struct {
	// contains filtered or unexported fields
}

AwaitableSlice is a queue as thread-safe awaitable unbound slice of element value T or slices of value T

  • AwaitableSlice.Send AwaitableSlice.Get allows efficient transfer of single values
  • AwaitableSlice.SendSlice AwaitableSlice.GetSlice allows efficient transfer of slices where: a sender relinquishes slice ownership by invoking SendSlice and a receiving thread gains slice ownership by invoking GetSlice
  • lower performing AwaitableSlice.SendClone
  • AwaitableSlice.DataWaitCh returns a channel that closes once data is available making the queue awaitable
  • AwaitableSlice.EmptyCh returns a channel that closes on slice empty, configurable to provide close-like behavior
  • AwaitableSlice.SetSize allows for setting initial slice capacity
  • AwaitableSlice benefits:
  • — #1 many-to-many thread-synchronization mechanic
  • — #2 trouble-free, closable value-sink: non-blocking unbound send, near-non-deadlocking, panic-free and error-free object
  • — #3 initialization-free, awaitable and observable, thread-less and thread-safe
  • — features channel-based wait usable with Go select and default: a consumer may wait for many events or poll for value or close
  • — unbound with tunable low-allocation
  • — contention-separation between Send SendSlice SendClone and Get GetSlice
  • — high-throughput multiple-value operation using SendSlice GetSlice
  • — slice size logic avoids large-slice memory leaks
  • — zero-out of unused slice elements avoids temporary memory leaks
  • — although the slice can transfer values almost allocation free or multiple values at a time, the wait mechanic requires pointer allocation 10 ns, channel make 21 ns, channel close 9 ns as well as CAS operation 8/21 ns
  • compared to Go channel:
  • — #1: has no errors or panics, blocking send or thread requirements
  • — #2: many-to-many: many threads can await slice data or close event, a thread can await many slices or events
  • — #3: initialization-free, observable with unbound size
  • — unbound, non-blocking-send that is error and panic free
  • — happens-before with each received value or detection of value available or close: similar to unbuffered channel guarantees while being buffered
  • — closable by any thread without race condition
  • — observable, idempotent, panic-free and error-free close while also able to transmit values
  • — closing channel one-to-many mechanic for awaiting data and close. Data synchronization is sync.Mutex and the queue is slice. All is shielded by atomic performance
  • — for high parallelism, AwaitableSlice sustains predominately atomic performance while channel has 100× deteriorating unshielded lock performance as of go1.22.3

Usage:

var valueQueue parl.AwaitableSlice[*Value]
go func(valueSink parl.ValueSink) {
  defer valueSink.EmptyCh()
  …
  valueSink.Send(value)
  …
}(&valueQueue)
endCh := valueQueue.EmptyCh(parl.CloseAwait)
receiveLoop:
for {
  select {
  case <-valueQueue.DataWaitCh():
    for value := valueQueue.Init(); valueQueue.Condition(&value); {
      doSomething(value)
    }
  case <-endCh:
    break receiveLoop // a bare break would only exit the select
  }
}
// the slice closed
…
// to reduce blocking, at most 100 at a time
for i, hasValue := 0, true; i < 100 && hasValue; i++ {
  var value *Value
  if value, hasValue = valueQueue.Get(); hasValue {
    doSomething(value)
  }
}

func (*AwaitableSlice[T]) AwaitValue added in v0.4.181

func (s *AwaitableSlice[T]) AwaitValue() (value T, hasValue bool)

AwaitValue awaits value or close, blocking until either event

  • hasValue true: value is valid, possibly the zero-value like a nil interface value
  • hasValue false: the stream is closed
  • stream: an awaitable possibly closable source type like Source1
  • — stream’s DataWaitCh Get and if present EmptyCh methods are used
  • — stream cannot be eg. AtomicError because it is not awaitable
  • AwaitValue wraps a 10-line read operation as a two-value expression

func (*AwaitableSlice[T]) Condition added in v0.4.176

func (s *AwaitableSlice[T]) Condition(valuep *T) (hasValue bool)

Condition allows for AwaitableSlice to be used in a for clause

  • updates a value variable and returns whether values are present
  • thread-safe

Usage:

var a AwaitableSlice[…] = …
for value := a.Init(); a.Condition(&value); {
  // process received value
}
// the AwaitableSlice closed
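The Init and Condition pair works because a Go for clause accepts an init statement and a condition with an empty post statement. A self-contained sketch of the idiom follows. The channel-backed `stream` type and the `sum` helper are hypothetical stand-ins, not AwaitableSlice itself.

```go
package main

import "fmt"

// stream is a hypothetical channel-backed source used only to
// demonstrate the for-clause idiom
type stream[T any] struct{ ch chan T }

// Init returns the zero-value so that the loop variable can be
// declared with a short variable declaration
func (s *stream[T]) Init() (value T) { return }

// Condition blocks for the next value, stores it in *valuep and
// returns false once the stream is closed and drained
func (s *stream[T]) Condition(valuep *T) (hasValue bool) {
	*valuep, hasValue = <-s.ch
	return
}

// sum sends values through a stream and adds them up using the idiom
func sum(values ...int) (total int) {
	s := stream[int]{ch: make(chan int, len(values))}
	for _, v := range values {
		s.ch <- v
	}
	close(s.ch)
	for value := s.Init(); s.Condition(&value); {
		total += value
	}
	return
}

func main() {
	fmt.Println(sum(1, 2, 3)) // 6
}
```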

func (*AwaitableSlice[T]) DataWaitCh added in v0.4.172

func (s *AwaitableSlice[T]) DataWaitCh() (ch AwaitableCh)

DataWaitCh returns a channel that is open on empty and closes once values are available

  • each DataWaitCh invocation may return a different channel value
  • Thread-safe

func (*AwaitableSlice[T]) EmptyCh added in v0.4.173

func (s *AwaitableSlice[T]) EmptyCh(doNotInitialize ...bool) (ch AwaitableCh)

EmptyCh returns an awaitable channel that closes on queue being or becoming empty

  • doNotInitialize missing: enable closing of ch which will happen as soon as the slice is empty, possibly prior to return
  • doNotInitialize CloseAwaiter: obtain the channel but do not enable it closing. A subsequent invocation with doNotInitialize missing will enable its closing thus act as a deferred Close function
  • EmptyCh always returns the same channel value
  • thread-safe

func (*AwaitableSlice[T]) Get added in v0.4.172

func (s *AwaitableSlice[T]) Get() (value T, hasValue bool)

Get returns one value if the queue is not empty

  • hasValue true: value is valid
  • hasValue false: the queue is empty
  • Get may attain allocation-free receive or allocation-free operation
  • — a slice is not returned
  • — an internal slice may be reused reducing allocations
  • thread-safe

func (*AwaitableSlice[T]) GetAll added in v0.4.173

func (s *AwaitableSlice[T]) GetAll() (values []T)

GetAll returns a single slice of all unread values in the queue

  • values nil: the queue is empty
  • thread-safe

func (*AwaitableSlice[T]) GetSlice added in v0.4.174

func (s *AwaitableSlice[T]) GetSlice() (values []T)

GetSlice returns a slice of values from the queue

  • values non-nil: a non-empty slice at a time, not necessarily all data. values is never non-nil and empty
  • — Send-GetSlice: each GetSlice empties the queue
  • — SendSlice-GetSlice: each GetSlice receives one SendSlice slice
  • values nil: the queue is empty
  • GetSlice may increase performance by slice-at-a-time operation, however, slices need to be allocated:
  • — Send-GetSlice requires internal slice allocation
  • — SendSlice-GetSlice requires sender to allocate slices
  • — Send-Get1 may reduce allocations
  • thread-safe

func (*AwaitableSlice[T]) Init added in v0.4.176

func (s *AwaitableSlice[T]) Init() (value T)

Init allows for AwaitableSlice to be used in a for clause

  • returns zero-value for a short variable declaration in a for init statement
  • thread-safe

Usage:

var a AwaitableSlice[…] = …
for value := a.Init(); a.Condition(&value); {
  // process value
}
// the AwaitableSlice closed

func (*AwaitableSlice[T]) IsClosed added in v0.4.181

func (s *AwaitableSlice[T]) IsClosed() (isClosed bool)

IsClosed returns true if closable is closed or triggered

  • isClosed is a single boolean value usable with for or if
  • IsClosed wraps a 6-line read into a single-value boolean expression

func (*AwaitableSlice[T]) Send added in v0.4.172

func (s *AwaitableSlice[T]) Send(value T)

Send enqueues a single value. Thread-safe

func (*AwaitableSlice[T]) SendClone added in v0.4.177

func (s *AwaitableSlice[T]) SendClone(values []T)

SendClone provides a value-slice without transferring ownership of a slice to the queue

  • allocation
  • Thread-safe

func (*AwaitableSlice[T]) SendSlice added in v0.4.172

func (s *AwaitableSlice[T]) SendSlice(values []T)

SendSlice provides values by transferring ownership of a slice to the queue

  • SendSlice may reduce allocations and increase performance by handling multiple values
  • Thread-safe

func (*AwaitableSlice[T]) SetSize added in v0.4.172

func (s *AwaitableSlice[T]) SetSize(size int)

SetSize sets the initial allocation size of slices. Thread-safe

type CBFunc added in v0.4.41

type CBFunc func(reason CBReason, maxParallelism uint64, maxLatency time.Duration, threadID ThreadID)

CBFunc is a thread-safe function invoked on

  • reason == ITParallelism: parallelism exceeding parallelismWarningPoint
  • reason == ITLatency: latency of an ongoing or just ended invocation exceeds latencyWarningPoint

type CBReason added in v0.4.41

type CBReason uint8

CBReason explains to consumer why parl.InvocationTimer invoked the callback

  • ITParallelism ITLatency
const (
	// [parl.InvocationTimer] callback due to increased parallelism
	ITParallelism CBReason = iota + 1
	// [parl.InvocationTimer] callback due to increased latency
	ITLatency
)

func (CBReason) String added in v0.4.41

func (r CBReason) String() (s string)

type Callbacker added in v0.4.179

type Callbacker[T any] interface {
	// Callback is an asynchronous return of a value
	//	- Callback typically needs to be thread-safe
	Callback(value T)
}

Callbacker accepts asynchronously provided values

type CallbackerErr added in v0.4.179

type CallbackerErr[T any] interface {
	// Callback is an asynchronous return of a value that can fail
	//	- Callback typically needs to be thread-safe
	Callback(value T) (err error)
}

CallbackerErr accepts asynchronously provided values and can fail

type CallbackerNoArg added in v0.4.179

type CallbackerNoArg interface {
	// Callback is an asynchronous event notifier
	//	- Callback typically needs to be thread-safe
	Callback()
}

CallbackerNoArg receives asynchronous event notifications

var NoCallbackerNoArg CallbackerNoArg

type CallbackerNoArgErr added in v0.4.179

type CallbackerNoArgErr interface {
	// Callback is an asynchronous event notifier that can fail
	//	- Callback typically needs to be thread-safe
	Callback() (err error)
}

CallbackerNoArgErr receives asynchronous event notifications and can fail

var NoCallbackerNoArgErr CallbackerNoArgErr

type CbSlowDetector added in v0.4.44

type CbSlowDetector func(sdi *SlowDetectorInvocation, hasReturned bool, duration time.Duration)

type Certificate added in v0.4.26

type Certificate interface {
	DER() (der CertificateDer)
	PEM() (pemBytes PemBytes)
	ParseCertificate() (certificate *x509.Certificate, err error)
}

type CertificateAuthority added in v0.4.26

type CertificateAuthority interface {
	Check() (cert *x509.Certificate, err error) // gets x509.Certificate version
	DER() (certificateDer CertificateDer)       // untyped bytes, der: Distinguished Encoding Rules binary format
	Sign(template *x509.Certificate, publicKey crypto.PublicKey) (certDER CertificateDer, err error)
	PEM() (pemBytes PemBytes)
	Private() (privateKey PrivateKey)
}

type CertificateDer added in v0.4.26

type CertificateDer []byte

CertificateDer is a binary encoding of a certificate. der: Distinguished Encoding Rules is a binary format based on asn1.

type Closable added in v0.4.180

type Closable[T any] interface {
	EmptyCh(doNotInitialize ...bool) (ch AwaitableCh)
	IsClosed() (isClosed bool)
}

EmptyCh

type ClosableAllSource added in v0.4.180

type ClosableAllSource[T any] interface {
	AllSource[T]
	EmptyCh(doNotInitialize ...bool) (ch AwaitableCh)
}

type ClosableChan added in v0.4.0

type ClosableChan[T any] struct {
	// contains filtered or unexported fields
}

ClosableChan wraps a channel with thread-safe idempotent panic-free observable close.

  • ClosableChan is initialization-free
  • Close is deferrable
  • IsClosed provides whether the channel is closed

Usage:

var errCh parl.ClosableChan[error]
go thread(&errCh)
err, ok := <-errCh.Ch()
if errCh.IsClosed() { // can be inspected
…

func thread(errCh *parl.ClosableChan[error]) {
  var err error
  …
  defer errCh.Close(&err) // will not terminate the process
  errCh.Ch() <- err
}

func NewClosableChan added in v0.4.5

func NewClosableChan[T any](ch ...chan T) (closable *ClosableChan[T])

NewClosableChan returns a channel with idempotent panic-free observable close

  • ch is an optional non-closed channel object
  • if ch is not present, an unbuffered channel will be created
  • cannot use lock in new function
  • if an unbuffered channel is used, NewClosableChan is not required

func (*ClosableChan[T]) Ch added in v0.4.0

func (c *ClosableChan[T]) Ch() (ch chan T)

Ch retrieves the channel as bi-directional. Thread-safe

  • nil is never returned
  • the channel may be closed, use IsClosed to determine
  • do not close the channel other than using Close method
  • per Go channel close, if one thread is blocked in channel send while another thread closes the channel, a data race occurs
  • thread-safe solution is to set an additional indicator of close requested and then reading the channel which releases the sending thread

func (*ClosableChan[T]) Close added in v0.4.0

func (cl *ClosableChan[T]) Close(errp ...*error) (didClose bool, err error)

Close ensures the channel is closed

  • Close does not return until the channel is closed
  • all invocations have the same close result in err
  • didClose indicates whether this invocation closed the channel
  • if errp is non-nil, it will receive the close result
  • per Go channel close, if one thread is blocked in channel send while another thread closes the channel, a data race occurs
  • thread-safe, panic-free, deferrable, idempotent, observable
  • Close does not feature deferred close indication
  • — caller must ensure no channel send is in progress
  • — channel send after Close will fail
  • — a buffered channel can be read to empty after Close
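
A minimal sketch of what idempotent, panic-free close means, assuming a sync.Once guards the close. The onceChan type and its constructor are hypothetical and are not how ClosableChan is implemented; the sketch only illustrates the didClose result and reading a buffered channel to empty after Close.

```go
package main

import (
	"fmt"
	"sync"
)

// onceChan is a hypothetical stand-in for a closable channel wrapper.
type onceChan[T any] struct {
	once sync.Once
	ch   chan T
}

// newOnceChan returns a wrapper around a buffered channel of the given size.
func newOnceChan[T any](size int) *onceChan[T] {
	return &onceChan[T]{ch: make(chan T, size)}
}

// Close closes the channel on the first invocation only.
// didClose is true for the invocation that performed the close.
func (c *onceChan[T]) Close() (didClose bool) {
	c.once.Do(func() {
		close(c.ch)
		didClose = true
	})
	return
}

func main() {
	c := newOnceChan[int](1)
	c.ch <- 1
	fmt.Println(c.Close()) // true
	fmt.Println(c.Close()) // false
	v, ok := <-c.ch
	fmt.Println(v, ok) // 1 true
	_, ok = <-c.ch
	fmt.Println(ok) // false
}
```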

func (*ClosableChan[T]) IsClosed added in v0.4.0

func (c *ClosableChan[T]) IsClosed(includePending ...bool) (isClosed bool)

IsClosed indicates whether the channel is closed. Thread-safe

  • includePending: because there is a small amount of time between
  • — a thread discovering the channel closed and
  • — closeOnce indicating close complete
  • includePending true includes a check for the channel being about to close

func (*ClosableChan[T]) ReceiveCh added in v0.4.100

func (c *ClosableChan[T]) ReceiveCh() (ch <-chan T)

ReceiveCh retrieves the channel as receive-only. Thread-safe

  • nil is never returned
  • the channel may already be closed

func (*ClosableChan[T]) SendCh added in v0.4.100

func (c *ClosableChan[T]) SendCh() (ch chan<- T)

SendCh retrieves the channel as send-only. Thread-safe

  • nil is never returned
  • the channel may already be closed
  • do not close the channel other than using the Close method
  • per Go channel close, if one thread is blocked in channel send while another thread closes the channel, a data race occurs
  • a thread-safe solution is to set an additional indicator that close is requested and then read the channel, which releases the sending thread

type ClosableSink added in v0.4.180

type ClosableSink[T any] interface {
	// Send SendSlice SendClone
	Sink[T]
	// EmptyCh
	Closable[T]
}

type ClosableSource added in v0.4.180

type ClosableSource[T any] interface {
	Source[T]
	EmptyCh(doNotInitialize ...bool) (ch AwaitableCh)
}

type ClosableSource1 added in v0.4.180

type ClosableSource1[T any] interface {
	// Get DataWaitCh
	Source1[T]
	// EmptyCh
	Closable[T]
}

Get DataWaitCh EmptyCh

type ClosableSourceSink added in v0.4.180

type ClosableSourceSink[T any] interface {
	SourceSink[T]
	EmptyCh(doNotInitialize ...bool) (ch AwaitableCh)
}

type Counter added in v0.3.0

type Counter interface {
	// Inc increments the counter. Thread-Safe, method chaining
	Inc() (counter Counter)
	// Dec decrements the counter but not below zero. Thread-Safe, method chaining
	Dec() (counter Counter)
	// Add adds a positive or negative delta. Thread-Safe, method chaining
	Add(delta int64) (counter Counter)
}

Counter is the data provider interface for a counter

  • Inc Dec Add operations, Thread-safe

type CounterID added in v0.3.0

type CounterID string

CounterID is a unique type containing counter names

type CounterSet added in v0.4.29

type CounterSet interface {
	// GetCounters gets a list and a map for consuming counter data
	GetCounters() (orderedKeys []CounterID, m map[CounterID]any)
	// ResetCounters resets all counters to their initial state
	ResetCounters(stopRateCounters bool)
}

CounterSet is the consumer interface for a counter set

type CounterSetData added in v0.4.41

type CounterSetData interface {
	Exists(name CounterID) (exists bool)
	// Value returns the monotonically increasing value for a possible plain counter
	//	- if no such counter exists, 0
	Value(name CounterID) (value uint64)
	// Get returns the monotonically increasing value, the increasing and decreasing running value and the max value for a possible plain counter
	//	- if no such counter exists, 0
	Get(name CounterID) (value, running, max uint64)
	// Rates returns the rate values for a possible rate counter
	//	- if no such counter exists or values are not yet available, nil
	Rates(name CounterID) (rates map[RateType]float64)
	// DatapointValue returns the latest value for a possible datapoint
	//	- if no such datapoint exists, 0
	DatapointValue(name CounterID) (value uint64)
	// DatapointMax returns the highest seen value for a possible datapoint
	//	- if no such datapoint exists, 0
	DatapointMax(name CounterID) (max uint64)
	// DatapointMin returns the lowest seen value for a possible datapoint
	//	- if no such datapoint exists, 0
	DatapointMin(name CounterID) (min uint64)
	// GetDatapoint returns datapoint data for a possible datapoint
	//	- if no such datapoint exists, 0
	GetDatapoint(name CounterID) (value, max, min uint64, isValid bool, average float64, n uint64)
}

CounterSetData provides simple access to a set of counters, rate counters and datapoints.

type CounterStore added in v0.4.100

type CounterStore interface {
	// GetCounter retrieves a regular counter
	//	- never returns nil
	//	- type asserted to CounterValues
	GetCounter(name CounterID) (counter Counter)
	// GetNamedCounter retrieves a counter that must exist
	//	- may return nil
	//	- type asserted to RateCounterValues or DatapointValue
	GetNamedCounter(name CounterID) (counter any)
}

CounterStore is a CounterSet consumer interface facilitating caching

type CounterValues added in v0.3.0

type CounterValues interface {
	// Get returns value/running/max with integrity. Thread-Safe
	//	- value is the monotonically increasing value
	//	- running is the fluctuating running value
	//	- max is the highest value running has had
	Get() (value, running, max uint64)
	// GetReset returns value/running/max with integrity and resets the counter. Thread-Safe
	//	- value is the monotonically increasing value
	//	- running is the fluctuating running value
	//	- max is the highest value running has had
	GetReset() (value, running, max uint64)
	// Value returns the monotonically increasing value. Thread-Safe
	//	- number of Inc invocations and positive Adds
	Value() (value uint64)
	// Running returns the fluctuating running value. Thread-Safe
	//	- number of Inc less Dec invocations and sum of Adds
	//	- never below 0
	Running() (running uint64)
	// Max returns the highest value running has had. Thread-Safe
	Max() (max uint64)
}

CounterValues is the consumer interface for a counter. Thread-safe
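
The relation between value, running and max can be illustrated with a small mutex-based counter. This is a sketch under assumptions: the counter type is hypothetical and only mirrors the documented semantics of Inc, Dec and Get, not parl's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical mutex-based counter.
type counter struct {
	lock                sync.Mutex
	value, running, max uint64
}

// Inc increments value and running and updates max. Supports chaining.
func (c *counter) Inc() *counter {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.value++
	c.running++
	if c.running > c.max {
		c.max = c.running
	}
	return c
}

// Dec decrements running but not below zero. Supports chaining.
func (c *counter) Dec() *counter {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.running > 0 {
		c.running--
	}
	return c
}

// Get returns a consistent snapshot of all three values.
func (c *counter) Get() (value, running, max uint64) {
	c.lock.Lock()
	defer c.lock.Unlock()
	return c.value, c.running, c.max
}

func main() {
	var c counter
	// three Inc and four Dec: the last Dec has no effect
	c.Inc().Inc().Dec().Inc().Dec().Dec().Dec()
	fmt.Println(c.Get()) // 3 0 2
}
```

Taking all three values under one lock is what "with integrity" amounts to in this sketch: the snapshot cannot mix values from before and after a concurrent update.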

type Counters added in v0.3.0

type Counters interface {
	// GetOrCreateCounter is used by the data provider of a counter.
	//	- A counter has Inc and Dec operations.
	//	- Counters can be used to track number of currently operating invocations.
	//	- if period is present and positive, the counter created is a RateCounter.
	// Rate counters use threads, one per distinct period requested.
	// Rate counter threads are monitored by the provided Go group and failure
	// may force a counter restart or cause application termination. Failures
	// are typically caused by panics in the counter update task.
	//	- Counter is the data-provider side of a counter
	//	- CounterValues is the consumer side of a counter
	GetOrCreateCounter(name CounterID, period ...time.Duration) (counter Counter)
	// GetOrCreateDatapoint is used by the data provider of a datapoint.
	// A datapoint supports SetValue operation.
	// A datapoint tracks a quantity such as a latency value.
	GetOrCreateDatapoint(name CounterID, period time.Duration) (datapoint Datapoint)
}

Counters is the data provider interface for a counter set.

  • max and running values are offered
  • Counters and datapoints are thread-safe
  • counters may be used to determine that code abides by intended parallelism and to identify hangs or anomalies.
  • Printing counters every second can verify adequate progress and possibly identify blocking of threads or swapping and garbage collection outages.

type CountersFactory added in v0.3.0

type CountersFactory interface {
	// NewCounters returns a counter container.
	// if useCounters is false, the container does not actually do any counting.
	NewCounters(useCounters bool, g0 GoGen) (counters Counters)
}

CountersFactory is an abstract counter factory. CountersFactory enables providing of different counter implementations.

type CountingAwaitable added in v0.4.125

type CountingAwaitable struct {
	Awaitable
	// contains filtered or unexported fields
}

func NewCountingAwaitable added in v0.4.125

func NewCountingAwaitable() (awaitable *CountingAwaitable)

NewCountingAwaitable returns a counting awaitable

  • similar to sync.WaitGroup but wait-mechanic is closing channel
  • observable via Count method. Count with missing delta will not panic
  • state is determined by counter. Guaranteed state is returned by Count IsTriggered
  • IsClosed is eventually consistent. A parallel Count may reflect triggered prior to IsClosed returning true. Upon return from the effective Count Add Done IsTriggered invocation, IsClosed is consistent and the channel is closed if it is to close. For race-sensitive code, synchronize or rely on Count
  • similarly, a parallel Count invocation may reflect triggered state prior to the Awaitable channel actually closing
  • mechanic is atomic.Int64.CompareAndSwap
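
A minimal sketch of a wait group whose wait-mechanic is a closing channel. It is an illustration only: the countingAwaitable type below is hypothetical and uses a mutex for brevity, whereas the documentation above names atomic.Int64.CompareAndSwap as parl's mechanic.

```go
package main

import (
	"fmt"
	"sync"
)

// countingAwaitable is a hypothetical sketch of a counter that closes
// a channel when it returns to zero.
type countingAwaitable struct {
	lock      sync.Mutex
	count     int
	triggered bool
	ch        chan struct{}
}

func newCountingAwaitable() *countingAwaitable {
	return &countingAwaitable{ch: make(chan struct{})}
}

// Add adjusts the counter. A negative delta reaching zero closes the channel.
// A negative result or Add after trigger panics.
func (a *countingAwaitable) Add(delta int) {
	a.lock.Lock()
	defer a.lock.Unlock()
	if a.triggered {
		panic("Add on triggered awaitable")
	}
	a.count += delta
	if a.count < 0 {
		panic("negative counter")
	}
	if delta < 0 && a.count == 0 {
		a.triggered = true
		close(a.ch)
	}
}

// Done decrements the counter by one.
func (a *countingAwaitable) Done() { a.Add(-1) }

// Ch returns the channel that closes on trigger.
func (a *countingAwaitable) Ch() <-chan struct{} { return a.ch }

// Count returns the remaining count without modifying it.
func (a *countingAwaitable) Count() int {
	a.lock.Lock()
	defer a.lock.Unlock()
	return a.count
}

func main() {
	a := newCountingAwaitable()
	a.Add(2)
	for i := 0; i < 2; i++ {
		go a.Done()
	}
	<-a.Ch() // unlike WaitGroup.Wait, this can be one case of a select
	fmt.Println(a.Count()) // 0
}
```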

func (*CountingAwaitable) Add added in v0.4.125

func (a *CountingAwaitable) Add(delta int)

Add signals thread-launch by adding to the counter

  • if delta is negative and result is zero: trig
  • if delta is negative and result is negative: panic
  • Add on triggered or negative is panic
  • similar to sync.WaitGroup.Add

func (*CountingAwaitable) Count added in v0.4.125

func (a *CountingAwaitable) Count(delta ...int) (counter, adds int)

Count returns the current count and may also adjust the counter

  • counter is current remaining count
  • adds is cumulative positive adds
  • if delta is present and the awaitable is triggered, this is panic
  • state can be retrieved without panic by omitting delta
  • if delta is negative and result is zero: trig
  • if delta is negative and result is negative: panic
  • similar to sync.WaitGroup.Add Done

func (*CountingAwaitable) Done added in v0.4.125

func (a *CountingAwaitable) Done()

Done signals thread exit by decrementing the counter

  • if the result is zero: trig
  • if the result is negative: panic
  • Done on triggered or negative is panic
  • similar to sync.WaitGroup.Done

func (*CountingAwaitable) IsTriggered added in v0.4.125

func (a *CountingAwaitable) IsTriggered() (isTriggered bool)

IsTriggered returns true if counter has been used and returned to zero

  • IsClosed true, AwaitableCh closed

type CyclicAwaitable added in v0.4.115

type CyclicAwaitable struct {
	// contains filtered or unexported fields
}

CyclicAwaitable is an awaitable that can be re-initialized

  • initialization-free, start in Open state
  • one-to-many, happens-before
  • the synchronization mechanic is closing channel, allowing consumers to await multiple events
  • CyclicAwaitable.IsClosed provides thread-safe observability
  • CyclicAwaitable.Close is idempotent, thread-safe and deferrable
  • Open means event is pending, Close means event has triggered
  • CyclicAwaitable.Open arms the awaitable returning a channel guaranteed to be open at time of invocation
  • because Awaitable is renewed, access is via atomic Pointer
  • Pointer to struct allows for atomic update of IsClosed and Open

Usage:

valueWaiter *CyclicAwaitable
…
func (v *V) getOrWaitForValue() (value T) {
  var hasValue bool
  // check if value is already present
  if value, hasValue = v.hasValueFromThread(); hasValue {
    return
  }
  // arm cyclable
  v.valueWaiter.Open()
  // collect any value arriving prior to arming cyclable
  if value, hasValue = v.hasValueFromThread(); hasValue {
    return
  }
  <- v.valueWaiter.Ch()
  value, _ = v.hasValueFromThread()
…
func (v *V) threadStoresValue(value T) {
  v.store(value)
  v.valueWaiter.Close()

func (*CyclicAwaitable) Ch added in v0.4.115

func (a *CyclicAwaitable) Ch() (ch AwaitableCh)

Ch returns an awaitable channel. Thread-safe

func (*CyclicAwaitable) Close added in v0.4.115

func (a *CyclicAwaitable) Close(eventuallyConsistent ...bool) (didClose bool)

Close triggers awaitable by closing the channel

  • upon return, the channel is guaranteed to be closed
  • eventuallyConsistent EvCon: may return before the channel is actually closed for higher performance
  • idempotent, deferrable, panic-free, thread-safe

func (*CyclicAwaitable) IsClosed added in v0.4.115

func (a *CyclicAwaitable) IsClosed() (isClosed bool)

IsClosed inspects whether the awaitable has been triggered

  • isClosed indicates that the channel is closed
  • Thread-safe

func (*CyclicAwaitable) Open added in v0.4.115

func (a *CyclicAwaitable) Open() (didOpen bool, ch AwaitableCh)

Open rearms the awaitable for another cycle

  • ch is guaranteed to have been open at time of invocation. Because each Open may return a different channel, use of the returned ch offers consistent state
  • didOpen is true if the channel was encountered closed
  • idempotent, thread-safe, panic-free
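
The Open and Close cycle can be sketched as follows. This is a simplified illustration under assumptions: the cyclicAwaitable type here is hypothetical, needs a constructor and uses a mutex, whereas the documentation above describes parl's type as initialization-free and based on an atomic pointer.

```go
package main

import (
	"fmt"
	"sync"
)

// cyclicAwaitable is a hypothetical re-armable awaitable.
type cyclicAwaitable struct {
	lock   sync.Mutex
	ch     chan struct{}
	closed bool
}

func newCyclicAwaitable() *cyclicAwaitable {
	return &cyclicAwaitable{ch: make(chan struct{})}
}

// Close triggers the awaitable. Idempotent.
func (a *cyclicAwaitable) Close() (didClose bool) {
	a.lock.Lock()
	defer a.lock.Unlock()
	if a.closed {
		return false
	}
	a.closed = true
	close(a.ch)
	return true
}

// Open re-arms the awaitable if it was closed and returns
// a channel that is open at the time of invocation.
func (a *cyclicAwaitable) Open() (didOpen bool, ch <-chan struct{}) {
	a.lock.Lock()
	defer a.lock.Unlock()
	if a.closed {
		a.ch = make(chan struct{})
		a.closed = false
		didOpen = true
	}
	return didOpen, a.ch
}

// IsClosed reports whether the awaitable is currently triggered.
func (a *cyclicAwaitable) IsClosed() bool {
	a.lock.Lock()
	defer a.lock.Unlock()
	return a.closed
}

func main() {
	a := newCyclicAwaitable()
	fmt.Println(a.Close()) // true
	fmt.Println(a.Close()) // false
	didOpen, ch := a.Open()
	fmt.Println(didOpen) // true
	a.Close()
	<-ch // the channel returned by Open is the one that closed
	fmt.Println(a.IsClosed()) // true
}
```

Waiting on the channel returned by Open, rather than fetching the channel again later, avoids waiting on a channel from a different cycle.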

type DA added in v0.4.117

DA is the value returned by a deferred code location function

func A added in v0.4.117

func A() DA

A is a thunk returning a deferred code location

type DB added in v0.4.12

type DB interface {

	// Exec executes a query not returning any rows
	//	- ExecResult contains last inserted ID if any and rows affected
	Exec(partition DBPartition, query string, ctx context.Context,
		args ...any) (execResult ExecResult, err error)

	// Query executes a query returning zero or more rows
	Query(partition DBPartition, query string, ctx context.Context,
		args ...any) (sqlRows *sql.Rows, err error)

	// QueryRow executes a query returning only its first row
	//	- zero rows returns error: sql: no rows in result set: use [DB.Query]
	QueryRow(partition DBPartition, query string, ctx context.Context,
		args ...any) (sqlRow *sql.Row, err error)

	// QueryString executes a query known to return zero or one row and first column a string value
	//   - implemented by [sql.DB.QueryRowContext]
	QueryString(partition DBPartition, query string, noRowsOK NoRowsAction, ctx context.Context,
		args ...any) (value string, hasValue bool, err error)

	// QueryInt executes a query known to return zero or one row and first column an int value
	//   - implemented by [sql.DB.QueryRowContext]
	QueryInt(partition DBPartition, query string, noRowsOK NoRowsAction, ctx context.Context,
		args ...any) (value int, hasValue bool, err error)

	// Close closes the database connection
	Close() (err error)
	fmt.Stringer
}

DB is a parallel database connection

  • parl.DB:
  • — applies to any database implementation and any SQL driver
  • — provides partition-mapping between a single parl.DB and any number of sql.DB comprising a partitioned data source
  • — hides the complexity of caching and partitioning through use of a single parl.DB value
  • — always uses context allowing pre-emptive cancelation of SQL operations
  • [psql.NewDB] provides a parl.DB implementation with:
  • — caching of prepared statements per sql.DB data source and
  • — on-the-fly schema-creation for created sql.DB data sources

type DBFactory added in v0.4.12

type DBFactory interface {
	// NewDB returns a DB object implementation
	//	- dsnr: a data source namer creating data sources:
	//	- [DataSourceNamer.DSN] returns the data source name for
	//		a partition identifier
	//	- [DataSourceNamer.DataSource] returns a data source
	//		for a data source name.
	//		The data source provides SQL query execution and data storage
	//	- —[sqliter.OpenDataSourceNamer] creates data source namer
	//		implementation for SQLite3
	//	- schema: a function that on execution carries out on-the-fly
	//		application-specific SQL initialization for a new datasource
	//	- — executes CREATE TABLE for tables and indexes
	//	- — configures database-specific referential integrity and journaling
	NewDB(
		dsnr DataSourceNamer,
		schema func(dataSource DataSource, ctx context.Context) (err error),
	) (db DB)
}

DBFactory is a standardized way to obtain parl.DB objects

  • DBFactory applies to any database implementation

type DBPartition added in v0.4.14

type DBPartition string

DBPartition is partition reference for a partitioned database

  • partition is typically one table per year
  • DBPartition applies to any database implementation
  • NoPartition is used for an unpartitioned database

const NoPartition DBPartition = ""

NoPartition value interacts with a data source that is not partitioned by year or otherwise

  • parl.DB represents multiple partitioned datasources and provides caching of prepared statements
  • a data source is a database that contains tables provided by a SQL driver implementation

type DSNrFactory added in v0.4.14

type DSNrFactory interface {
	// DataSourceNamer returns an object that can
	//	- provide data source names from partition selectors and
	//	- provide data sources from a data source name
	DataSourceNamer(appName string) (dsnr DataSourceNamer, err error)
}

DSNrFactory describes the signature for a data source namer new function

  • the data source namer provides data sources for query operations
  • DSNrFactory applies to any database implementation

type DataSource added in v0.4.12

type DataSource interface {
	// PrepareContext prepares a statement that is a query for reading or writing data
	//	- a prepared statement can be executed multiple times
	PrepareContext(ctx context.Context, query string) (stmt *sql.Stmt, err error)
	// Close closes the data-source
	Close() (err error)
}

DataSource represents a set of SQL tables, possibly partitioned, against which queries can be prepared and executed

  • DataSource applies to any database implementation

type DataSourceName added in v0.4.127

type DataSourceName string

DataSourceName is a value referring to a set of SQL tables, possibly a partition

  • a datasource is typically implemented by sql.DB delegating to the SQL driver in use
  • a data source name is SQL driver dependent
  • for SQLite3, a data source name is a filename like “~/.local/share/myapp/myapp-2024.db”
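
As an illustration of how an application name and a partition could map to a SQLite3-style file name like the one above, here is a small sketch. The dsn function and its directory argument are hypothetical; the naming scheme actually used by sqliter is not shown in this documentation.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// dsn is a hypothetical helper building a data source name from
// a directory, an application name and an optional partition.
// An empty partition corresponds to an unpartitioned database.
func dsn(dir, appName, partition string) string {
	name := appName
	if partition != "" {
		name += "-" + partition
	}
	return filepath.Join(dir, name+".db")
}

func main() {
	dir := filepath.Join("data", "myapp")
	fmt.Println(dsn(dir, "myapp", "2024")) // partition for year 2024
	fmt.Println(dsn(dir, "myapp", ""))     // no partition
}
```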

type DataSourceNamer added in v0.4.12

type DataSourceNamer interface {
	// DSN returns the data source name based on a partition selector
	//	- upon creation, the data source namer was provided with
	//		information on data source naming for a particular application program
	DSN(partition ...DBPartition) (dataSourceName DataSourceName)
	// DataSource returns a usable data source based on a data source name
	//	- with parl, all statements are prepared statements.
	//		The function provided by a data source is to create prepared statements.
	//		Those prepared statements are later executed efficiently
	DataSource(dsn DataSourceName) (dataSource DataSource, err error)
}

DataSourceNamer provides data source names for SQL based on possibly application name and partition year

  • a data source represents a set of SQL tables, possibly partitioned, against which queries can be prepared and executed
  • DataSourceNamer applies to any database implementation
  • sqliter provides implementations for SQLite3
  • the data source namer can map an application name and partition identifier to the data source to be used

type Datapoint added in v0.4.29

type Datapoint interface {
	// SetValue records a value at time.Now().
	// SetValue supports method chaining.
	SetValue(value uint64) (datapoint Datapoint)
}

Datapoint tracks a value with average, max-min, and increase/decrease rates.

type DatapointValue added in v0.4.29

type DatapointValue interface {
	CloneDatapoint() (datapoint Datapoint)      // Clone takes a snapshot of a counter state.
	CloneDatapointReset() (datapoint Datapoint) // CloneReset takes a snapshot of a counter state and resets it to its initial state.
	GetDatapoint() (value, max, min uint64, isValid bool, average float64, n uint64)
	DatapointValue() (value uint64)
	DatapointMax() (max uint64)
	DatapointMin() (min uint64)
}

type Debouncer added in v0.4.26

type Debouncer[T any] struct {
	// contains filtered or unexported fields
}

Debouncer debounces event stream values

  • T values are received from the in channel
  • Once d time has elapsed with no further incoming Ts, a slice of read T values is provided to the sender function
  • errFn receives any panics in the threads, expected none
  • sender and errFn functions must be thread-safe.
  • Debouncer is shutdown gracefully by closing the input channel or immediately by invoking the Shutdown method
  • two threads are launched per debouncer

func NewDebouncer

func NewDebouncer[T any](
	debounceInterval, maxDelay time.Duration,
	inputCh <-chan T,
	sender func([]T),
	errorSink ErrorSink1,
) (debouncer *Debouncer[T])

NewDebouncer returns a channel debouncer

  • values incoming faster than debounceInterval are aggregated into slices
  • values are not kept waiting longer than maxDelay
  • debounceInterval is only used if > 0 ns
  • if debounceInterval is not used and maxDelay is 0, maxDelay defaults to 1 s to avoid a hanging debouncer
  • sender should not be long-running or blocking
  • inputCh sender errFn cannot be nil
  • close of input channel or Shutdown is required to release resources
  • errFn should not receive any errors but will receive possible runtime panics
  • NewDebouncer launches two threads prior to return
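
The aggregation behavior can be sketched with a single goroutine and a quiet-period timer. This is a simplified illustration, not parl's Debouncer: the debounce function is hypothetical, it omits maxDelay, Shutdown and the error sink, and it blocks the calling goroutine until the input channel closes.

```go
package main

import (
	"fmt"
	"time"
)

// debounce is a hypothetical sketch: values received from in are
// aggregated until interval elapses with no new value, then the
// batch is handed to sender. Closing in flushes pending values.
func debounce[T any](interval time.Duration, in <-chan T, sender func([]T)) {
	var batch []T
	// a nil channel is never ready, so the timer case is
	// inactive while the batch is empty
	var timerC <-chan time.Time
	for {
		select {
		case v, ok := <-in:
			if !ok {
				if len(batch) > 0 {
					sender(batch)
				}
				return
			}
			batch = append(batch, v)
			timerC = time.After(interval) // restart the quiet period
		case <-timerC:
			sender(batch)
			batch = nil
			timerC = nil
		}
	}
}

func main() {
	in := make(chan int)
	done := make(chan struct{})
	var batches [][]int
	go func() {
		defer close(done)
		debounce(50*time.Millisecond, in, func(b []int) {
			batches = append(batches, b)
		})
	}()
	for _, v := range []int{1, 2, 3} {
		in <- v // arrives faster than the quiet period
	}
	close(in) // graceful shutdown flushes pending values
	<-done
	fmt.Println(batches)
}
```

How the values are split into batches depends on timing, which is why a debouncer that must bound latency also needs something like maxDelay.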

func (*Debouncer[T]) Shutdown added in v0.4.98

func (d *Debouncer[T]) Shutdown()

Shutdown shuts down the debouncer

  • Shutdown does not return until resources have been released
  • buffered values are discarded and input channel is not read to end

func (*Debouncer[T]) Wait added in v0.4.26

func (d *Debouncer[T]) Wait()

Wait blocks until the debouncer exits

  • the debouncer exits from input channel closing or Shutdown

type Dent added in v0.3.0

type Dent interface {
	// Name is utf-8 base path in device file system.
	// Name is base name, ie. file name and extension.
	Name() (name string)
	// Modified time, the time file contents was changed, second precision, continuous time
	Modified() (modified time.Time)
	// IsDir indicates directory.
	// LIST only supports symbolic link, directory and regular file types
	IsDir() (isDir bool)
	// IsRegular indicates regular file, ie. not a directory or symbolic link.
	// LIST only supports symbolic link, directory and regular file types
	IsRegular() (isRegular bool) // ie.not directory or symlink
	// Perm returns os.FileMode data.
	// 9-bit Unix permissions per os.ModePerm.
	// LIST also supports directory and symlink bits
	Perm() (perm fs.FileMode)
	// Size is limited to 4 GiB-1
	Size() (size uint32)
}

Dent is the information returned by adb ls or LIST

type Devicette added in v0.3.0

type Devicette interface {
	// Serial returns the serial number for this device
	Serial() (serial AndroidSerial)
	// Shell executes a shell command on the device.
	//	- the response is a byte sequence
	//	- note: interning large strings may lead to memory leaks
	Shell(command string) (out []byte, err error)
	// ShellStream executes a shell command on the device returning a readable socket
	ShellStream(command string) (conn io.ReadWriteCloser, err error)
	/*
		Pull copies a remote file or directory on the Android device to a local file system location.
		the local file must not exist.
		Pull refuses certain files like product apks. shell cat works
	*/
	Pull(remotePath, nearPath string) (err error)
	/*
		List has some peculiarities:
		If remoteDir is not an existing directory, an empty list is returned.
		Entries with insufficient permissions are ignored.
		Update: . and .. are removed, adb LIST otherwise does return those.
		File mode: the only present type bits beyond 9-bit Unix permissions are
		symlink, regular file and directory.
		File size is limited to 4 GiB-1.
		Modification time resolution is second and range is confined to a 32-bit Unix timestamp.
	*/
	List(remoteDir string) (dFileInfo []Dent, err error)
}

Devicette is a generic implementation of the capabilities of a device implementing the adb Android debug bridge protocol

type DevicetteFactory added in v0.4.0

type DevicetteFactory interface {
	// NewDevicette creates a Devicette interacting with remote adb Android Debug Bridge
	// devices via an adb server available at the socket address address
	NewDevicette(address AdbSocketAddress, serial AndroidSerial, ctx context.Context) (devicette Devicette)
}

DevicetteFactory describes how Devicette objects are obtained.

type DoErr added in v0.4.26

type DoErr interface {
	DoErr(op func() (err error)) (err error)
}

type Done added in v0.4.167

type Done interface {
	// Done signals that some task has completed.
	Done()
}

Done declares an object with a Done method

  • The Done interface is similar to a done callback function value
  • — passing a method as function value causes allocation
  • — such function values create difficult-to-follow stack traces
  • The Done interface is implemented by e.g. sync.WaitGroup.Done
  • callbacks are used when:
  • — invoked repeatedly
  • — invoked by a subthread or asynchronous method
  • a thread-safe callback may save a listening thread and a lock
  • alternatives to callback are:
  • — blocking return or
  • — lock-featuring synchronization mechanics like chan costing a thread

type Doneable added in v0.4.12

type Doneable interface {
	Add(delta int)
	Done()
}

Doneable is the callee part of sync.WaitGroup and other implementations. Doneable is a many-to-many relation. Doneable allows the callee to instantiate and invoke any number of things that are awaitable by the caller.

… = NewSomething(&waitsForLots, &shutsDownLots)
go someThread(&waitsForLots, &shutsDownLots)
func someThread(w Doneable, ctx context.Context) {
  defer w.Done()
  w.Add(2)
  go somethingElse()
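
A runnable sketch of the same idea using only the standard library. The local doneable interface mirrors the two methods of Doneable and is satisfied by *sync.WaitGroup; someThread, run and the results channel are hypothetical names for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// doneable mirrors the Add and Done methods, satisfied by *sync.WaitGroup.
type doneable interface {
	Add(delta int)
	Done()
}

// someThread registers two sub-goroutines with w before it exits,
// so the caller ends up waiting for all three goroutines.
func someThread(w doneable, results chan<- int) {
	defer w.Done()
	w.Add(2) // must happen before this goroutine's own Done
	for i := 1; i <= 2; i++ {
		go func(n int) {
			defer w.Done()
			results <- n
		}(i)
	}
}

// run launches someThread and sums the results once everything is done.
func run() (sum int) {
	var wg sync.WaitGroup
	results := make(chan int, 2)
	wg.Add(1)
	go someThread(&wg, results)
	wg.Wait()
	close(results)
	for n := range results {
		sum += n
	}
	return
}

func main() {
	fmt.Println(run()) // 3
}
```

The callee only sees the two-method interface, so the caller is free to pass a wait group or any other implementation.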

type EchoModerator added in v0.4.41

type EchoModerator struct {
	// contains filtered or unexported fields
}

EchoModerator is a parallelism-limiting Moderator that:

  • prints any increase in parallelism over the concurrency value
  • prints exhibited invocation slowness exceeding latencyWarningPoint
  • prints progressive slowness exceeding latencyWarningPoint for non-returning invocations in progress on schedule timerPeriod
  • EchoModerator can be used in production for ongoing diagnosis of APIs and libraries
  • cost is one thread, one timer, and a locked linked-list of invocations
  • EchoModerator is intended to control and diagnose [exec.Command] invocations
  • problems include:
  • — too many parallel invocations
  • — invocations that do not return or are long running
  • — too many threads held waiting to invoke
  • — unexpected behavior under load
  • — deviating behavior when operated for extended periods of time

func NewEchoModerator added in v0.4.41

func NewEchoModerator(
	concurrency uint64,
	latencyWarningPoint time.Duration,
	waitingWarningPoint uint64,
	timerPeriod time.Duration,
	label string, goGen GoGen, log PrintfFunc,
) (echoModerator *EchoModerator)

NewEchoModerator returns a parallelism-limiting moderator with printouts for excessive slowness or parallelism

  • concurrency is the highest number of executions that can take place in parallel
  • printout on:
  • — too many threads waiting at the moderator
  • — too slow or hung invocations
  • stores self-referencing pointers

func (*EchoModerator) Ticket added in v0.4.130

func (m *EchoModerator) Ticket() (returnTicket func())

Ticket waits for an EchoModerator ticket and provides a function to return it

func moderatedFunc() {
  defer echoModerator.Ticket()()
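
The `defer m.Ticket()()` idiom can be illustrated with a buffered channel used as a semaphore. This is a sketch under assumptions: the moderator type and maxParallel helper are hypothetical, and the sketch has none of EchoModerator's printouts or latency tracking.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// moderator is a hypothetical parallelism limiter based on a buffered channel.
type moderator struct{ tickets chan struct{} }

func newModerator(concurrency int) *moderator {
	return &moderator{tickets: make(chan struct{}, concurrency)}
}

// Ticket blocks until a ticket is available and
// returns the function that hands the ticket back.
func (m *moderator) Ticket() (returnTicket func()) {
	m.tickets <- struct{}{}
	return func() { <-m.tickets }
}

// maxParallel runs n goroutines through the moderator and
// reports the highest parallelism observed inside it.
func maxParallel(m *moderator, n int) int64 {
	var running, highest atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Ticket() runs now and may block; the returned
			// function runs when this goroutine function returns
			defer m.Ticket()()
			r := running.Add(1)
			for {
				h := highest.Load()
				if r <= h || highest.CompareAndSwap(h, r) {
					break
				}
			}
			running.Add(-1)
		}()
	}
	wg.Wait()
	return highest.Load()
}

func main() {
	fmt.Println(maxParallel(newModerator(2), 20) <= 2) // true
}
```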

type Enum added in v0.4.33

type Enum[T any] interface {
	KeyEnum[string, T]
}

Enum is an enumeration using string keys mapping to a value type T. T is a unique named type that separates the values of this enumeration from all other values.

type EnumItem added in v0.4.33

type EnumItem[K constraints.Ordered, V any] interface {
	// Key returns the key for this enumeration value
	Key() (key K)
	// Description returns a descriptive sentence for this enumeration value
	Description() (desc string)
	// Value returns this enumeration value’s value using the restricted type
	Value() (value V)

	fmt.Stringer
}

EnumItem is a generic interface for enumeration item implementations. Enumeration items are ordered by the K key type.

  • K is a key type whose values map to restricted type V values one-to-one.
  • V is a restricted type for enumeration values that may store more efficiently compared to a portable type.

type ErrSlice added in v0.4.173

type ErrSlice struct {
	// contains filtered or unexported fields
}

ErrSlice is a thread-safe unbound awaitable error container

  • ErrSlice.AddError is a function to submit errors
  • ErrSlice.WaitCh returns a closing channel to await the next error
  • ErrSlice.Error returns the next error value if any
  • ErrSlice.Errors returns a slice of all errors if any
  • [ErrSlice.EmptyCh] returns a closing channel to await container empty providing deferred-close functionality
  • ErrSlice features:
  • — real-time error stream or
  • — collect errors at end and
  • — close then read-to-end function
  • implements parl.Errs parl.ErrorSink

Usage:

var errs ErrSlice
go fn(&errs)
for err := errs.Init(); errs.Condition(&err); {
  // process real-time error stream
…
func fn(errs parl.ErrorSink) {
  defer errs.EndErrors()
  …
  errs.AddError(err)

var errs ErrSlice
fn2(&errs)
for _, err := range errs.Errors() {
  // process post-invocation errors
  …
func fn2(errs parl.ErrorSink1) {
  errs.AddError(err)
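
The collect-errors-at-end usage can be sketched with a mutex-protected slice. This is an illustration only: the errSlice type and collect helper are hypothetical, the sketch is not awaitable, and emptying the container when Errors is called is an assumption of this sketch rather than documented behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// errSlice is a hypothetical thread-safe error container.
type errSlice struct {
	lock sync.Mutex
	errs []error
}

// AddError submits an error. Thread-safe.
func (e *errSlice) AddError(err error) {
	e.lock.Lock()
	defer e.lock.Unlock()
	e.errs = append(e.errs, err)
}

// Errors returns the collected errors or nil.
// Emptying the container on read is an assumption of this sketch.
func (e *errSlice) Errors() (errs []error) {
	e.lock.Lock()
	defer e.lock.Unlock()
	errs, e.errs = e.errs, nil
	return
}

// collect launches n goroutines that each submit one error
// and returns the errors once all goroutines have exited.
func collect(n int) []error {
	var errs errSlice
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			errs.AddError(fmt.Errorf("worker %d failed", id))
		}(i)
	}
	wg.Wait()
	return errs.Errors()
}

func main() {
	fmt.Println(len(collect(4))) // 4
}
```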

func (*ErrSlice) AddError added in v0.4.173

func (e *ErrSlice) AddError(err error)

AddError is a function to submit non-fatal errors

func (*ErrSlice) AppendErrors added in v0.4.178

func (e *ErrSlice) AppendErrors(errp *error)

AppendErrors collects any errors contained and appends them to errp

func (*ErrSlice) Condition added in v0.4.181

func (e *ErrSlice) Condition(errp *error) (hasValue bool)

func (*ErrSlice) EndCh added in v0.4.178

func (e *ErrSlice) EndCh() (ch AwaitableCh)

EndCh awaits the error source closing:

  • the error source must be read to empty
  • the error source must be closed by the error-source providing entity

func (*ErrSlice) EndErrors added in v0.4.178

func (e *ErrSlice) EndErrors()

EndErrors optionally indicates that no more AddError invocations will occur

  • enables triggering of [ErrorSource.EndCh]

func (*ErrSlice) Error added in v0.4.173

func (e *ErrSlice) Error() (error, bool)

Error returns the next error value

  • hasValue true: err is valid
  • hasValue false: the error source is empty

func (*ErrSlice) Errors added in v0.4.173

func (e *ErrSlice) Errors() (errs []error)

Errors returns a slice of errors or nil

func (*ErrSlice) Init added in v0.4.181

func (e *ErrSlice) Init() (err error)

func (*ErrSlice) WaitCh added in v0.4.173

func (e *ErrSlice) WaitCh() (ch AwaitableCh)

WaitCh waits for the next error, possibly indefinitely

  • a received channel closes on errors available
  • the next invocation may return a different channel object

type ErrorCloser added in v0.4.20

type ErrorCloser interface {
	InvokeIfError(addError func(err error))
	Close()
}

type ErrorManager added in v0.4.20

type ErrorManager interface {
	Ch() (ch <-chan GoError)
}

type ErrorSink added in v0.4.20

type ErrorSink interface {
	// AddError is a function to submit non-fatal errors
	//	- triggers [ErrorSource.WaitCh]
	//	- values are received by [ErrorSource.Error] or [ErrorsSource.Errors]
	AddError(err error)
	// EndErrors optionally indicates that no more AddError
	// invocations will occur
	//	- enables triggering of [ErrorSource.EndCh]
	EndErrors()
}

ErrorSink provides send of non-fatal errors one at a time

func NewErrorSinkEndable added in v0.4.178

func NewErrorSinkEndable(errorSink1 ErrorSink1) (errorSink ErrorSink)

NewErrorSinkEndable returns an error sink based on a type with private addError method

type ErrorSink1 added in v0.4.178

type ErrorSink1 interface {
	// AddError is a function to submit non-fatal errors
	//	- triggers [ErrorSource.WaitCh]
	//	- values are received by [ErrorSource.Error] or [ErrorsSource.Errors]
	AddError(err error)
}

ErrorSink1 provides send of non-fatal errors one at a time that cannot be closed

var Infallible ErrorSink1 = &infallible{}

Infallible is an error sink logging to standard error

  • intended for failure recovery of threads that should not fail
  • use should be limited to threads reading from sockets that cannot be terminated, ie. standard input or the udev netlink socket
  • outputs the error with stack trace and the stack trace invoking [Infallible.AddError]

type ErrorSource added in v0.4.173

type ErrorSource interface {
	ErrorSource1
	// WaitCh waits for the next error, possibly indefinitely
	//	- each invocation returns a channel that closes on errors available
	//	- the next invocation may return a different channel object
	WaitCh() (ch AwaitableCh)
	// EndCh awaits the error source closing:
	//	- the error source must be read to empty
	//	- the error source must be closed by the error-source providing entity
	EndCh() (ch AwaitableCh)
}

ErrorSource provides receive of errors one at a time

type ErrorSource1 added in v0.4.178

type ErrorSource1 interface {
	// Error returns the next error value
	//	- hasValue true: err is valid
	//	- hasValue false: the error source is currently empty
	Error() (err error, hasValue bool)
}

ErrorSource1 is an error source that is not awaitable

type ErrorsSource added in v0.4.173

type ErrorsSource interface {
	// Errors returns a slice of errors or nil
	Errors() (errs []error)
}

ErrorsSource provides receiving multiple errors at once

type Errs added in v0.4.173

type Errs interface {
	// ErrorSource provides receive of errors one at a time using
	// WaitCh Error
	ErrorSource
	// Errors returns a slice of errors
	ErrorsSource
}

Errs provides receiving errors, one at a time or multiple
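
The sink and source roles described above can be sketched with the standard library only. This is a minimal illustration, not parl's implementation: the errQueue type, newErrQueue and errDiskFull are hypothetical names, and the assumption is that the wait channel is closed while errors are available and replaced once the queue has been read to empty.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDiskFull is a sample error used by the demonstration
var errDiskFull = errors.New("disk full")

// errQueue is a hypothetical error sink and source, not parl's implementation
type errQueue struct {
	mu     sync.Mutex
	errs   []error
	ch     chan struct{} // closed while errors are available
	closed bool
}

func newErrQueue() *errQueue { return &errQueue{ch: make(chan struct{})} }

// AddError stores err and closes the current wait channel
func (q *errQueue) AddError(err error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.errs = append(q.errs, err)
	if !q.closed {
		close(q.ch)
		q.closed = true
	}
}

// Error returns the oldest error, hasValue false if the queue is empty
func (q *errQueue) Error() (err error, hasValue bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.errs) == 0 {
		return nil, false
	}
	err, q.errs = q.errs[0], q.errs[1:]
	if len(q.errs) == 0 {
		// queue drained: later waiters get a fresh open channel
		q.ch = make(chan struct{})
		q.closed = false
	}
	return err, true
}

// WaitCh returns a channel that closes once an error is available
func (q *errQueue) WaitCh() <-chan struct{} {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.ch
}

func main() {
	q := newErrQueue()
	go q.AddError(errDiskFull)
	<-q.WaitCh() // blocks until the goroutine has added its error
	err, hasValue := q.Error()
	fmt.Println(err, hasValue)
}
```

Because the channel is replaced after the queue drains, a consumer should call WaitCh again for each wait rather than cache the returned channel.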

type ExecResult added in v0.4.14

type ExecResult interface {
	// - ID is last inserted ID if any
	// - rows is number of rows affected
	Get() (ID int64, rows int64)
	// “sql.Result: ID afe3… rows: 123”
	String() (s string)
}

ExecResult is the result from [DB.Exec], a query not returning rows

  • in SQL, such queries instead return last inserted ID and number of rows affected
  • by having a new function exposing any errors, Get and String methods can be error-free
  • [psql2.NewExecResult] returns implementation

type FSLocation

type FSLocation interface {
	Directory() (directory string)
}

type Frame added in v0.4.20

type Frame interface {
	// the code location for this frame, never nil
	Loc() (location *pruntime.CodeLocation)
	// function argument values like “(1, 0x14000113040)”
	//	- values of basic types like int are displayed
	//	- most types appear as a pointer value “0x…”
	Args() (args string)
	// prints the Frame suitable to be part of a stack trace
	//   - fully qualified package name with function or type and method
	//     and argument values
	//   - absolute path to source file and line number
	//
	// output:
	//
	//	github.com/haraldrudell/parl/pdebug.TestFrame(0x1400014a340)␤
	//	␠␠frame_test.go:15
	String() (s string)
}

Frame represents an executing code location, ie. a code line in source code

type Future added in v0.4.26

type Future[T any] struct {
	// contains filtered or unexported fields
}

Future is a container for an awaitable calculation result

  • Future allows a thread to await a value calculated in parallel by other threads
  • unlike for a promise, consumer manages any thread, therefore producing debuggable code and meaningful stack traces
  • a promise launches the thread itself, which is why there is no trace of what code created the promise or why

func NewFuture added in v0.4.26

func NewFuture[T any]() (calculation *Future[T])

NewFuture returns an awaitable calculation

  • has an Awaitable and a thread-safe TResult container

Usage:

 var calculation = NewFuture[someType]()
 go calculateThread(calculation)
 …
 var result, isValid = calculation.Result()

func calculateThread(future *Future[someType]) {
  var err error
  var isPanic bool
  var value someType
  defer future.End(&value, &isPanic, &err)
  defer parl.RecoverErr(func() parl.DA { return parl.A() }, &err, &isPanic)

   value = …
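
The pattern in the usage above, where the consumer launches the thread and the thread ends the future in a deferred call, can be sketched with the standard library. This is a simplified stand-in, not parl.Future: the future type, its end and result methods and the square function are hypothetical, and the isPanic argument is left out.

```go
package main

import "fmt"

// future is a hypothetical, simplified stand-in for an awaitable result
type future[T any] struct {
	done  chan struct{} // closes when the result is available
	value T
	err   error
}

func newFuture[T any]() *future[T] { return &future[T]{done: make(chan struct{})} }

// end stores the outcome and closes done, deferrable
//   - must be invoked exactly once: a second close panics
func (f *future[T]) end(value *T, errp *error) {
	if errp != nil && *errp != nil {
		f.err = *errp
	} else if value != nil {
		f.value = *value
	}
	close(f.done)
}

// result blocks until end has been invoked
func (f *future[T]) result() (value T, hasValue bool) {
	<-f.done
	return f.value, f.err == nil
}

// square is the calculating thread, launched by the consumer
func square(n int, f *future[int]) {
	var err error
	var value int
	defer f.end(&value, &err)
	value = n * n
}

func main() {
	f := newFuture[int]()
	go square(7, f)
	v, ok := f.result()
	fmt.Println(v, ok) // 49 true
}
```

The go statement sits in the consumer's code, so a stack trace of the calculating goroutine shows which function created it.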

func (*Future[T]) Ch added in v0.4.26

func (f *Future[T]) Ch() (ch AwaitableCh)

Ch returns an awaitable channel. Thread-safe

func (*Future[T]) End added in v0.4.60

func (f *Future[T]) End(value *T, isPanic *bool, errp *error)

End writes the result of the calculation, deferrable

  • value is considered valid if errp is nil or *errp is nil
  • End can make a goroutine channel-awaitable
  • End can only be invoked once or panic
  • any argument may be nil
  • thread-safe

func (*Future[T]) IsCompleted added in v0.4.60

func (f *Future[T]) IsCompleted() (isCompleted bool)

IsCompleted returns whether the calculation is complete. Thread-safe

func (*Future[T]) Result added in v0.4.60

func (f *Future[T]) Result() (result T, hasValue bool)

Result retrieves the calculation’s result

  • May block. Thread-safe

func (*Future[T]) TResult added in v0.4.124

func (f *Future[T]) TResult() (tResult *TResult[T])

TResult returns a pointer to the future’s result

  • nil if future has not resolved
  • thread-safe

type Go added in v0.4.12

type Go interface {
	// Register performs no function but allows the Go object to collect
	// information on the new thread.
	// - label is an optional name that can be assigned to a Go goroutine thread
	Register(label ...string) (g0 Go)
	// AddError emits a non-fatal error
	AddError(err error)
	// Go returns a Go object to be provided as a go-statement function-argument
	//	in a function call invocation launching a new goroutine thread.
	//	- the new thread belongs to the same GoGroup thread-group as the Go
	//		object whose Go method was invoked.
	Go() (g Go)
	// SubGo returns a GoGroup thread-group whose fatal and non-fatal errors go to
	//	the Go object’s parent thread-group.
	//	- a SubGo is used to ensure sub-threads exiting prior to their parent thread
	//		or to facilitate separate cancelation of the threads in the subordinate thread-group.
	//	- fatal errors from SubGo threads are handled in the same way as those of the
	//		Go object, typically terminating the application.
	//	- the SubGo thread-group terminates when both its own threads and
	//		the threads of its subordinate thread-groups have exited.
	SubGo(onFirstFatal ...GoFatalCallback) (subGo SubGo)
	// SubGroup returns a thread-group with its own error channel.
	//	- a SubGroup is used for threads whose fatal errors should be handled
	//		in the Go thread.
	//	- The threads of the Subgroup can be canceled separately.
	//		- SubGroup’s error channel collects fatal thread terminations
	//		- the SubGroup’s error channel needs to be read in real-time or after
	//		SubGroup termination
	//   - non-fatal errors in SubGroup threads are sent to the Go object’s parent
	//		similar to the AddError method
	//	- the SubGroup thread-group terminates when both its own threads and
	//		the threads of its subordinate thread-groups have exited.
	SubGroup(onFirstFatal ...GoFatalCallback) (subGroup SubGroup)
	// Done indicates that this goroutine is exiting
	//	- err == nil means successful exit
	//	- non-nil err indicates fatal error
	// 	- deferrable
	Done(errp *error)
	// Wait awaits exit of this Go thread
	Wait()
	// ch closes upon exit of this Go thread
	WaitCh() (ch AwaitableCh)
	// Cancel signals for the threads in this Go thread’s parent GoGroup thread-group
	// and any subordinate thread-groups to exit.
	Cancel()
	// Context will Cancel when the parent thread-group Cancels
	//	or Cancel is invoked on this Go object.
	//	- Subordinate thread-groups do not Cancel the context of the Go thread.
	Context() (ctx context.Context)
	// ThreadInfo returns thread data that is partially or fully populated
	//	- ThreadID may be invalid: threadID.IsValid.
	//	- goFunction may be zero-value: goFunction.IsSet
	//	- those values are present after public methods of parl.Go have been invoked by
	//		the new goroutine
	ThreadInfo() (threadData ThreadData)
	// values always present
	Creator() (threadID ThreadID, createLocation *pruntime.CodeLocation)
	//	- ThreadID may be invalid: threadID.IsValid.
	//	- goFunction may be zero-value: goFunction.IsSet
	//	- those values are present after public methods of parl.Go have been invoked by
	//		the new goroutine
	GoRoutine() (threadID ThreadID, goFunction *pruntime.CodeLocation)
	// GoID efficiently returns the goroutine ID that may be invalid
	//	- valid after public methods of parl.Go have been invoked by
	//		the new goroutine
	GoID() (threadID ThreadID)
	// EntityID returns a value unique for this Go
	//	- ordered: usable as map key or for sorting
	//	- always valid, has .String method
	EntityID() (goEntityID GoEntityID)
	fmt.Stringer
}

Go provides the four needs of a running goroutine thread. The Go is provided as a function argument in the go statement function call that launches the thread.

  • the four needs:
  • — to be waited upon via [Go.Done]
  • — to submit non-fatal errors via [Go.AddError]
  • — to detect and initiate cancel via [Go.Context] [Go.Cancel]
  • [Go.Cancel] cancels:
  • — this Go’s parent thread-group’s context and
  • — this Go’s parent thread-group’s subordinate thread-groups’ contexts
  • The Go Context is canceled when
  • — the parent GoGroup thread-group’s context is Canceled or
  • — a thread in the parent GoGroup thread-group initiates Cancel
  • Cancel by threads in subordinate thread-groups does not Cancel this Go thread

Usage:

var threadGroup = g0.NewGoGroup(context.Background())
go someFunc(threadGroup.Go())
…
func someFunc(g parl.Go) {
  var err error
  defer g.Register().Done(&err)
  defer parl.RecoverErr(func() parl.DA { return parl.A() }, &err)
  …
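
For comparison, the needs listed above (being waited upon, submitting non-fatal errors, detecting cancel) can be met with standard-library parts passed separately. This is only a sketch of what a single Go argument bundles, not how parl implements it: worker, run and errRetry are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errRetry is a sample non-fatal error
var errRetry = errors.New("transient failure, retrying")

// worker is a hypothetical goroutine function: its three parameters
// stand in for what a single parl.Go argument provides
func worker(ctx context.Context, wg *sync.WaitGroup, nonFatal chan<- error) {
	defer wg.Done()      // be waited upon
	nonFatal <- errRetry // submit a non-fatal error
	<-ctx.Done()         // detect cancel
}

// run launches one worker, receives its non-fatal error,
// cancels it and awaits its exit
func run() (err error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var wg sync.WaitGroup
	nonFatal := make(chan error, 1) // buffered so the worker does not block
	wg.Add(1)
	go worker(ctx, &wg, nonFatal)
	err = <-nonFatal
	cancel()
	wg.Wait()
	return err
}

func main() {
	fmt.Println(run()) // transient failure, retrying
}
```

With a Go object, the same goroutine function takes one parameter and the launcher does not have to create and wire the context, wait group and channel itself.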

type GoDebug added in v0.4.71

type GoDebug uint8
const (
	NoDebug GoDebug = iota
	DebugPrint
	AggregateThread
)

type GoEntityID added in v0.4.117

type GoEntityID uint64

GoEntityID is a unique named type for Go objects

  • GoEntityID is required because for Go objects, the thread ID is not available prior to the go statement and GoGroups do not have any other unique ID
  • GoEntityID is suitable as a map key
  • GoEntityID uniquely identifies any Go-thread GoGroup, SubGo or SubGroup

func (GoEntityID) String added in v0.4.117

func (i GoEntityID) String() (s string)

type GoError added in v0.4.12

type GoError interface {
	error // Error() string
	// Err retrieves the original error value
	Err() (err error)
	// ErrString returns string representation of error
	//   - if no error “OK”
	//   - if not debug or panic, short error with location
	//   - otherwise error with stack trace
	ErrString() (errString string)
	// Time provides when this error occurred
	Time() time.Time
	// IsThreadExit determines if this error is a thread exit, ie. GeExit GePreDoneExit
	//	- thread exits may have err nil
	//	- fatals are non-nil thread exits that may require specific actions such as
	//		application termination
	IsThreadExit() (isThreadExit bool)
	// IsFatal determines if this error is a fatal thread-exit, ie. a thread exiting with non-nil error
	IsFatal() (isThreadExit bool)
	// ErrContext returns in what situation this error occurred
	ErrContext() (errContext GoErrorContext)
	// Go provides the thread and goroutine emitting this error
	Go() (g0 Go)
	fmt.Stringer
}

GoError is an error or a thread exit associated with a goroutine

  • GoError encapsulates the original unadulterated error
  • GoError provides context for taking action on the error
  • Go provides the thread associated with the error. All GoErrors are associated with a Go object
  • because GoError is both error and fmt.Stringer, to print its string representation requires using the String() method, otherwise fmt.Printf defaults to the Error() method
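
The last point follows from how the fmt package picks a formatting method: for verbs such as %v and %s, a value that implements error is formatted with Error() before fmt.Stringer is considered. A small sketch with a hypothetical type named both, which implements the two interfaces like GoError does:

```go
package main

import "fmt"

// both is a hypothetical type implementing error and fmt.Stringer
type both struct{}

func (both) Error() string  { return "from Error" }
func (both) String() string { return "from String" }

// render returns the text chosen by fmt and the text from String
func render() (viaVerb, viaMethod string) {
	var v both
	return fmt.Sprintf("%v", v), v.String()
}

func main() {
	viaVerb, viaMethod := render()
	fmt.Println(viaVerb)   // from Error
	fmt.Println(viaMethod) // from String
}
```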

type GoErrorContext added in v0.4.29

type GoErrorContext uint8
const (
	// GeNonFatal indicates a non-fatal error occurring during processing.
	// err is non-nil
	GeNonFatal GoErrorContext = iota + 1
	// GePreDoneExit indicates an exit value of a subordinate goroutine,
	// other than the final exit of the last running goroutine.
	// err may be nil
	GePreDoneExit
	// A SubGroup with its own error channel is sending a
	// locally fatal error not intended to terminate the app
	GeLocalChan
	// A thread is requesting app termination without a fatal error.
	//	- this could be a callback
	GeTerminate
	// GeExit indicates exit of the last goroutine.
	// err may be nil.
	// The error channel may close after GeExit.
	GeExit
)

func (GoErrorContext) String added in v0.4.29

func (ge GoErrorContext) String() (s string)

type GoFactory added in v0.4.29

type GoFactory interface {
	// NewGoGroup returns a light-weight thread-group.
	//	- GoGroup only receives Cancel from ctx, it does not cancel this context.
	NewGoGroup(ctx context.Context, onFirstFatal ...GoFatalCallback) (g0 GoGroup)
}

type GoFatalCallback added in v0.4.29

type GoFatalCallback func(goGen GoGen)

GoFatalCallback receives the thread-group on its first fatal thread-exit

  • GoFatalCallback is an optional onFirstFatal argument to
  • — NewGoGroup
  • — SubGo
  • — SubGroup

type GoGen added in v0.4.29

type GoGen interface {
	// Go returns a Go object to be provided as a go statement function argument.
	Go() (g0 Go)
	// SubGo returns a thread-group whose fatal errors go to GoGen’s parent.
	//   - both non-fatal and fatal errors in SubGo threads are sent to GoGen’s parent
	// 		like Go.AddError and Go.Done.
	//		- therefore, when a SubGo thread fails, the application will typically exit.
	//		- by awaiting SubGo, Go can delay its exit until SubGo has terminated
	//   - the SubGo thread-group terminates when its threads exit
	SubGo(onFirstFatal ...GoFatalCallback) (subGo SubGo)
	// SubGroup creates a subordinate thread-group
	SubGroup(onFirstFatal ...GoFatalCallback) (subGroup SubGroup)
	// Cancel terminates the threads in the Go consumer thread-group.
	Cancel()
	// Context will Cancel when the parent thread-group Cancels.
	// Subordinate thread-groups do not Cancel this context.
	Context() (ctx context.Context)
}

GoGen allows for new Go threads, new SubGo and SubGroup thread-groups and cancel of threads in the thread-group and its subordinate thread-groups.

  • GoGen is a value from NewGoGroup GoGroup SubGo SubGroup Go, ie. any Go-interface object

type GoGroup added in v0.4.18

type GoGroup interface {
	// Go returns a Go object to be provided as a go statement function argument.
	Go() (g0 Go)
	// SubGo returns a thread-group whose fatal errors go to Go’s parent.
	//   - both non-fatal and fatal errors in SubGo threads are sent to Go’s parent
	// 		like Go.AddError and Go.Done.
	//		- therefore, when a SubGo thread fails, the application will typically exit.
	//		- by awaiting SubGo, Go can delay its exit until SubGo has terminated
	//   - the SubGo thread-group terminates when its threads exit
	SubGo(onFirstFatal ...GoFatalCallback) (subGo SubGo)
	// SubGroup creates a subordinate GoGroup.
	//	- SubGroup fatal and non-fatal errors are sent to the parent GoGroup.
	//	- SubGroup-context initiated Cancel only affects threads in the SubGroup thread-group
	//	- parent-initiated Cancel terminates SubGroup threads
	//	- SubGroup only awaits SubGroup threads
	//	- parent await also awaits SubGroup threads
	SubGroup(onFirstFatal ...GoFatalCallback) (subGroup SubGroup)
	// GoError returns a channel returning fatal and non-fatal errors and thread exits.
	//	- see GoError for exact error categories
	//
	// Usage:
	//
	//	var goErrors = goGroup.GoError()
	//	for goError := goErrors.Init(); goErrors.Condition(&goError); {
	//	  …
	//	var goEndCh = goErrors.EmptyCh(parl.CloseAwaiter)
	//	for {
	//	  select {
	//	  case <-goEndCh:
	//	    goEndCh = nil
	//	    …
	//	  case <-goErrors.DataWaitCh():
	//	    var goError, hasValue = goErrors.Get()
	//	    if !hasValue {
	//	      continue
	//	    …
	GoError() (goErrors IterableSource[GoError])
	// Wait waits for this thread-group to end
	Wait()
	// EnableTermination controls temporarily preventing the GoGroup from
	// terminating.
	// EnableTermination is initially true.
	//	- invoked with no argument returns the current state of EnableTermination
	//	- invoked with [AllowTermination] again allows for termination and
	//		immediately terminates the thread-group if no threads are currently running.
	//	- invoked with [PreventTermination] allows for the number of managed
	//		threads to be temporarily zero without terminating the thread-group
	EnableTermination(allowTermination ...bool) (mayTerminate bool)
	// Cancel terminates the threads in this and subordinate thread-groups.
	Cancel()
	// Context will Cancel when the parent context Cancels.
	// Subordinate thread-groups do not Cancel this context.
	Context() (ctx context.Context)
	// the available data for all threads
	Threads() (threads []ThreadData)
	// threads that have been named ordered by name
	NamedThreads() (threads []ThreadData)
	// SetDebug enables debug logging on this particular instance
	//	- parl.NoDebug
	//	- parl.DebugPrint
	//	- parl.AggregateThread
	SetDebug(debug GoDebug, log ...PrintfFunc)
	fmt.Stringer
}

GoGroup manages a hierarchy of threads

  • GoGroup only terminates when:
  • — the last thread in its hierarchy exits
  • — [GoGroup.EnableTermination] is set to true when no Go threads exist in the hierarchy
  • the GoGroup hierarchy consists of:
  • — managed goroutines returned by [GoGroup.Go]
  • — a SubGo subordinate thread-group hierarchy returned by [GoGroup.SubGo] that allows for a group of threads to be canceled or waited upon separately
  • — a SubGroup subordinate thread-group hierarchy returned by [GoGroup.SubGroup] that allows for a group of threads to exit with fatal errors without canceling the GoGroup and for those threads to be canceled or waited upon separately
  • — each subordinate Go thread or SubGo or SubGroup subordinate thread-groups can create additional threads and subordinate thread-groups.
  • [GoGroup.Context] returns a context that is the context or parent context of all the Go threads, SubGo and SubGroup subordinate thread-groups in its hierarchy
  • [GoGroup.Cancel] cancels the GoGroup Context, thereby signaling to all threads in the GoGroup hierarchy to exit. This will eventually terminate the GoGroup
  • providing a parent context to the GoGroup implementation allows for terminating the GoGroup via this parent context
  • A thread invoking [Go.Cancel] will signal to all threads in its GoGroup or SubGo or SubGroup thread-groups to exit. It will also signal to all threads in its subordinate thread-groups to exit. This will eventually terminate its threadgroup and all that threadgroup’s subordinate threadgroups.
  • Alternatives to parl.Go are parl.NewGoResult and parl.NewGoResult2, which only make a goroutine awaitable
  • A fatal thread-termination from this thread-group will terminate all threads in this and subordinate thread-groups if this thread-group was provided the FirstFailTerminates option, which is default.
  • A fatal thread-termination in a sub thread-group only affects this thread-group if the sub thread-group was provided a nil fatal function, the FirstFailTerminates option, which is default, and no explicit FailChannel option.
  • Fatal thread terminations will propagate to parent thread-groups if this thread group did not have a fatal function provided and was not explicitly provided the FailChannel option.
  • A Cancel in this thread-group or in a parent context cancels threads in this and all subordinate thread-groups.
  • A Cancel in a subordinate thread-group does not affect this thread-group.
  • Wait in this thread-group waits for threads in this and all subordinate thread-groups.
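
The Cancel rules above match how derived contexts in the standard library behave: canceling a child context leaves the parent running, while canceling the parent also cancels every child. A sketch using only the context package, where cancelDemo and its variable names are hypothetical and the contexts stand in for a thread-group and its subordinate thread-groups:

```go
package main

import (
	"context"
	"fmt"
)

// cancelDemo reports which contexts are canceled after each Cancel
func cancelDemo() (subAfterSub, groupAfterSub, sub2AfterGroup bool) {
	group, cancelGroup := context.WithCancel(context.Background())
	defer cancelGroup()

	// a Cancel in the subordinate context does not affect the parent
	sub, cancelSub := context.WithCancel(group)
	cancelSub()
	subAfterSub = sub.Err() != nil
	groupAfterSub = group.Err() != nil

	// a Cancel in the parent cancels the subordinate context
	sub2, cancelSub2 := context.WithCancel(group)
	defer cancelSub2()
	cancelGroup()
	<-sub2.Done()
	sub2AfterGroup = sub2.Err() != nil
	return
}

func main() {
	fmt.Println(cancelDemo()) // true false true
}
```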

type GoResult added in v0.4.134

type GoResult struct {
	// contains filtered or unexported fields
}

GoResult makes any number of goroutines awaitable

  • number of goroutines must be known at time of new
  • NewGoResult is the simplest, goroutines are awaited by [GoResult.ReceiveError]
  • NewGoResult2 also has [IsError] method indicating if any goroutine exited with fatal error
  • GoResult.IsValid true if the GoResult is initialized
  • [GoResult.SendError](errp *error) deferrable, how goroutine sends results
  • [GoResult.ReceiveError](errp *error, n ...int) (err error)
  • [GoResult.Count]() (count int) number of buffered errors
  • NewGoResult2 also has:
  • — [GoResult.IsError]() (isError bool) true if any goroutine returned error
  • — [GoResult.SetIsError]() sets the error flag manually
  • — [GoResult.Remaining]() (remaining int) number of goroutines that have yet to exit
  • GoResult is passed by value
  • this gets around the restriction that a method receiver cannot be an interface
  • the receiver is a value struct holding a pointer in the form of an interface

func NewGoResult added in v0.4.134

func NewGoResult(n ...int) (goResult GoResult)

NewGoResult returns the minimum mechanic to make a goroutine awaitable

  • n is goroutine capacity, default 1
  • mechanic is buffered channel
  • a thread-launcher provides a GoResult value of sufficient capacity to its launched threads
  • exiting threads send an error value that may be nil
  • the thread-launcher awaits results one by one
  • to avoid threads blocking prior to exiting, the channel must have sufficient capacity

Usage:

func someFunc(text string) (err error) {
  var g = parl.NewGoResult()
  go goroutine(text, g)
  defer g.ReceiveError(&err)
  …
func goroutine(text string, g parl.GoResult) {
  var err error
  defer g.SendError(&err)
  defer parl.RecoverErr(func() parl.DA { return parl.A() }, &err)

  err = …
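
The buffered-channel mechanic described for NewGoResult can be sketched as follows. This is an illustration under assumptions, not parl's code: goResult, newGoResult, sendError, receiveError and errEmpty are hypothetical names, and the capacity is a required argument here.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmpty is a sample fatal error
var errEmpty = errors.New("empty text")

// goResult is a hypothetical awaitable: a buffered error channel
type goResult chan error

// newGoResult returns a channel with room for n exiting goroutines
func newGoResult(n int) goResult { return make(chan error, n) }

// sendError is deferred by the goroutine, the value sent may be nil
func (g goResult) sendError(errp *error) { g <- *errp }

// receiveError awaits one goroutine exit and keeps the first error
func (g goResult) receiveError(errp *error) {
	if err := <-g; err != nil && *errp == nil {
		*errp = err
	}
}

func goroutine(text string, g goResult) {
	var err error
	defer g.sendError(&err)
	if text == "" {
		err = errEmpty
	}
}

func someFunc(text string) (err error) {
	g := newGoResult(1)
	go goroutine(text, g)
	defer g.receiveError(&err)
	return
}

func main() {
	fmt.Println(someFunc("hello")) // <nil>
	fmt.Println(someFunc(""))      // empty text
}
```

Because the channel has capacity for every launched goroutine, an exiting goroutine never blocks on its send, even if the launcher has not yet started receiving.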

func NewGoResult2 added in v0.4.145

func NewGoResult2(n ...int) (goResult GoResult)

NewGoResult2 also has [GoResult.IsError] [GoResult.Remaining]

func (GoResult) IsValid added in v0.4.145

func (g GoResult) IsValid() (isValid bool)

type IdempotentCloser added in v0.4.166

type IdempotentCloser[C io.Closer] struct {
	// contains filtered or unexported fields
}

func NewIdemPotentCloser added in v0.4.166

func NewIdemPotentCloser[C io.Closer](closer C) (idempotentCloser *IdempotentCloser[C])

func (*IdempotentCloser[C]) Close added in v0.4.166

func (c *IdempotentCloser[C]) Close() (err error)

func (*IdempotentCloser[C]) IsClose added in v0.4.166

func (c *IdempotentCloser[C]) IsClose() (isClose bool)

type Interval added in v0.4.48

type Interval struct {
	// contains filtered or unexported fields
}

type Invocation added in v0.4.130

type Invocation[T any] struct {
	Prev, Next atomic.Pointer[Invocation[T]]
	ThreadID   ThreadID
	Value      T
	// contains filtered or unexported fields
}

Invocation is a timed invocation created by [InvocationTimer.Invocation]

  • holds invocation instance-data for [InvocationTimer.invocationEnd]

func NewInvocation added in v0.4.130

func NewInvocation[T any](invocationEnd func(invocation *Invocation[T], duration time.Duration), value T) (invocation *Invocation[T])

NewInvocation adds a new invocation to [InvocationTimer]

  • holds invocation instance-data for [InvocationTimer.invocationEnd]

func (*Invocation[T]) Age added in v0.4.130

func (i *Invocation[T]) Age() (age time.Duration)

Age returns the current age of this invocation

func (*Invocation[T]) DeferFunc added in v0.4.130

func (i *Invocation[T]) DeferFunc()

DeferFunc ends an invocation

  • provides invocation instance-data for [InvocationTimer.invocationEnd]

type InvocationTimer added in v0.4.130

type InvocationTimer[T any] struct {
	// contains filtered or unexported fields
}

InvocationTimer monitors function invocations for parallelism and latency

  • callback is invoked on exceeding thresholds and reaching a new max
  • runs one thread per instance while an invocation is active

func NewInvocationTimer added in v0.4.130

func NewInvocationTimer[T any](
	callback CBFunc, endCb func(T),
	latencyWarningPoint time.Duration, parallelismWarningPoint uint64,
	timerPeriod time.Duration,
	goGen GoGen,
) (invokeTimer *InvocationTimer[T])

NewInvocationTimer returns an object alerting of max latency and parallelism

  • Do is used for new invocations

func (*InvocationTimer[T]) Invocation added in v0.4.130

func (i *InvocationTimer[T]) Invocation(value T) (deferFunc func())

Invocation registers a new invocation with callbacks for parallelism and latency

  • caller invokes deferFunc at end of invocation

Usage:

func someFunc() {
  defer invocationTimer.Invocation(value)()

func (*InvocationTimer[T]) Oldest added in v0.4.130

func (i *InvocationTimer[T]) Oldest() (age time.Duration, threadID ThreadID)

Oldest returns the oldest invocation

  • threadID is ID of oldest active thread, if any
  • age is longest ever invocation
  • if no invocation is active, age is 0, threadID invalid

type IterableSource added in v0.4.180

type IterableSource[T any] interface {
	ClosableSource1[T]
	Init() (value T)
	Condition(valuep *T) (hasValue bool)
}

Get DataWaitCh EmptyCh Init Condition

type KeyEnum added in v0.4.33

type KeyEnum[K constraints.Ordered, T any] interface {
	IsKey(key K) (isKey bool)                  // IsKey checks whether key maps to an enumerated value
	Value(key K) (enum T, err error)           // Value looks up an enumerated value by key
	KeyIterator() (iterator iters.Iterator[K]) // KeyIterator returns an iterator that iterates over all keys in order of definition

	IsValid(enum T) (isEnumValue bool)      // IsValid checks whether value is among enumerated values
	Key(value T) (key K, err error)         // Key gets the key value for an enumerated T value
	ValueAny(value any) (enum T, err error) // ValueAny attempts to convert any value to a T enumerated value
	Iterator() (iterator iters.Iterator[T]) // Iterator returns an iterator that iterates over all enumerated values in order of definition
	Description(enum T) (desc string)       // Description gets a descriptive sentence for an enum value
	StringT(enum T) (s string)              // StringT provides a string representation for an enumeration value
	// Compare compares two T values.
	//	- result is 0 if the two values are considered equal
	//	- result is 1 if value1 is considered greater than value2
	//	- result is -1 if value1 is considered less than value2
	Compare(value1, value2 T) (result int)

	Name() (s string) // Name returns a short string naming this enumeration
	fmt.Stringer
}

KeyEnum is an enumeration using a key type K mapping to a value type T.

  • T is a unique named type that separates the values of this enumeration from all other values.
  • K is a common type, often a single-word string, whose values are unique and map one-to-one to a T value. K is constraints.Ordered and can be used as a map key.
  • The implementation’s stored type may be different from both K and T.

Some benefits with enumerations are:

  • unknown, illegal or value duplications are detected
  • integral values and their meanings can be printed
  • one set of integral values is not confused with another, eg. unix.AF_INET and unix.RTAX_BRD
  • the meaning is revealed when used as function arguments, and other allowed values can be examined
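
A minimal sketch of the idea, assuming a hypothetical Color enumeration with string keys. It is not an implementation of the KeyEnum interface: colorByKey, colorValue and isValidColor are illustrative names. It only shows how a unique named type T and a key-to-value map let unknown keys and illegal values be detected.

```go
package main

import "fmt"

// Color is a hypothetical enumeration value type T
//   - a unique named type: an int or a value of another named type
//     cannot be passed where a Color is expected without conversion
type Color uint8

const (
	Red Color = iota + 1
	Green
)

// colorByKey maps key type K, here string, one-to-one to Color
var colorByKey = map[string]Color{"red": Red, "green": Green}

// colorValue looks up an enumerated value by key
func colorValue(key string) (enum Color, err error) {
	enum, ok := colorByKey[key]
	if !ok {
		err = fmt.Errorf("unknown color key: %q", key)
	}
	return
}

// isValidColor checks whether c is among the enumerated values
func isValidColor(c Color) bool { return c == Red || c == Green }

func main() {
	c, err := colorValue("green")
	fmt.Println(c == Green, err) // true <nil>
	_, err = colorValue("blue")
	fmt.Println(err != nil)           // true
	fmt.Println(isValidColor(Color(9))) // false
}
```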

type LacyChan added in v0.4.167

type LacyChan[T any] struct {
	// contains filtered or unexported fields
}

func (*LacyChan[T]) Get added in v0.4.167

func (c *LacyChan[T]) Get(n ...int) (ch chan T)

type LazyCyclic added in v0.4.173

type LazyCyclic struct {
	// if false, the cyclic is not in active use
	IsActive atomic.Bool
	// Lock atomizes operations Cyclic.Open and Cyclic.Close
	// with its justifying observations
	Lock sync.Mutex
	// Cyclic contains a closing channel
	Cyclic CyclicAwaitable
}

type Moderator

type Moderator struct {
	// contains filtered or unexported fields
}

Moderator invokes functions at a limited level of parallelism. It is a ticketing system

m := NewModerator(20, ctx)
m.Do(func() (err error) { // waiting here for a ticket
  // got a ticket!
  …
  return or panic // ticket automatically returned
m.String() → waiting: 2(20)

type ModeratorCore added in v0.4.26

type ModeratorCore struct {
	// contains filtered or unexported fields
}

ModeratorCore invokes functions at a limited level of parallelism

  • ModeratorCore is a ticketing system
  • ModeratorCore does not have a cancel feature
  • during low contention atomic performance
  • during high-contention lock performance

Usage:

m := NewModeratorCore(20)
defer m.Ticket()() // waiting here for a ticket
// got a ticket!
…
return or panic // ticket automatically returned
m.String() → waiting: 2(20)

func NewModeratorCore added in v0.4.26

func NewModeratorCore(parallelism uint64) (m *ModeratorCore)

NewModeratorCore creates a new ModeratorCore used to limit parallelism

func (*ModeratorCore) Status added in v0.4.26

func (m *ModeratorCore) Status() (parallelism, active, waiting uint64)

Status: values may lack integrity

func (*ModeratorCore) String added in v0.4.26

func (m *ModeratorCore) String() (s string)

when tickets available: “available: 2(10)”

  • 10 - 2 = 8 threads operating
  • when threads waiting “waiting 1(10)”
  • 10 threads operating, 1 thread waiting

func (*ModeratorCore) Ticket added in v0.4.106

func (m *ModeratorCore) Ticket() (returnTicket func())

Ticket returns a ticket possibly blocking until one is available

  • Ticket returns the function for returning the ticket

Usage:

defer moderator.Ticket()()
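
The ticket pattern can be sketched with a buffered channel acting as a counting semaphore. This is a stand-in for illustration only, since the description above says ModeratorCore uses atomics and locks. The names moderator, newModerator, ticket and peakParallelism are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// moderator is a hypothetical ticketing system: a buffered channel
// whose capacity is the allowed parallelism
type moderator chan struct{}

func newModerator(parallelism int) moderator { return make(chan struct{}, parallelism) }

// ticket blocks until a ticket is available
//   - the returned function returns the ticket
func (m moderator) ticket() (returnTicket func()) {
	m <- struct{}{}
	return func() { <-m }
}

// peakParallelism runs workers through the moderator and
// returns the highest number of simultaneously active workers
func peakParallelism(parallelism, workers int) (peak int) {
	m := newModerator(parallelism)
	var mu sync.Mutex
	var active int
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer m.ticket()() // waits here for a ticket, returns it on exit
			mu.Lock()
			active++
			if active > peak {
				peak = active
			}
			mu.Unlock()
			time.Sleep(time.Millisecond)
			mu.Lock()
			active--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak parallelism:", peakParallelism(3, 10))
}
```

In `defer m.ticket()()`, the call m.ticket() is evaluated when the defer statement executes, which is where the goroutine waits. Only the returned function is deferred, so the ticket is returned on normal return and on panic.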

type MutexWait added in v0.4.54

type MutexWait struct {
	// contains filtered or unexported fields
}

MutexWait is a maximum-lightweight observable single-fire Mutex. Thread-Safe

func NewMutexWait added in v0.4.54

func NewMutexWait() (mutexWait *MutexWait)

NewMutexWait returns a maximum-lightweight observable single-fire Mutex. Thread-Safe

func (*MutexWait) IsUnlocked added in v0.4.54

func (mw *MutexWait) IsUnlocked() (isUnlocked bool)

IsUnlocked returns whether the MutexWait has fired

func (*MutexWait) Unlock added in v0.4.54

func (mw *MutexWait) Unlock()

Unlock fires MutexWait

func (*MutexWait) Wait added in v0.4.54

func (mw *MutexWait) Wait()

Wait blocks until MutexWait has fired

type NoRowsAction added in v0.4.184

type NoRowsAction bool

[DB.QueryString] [DB.QueryInt] noRowsOK

  • indicates whether a zero-row result returns error
  • NoRowsOK NoRowsError
const (
	// [DB.QueryString] [DB.QueryInt] zero rows returns zero-value
	NoRowsOK NoRowsAction = true
	// [DB.QueryString] [DB.QueryInt] zero rows returns error
	NoRowsError NoRowsAction = false
)

type NotifierFunc added in v0.4.97

type NotifierFunc func(slice Stack)

a NotifierFunc receives a stack trace of the function causing cancel

type OnError added in v0.4.117

type OnError func(err error)

OnError is a function that receives error values from an errp error pointer or a panic

deprecated: use ErrorSink

var NoOnError OnError

nil OnError function

  • public for RecoverAnnotation

deprecated: use ErrorSink

type OnErrorStrategy added in v0.4.135

type OnErrorStrategy uint8

how OnError is handled: recoverOnErrrorOnce recoverOnErrrorMultiple recoverOnErrrorNone

type Once added in v0.4.18

type Once struct {
	// contains filtered or unexported fields
}

parl.Once is an observable sync.Once with an alternative DoErr method

  • Once.DoErr invokes a function returning error recovering a panic
  • Once.IsDone returns whether the Once has been executed, atomic performance
  • Once.Result returns a possible Once.DoErr outcome, atomic performance
  • Once.Do is similar to sync.Once.Do
  • parl.Once is thread-safe and does not require initialization
  • No thread will return from Once.Do or Once.DoErr until once.Do or once.DoErr has completed

func (*Once) Do added in v0.4.18

func (o *Once) Do(doFuncArgument func())

Do calls the function if and only if Do or DoErr is being called for the first time for this instance of Once. Thread-safe

  • a panic is not recovered
  • thread-safe
  • once.Do must execute for the happens-before guarantee

Usage:

var once parl.Once
once.Do(myFunc)
…
if once.IsDone() …
func myFunc() { …

func (*Once) DoErr added in v0.4.43

func (o *Once) DoErr(doErrFuncArgument func() (err error)) (didOnce, isPanic bool, err error)

DoErr calls the function if and only if Do or DoErr is being called for the first time for this instance of Once

  • didOnce is true if this invocation was first and actually invoked doErrFuncArgument
  • isPanic is true if this or previous invocation did panic
  • err is either the return value or the panic value from this or previous invocation
  • thread-safe
  • because sync.Once.Do has a fixed signature, Do must invoke a function wrapper
  • once.Do must execute for the happens-before guarantee

Usage:

var once parl.Once
var didTheClose, isPanic, err = once.DoErr(osFile.Close)
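
The DoErr contract can be sketched with the standard library alone. This is a minimal illustration, not parl's implementation: observableOnce and its fields are hypothetical names, and the panic is wrapped with fmt.Errorf as an assumption about how a panic value becomes an error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// observableOnce is a hypothetical stand-in for parl.Once built on sync.Once
type observableOnce struct {
	once    sync.Once
	done    atomic.Bool // true once the function has completed
	isPanic bool        // written inside once.Do, read after it returns
	err     error
}

// DoErr runs fn at most once and recovers a panic into err
func (o *observableOnce) DoErr(fn func() error) (didOnce, isPanic bool, err error) {
	o.once.Do(func() {
		didOnce = true
		defer o.done.Store(true) // runs last: after the recover below
		defer func() {
			if r := recover(); r != nil {
				o.isPanic = true
				o.err = fmt.Errorf("panic: %v", r)
			}
		}()
		o.err = fn()
	})
	// sync.Once guarantees the function completed before any Do returns
	return didOnce, o.isPanic, o.err
}

// IsDone reports whether the function has executed
func (o *observableOnce) IsDone() bool { return o.done.Load() }

func main() {
	var once observableOnce
	didOnce, isPanic, err := once.DoErr(func() error { return errors.New("close failed") })
	fmt.Println(didOnce, isPanic, err) // prints "true false close failed"

	// a later invocation does not run its function but sees the stored outcome
	didOnce, isPanic, err = once.DoErr(func() error { return nil })
	fmt.Println(didOnce, isPanic, err, once.IsDone()) // prints "false false close failed true"
}
```

Later callers receive the first outcome, which is what makes the pattern useful for idempotent Close methods.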

func (*Once) IsDone added in v0.4.18

func (o *Once) IsDone() (isDone bool)

IsDone returns true if Once did execute

  • thread-safe, atomic performance

func (*Once) Result added in v0.4.59

func (o *Once) Result() (isPanic bool, hasResult bool, err error)

Result returns the Once.DoErr outcome provided with atomic performance

  • values are only valid if hasResult is true
  • hasResult is false when:
  • — the Once has not triggered or
  • — the Once was triggered by Once.Do
  • thread-safe

type OnceCh added in v0.4.161

type OnceCh struct {
	// contains filtered or unexported fields
}

OnceCh implements a one-time execution filter

  • initialization free
  • OnceCh is similar to sync.Once with improvements:
  • — does not require an awkward function value to be provided. Method function-values cause allocation
  • — awaitable channel mechanic means threads can await multiple events
  • — is observable via OnceCh.IsInvoked OnceCh.IsClosed

Usage:

var o OnceCh
if isWinner, done := o.IsWinner(); !isWinner {
  return // thread already waited for winner thread completion
} else {
  defer done.Done()
}
…

func (*OnceCh) Ch added in v0.4.161

func (o *OnceCh) Ch() (ch AwaitableCh)

Ch returns a channel that closes once IsWinner and done have both been invoked

func (*OnceCh) IsClosed added in v0.4.161

func (o *OnceCh) IsClosed() (isClosed bool)

IsClosed indicates that a winner was selected and invoked done

func (*OnceCh) IsInvoked added in v0.4.164

func (o *OnceCh) IsInvoked() (isInvoked bool)

IsInvoked indicates that a winner was selected

func (*OnceCh) IsWinner added in v0.4.161

func (o *OnceCh) IsWinner(noWait ...bool) (isWinner bool, done Done)

IsWinner selects winner thread as the first of invokers

  • noWait missing or LoserWait: loser threads wait for the winner thread invoking done.Done
  • noWait NoOnceWait: eventually consistent: loser threads immediately return
  • isWinner true: this is the winner first invocation.
  • — must invoke done.Done upon task completion
  • isWinner false: loser thread, done is nil. May have already awaited winner thread completion
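
The winner and loser roles can be illustrated with a standard-library sketch. It is not parl's implementation: onceCh is a hypothetical type, done is a plain function rather than parl's Done value, and only the default waiting behavior is modeled.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// onceCh is a hypothetical, simplified stand-in for parl.OnceCh
type onceCh struct {
	invoked atomic.Bool // true once a winner was selected
	chOnce  sync.Once   // lazy channel creation keeps the zero value usable
	ch      chan struct{}
}

// channel returns the channel that closes when the winner is done
func (o *onceCh) channel() chan struct{} {
	o.chOnce.Do(func() { o.ch = make(chan struct{}) })
	return o.ch
}

// IsWinner returns done to the first caller; later callers block until done was invoked
func (o *onceCh) IsWinner() (isWinner bool, done func()) {
	ch := o.channel()
	if o.invoked.CompareAndSwap(false, true) {
		return true, func() { close(ch) }
	}
	<-ch // loser: wait for the winner to complete
	return false, nil
}

func main() {
	var o onceCh
	var winners atomic.Int32
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if isWinner, done := o.IsWinner(); isWinner {
				defer done()
				winners.Add(1) // the one-time task
			}
		}()
	}
	wg.Wait()
	fmt.Println("winners:", winners.Load()) // prints "winners: 1"
}
```

Because completion is signaled by closing a channel, a waiting thread could also select on that channel together with other events.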

type OnceWaiter added in v0.4.29

type OnceWaiter struct {
	// contains filtered or unexported fields
}

OnceWaiter allows any number of threads to wait for a single next occurrence.

  • the occurrence is triggered in either of two ways:
  • — a parent context may be passed in that on cancel triggers the wait
  • — the Cancel method is invoked
  • Ch returns a channel that sends one item on the occurrence but never closes
  • Done returns a channel that closes when the occurrence happens, similar to a context
  • Wait awaits the occurrence
  • a did-occurer object can be obtained that returns true once the cycle triggers
  • a context can be obtained that cancels on the next trigger
  • the cycles can be permanently canceled or triggered and rearmed

func NewOnceWaiter added in v0.4.29

func NewOnceWaiter(ctx context.Context) (onceReceiver *OnceWaiter)

NewOnceWaiter returns a OnceWaiter whose occurrence is triggered when the context cancels, or immediately if the context was already canceled.

func (*OnceWaiter) Cancel added in v0.4.29

func (ow *OnceWaiter) Cancel()

Cancel triggers the occurrence

func (*OnceWaiter) Ch added in v0.4.29

func (ow *OnceWaiter) Ch() (ch <-chan struct{})

Ch returns a channel that will send one item on the occurrence.

  • the channel will not send anything else
  • the channel never closes.

func (*OnceWaiter) Context added in v0.4.29

func (cw *OnceWaiter) Context() (ctx context.Context)

Context returns a context that cancels on the occurrence

func (*OnceWaiter) DidOccur added in v0.4.29

func (ow *OnceWaiter) DidOccur() (didOccur bool)

DidOccur returns true when the occurrence has taken place

func (*OnceWaiter) Done added in v0.4.29

func (ow *OnceWaiter) Done() (done <-chan struct{})

Done returns a channel that will close on the occurrence.

  • Done is similar to the Done method of a context

func (*OnceWaiter) Wait added in v0.4.29

func (ow *OnceWaiter) Wait()

Wait waits until the occurrence

type OnceWaiterRO added in v0.4.29

type OnceWaiterRO struct {
	// contains filtered or unexported fields
}

OnceWaiterRO allows any number of threads to wait for a single next occurrence. OnceWaiterRO is a OnceWaiter without the Cancel method

func NewOnceWaiterRO added in v0.4.29

func NewOnceWaiterRO(onceWaiter *OnceWaiter) (onceWaiterRO *OnceWaiterRO)

NewOnceWaiterRO returns a OnceWaiterRO for onceWaiter, that is, a OnceWaiter without the Cancel method.

func (*OnceWaiterRO) Ch added in v0.4.29

func (ow *OnceWaiterRO) Ch() (ch <-chan struct{})

func (*OnceWaiterRO) Context added in v0.4.29

func (ow *OnceWaiterRO) Context() (ctx context.Context)

func (*OnceWaiterRO) DidOccur added in v0.4.29

func (ow *OnceWaiterRO) DidOccur() (didOccur bool)

func (*OnceWaiterRO) Done added in v0.4.29

func (ow *OnceWaiterRO) Done() (done <-chan struct{})

func (*OnceWaiterRO) Wait added in v0.4.29

func (ow *OnceWaiterRO) Wait()

type Password

type Password interface {
	// true if a password can be obtained interactively
	HasPassword() (hasPassword bool)
	// blocking interactive password input
	Password() (password []byte, err error)
}

Password indicates if a password can be read interactively

  • [pterm.NoPassword] implements password input unavailable
  • [pterm.NewPassword] implements interactive password input

type PemBytes added in v0.4.26

type PemBytes []byte

PemBytes is a 7-bit ASCII byte sequence representing keys or certificates

type PeriodWaiter added in v0.4.100

type PeriodWaiter struct {
	// contains filtered or unexported fields
}

PeriodWaiter temporarily holds threads invoking Wait

  • HoldWaiters causes any Wait invokers to block until ReleaseWaiters
  • Wait blocks only while there is a HoldWaiters invocation that has not been succeeded by a ReleaseWaiters invocation
  • thread-safe

func NewPeriodWaiter added in v0.4.100

func NewPeriodWaiter() (periodWaiter *PeriodWaiter)

NewPeriodWaiter returns an object that can hold threads temporarily. Thread-safe

func (*PeriodWaiter) Ch added in v0.4.167

func (p *PeriodWaiter) Ch() (ch AwaitableCh)

Ch returns a channel that closes on ReleaseWaiters

  • ch is nil if not currently waiting

func (*PeriodWaiter) Count added in v0.4.100

func (p *PeriodWaiter) Count() (waitingThreads int)

Count returns the number of threads currently in Wait

func (*PeriodWaiter) HoldWaiters added in v0.4.100

func (p *PeriodWaiter) HoldWaiters()

HoldWaiters causes a thread invoking PeriodWaiter.Wait to wait. Thread-safe

  • idempotent

func (*PeriodWaiter) IsHold added in v0.4.100

func (p *PeriodWaiter) IsHold() (isHold bool)

IsHold returns true if Wait will currently block

func (*PeriodWaiter) ReleaseWaiters added in v0.4.100

func (p *PeriodWaiter) ReleaseWaiters()

ReleaseWaiters releases any threads blocked in PeriodWaiter.Wait and lets new Wait invokers proceed. Thread-safe

func (*PeriodWaiter) Wait added in v0.4.100

func (p *PeriodWaiter) Wait()

Wait blocks the thread if a HoldWaiters invocation took place with no ReleaseWaiters succeeding it. Thread-safe
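
The hold and release cycle can be sketched with a mutex and a channel that exists only while a hold is in effect. This is an illustration under that assumption, not parl's implementation; periodWaiter is a hypothetical type and Count is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// periodWaiter is a hypothetical stand-in for parl.PeriodWaiter
type periodWaiter struct {
	mu sync.Mutex
	ch chan struct{} // non-nil while waiters are being held
}

// HoldWaiters makes subsequent Wait invocations block; idempotent
func (p *periodWaiter) HoldWaiters() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.ch == nil {
		p.ch = make(chan struct{})
	}
}

// ReleaseWaiters unblocks held waiters and lets new Wait invokers proceed
func (p *periodWaiter) ReleaseWaiters() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.ch != nil {
		close(p.ch)
		p.ch = nil
	}
}

// IsHold reports whether Wait would currently block
func (p *periodWaiter) IsHold() bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.ch != nil
}

// Wait blocks while a hold is in effect
func (p *periodWaiter) Wait() {
	p.mu.Lock()
	ch := p.ch
	p.mu.Unlock()
	if ch != nil {
		<-ch
	}
}

func main() {
	var p periodWaiter
	p.HoldWaiters()
	released := make(chan struct{})
	go func() {
		p.Wait() // held until ReleaseWaiters, or passes if already released
		close(released)
	}()
	p.ReleaseWaiters()
	<-released
	fmt.Println("hold:", p.IsHold()) // prints "hold: false"
}
```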

type Periodically added in v0.4.16

type Periodically struct {
	// contains filtered or unexported fields
}

func NewPeriodically added in v0.4.16

func NewPeriodically(fn func(t time.Time), ctx context.Context, period ...time.Duration) (periodically *Periodically)

func (*Periodically) Wait added in v0.4.26

func (p *Periodically) Wait()

type Pragmas added in v0.4.184

type Pragmas interface {
	// pragmas are returned in a key-value map
	Map() (pragmaMap map[string]string)
	// pragmas are returned in a space-separated key-value string
	// sorted by 8-bit characters
	//	- “foreignKeys: 0 journal: memory timeout: 0”
	fmt.Stringer
}

Pragmas is the type of returned pragmas for an SQLite3 database

type PrintfFunc added in v0.4.29

type PrintfFunc func(format string, a ...any)

PrintfFunc is the signature for a printf-style function. This signature is implemented by:

  • parl.Sprintf
  • parl.Out parl.Outw parl.Log parl.Logw parl.Console parl.Consolew parl.Info parl.Debug parl.D parl.NoPrint
  • plog.(*LogInstance) similar methods
  • perrors.Errorf perrors.ErrorfPF
  • fmt.Printf fmt.Sprintf fmt.Errorf
  • pterm.(*StatusTerminal).Log pterm.(*StatusTerminal).LogTimeStamp

and compatible functions

type PriorityQueue added in v0.4.30

type PriorityQueue[V any, P constraints.Ordered] interface {
	// AddOrUpdate adds a new value to the priority queue or updates the priority of a value
	// that has changed.
	AddOrUpdate(value *V)
	// List returns the first n or default all values by priority
	List(n ...int) (valueQueue []*V)
}

PriorityQueue is a pointer-identity-to-value map of updatable values traversable by rank.

  • PriorityQueue operates directly on value by caching priority from the priority function.
  • the AddOrUpdate method reprioritizes an updated value element
  • V is a value reference composite type that is comparable, ie. not slice map function. Preferably, V is interface or pointer to struct type.
  • P is an ordered type such as Integer Floating-Point or string, used to rank the V values
  • values are added or updated using AddOrUpdate method distinguished by (computer science) identity
  • if the same comparable value V is added again, that value is re-ranked
  • priority P is computed from a value V using the priorityFunc function. The priority function may be examining field values of a struct
  • values can have the same rank. If they do, equal rank is provided in insertion order
  • pqs.NewPriorityQueue[V any, P constraints.Ordered]
  • pqs.NewRankingThreadSafe[V comparable, R constraints.Ordered]( ranker func(value *V) (rank R))
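
The identity-based update and the insertion-order tie-break can be illustrated with a deliberately unoptimized sketch. It is not the pqs implementation: priorityQueue, newPriorityQueue and the ordered constraint are hypothetical, it is not thread-safe, and listing lowest priority first is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// ordered is a local, reduced substitute for constraints.Ordered
type ordered interface {
	~int | ~int64 | ~float64 | ~string
}

// priorityQueue is a hypothetical stand-in that sorts on every List
type priorityQueue[V any, P ordered] struct {
	priorityFunc func(value *V) P
	values       []*V     // insertion order
	priority     map[*V]P // cached priority keyed by pointer identity
}

func newPriorityQueue[V any, P ordered](priorityFunc func(value *V) P) *priorityQueue[V, P] {
	return &priorityQueue[V, P]{priorityFunc: priorityFunc, priority: map[*V]P{}}
}

// AddOrUpdate adds value or refreshes the cached priority of a known value
func (q *priorityQueue[V, P]) AddOrUpdate(value *V) {
	if _, known := q.priority[value]; !known {
		q.values = append(q.values, value)
	}
	q.priority[value] = q.priorityFunc(value)
}

// List returns the first n, or by default all, values by priority
func (q *priorityQueue[V, P]) List(n ...int) []*V {
	list := append([]*V(nil), q.values...)
	// stable sort keeps insertion order among equal priorities
	sort.SliceStable(list, func(i, j int) bool {
		return q.priority[list[i]] < q.priority[list[j]]
	})
	if len(n) > 0 && n[0] >= 0 && n[0] < len(list) {
		list = list[:n[0]]
	}
	return list
}

type job struct {
	name string
	cost int
}

func main() {
	q := newPriorityQueue(func(j *job) int { return j.cost })
	a, b := &job{"a", 3}, &job{"b", 1}
	q.AddOrUpdate(a)
	q.AddOrUpdate(b)
	a.cost = 0
	q.AddOrUpdate(a) // same pointer: re-ranked, not added twice
	for _, j := range q.List() {
		fmt.Println(j.name, j.cost) // prints "a 0" then "b 1"
	}
}
```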

type PrivateKey added in v0.4.26

type PrivateKey interface {
	crypto.Signer                                  // Public() Sign()
	DER() (privateKeyDer PrivateKeyDer, err error) // untyped key material, both private and public keys
	DERe() (privateKeyDer PrivateKeyDer)
	PEM() (pemBytes PemBytes, err error)
	PEMe() (pemBytes PemBytes)
	PublicKey() (publicKey PublicKey)
	Algo() (algo x509.PublicKeyAlgorithm)
	// validate ensures the private key is present, modeled after rsa.Validate
	Validate() (err error)
}

PrivateKey implements crypto.Signer and can therefore be used as tls.Certificate.PrivateKey

type PrivateKeyDer added in v0.4.26

type PrivateKeyDer []byte

PrivateKeyDer is a binary encoding of a private and public key-pair

type PrivateKeyFactory added in v0.4.26

type PrivateKeyFactory interface {
	NewPrivateKey(algo x509.PublicKeyAlgorithm) (privateKey PrivateKey, err error)
}

type PublicKey added in v0.4.26

type PublicKey interface {
	DER() (publicKeyDer PublicKeyDer, err error)
	DERe() (publicKeyDer PublicKeyDer)
	PEM() (pemBytes PemBytes, err error)
	PEMe() (pemBytes PemBytes)
	Equal(x crypto.PublicKey) (isEqual bool)
	Algo() (algo x509.PublicKeyAlgorithm)
}

PublicKey contains a public key extracted from a KeyPair

type PublicKeyDer added in v0.4.26

type PublicKeyDer []byte

PublicKeyDer is a binary encoding of a public key

type Rate added in v0.4.29

type Rate interface {
	Clone() (rate Rate)
	Delta() (delta uint64)
	Duration() (duration time.Duration)
	HasValue() (hasValue bool)
	fmt.Stringer
}

Rate describes a rate datapoint.

  • may be positive or negative

type RateCounterValues added in v0.4.29

type RateCounterValues interface {
	CounterValues // Get() GetReset() Value() Running() Max()
	Rates() (rates map[RateType]float64)
}

RateCounterValues is the consumer interface for a rate counter. The rate counter provides rates over a provided time period in a map of float64 values.

  • ValueRate current rate of increase in value
  • ValueMaxRate the maximum seen rate of increase in value
  • ValueRateAverage the average rate of increase in value taken over up to 10 periods
  • RunningRate the current rate of change in running, may be negative
  • RunningMaxRate the max positive rate of increase seen in running
  • RunningMaxDecRate the max rate of decrease in running, a 0 or negative value
  • RunningAverage the average of running taken over up to 10 periods

type RateType added in v0.4.29

type RateType int
const (
	ValueRate        RateType = iota // current rate of increase in value
	ValueMaxRate                     // max seen rate of increase in value
	ValueRateAverage                 // average rate of increase in value during last 10 periods
	RunningRate
	RunningMaxRate
	RunningMaxDecRate
	// accumulated change in running over several intervals
	//	- running value goes up and down
	RunningAverage
	NotAValue // NotAValue is an internal stand-in value indicating a value not in use
)

type Send added in v0.4.167

type Send[T any] interface {
	// Send is typically a thread-safe non-blocking panic-free error-free
	// send hand-off of a single value to another thread implemented as a method
	//	- Send replaces Go channel send operation
	Send(value T)
}

Send declares an object with a Send method

  • the Send interface is similar to a callback function value
  • — passing a method as function value causes allocation
  • — such function values create difficult-to-follow stack traces
  • the Send interface is implemented by eg. github.com/haraldrudell/parl.NBChan
  • Send is intended to provide a trouble-free value sink transferring data to other threads
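
To illustrate why an interface can replace both a channel send and a callback, the sketch below declares a local interface of the same shape and a mutex-protected implementation. sliceSink and produce are hypothetical names; parl's NBChan is not used here.

```go
package main

import (
	"fmt"
	"sync"
)

// Send mirrors the shape of the interface described above
type Send[T any] interface {
	Send(value T)
}

// sliceSink is a hypothetical thread-safe, non-blocking Send implementation
type sliceSink[T any] struct {
	mu     sync.Mutex
	values []T
}

// Send appends value without blocking on a receiver
func (s *sliceSink[T]) Send(value T) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.values = append(s.values, value)
}

// Len returns the number of values received so far
func (s *sliceSink[T]) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.values)
}

// produce depends only on the Send interface, not on a channel or a function value
func produce(sink Send[int], n int) {
	for i := 0; i < n; i++ {
		sink.Send(i)
	}
}

func main() {
	var sink sliceSink[int]
	var wg sync.WaitGroup
	for g := 0; g < 4; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			produce(&sink, 100)
		}()
	}
	wg.Wait()
	fmt.Println(sink.Len()) // prints "400"
}
```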

type ServerFactory added in v0.3.0

type ServerFactory interface {
	// Adb connects to an adb Android Debug Bridge server on a specified tcp socket
	Adb(address AdbSocketAddress, ctx context.Context) (server Serverette)
	// AdbLocalhost connects to an adb Android Debug Bridge server on the local computer
	AdbLocalhost(ctx context.Context) (server Serverette)
}

ServerFactory describes how AdbServer objects are obtained. Such servers may use different protocol implementations from Adbette

type Serverette added in v0.3.0

type Serverette interface {
	AdbAdressProvider // AdbSocketAddress()
	// DeviceSerialList lists serials for the currently online Android devices
	DeviceSerialList() (serials []AndroidSerial, err error)
	// DeviceStati lists all serials currently known to the server along with
	// their current status.
	// The two slices correspond and are of the same length
	DeviceStati() (serials []AndroidSerial, stati []AndroidStatus, err error)
	// TrackDevices emits serial numbers for devices that come online.
	// serials are sent on the serials channel.
	// if err is non-nil, set-up failed.
	// The errs channel closes when watching stops
	// Watching is stopped by calling cancel function or when the server’s context terminates
	TrackDevices() (tracker Trackerette, err error)
}

Serverette is a generic representation of an adb server running on a host.

command-line adb: As of Android 12, Android Debug Bridge version 1.0.41 Version 32.0.0-8006631 supports the following commands: devices connect disconnect pair forward ppp reverse mdns push pull sync shell emu install install-multiple install-multiple-package uninstall bugreport jdwp logcat disable-verity enable-verity keygen wait-for* get-state get-serialno get-devpath remount reboot sideload root unroot usb tcpip start-server kill-server reconnect attach detach

type ServeretteFactory added in v0.3.0

type ServeretteFactory interface {
	// Adb connects to an adb Android Debug Bridge server on a specified tcp socket.
	// address is a string default "localhost:5037" and default port ":5037".
	// adbetter is a factory for Adbette connections.
	NewServerette(address AdbSocketAddress, adbetter Adbetter, ctx context.Context) (server Serverette)
}

ServeretteFactory is a Server connection factory for Adbette implementations

type Sink added in v0.4.180

type Sink[T any] interface {
	Send(value T)
	SendSlice(values []T)
	SendClone(values []T)
}

Send SendSlice SendClone

type SlowDetector added in v0.4.41

type SlowDetector struct {
	// contains filtered or unexported fields
}

SlowDetector measures latency via Start-Stop invocations and prints max latency values above threshold to stderr. Thread-safe

func NewSlowDetector added in v0.4.41

func NewSlowDetector(label string, slowTyp slowType, printf PrintfFunc, goGen GoGen, threshold ...time.Duration) (slowDetector *SlowDetector)

NewSlowDetector returns a Start-Stop variable detecting slowness

  • label is a name for the measured activity, default the code location of caller
  • slowTyp is most commonly SlowDefault using a shared thread
  • default printf is parl.Log, ie. output to stderr
  • first optional duration is minimum latency to report, default 100 ms. If the first optional duration is 0, all max-slowness invocations are printed
  • second optional duration is reporting period of non-return, default 1 minute
  • output is to stderr

func (*SlowDetector) IsValid added in v0.4.51

func (sd *SlowDetector) IsValid() (isValid bool)

func (*SlowDetector) Start added in v0.4.41

func (sd *SlowDetector) Start(label string, value ...time.Time) (slowInvocation SlowInvocation)

func (*SlowDetector) Start0 added in v0.4.45

func (sd *SlowDetector) Start0() (slowInvocation SlowInvocation)

func (*SlowDetector) Status added in v0.4.48

func (sd *SlowDetector) Status() (s string)

func (*SlowDetector) Status0 added in v0.4.48

func (sd *SlowDetector) Status0() (s string)

last-duration / average duration / max duration

func (*SlowDetector) Values added in v0.4.49

func (sd *SlowDetector) Values() (last, average, max time.Duration, hasValue bool)

type SlowDetectorCore added in v0.4.44

type SlowDetectorCore struct {
	ID slowID
	// contains filtered or unexported fields
}

SlowDetectorCore measures latency via Start-Stop invocations

  • Thread-Safe and multi-threaded, parallel invocations
  • Separate thread measures time of non-returning, hung invocations

func NewSlowDetectorCore added in v0.4.44

func NewSlowDetectorCore(callback CbSlowDetector, slowTyp slowType, goGen GoGen, nonReturnPeriod ...time.Duration) (slowDetector *SlowDetectorCore)

NewSlowDetectorCore returns an object tracking non-returning or slow function invocations

  • callback receives offending slow-detector invocations, cannot be nil
  • slowTyp configures whether the support-thread is shared
  • goGen is used for a possible deferred thread-launch
  • optional values are:
  • — nonReturnPeriod: how often non-returning invocations are reported, default once per minute
  • — minimum slowness duration that is being reported, default 100 ms

func (*SlowDetectorCore) Start added in v0.4.44

func (sd *SlowDetectorCore) Start(invoLabel string, value ...time.Time) (invocation *SlowDetectorInvocation)

Start returns the effective start time for a new timing cycle

  • value is optional start time, default time.Now()

func (*SlowDetectorCore) Values added in v0.4.49

func (sd *SlowDetectorCore) Values() (
	last, average, max time.Duration,
	hasValue bool,
)

type SlowDetectorInvocation added in v0.4.44

type SlowDetectorInvocation struct {
	// contains filtered or unexported fields
}

SlowDetectorInvocation is a container used by SlowDetectorCore

func (*SlowDetectorInvocation) Interval added in v0.4.48

func (sdi *SlowDetectorInvocation) Interval(label string, t ...time.Time)

Stop ends an invocation part of SlowDetectorCore

func (*SlowDetectorInvocation) Intervals added in v0.4.56

func (sdi *SlowDetectorInvocation) Intervals() (intervalStr string)

func (*SlowDetectorInvocation) Label added in v0.4.44

func (sdi *SlowDetectorInvocation) Label() (label string)

Label returns the label for this invocation

func (*SlowDetectorInvocation) Stop added in v0.4.44

func (sdi *SlowDetectorInvocation) Stop(value ...time.Time)

Stop ends an invocation part of SlowDetectorCore

func (*SlowDetectorInvocation) T0 added in v0.4.44

func (sdi *SlowDetectorInvocation) T0() (t0 time.Time)

T0 returns the effective time of the invocation of Start

func (*SlowDetectorInvocation) ThreadID added in v0.4.44

func (sdi *SlowDetectorInvocation) ThreadID() (threadID ThreadID)

ThreadID returns the thread ID for the thread invoking Start

func (*SlowDetectorInvocation) Time added in v0.4.44

func (sdi *SlowDetectorInvocation) Time(t time.Time) (previousT time.Time)

T0 returns the effective time of the invocation of Start

type SlowDetectorThread added in v0.4.44

type SlowDetectorThread struct {
	// contains filtered or unexported fields
}

func NewSlowDetectorThread added in v0.4.44

func NewSlowDetectorThread(slowTyp slowType, nonReturnPeriod time.Duration, goGen GoGen) (sdt *SlowDetectorThread)

func (*SlowDetectorThread) Start added in v0.4.44

func (sdt *SlowDetectorThread) Start(sdi *SlowDetectorInvocation)

func (*SlowDetectorThread) Stop added in v0.4.44

type SlowInvocation added in v0.4.44

type SlowInvocation interface {
	Stop(value ...time.Time)
	Interval(label string, t ...time.Time)
}

type Source added in v0.4.180

type Source[T any] interface {
	// Get DataWaitCh
	Source1[T]
	GetSlice() (values []T)
}

Get GetSlice DataWaitCh

type Source1 added in v0.4.180

type Source1[T any] interface {
	Get() (value T, hasValue bool)
	DataWaitCh() (ch AwaitableCh)
	AwaitValue() (value T, hasValue bool)
}

Get DataWaitCh

type SourceSink added in v0.4.180

type SourceSink[T any] interface {
	AllSource[T]
	Sink[T]
}

type Stack added in v0.4.20

type Stack interface {
	// thread ID 1… for the thread requesting the stack trace
	//	- ThreadID is comparable and has IsValid and String methods
	//	- ThreadID is typically an incremented 64-bit integer with
	//		main thread having ID 1
	ID() (threadID ThreadID)
	// a word indicating thread status, typically word “running”
	Status() (threadStatus ThreadStatus)
	// true if the thread is the main thread
	//	- false for a launched goroutine
	IsMain() (isMain bool)
	// A list of code locations for this thread
	//	- index [0] is the most recent code location, typically the invoker requesting the stack trace
	//	- includes invocation argument values
	Frames() (frames []pruntime.Frame)
	// the goroutine function used to launch this thread
	//	- if IsMain is true, zero-value. Check using GoFunction().IsSet()
	//	- never nil
	GoFunction() (goFunction *pruntime.CodeLocation)
	// the code location of the go statement creating this thread
	//	- if IsMain is true, zero-value. Check with Creator().IsSet()
	//	- never nil
	//	- goRoutineRef: “in goroutine 9”
	Creator() (creator *pruntime.CodeLocation, creatorID ThreadID, goRoutineRef string)
	// Shorts lists short code locations for all stack frames, most recent first:
	// Shorts("prepend") →
	//  prepend Thread ID: 1
	//  prepend main.someFunction()-pruntime.go:84
	//  prepend main.main()-pruntime.go:52
	Shorts(prepend string) (s string)
	// String is a multi-line stack trace, most recent code location first:
	//  ID: 18 IsMain: false status: running␤
	//  main.someFunction({0x100dd2616, 0x19})␤
	//  ␠␠pruntime.go:64␤
	//  cre: main.main-pruntime.go:53␤
	fmt.Stringer
}

Stack contains a stack trace parsed into basic type only datapoints

  • stack trace from [pdebug.Stack]

type Statuser added in v0.3.0

type Statuser interface {
	Set(status string) (statuser Statuser)
	Shutdown()
}

Statuser prints threads that do not react within a specified time frame. This is useful during early multi-threaded design when it is uncertain why work is not progressing. Is it your code or their code? Once the program concludes without hung threads, Tracer is the better tool to identify issues.

type StatuserFactory added in v0.3.0

type StatuserFactory interface {
	NewStatuser(useStatuser bool, d time.Duration) (statuser Statuser)
}

type SubGo added in v0.4.15

type SubGo interface {
	// Go returns a [Go] object managing a thread of the GoGroup thread-group
	// by providing the g value as a go-statement function argument.
	Go() (g Go)
	// SubGo returns a thread-group whose fatal errors go to Go’s parent.
	//   - both non-fatal and fatal errors in SubGo threads are sent to Go’s parent
	// 		like Go.AddError and Go.Done.
	//		- therefore, when a SubGo thread fails, the application will typically exit.
	//		- by awaiting SubGo, Go can delay its exit until SubGo has terminated
	//   - the SubGo thread-group terminates when its thread exits
	SubGo(onFirstFatal ...GoFatalCallback) (subGo SubGo)
	// SubGroup creates a sub-ordinate GoGroup.
	//	- SubGroup fatal and non-fatal errors are sent to the parent GoGroup.
	//	-	SubGroup-context initiated Cancel only affect threads in the SubGroup thread-group
	//	- parent-initiated Cancel terminates SubGroup threads
	//	- SubGroup only awaits SubGroup threads
	//	- parent await also awaits SubGroup threads
	SubGroup(onFirstFatal ...GoFatalCallback) (subGroup SubGroup)
	// Wait waits for all threads of this thread-group to terminate.
	Wait()
	// returns a channel that closes on subGo end similar to Wait
	WaitCh() (ch AwaitableCh)
	// EnableTermination controls temporarily preventing the GoGroup from
	// terminating.
	// EnableTermination is initially true.
	//	- invoked with no argument returns the current state of EnableTermination
	//	- invoked with [AllowTermination] again allows for termination and
	//		immediately terminates the thread-group if no threads are currently running.
	//	- invoked with [PreventTermination] allows for the number of managed
	//		threads to be temporarily zero without terminating the thread-group
	EnableTermination(allowTermination ...bool) (mayTerminate bool)
	// Cancel terminates the threads in this and subordinate thread-groups.
	Cancel()
	// Context will Cancel when the parent context Cancels.
	// Subordinate thread-groups do not Cancel this context.
	Context() (ctx context.Context)
	// the available data for all threads
	Threads() (threads []ThreadData)
	// threads that have been named ordered by name
	NamedThreads() (threads []ThreadData)
	// SetDebug enables debug logging on this particular instance
	//   - parl.NoDebug
	//   - parl.DebugPrint
	//   - parl.AggregateThread
	SetDebug(debug GoDebug, log ...PrintfFunc)
	fmt.Stringer
}

type SubGroup added in v0.4.29

type SubGroup interface {
	SubGo
	// GoError returns a channel returning fatal and not-fatal errors and thread exits.
	//	- see GoError for exact error categories
	//
	// Usage:
	//
	//	var goErrors = goGroup.GoError()
	//	for goError := goErrors.Init(); goErrors.Condition(&goError); {
	//	  …
	//	var goEndCh = goErrors.EmptyCh(parl.CloseAwaiter)
	//	for {
	//	  select {
	//	  case <-goEndCh:
	//	    goEndCh = nil
	//	    …
	//	  case <-goErrors.DataWaitCh():
	//	    var goError, hasValue = goErrors.Get()
	//	    if !hasValue {
	//	      continue
	//	    …
	GoError() (goErrors IterableSource[GoError])
	// FirstFatal allows to await or inspect the first thread terminating with error.
	// it is valid if this SubGo has LocalSubGo or LocalChannel options.
	// To wait for first fatal error using multiple-semaphore mechanic:
	//  firstFatal := g0.FirstFatal()
	//  for {
	//    select {
	//    case <-firstFatal.Ch():
	//    …
	// To inspect first fatal:
	//  if firstFatal.DidOccur() …
	FirstFatal() (firstFatal *OnceWaiterRO)
}

type SyncAdd added in v0.4.20

type SyncAdd interface {
	Add(delta int)
}

SyncAdd provides sync.WaitGroup.Add()

type SyncDone added in v0.4.20

type SyncDone interface {
	Done()
}

SyncDone provides sync.WaitGroup.Done()

type SyncWait added in v0.4.20

type SyncWait interface {
	Wait()
}

SyncWait provides sync.WaitGroup.Wait()

type TFunc added in v0.4.60

type TFunc[T any] func() (value T, err error)

TFunc is a function that returns value, err and may panic

type TResult added in v0.4.60

type TResult[T any] struct {
	Value   T
	IsPanic bool
	Err     error
}

TResult is a value-container for value, isPanic and error

func NewTResult added in v0.4.60

func NewTResult[T any](tFunc ...TFunc[T]) (tResult *TResult[T])

NewTResult creates a result container

  • if tFunc is present, it is invoked prior to returning storing its result
  • recovers tFunc panic
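
A result container that captures value, error and panic in one place might look like the sketch below. It is an illustration only: tResult and newTResult are hypothetical lower-case counterparts, tFunc is mandatory here rather than optional, and wrapping the panic value with fmt.Errorf is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// tResult is a hypothetical counterpart of the container described above
type tResult[T any] struct {
	Value   T
	IsPanic bool
	Err     error
}

// newTResult invokes tFunc and stores its outcome; a panic is recovered into Err
func newTResult[T any](tFunc func() (T, error)) (result *tResult[T]) {
	result = &tResult[T]{}
	defer func() {
		if r := recover(); r != nil {
			result.IsPanic = true
			result.Err = fmt.Errorf("panic: %v", r)
		}
	}()
	result.Value, result.Err = tFunc()
	return
}

func main() {
	ok := newTResult(func() (int, error) { return 42, nil })
	fmt.Println(ok.Value, ok.IsPanic, ok.Err) // prints "42 false <nil>"

	bad := newTResult(func() (int, error) { panic(errors.New("boom")) })
	fmt.Println(bad.Value, bad.IsPanic, bad.Err) // prints "0 true panic: boom"
}
```

A value of this kind can be sent over a channel as a single item, which avoids separate channels for values and errors.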

func NewTResult3 added in v0.4.122

func NewTResult3[T any](value *T, isPanic *bool, errp *error) (tResult *TResult[T])

NewTResult3 creates a TResult from pointers at the time values are available

  • value is considered valid if errp is nil or *errp is nil
  • any arguments may be nil

type ThreadData added in v0.4.43

type ThreadData interface {
	// threadID is the ID of the running thread assigned by the go runtime
	//	- IsValid method checks if value is present
	//	- zero value is empty string
	//	- .ThreadID().String(): "5"
	ThreadID() (threadID ThreadID)
	// createLocation is the code line of the go statement function-call
	// creating the goroutine thread
	// - IsSet method checks if value is present
	//	- Create().Short(): "g0.(*SomeType).SomeCode()-thread-data_test.go:73"
	Create() (createLocation *pruntime.CodeLocation)
	// Func returns the code line of the function of the running thread.
	// - IsSet method checks if value is present
	//	- .Func().Short(): "g0.(*SomeType).SomeFunction()-thread-data_test.go:80"
	Func() (funcLocation *pruntime.CodeLocation)
	// optional thread-name assigned by consumer
	//	- zero-value: empty string "" for threads that have not been named
	Name() (label string)
	// Short returns a short description of the thread "label:threadID" or fmt.Stringer
	//	- "myThreadName:4"
	//	- zero-value: "[empty]" ThreadDataEmpty
	//	- nil value: "threadData:nil" ThreadDataNil
	Short() (short string)
	// all non-empty fields: [label]:[threadID]_func:[Func]_cre:[Create]
	//	- "myThreadName:5_func:g0.(*SomeType).SomeFunction()-thread-data_test.go:80_cre:g0.(*SomeType).SomeCode()-thread-data_test.go:73"
	//	- zero-value: "[empty]" ThreadDataEmpty
	fmt.Stringer
}

ThreadData is information about a Go object thread.

  • initially, only Create is present
  • Name is only present for threads that have been named

type ThreadID added in v0.4.9

type ThreadID uint64

ThreadID is an opaque type that uniquely identifies a thread, ie. a goroutine.

  • goid.GoID obtains ThreadID for the executing thread.
  • in runtime.g, goid is uint64
  • ThreadID is comparable, ie. can be used as a map key.
  • ThreadID is fmt.Stringer
  • ThreadID has IsValid method

func (ThreadID) IsValid added in v0.4.40

func (threadID ThreadID) IsValid() (isValid bool)

func (ThreadID) String added in v0.4.12

func (threadID ThreadID) String() (s string)

type ThreadStatus added in v0.4.12

type ThreadStatus string

ThreadStatus indicates the current state of a thread. Most often it is "running"

type Timer

type Timer struct {
	Label string
	// contains filtered or unexported fields
}

Timer is a simple request timer

func NewTimer

func NewTimer(label string) (t *Timer)

NewTimer gets a simple timer with duration or string output

func (*Timer) End

func (t *Timer) End() (d time.Duration)

End gets duration

func (*Timer) Endms

func (t *Timer) Endms() (ms string)

Endms gets a string with duration in ms

type Tracer added in v0.3.0

type Tracer interface {
	// AssignTaskToThread assigns a Thread to a task
	AssignTaskToThread(threadID ThreadID, task TracerTaskID) (tracer Tracer)
	// RecordTaskEvent adds an event to the task threadID is currently assigned to.
	// If threadID is not assigned, a new task is created.
	RecordTaskEvent(threadID ThreadID, text string) (tracer Tracer)
	// Records returns the current map of tasks and their events
	Records(clear bool) (records map[TracerTaskID][]TracerRecord)
}

Tracer lists events in terms of tasks rather than per time or thread. A task is executed by threads assigned to it. Threads are uniquely identified by threadID. A task can have zero or one threads assigned at any one time. A thread can be assigned to zero or one tasks. Each task has an ID, a name and a list of events and thread assignments. Tracer can record branching in the code and return that for a particular item being processed. For an item processed incorrectly, or when threads hang, Tracer will find unfavorable branching and last known locations.

type TracerFactory added in v0.3.0

type TracerFactory interface {
	// NewTracer creates tracer storage.
	// use false indicates a nil Tracer whose output will not be used
	NewTracer(use bool) (tracer Tracer)
}

type TracerRecord added in v0.3.0

type TracerRecord interface {
	Values() (at time.Time, text string)
}

type TracerTaskID added in v0.3.0

type TracerTaskID string

type Trackerette added in v0.3.0

type Trackerette interface {
	// Serials emit serial number as online devices become available
	Serials() (serials <-chan AndroidSerial)
	// Errs is available once Serials close. It returns any errors
	Errs() (err error)
	// Cancel shuts down the Tracker
	Cancel()
}

Trackerette represents a server connection emitting device serial numbers

type UniqueID added in v0.4.26

type UniqueID[T ~string] uint64

UniqueID is an executable-invocation-unique identifier generator. The format is a named-type small-integer numeric string suitable to distinguish multiple instances of a type. The type is ordered and can be converted to string.

Usage:

type MyType string
var generator parl.UniqueID[MyType]
someID := generator.ID()

func (*UniqueID[T]) ID added in v0.4.26

func (u *UniqueID[T]) ID() (unique T)

ID generates a unique string identifier. thread-safe
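How a generator of this shape can hand out typed string identifiers is sketched here with the standard library only. The names uniqueID and sessionID are hypothetical, and the atomic counter is an assumption about one reasonable way to make ID thread-safe.

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// uniqueID is a hypothetical generator whose zero value is ready to use.
// The counter is the value itself, advanced atomically
type uniqueID[T ~string] uint64

// ID returns the next identifier "1", "2", … converted to the
// caller's named string type. Safe for concurrent use
func (u *uniqueID[T]) ID() T {
	return T(strconv.FormatUint(atomic.AddUint64((*uint64)(u), 1), 10))
}

// sessionID is an example named type: identifiers of different
// named types cannot be mixed up at compile time
type sessionID string

func main() {
	var generator uniqueID[sessionID]
	first, second := generator.ID(), generator.ID()
	fmt.Println(first, second, first != second)
}
```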

type UniqueIDTypedUint64 added in v0.4.29

type UniqueIDTypedUint64[T ~uint64] uint64

UniqueIDTypedUint64 generates integer-based opaque uniquely-typed IDs. Thread-safe.

  • Different named-type series have their own unique type and can be told apart.
  • The named types are ordered integer-based with String method implementing fmt.Stringer.

Usage:

type T uint64
var generator parl.UniqueIDTypedUint64[T]
func (t T) String() string { return generator.StringT(t) }
someID := generator.ID()

var GoEntityIDs UniqueIDTypedUint64[GoEntityID]

GoEntityIDs is a generator for Go Object IDs

func (*UniqueIDTypedUint64[T]) ID added in v0.4.29

func (u *UniqueIDTypedUint64[T]) ID() (uniqueT T)

ID generates a unique ID of integral type. Thread-safe

func (*UniqueIDTypedUint64[T]) StringT added in v0.4.29

func (u *UniqueIDTypedUint64[T]) StringT(t T) (s string)

StringT returns a string such as "parl.T:2". A zero-value t prints "parl.T:0"

type UniqueIDUint64 added in v0.4.29

type UniqueIDUint64 uint64

UniqueIDUint64 generates executable-invocation-unique uint64 numbers 1… Different series of uint64 from different generators do not have identity, ie. they cannot be told apart. Consider UniqueIDTypedUint64 to have identity.

Usage:

var generator parl.UniqueIDUint64
id := generator.ID()

func (*UniqueIDUint64) ID added in v0.4.29

func (u *UniqueIDUint64) ID() (uniqueUint64 uint64)

ID generates a unique uint64 1…. thread-safe

func (*UniqueIDUint64) String added in v0.4.29

func (u *UniqueIDUint64) String() (s string)

type UniqueIDint added in v0.4.30

type UniqueIDint int

UniqueIDint generates executable-invocation-unique int numbers 1… Different series of int from different generators do not have identity, ie. they cannot be told apart. Consider UniqueIDTypedUint64 to have identity.

Usage:

var generator parl.UniqueIDint
id := generator.ID()

func (*UniqueIDint) ID added in v0.4.30

func (u *UniqueIDint) ID() (uniqueID int)

ID generates a unique int 1…. thread-safe

func (*UniqueIDint) String added in v0.4.30

func (u *UniqueIDint) String() (s string)

type ValueSink added in v0.4.176

type ValueSink[T any] interface {
	// Send submits single value
	Send(value T)
	// SendSlice submits any number of values
	SendSlice(values []T)
	// EmptyCh provides close-like behavior
	//	- this thread invoking EmptyCh() signals end of values
	//	- other threads invoking EmptyCh(parl.CloseAwaiter) awaits:
	//	- — the EmptyCh() invocation and
	//	- — the AwaitableSlice becoming empty
	EmptyCh(doNotInitialize ...bool) (ch AwaitableCh)
}

ValueSink allows a thread to submit data to other threads

type ValueSource added in v0.4.176

type ValueSource[T any] interface {
	// Get obtains a single value
	Get() (value T, hasValue bool)
	// GetSlice obtains values by the slice
	//	- nil slice means source is currently empty
	GetSlice() (values []T)
	// GetAll receives a combined slice of all values
	//	- may cause allocation
	GetAll() (values []T)
	// Init allows for AwaitableSlice to be used in a for clause
	//   - returns zero-value for a short variable declaration in
	//     a for init statement
	//
	// Usage:
	//
	//	var a AwaitableSlice[…] = …
	//	for value := a.Init(); a.Condition(&value); {
	//	  // process received value
	//	}
	//	// the AwaitableSlice closed
	Init() (value T)
	// Condition allows for AwaitableSlice to be used in a for clause
	//   - updates a value variable and returns whether values are present
	//
	// Usage:
	//
	//	var a AwaitableSlice[…] = …
	//	for value := a.Init(); a.Condition(&value); {
	//	  // process received value
	//	}
	//	// the AwaitableSlice closed
	Condition(valuep *T) (hasValue bool)
	// DataWaitCh returns an awaitable closing channel that
	//	- is closed if data is available or
	//	- closes once data becomes available
	DataWaitCh() (ch AwaitableCh)
	// EmptyCh provides close-like behavior
	//	- this thread invoking EmptyCh(parl.CloseAwaiter) returns a
	//		channel that closes on stream completion:
	//	- — EmptyCh() invocation with no value by other thread and
	//	- — the AwaitableSlice becoming empty
	EmptyCh(doNotInitialize ...bool) (ch AwaitableCh)
}

ValueSource allows a thread to obtain data submitted by other threads
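The Init and Condition pair exists so that a value source can drive a three-clause for statement. The following sketch shows the mechanics with a hypothetical slice-backed type named queue. It is single-threaded and ends when empty, whereas the usage comments in the interface suggest that the real loop ends when the source closes.

```go
package main

import "fmt"

// queue is a hypothetical, single-threaded stand-in for a value source
type queue[T any] struct{ values []T }

// Init returns a zero value for the short variable declaration
// in the init clause of the for statement
func (q *queue[T]) Init() (value T) { return }

// Condition stores the next value in *valuep and returns true,
// or returns false once no values remain.
// NOTE: this sketch never waits for more values to arrive
func (q *queue[T]) Condition(valuep *T) (hasValue bool) {
	if hasValue = len(q.values) > 0; hasValue {
		*valuep = q.values[0]
		q.values = q.values[1:]
	}
	return
}

func main() {
	q := queue[int]{values: []int{1, 2, 3}}
	sum := 0
	// the post clause is empty: Condition both tests and advances
	for value := q.Init(); q.Condition(&value); {
		sum += value
	}
	fmt.Println(sum)
}
```

Declaring the variable through Init keeps it scoped to the loop while letting Condition update it through a pointer.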

type WaitGroupCh added in v0.4.148

type WaitGroupCh struct {
	// contains filtered or unexported fields
}

WaitGroupCh is like a sync.WaitGroup with a channel-wait mechanic. Therefore, unlike sync.WaitGroup, WaitGroupCh is wait-free and observable. WaitGroupCh waits for a collection of goroutines to finish. The main goroutine increments the counter for each goroutine. Then each of the goroutines decrements the counter until zero. Wait or Ch can be used to block until all goroutines have finished. A WaitGroupCh must not be copied after first use. In the terminology of the Go memory model, decrementing “synchronizes before” the return of any Wait call or Ch read that it unblocks.

  • counter is increased by WaitGroupCh.Add or WaitGroupCh.Counts
  • counter is decreased by WaitGroupCh.Done, WaitGroupCh.Add, WaitGroupCh.DoneBool or WaitGroupCh.Counts
  • counter zero is awaited by WaitGroupCh.Ch or WaitGroupCh.Wait
  • observability is provided by WaitGroupCh.Counts, WaitGroupCh.DoneBool and WaitGroupCh.IsZero
  • panic: a decreasing method that makes the counter negative panics
  • panic: adjusting the counter away from zero after invocation of WaitGroupCh.Ch or WaitGroupCh.Wait panics. NOTE: all Add should happen prior to invoking Ch or Wait
  • WaitGroupCh is wait-free, observable, initialization-free, thread-safe
  • channel-wait mechanic allows the consumer to be wait-free. Progress by the consumer-thread is not prevented since:
  • — the channel can be read non-blocking
  • — consumers can wait for multiple channel events
  • — consumers are not contending for a lock with any other thread
  • WaitGroupCh method-set is a superset of sync.WaitGroup
  • there are race conditions:
  • — writing a zero-counter record with unclosed channel, therefore closing the channel
  • — reading channel event while a zero-counter record exists, therefore closing the channel
  • — impact is that a panic might be missed

Usage:

var w parl.WaitGroupCh
w.Add(1)
go someFunc(&w)
…
<-w.Ch()
func someFunc(w parl.Doneable) {
  defer w.Done()
  …
}

func (*WaitGroupCh) Add added in v0.4.148

func (w *WaitGroupCh) Add(delta int)

Add adds delta, which may be negative, to the WaitGroup counter

  • If the counter becomes zero, all goroutines blocked on Wait are released
  • If the counter goes negative, Add panics

func (*WaitGroupCh) Ch added in v0.4.148

func (w *WaitGroupCh) Ch() (awaitableCh AwaitableCh)

Ch returns a channel that closes once the counter reaches zero

func (*WaitGroupCh) Count added in v0.4.148

func (w *WaitGroupCh) Count() (currentCount int)

Count returns the current number of threads

func (*WaitGroupCh) Counts added in v0.4.180

func (w *WaitGroupCh) Counts(delta ...int) (currentCount, totalAdds int)

Counts returns the current state optionally adjusting the counter

  • delta is optional counter adjustment
  • currentCount is current remaining count
  • totalAdds is cumulative positive adds over WaitGroup lifetime

func (*WaitGroupCh) Done added in v0.4.148

func (w *WaitGroupCh) Done()

Done decrements the WaitGroup counter by one.

func (*WaitGroupCh) DoneBool added in v0.4.148

func (w *WaitGroupCh) DoneBool() (isExit bool)

DoneBool decrements the WaitGroup counter by one and returns isExit.

func (*WaitGroupCh) IsZero added in v0.4.148

func (w *WaitGroupCh) IsZero() (isZero bool)

IsZero returns whether the counter is currently zero

func (*WaitGroupCh) Reset added in v0.4.148

func (w *WaitGroupCh) Reset() (w2 *WaitGroupCh)

Reset triggers the current channel and resets the WaitGroup

func (*WaitGroupCh) String added in v0.4.148

func (w *WaitGroupCh) String() (s string)

func (*WaitGroupCh) Wait added in v0.4.148

func (w *WaitGroupCh) Wait()

Wait blocks until the WaitGroup counter is zero.

type Waitable added in v0.4.12

type Waitable interface {
	Wait() // similar to sync.WaitGroup.Wait()
}

Waitable is the invoker’s Wait part of sync.WaitGroup and other implementations. Waitable is a many-to-many relation. Waitable allows the caller to await exit and free of all invocations.

var waitsForLots parl.WaitGroup
var shutsDownLots parl.OnceChan
… = NewSomething(&waitsForLots, &shutsDownLots)
go someThread(&waitsForLots, &shutsDownLots)
func someThread(w Doneable, ctx context.Context) {
  defer w.Done()
  w.Add(2)
  go somethingElse()
  …
}

type WaitedOn added in v0.4.20

type WaitedOn interface {
	SyncAdd
	SyncDone
	DoneBool() (isExit bool)
	IsZero() (isZero bool)
}

type Waiter added in v0.4.20

type Waiter interface {
	WaitedOn
	WaitingFor
}

Waiter allows use of either the observable parl.WaitGroup or parl.TraceGroup

type WaitingFor added in v0.4.20

type WaitingFor interface {
	SyncAdd
	IsZero() (isZero bool)
	Counters() (adds, dones int)
	SyncWait
	String() (s string)
}

type WinOrWaiterCore added in v0.4.29

type WinOrWaiterCore struct {
	// contains filtered or unexported fields
}

WinOrWaiterCore picks a winner thread to carry out some task used by many threads.

  • threads in WinOrWait for an idle WinOrWaiter may become winners completing the task
  • threads in WinOrWait while a calculation is in progress are held waiting using RWLock and atomics until the calculation completes
  • the calculation is completed on demand, but only by the first requesting thread
  • mechanic is closing channel inside Awaitable inside Future
  • threads await a renewable Future until a new-enough result occurs
  • of the threads requiring a newer calculation, one is selected to perform the calculation

func NewWinOrWaiterCore added in v0.4.29

func NewWinOrWaiterCore(strategy WinOrWaiterStrategy, calculator func() (err error), ctx ...context.Context) (winOrWaiter *WinOrWaiterCore)

NewWinOrWaiterCore returns a semaphore used for completing an on-demand task by the first thread requesting it, with that result shared by subsequent threads held waiting for the result.

  • strategy: WinOrWaiterAnyValue or WinOrWaiterMustBeLater
  • ctx allows for cancelation of the WinOrWaiter

func (*WinOrWaiterCore) IsCancel added in v0.4.29

func (ww *WinOrWaiterCore) IsCancel() (isCancel bool)

func (*WinOrWaiterCore) WinOrWait added in v0.4.29

func (ww *WinOrWaiterCore) WinOrWait() (err error)

WinOrWait picks a winner thread to carry out some task used by many threads.

  • threads arriving to an idle WinOrWaiter are winners that complete the task
  • threads arriving to a WinOrWait in progress are held waiting at RWMutex
  • the task is completed on demand, but only by the first thread requesting it

type WinOrWaiterStrategy added in v0.4.41

type WinOrWaiterStrategy uint8
const (
	// WinOrWaiterAnyValue allows a thread to accept any calculated value
	WinOrWaiterAnyValue WinOrWaiterStrategy = iota
	// WinOrWaiterMustBeLater forces a calculation commencing after a thread arrives
	//	- WinOrWaiter calculations are serialized, ie. a new calculation does not start prior to
	//		the conclusion of the previous calculation
	//	- thread arrival time is prior to acquiring the lock
	WinOrWaiterMustBeLater
)

func (WinOrWaiterStrategy) IsValid added in v0.4.50

func (ws WinOrWaiterStrategy) IsValid() (isValid bool)

func (WinOrWaiterStrategy) String added in v0.4.41

func (ws WinOrWaiterStrategy) String() (s string)

Source Files

Directories

Path Synopsis
Package counter provides simple and rate counters and tracked datapoints
Package enum provides concrete types to create enumerated collections
g0
Package g0 provides Go threads and thread-groups
g0debug
Package g0debug provides troubleshooting types for Go threads and thread-groups
Package goid obtains a thread’s parl.ThreadID unique goroutine identifier.
Package halt detects Go runtime execution halts.
Package iana provides iana-standardized Internet protocols, port numbers, address families and an Internet-socket identifier
internal
cyclebreaker
© 2020–present Harald Rudell <harald.rudell@gmail.com> (https://haraldrudell.github.io/haraldrudell/) ISC License
Package ints provides manipulation of integer types.
Package iters provides thread-safe iterators usable with Go’s for statement
Package mains contains functions for implementing a service or command-line utility
omaps module
Package parlca provides a self-signed certificate authority
Package parli provides certain interface declarations.
Package pbytes provides byte-slice related functions TrimNewline.
Package pdebug provides a portable, parsed stack trace.
Package perrors enriches error values with string data, stack traces, associated errors, less severe warnings, thread-safe containers and comprehensive error string representations.
errorglue
Package errorglue contains non-essential error declarations
Package pexec provides streaming, context-cancelable system command execution
Package pflags provides declarative options and a string-slice option type.
Package pfmt provides an fmt.Printf %v function that does not use the fmt.Stringer.String method
Package pfs provides a symlink-following file-system traverser and other file-system functions.
Package pids provides a typed process identifier.
Package pio provides a context-cancelable stream copier, a closable buffer, line-based reader and other io functions
plog provides thread-safe log instances for any writer
Package pmaps provides an unordered, thread-safe, RWMutex-mechanic map pmaps.RWMap.
pmaps2
Package pmaps2 contains reusable map types for other parl packages.
Package pnet provides IP-related functions with few dependencies beyond the net package
Package parlos provides simplified functions for home-directory and hostname.
pqs provides legacy priority-queue implementation likely to be deprecated
Package prand provides fast and thread-safe random number generation.
Package preflect obtains underlying type using reflection
process module
Package pruntime provides an interface to the Go standard library’s runtime package using only serializable simple types
Package pslices provides somewhat outdated slice functions such as ordered slices
psql module
Package pstrings provides string fitting, filtered join and quoting of a string slice.
pterm module
Package ptesting provides a platform identifier for Go benchmarks.
Package parltime provides on-time timers, 64-bit epoch, formatting and other time functions.
Package punix examines unix.Errno errors and identifies Unix-like platforms.
Set provides a collection of unique elements of a particular type T that are printable, type convertible and have verifiable validity.
sqliter module
watchfs module
yaml module
yamler module
Package yamlo allows the yamler package to unmarshal unexported types and finds yaml configuration files.
