pipelines

package module
v1.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2023 License: MIT Imports: 1 Imported by: 0

README

pipelines

Go Reference Go Report Card

Golang package for parallel execution of tasks.

Pools types:
  • BusPool: Common Pool of Workers. The Task is taken into work by the first released Worker.
  • HashPool: Worker pool, which allows you to change the strategy for assigning Tasks to Workers.
  • Semaphore: Primitive for limiting the number of threads for the Tasks parallel execution.
Examples:
package main

import (
	"fmt"
	"sync"

	"github.com/egnd/go-toolbox/pipelines/assign"
	"github.com/egnd/go-toolbox/pipelines/decorators"
	"github.com/egnd/go-toolbox/pipelines/pools"
	"github.com/egnd/go-toolbox/pipelines/tasks"
	"github.com/rs/zerolog"
	"go.uber.org/zap"
)

func main() {
	var wg sync.WaitGroup

	// BusPool example:
	pipe := pools.NewBusPool(
		2,  // set parallel threads count
		10, // set tasks queue size
		&wg,
		// add some task decorators:
		decorators.LogErrorZero(zerolog.Nop()), // log tasks errors with zerolog logger
		decorators.CatchPanic,                  // convert tasks panics to errors
	)

	// HashPool example:
	pipe := pools.NewHashPool(
		2,  // set parallel threads count
		10, // set tasks queue size
		&wg,
		assign.Sticky, // choose tasks to workers assignment method
		// add some task decorators:
		decorators.LogErrorZap(zap.NewNop()), // log tasks errors with zap logger
		decorators.CatchPanic,                // convert tasks panics to errors
	)

	// Semaphore example:
	pipe := pools.NewSemaphore(2, // set parallel threads count
		&wg,
		// add some task decorators:
		decorators.ThrowPanic, // convert tasks errors to panics
	)

	// Send some tasks to pool
	for i := 0; i < 10; i++ {
		pipe.Push(tasks.NewFunc("task#"+fmt.Sprint(i), func() (err error) {
			// do something
			return err
		}))
	}

	// Wait for task processing
	pipe.Wait()

	// Close pool
	if err := pipe.Close(); err != nil {
		panic(err)
	}
}

Documentation

Overview

Package pipelines contains tools for parallel execution

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher interface {
	io.Closer
	Push(Task) error
	Wait()
}

Dispatcher is a pool interface.

type Doer

type Doer interface {
	io.Closer
	Do(Task) error
}

Doer is a worker interface.

type Hasher

type Hasher func(string, uint64) uint64

Hasher is a hash function type.

type Task

type Task interface {
	ID() string
	Do() error
}

Task is a task interface.

type TaskDecorator

type TaskDecorator func(TaskExecutor) TaskExecutor

TaskDecorator is a decorator for task execution logic.

type TaskExecutor

type TaskExecutor func(Task) error

TaskExecutor is a type for task execution method.

func NewTaskExecutor

func NewTaskExecutor(decorators []TaskDecorator) TaskExecutor

NewTaskExecutor builds chain of decorators.

Directories

Path Synopsis
Package assign contains hashing functions
Package assign contains hashing functions
Package decorators contains decorators for pipeline tasks
Package decorators contains decorators for pipeline tasks
Package pools contains pool and worker structs
Package pools contains pool and worker structs
Package tasks contains different types of tasks
Package tasks contains different types of tasks
Package workers contains different types of workers
Package workers contains different types of workers

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL