patterns

package
v0.0.11
Warning

This package is not in the latest version of its module.

Published: May 13, 2026 License: Apache-2.0 Imports: 3 Imported by: 0

README

Package cloudeng.io/sync/patterns

import "cloudeng.io/sync/patterns"

Package patterns provides common synchronization and communication patterns built using channels and other primitives.

Constants

DefaultFIFOSize
DefaultFIFOSize = 100

DefaultPubSubCapacity
DefaultPubSubCapacity = 100

Types

Type FIFO
type FIFO[T any] struct {
	// contains filtered or unexported fields
}

FIFO is a goroutine-safe queue that drops the oldest item when its internal buffer, which holds capacity items, is full. The output channel (b.out) is unbuffered, so items are delivered only when a receiver is ready.

The internal state (buf, head, tail, count) is a ring buffer accessed exclusively by the run goroutine, so drop-oldest is atomic with respect to external readers and requires no allocations after the initial make.
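The drop-oldest ring buffer described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the type ring and its methods push and pop are hypothetical names, the field names simply mirror the ones mentioned in the text, and the sketch assumes a capacity greater than zero and a single owning goroutine.

```go
package main

import "fmt"

// ring is a hypothetical fixed-capacity ring buffer that overwrites its
// oldest element when full. It is not safe for concurrent use; the sketch
// assumes one goroutine owns it, as the text describes for the run goroutine.
type ring[T any] struct {
	buf   []T
	head  int // index of the oldest item
	tail  int // index where the next item is written
	count int // number of items currently stored
}

// newRing allocates the backing slice once; capacity must be > 0.
func newRing[T any](capacity int) *ring[T] {
	return &ring[T]{buf: make([]T, capacity)}
}

// push stores v, dropping the oldest item first if the buffer is full.
// It reports whether an item was dropped.
func (r *ring[T]) push(v T) (dropped bool) {
	if r.count == len(r.buf) {
		// Full: advance head past the oldest item to discard it.
		r.head = (r.head + 1) % len(r.buf)
		r.count--
		dropped = true
	}
	r.buf[r.tail] = v
	r.tail = (r.tail + 1) % len(r.buf)
	r.count++
	return dropped
}

// pop removes and returns the oldest item, if any.
func (r *ring[T]) pop() (v T, ok bool) {
	if r.count == 0 {
		return v, false
	}
	v = r.buf[r.head]
	var zero T
	r.buf[r.head] = zero // clear the slot so the value can be collected
	r.head = (r.head + 1) % len(r.buf)
	r.count--
	return v, true
}

func main() {
	r := newRing[int](3)
	for i := 1; i <= 5; i++ {
		r.push(i) // pushing 4 and 5 drops 1 and 2
	}
	for v, ok := r.pop(); ok; v, ok = r.pop() {
		fmt.Println(v) // prints 3, 4, 5 on separate lines
	}
}
```

Because push and pop only move indices within the slice allocated by newRing, nothing is allocated after construction, which matches the property the text claims for the real FIFO.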

Functions
func NewFIFO[T any](ctx context.Context, capacity int) *FIFO[T]

NewFIFO creates a new FIFO with the specified buffer capacity. If capacity is <= 0, it defaults to DefaultFIFOSize.

Methods
func (b *FIFO[T]) In() chan<- T
func (b *FIFO[T]) Out() <-chan T
func (b *FIFO[T]) Stop(ctx context.Context)

Type PubSub
type PubSub[T any] struct {
	// contains filtered or unexported fields
}

PubSub provides a concurrent pub-sub mechanism that drops the oldest items for slow subscribers when their buffer is full.

Functions
func New[T any]() *PubSub[T]

New returns a new PubSub instance.

Methods
func (ps *PubSub[T]) Close()

Close closes the PubSub instance and all of its active subscribers.

func (ps *PubSub[T]) Publish(item T)

Publish sends an item to all active subscribers. If a subscriber's buffer is full, its oldest item is dropped to make room for the new one. Subscribers whose run goroutine has exited (e.g. context cancelled) are detected via their alive channel and pruned from the map without blocking.
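The fan-out behaviour described for Publish can be approximated with the sketch below. It is an illustration under assumptions rather than the package's implementation: the types and functions (sub, publish, offer) are hypothetical, each subscriber is modelled as a plain buffered channel instead of a FIFO, and a single publisher per channel with capacity of at least one is assumed.

```go
package main

import "fmt"

// sub is a hypothetical subscriber: a buffered channel plus an alive
// channel that is closed once the subscriber has stopped.
type sub[T any] struct {
	ch    chan T
	alive chan struct{}
}

// publish offers item to every live subscriber without blocking and
// prunes subscribers whose alive channel has been closed.
func publish[T any](subs map[*sub[T]]struct{}, item T) {
	for s := range subs {
		select {
		case <-s.alive:
			delete(subs, s) // deleting during range is permitted in Go
			continue
		default:
		}
		offer(s.ch, item)
	}
}

// offer sends item on ch. If ch is full it discards the oldest buffered
// item and retries. It assumes cap(ch) >= 1 and a single publisher.
func offer[T any](ch chan T, item T) {
	for {
		select {
		case ch <- item:
			return
		default:
		}
		select {
		case <-ch: // buffer full: drop the oldest item
		default: // a consumer drained the buffer in the meantime
		}
	}
}

func main() {
	slow := &sub[int]{ch: make(chan int, 2), alive: make(chan struct{})}
	gone := &sub[int]{ch: make(chan int, 2), alive: make(chan struct{})}
	subs := map[*sub[int]]struct{}{slow: {}, gone: {}}
	close(gone.alive) // simulate a subscriber whose goroutine has exited

	for i := 1; i <= 3; i++ {
		publish(subs, i)
	}
	// gone was pruned; slow kept only the two newest items.
	fmt.Println(len(subs), <-slow.ch, <-slow.ch) // prints 1 2 3
}
```

The non-blocking check on the alive channel is what lets the publisher skip and remove stopped subscribers without ever waiting on them.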

func (ps *PubSub[T]) Subscribe(ctx context.Context, capacity int) *Subscriber[T]

Subscribe creates and returns a new Subscriber with the given buffer capacity. If capacity is <= 0, it defaults to DefaultPubSubCapacity. ctx is passed to the underlying FIFO.

func (ps *PubSub[T]) Unsubscribe(sub *Subscriber[T])

Unsubscribe removes a subscriber and closes its underlying channel.

Type Subscriber
type Subscriber[T any] struct {
	// contains filtered or unexported fields
}

Subscriber represents a subscription to a PubSub instance.

Methods
func (s *Subscriber[T]) C() <-chan T

C returns the underlying receive-only channel for use in select statements.
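A consumer loop built around such a channel might look like the sketch below. The functions consume and demo are hypothetical, and a locally created channel stands in for the one returned by C(); the sketch assumes the channel is closed when the subscription ends, as the Unsubscribe and Close descriptions suggest.

```go
package main

import (
	"context"
	"fmt"
)

// consume reads from c until it is closed or ctx is done and returns the
// items it received. c stands in for the channel returned by C().
func consume[T any](ctx context.Context, c <-chan T) []T {
	var got []T
	for {
		select {
		case <-ctx.Done():
			return got
		case v, ok := <-c:
			if !ok {
				return got // channel closed: the subscription has ended
			}
			got = append(got, v)
		}
	}
}

// demo feeds two items through a stand-in channel and then closes it.
func demo() []string {
	c := make(chan string, 2)
	c <- "a"
	c <- "b"
	close(c)
	return consume(context.Background(), c)
}

func main() {
	fmt.Println(demo()) // prints [a b]
}
```

Checking the second value of the receive matters here: a closed channel is always ready, so a loop that ignores it would spin on zero values.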

