concurrency

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 21, 2025 License: MIT Imports: 10 Imported by: 1

README

Concurrency package

This is a Golang library to implement popular concurrency patterns in a painless for developers.

Documentation

CONTRIBUTING

CHANGELOG

RELEASING

Usage

Learn more about usage in testing files.

type Transaction struct {
	ID     int
	Amount int
}

type trans struct {
	data []Transaction
	mu   sync.Mutex
}

type Recovery struct{}

func (r *Recovery) Recover(msg any) { fmt.Println("Recovered in func", msg) }

func Test_Success(t *testing.T) {
	tr := NewTrans()
	NewPipeline(tr).
		SetSteps(filter, convert).
		SetCommit(tr.Commit).
		SetGoroutinesLimit(3).
		SetRecoveryHandler(new(Recovery)).
		Run()
}

func NewTrans() *trans {
	const size = 10
	t := &trans{
		data: make([]Transaction, 0, size),
	}
	for i := 0; i < size; i++ {
		t.data = append(t.data, Transaction{
			ID:     i,
			Amount: rand.Intn(500) - 100,
		})
	}
	return t
}

func (t *trans) Fetch(_ context.Context) iter.Seq[Transaction] {
	const size = 10
	return func(yield func(Transaction) bool) {
		for i := 0; i < size; i++ {
			if i == 9 {
				panic("ah man")
			}
			time.Sleep(time.Millisecond * 500)
			tr := Transaction{
				ID:     i,
				Amount: rand.Intn(500) - 100,
			}
			fmt.Printf("Transaction %d created...\n", i)
			if !yield(tr) {
				return
			}
		}
	}
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// worker pool is closed
	ErrPoolClosed = errors.New("worker pool is closed")
)

Functions

func MergeChannels added in v0.0.2

func MergeChannels[Type any, Uint Unsigned](
	ctx context.Context,
	bufsize Uint,
	channels ...chan Type,
) <-chan Type

func MergeChannelsIter added in v0.0.2

func MergeChannelsIter[Type any, Uint Unsigned](
	ctx context.Context,
	bufsize Uint,
	channels ...chan Type,
) iter.Seq[Type]

func NewPipeline

func NewPipeline[Type any](pipline Pipeline[Type]) *pipeline[Type]

func NewPipelineContext

func NewPipelineContext[Type any](pipline Pipeline[Type], ctx context.Context) *pipeline[Type]

Types

type Func

type Func = func()

type HeapQueue added in v0.0.2

type HeapQueue struct {
	// contains filtered or unexported fields
}

func MustInitHeapQueue added in v0.0.2

func MustInitHeapQueue() *HeapQueue

func NewHeapQueue added in v0.0.2

func NewHeapQueue() (*HeapQueue, error)

func (*HeapQueue) Add added in v0.0.2

func (hq *HeapQueue) Add(job Job)

func (*HeapQueue) Next added in v0.0.2

func (hq *HeapQueue) Next() (time.Time, bool)

func (*HeapQueue) Run added in v0.0.2

func (s *HeapQueue) Run()

type HeapQueueManager added in v0.0.2

type HeapQueueManager interface {
	heap.Interface
	Next() (time.Time, bool)
}

type IntervalJob added in v0.0.2

type IntervalJob struct {
	// contains filtered or unexported fields
}

func NewIntervalJob added in v0.0.2

func NewIntervalJob(id string, interval time.Duration, Next time.Time, job func()) *IntervalJob

func (*IntervalJob) Next added in v0.0.2

func (jb *IntervalJob) Next() time.Time

func (*IntervalJob) Reset added in v0.0.2

func (jb *IntervalJob) Reset()

func (*IntervalJob) Run added in v0.0.2

func (jb *IntervalJob) Run()

type Job added in v0.0.2

type Job interface {
	Next() time.Time
	Run()
	Reset()
}

type JobList added in v0.0.2

type JobList []Job

func MustInitJobList added in v0.0.2

func MustInitJobList() *JobList

func NewJobList added in v0.0.2

func NewJobList() (*JobList, error)

func (JobList) Len added in v0.0.2

func (jb JobList) Len() int

func (JobList) Less added in v0.0.2

func (jb JobList) Less(i, j int) bool

func (*JobList) Next added in v0.0.2

func (jb *JobList) Next() (time.Time, bool)

func (*JobList) Pop added in v0.0.2

func (jb *JobList) Pop() any

func (*JobList) Push added in v0.0.2

func (jb *JobList) Push(x any)

func (JobList) Swap added in v0.0.2

func (jb JobList) Swap(i, j int)

type Pipeline

type Pipeline[Type any] interface {
	Fetch(context.Context) iter.Seq[Type]
}

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

func NewWorkerPool

func NewWorkerPool(opts ...PoolOption) *Pool

func NewWorkerPoolContext

func NewWorkerPoolContext(ctx context.Context, opts ...PoolOption) *Pool

func (*Pool) AddTask

func (p *Pool) AddTask(task Task) (string, error)

func (*Pool) GetMetrics

func (p *Pool) GetMetrics() PoolMetrics

func (*Pool) SetMaxWorkers

func (p *Pool) SetMaxWorkers(n int32)

func (*Pool) SetMinWorkers

func (p *Pool) SetMinWorkers(n int32)

New methods for dynamic adjustment

func (*Pool) Start

func (p *Pool) Start(_ context.Context) error

func (*Pool) Stop

func (p *Pool) Stop(_ context.Context) error

type PoolMetrics

type PoolMetrics struct {
	WorkersCurrent int32
	WorkersMin     int32
	WorkersMax     int32
	QueueDepth     int
	HighQueueDepth int
	TasksProcessed int64
	TasksFailed    int64
}

type PoolOption

type PoolOption func(*Pool)

func WithTimeouts

func WithTimeouts(up, down time.Duration) PoolOption

func WithWorkersControl

func WithWorkersControl[Int constraints.Signed](min, max, init Int) PoolOption

type Priority

type Priority int
const (
	PriorityLow Priority = iota
	PriorityHigh
)

func (Priority) String

func (pr Priority) String() string

type QueueManager added in v0.0.2

type QueueManager interface {
	Next() (time.Time, bool)
	Run()
	Add(job Job)
}

type Recoverer

type Recoverer interface {
	Recover(message any)
}

type Scheduler added in v0.0.2

type Scheduler struct {
	// contains filtered or unexported fields
}

func MustInitScheduler added in v0.0.2

func MustInitScheduler(opts ...SchedulerOption) *Scheduler

func NewScheduler added in v0.0.2

func NewScheduler(opts ...SchedulerOption) (*Scheduler, error)

func (*Scheduler) Add added in v0.0.2

func (s *Scheduler) Add(job Job)

func (*Scheduler) Start added in v0.0.2

func (s *Scheduler) Start()

func (*Scheduler) Stop added in v0.0.2

func (s *Scheduler) Stop()

type SchedulerOption added in v0.0.2

type SchedulerOption func(*Scheduler)

func WithQueueManager added in v0.0.2

func WithQueueManager(QueueManager QueueManager) SchedulerOption

type Task

type Task struct {
	ID       string
	Func     Func
	Priority Priority
}

type Unsigned added in v0.0.2

type Unsigned interface {
	~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64
}

type WorkerStats

type WorkerStats struct {
	ID              int32
	StartTime       time.Time
	LastActivity    time.Time
	TasksHandled    int64
	Status          WorkerStatus
	CurrentTask     string // task description or ID
	PanicsRecovered int64
}

type WorkerStatus

type WorkerStatus int
const (
	StatusIdle WorkerStatus = iota
	StatusWorking
	StatusTerminated
)

func (WorkerStatus) String

func (ws WorkerStatus) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL