gobasetools

package module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2025 License: MIT Imports: 11 Imported by: 0

README

Go Base Tools

This repository contains a collection of base tools for Go, including reactive entities, data structures, and utilities. The tools are designed to be lightweight, efficient, and easy to use, providing essential functionality for various applications.

This repository is a work-in-progress, and new tools will be added over time. Feel free to contribute, suggest improvements, or request specific tools that you'd like to see added.

Tools

Reactive Library

This lightweight reactive library provides reactive entities for Go, allowing you to manage subscribers and publish updates in a way that adheres to Go's philosophy of simplicity, channels, and concurrency. Subscribers receive updates through channels, and you can target specific subscribers using the Notify method.

Methods
Subscribe(id string) (<-chan T, error)

Creates a new subscriber with a unique id. Returns a channel that receives messages. If a subscriber with the given ID already exists, returns an error.

Unsubscribe(id string) error

Unsubscribes the subscriber with the given id. After this, the subscriber will no longer receive messages.

Broadcast(value T)

Sends a message to all subscribers.

Notify(id string, value T) error

Sends a message to a specific subscriber, identified by id. If no such subscriber exists, returns an error.

Close()

Closes all subscriber channels and clears the map of subscribers, cleaning up resources.

Documentation

Index

Constants

View Source
const TsFormat = "2006-01-02T15:04:05.999Z"

Variables

This section is empty.

Functions

func AddLineNums

func AddLineNums(s string) string

func Capitalize

func Capitalize(s string) string

func Compact

func Compact(s string) string

func Dasherize

func Dasherize(s string) string

func EnsureMinDuration

func EnsureMinDuration(start time.Time, minDuration time.Duration)

func GetRandomAlphanumeric

func GetRandomAlphanumeric(n int) ([]byte, error)

func IndexRunes

func IndexRunes(haystack []rune, needle []rune) int

indexRunes searches for the slice of runes `needle` in the slice of runes `haystack` and returns the index of the first rune of `needle` in `haystack`, or -1 if `needle` is not present.

func RemoveLineNums

func RemoveLineNums(s string) string

func StringTs

func StringTs() string

Types

type BackpressurePolicy added in v1.1.0

type BackpressurePolicy int

BackpressurePolicy defines how to behave when a subscriber's mailbox is full.

const (
	// BackpressureDropNewest drops the incoming value if the subscriber's mailbox is full (default).
	BackpressureDropNewest BackpressurePolicy = iota
	// BackpressureDropOldest removes one oldest pending value from the mailbox (if any) and enqueues the new one.
	BackpressureDropOldest
	// BackpressureBlock blocks the broadcaster until there is space in the subscriber's mailbox.
	// This may degrade real-time characteristics if any subscriber is slow.
	BackpressureBlock
)

type Observable

type Observable[T any] interface {
	// Subscribe registers a subscriber with id and returns a receive-only channel.
	// If ctx is canceled before subscription is established, returns ctx.Err.
	// An error is returned if the id already exists or the observable is closed.
	Subscribe(ctx context.Context, id string, opts ...SubOption[T]) (<-chan T, error)

	// Unsubscribe removes a subscriber by id and closes its channel.
	// Returns an error if no such subscriber exists.
	Unsubscribe(id string) error

	// Broadcast delivers value to all subscribers according to their backpressure policy.
	// This call is non-blocking per-subscriber by default (DropNewest).
	Broadcast(value T)

	// BroadcastCtx is like Broadcast but can return early if ctx is canceled while applying
	// blocking backpressure policies. For non-blocking policies, it behaves like Broadcast.
	BroadcastCtx(ctx context.Context, value T) error

	// Notify delivers value to a specific subscriber using that subscriber's backpressure policy.
	// Returns an error if the subscriber doesn't exist or if blocking is interrupted by ctx.
	Notify(id string, value T) error
	NotifyCtx(ctx context.Context, id string, value T) error

	// Close closes the observable and all subscriber channels. Further Subscribe returns an error,
	// and Broadcast/Notify become no-ops (or return errors for context variants).
	Close()

	// Closed reports whether the observable has been closed.
	Closed() bool

	// Len returns the number of current subscribers.
	Len() int
}

func NewObservable

func NewObservable[T any]() Observable[T]

NewObservable creates a new observable with sensible defaults: - Buffer: 64 - Policy: DropNewest

func NewObservableWithDefaults added in v1.1.0

func NewObservableWithDefaults[T any](defaults SubConfig[T]) Observable[T]

NewObservableWithDefaults allows overriding defaults applied to all new subscribers.

type SubConfig added in v1.1.0

type SubConfig[T any] struct {
	// Buffer is the per-subscriber mailbox depth (and outward channel depth).
	Buffer int
	// Policy determines backpressure behavior when the mailbox is full.
	Policy BackpressurePolicy
	// ReplayLatest, if true, immediately delivers the most recently broadcast value to the new subscriber (if any).
	ReplayLatest bool
	// DropCallback is invoked when a value is dropped for this subscriber due to backpressure.
	DropCallback func(id string, value T)
	// FlushOnUnsubscribe, if true, attempts to drain the mailbox to the outward channel before closing on unsubscribe.
	// Default false (fast close, drop remaining).
	FlushOnUnsubscribe bool
}

SubConfig controls per-subscriber behavior.

type SubOption added in v1.1.0

type SubOption[T any] func(*SubConfig[T])

SubOption modifies SubConfig.

func WithBuffer added in v1.1.0

func WithBuffer[T any](n int) SubOption[T]

func WithDropCallback added in v1.1.0

func WithDropCallback[T any](fn func(id string, value T)) SubOption[T]

func WithFlushOnUnsubscribe added in v1.1.0

func WithFlushOnUnsubscribe[T any](enable bool) SubOption[T]

func WithPolicy added in v1.1.0

func WithPolicy[T any](p BackpressurePolicy) SubOption[T]

func WithReplayLatest added in v1.1.0

func WithReplayLatest[T any](enable bool) SubOption[T]

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL