parl

package module
v0.4.33
Published: Dec 31, 2022 License: ISC Imports: 22 Imported by: 14

README

Parl


A Go library for parallel programming of command-line utilities and system services

features: moderator, sqlite interface, package-selectable logging, debouncer, self-signed certificate authority, watchers, thread management, file system scan and operations, …


© 2018–present Harald Rudell (https://haraldrudell.github.io/haraldrudell/)
ISC License

“justice, peace and massive virtual parallelism”

How to use

import "github.com/haraldrudell/parl"

Documentation

Documentation in the Go Package Index

On March 16th, 2022, parl was open-sourced under an ISC License
Parl is about 15,000 lines of Go code with first line written on November 21, 2018

© 2018–present Harald Rudell harald.rudell@gmail.com (https://haraldrudell.github.io/haraldrudell/)

Documentation

Overview

Package parl handles inter-thread communication and controls parallelism

parl has sub-packages augmenting the Go standard library:

perrors pfs plog pnet pos pruntime psql pstrings
psyscall pterm ptime

parl has feature packages:

ev — handling of goroutine-based functions
goid — unique goroutine IDs
mains — functions for writing command-line utilities and services
parlca — self-signed certificate authority
progress — monitor work progress for a large number of threads
sqlite — local SQL database
threadprof — profiling and counters for what threads are doing
// statuser: thread hang detector
tracer — event lists by task rather than by time or thread

parl features per-writer thread-safe logging with topic and per-package output control:

Logging is to stderr except for the Out function. parl logging uses a comma as the thousands separator for numbers. A single argument is output as a string; two or more arguments are formatted as with Printf. The location matched against the regular expression is the full package path, an optional type receiver and the function name: “github.com/haraldrudell/mypackage.(*MyType).MyFunc”

Out(string, ...interface{}) — Standard output
Log(string, ...interface{}) — Always outputs to stderr
parl.D(string, ...interface{}) — Same as Log, intended for temporary use

Info(string, ...interface{}) — Informational progress messages
SetSilent(true) — removes Info output
IsSilent() — determines if Info printing applies

Debug(string, ...interface{}) — only prints for locations where SetDebug(true)
SetDebug(true) — Control Debug() globally, code location for all prints, long stack traces
SetRegexp(regExp string) (err error) — Regular expression controlling local Debug() printing
IsThisDebug() — Determines if debug is active for the executing function

Console(string, ...interface{}) — terminal interactivity output

parl.Recover() and parl.Recover2() thread recovery and mains.Executable.Recover() process recovery:

Threads can provide their errors via the perrors.ParlError thread-safe error store, plain error channels, parl.NBChan[error] or parl.ClosableChan[error]. parl.Recover and parl.Recover2 convert thread panic to error along with regular errors, annotating, retrieving and storing those errors and invoking error handling functions for them. mains.Recover is similar for the process.

func thread(errCh *parl.NBChan[error]) { // real-time non-blocking error channel
  defer errCh.Close() // non-blocking close effective on send complete
  var err error
  defer parl.Recover2(parl.Annotation(), &err, errCh.Send)
  errCh.Ch() <- err // non-blocking
  if err = someFunc(); err != nil {
    err = perrors.Errorf("someFunc: %w", err) // labels and attaches a stack
    return
…
func myThreadSafeThread(wg *sync.WaitGroup, errs *perrors.ParlError) { // ParlError: thread-safe error store
  defer wg.Done()
  var err error
  defer parl.Recover(parl.Annotation(), &err, errs.AddErrorProc)
…

parl package features:

AtomicBool — Thread-safe boolean
Closer — Deferrable, panic-free channel close
ClosableChan — Initialization-free channel with observable deferrable panic-free close
Moderator — A ticketing system for limited parallelism
NBChan — A non-blocking channel with trillion-size dynamic buffer
SerialDo — Serialization of invocations
WaitGroup — Observable WaitGroup
Debouncer — Invocation debouncer, pre-generics
Sprintf — Supporting thousands separator


© 2020–present Harald Rudell <harald.rudell@gmail.com> (https://haraldrudell.github.io/haraldrudell/) ISC License

Index

Constants

const (
	Rfc3339s  = "2006-01-02 15:04:05Z07:00"
	Rfc3339ms = "2006-01-02 15:04:05.000Z07:00"
	Rfc3339us = "2006-01-02 15:04:05.000000Z07:00"
	Rfc3339ns = "2006-01-02 15:04:05.000000000Z07:00"
)
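The constants above are layout strings for the standard library's time.Time.Format. As a minimal sketch, the example below copies two of the layouts into local constants (parl itself is not imported) and formats a fixed UTC instant; the helper name formatSample is made up for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// Layout strings copied from the constants above; parl itself is not imported.
const (
	rfc3339s  = "2006-01-02 15:04:05Z07:00"
	rfc3339ms = "2006-01-02 15:04:05.000Z07:00"
)

// formatSample formats a fixed UTC instant using the given layout.
func formatSample(layout string) string {
	t := time.Date(2022, time.December, 31, 13, 4, 5, 123456789, time.UTC)
	return t.Format(layout)
}

func main() {
	fmt.Println(formatSample(rfc3339s))  // 2022-12-31 13:04:05Z
	fmt.Println(formatSample(rfc3339ms)) // 2022-12-31 13:04:05.123Z
}
```

The layouts differ from time.RFC3339 only in using a space instead of "T" between date and time, and in the number of fractional-second digits.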

Variables

var ErrEndCallbacks = errors.New("end callbacks error")

ErrEndCallbacks indicates upon return from a callback function that no more callbacks are desired. It does not indicate an error and is not returned as an error by any function other than the callback.

Callback invocations may be thread-safe, re-entrant and panic-handling, but this depends on the callback-invoking implementation used.

if errors.Is(err, parl.ErrEndCallbacks) { …
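The sentinel pattern described here can be sketched with the standard library alone. In the example below, errEndCallbacks is a local stand-in for parl.ErrEndCallbacks, and forEach and countUntil are hypothetical functions invented for illustration; how parl's own callback-invoking functions behave is not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
)

// errEndCallbacks is a local stand-in for parl.ErrEndCallbacks.
var errEndCallbacks = errors.New("end callbacks error")

// forEach is a hypothetical callback-invoking function: it stops quietly on
// the sentinel and returns any other callback error to its caller.
func forEach(items []string, callback func(item string) error) error {
	for _, item := range items {
		if err := callback(item); errors.Is(err, errEndCallbacks) {
			return nil // the callback asked for no more invocations
		} else if err != nil {
			return err
		}
	}
	return nil
}

// countUntil returns how many items were visited up to and including stop.
func countUntil(items []string, stop string) (visited int, err error) {
	err = forEach(items, func(item string) error {
		visited++
		if item == stop {
			return errEndCallbacks
		}
		return nil
	})
	return
}

func main() {
	visited, err := countUntil([]string{"a", "b", "c"}, "b")
	fmt.Println(visited, err) // 2 <nil>
}
```

The sentinel never reaches the caller of forEach as an error, which matches the statement that it is not returned by any function other than the callback.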
var ErrErrpNil = errors.New("errp cannot be nil")

ErrErrpNil indicates that a function with an error pointer argument received an errp nil value.

if errors.Is(err, ErrErrpNil) …
var ErrNotCancelContext = errors.New("context chain does not have CancelContext")

ErrNotCancelContext indicates that InvokeCancel was provided a context not a return value from NewCancelContext or NewCancelContextFunc.

Test for ErrNotCancelContext:

if errors.Is(err, parl.ErrNotCancelContext) …

Functions

func AddNotifier added in v0.4.29

func AddNotifier(ctx context.Context, notifierFn func(slice pruntime.StackSlice)) (
	ctx2 context.Context)

func AddToPanic

func AddToPanic(panicValue interface{}, additionalErr error) (err error)

AddToPanic takes a recover() value and adds it to additionalErr.

func Annotation

func Annotation() (annotation string)

Annotation provides a default annotation [base package].[function]: "mypackage.MyFunc"

func CancelOnError added in v0.4.28

func CancelOnError(errp *error, ctx context.Context)

CancelOnError invokes InvokeCancel if errp has an error. CancelOnError is deferrable and thread-safe. ctx must have been returned by either NewCancelContext or NewCancelContextFunc.

  • errp == nil or *errp == nil means no error
  • ctx nil is panic
  • ctx not from NewCancelContext or NewCancelContextFunc is panic
  • thread-safe, idempotent

func Close added in v0.4.15

func Close(closable io.Closer, errp *error)

Close is a deferrable function that closes an io.Closer object. Close handles panics. If errp is non-nil, panic values update it using perrors.AppendError.

func Closer added in v0.4.0

func Closer[T any](ch chan T, errp *error)

Closer is a deferrable function that closes a channel. Closer handles panics. If errp is non-nil, panic values update it using perrors.AppendError.

func CloserSend added in v0.4.15

func CloserSend[T any](ch chan<- T, errp *error)

CloserSend is a deferrable function that closes a send-channel. CloserSend handles panics. If errp is non-nil, panic values update it using perrors.AppendError.

func Console

func Console(format string, a ...interface{})

Console always prints to stdout, intended for command-line interactivity. if debug is enabled, code location is appended.

func Consolew added in v0.4.12

func Consolew(format string, a ...interface{})

Consolew always prints to stdout intended for command-line interactivity. Consolew does not append a newline.

func D

func D(format string, a ...interface{})

D always prints to stderr with code location and is thread safe. D is meant for temporary output using invocations that are removed prior to source code repository check-in.

func Debug

func Debug(format string, a ...interface{})

Debug outputs only if debug is configured or the code location package matches regexp. Debug outputs to stderr and code location is appended.

func DoGoGetError added in v0.4.26

func DoGoGetError(op func() (err error), g0 Go) (err error)

DoGoGetError executes op in a thread. err contains any error; errors are not submitted to the Go object. DoGoGetError blocks until the goroutine completes.

func DoProcThread added in v0.4.26

func DoProcThread(op func(), g0 Go)

func DoThread added in v0.4.26

func DoThread(op func() (err error), g0 Go)

DoThread is invoked in a go statement and executes op. g0 receives errors and is the wait-for function.

func DoThreadError added in v0.4.26

func DoThreadError(op func() (err error), errCh chan<- error, g0 Go)

DoThreadError is a goroutine that returns its error separately.

func EnsureError

func EnsureError(panicValue interface{}) (err error)

EnsureError ensures that a recover() value is an error or nil.

func GetD added in v0.4.26

func GetD(skipFrames int) (debug func(format string, a ...interface{}))

GetD obtains an always-printing D function based on the invocation location, for later execution.

func GetDebug added in v0.4.25

func GetDebug(skipFrames int) (debug func(format string, a ...interface{}))

GetDebug obtains a Debug function based on the invocation location adjusted by skipFrames. The returned function is used for later execution. It outputs only if debug is configured or the code location package matches regexp. It outputs to stderr and code location is appended.

func HandleErrp added in v0.2.2

func HandleErrp(fn func(), errp *error)

HandleErrp recovers from a panic in fn, storing it at *errp. HandleErrp is deferrable.

func HandlePanic

func HandlePanic(fn func()) (err error)

HandlePanic recovers from panic in fn returning error.
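The panic-to-error conversion that EnsureError and HandlePanic describe can be sketched with the standard library. The functions ensureError and handlePanic below are local illustrations written for this example, not parl's implementation; in particular, the error text used for non-error panic values is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// ensureError converts a recover() value to an error or nil.
func ensureError(panicValue any) error {
	switch v := panicValue.(type) {
	case nil:
		return nil
	case error:
		return v
	default:
		return fmt.Errorf("non-error panic value: %v", v)
	}
}

// handlePanic runs fn and returns a panic in fn as an error.
func handlePanic(fn func()) (err error) {
	defer func() {
		// recover must be called directly by the deferred function
		if e := ensureError(recover()); e != nil {
			err = e
		}
	}()
	fn()
	return
}

func main() {
	errBoom := errors.New("boom")
	fmt.Println(handlePanic(func() {}))                     // <nil>
	fmt.Println(handlePanic(func() { panic(errBoom) }))     // boom
	fmt.Println(handlePanic(func() { panic("bad state") })) // non-error panic value: bad state
}
```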

func HandleParlError added in v0.2.2

func HandleParlError(fn func(), storeError func(err error))

HandleParlError recovers from panic in fn invoking an error callback. HandleParlError is deferrable. storeError can be the thread-safe perrors.ParlError.AddErrorProc()

func HasCancel added in v0.4.29

func HasCancel(ctx context.Context) (hasCancel bool)

func ImportNewStack added in v0.4.29

func ImportNewStack(pdebugNewStack func(skipFrames int) (stack Stack))

func Infallible added in v0.4.12

func Infallible(err error)

func Info

func Info(format string, a ...interface{})

Info prints unless silence has been configured with SetSilent(true). Info outputs to stderr. IsSilent() determines the state of silence. If debug is enabled, code location is appended.

func InitFutureCore added in v0.4.26

func InitFutureCore[T any](f *FutureCore[T],
	resolver Resolver[T],
)

InitFutureCore initializes a future and executes fn

func InitWaitGroup added in v0.4.29

func InitWaitGroup(wgp *WaitGroup)

func InvokeCancel added in v0.4.28

func InvokeCancel(ctx context.Context)

InvokeCancel finds the cancel method in the context chain and invokes it. ctx must have been returned by either NewCancelContext or NewCancelContextFunc.

  • ctx nil is panic
  • ctx not from NewCancelContext or NewCancelContextFunc is panic
  • thread-safe, idempotent

func InvokeIf added in v0.4.29

func InvokeIf[T comparable](tp *T, fn func())

InvokeIf is a deferrable function invoking its function argument when:

  • the pointer tp is non-nil and the function fn is non-nil
  • what tp points to is not a T zero-value

Usage:

someFlag := false
defer InvokeIf(&someFlag, someFunction)
…
someFlag = someValue
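The two conditions listed above can be read as the following sketch. invokeIf is a local re-creation based only on that description, not parl's source, and run is a made-up helper that shows the deferred use.

```go
package main

import "fmt"

// invokeIf is a local sketch of the described behavior, not parl's code:
// fn runs only if tp and fn are non-nil and *tp is not the zero value of T.
func invokeIf[T comparable](tp *T, fn func()) {
	var zero T
	if tp == nil || fn == nil || *tp == zero {
		return
	}
	fn()
}

// run reports whether the deferred cleanup ran for the given flag value.
func run(flagValue bool) (cleanedUp bool) {
	someFlag := false
	defer invokeIf(&someFlag, func() { cleanedUp = true })
	someFlag = flagValue // read through the pointer when run returns
	return
}

func main() {
	fmt.Println(run(true), run(false)) // true false
}
```

Passing a pointer is what makes the pattern work: arguments to a deferred call are evaluated at the defer statement, so passing the flag by value would capture false.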

func IsSilent

func IsSilent() (isSilent bool)

IsSilent returns true when Info does not print.

func IsThisDebug

func IsThisDebug() bool

IsThisDebug returns true when debug is globally set using SetDebug(true) or when debug logging is configured for the code location using SetRegexp().

func IsThisDebugN added in v0.4.25

func IsThisDebugN(skipFrames int) (isDebug bool)

IsThisDebugN returns true when debug is globally set using SetDebug(true) or when debug logging is configured for the code location adjusted by skipFrames using SetRegexp().

func Log

func Log(format string, a ...interface{})

Log invocations always print and output to stderr. if debug is enabled, code location is appended.

func Logw added in v0.4.12

func Logw(format string, a ...interface{})

Logw invocations always print and output to stderr. Logw outputs without appending newline.

func NewCancelContext added in v0.4.12

func NewCancelContext(ctx context.Context) (cancelCtx context.Context)

NewCancelContext creates a context that can be provided to InvokeCancel. The return value encapsulates a cancel function.

ctx := NewCancelContext(context.Background())
…
InvokeCancel(ctx)
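One way a context can encapsulate its own cancel function is to store it as a context value. The sketch below uses only the standard library; newCancelContext, invokeCancel and cancelKey are local names chosen for illustration, and whether parl stores the cancel function this way is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// cancelKey is a private context-key type so the value cannot collide.
type cancelKey struct{}

// newCancelContext returns a cancelable context that carries its own cancel function.
func newCancelContext(parent context.Context) context.Context {
	ctx, cancel := context.WithCancel(parent)
	return context.WithValue(ctx, cancelKey{}, cancel)
}

// invokeCancel finds the stored cancel function and invokes it.
// It panics if ctx was not created by newCancelContext.
func invokeCancel(ctx context.Context) {
	cancel, ok := ctx.Value(cancelKey{}).(context.CancelFunc)
	if !ok {
		panic("context chain does not have a cancel function")
	}
	cancel()
}

// canceledAfterInvoke reports ctx.Err() before and after invokeCancel.
func canceledAfterInvoke() (before, after error) {
	ctx := newCancelContext(context.Background())
	before = ctx.Err()
	invokeCancel(ctx)
	invokeCancel(ctx) // cancel functions are idempotent
	after = ctx.Err()
	return
}

func main() {
	before, after := canceledAfterInvoke()
	fmt.Println(before, after) // <nil> context canceled
}
```

The benefit of this design is that a function receiving only a context.Context can still cancel it, without a separate CancelFunc being passed alongside.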

func NewCancelContextFunc added in v0.4.26

func NewCancelContextFunc(ctx context.Context, cancel context.CancelFunc) (cancelCtx context.Context)

NewCancelContextFunc stores the cancel function cancel in the context ctx. the returned context can be provided to InvokeCancel to cancel the context.

func NewThreadResult added in v0.4.12

func NewThreadResult(err error) (failure error)

func NoOnError added in v0.4.12

func NoOnError(err error)

NoOnError is used with Recover to silence the default error logging

func OnCancel added in v0.4.12

func OnCancel(fn func(), ctx context.Context)

OnCancel invokes fn when work done on behalf of context ctx should be canceled

func Out

func Out(format string, a ...interface{})

Out always prints payload output to stdout.

func Outw added in v0.4.12

func Outw(format string, a ...interface{})

Outw always prints payload output to stdout without terminating newline.

func Recover

func Recover(annotation string, errp *error, onError func(error))

Recover recovers from a panic invoking a function no more than once. If *errp does not hold an error and there is no panic, onError is not invoked. Otherwise, onError is invoked exactly once. *errp is updated with a possible panic.

func Recover2

func Recover2(annotation string, errp *error, onError func(error))

Recover2 recovers from a panic and may invoke onError multiple times. onError is invoked if there is an error at *errp and on a possible panic. *errp is updated with a possible panic.

func RecoverInvocationPanic added in v0.4.29

func RecoverInvocationPanic(fn func(), errp *error)

RecoverInvocationPanic is intended to wrap callback invocations in the callee in order to recover from panics in the callback function. When an error occurs, perrors.AppendError appends the callback error to *errp. If fn is nil, a recovered panic results. If errp is nil, a panic is thrown, which can be checked with:

if errors.Is(err, ErrErrpNil) …

func SetDebug

func SetDebug(debug bool)

If SetDebug(true) has been invoked, all Debug prints everywhere produce output. More selective debug printing can be achieved using SetRegexp, which matches package names.

func SetRegexp

func SetRegexp(regExp string) (err error)

SetRegexp defines a regular expression for selective debug printing to stderr. SetRegexp affects Debug() GetDebug() IsThisDebug() IsThisDebugN() functions.

Regular Expression

The regular expression uses the RE2 syntax used by Go. Command-line documentation: go doc regexp/syntax. The string the regular expression is matched against is a fully qualified function name.

Code Location Format

The regular expression is matched against the fully qualified function name for the code line being evaluated. This is a fully qualified Go package path, ".", a possible type name in parentheses ending with "." and the function name.

"github.com/haraldrudell/parl/mains.(*Executable).AddErr"

To obtain the fully qualified function name for a particular location:

parl.Log(pruntime.NewCodeLocation(0).String())

func SetSilent

func SetSilent(silent bool)

SetSilent(true) prevents Info() invocations from printing.

func Short added in v0.4.26

func Short(tim ...time.Time) (s string)

Short provides a brief time-stamp in compact second-precision including time-zone.

  • sample: 060102_15:04:05-08
  • The timestamp does not contain space.
  • time zone is what is included in tim, typically time.Local
  • if tim is not specified, time.Now() in local time zone
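A timestamp of the shape shown in the sample can be produced with a standard-library layout string. The sketch below is an illustration only: shortLayout is an assumed layout that reproduces the sample, and the layout parl actually uses is not shown in this document.

```go
package main

import (
	"fmt"
	"time"
)

// shortLayout is an assumed layout reproducing the sample above;
// the layout parl actually uses is not shown in this document.
const shortLayout = "060102_15:04:05-07"

// short formats tim, or the current local time if tim is omitted.
func short(tim ...time.Time) string {
	t := time.Now()
	if len(tim) > 0 {
		t = tim[0]
	}
	return t.Format(shortLayout)
}

// sample formats a fixed instant in a zone eight hours west of UTC.
func sample() string {
	zone := time.FixedZone("UTC-8", -8*60*60)
	return short(time.Date(2006, time.January, 2, 15, 4, 5, 0, zone))
}

func main() {
	fmt.Println(sample()) // 060102_15:04:05-08
}
```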

func ShortMs added in v0.4.27

func ShortMs(tim ...time.Time) (s string)

ShortMs provides a brief time-stamp in compact milli-second-precision including time-zone.

  • sample: 060102_15:04:05.123-08
  • The timestamp does not contain space.
  • time zone is what is included in tim, typically time.Local
  • if tim is not specified, time.Now() in local time zone

func ShortSpace added in v0.4.28

func ShortSpace(tim ...time.Time) (s string)

ShortSpace provides a brief time-stamp in compact second-precision including time-zone.

  • sample: 060102 15:04:05-08
  • date is 6-digit separated from time by a space
  • time zone is what is included in tim, typically time.Local
  • if tim is not specified, time.Now() in local time zone

func Sprintf added in v0.2.0

func Sprintf(format string, a ...interface{}) string

Sprintf is a printer that supports comma in large numbers

Types

type AdbAdressProvider added in v0.4.0

type AdbAdressProvider interface {
	// AdbSocketAddress retrieves the tcp socket address used
	// by a near Adbette implementation
	AdbSocketAddress() (socketAddress AdbSocketAddress)
}

AdbAdressProvider retrieves the address from an adb server or device so that custom devices can be created

type AdbRequest added in v0.3.0

type AdbRequest string

AdbRequest is a string formatted as an adb request. AdbRequest is only required for implementations using the Adbette protocol implementation

type AdbResponseID added in v0.4.0

type AdbResponseID string

type AdbSocketAddress added in v0.4.0

type AdbSocketAddress string

AdbSocketAddress is a tcp socket address accessible to the local host. The format is two parts separated by a colon. The first part is an IP address or hostname. The second part is a numeric port number. The empty string "" represents "localhost:5037". If the port part is missing, such as "localhost", it implies port 5037. If the host part is missing, it implies "localhost". Note that localhost is valid both for IPv4 and IPv6.
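The defaulting rules above can be sketched as a small normalization helper built on the standard library's net package. normalize is a hypothetical function written for this example and is not part of parl; it works on plain strings rather than the AdbSocketAddress type.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

const (
	defaultHost = "localhost"
	defaultPort = "5037"
)

// normalize fills in the defaults described above for a missing host or port.
// It is an illustrative helper, not part of parl.
func normalize(address string) string {
	host, port, err := net.SplitHostPort(address)
	if err != nil {
		// SplitHostPort fails when there is no port (and on malformed input);
		// this sketch then treats the whole string as the host part
		host, port = strings.Trim(address, "[]"), ""
	}
	if host == "" {
		host = defaultHost
	}
	if port == "" {
		port = defaultPort
	}
	return net.JoinHostPort(host, port)
}

func main() {
	for _, a := range []string{"", "localhost", ":5555", "192.168.1.2:5555"} {
		fmt.Printf("%q → %s\n", a, normalize(a))
	}
}
```

With these inputs, the empty string and "localhost" both become "localhost:5037", and ":5555" becomes "localhost:5555".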

type AdbSyncRequest added in v0.4.0

type AdbSyncRequest string

type Adbette added in v0.3.0

type Adbette interface {
	// SendReadOkay sends a request to a remote adb endpoint.
	// if anything else than OKAY is received back from the
	// remote endpoint, err is non-nil.
	SendReadOkay(request AdbRequest) (err error)
	// ReadString reads utf-8 text up to 64 KiB-1 in length
	ReadString() (s string, err error)
	// ConnectToDevice sends a forwarding request to an adb
	// server to connect to one of its devices
	ConnectToDevice(serial AndroidSerial) (err error)
	// Shell executes a shell command on a device connected to the adb server.
	// out is a combination of stderr and stdout.
	// The status code from an on-device command cannot be obtained
	Shell(command string, reader ...func(conn io.ReadWriteCloser) (err error)) (out string, err error)
	// TrackDevices orders a server to emit serial numbers as they become available
	TrackDevices() (err error)
	// Devices lists the currently online serials
	Devices() (serials []AndroidSerial, err error)
	// DeviceStati returns all available serials and their status
	// The two slices correspond and are of the same length
	DeviceStati() (serials []AndroidSerial, stati []AndroidStatus, err error)
	// Closer closes an adb connection, meant to be a deferred function
	Closer(errp *error)
	// SetSync sets the protocol mode of an adb device connection to sync
	SetSync() (err error)
	// LIST is a sync request that lists file system entries in a directory of an adb device
	LIST(remoteDir string, dentReceiver func(mode uint32, size uint32, time uint32, byts []byte) (err error)) (err error)
	// RECV fetches the contents of a file on an adb device
	RECV(remotePath string, blobReceiver func(data []byte) (err error)) (err error)
	// CancelError is a value that LIST or RECV callback routines can return to
	// cancel further invocations
	CancelError() (cancelError error)

	SendBlob(syncRequest AdbSyncRequest, data []byte) (err error)
	ReadBlob() (byts []byte, err error)
	ReadResponseID() (responseID AdbResponseID, err error)
	ReadBytes(byts []byte) (err error)
	SendBytes(byts []byte) (err error)
}

Adbette is a minimal implementation of the adb Android debug bridge protocol. Adbette includes both adb server and Android device functions. Adbette is extensible in that additional protocol features are easily implemented without concern for protocol details. To shut down an Adbette and release its resources, invoke the Closer method.

type Adbetter added in v0.3.0

type Adbetter interface {
	NewConnection(address AdbSocketAddress, ctx context.Context) (conn Adbette, err error)
}

Adbetter is a factory instance for connections featuring Adbette. The context is used for terminating a connection attempt. The context is not retained in the connection.

type AggregatePriority added in v0.4.30

type AggregatePriority[V any, P constraints.Ordered] interface {
	// Aggregator returns the aggregator associated with this AggregatePriority
	Aggregator() (aggregator Aggregator[V, P])
	// Update caches the current priority from the aggregator
	Update()
	// CachedPriority returns the effective cached priority
	//	- CachedPriority is used by consumers of the AggregatingPriorityQueue
	CachedPriority() (priority P)
}

AggregatePriority caches the priority value from an aggregator for priority.

  • V is the value type used as a pointer
  • P is the priority type descending order, ie. Integer Floating-Point string

type AggregatingPriorityQueue added in v0.4.30

type AggregatingPriorityQueue[V any, P constraints.Ordered] interface {
	// Get retrieves a possible value container associated with valuep
	Get(valuep *V) (aggregator Aggregator[V, P], ok bool)
	// Put stores a new value container associated with valuep
	//   - the valuep is assumed to not have a node in the queue
	Put(valuep *V, aggregator Aggregator[V, P])
	// Update re-prioritizes a value
	Update(valuep *V)
	// Clear empties the priority queue. The hashmap is left intact.
	Clear()
	// List returns the first n or default all values by priority
	List(n ...int) (aggregatorQueue []AggregatePriority[V, P])
}

pmaps.AggregatingPriorityQueue uses cached priority obtained from Aggregators that operate on the values outside of the AggregatingPriorityQueue.

  • the Update method reprioritizes an updated value element

type Aggregator added in v0.4.30

type Aggregator[V any, P constraints.Ordered] interface {
	// Value returns the value object this Aggregator is associated with
	//	- the Value method is used by consumers of the AggregatingPriorityQueue
	Value() (valuep *V)
	// Aggregate aggregates and snapshots data values from the value object.
	//	- Aggregate is invoked outside of AggregatingPriorityQueue
	Aggregate()
	// Priority returns the current priority for the associated value
	//	- this priority is cached by AggregatePriority
	Priority() (priority P)
}

Aggregator aggregates, snapshots and assigns priority to an associated value.

  • V is the value type used as a pointer
  • V may be a thread-safe object whose values change in real-time
  • P is the priority type descending order, ie. Integer Floating-Point string

type AndroidSerial added in v0.3.0

type AndroidSerial string

AndroidSerial uniquely identifies an Android device. It is typically a string of a dozen or so 8-bit characters consisting of lower- and upper-case letters and digits: a-zA-Z0-9

type AndroidStatus added in v0.3.0

type AndroidStatus string

AndroidStatus indicates the current status of a device known to a Server or Serverette. It is a single word of ASCII-set characters

const AndroidOnline AndroidStatus = "device"

AndroidOnline is the Android device status that indicates an online device

type AssignedPriority added in v0.4.30

type AssignedPriority[V any, P constraints.Ordered] interface {
	SetPriority(priority P)
}

AssignedPriority contains the assigned priority for a priority-queue element

  • V is the element value type whose pointer-value provides identity
  • P is the priority, a descending-ordered type: Integer Floating-Point string

type AtomicBool

type AtomicBool struct {
	// contains filtered or unexported fields
}

AtomicBool is a thread-safe flag. AtomicBool requires no initialization

var isDone parl.AtomicBool
if isDone.Set() // isDone was not set, but is set now
…
if !isDone.IsTrue() // isDone is not set

func (*AtomicBool) Clear

func (ab *AtomicBool) Clear() (wasSet bool)

Clear sets the flag to false and returns true if the flag was not already false. thread-safe

func (*AtomicBool) IsTrue

func (ab *AtomicBool) IsTrue() (isTrue bool)

IsTrue returns the flag’s current bool value. thread-safe

func (*AtomicBool) Set

func (ab *AtomicBool) Set() (wasNotSet bool)

Set sets the flag to true and returns true if the flag was not already true. thread-safe
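The return-value semantics of Set and Clear correspond to an atomic compare-and-swap. The sketch below shows one way to obtain that behavior with sync/atomic; the type flag and the helper winners are local illustrations, and parl's actual implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// flag is a stand-in for a thread-safe boolean whose zero value is false.
type flag struct{ v int32 }

// Set sets the flag and returns true if this call changed it from false.
func (f *flag) Set() (wasNotSet bool) { return atomic.CompareAndSwapInt32(&f.v, 0, 1) }

// Clear clears the flag and returns true if this call changed it from true.
func (f *flag) Clear() (wasSet bool) { return atomic.CompareAndSwapInt32(&f.v, 1, 0) }

// IsTrue returns the current value.
func (f *flag) IsTrue() bool { return atomic.LoadInt32(&f.v) != 0 }

// winners starts n goroutines that all call Set and counts how many got true.
func winners(n int) int32 {
	var isDone flag
	var count int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if isDone.Set() {
				atomic.AddInt32(&count, 1)
			}
		}()
	}
	wg.Wait()
	return count
}

func main() {
	fmt.Println(winners(100)) // 1: exactly one goroutine observes the transition
}
```

Because only one caller ever sees Set return true, the return value can guard one-time work such as shutdown logic without a separate mutex.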

type AtomicReferece added in v0.4.29

type AtomicReferece[T any] struct {
	// contains filtered or unexported fields
}

func MakeAtomicReferece added in v0.4.29

func MakeAtomicReferece[T any]() (reference AtomicReferece[T])

func (*AtomicReferece[T]) Get added in v0.4.29

func (ref *AtomicReferece[T]) Get() (reference *T)

func (*AtomicReferece[T]) Put added in v0.4.29

func (ref *AtomicReferece[T]) Put(reference *T)

type Certificate added in v0.4.26

type Certificate interface {
	DER() (der CertificateDer)
	PEM() (pemBytes PemBytes)
	ParseCertificate() (certificate *x509.Certificate, err error)
}

type CertificateAuthority added in v0.4.26

type CertificateAuthority interface {
	Check() (cert *x509.Certificate, err error) // gets x509.Certificate version
	DER() (certificateDer CertificateDer)       // untyped bytes, der: Distinguished Encoding Rules binary format
	Sign(template *x509.Certificate, publicKey crypto.PublicKey) (certDER CertificateDer, err error)
	PEM() (pemBytes PemBytes)
	Private() (privateKey PrivateKey)
}

type CertificateDer added in v0.4.26

type CertificateDer []byte

CertificateDer is a binary encoding of a certificate. der: Distinguished Encoding Rules is a binary format based on asn1.

type ClosableChan added in v0.4.0

type ClosableChan[T any] struct {
	// contains filtered or unexported fields
}

ClosableChan wraps channel close. ClosableChan is an initialization-free channel with a deferrable, thread-safe, idempotent and observable Close method. Close closes the channel exactly once, recovering panics. IsClosed reports whether the Close method did execute close.

var errCh parl.ClosableChan[error]
go thread(&errCh)
err, ok := <-errCh.Ch()
if errCh.IsClosed() { // can be inspected
…
func thread(errCh *parl.ClosableChan[error]) {
  defer errCh.Close(nil) // will not terminate the process
  errCh.Ch() <- err
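The idea of an idempotent channel close can be sketched with sync.Once. The type closeOnce below is a simplified illustration and not parl's ClosableChan: unlike the real type it needs a constructor, and it does not collect close errors.

```go
package main

import (
	"fmt"
	"sync"
)

// closeOnce wraps a channel so that Close can be called any number of times.
type closeOnce[T any] struct {
	once sync.Once
	ch   chan T
}

// newCloseOnce allocates the wrapped channel.
func newCloseOnce[T any]() *closeOnce[T] { return &closeOnce[T]{ch: make(chan T)} }

// Ch returns the wrapped channel.
func (c *closeOnce[T]) Ch() chan T { return c.ch }

// Close closes the channel on the first call only; didClose is true for that call.
func (c *closeOnce[T]) Close() (didClose bool) {
	c.once.Do(func() {
		close(c.ch)
		didClose = true
	})
	return
}

// demo closes twice and then receives from the closed channel.
func demo() (first, second, open bool) {
	c := newCloseOnce[error]()
	first = c.Close()
	second = c.Close() // a plain close(ch) here would panic
	_, open = <-c.Ch()
	return
}

func main() {
	fmt.Println(demo()) // true false false
}
```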

func NewClosableChan added in v0.4.5

func NewClosableChan[T any](ch ...chan T) (cl *ClosableChan[T])

NewClosableChan ensures a chan does not throw

func (*ClosableChan[T]) Ch added in v0.4.0

func (cl *ClosableChan[T]) Ch() (ch chan T)

Ch retrieves the channel

func (*ClosableChan[T]) Close added in v0.4.0

func (cl *ClosableChan[T]) Close(errp ...*error) (didClose bool, err error)

Close ensures the channel is closed. Close does not panic. Close is thread-safe. Close does not return until the channel is closed. Upon return, all invocations have a possible close error in err. If errp is non-nil, it is updated with a possible error. didClose indicates whether this invocation closed the channel.

func (*ClosableChan[T]) IsClosed added in v0.4.0

func (cl *ClosableChan[T]) IsClosed() (isClosed bool)

IsClosed indicates whether the Close method has been invoked

type Closers added in v0.4.26

type Closers struct {
	// contains filtered or unexported fields
}

Closers collects io.Closer objects so they can be closed all at once. Closer is required for servers that may have the server itself and

func (*Closers) Add added in v0.4.26

func (cl *Closers) Add(closer io.Closer)

func (*Closers) Close added in v0.4.26

func (cl *Closers) Close() (err error)

func (*Closers) EnsureClosed added in v0.4.26

func (cl *Closers) EnsureClosed(closer io.Closer) (err error)

func (*Closers) Remove added in v0.4.26

func (cl *Closers) Remove(closer io.Closer) (ok bool)

type Conduit added in v0.4.12

type Conduit[T any] interface {
	Sender[T]
	Receiver[T]
}

func NewConduit added in v0.4.12

func NewConduit[T any](done Doneable, ctx context.Context) (conduit Conduit[T])

type ConduitDo added in v0.4.12

type ConduitDo[T any] struct {
	// contains filtered or unexported fields
}

func (*ConduitDo[T]) Count added in v0.4.12

func (ct *ConduitDo[T]) Count() (count int)

func (*ConduitDo[T]) Get added in v0.4.12

func (ct *ConduitDo[T]) Get() (value T, ok bool)

func (*ConduitDo[T]) GetSlice added in v0.4.12

func (ct *ConduitDo[T]) GetSlice(max int) (values []T, ok bool)

func (*ConduitDo[T]) IsCanceled added in v0.4.12

func (ct *ConduitDo[T]) IsCanceled() (IsCanceled bool)

func (*ConduitDo[T]) IsEmpty added in v0.4.12

func (ct *ConduitDo[T]) IsEmpty() (isEmpty bool)

func (*ConduitDo[T]) Put added in v0.4.12

func (ct *ConduitDo[T]) Put(value T) (IsCanceled bool)

func (*ConduitDo[T]) PutSlice added in v0.4.12

func (ct *ConduitDo[T]) PutSlice(values []T) (IsCanceled bool)

func (*ConduitDo[T]) WaitCount added in v0.4.12

func (ct *ConduitDo[T]) WaitCount() (waitCount int)

type Counter added in v0.3.0

type Counter interface {
	Inc() (counter Counter) // Inc increments the counter. Supports method chaining
	Dec() (counter Counter) // Dec decrements the counter but not beyond zero. Supports method chaining
	// Add adds a positive or negative delta
	Add(delta int64) (counter Counter)
}

Counter is the data provider interface for a counter with Inc Dec SetValue operations.

  • value is a monotonically increasing value counting Inc and positive Add occurrences. value is used to count the total occurrences of something.
  • running is the sum of Inc Dec and Add operations, but always 0 or greater. running is used to count the currently operating instances of something.
  • max is the maximum value reached by running

Counter is thread-safe.
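The relationship between value, running and max can be sketched with a mutex-protected struct. The type counter below is a local illustration of the semantics in the list above, not parl's implementation, and it returns a concrete pointer rather than the Counter interface.

```go
package main

import (
	"fmt"
	"sync"
)

// counter tracks value, running and max as described above.
type counter struct {
	mu                  sync.Mutex
	value, running, max uint64
}

// Add applies a positive or negative delta.
func (c *counter) Add(delta int64) *counter {
	c.mu.Lock()
	defer c.mu.Unlock()
	if delta >= 0 {
		c.value += uint64(delta) // value only counts positive occurrences
		c.running += uint64(delta)
	} else if d := uint64(-delta); d >= c.running {
		c.running = 0 // running never goes below zero
	} else {
		c.running -= d
	}
	if c.running > c.max {
		c.max = c.running
	}
	return c
}

// Inc and Dec support method chaining.
func (c *counter) Inc() *counter { return c.Add(1) }
func (c *counter) Dec() *counter { return c.Add(-1) }

// Get returns a consistent snapshot of the three quantities.
func (c *counter) Get() (value, running, max uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value, c.running, c.max
}

func main() {
	var c counter
	c.Inc().Inc().Dec().Inc().Dec().Dec().Dec()
	fmt.Println(c.Get()) // 3 0 2
}
```

In the run above there are three Inc calls, so value is 3; at most two instances were active at once, so max is 2; and the surplus Dec is absorbed at zero.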

type CounterID added in v0.3.0

type CounterID string

CounterID is a unique type containing counter names

type CounterSet added in v0.4.29

type CounterSet interface {
	// GetCounters gets a list and a map for consuming counter data
	GetCounters() (orderedKeys []CounterID, m map[CounterID]any)
	// ResetCounters resets all counters to their initial state
	ResetCounters(stopRateCounters bool)
}

CounterSet is the consumer interface for a counter set

type CounterValues added in v0.3.0

type CounterValues interface {
	Clone() (counterValues CounterValues)                           // Clone takes a snapshot of a counter state.
	CloneReset(stopRateCounters bool) (counterValues CounterValues) // CloneReset takes a snapshot of a counter state and resets it to its initial state.
	Get() (value, running, max uint64)
	Value() (value uint64)
	Running() (running uint64)
	Max() (max uint64)
}

CounterValues is the consumer interface for a counter.

  • value holds the current value from Inc or SetValue operations
  • running holds the combined result of Inc and Dec operations. It is not affected by SetValue
  • max is the maximum of running counter or SetValue operations
  • values are uint64, Counter is thread-safe

type Counters added in v0.3.0

type Counters interface {
	// GetOrCreateCounter is used by the data provider of a counter.
	//	- A counter has Inc and Dec operations.
	//	- Counters can be used to track number of currently operating invocations.
	//	- if period is present and positive, the counter created is a RateCounter.
	// Rate counters use threads, one per distinct period requested.
	// Rate counter threads are monitored by the provided Go group and failure
	// may force a counter restart or cause application termination. Failures
	// are typically caused by panics in the counter update task.
	GetOrCreateCounter(name CounterID, period ...time.Duration) (counter Counter)
	// GetOrCreateDatapoint is used by the data provider of a datapoint.
	// A datapoint supports SetValue operation.
	// A datapoint tracks a quantity such as a latency value.
	GetOrCreateDatapoint(name CounterID, period time.Duration) (datapoint Datapoint)
}

Counters is the data provider interface for a counter set.

  • max and running values are offered
  • Counters and datapoints are thread-safe
  • counters may be used to determine that code abides by the intended parallelism and to identify hangs or anomalies.
  • Printing counters every second can verify adequate progress and possibly identify blocking of threads or swapping and garbage collection outages.

type CountersFactory added in v0.3.0

type CountersFactory interface {
	// NewCounters returns a counter container.
	// if useCounters is false, the container does not actually do any counting.
	NewCounters(useCounters bool, g0 GoGen) (counters Counters)
}

CountersFactory is an abstract counter factory. CountersFactory enables providing of different counter implementations.

type CyclicWait added in v0.4.29

type CyclicWait struct {
	// contains filtered or unexported fields
}

CyclicWait allows any number of threads to wait for a next occurrence.

  • a parent context may be passed in that on cancel triggers the wait and prevents further cycles
  • a channel can be obtained that sends one item on the next trig but never closes
  • a channel can be obtained that closes on next trig
  • next trig can be awaited
  • a did-occurer object can be obtained that returns true once the cycle trigs.
  • a context can be obtained that cancels on the next trig
  • the cycles can be permanently canceled or trigged and rearmed
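
The trig-and-rearm idea in the list above can be sketched with the standard library alone. The cycle type and its method names are hypothetical and not parl's implementation; the sketch only covers the channel that closes on the next trig, and omits the parent context and permanent cancel.

```go
package main

import (
	"fmt"
	"sync"
)

// cycle is a hypothetical stand-in for the trig-and-rearm idea.
type cycle struct {
	mu   sync.Mutex
	done chan struct{}
}

func newCycle() *cycle { return &cycle{done: make(chan struct{})} }

// Done returns the channel that closes on the next trig.
func (c *cycle) Done() <-chan struct{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.done
}

// TrigAndRearm closes the current channel, releasing all waiters,
// and installs a fresh channel for the next cycle.
func (c *cycle) TrigAndRearm() {
	c.mu.Lock()
	defer c.mu.Unlock()
	close(c.done)
	c.done = make(chan struct{})
}

func main() {
	c := newCycle()
	first := c.Done()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-first // any number of threads may wait on the same cycle
		}()
	}
	c.TrigAndRearm()
	wg.Wait()
	select {
	case <-c.Done():
		fmt.Println("unexpected: new cycle already trigged")
	default:
		fmt.Println("all waiters released, next cycle armed")
	}
}
```

Closing a channel releases every waiter at once, which is why a fresh channel is needed for each cycle.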

func NewCyclicWait added in v0.4.29

func NewCyclicWait(ctx context.Context) (onceReceiver *CyclicWait)

NewCyclicWait returns a CyclicWait that trigs when the context cancels, or immediately if the context was already canceled.

func (*CyclicWait) Cancel added in v0.4.29

func (cw *CyclicWait) Cancel()

Cancel cancels the object and prevents rearming.

func (*CyclicWait) CancelAndRearm added in v0.4.29

func (cw *CyclicWait) CancelAndRearm() (wasRearmed bool)

CancelAndRearm trigs the object and then rearms unless a possible parent context has been canceled.

func (*CyclicWait) Ch added in v0.4.29

func (cw *CyclicWait) Ch() (ch <-chan struct{})

Ch returns a channel that will emit one item on the next trig and nothing after that. The channel never closes.

func (*CyclicWait) Context added in v0.4.29

func (cw *CyclicWait) Context() (ctx context.Context)

Context returns a context that cancels on the next trig.

func (*CyclicWait) DidOccurer added in v0.4.29

func (cw *CyclicWait) DidOccurer() (didOccurer *OnceWaiterRO)

DidOccurer returns an object with a DidOccur method returning true after this cycle has trigged.

func (*CyclicWait) Done added in v0.4.29

func (cw *CyclicWait) Done() (done <-chan struct{})

Done returns a channel that will close on the next trig or parent context cancel. Similar to the Done method of a context.

func (*CyclicWait) IsCancel added in v0.4.29

func (cw *CyclicWait) IsCancel() (isCancel bool)

IsCancel returns whether Cancel has been invoked. IsCancel will return false during CancelAndRearm cycles.

func (*CyclicWait) Wait added in v0.4.29

func (cw *CyclicWait) Wait()

Wait waits until the next trig or parent context cancel.

type DB added in v0.4.12

type DB interface {
	Exec(partition DBPartition, query string, ctx context.Context,
		args ...any) (execResult ExecResult, err error)
	Query(partition DBPartition, query string, ctx context.Context,
		args ...any) (sqlRows *sql.Rows, err error)
	QueryRow(partition DBPartition, query string, ctx context.Context,
		args ...any) (sqlRow *sql.Row, err error)
	QueryString(partition DBPartition, query string, ctx context.Context,
		args ...any) (value string, err error)
	QueryInt(partition DBPartition, query string, ctx context.Context,
		args ...any) (value int, err error)
	Close() (err error)
}

type DBFactory added in v0.4.12

type DBFactory interface {
	NewDB(
		dsnr DataSourceNamer,
		schema func(dataSource DataSource, ctx context.Context) (err error)) (db DB)
}

DBFactory is a standardized way to obtain DB objects

type DBPartition added in v0.4.14

type DBPartition string

type DSNrFactory added in v0.4.14

type DSNrFactory interface {
	NewDSNr(appName string) (dsnr DataSourceNamer)
}

type DataSource added in v0.4.12

type DataSource interface {
	PrepareContext(ctx context.Context, query string) (stmt *sql.Stmt, err error)
	Close() (err error)
}

type DataSourceNamer added in v0.4.12

type DataSourceNamer interface {
	DSN(partition ...DBPartition) (dataSourceName string)
	DataSource(dsn string) (dataSource DataSource, err error)
}

type Datapoint added in v0.4.29

type Datapoint interface {
	// SetValue records a value at time.Now().
	// SetValue supports method chaining.
	SetValue(value uint64) (datapoint Datapoint)
}

Datapoint tracks a value with average, max-min, and increase/decrease rates.

type DatapointValue added in v0.4.29

type DatapointValue interface {
	CloneDatapoint() (datapoint Datapoint)      // CloneDatapoint takes a snapshot of a datapoint state.
	CloneDatapointReset() (datapoint Datapoint) // CloneDatapointReset takes a snapshot of a datapoint state and resets it to its initial state.
	GetDatapoint() (value, max, min uint64, isValid bool, average float64, n uint64)
	DatapointValue() (value uint64)
	DatapointMax() (max uint64)
	DatapointMin() (min uint64)
}

type Debouncer added in v0.4.26

type Debouncer[T any] struct {
	// contains filtered or unexported fields
}

func NewDebouncer

func NewDebouncer[T any](d time.Duration, in <-chan T,
	sender func([]T),
	errFn func(err error), ctx context.Context) (db *Debouncer[T])

NewDebouncer returns a Debouncer that debounces event-stream values. The sender and errFn functions must be thread-safe. The Debouncer is shut down by closing the in channel or by canceling the context.
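
What debouncing means here can be sketched with the standard library. The debounce function below is a hypothetical stand-in and not parl's Debouncer: it assumes that values are batched until no new value has arrived for d, and it leaves out the errFn and context parameters.

```go
package main

import (
	"fmt"
	"time"
)

// debounce is a hypothetical stand-in, not parl's Debouncer.
// It collects values from in and passes them to sender as one batch
// once no new value has arrived for d, or when in is closed.
func debounce[T any](d time.Duration, in <-chan T, sender func([]T)) {
	var batch []T
	var timer *time.Timer
	var quiet <-chan time.Time // nil while no batch is pending
	for {
		select {
		case v, ok := <-in:
			if timer != nil {
				timer.Stop()
			}
			if !ok {
				if len(batch) > 0 {
					sender(batch) // flush what is pending on shutdown
				}
				return
			}
			batch = append(batch, v)
			timer = time.NewTimer(d) // restart the quiet period
			quiet = timer.C
		case <-quiet:
			sender(batch)
			batch, quiet = nil, nil
		}
	}
}

func main() {
	in := make(chan int)
	done := make(chan struct{})
	go func() {
		defer close(done)
		debounce(50*time.Millisecond, in, func(batch []int) {
			fmt.Println("batch:", batch)
		})
	}()
	for i := 1; i <= 3; i++ {
		in <- i // sent in quick succession
	}
	close(in)
	<-done
}
```

A nil channel never becomes ready in a select, so the quiet case is only active while a batch is pending.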

func (*Debouncer[T]) Wait added in v0.4.26

func (db *Debouncer[T]) Wait()

type Dent added in v0.3.0

type Dent interface {
	// Name is utf-8 base path in device file system.
	// Name is base name, ie. file name and extension.
	Name() (name string)
	// Modified time, the time file contents was changed, second precision, continuous time
	Modified() (modified time.Time)
	// IsDir indicates directory.
	// LIST only supports symbolic link, directory and regular file types
	IsDir() (isDir bool)
	// IsRegular indicates regular file, ie. not a directory or symbolic link.
	// LIST only supports symbolic link, directory and regular file types
	IsRegular() (isRegular bool) // ie.not directory or symlink
	// Perm returns os.FileMode data.
	// 9-bit Unix permissions per os.ModePerm.
	// LIST also supports directory and symlink bits
	Perm() (perm fs.FileMode)
	// Size is limited to 4 GiB-1
	Size() (size uint32)
}

Dent is the information returned by adb ls or LIST

type Devicette added in v0.3.0

type Devicette interface {
	// Serial returns the serial number for this device
	Serial() (serial AndroidSerial)
	// Shell executes a shell command on the device.
	// The resulting socket can be obtained either using the reader callback,
	// which is a socket connection to the device,
	// or by collecting the out string.
	Shell(command string, reader func(conn io.ReadWriteCloser) (err error)) (out string, err error)
	/*
		Pull copies a remote file or directory on the Android device to a local file system location.
		the local file must not exist.
		Pull refuses certain files like product apks. shell cat works
	*/
	Pull(remotePath, nearPath string) (err error)
	/*
		List has some peculiarities:
		If remoteDir is not an existing directory, an empty list is returned.
		Entries with insufficient permissions are ignored.
		Update: . and .. are removed, adb LIST otherwise does return those.
		File mode: the only present type bits beyond 9-bit Unix permissions are
		symlink, regular file and directory.
		File size is limited to 4 GiB-1.
		Modification time resolution is second and range is confined to a 32-bit Unix timestamp.
	*/
	List(remoteDir string) (dFileInfo []Dent, err error)
}

Devicette is a generic implementation of the capabilities of a device implementing the adb Android debug bridge protocol

type DevicetteFactory added in v0.4.0

type DevicetteFactory interface {
	// NewDevicette creates a Devicette interacting with remote adb Android Debug Bridge
	// devices via an adb server available at the socket address address
	NewDevicette(address AdbSocketAddress, serial AndroidSerial, ctx context.Context) (devicette Devicette)
}

DevicetteFactory describes how Devicette objects are obtained.

type DoErr added in v0.4.26

type DoErr interface {
	DoErr(op func() (err error)) (err error)
}

type Doneable added in v0.4.12

type Doneable interface {
	Add(delta int)
	Done()
}

Doneable is the callee part of sync.WaitGroup and other implementations. Doneable is a many-to-many relation. Doneable allows the callee to instantiate and invoke any number of things that are awaitable by the caller.

… = NewSomething(&waitsForLots, &shutsDownLots)
go someThread(&waitsForLots, &shutsDownLots)
func someThread(w Doneable, ctx context.Context) {
  defer w.Done()
  w.Add(2)
  go somethingElse()
}

func EnsureDoneable added in v0.4.12

func EnsureDoneable(done Doneable) (done2 Doneable)

type DoneableNil added in v0.4.12

type DoneableNil struct{}

func (*DoneableNil) Add added in v0.4.12

func (de *DoneableNil) Add(delta int)

func (*DoneableNil) Done added in v0.4.12

func (de *DoneableNil) Done()

type Element added in v0.4.29

type Element[T comparable] interface {
	Value() (value T)
	fmt.Stringer
}

Element represents an element of a set that has a unique value and is printable. Set element values are unique but not necessarily ordered. set.Element is an implementation.

type ElementFull added in v0.4.33

type ElementFull[T comparable] interface {
	Element[T]
	Description() (full string)
}

type Enum added in v0.4.33

type Enum[T any] interface {
	KeyEnum[string, T]
}

Enum is an enumeration using string keys mapping to a value type T. T is a unique named type that separates the values of this enumeration from all other values.

type EnumItem added in v0.4.33

type EnumItem[K constraints.Ordered, V any] interface {
	Key() (key K)               // Key returns the key for this enumeration value
	Description() (desc string) // Description returns a descriptive sentence for this enumeration value
	Value() (value V)           // Value returns this enumeration value’s value using the restricted type

	fmt.Stringer
}

EnumItem is a generic interface for enumeration item implementations. Enumeration items are ordered by the K key type.

  • K is a key type whose values map to restricted type V values one-to-one.
  • V is a restricted type for enumeration values that may store more efficiently compared to a portable type.

type Err added in v0.4.12

type Err interface {
	error
}

Err is a public error

type ErrorCloser added in v0.4.20

type ErrorCloser interface {
	InvokeIfError(addError func(err error))
	Close()
}

type ErrorManager added in v0.4.20

type ErrorManager interface {
	Ch() (ch <-chan GoError)
}

type ErrorSink added in v0.4.20

type ErrorSink interface {
	AddError(err error)
}

type ExecResult added in v0.4.14

type ExecResult interface {
	Get() (ID int64, rows int64)
	String() (s string)
}

type FSLocation

type FSLocation interface {
	Directory() (directory string)
}

type FanOut

type FanOut struct {
	ErrCh   chan error
	Results chan interface{}
	// contains filtered or unexported fields
}

func NewFanOut

func NewFanOut() (fo *FanOut)

func (*FanOut) Do

func (cr *FanOut) Do(name string, proc FanProc)

Do executes a procedure in a goroutine that has no result other than a possible non-nil error

func (*FanOut) Run

func (cr *FanOut) Run(name string, thunk FanThunk)

Run executes a thunk in a goroutine with a possible non-nil result and a possible non-nil error

func (*FanOut) Wait

func (cr *FanOut) Wait()

Wait waits for all Do and Run invocations to complete, then shuts down

type FanProc

type FanProc func() (err error)

type FanThunk

type FanThunk func() (result interface{}, err error)

type Frame added in v0.4.20

type Frame interface {
	Loc() (location *pruntime.CodeLocation)
	Args() (args string)
	String() (s string)
}

type FutureCore added in v0.4.26

type FutureCore[T any] struct {
	// contains filtered or unexported fields
}

FutureCore implements a future whose value and error are obtained using the Wait method. FutureCore is thread-safe.

  • any number of threads can use IsDone and Wait methods
  • the function fn calculating the result must be thread-safe
  • — fn execution is in a new thread
  • Wait waits for fn to complete and returns its value and error
  • IsDone checks whether the value is present
  • FutureValue is in a separate type so that it can be sent on a channel

func NewFutureCore added in v0.4.26

func NewFutureCore[T any](resolver Resolver[T]) (futureCore *FutureCore[T])

NewFutureCore executes the resolver function and returns a future for its result.

func (*FutureCore[T]) IsDone added in v0.4.26

func (f *FutureCore[T]) IsDone() (isDone bool)

IsDone returns whether the future result is present

func (*FutureCore[T]) Wait added in v0.4.26

func (f *FutureCore[T]) Wait() (value T, isPanic bool, err error)

Wait blocks until the result is available and returns it.
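
The future pattern described for FutureCore can be sketched with a channel that is closed once the result is stored. The future type and newFuture function are hypothetical stand-ins and not parl's implementation; unlike the Wait signature above, this sketch does not recover panics and returns no isPanic value.

```go
package main

import (
	"errors"
	"fmt"
)

// future is a hypothetical stand-in, not parl's FutureCore.
type future[T any] struct {
	done  chan struct{} // closed when value and err are set
	value T
	err   error
}

// newFuture runs fn in a new goroutine and returns a future for its result.
func newFuture[T any](fn func() (T, error)) *future[T] {
	f := &future[T]{done: make(chan struct{})}
	go func() {
		defer close(f.done)
		f.value, f.err = fn()
	}()
	return f
}

// IsDone reports whether the result is present, without blocking.
func (f *future[T]) IsDone() bool {
	select {
	case <-f.done:
		return true
	default:
		return false
	}
}

// Wait blocks until fn has completed and returns its value and error.
func (f *future[T]) Wait() (T, error) {
	<-f.done
	return f.value, f.err
}

func main() {
	f := newFuture(func() (int, error) { return 6 * 7, nil })
	v, err := f.Wait()
	fmt.Println(v, err, f.IsDone()) // 42 <nil> true

	g := newFuture(func() (string, error) { return "", errors.New("failed") })
	_, err = g.Wait()
	fmt.Println(err) // failed
}
```

Because the result fields are written before the channel is closed, any number of goroutines can call Wait and IsDone safely.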

type FutureValue added in v0.4.26

type FutureValue[T any] struct {
	// contains filtered or unexported fields
}

FutureValue is a container for the value of the future.

type Go added in v0.4.12

type Go interface {
	// Register performs no function but allows the Go object to collect
	// information on the new thread.
	Register()
	// AddError emits a non-fatal error.
	AddError(err error)
	// Go returns a Go object to be provided as a go-statement function-argument
	//	in a function call invocation launching a new goroutine thread.
	//	- the new thread belongs to the same GoGroup thread-group as the Go
	//		object whose Go method was invoked.
	Go() (g1 Go)
	// SubGo returns a GoGroup thread-group whose fatal and non-fatal errors go to
	//	the Go object’s parent thread-group.
	//	- a SubGo is used to ensure sub-threads exiting prior to their parent thread
	//		or to facilitate separate cancelation of the threads in the subordinate thread-group.
	//	- fatal errors from SubGo threads are handled in the same way as those of the
	//		Go object, typically terminating the application.
	//   - the SubGo thread-group terminates when both its own threads and
	//	the threads of its subordinate thread-groups have exited.
	SubGo(onFirstFatal ...GoFatalCallback) (subGo SubGo)
	// SubGroup returns a thread-group with its own error channel.
	//	- a SubGroup is used for threads whose fatal errors should be handled
	//		in the Go thread.
	//	- The threads of the Subgroup can be canceled separately.
	//		- SubGroup’s error channel collects fatal thread terminations
	//		- the SubGroup’s error channel needs to be read in real-time or after
	//		SubGroup termination
	//   - non-fatal errors in SubGroup threads are sent to the Go object’s parent
	//		similar to the AddError method
	//   - the SubGroup thread-group terminates when both its own threads and
	//	the threads of its subordinate thread-groups have exited.
	SubGroup(onFirstFatal ...GoFatalCallback) (subGroup SubGroup)
	// Done indicates that this goroutine has finished.
	//	- err == nil means successful exit
	//	- non-nil err indicates a fatal error.
	// 	- Done is deferrable.
	Done(errp *error)
	// Wait awaits exit of this Go thread.
	Wait()
	// CancelGo signals to this Go thread to exit.
	CancelGo()
	// Cancel signals for the threads in this Go thread’s parent GoGroup thread-group
	// and any subordinate thread-groups to exit.
	Cancel()
	// Context will Cancel when the parent thread-group Cancels
	//	or Cancel is invoked on this Go object.
	// Subordinate thread-groups do not Cancel the context of the Go thread.
	Context() (ctx context.Context)
	ThreadInfo() (threadID ThreadID, createLocation pruntime.CodeLocation,
		funcLocation pruntime.CodeLocation, isValid bool)
	fmt.Stringer
}

Go provides methods for a running goroutine thread. A Go object is provided as a function argument in the go statement function call launching the thread.

  • Go.CancelGo affects this Go thread only.
  • The Go Context is canceled when
  • — the parent GoGroup thread-group’s context is Canceled or
  • — a thread in the parent GoGroup thread-group initiates Cancel
  • Cancel by threads in subordinate thread-groups does not Cancel this Go thread

type GoError added in v0.4.12

type GoError interface {
	error // Error() string
	// Err retrieves the original error value
	Err() (err error)
	// Time provides when this error occurred
	Time() time.Time
	// IsThreadExit determines if this error is a thread exit
	//	- thread exits may have err nil
	//	- fatals are non-nil thread exits that may require specific actions such as
	//		application termination
	IsThreadExit() (isThreadExit bool)
	// IsFatal determines if this error is a fatal thread-exit, ie. a thread exiting with non-nil error
	IsFatal() (isThreadExit bool)
	// ErrContext returns in what situation this error occurred
	ErrContext() (errContext GoErrorContext)
	// Go provides the thread and goroutine emitting this error
	Go() (g0 Go)
	fmt.Stringer
}

GoError is an error or a thread exit associated with a goroutine. The Go method returns the Go object handling the goroutine that originated the error.

type GoErrorContext added in v0.4.29

type GoErrorContext uint8
const (
	// GeNonFatal indicates a non-fatal error occurring during processing.
	// err is non-nil
	GeNonFatal GoErrorContext = iota + 1
	// GePreDoneExit indicates an exit value of a subordinate goroutine,
	// other than the final exit of the last running goroutine.
	// err may be nil
	GePreDoneExit
	GeLocalChan
	// GeExit indicates exit of the last goroutine.
	// err may be nil.
	// The error channel may close after GeExit.
	GeExit
)

func (GoErrorContext) String added in v0.4.29

func (ge GoErrorContext) String() (s string)

type GoFactory added in v0.4.29

type GoFactory interface {
	// NewGoGroup returns a light-weight thread-group.
	//	- GoGroup only receives Cancel from ctx, it does not cancel this context.
	NewGoGroup(ctx context.Context, onFirstFatal ...GoFatalCallback) (g1 GoGroup)
}

type GoFatalCallback added in v0.4.29

type GoFatalCallback func(goGen GoGen)

GoFatalCallback receives the thread-group on its first fatal thread-exit

  • GoFatalCallback is an optional onFirstFatal argument to
  • — NewGoGroup
  • — SubGo
  • — SubGroup

type GoGen added in v0.4.29

type GoGen interface {
	// Go returns a Go object to be provided as a go statement function argument.
	Go() (g1 Go)
	// SubGo returns a thread-group whose fatal errors go to GoGen’s parent.
	//   - both non-fatal and fatal errors in SubGo threads are sent to GoGen’s parent
	// 		like Go.AddError and Go.Done.
	//		- therefore, when a SubGo thread fails, the application will typically exit.
	//		- by awaiting SubGo, Go can delay its exit until SubGo has terminated
	//   - the SubGo thread-group terminates when its threads exit
	SubGo(onFirstFatal ...GoFatalCallback) (subGo SubGo)
	// SubGroup creates a subordinate thread-group
	SubGroup(onFirstFatal ...GoFatalCallback) (subGroup SubGroup)
	// Cancel terminates the threads in the Go consumer thread-group.
	Cancel()
	// Context will Cancel when the parent thread-group Cancels.
	// Subordinate thread-groups do not Cancel this context.
	Context() (ctx context.Context)
}

GoGen allows for new Go threads, new SubGo and SubGroup thread-groups and cancel of threads in the thread-group and its subordinate thread-groups.

  • GoGen can be a GoGroup or a Go object

type GoGroup added in v0.4.18

type GoGroup interface {
	// Go returns a Go object to be provided as a go statement function argument.
	Go() (g1 Go)
	// SubGo returns a thread-group whose fatal errors go to GoGroup’s parent.
	//   - both non-fatal and fatal errors in SubGo threads are sent to GoGroup’s parent
	// 		like Go.AddError and Go.Done.
	//		- therefore, when a SubGo thread fails, the application will typically exit.
	//		- by awaiting SubGo, Go can delay its exit until SubGo has terminated
	//   - the SubGo thread-group terminates when its threads exit
	SubGo(onFirstFatal ...GoFatalCallback) (subGo SubGo)
	// SubGroup creates a subordinate GoGroup.
	//	- SubGroup fatal and non-fatal errors are sent to the parent GoGroup.
	//	- SubGroup-context initiated Cancel only affects threads in the SubGroup thread-group
	//	- parent-initiated Cancel terminates SubGroup threads
	//	- SubGroup only awaits SubGroup threads
	//	- parent await also awaits SubGroup threads
	SubGroup(onFirstFatal ...GoFatalCallback) (subGroup SubGroup)
	// Ch returns a channel sending all fatal termination errors when
	// the FailChannel option is present, or only the first when both
	// FailChannel and StoreSubsequentFail options are present.
	Ch() (ch <-chan GoError)
	// Wait waits for all threads of this thread-group to terminate.
	Wait()
	// Cancel terminates the threads in this and subordinate thread-groups.
	Cancel()
	// Context will Cancel when the parent context Cancels.
	// Subordinate thread-groups do not Cancel this context.
	Context() (ctx context.Context)
	fmt.Stringer
}

GoGroup manages a thread-group.

  • A thread exiting with a fatal error in this thread-group will terminate all threads in this and subordinate thread-groups if this thread-group was provided the FirstFailTerminates option, which is default.
  • A fatal thread-termination in a sub thread-group only affects this thread-group if the sub thread-group was provided a nil fatal function, the FirstFailTerminates option, which is default, and no explicit FailChannel option.
  • Fatal thread terminations will propagate to parent thread-groups if this thread group did not have a fatal function provided and was not explicitly provided the FailChannel option.
  • A Cancel in this thread-group or in a parent context cancels threads in this and all subordinate thread-groups.
  • A Cancel in a subordinate thread-group does not affect this thread-group.
  • Wait in this thread-group waits for threads in this and all subordinate thread-groups.

type Iterator added in v0.4.33

type Iterator[T any] interface {
	// Next advances to next item and returns it.
	// if the next item does exist, value is valid and hasValue is true.
	// if no next item exists, value is the data type zero-value and hasValue is false.
	Next() (value T, hasValue bool)
	// HasNext advances to next item and returns hasValue true if this next item does exist.
	HasNext() (hasValue bool)
	// NextValue advances to next item and returns it.
	// If no next value exists, the data type zero-value is returned.
	NextValue() (value T)
	// Same returns the same value again.
	// If a value does exist, it is returned in value and hasValue is true.
	// If a value does not exist, the data type zero-value is returned and hasValue is false.
	// If Next, FindNext or HasNext have not been invoked, Same first advances to the first item.
	Same() (value T, hasValue bool)
	// Has returns true if Same() or SameValue will return items.
	// If Next, FindNext or HasNext have not been invoked, Has first advances to the first item.
	Has() (hasValue bool)
	// SameValue returns the same value again.
	// If a value does not exist, the data type zero-value is returned.
	// If Next, FindNext or HasNext have not been invoked, SameValue first advances to the first item.
	SameValue() (value T)
	// Cancel releases resources for this iterator.
	// Not every iterator requires a Cancel invocation.
	Cancel() (err error)
}

Iterator allows traversal of values. The iterators in parly.iterator are thread-safe and re-entrant, but generally, this depends on the iterator implementation used.

// triple-expression works for iterators that do not require Cancel
for iterator := NewIterator(); iterator.HasNext(); {
	v := iterator.SameValue()
}

// conditional expression can be used with all iterators
iterator := NewIterator()
for iterator.HasNext() {
	v := iterator.SameValue()
}
if err = iterator.Cancel(); …
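
To make the loop forms above concrete, here is a minimal slice-backed sketch. The sliceIterator type is hypothetical and not one of parl's iterators; it implements only HasNext and SameValue, following the behavior stated in the interface comments, and it is not thread-safe.

```go
package main

import "fmt"

// sliceIterator is a hypothetical stand-in implementing only the
// HasNext and SameValue methods of the Iterator interface.
type sliceIterator[T any] struct {
	items []T
	index int // position of the current item plus one; 0 before the first advance
}

// HasNext advances to the next item and reports whether it exists.
func (it *sliceIterator[T]) HasNext() bool {
	if it.index >= len(it.items) {
		it.index = len(it.items) + 1 // past the end: no current item
		return false
	}
	it.index++
	return true
}

// SameValue returns the current item, or the zero value if none exists.
func (it *sliceIterator[T]) SameValue() (value T) {
	if it.index == 0 {
		it.HasNext() // not yet advanced: move to the first item
	}
	if it.index > len(it.items) {
		return // no current item: zero value
	}
	return it.items[it.index-1]
}

func main() {
	it := &sliceIterator[string]{items: []string{"a", "b", "c"}}
	for it.HasNext() {
		fmt.Println(it.SameValue())
	}
}
```

The loop prints each of the three items once; after the loop ends, SameValue returns the zero value.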

type KeyEnum added in v0.4.33

type KeyEnum[K constraints.Ordered, T any] interface {
	IsKey(key K) (isKey bool)            // IsKey checks whether key maps to an enumerated value
	Value(key K) (enum T, err error)     // Value looks up an enumerated value by key
	KeyIterator() (iterator Iterator[K]) // KeyIterator returns an iterator that iterates over all keys in order of definition

	IsValid(enum T) (isEnumValue bool)      // IsValid checks whether value is among enumerated values
	Key(value T) (key K, err error)         // Key gets the key value for an enumerated T value
	ValueAny(value any) (enum T, err error) // ValueAny attempts to convert any value to a T enumerated value
	Iterator() (iterator Iterator[T])       // Iterator returns an iterator that iterates over all enumerated values in order of definition
	Description(enum T) (desc string)       // Description gets a descriptive sentence for an enum value
	StringT(enum T) (s string)              // StringT provides a string representation for an enumeration value
	// Compare compares two T values.
	//	- result is 0 if the two values are considered equal
	//	- result is 1 if value1 is considered greater than value2
	//	- result is -1 if value1 is considered less than value2
	Compare(value1, value2 T) (result int)

	Name() (s string) // Name returns a short string naming this enumeration
	fmt.Stringer
}

KeyEnum is an enumeration using a key type K mapping to a value type T.

  • T is a unique named type that separates the values of this enumeration from all other values.
  • K is a common type, often a single-word string, whose values are unique and map one-to-one to a T value. K is constraints.Ordered and can be used as a map key.
  • The implementation’s stored type may be different from both K and T.

Some benefits with enumerations are:

  • unknown, illegal or duplicate values are detected
  • integral values and their meanings can be printed
  • one set of integral values is not confused with another, eg. unix.AF_INET and unix.RTAX_BRD
  • the meaning is revealed when values are used as function arguments, and the other allowed values can be examined

type Moderator

type Moderator struct {
	// contains filtered or unexported fields
}

Moderator invokes functions at a limited level of parallelism. It is a ticketing system

m := NewModerator(20, ctx)
m.Do(func() { // waiting here for a ticket
  // got a ticket!
  …
  return // on return or panic, the ticket is automatically returned
})
m.String() // → waiting: 2(20)
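
The ticketing idea can be sketched with a buffered channel used as a counting semaphore. The ticketPool type is a hypothetical stand-in and not parl's Moderator; it has no cancel, status or waiting-count features.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// ticketPool is a hypothetical stand-in, not parl's Moderator.
// A buffered channel holds one slot per ticket.
type ticketPool chan struct{}

func newTicketPool(parallelism int) ticketPool {
	return make(ticketPool, parallelism)
}

// Do blocks until a ticket is available, runs fn on the calling
// goroutine and returns the ticket, even if fn panics.
func (p ticketPool) Do(fn func()) {
	p <- struct{}{}        // take a ticket
	defer func() { <-p }() // return the ticket
	fn()
}

func main() {
	pool := newTicketPool(3)
	var active, peak int32
	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			pool.Do(func() {
				n := atomic.AddInt32(&active, 1)
				for { // record the highest concurrency observed
					old := atomic.LoadInt32(&peak)
					if n <= old || atomic.CompareAndSwapInt32(&peak, old, n) {
						break
					}
				}
				atomic.AddInt32(&active, -1)
			})
		}()
	}
	wg.Wait()
	fmt.Println("peak parallelism within limit:", atomic.LoadInt32(&peak) <= 3)
}
```

Twenty goroutines compete for three tickets, so no more than three invocations of fn are active at any time.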

func NewModerator

func NewModerator(parallelism uint64, ctx context.Context) (mo *Moderator)

NewModerator creates a cancelable Moderator used to limit parallelism

func NewModeratorFromCore added in v0.4.26

func NewModeratorFromCore(mc *ModeratorCore, ctx context.Context) (mo *Moderator)

NewModeratorFromCore allows multiple cancelable Moderators to share a ModeratorCore ticket pool

func (*Moderator) Do

func (mo *Moderator) Do(fn func())

Do calls fn limited by the moderator’s parallelism. If the moderator is shut down, ErrModeratorShutdown is returned

func (*Moderator) DoErr added in v0.4.26

func (mo *Moderator) DoErr(fn func() error) (err error)

func (*Moderator) Status

func (mo *Moderator) Status() (parallelism uint64, active uint64, waiting uint64, isShutdown bool)

func (*Moderator) String

func (mo *Moderator) String() (s string)

type ModeratorCore added in v0.4.26

type ModeratorCore struct {
	// contains filtered or unexported fields
}

ModeratorCore invokes functions at a limited level of parallelism. ModeratorCore is a ticketing system. ModeratorCore does not have a cancel feature.

m := NewModeratorCore(20)
m.Do(func() { // waiting here for a ticket
  // got a ticket!
  …
  return // on return or panic, the ticket is automatically returned
})
m.String() // → waiting: 2(20)

func NewModeratorCore added in v0.4.26

func NewModeratorCore(parallelism uint64) (mo *ModeratorCore)

NewModeratorCore creates a new ModeratorCore used to limit parallelism

func (*ModeratorCore) Do added in v0.4.26

func (mo *ModeratorCore) Do(fn func())

Do calls fn limited by the moderator’s parallelism. Do blocks until a ticket is available. Do uses the same thread.

func (*ModeratorCore) Status added in v0.4.26

func (mo *ModeratorCore) Status() (parallelism uint64, active uint64, waiting uint64)

func (*ModeratorCore) String added in v0.4.26

func (mo *ModeratorCore) String() (s string)

type NBChan added in v0.4.0

type NBChan[T any] struct {
	perrors.ParlError // thread panics
	// contains filtered or unexported fields
}

NBChan is a non-blocking send channel with trillion-size buffer.

  • NBChan can be used as an error channel where the thread does not block from a delayed or missing reader.
  • errors can be read from the channel or fetched all at once using GetAll
  • NBChan is initialization-free, thread-safe, idempotent, deferrable and observable.
  • Ch(), Send(), Close(), CloseNow(), IsClosed() and Count() are not blocked by channel send and are panic-free.
  • Close() and CloseNow() are deferrable.
  • WaitForClose() waits until the underlying channel has been closed.
  • NBChan implements a thread-safe error store perrors.ParlError.
  • NBChan.GetError() returns thread panics and close errors.
  • No errors are added to the error store after the channel has closed.
  • NBChan does not normally generate errors. When it does, the errors are thread panics or a close error. Neither is expected to occur.
  • the underlying channel is closed after Close is invoked and the channel is emptied
  • cautious consumers may collect errors using the GetError method when:
  • — the Ch receive-only channel is detected as being closed or
  • — await using WaitForClose returns or
  • — IsClosed method returns true

Usage:

var errCh parl.NBChan[error]
go thread(&errCh)
err, ok := <-errCh.Ch()
errCh.WaitForClose()
errCh.GetError()
…
func thread(errCh *parl.NBChan[error]) {
  defer errCh.Close() // non-blocking close effective on send complete
  var err error
  defer parl.Recover(parl.Annotation(), &err, errCh.AddErrorProc)
  errCh.Send(err) // non-blocking; Ch() is receive-only
  if err = someFunc(); err != nil {
    err = perrors.Errorf("someFunc: %w", err)
    return
  }
}
func NewNBChan added in v0.4.0

func NewNBChan[T any](ch ...chan T) (nbChan *NBChan[T])

NewNBChan instantiates a non-blocking trillion-size buffer channel. NewNBChan allows initialization based on an existing channel. NBChan does not need initialization and can be used like:

var nbChan NBChan[error]
go thread(&nbChan)

func (*NBChan[T]) Ch added in v0.4.0

func (nb *NBChan[T]) Ch() (ch <-chan T)

Ch obtains the receive-only channel

func (*NBChan[T]) Close added in v0.4.0

func (nb *NBChan[T]) Close() (didClose bool)

Close orders the channel to close once pending sends complete. Close is thread-safe, non-blocking and panic-free.

func (*NBChan[T]) CloseNow added in v0.4.5

func (nb *NBChan[T]) CloseNow(errp ...*error) (err error, didClose bool)

CloseNow closes without waiting for sends to complete. CloseNow does not panic and is thread-safe. CloseNow does not return until the channel is closed. Upon return, all invocations have a possible close error in err. If errp is non-nil, it is updated with error status.

func (*NBChan[T]) Count added in v0.4.0

func (nb *NBChan[T]) Count() (unsentCount int)

Count returns number of unsent values

func (*NBChan[T]) DidClose added in v0.4.20

func (nb *NBChan[T]) DidClose() (didClose bool)

func (*NBChan[T]) GetAll added in v0.4.29

func (nb *NBChan[T]) GetAll() (allItems []T)

GetAll returns a slice of all available items held by the channel.

func (*NBChan[T]) IsClosed added in v0.4.0

func (nb *NBChan[T]) IsClosed() (isClosed bool)

IsClosed indicates whether the channel has actually closed.

func (*NBChan[T]) Send added in v0.4.0

func (nb *NBChan[T]) Send(value T)

Send sends non-blocking on the channel

func (*NBChan[T]) WaitForClose added in v0.4.5

func (nb *NBChan[T]) WaitForClose()

type Once added in v0.4.18

type Once struct {
	// contains filtered or unexported fields
}

parl.Once is an observable sync.Once. parl.Once is thread-safe and does not require initialization. No thread will return from Once.Do until Once.Do has completed.
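The idea of an observable once can be sketched with the standard library alone. The following is a minimal illustration, not parl's implementation: the type observableOnce and the helper demoOnce are hypothetical names, and the sketch assumes that "observable" means a flag that can be read without invoking Do.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// observableOnce is a hypothetical stand-in for an observable sync.Once.
// The zero value is ready to use.
type observableOnce struct {
	once   sync.Once
	isDone atomic.Bool // true once f has returned
}

// Do invokes f on the first call only. Like sync.Once.Do, concurrent
// callers do not return until the first invocation of f has completed.
func (o *observableOnce) Do(f func()) {
	o.once.Do(func() {
		defer o.isDone.Store(true)
		f()
	})
}

// IsDone reports whether Do has completed, without invoking anything.
func (o *observableOnce) IsDone() bool { return o.isDone.Load() }

// demoOnce calls Do three times and reports what could be observed.
func demoOnce() (before, after bool, calls int) {
	var o observableOnce
	before = o.IsDone()
	for i := 0; i < 3; i++ {
		o.Do(func() { calls++ })
	}
	return before, o.IsDone(), calls
}

func main() {
	before, after, calls := demoOnce()
	fmt.Println(before, after, calls) // false true 1
}
```

A plain sync.Once offers no way to ask whether Do has run; the extra atomic flag is what makes the state inspectable.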

func (*Once) Do added in v0.4.18

func (o *Once) Do(f func())

func (*Once) IsDone added in v0.4.18

func (o *Once) IsDone() (isDone bool)

type OnceWaiter added in v0.4.29

type OnceWaiter struct {
	// contains filtered or unexported fields
}

OnceWaiter allows any number of threads to wait for a single next occurrence.

  • the occurrence is triggered in either of two ways:
  • — a parent context may be passed in that on cancel triggers the wait
  • — the Cancel method is invoked
  • Ch returns a channel that sends one item on the occurrence but never closes
  • Done returns a channel that closes when the occurrence happens, similar to a context
  • Wait awaits the occurrence
  • a did-occurer object can be obtained that returns true once the cycle triggers.
  • a context can be obtained that cancels on the next trigger
  • the cycles can be permanently canceled or triggered and rearmed
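The two trigger paths listed above (parent context cancel, or an explicit Cancel) map naturally onto a derived context. The sketch below is an assumption-laden illustration using only the standard library: the type onceWaiter and the helper demoOnceWaiter are hypothetical, and the Ch method and rearming are left out.

```go
package main

import (
	"context"
	"fmt"
)

// onceWaiter is a hypothetical, simplified stand-in: the occurrence is the
// cancellation of a context derived from the parent.
type onceWaiter struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func newOnceWaiter(parent context.Context) *onceWaiter {
	ctx, cancel := context.WithCancel(parent)
	return &onceWaiter{ctx: ctx, cancel: cancel}
}

// Cancel triggers the occurrence. It is idempotent and thread-safe.
func (ow *onceWaiter) Cancel() { ow.cancel() }

// Done returns a channel that closes on the occurrence.
func (ow *onceWaiter) Done() <-chan struct{} { return ow.ctx.Done() }

// DidOccur reports whether the occurrence has taken place.
func (ow *onceWaiter) DidOccur() bool { return ow.ctx.Err() != nil }

// Wait blocks until the occurrence.
func (ow *onceWaiter) Wait() { <-ow.ctx.Done() }

// demoOnceWaiter exercises both trigger paths.
func demoOnceWaiter() (before, afterCancel, afterParent bool) {
	ow := newOnceWaiter(context.Background())
	before = ow.DidOccur()
	ow.Cancel() // trigger path 1: explicit Cancel
	ow.Wait()
	afterCancel = ow.DidOccur()

	parent, cancelParent := context.WithCancel(context.Background())
	ow2 := newOnceWaiter(parent)
	cancelParent() // trigger path 2: the parent context cancels
	ow2.Wait()
	afterParent = ow2.DidOccur()
	ow2.Cancel() // releases the derived context
	return before, afterCancel, afterParent
}

func main() {
	fmt.Println(demoOnceWaiter()) // false true true
}
```

Any number of goroutines may block in Wait or select on Done, because a closed channel releases every receiver.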

func NewOnceWaiter added in v0.4.29

func NewOnceWaiter(ctx context.Context) (onceReceiver *OnceWaiter)

NewOnceWaiter returns a OnceWaiter that triggers when the context cancels, or immediately if the context was already canceled.

func (*OnceWaiter) Cancel added in v0.4.29

func (ow *OnceWaiter) Cancel()

Cancel triggers the occurrence

func (*OnceWaiter) Ch added in v0.4.29

func (ow *OnceWaiter) Ch() (ch <-chan struct{})

Ch returns a channel that will send one item on the occurrence.

  • the channel will not send anything else
  • the channel never closes.

func (*OnceWaiter) Context added in v0.4.29

func (cw *OnceWaiter) Context() (ctx context.Context)

Context returns a context that cancels on the occurrence

func (*OnceWaiter) DidOccur added in v0.4.29

func (ow *OnceWaiter) DidOccur() (didOccur bool)

DidOccur returns true when the occurrence has taken place

func (*OnceWaiter) Done added in v0.4.29

func (ow *OnceWaiter) Done() (done <-chan struct{})

Done returns a channel that will close on the occurrence.

  • Done is similar to the Done method of a context

func (*OnceWaiter) Wait added in v0.4.29

func (ow *OnceWaiter) Wait()

Wait waits until the occurrence

type OnceWaiterRO added in v0.4.29

type OnceWaiterRO struct {
	// contains filtered or unexported fields
}

OnceWaiterRO allows any number of threads to wait for a single next occurrence. OnceWaiterRO is a OnceWaiter without the Cancel method

func NewOnceWaiterRO added in v0.4.29

func NewOnceWaiterRO(onceWaiter *OnceWaiter) (onceWaiterRO *OnceWaiterRO)

NewOnceWaiterRO returns a OnceWaiterRO for onceWaiter, ie. a view of it without the Cancel method.

func (*OnceWaiterRO) Ch added in v0.4.29

func (ow *OnceWaiterRO) Ch() (ch <-chan struct{})

func (*OnceWaiterRO) Context added in v0.4.29

func (ow *OnceWaiterRO) Context() (ctx context.Context)

func (*OnceWaiterRO) DidOccur added in v0.4.29

func (ow *OnceWaiterRO) DidOccur() (didOccur bool)

func (*OnceWaiterRO) Done added in v0.4.29

func (ow *OnceWaiterRO) Done() (done <-chan struct{})

func (*OnceWaiterRO) Wait added in v0.4.29

func (ow *OnceWaiterRO) Wait()

type Ordered added in v0.4.29

type Ordered[E any] interface {
	Slice[E] // Element() Length() Clear() List()
	// Insert adds an element to the ordered slice.
	// The implementation may allow duplicates.
	Insert(element E)
	// Delete removes an element from the ordered slice.
	// If the ordered slice does not contain element, the slice is not changed.
	Delete(element E)
	// Index returns index of the first occurrence of element in the ordered slice
	// or -1 if element is not in the slice.
	Index(element E) (index int)
	// Clone returns a clone of the ordered slice
	Clone() (ordered Ordered[E])
}

Ordered[E any] is an ordered list of values.

  • Ordered should be used when E is interface type or a small-sized value.
  • pslices.NewOrdered[E constraints.Ordered]() instantiates for comparable types, ie. not func map slice
  • pslices.NewOrderedAny[E any](cmp func(a, b E) (result int)) instantiates for any type or for custom sort order
  • pslices.NewOrderedPointers[E constraints.Ordered]() orders pointer to value, to be used for large structs or order of in-place data
  • those list implementations have Index O(log n)
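An O(log n) Index is what binary search over a sorted slice provides. The sketch below shows one way Insert, Index and Delete could work on such a slice; it is not the pslices implementation. The names ordered, orderedSlice and demoOrdered are hypothetical, and the local ordered constraint stands in for constraints.Ordered so that the example needs only the standard library.

```go
package main

import (
	"fmt"
	"sort"
)

// ordered is a local, reduced stand-in for constraints.Ordered.
type ordered interface {
	~int | ~int64 | ~float64 | ~string
}

// orderedSlice keeps its elements sorted at all times.
type orderedSlice[E ordered] struct{ list []E }

// Insert places element after any equal elements, so duplicates are allowed.
func (o *orderedSlice[E]) Insert(element E) {
	i := sort.Search(len(o.list), func(i int) bool { return o.list[i] > element })
	var zero E
	o.list = append(o.list, zero)
	copy(o.list[i+1:], o.list[i:]) // shift the tail one step right
	o.list[i] = element
}

// Index returns the index of the first occurrence of element or -1.
// Binary search makes it O(log n).
func (o *orderedSlice[E]) Index(element E) int {
	i := sort.Search(len(o.list), func(i int) bool { return o.list[i] >= element })
	if i < len(o.list) && o.list[i] == element {
		return i
	}
	return -1
}

// Delete removes the first occurrence of element, if any.
func (o *orderedSlice[E]) Delete(element E) {
	if i := o.Index(element); i >= 0 {
		o.list = append(o.list[:i], o.list[i+1:]...)
	}
}

// demoOrdered inserts out of order, looks up two values, then deletes one.
func demoOrdered() (list []int, index3, index4 int) {
	var o orderedSlice[int]
	for _, v := range []int{5, 1, 3, 3} {
		o.Insert(v)
	}
	index3, index4 = o.Index(3), o.Index(4)
	o.Delete(3)
	return o.list, index3, index4
}

func main() {
	fmt.Println(demoOrdered()) // [1 3 5] 1 -1
}
```

Insert and Delete remain O(n) in this sketch because elements are shifted; only the lookup is logarithmic.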

type OrderedPointers added in v0.4.26

type OrderedPointers[E any] interface {
	Ordered[*E]
}

OrderedPointers[E any] is an ordered list of pointers sorted by the referenced values. OrderedPointers should be used when E is a large struct. pslices.NewOrderedPointers[E constraints.Ordered]() instantiates for pointers to comparable types, ie. not func slice map. pslices.NewOrderedAny instantiates for pointers to any type or for custom sort orders.

type Password

type Password interface {
	HasPassword() (hasPassword bool)
	Password() (password string)
}

type PemBytes added in v0.4.26

type PemBytes []byte

PemBytes is a 7-bit ASCII string representing keys or certificates

type Periodically added in v0.4.16

type Periodically struct {
	// contains filtered or unexported fields
}

func NewPeriodically added in v0.4.16

func NewPeriodically(fn func(t time.Time), ctx context.Context, period ...time.Duration) (periodically *Periodically)

func (*Periodically) Wait added in v0.4.26

func (p *Periodically) Wait()

type PrintfFunc added in v0.4.29

type PrintfFunc func(format string, a ...any)

PrintfFunc is the signature for a printf-style function. This signature is implemented by:

  • parl.Sprintf
  • parl.Out parl.Outw parl.Log parl.Logw parl.Console parl.Consolew parl.Info parl.Debug parl.D parl.NoPrint
  • plog.(*LogInstance) similar methods
  • perrors.Errorf perrors.ErrorfPF
  • fmt.Printf fmt.Sprintf fmt.Errorf
  • pterm.(*StatusTerminal).Log pterm.(*StatusTerminal).LogTimeStamp

and compatible functions

type PriorityQueue added in v0.4.30

type PriorityQueue[V any, P constraints.Ordered] interface {
	// AddOrUpdate adds a new value to the priority queue or updates the priority of a value
	// that has changed.
	AddOrUpdate(value *V)
	// List returns the first n or default all values by priority
	List(n ...int) (valueQueue []*V)
}

PriorityQueue is a pointer-identity-to-value map of updatable values traversable by rank.

  • PriorityQueue operates directly on value by caching priority from the priority function.
  • the AddOrUpdate method reprioritizes an updated value element
  • V is a value reference composite type that is comparable, ie. not slice map function. Preferably, V is interface or pointer to struct type.
  • P is an ordered type such as Integer Floating-Point or string, used to rank the V values
  • values are added or updated using AddOrUpdate method distinguished by (computer science) identity
  • if the same comparable value V is added again, that value is re-ranked
  • priority P is computed from a value V using the priorityFunc function. The priority function may be examining field values of a struct
  • values can have the same rank. If they do, equal rank is provided in insertion order
  • pmaps.NewPriorityQueue[V any, P constraints.Ordered]
  • pmaps.NewRankingThreadSafe[V comparable, R constraints.Ordered](ranker func(value *V) (rank R))
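The combination of pointer identity, cached priority and insertion-order ties can be illustrated with a small map-based sketch. This is not the pmaps implementation: the names priorityQueue, entry, job and demoPriorityQueue are hypothetical, ascending order is an assumption, and the sketch sorts on every List call rather than maintaining a ranked structure.

```go
package main

import (
	"fmt"
	"sort"
)

// ordered is a local, reduced stand-in for constraints.Ordered.
type ordered interface {
	~int | ~int64 | ~float64 | ~string
}

// entry caches the priority last computed for a value.
type entry[P ordered] struct {
	priority P
	seq      int // insertion order, used to break ties
}

// priorityQueue identifies values by pointer identity.
type priorityQueue[V any, P ordered] struct {
	priorityFunc func(value *V) P
	entries      map[*V]*entry[P]
	nextSeq      int
}

func newPriorityQueue[V any, P ordered](priorityFunc func(value *V) P) *priorityQueue[V, P] {
	return &priorityQueue[V, P]{priorityFunc: priorityFunc, entries: map[*V]*entry[P]{}}
}

// AddOrUpdate adds value, or re-ranks it if the same pointer was added before.
func (q *priorityQueue[V, P]) AddOrUpdate(value *V) {
	p := q.priorityFunc(value)
	if e, ok := q.entries[value]; ok {
		e.priority = p
		return
	}
	q.entries[value] = &entry[P]{priority: p, seq: q.nextSeq}
	q.nextSeq++
}

// List returns all values by ascending priority, ties in insertion order.
func (q *priorityQueue[V, P]) List() []*V {
	values := make([]*V, 0, len(q.entries))
	for v := range q.entries {
		values = append(values, v)
	}
	sort.Slice(values, func(i, j int) bool {
		a, b := q.entries[values[i]], q.entries[values[j]]
		if a.priority != b.priority {
			return a.priority < b.priority
		}
		return a.seq < b.seq
	})
	return values
}

type job struct {
	name string
	cost int
}

// demoPriorityQueue lists names before and after one value is updated.
func demoPriorityQueue() (before, after string) {
	q := newPriorityQueue(func(j *job) int { return j.cost })
	a, b, c := &job{"a", 3}, &job{"b", 1}, &job{"c", 3}
	for _, j := range []*job{a, b, c} {
		q.AddOrUpdate(j)
	}
	for _, j := range q.List() {
		before += j.name
	}
	a.cost = 0
	q.AddOrUpdate(a) // same pointer: re-ranked, not added again
	for _, j := range q.List() {
		after += j.name
	}
	return before, after
}

func main() {
	fmt.Println(demoPriorityQueue()) // bac abc
}
```

Because the cached priority is only refreshed by AddOrUpdate, changing a field without calling it leaves the old rank in place.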

type PrivateKey added in v0.4.26

type PrivateKey interface {
	crypto.Signer                                  // Public() Sign()
	DER() (privateKeyDer PrivateKeyDer, err error) // untyped key material, both private and public keys
	DERe() (privateKeyDer PrivateKeyDer)
	PEM() (pemBytes PemBytes, err error)
	PEMe() (pemBytes PemBytes)
	PublicKey() (publicKey PublicKey)
	Algo() (algo x509.PublicKeyAlgorithm)
	// validate ensures the private key is present, modeled after rsa.Validate
	Validate() (err error)
}

PrivateKey implements crypto.Signer and can therefore be used as tls.Certificate.PrivateKey

type PrivateKeyDer added in v0.4.26

type PrivateKeyDer []byte

PrivateKeyDer is a binary encoding of a private and public key-pair

type PrivateKeyFactory added in v0.4.26

type PrivateKeyFactory interface {
	NewPrivateKey(algo x509.PublicKeyAlgorithm) (privateKey PrivateKey, err error)
}

type PublicKey added in v0.4.26

type PublicKey interface {
	DER() (publicKeyDer PublicKeyDer, err error)
	DERe() (publicKeyDer PublicKeyDer)
	PEM() (pemBytes PemBytes, err error)
	PEMe() (pemBytes PemBytes)
	Equal(x crypto.PublicKey) (isEqual bool)
	Algo() (algo x509.PublicKeyAlgorithm)
}

PublicKey contains a public key extracted from a KeyPair

type PublicKeyDer added in v0.4.26

type PublicKeyDer []byte

PublicKeyDer is a binary encoding of a public key

type Rate added in v0.4.29

type Rate interface {
	Clone() (rate Rate)
	Delta() (delta uint64)
	Duration() (duration time.Duration)
	HasValue() (hasValue bool)
	fmt.Stringer
}

Rate describes a rate datapoint.

  • may be positive or negative

type RateCounterValues added in v0.4.29

type RateCounterValues interface {
	CounterValues // Clone() CloneReset() Get() Value() Running() Max() DidSetValue()
	Rates() (rates map[RateType]int64)
}

RateCounterValues is the consumer interface for a rate counter. The rate counter provides rates over a provided time period in a map of int64 values.

  • ValueRate current rate of increase in value
  • ValueMaxRate the maximum seen rate of increase in value
  • ValueRateAverage the average rate of increase in value taken over up to 10 periods
  • RunningRate the current rate of change in running, may be negative
  • RunningMaxRate the max positive rate of increase seen in running
  • RunningMaxDecRate the max rate of decrease in running, a 0 or negative value
  • RunningAverage the average of running taken over up to 10 periods

type RateType added in v0.4.29

type RateType int
const (
	ValueRate RateType = iota
	ValueMaxRate
	ValueRateAverage
	RunningRate
	RunningMaxRate
	RunningMaxDecRate
	RunningAverage
	NotAValue
)

type Receiver added in v0.4.12

type Receiver[T any] interface {
	// Get receives one element from the conduit.
	// ok is true if Get did receive a valid element.
	// Get blocks if no element is available.
	// Get returns the zero-value of T and ok false if the conduit
	// is closed and empty.
	Get() (value T, ok bool)
	// GetSlice receives one or more elements from the conduit.
	// ok is true if GetSlice did receive valid elements.
	// GetSlice blocks if no elements are available.
	// GetSlice returns the zero-value of T and ok false if the conduit
	// is closed and empty.
	GetSlice(max int) (values []T, ok bool)
	// IsCanceled determines if the conduit has been canceled.
	// The conduit may still have data elements.
	IsCanceled() (IsCanceled bool)
	// IsEmpty determines if the conduit is empty
	IsEmpty() (isEmpty bool)
	// Count retrieves how many data elements are waiting in the conduit
	Count() (count int)
	// WaitCount retrieves how many conduit receive invocations are waiting for data
	WaitCount() (waitCount int)
}

type Resolver added in v0.4.29

type Resolver[T any] func() (value T, err error)

type Sender added in v0.4.12

type Sender[T any] interface {
	// Put is thread-safe, non-blocking, panic-free.
	Put(value T) (IsCanceled bool)
	// PutSlice is thread-safe, non-blocking, panic-free.
	// PutSlice(nil) determines if the conduit is canceled.
	PutSlice(values []T) (IsCanceled bool)
}

type SerialDo

type SerialDo struct {
	// contains filtered or unexported fields
}

SerialDo serializes a thunk.

func NewSerialDo

func NewSerialDo(
	thunk func(at time.Time),
	eventFn func(event *SerialDoEvent),
	errFn func(err error),
	ctx context.Context) (sdo *SerialDo)

NewSerialDo serializes .Do invocations. eventFn must be thread-safe. errFn must be thread-safe.

func (*SerialDo) Do

func (sdo *SerialDo) Do(now ...time.Time) (isPending bool)

Do invokes the thunk serially.

  • if SerialDo is idle, Do launches thunk via a thread
  • if SerialDo is busy, Do makes it pending
  • if SerialDo is already pending, Do does nothing
  • Do returns true if SerialDo state is pending
  • Do is non-blocking and thread-safe
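The idle, busy and pending states described for Do can be sketched as a small state machine guarded by a mutex. The code below is an illustration under assumptions, not parl's SerialDo: the names serialDo and demoSerialDo are hypothetical, and events, error handling, context cancel and timestamps are omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// serialDo is a hypothetical, simplified stand-in for the state machine.
type serialDo struct {
	mu      sync.Mutex
	busy    bool // a worker goroutine is executing thunk
	pending bool // one more invocation is queued behind the running one
	wg      sync.WaitGroup
	thunk   func()
}

// Do never waits for thunk. It returns true if an invocation is pending.
func (s *serialDo) Do() (isPending bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.busy {
		s.pending = true // busy or already pending: at most one queued run
		return true
	}
	s.busy = true // idle: launch thunk in a new goroutine
	s.wg.Add(1)
	go s.run()
	return false
}

// run executes thunk, then once more for a pending request, until idle.
func (s *serialDo) run() {
	defer s.wg.Done()
	for {
		s.thunk()
		s.mu.Lock()
		if !s.pending {
			s.busy = false
			s.mu.Unlock()
			return
		}
		s.pending = false
		s.mu.Unlock()
	}
}

// Wait blocks until the worker goroutine has returned to idle.
func (s *serialDo) Wait() { s.wg.Wait() }

// demoSerialDo issues three requests while the first invocation is held.
func demoSerialDo() (runs int, p1, p2, p3 bool) {
	started := make(chan struct{}, 2)
	release := make(chan struct{})
	s := &serialDo{}
	s.thunk = func() {
		started <- struct{}{}
		<-release
		runs++ // only the single worker goroutine writes runs
	}
	p1 = s.Do() // idle: launches thunk
	<-started   // thunk is now executing
	p2 = s.Do() // busy: becomes pending
	p3 = s.Do() // already pending: nothing more is queued
	close(release)
	s.Wait()
	return runs, p1, p2, p3
}

func main() {
	fmt.Println(demoSerialDo()) // 2 false true true
}
```

Three requests result in two executions: the two requests made while thunk was running collapse into a single pending invocation.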

func (*SerialDo) State added in v0.4.26

func (sdo *SerialDo) State() (
	busySince time.Time,
	pendingSince time.Time,
	isCancel bool,
	isWaitComplete bool,
)

func (*SerialDo) String added in v0.4.26

func (sdo *SerialDo) String() (s string)

func (*SerialDo) Wait added in v0.4.26

func (sdo *SerialDo) Wait()

type SerialDoCore added in v0.4.26

type SerialDoCore struct {
	// contains filtered or unexported fields
}

func NewSerialDoCore added in v0.4.26

func NewSerialDoCore(thunk func(at time.Time), errFn func(err error), ctx context.Context) (serialDoCore *SerialDoCore)

func (*SerialDoCore) Do added in v0.4.26

func (sdo *SerialDoCore) Do(now ...time.Time) (isPending bool, isShutdown bool)

func (*SerialDoCore) Wait added in v0.4.26

func (sdo *SerialDoCore) Wait(at time.Time)

type SerialDoEvent

type SerialDoEvent struct {
	ID SerialDoID
	SerialDoType
	time.Time
	*SerialDo
}

func NewSerialDoEvent added in v0.4.26

func NewSerialDoEvent(typ SerialDoType, t time.Time, serialDo *SerialDo) (event *SerialDoEvent)

func (*SerialDoEvent) String added in v0.4.26

func (e *SerialDoEvent) String() (s string)

type SerialDoID added in v0.4.26

type SerialDoID string

func (SerialDoID) String added in v0.4.26

func (id SerialDoID) String() (s string)

type SerialDoType added in v0.4.26

type SerialDoType uint8
const (
	// The SerialDo is invoking thunk from idle, now time
	SerialDoLaunch SerialDoType = 1 + iota
	// The SerialDo enqueued a future invocation, request time
	SerialDoPending
	// The SerialDo is launching a pending invocation, request time
	SerialDoPendingLaunch
	// The SerialDo returned to being idle, time is busy since
	SerialDoIdle
)

func (SerialDoType) IsValid added in v0.4.26

func (e SerialDoType) IsValid() (isValid bool)

func (SerialDoType) String added in v0.4.26

func (e SerialDoType) String() (s string)

type ServerFactory added in v0.3.0

type ServerFactory interface {
	// Adb connects to an adb Android Debug Bridge server on a specified tcp socket
	Adb(address AdbSocketAddress, ctx context.Context) (server Serverette)
	// AdbLocalhost connects to an adb Android Debug Bridge server on the local computer
	AdbLocalhost(ctx context.Context) (server Serverette)
}

ServerFactory describes how AdbServer objects are obtained. Such servers may use different protocol implementations from Adbette

type Serverette added in v0.3.0

type Serverette interface {
	AdbAdressProvider // AdbSocketAddress()
	// DeviceSerialList lists serials for the currently online Android devices
	DeviceSerialList() (serials []AndroidSerial, err error)
	// DeviceStati lists all serials currently known to the server along with
	// their current status.
	// The two slices correspond and are of the same length
	DeviceStati() (serials []AndroidSerial, stati []AndroidStatus, err error)
	// TrackDevices emits serial numbers for devices that come online.
	// serials are sent on the serials channel.
	// if err is non-nil, set-up failed.
	// The errs channel closes when watching stops
	// Watching is stopped by calling cancel function or when the server’s context terminates
	TrackDevices() (tracker Trackerette, err error)
}

Serverette is a generic representation of an adb server running on a host.

command-line adb: As of Android 12, Android Debug Bridge version 1.0.41 Version 32.0.0-8006631 supports the following commands: devices connect disconnect pair forward ppp reverse mdns push pull sync shell emu install install-multiple install-multiple-package uninstall bugreport jdwp logcat disable-verity enable-verity keygen wait-for* get-state get-serialno get-devpath remount reboot sideload root unroot usb tcpip start-server kill-server reconnect attach detach

type ServeretteFactory added in v0.3.0

type ServeretteFactory interface {
	// Adb connects to an adb Android Debug Bridge server on a specified tcp socket.
	// address is a string default "localhost:5037" and default port ":5037".
	// adbetter is a factory for Adbette connections.
	NewServerette(address AdbSocketAddress, adbetter Adbetter, ctx context.Context) (server Serverette)
}

ServeretteFactory is a Server connection factory for Adbette implementations

type Set added in v0.4.29

type Set[T comparable] interface {
	// IsValid returns whether value is part of this set
	IsValid(value T) (isValid bool)
	// Element returns the element representation for value or
	// nil if value is not an element of the set.
	Element(value T) (element Element[T])
	// StringT returns a string representation for an element of this set.
	// if value is not a valid element, a fmt.Printf value is output like ?'%v'
	StringT(value T) (s string)
	Description(value T) (s string)
	fmt.Stringer
}

Set contains unique set elements of a particular type T that are printable. A set stores unique values without any particular order. The reasons for using a set over const are:

  • set membership enforcement
  • available string representation for elements
  • additional fields or methods assigned to elements
  • optional type safety

Usage:

const IsSame NextAction = 0
type NextAction uint8
func (na NextAction) String() (s string) {return nextActionSet.StringT(na)}
var nextActionSet = set.NewSet(yslices.ConvertSliceToInterface[
  set.Element[NextAction],
  parly.Element[NextAction],
]([]set.Element[NextAction]{{ValueV: IsSame, Name: "IsSame"}}))

type SetFactory added in v0.4.29

type SetFactory[T comparable] interface {
	// NewSet returns a set of a finite number of elements.
	// Usage:
	//
	//	const IsSame NextAction = 0
	//	type NextAction uint8
	//	func (na NextAction) String() (s string) {return nextActionSet.StringT(na)}
	//	var nextActionSet = set.NewSet(yslices.ConvertSliceToInterface[
	//	  set.Element[NextAction],
	//	  parly.Element[NextAction],
	//	]([]set.Element[NextAction]{{ValueV: IsSame, Name: "IsSame"}}))
	NewSet(elements []Element[T]) (interfaceSet Set[T])
}

type Slice added in v0.4.29

type Slice[E any] interface {
	// Element returns element by index.
	// if index is negative or the length of the slice or larger, the E zero-value is returned.
	Element(index int) (element E)
	// Length returns number of elements in the slice
	Length() (length int)
	// Clear empties the ordered slice
	Clear()
	// List returns a clone of the internal slice
	List(n ...int) (list []E)
}

Slice is an embedded interface for slice interface types

type Stack added in v0.4.20

type Stack interface {
	ID() ThreadID
	Status() ThreadStatus
	IsMain() (isMain bool)
	Frames() (frames []Frame)
	Creator() (creator *pruntime.CodeLocation)
	Shorts(prepend string) (s string)
	String() (s string)
}

type Statuser added in v0.3.0

type Statuser interface {
	Set(status string) (statuser Statuser)
	Shutdown()
}

Statuser prints threads that do not react within a specified time frame. This is useful during early multi-threaded design when it is uncertain why work is not progressing. Is it your code or their code? Once the program concludes without hung threads, Tracer is the better tool to identify issues.

type StatuserFactory added in v0.3.0

type StatuserFactory interface {
	NewStatuser(useStatuser bool, d time.Duration) (statuser Statuser)
}

type SubGo added in v0.4.15

type SubGo interface {
	// Go returns a G1 object to be provided as a go statement function argument.
	Go() (g1 Go)
	// SubGo returns a thread-group whose fatal errors go to G1’s parent.
	//   - both non-fatal and fatal errors in SubGo threads are sent to G1’s parent
	//     like G1.AddError and G1.Done.
	//   - therefore, when a SubGo thread fails, the application will typically exit.
	//   - by awaiting SubGo, G1 can delay its exit until SubGo has terminated
	//   - the SubGo thread-group terminates when its thread exits
	SubGo(onFirstFatal ...GoFatalCallback) (subGo SubGo)
	// SubGroup creates a sub-ordinate G1Group.
	//	- SubGroup fatal and non-fatal errors are sent to the parent G1Group.
	//	- SubGroup-context initiated Cancel only affects threads in the SubGroup thread-group
	//	- parent-initiated Cancel terminates SubGroup threads
	//	- SubGroup only awaits SubGroup threads
	//	- parent await also awaits SubGroup threads
	SubGroup(onFirstFatal ...GoFatalCallback) (subGroup SubGroup)
	// Wait waits for all threads of this thread-group to terminate.
	Wait()
	// Cancel terminates the threads in this and subordinate thread-groups.
	Cancel()
	// Context will Cancel when the parent context Cancels.
	// Subordinate thread-groups do not Cancel this context.
	Context() (ctx context.Context)
	fmt.Stringer
}

type SubGroup added in v0.4.29

type SubGroup interface {
	SubGo
	// Ch returns a receive channel for fatal errors if this SubGo has LocalChannel option.
	Ch() (ch <-chan GoError)
	// FirstFatal allows to await or inspect the first thread terminating with error.
	// it is valid if this SubGo has LocalSubGo or LocalChannel options.
	// To wait for first fatal error using multiple-semaphore mechanic:
	//  firstFatal := g1.FirstFatal()
	//  for {
	//    select {
	//    case <-firstFatal.Ch():
	//    …
	// To inspect first fatal:
	//  if firstFatal.DidOccur() …
	FirstFatal() (firstFatal *OnceWaiterRO)
}

type SyncAdd added in v0.4.20

type SyncAdd interface {
	Add(delta int)
}

SyncAdd provides sync.WaitGroup.Add()

type SyncDone added in v0.4.20

type SyncDone interface {
	Done()
}

SyncDone provides sync.WaitGroup.Done()

type SyncWait added in v0.4.20

type SyncWait interface {
	Wait()
}

SyncWait provides sync.WaitGroup.Wait()

type ThreadID added in v0.4.9

type ThreadID string

ThreadID is an opaque type that uniquely identifies a thread, ie. a goroutine. goid.GoID obtains ThreadID for the executing thread. ThreadID is comparable, ie. can be used as a map key. ThreadID can be cast to string using .String()

func (ThreadID) String added in v0.4.12

func (threadID ThreadID) String() (s string)

type ThreadResult added in v0.4.9

type ThreadResult struct {
	Err // Error() string
}

type ThreadSafeMap added in v0.4.29

type ThreadSafeMap[K comparable, V any] interface {
	// Get returns the value mapped by key or the V zero-value otherwise.
	//   - the ok return value is true if a mapping was found.
	//   - O(1)
	Get(key K) (value V, ok bool)
	// GetOrCreate returns an item from the map if it exists otherwise creates it.
	//   - newV or makeV are invoked in the critical section, ie. these functions
	//     may not access the map or deadlock
	//   - if a key is mapped, its value is returned
	//   - otherwise, if newV and makeV are both nil, nil is returned.
	//   - otherwise, if newV is present, it is invoked to return a pointer to a value.
	//     A nil return value from newV causes panic. A new mapping is created using
	//     the value pointed to by the newV return value.
	//   - otherwise, a mapping is created using whatever makeV returns
	//   - value insert is O(log n)
	GetOrCreate(
		key K,
		newV func() (value *V),
		makeV func() (value V),
	) (value V, ok bool)
	// Put saves or replaces a mapping
	Put(key K, value V)
	// Delete removes mapping using key K.
	//   - if key K is not mapped, the map is unchanged.
	//   - O(log n)
	Delete(key K)
	// Clear empties the map
	Clear()
	// Length returns the number of mappings
	Length() (length int)
	// Clone returns a shallow clone of the map
	Clone() (clone ThreadSafeMap[K, V])
	// List provides the mapped values, undefined ordering
	//   - O(n)
	List() (list []V)
}

ThreadSafeMap is a one-liner thread-safe mapping.

  • pmaps.NewRWMap[K comparable, V any]()
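The rule that the create function runs inside the critical section can be seen in a short read-write-lock sketch. This is an illustration only, not pmaps.NewRWMap: the names rwMap, newRWMap and demoRWMap are hypothetical, and GetOrCreate is reduced to a single makeV argument that returns just the value.

```go
package main

import (
	"fmt"
	"sync"
)

// rwMap is a hypothetical, reduced thread-safe map.
type rwMap[K comparable, V any] struct {
	lock sync.RWMutex
	m    map[K]V
}

func newRWMap[K comparable, V any]() *rwMap[K, V] {
	return &rwMap[K, V]{m: make(map[K]V)}
}

// Get returns the mapped value, or the V zero-value and ok false.
func (r *rwMap[K, V]) Get(key K) (value V, ok bool) {
	r.lock.RLock()
	defer r.lock.RUnlock()
	value, ok = r.m[key]
	return value, ok
}

// GetOrCreate returns the mapped value, creating it with makeV if absent.
// makeV runs while the write lock is held, so it must not access the map.
func (r *rwMap[K, V]) GetOrCreate(key K, makeV func() V) (value V) {
	r.lock.Lock()
	defer r.lock.Unlock()
	if existing, ok := r.m[key]; ok {
		return existing
	}
	value = makeV()
	r.m[key] = value
	return value
}

// Length returns the number of mappings.
func (r *rwMap[K, V]) Length() int {
	r.lock.RLock()
	defer r.lock.RUnlock()
	return len(r.m)
}

// demoRWMap lets eight goroutines race to create the same key.
func demoRWMap() (makeCalls, length int, value string) {
	m := newRWMap[string, string]()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.GetOrCreate("key", func() string {
				makeCalls++ // protected by the map's write lock
				return "created"
			})
		}()
	}
	wg.Wait()
	value, _ = m.Get("key")
	return makeCalls, m.Length(), value
}

func main() {
	fmt.Println(demoRWMap()) // 1 1 created
}
```

Holding the lock across makeV guarantees a single creation per key, at the cost that a makeV calling back into the same map would deadlock.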

type ThreadStatus added in v0.4.12

type ThreadStatus string

ThreadStatus indicates the current status of a thread. Most often it is "running"

type Timer

type Timer struct {
	Label string
	// contains filtered or unexported fields
}

Timer is a simple request timer

func NewTimer

func NewTimer(label string) (t *Timer)

NewTimer gets a simple timer with duration or string output

func (*Timer) End

func (t *Timer) End() (d time.Duration)

End gets duration

func (*Timer) Endms

func (t *Timer) Endms() (ms string)

Endms gets string with duration in ms

type TraceGroup added in v0.4.20

type TraceGroup struct {
	WaitGroup
	// contains filtered or unexported fields
}

parl.TraceGroup is an observable sync.WaitGroup.

TraceGroup cannot be in parl because WaitAction imports goid

func (*TraceGroup) Add added in v0.4.20

func (wg *TraceGroup) Add(delta int)

func (*TraceGroup) Done added in v0.4.20

func (wg *TraceGroup) Done()

func (*TraceGroup) DoneBool added in v0.4.20

func (wg *TraceGroup) DoneBool() (isZero bool)

func (*TraceGroup) String added in v0.4.20

func (wg *TraceGroup) String() (s string)

type Tracer added in v0.3.0

type Tracer interface {
	// AssignTaskToThread assigns a Thread to a task
	AssignTaskToThread(threadID ThreadID, task TracerTaskID) (tracer Tracer)
	// RecordTaskEvent adds an event to the task threadID is currently assigned to.
	// If threadID is not assigned, a new task is created.
	RecordTaskEvent(threadID ThreadID, text string) (tracer Tracer)
	// Records returns the current map of tasks and their events
	Records(clear bool) (records map[TracerTaskID][]TracerRecord)
}

Tracer lists events in terms of tasks rather than per time or thread. A task is executed by threads assigned to it. Threads are uniquely identified by threadID. A task can have zero or one threads assigned at any one time. A thread can be assigned to zero or one tasks. Each task has an ID, a name and a list of events and thread assignments. Tracer can record branching in the code and return that for a particular item being processed. For an item processed incorrectly, or when threads hang, Tracer will find unfavorable branching and last known locations.

type TracerFactory added in v0.3.0

type TracerFactory interface {
	// NewTracer creates tracer storage.
	// use false indicates a nil Tracer whose output will not be used
	NewTracer(use bool) (tracer Tracer)
}

type TracerRecord added in v0.3.0

type TracerRecord interface {
	Values() (at time.Time, text string)
}

type TracerTaskID added in v0.3.0

type TracerTaskID string

type Trackerette added in v0.3.0

type Trackerette interface {
	// Serials emit serial number as online devices become available
	Serials() (serials <-chan AndroidSerial)
	// Errs is available once Serials close. It returns any errors
	Errs() (err error)
	// Cancel shuts down the Tracker
	Cancel()
}

Trackerette represents a server connection emitting device serial numbers

type UniqueID added in v0.4.26

type UniqueID[T ~string] uint64

UniqueID is an executable-invocation-unique identifier generator. The format is a named-type small-integer numeric string suitable to distinguish multiple instances of a type. The type is ordered and can be converted to string.

Usage:

type MyType string
var generator parl.UniqueID[MyType]
someID := generator.ID()

func (*UniqueID[T]) ID added in v0.4.26

func (u *UniqueID[T]) ID() (unique T)

ID generates a unique string identifier. thread-safe

type UniqueIDTypedUint64 added in v0.4.29

type UniqueIDTypedUint64[T ~uint64] uint64

UniqueIDTypedUint64 generates integer-based opaque uniquely-typed IDs. Thread-safe.

  • Different named-type series have their own unique type and can be told apart.
  • The named types are ordered integer-based with String method implementing fmt.Stringer.

Usage:

type T uint64
var generator parl.UniqueIDTypedUint64[T]
func (t T) String() string { return generator.StringT(t) }
someID := generator.ID()

func (*UniqueIDTypedUint64[T]) ID added in v0.4.29

func (u *UniqueIDTypedUint64[T]) ID() (uniqueT T)

ID generates a unique ID of integral type. Thread-safe

func (*UniqueIDTypedUint64[T]) StringT added in v0.4.29

func (u *UniqueIDTypedUint64[T]) StringT(t T) (s string)

"parl.T:2", zero-value t prints "parl.T:0"

type UniqueIDUint64 added in v0.4.29

type UniqueIDUint64 uint64

UniqueIDUint64 generates executable-invocation-unique uint64 numbers 1… Different series of uint64 from different generators do not have identity, ie. they cannot be told apart. Consider UniqueIDTypedUint64 to have identity.

Usage:

var generator parl.UniqueIDUint64
id := generator.ID()
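A generator whose zero value is ready to use and whose IDs start at 1 can be built on an atomic counter. The sketch below is an assumption about how such a type might work, not parl's source: the names uniqueIDUint64 and demoUniqueID are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// uniqueIDUint64 is a hypothetical generator: the named uint64 holds the
// last ID handed out, so the zero value needs no initialization.
type uniqueIDUint64 uint64

// ID returns 1, 2, 3… and is safe for concurrent use.
func (u *uniqueIDUint64) ID() uint64 {
	return atomic.AddUint64((*uint64)(u), 1)
}

// demoUniqueID draws 400 IDs from four goroutines and checks uniqueness.
func demoUniqueID() (unique int, highest uint64) {
	var generator uniqueIDUint64
	var mu sync.Mutex
	seen := make(map[uint64]bool)
	var wg sync.WaitGroup
	for g := 0; g < 4; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 100; i++ {
				id := generator.ID()
				mu.Lock()
				seen[id] = true
				if id > highest {
					highest = id
				}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return len(seen), highest
}

func main() {
	fmt.Println(demoUniqueID()) // 400 400
}
```

Uniqueness holds only within one process, which matches the "executable-invocation-unique" wording: the counter restarts at 1 on every run.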

func (*UniqueIDUint64) ID added in v0.4.29

func (u *UniqueIDUint64) ID() (uniqueUint64 uint64)

ID generates a unique uint64 1…. thread-safe

func (*UniqueIDUint64) String added in v0.4.29

func (u *UniqueIDUint64) String() (s string)

type UniqueIDint added in v0.4.30

type UniqueIDint int

UniqueIDint generates executable-invocation-unique int numbers 1… Different series of int from different generators do not have identity, ie. they cannot be told apart. Consider UniqueIDTypedUint64 to have identity.

Usage:

var generator parl.UniqueIDint
id := generator.ID()

func (*UniqueIDint) ID added in v0.4.30

func (u *UniqueIDint) ID() (uniqueID int)

ID generates a unique int 1…. thread-safe

func (*UniqueIDint) String added in v0.4.30

func (u *UniqueIDint) String() (s string)

type WaitAction added in v0.4.20

type WaitAction struct {
	ID     ThreadID
	Loc    pruntime.CodeLocation
	IsDone bool
	Delta  int
}

func NewWaitAction added in v0.4.20

func NewWaitAction(skipFrames int, delta int, isDone bool) (waitAction *WaitAction)

func (*WaitAction) String added in v0.4.20

func (wa *WaitAction) String() (s string)

type WaitDo added in v0.4.20

type WaitDo struct {
	Waiter
}

func (*WaitDo) IsExit added in v0.4.20

func (w *WaitDo) IsExit() (isExit bool)

type WaitGroup added in v0.3.0

type WaitGroup struct {
	sync.WaitGroup // Wait()
	// contains filtered or unexported fields
}

parl.WaitGroup is like a sync.WaitGroup that can be inspected. The Waiting method returns the number of threads waited for. parl.WaitGroup requires no initialization.

var wg parl.WaitGroup
wg.Add(1)
…
wg.Waiting()
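An inspectable wait group can be approximated by embedding sync.WaitGroup and counting Add and Done invocations. The following is a sketch under assumptions rather than parl's implementation: the names countingWaitGroup and demoWaitGroup are hypothetical, and Counters is assumed to report the accumulated adds and dones.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countingWaitGroup is a hypothetical inspectable wait group.
// The zero value is ready to use.
type countingWaitGroup struct {
	sync.WaitGroup // provides Wait()
	adds, dones    atomic.Int64
}

// Add records delta, then forwards to sync.WaitGroup.Add.
func (wg *countingWaitGroup) Add(delta int) {
	wg.adds.Add(int64(delta))
	wg.WaitGroup.Add(delta)
}

// Done records one completion, then forwards to sync.WaitGroup.Done.
func (wg *countingWaitGroup) Done() {
	wg.dones.Add(1)
	wg.WaitGroup.Done()
}

// Counters returns how many adds and dones have been recorded.
func (wg *countingWaitGroup) Counters() (adds, dones int) {
	return int(wg.adds.Load()), int(wg.dones.Load())
}

// IsZero reports whether every recorded add has a matching done.
func (wg *countingWaitGroup) IsZero() bool {
	adds, dones := wg.Counters()
	return adds == dones
}

// demoWaitGroup inspects the group while three goroutines are held,
// and again after they have exited.
func demoWaitGroup() (addsBefore, donesBefore, donesAfter int, zeroBefore, zeroAfter bool) {
	var wg countingWaitGroup
	release := make(chan struct{})
	wg.Add(3)
	for i := 0; i < 3; i++ {
		go func() {
			defer wg.Done()
			<-release // held until the first inspection is complete
		}()
	}
	addsBefore, donesBefore = wg.Counters()
	zeroBefore = wg.IsZero()
	close(release)
	wg.Wait()
	_, donesAfter = wg.Counters()
	return addsBefore, donesBefore, donesAfter, zeroBefore, wg.IsZero()
}

func main() {
	fmt.Println(demoWaitGroup()) // 3 0 3 false true
}
```

The counters are useful when a Wait never returns: comparing adds against dones shows how many goroutines are still outstanding.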

func (*WaitGroup) Add added in v0.3.0

func (wg *WaitGroup) Add(delta int)

func (*WaitGroup) Counters added in v0.4.12

func (wg *WaitGroup) Counters() (adds int, dones int)

func (*WaitGroup) Done added in v0.3.0

func (wg *WaitGroup) Done()

func (*WaitGroup) DoneBool added in v0.4.20

func (wg *WaitGroup) DoneBool() (isExit bool)

func (*WaitGroup) IsZero added in v0.4.5

func (wg *WaitGroup) IsZero() (isZero bool)

func (*WaitGroup) String added in v0.4.20

func (wg *WaitGroup) String() (s string)

type Waitable added in v0.4.12

type Waitable interface {
	Wait() // similar to sync.WaitGroup.Wait()
}

Waitable is the invoker’s Wait part of sync.WaitGroup and other implementations. Waitable is a many-to-many relation. Waitable allows the caller to await exit and free of all invocations.

var waitsForLots parl.WaitGroup
var shutsDownLots parl.OnceChan
… = NewSomething(&waitsForLots, &shutsDownLots)
go someThread(&waitsForLots, &shutsDownLots)
func someThread(w Doneable, ctx context.Context) {
  defer w.Done()
  w.Add(2)
  go somethingElse()
}

type WaitedOn added in v0.4.20

type WaitedOn interface {
	SyncAdd
	SyncDone
	DoneBool() (isExit bool)
	IsZero() (isZero bool)
}

type Waiter added in v0.4.20

type Waiter interface {
	WaitedOn
	WaitingFor
}

Waiter allows use of either of the observable parl.WaitGroup or parl.TraceGroup

type WaitingFor added in v0.4.20

type WaitingFor interface {
	SyncAdd
	IsZero() (isZero bool)
	Counters() (adds, dones int)
	SyncWait
	String() (s string)
}

type WinOrWaiter added in v0.4.29

type WinOrWaiter struct {
	WinOrWaiterCore
}

WinOrWaiter picks a winner thread to carry out some task used by many threads.

  • threads arriving to an idle WinOrWaiter are winners that complete the task
  • After a winning thread completes the task, it invokes WinnerDone
  • threads arriving to a WinOrWait in progress are held waiting until WinnerDone
  • the task is completed on demand, but only by the first thread requesting it

func NewWinOrWaiter added in v0.4.29

func NewWinOrWaiter(mustBeLater bool, g0 GoGen) (winOrWaiter *WinOrWaiter)

NewWinOrWaiter returns a semaphore used for completing an on-demand task by the first thread requesting it. The result is shared by subsequent threads that are held waiting for it.

type WinOrWaiterCore added in v0.4.29

type WinOrWaiterCore struct {
	sync.Cond
	// contains filtered or unexported fields
}

WinOrWaiterCore picks a winner thread to carry out some task used by many threads.

  • threads arriving at an idle WinOrWaiter are winners that complete the task
  • after a winning thread completes the task, it invokes WinnerDone
  • threads arriving at a WinOrWaiter in progress are held waiting until WinnerDone
  • the task is completed on demand, but only by the first thread requesting it

func NewWinOrWaiterCore added in v0.4.29

func NewWinOrWaiterCore(mustBeLater bool, g0 ...Go) (winOrWaiter *WinOrWaiterCore)

NewWinOrWaiterCore returns a semaphore used for completing an on-demand task by the first thread requesting it. The result is shared by subsequent threads that are held waiting for it.

func (*WinOrWaiterCore) IsCancel added in v0.4.29

func (ww *WinOrWaiterCore) IsCancel() (isCancel bool)

func (*WinOrWaiterCore) WinOrWait added in v0.4.29

func (ww *WinOrWaiterCore) WinOrWait() (isWinner bool)

WinOrWait picks a winner thread to carry out some task used by many threads.

  • threads arriving at an idle WinOrWaiter are winners that complete the task
  • after a winning thread completes the task, it invokes WinnerDone
  • threads arriving at a WinOrWaiter in progress are held waiting until WinnerDone
  • the task is completed on demand, but only by the first thread requesting it

func (*WinOrWaiterCore) WinnerDone added in v0.4.29

func (ww *WinOrWaiterCore) WinnerDone(errp *error)

WinnerDone allows a winning thread to announce completion of the task. Deferrable.
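
The win-or-wait behavior described above can be approximated with a sync.Cond from the standard library. The winOrWait type below is a hypothetical sketch under stated assumptions, not parl's WinOrWaiterCore: its WinnerDone takes no error pointer, it has no cancel support, and the generation counter, the Waiting method and the lateWinners demo are additions made only so the example is deterministic.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// winOrWait is a hypothetical sketch of the win-or-wait pattern.
type winOrWait struct {
	mu         sync.Mutex
	cond       *sync.Cond
	running    bool // a winner is currently carrying out the task
	generation int  // incremented by every WinnerDone
	waiting    int  // threads currently held in WinOrWait
}

func newWinOrWait() *winOrWait {
	w := &winOrWait{}
	w.cond = sync.NewCond(&w.mu)
	return w
}

// WinOrWait returns true to a thread arriving at an idle instance.
// Threads arriving while a winner is active block until WinnerDone,
// then return false.
func (w *winOrWait) WinOrWait() (isWinner bool) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.running {
		w.running = true
		return true
	}
	gen := w.generation
	w.waiting++
	for w.generation == gen { // guards against spurious wake-ups
		w.cond.Wait()
	}
	w.waiting--
	return false
}

// WinnerDone announces completion and releases all waiting threads.
func (w *winOrWait) WinnerDone() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.running = false
	w.generation++
	w.cond.Broadcast()
}

// Waiting returns the number of threads currently held waiting.
func (w *winOrWait) Waiting() int {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.waiting
}

// lateWinners returns how many of n threads arriving during an
// in-progress task were declared winners.
func lateWinners(n int) (winners int) {
	w := newWinOrWait()
	if !w.WinOrWait() { // first arrival at an idle instance wins
		panic("first arrival must win")
	}
	results := make(chan bool, n)
	for i := 0; i < n; i++ {
		go func() { results <- w.WinOrWait() }()
	}
	for w.Waiting() < n { // let every late arrival block first
		runtime.Gosched()
	}
	w.WinnerDone()
	for i := 0; i < n; i++ {
		if <-results {
			winners++
		}
	}
	return
}

func main() {
	fmt.Println(lateWinners(3)) // 0
}
```

Waiting on a generation number rather than on the running flag means a held thread is released by the WinnerDone it was waiting for, even if a new winner starts immediately afterwards.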

Directories

Path Synopsis
Package errorglue contains non-essential error declarations
goid.GoID() provides a unique goroutine identifier.
iana provides Address Family Numbers for the Internet.
mains module
omaps module
Package parlca provides a self-signed certificate authority
Package perrors enriches error values with string data, stack traces, associated errors, less severe warnings, thread-safe containers and comprehensive error string representations.
Package pfs provides file-system related functions
plog provides log instances with Log Logw Info Debug D SetDebug SetRegexp SetSilent IsThisDebug IsSilent.
Ranking is a pointer-identity-to-value map of updatable values traversable by rank.
Package pnet provides IP-related functions with few dependencies beyond the net package
Package parlos provides simplified functions related to the os package
process module
Package progress provides printable progress reporting for multi-threaded operations
Package pruntime provides an interface to the Go standard library’s runtime package using only serializable simple types
ConvertSliceToInterface converts a slice of a struct type to a slice of an interface type.
Package psql augments database/sql
pstrings provides FilteredJoin QuoteList StrSliceContains
pterm module
Package parltime provides time utility functions
punix has functions to examine unix.Errno errors
sqliter module
Package threadprof provides profiling of threading
tracer has events by task then time rather than time or thread
watchfs module
yaml module
yamler module
