utils

package
v0.0.0-...-73798e1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2025 License: AGPL-3.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RFC3339Milli = "2006-01-02T15:04:05.999Z07:00"
)

Variables

This section is empty.

Functions

func AddJitter

func AddJitter(d time.Duration, pct ...float64) time.Duration

AddJitter adds random jitter in the range (-pct, +pct). If pct is not provided, will use 0.15 as the default value.

func AppendMap

func AppendMap[K comparable, V any](dest, source map[K]V)

AppendMap appends source map items to dest map.

func CloneMap

func CloneMap[K comparable, V any](orig map[K]V) map[K]V

CloneMap returns a surface-only map clone.

func CloneProto

func CloneProto[T proto.Message](orig T) T

func CloneProtoMap

func CloneProtoMap[K comparable, V proto.Message](orig map[K]V) map[K]V

func CloneProtoMapValues

func CloneProtoMapValues[K comparable, V proto.Message](orig map[K]V) []V

func CloneProtoSlice

func CloneProtoSlice[T proto.Message](orig []T) []T

func ContainsDuplicates

func ContainsDuplicates[T comparable](slice []T) bool

ContainsDuplicates returns true if the slice contains duplicate items.

func DebounceChan

func DebounceChan[T any](ctx context.Context, source <-chan T, pause time.Duration) <-chan T

DebounceChan wraps the channel to debounce its output and emits only when it observes a pause in activity. The result chan will be closed when the source is closed or the context is canceled.

func EnsureAddressPort

func EnsureAddressPort(addr string, defaultPort int) string

EnsureAddressPort adds the defaultPort if the addr is missing the port.

func EnsureEnvPath

func EnsureEnvPath(targetPath string) error

EnsureEnvPath adds the target path to the PATH env variable.

func EnsureFile

func EnsureFile(filePath string) error

EnsureFile creates an empty file if it does not already exist.

func GetBindAddress

func GetBindAddress() (string, error)

GetBindAddress returns the hosts bind address.

func GetLastSuffix

func GetLastSuffix(dirPath, prefix string) (int, error)

GetLastSuffix finds the last/max suffix in the specified dir.

func GetMonoTime

func GetMonoTime() time.Time

GetMonoTime returns current monotonic system time.

func GetOptionalParameter

func GetOptionalParameter[T any](defaultValue T, values []T) T

GetOptionalParameter is used by variadic functions to specify a single optional parameter.

func GetSingleMapKey

func GetSingleMapKey[K comparable, V any](m map[K]V) (k K, v V, ok bool)

GetSingleMapKey returns true if the map contains a single key.

func GetTickerChan

func GetTickerChan(ticker *time.Ticker) <-chan time.Time

GetTickerChan returns the ticker.C chan if ticker is not nil.

func GetTimerChan

func GetTimerChan(timer *time.Timer) <-chan time.Time

GetTimerChan returns the timer.C chan if timer is not nil.

func GracefulShutdown

func GracefulShutdown(message string)

GracefulShutdown allows shutting down the process in a graceful and controlled fashion.

func JoinHostPort

func JoinHostPort(addr string, port int) string

JoinHostPort returns the host:port formatted string.

func LifecycleRun

func LifecycleRun(ctx context.Context, log logging.Logger, instance Lifecycle) error

LifecycleRun starts the instance and runs until a stop signal is received.

func LifecycleStart

func LifecycleStart[T Lifecycle](ctx context.Context, log logging.Logger, instances ...T) error

LifecycleStart starts the provided instances.

func LifecycleStop

func LifecycleStop[T Lifecycle](log logging.Logger, instances ...T)

LifecycleStop stops the provided instances.

func LookupHost

func LookupHost(addr string) (string, error)

LookupHost resolve names to IP addresses.

func MakeDebounceChan

func MakeDebounceChan[T any](ctx context.Context, pause time.Duration, bufferSize ...int) (chan<- T, <-chan T)

MakeDebounceChan will make a new souce chan along with a debounced counterpart.

func MakeKeySlice

func MakeKeySlice[K comparable, V any](m map[K]V) []K

MakeKeySlice returns map keys slice.

func MakeSet

func MakeSet[T comparable](slice []T) map[T]bool

MakeSet returns a new set for the input slice.

func MakeThrottleChan

