Documentation
¶
Index ¶
- Constants
- func AddJitter(d time.Duration, pct ...float64) time.Duration
- func AppendMap[K comparable, V any](dest, source map[K]V)
- func CloneMap[K comparable, V any](orig map[K]V) map[K]V
- func CloneProto[T proto.Message](orig T) T
- func CloneProtoMap[K comparable, V proto.Message](orig map[K]V) map[K]V
- func CloneProtoMapValues[K comparable, V proto.Message](orig map[K]V) []V
- func CloneProtoSlice[T proto.Message](orig []T) []T
- func ContainsDuplicates[T comparable](slice []T) bool
- func DebounceChan[T any](ctx context.Context, source <-chan T, pause time.Duration) <-chan T
- func EnsureAddressPort(addr string, defaultPort int) string
- func EnsureEnvPath(targetPath string) error
- func EnsureFile(filePath string) error
- func GetBindAddress() (string, error)
- func GetLastSuffix(dirPath, prefix string) (int, error)
- func GetMonoTime() time.Time
- func GetOptionalParameter[T any](defaultValue T, values []T) T
- func GetSingleMapKey[K comparable, V any](m map[K]V) (k K, v V, ok bool)
- func GetTickerChan(ticker *time.Ticker) <-chan time.Time
- func GetTimerChan(timer *time.Timer) <-chan time.Time
- func GracefulShutdown(message string)
- func JoinHostPort(addr string, port int) string
- func LifecycleRun(ctx context.Context, log logging.Logger, instance Lifecycle) error
- func LifecycleStart[T Lifecycle](ctx context.Context, log logging.Logger, instances ...T) error
- func LifecycleStop[T Lifecycle](log logging.Logger, instances ...T)
- func LookupHost(addr string) (string, error)
- func MakeDebounceChan[T any](ctx context.Context, pause time.Duration, bufferSize ...int) (chan<- T, <-chan T)
- func MakeKeySlice[K comparable, V any](m map[K]V) []K
- func MakeSet[T comparable](slice []T) map[T]bool
- func MakeThrottleChan[T any](interval time.Duration, bufferSize ...int) (chan<- T, <-chan T)
- func MakeThrottleChanContext[T any](ctx context.Context, interval time.Duration, bufferSize ...int) (chan<- T, <-chan T)
- func MakeValueSlice[K comparable, V any](m map[K]V) []V
- func MarshalProto(msg proto.Message) ([]byte, error)
- func MkdirAll(path string) error
- func MkdirsAll(paths ...string) error
- func MustSetDefaults[T any](instance *T) *T
- func NewRateLimiter(limit int, interval time.Duration) *rate.Limiter
- func NewTicker(d time.Duration) *time.Ticker
- func NewTimer(d time.Duration) *time.Timer
- func PathExists(path string) (bool, error)
- func PointerOf[T any](val T) *T
- func RandElem[T any](slice []T) T
- func RepeatElem[T any](val T, count int) []T
- func RetryB(policy RetryPolicy, work func() error) bool
- func RetryContextB(ctx context.Context, policy RetryPolicy, work func() error) bool
- func RetryContextE(ctx context.Context, policy RetryPolicy, work func() error) error
- func RetryContextR[R any](ctx context.Context, policy RetryPolicy, work func() (R, error)) (R, error)
- func RetryE(policy RetryPolicy, work func() error) error
- func RetryForeverB(ctx context.Context, b *Backoff, work func() error) bool
- func RetryForeverE(ctx context.Context, backoff *Backoff, work func() error) error
- func RetryForeverR[R any](ctx context.Context, backoff *Backoff, work func() (R, error)) (R, error)
- func RetryR[R any](policy RetryPolicy, work func() (R, error)) (R, error)
- func RunAsync(ctx context.Context, work func(ctx context.Context)) context.CancelFunc
- func SetDefaults[T any](instance *T) error
- func SetTime(value time.Time) error
- func SetValues[T any](instance *T, values map[string]any) error
- func ShuffleSlice[T any](slice []T) []T
- func ShutdownNow(message string)
- func ShutdownNowf(format string, args ...any)
- func SliceLast[T any](slice []T) T
- func SplitHostPort(addr string) (string, int, error)
- func ThrottleChan[T any](ctx context.Context, ch <-chan T, interval time.Duration) <-chan T
- func TimeMapFromProto[T comparable](in map[T]*timestamppb.Timestamp) map[T]time.Time
- func TimeMapToProto[T comparable](in map[T]time.Time) map[T]*timestamppb.Timestamp
- func UnmarshalProto[T any, P protoMessage[T]](b []byte) (p P, err error)
- type Backoff
- type Bytes
- type Drainer
- type Float
- type Integer
- type Lifecycle
- type Ordered
- type Publisher
- type Queue
- type RetryPolicy
- type Signed
- type Subscriber
- type Unsigned
Constants ¶
const (
RFC3339Milli = "2006-01-02T15:04:05.999Z07:00"
)
Variables ¶
This section is empty.
Functions ¶
func AddJitter ¶
AddJitter adds random jitter in the range (-pct, +pct). If pct is not provided, will use 0.15 as the default value.
func AppendMap ¶
func AppendMap[K comparable, V any](dest, source map[K]V)
AppendMap appends source map items to dest map.
func CloneMap ¶
func CloneMap[K comparable, V any](orig map[K]V) map[K]V
CloneMap returns a surface-only map clone.
func CloneProto ¶
func CloneProtoMap ¶
func CloneProtoMap[K comparable, V proto.Message](orig map[K]V) map[K]V
func CloneProtoMapValues ¶
func CloneProtoMapValues[K comparable, V proto.Message](orig map[K]V) []V
func CloneProtoSlice ¶
func ContainsDuplicates ¶
func ContainsDuplicates[T comparable](slice []T) bool
ContainsDuplicates returns true if the slice contains duplicate items.
func DebounceChan ¶
DebounceChan wraps the channel to debounce its output and emits only when it observes a pause in activity. The result chan will be closed when the source is closed or the context is canceled.
func EnsureAddressPort ¶
EnsureAddressPort adds the defaultPort if the addr is missing the port.
func EnsureEnvPath ¶
EnsureEnvPath adds the target path to the PATH env variable.
func EnsureFile ¶
EnsureFile creates an empty file if it does not already exist.
func GetBindAddress ¶
GetBindAddress returns the hosts bind address.
func GetLastSuffix ¶
GetLastSuffix finds the last/max suffix in the specified dir.
func GetOptionalParameter ¶
func GetOptionalParameter[T any](defaultValue T, values []T) T
GetOptionalParameter is used by variadic functions to specify a single optional parameter.
func GetSingleMapKey ¶
func GetSingleMapKey[K comparable, V any](m map[K]V) (k K, v V, ok bool)
GetSingleMapKey returns true if the map contains a single key.
func GetTickerChan ¶
GetTickerChan returns the ticker.C chan if ticker is not nil.
func GetTimerChan ¶
GetTimerChan returns the timer.C chan if timer is not nil.
func GracefulShutdown ¶
func GracefulShutdown(message string)
GracefulShutdown allows shutting down the process in a graceful and controlled fashion.
func JoinHostPort ¶
JoinHostPort returns the host:port formatted string.
func LifecycleRun ¶
LifecycleRun starts the instance and runs until a stop signal is received.
func LifecycleStart ¶
LifecycleStart starts the provided instances.
func LifecycleStop ¶
LifecycleStop stops the provided instances.
func LookupHost ¶
LookupHost resolve names to IP addresses.
func MakeDebounceChan ¶
func MakeDebounceChan[T any](ctx context.Context, pause time.Duration, bufferSize ...int) (chan<- T, <-chan T)
MakeDebounceChan will make a new souce chan along with a debounced counterpart.
func MakeKeySlice ¶
func MakeKeySlice[K comparable, V any](m map[K]V) []K
MakeKeySlice returns map keys slice.
func MakeSet ¶
func MakeSet[T comparable](slice []T) map[T]bool
MakeSet returns a new set for the input slice.
func MakeThrottleChan ¶
MakeThrottleChan will make a new souce chan along with a throttled counterpart.
func MakeThrottleChanContext ¶
func MakeThrottleChanContext[T any](ctx context.Context, interval time.Duration, bufferSize ...int) (chan<- T, <-chan T)
MakeThrottleChanContext will make a new souce chan along with a throttled counterpart.
func MakeValueSlice ¶
func MakeValueSlice[K comparable, V any](m map[K]V) []V
MakeValueSlice returns map values slice.
func MarshalProto ¶
MarshalProto returns the proto bytes.
func MustSetDefaults ¶
func MustSetDefaults[T any](instance *T) *T
MustSetDefaults sets struct field values as specified via the 'default' tag.
func NewRateLimiter ¶
NewRateLimiter creates a new rate limiter.
func PathExists ¶
func PointerOf ¶
func PointerOf[T any](val T) *T
PointerOf is a helper that returns the pointer of input value.
func RepeatElem ¶
RepeatElem returns a new slice filled with the provided element.
func RetryB ¶
func RetryB(policy RetryPolicy, work func() error) bool
RetryB retries the provided work func. Returns true if the function returned with success. If backoff is not provided will use the default values.
func RetryContextB ¶
func RetryContextB(ctx context.Context, policy RetryPolicy, work func() error) bool
RetryContextB executes the provided work func for a max number of retries or until ctx is canceled. Returns true if the function returned with success. If backoff is not provided will use the default values.
func RetryContextE ¶
func RetryContextE(ctx context.Context, policy RetryPolicy, work func() error) error
RetryContextE executes the provided work func for a max number of retries or until ctx is canceled. Returns nil if the function returned with success. If backoff is not provided will use the default values.
func RetryContextR ¶
func RetryContextR[R any](ctx context.Context, policy RetryPolicy, work func() (R, error)) (R, error)
RetryContextR executes the provided work func for a max number of retries or until ctx is canceled. Returns the work result if the function returned with success. If backoff is not provided will use the default values.
func RetryE ¶
func RetryE(policy RetryPolicy, work func() error) error
RetryE retries the provided work func. Returns nil if the function returned with success. If backoff is not provided will use the default values.
func RetryForeverB ¶
RetryForeverB executes the provided work func until success or until ctx is canceled. Returns true if the function returned with success. If backoff is not provided will use the default values.
func RetryForeverE ¶
RetryForeverE executes the provided work func until success or until ctx is canceled. Returns nil if the function returned with success. If backoff is not provided will use the default values.
func RetryForeverR ¶
RetryForeverR executes the provided work func until success or until ctx is canceled. Returns the work result if the function returned with success. If backoff is not provided will use the default values.
func RetryR ¶
func RetryR[R any](policy RetryPolicy, work func() (R, error)) (R, error)
RetryR retries the provided work func. Returns the work result if the function returned with success. If backoff is not provided will use the default values.
func SetDefaults ¶
SetDefaults sets struct field values as specified via the 'default' tag.
func ShuffleSlice ¶
func ShuffleSlice[T any](slice []T) []T
ShuffleSlice is a generic helper for rand.Shuffle.
func ShutdownNow ¶
func ShutdownNow(message string)
ShutdownNow triggers process shutdown and never returns.
func ShutdownNowf ¶
ShutdownNowf triggers process shutdown and never returns.
func SplitHostPort ¶
SplitHostPort retruns the host, port pair.
func ThrottleChan ¶
ThrottleChan wraps the channel to throttle its output to max once per provided interval. The result chan will be closed when the source is closed or the context is canceled.
func TimeMapFromProto ¶
func TimeMapFromProto[T comparable](in map[T]*timestamppb.Timestamp) map[T]time.Time
func TimeMapToProto ¶
func TimeMapToProto[T comparable](in map[T]time.Time) map[T]*timestamppb.Timestamp
func UnmarshalProto ¶
UnmarshalProto returns the message for the provided bytes.
Types ¶
type Backoff ¶
type Backoff struct {
MinDelay time.Duration `yaml:"minDelay" default:"100ms" validate:"min:10ms"`
MaxDelay time.Duration `yaml:"maxDelay" default:"2s" validate:"min:100ms"`
Jitter float64 `yaml:"jitter" default:"0.15" validate:"min:0,max:1"`
Multiplier float64 `yaml:"multiplier" default:"1.75" validate:"min:1"`
}
Backoff retry parameters
type Drainer ¶
type Drainer struct {
// contains filtered or unexported fields
}
Drainer will cancel all in-flight work when Stop is called or when the context is canceled.
func NewDrainer ¶
type Publisher ¶
type Publisher[T any] interface { Subscribe(bufferSize int) Subscriber[T] Publish(T) PublishAttempt(T) bool NotifyChan() <-chan any }
type RetryPolicy ¶
type Subscriber ¶
type Subscriber[T any] interface { Item() T ItemChan() <-chan T Unsubscribe() NotifyPublisher() }