sink

Version: v1.1.0
Published: May 29, 2022 | License: Apache-2.0 | Imports: 5 | Imported by: 0

sink allows you to batch expensive operations using items from different flows.

Installation

Install using Go modules:

  go get -u github.com/ormanli/sink

Usage/Examples

Configuration

cfg := sink.Config[int, int]{
    // Maximum number of items in a batch. Mandatory, can't be less than 1.
    MaxItemsForBatching: 10,
    // Maximum time to wait for inputs. Mandatory, can't be less than 1 millisecond.
    MaxTimeoutForBatching: 10 * time.Millisecond,
    // Goroutine pool size for Add operations. Mandatory, can't be less than 1.
    AddPoolSize: 10,
    // Goroutine pool size for callback operations. Mandatory, can't be less than 1.
    CallbackPoolSize: 10,
    // Goroutine pool size for the expensive operation. Mandatory, can't be less than 1.
    ExpensivePoolSize: 10,
    // Function that is called with the batched items. Mandatory.
    ExpensiveOperation: func(i []int) ([]int, error) {
        time.Sleep(time.Second)

        return i, nil
    },
    // Optional. If not provided, the log package is used.
    Logger: customLogger,
}

Sink starts processing the batched items as soon as MaxItemsForBatching items have arrived or MaxTimeoutForBatching has elapsed.

Sink

Sink processes inputs according to the provided configuration. When the sink is no longer needed, stop it by calling its Close method.

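// dummy is the caller's own type; cfg must be a sink.Config[dummy, dummy].
s, err := sink.NewSink[dummy, dummy](cfg)
if err != nil {
    log.Fatal(err)
}
defer s.Close()

_, err = s.Add(dummy{i: 10})

Sink With Context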

SinkWithContext processes inputs according to the provided configuration. It runs until the given context is canceled.

ctx, cancel := context.WithCancel(context.Background())
s, err := sink.NewSinkWithContext[dummy, dummy](ctx, cfg)
if err != nil {
    log.Fatal(err)
}
defer cancel() // canceling the context stops the sink

_, err = s.Add(dummy{i: 10})

Running Tests

To run the tests, use the following command:

  make test

Documentation

Constants

This section is empty.

Variables

var (
	// ErrInvalidMaxItemsForBatching reports a MaxItemsForBatching value below 1.
	ErrInvalidMaxItemsForBatching = errors.New("max items for batching must be more than zero")
	// ErrNilExpensiveOperation reports a missing (nil) ExpensiveOperation.
	ErrNilExpensiveOperation = errors.New("there is not expensive operation")
	// ErrInvalidMaxTimeoutForBatching reports a MaxTimeoutForBatching value that is too small.
	ErrInvalidMaxTimeoutForBatching = errors.New("max timeout for matching must be more than 1 millisecond")
	// ErrInvalidAddPoolSize reports an AddPoolSize value below 1.
	ErrInvalidAddPoolSize = errors.New("add pool size must be more than zero")
	// ErrInvalidCallbackPoolSize reports a CallbackPoolSize value below 1.
	ErrInvalidCallbackPoolSize = errors.New("callback pool size must be more than zero")
	// ErrInvalidExpensivePoolSize reports an ExpensivePoolSize value below 1.
	ErrInvalidExpensivePoolSize = errors.New("expensive pool size must be more than zero")
)

Functions

This section is empty.

Types

type Config

type Config[I, O any] struct {
	MaxItemsForBatching   int
	MaxTimeoutForBatching time.Duration

	Logger Logger

	AddPoolSize        int
	CallbackPoolSize   int
	ExpensivePoolSize  int
	ExpensiveOperation expensiveOperation[I, O]
}

Config holds configuration for the sink.

type Logger

type Logger interface {
	ants.Logger
}

Logger is the logging interface.

type Sink

type Sink[I, O any] struct {
	// contains filtered or unexported fields
}

Sink is a struct that processes different requests simultaneously.

func NewSink

func NewSink[I, O any](config Config[I, O]) (*Sink[I, O], error)

NewSink initializes a sink with the provided config.

func (*Sink[I, O]) Add

func (s *Sink[I, O]) Add(value I) (O, error)

Add adds a value to the sink and waits for the result.
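
One common way to let each caller wait for its own result from a shared batch is to pair every input with a reply channel. The sketch below illustrates that idea with the standard library only. The `request` type and `processBatch` function are hypothetical, and the sketch assumes the operation returns outputs in the same order as its inputs; none of this is confirmed to be how the package works internally.

```go
package main

import (
	"fmt"
	"sync"
)

// request pairs an input with the channel on which its caller awaits the output.
type request struct {
	in    int
	reply chan int
}

// processBatch calls op once for the whole batch and sends each caller the
// output found at the same index as its input (an assumption of this sketch).
func processBatch(batch []request, op func([]int) []int) {
	inputs := make([]int, len(batch))
	for i, r := range batch {
		inputs[i] = r.in
	}
	outputs := op(inputs)
	for i, r := range batch {
		r.reply <- outputs[i]
	}
}

func main() {
	double := func(xs []int) []int {
		out := make([]int, len(xs))
		for i, x := range xs {
			out[i] = 2 * x
		}
		return out
	}

	batch := make([]request, 3)
	results := make([]int, 3)
	var wg sync.WaitGroup
	for i := range batch {
		// Buffered reply channels keep processBatch from blocking on slow callers.
		batch[i] = request{in: i + 1, reply: make(chan int, 1)}
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = <-batch[i].reply // each caller blocks until its result is ready
		}(i)
	}

	processBatch(batch, double)
	wg.Wait()
	fmt.Println(results) // prints [2 4 6]
}
```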

func (*Sink[I, O]) Close

func (s *Sink[I, O]) Close()

Close closes the sink and stops processing.

type SinkWithContext

type SinkWithContext[I, O any] struct {
	// contains filtered or unexported fields
}

SinkWithContext is a struct that processes different requests simultaneously.

func NewSinkWithContext

func NewSinkWithContext[I, O any](ctx context.Context, config Config[I, O]) (*SinkWithContext[I, O], error)

NewSinkWithContext initializes a sink with the provided config and context.

func (*SinkWithContext[I, O]) Add

func (s *SinkWithContext[I, O]) Add(value I) (O, error)

Add adds a value to the sink and waits for the result.