func MakeThrottleChan[T any](interval time.Duration, bufferSize ...int) (chan<- T, <-chan T)

MakeThrottleChan will make a new souce chan along with a throttled counterpart.

func MakeThrottleChanContext

func MakeThrottleChanContext[T any](ctx context.Context, interval time.Duration, bufferSize ...int) (chan<- T, <-chan T)

MakeThrottleChanContext will make a new souce chan along with a throttled counterpart.

func MakeValueSlice

func MakeValueSlice[K comparable, V any](m map[K]V) []V

MakeValueSlice returns map values slice.

func MarshalProto

func MarshalProto(msg proto.Message) ([]byte, error)

MarshalProto returns the proto bytes.

func MkdirAll

func MkdirAll(path string) error

func MkdirsAll

func MkdirsAll(paths ...string) error

func MustSetDefaults

func MustSetDefaults[T any](instance *T) *T

MustSetDefaults sets struct field values as specified via the 'default' tag.

func NewRateLimiter

func NewRateLimiter(limit int, interval time.Duration) *rate.Limiter

NewRateLimiter creates a new rate limiter.

func NewTicker

func NewTicker(d time.Duration) *time.Ticker

NewTicker creates a new stopped ticker.

func NewTimer

func NewTimer(d time.Duration) *time.Timer

NewTicker creates a new stopped timer.

func PathExists

func PathExists(path string) (bool, error)

func PointerOf

func PointerOf[T any](val T) *T

PointerOf is a helper that returns the pointer of input value.

func RandElem

func RandElem[T any](slice []T) T

RandElem returns a random slice element.

func RepeatElem

func RepeatElem[T any](val T, count int) []T

RepeatElem returns a new slice filled with the provided element.

func RetryB

func RetryB(policy RetryPolicy, work func() error) bool

RetryB retries the provided work func. Returns true if the function returned with success. If backoff is not provided will use the default values.

func RetryContextB

func RetryContextB(ctx context.Context, policy RetryPolicy, work func() error) bool

RetryContextB executes the provided work func for a max number of retries or until ctx is canceled. Returns true if the function returned with success. If backoff is not provided will use the default values.

func RetryContextE

func RetryContextE(ctx context.Context, policy RetryPolicy, work func() error) error

RetryContextE executes the provided work func for a max number of retries or until ctx is canceled. Returns nil if the function returned with success. If backoff is not provided will use the default values.

func RetryContextR

func RetryContextR[R any](ctx context.Context, policy RetryPolicy, work func() (R, error)) (R, error)

RetryContextR executes the provided work func for a max number of retries or until ctx is canceled. Returns the work result if the function returned with success. If backoff is not provided will use the default values.

func RetryE

func RetryE(policy RetryPolicy, work func() error) error

RetryE retries the provided work func. Returns nil if the function returned with success. If backoff is not provided will use the default values.

func RetryForeverB

func RetryForeverB(ctx context.Context, b *Backoff, work func() error) bool

RetryForeverB executes the provided work func until success or until ctx is canceled. Returns true if the function returned with success. If backoff is not provided will use the default values.

func RetryForeverE

func RetryForeverE(ctx context.Context, backoff *Backoff, work func() error) error

RetryForeverE executes the provided work func until success or until ctx is canceled. Returns nil if the function returned with success. If backoff is not provided will use the default values.

func RetryForeverR

func RetryForeverR[R any](ctx context.Context, backoff *Backoff, work func() (R, error)) (R, error)

RetryForeverR executes the provided work func until success or until ctx is canceled. Returns the work result if the function returned with success. If backoff is not provided will use the default values.

func RetryR

func RetryR[R any](policy RetryPolicy, work func() (R, error)) (R, error)

RetryR retries the provided work func. Returns the work result if the function returned with success. If backoff is not provided will use the default values.

func RunAsync

func RunAsync(ctx context.Context, work func(ctx context.Context)) context.CancelFunc

func SetDefaults

func SetDefaults[T any](instance *T) error

SetDefaults sets struct field values as specified via the 'default' tag.

func SetTime

func SetTime(value time.Time) error

SetTime sets the system time.

func SetValues

func SetValues[T any](instance *T, values map[string]any) error

SetValues sets struct field values as specified via the values map.

func ShuffleSlice

func ShuffleSlice[T any](slice []T) []T

ShuffleSlice is a generic helper for rand.Shuffle.

func ShutdownNow

func ShutdownNow(message string)

ShutdownNow triggers process shutdown and never returns.

func ShutdownNowf

func ShutdownNowf(format string, args ...any)

ShutdownNowf triggers process shutdown and never returns.

func SliceLast

func SliceLast[T any](slice []T) T

func SplitHostPort

func SplitHostPort(addr string) (string, int, error)

SplitHostPort retruns the host, port pair.

func ThrottleChan

func ThrottleChan[T any](ctx context.Context, ch <-chan T, interval time.Duration) <-chan T

ThrottleChan wraps the channel to throttle its output to max once per provided interval. The result chan will be closed when the source is closed or the context is canceled.

func TimeMapFromProto

func TimeMapFromProto[T comparable](in map[T]*timestamppb.Timestamp) map[T]time.Time

func TimeMapToProto

func TimeMapToProto[T comparable](in map[T]time.Time) map[T]*timestamppb.Timestamp

func UnmarshalProto

func UnmarshalProto[T any, P protoMessage[T]](b []byte) (p P, err error)

UnmarshalProto returns the message for the provided bytes.

Types

type Backoff

type Backoff struct {
	MinDelay   time.Duration `yaml:"minDelay" default:"100ms" validate:"min:10ms"`
	MaxDelay   time.Duration `yaml:"maxDelay" default:"2s" validate:"min:100ms"`
	Jitter     float64       `yaml:"jitter" default:"0.15" validate:"min:0,max:1"`
	Multiplier float64       `yaml:"multiplier" default:"1.75" validate:"min:1"`
}

Backoff retry parameters

type Bytes

type Bytes string

func (Bytes) MustParse

func (b Bytes) MustParse() uint64

func (Bytes) Parse

func (b Bytes) Parse() (uint64, error)

type Drainer

type Drainer struct {
	// contains filtered or unexported fields
}

Drainer will cancel all in-flight work when Stop is called or when the context is canceled.

func NewDrainer

func NewDrainer(log logging.Logger) *Drainer

func (*Drainer) Stop

func (d *Drainer) Stop()

func (*Drainer) WithDrain

func (d *Drainer) WithDrain(ctx context.Context) (context.Context, context.CancelFunc)

WithDrain returns a child context that will be canceled when drainer is stopped. If already draining, it will return a canceled context. For all scenarios, the caller must invoke the cancel func when work is done.

type Float

type Float interface {
	~float32 | ~float64
}

type Integer

type Integer interface {
	Signed | Unsigned
}

type Lifecycle

type Lifecycle interface {
	Start(ctx context.Context) error
	Stop()
}

Lifecycle defines methods for instance control.

type Ordered

type Ordered interface {
	Integer | Float | ~string
}

type Publisher

type Publisher[T any] interface {
	Subscribe(bufferSize int) Subscriber[T]
	Publish(T)
	PublishAttempt(T) bool
	NotifyChan() <-chan any
}

func NewPubSub

func NewPubSub[T any](minSize int) Publisher[T]

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue[T any](items ...T) *Queue[T]

func (*Queue[T]) IsEmpty

func (q *Queue[T]) IsEmpty() bool

func (*Queue[T]) PeekBack

func (q *Queue[T]) PeekBack() (t T, ok bool)

func (*Queue[T]) PeekFront

func (q *Queue[T]) PeekFront() (t T, ok bool)

func (*Queue[T]) PopBack

func (q *Queue[T]) PopBack() (t T, ok bool)

func (*Queue[T]) PopFront

func (q *Queue[T]) PopFront() (t T, ok bool)

func (*Queue[T]) PushBack

func (q *Queue[T]) PushBack(t T)

func (*Queue[T]) PushFront

func (q *Queue[T]) PushFront(t T)

type RetryPolicy

type RetryPolicy struct {
	MaxAttempts int     `yaml:"maxAttempts" default:"3" validate:"min:1"`
	Backoff     Backoff `yaml:"backoff"`
}

type Signed

type Signed interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64
}

type Subscriber

type Subscriber[T any] interface {
	Item() T
	ItemChan() <-chan T
	Unsubscribe()
	NotifyPublisher()
}

type Unsigned

type Unsigned interface {
	~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL