prosumer

v0.1.2
Published: Apr 5, 2020 License: MIT Imports: 5 Imported by: 2

README

Prosumer

A producer-consumer solution for Golang.

Motivation

Go is popular for its simplicity, built-in support for concurrency, and lightweight goroutines. However, coordinating goroutines involves some pitfalls (here and there), especially when implementing the producer-consumer pattern with a buffered chan.

I won't cover the details here; interested readers can follow the links above. One point, however, cannot be emphasized enough:

Closing a chan is a sender -> receiver communication, not the reverse. #11344
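
As a minimal sketch of that rule (not code from this package), the producer below is the only party that closes the channel, and the consumer learns that the stream has ended when its range loop finishes. The names produce and consume are hypothetical.

```go
package main

import "fmt"

// produce sends n integers and then closes the channel.
// Only the sender closes: it alone knows that no more values will follow.
func produce(n int) <-chan int {
	ch := make(chan int, 2) // small buffer; the size is arbitrary
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- i
		}
	}()
	return ch
}

// consume drains the channel until the sender closes it and returns the sum.
func consume(ch <-chan int) int {
	sum := 0
	for v := range ch {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(consume(produce(5))) // 0+1+2+3+4 = 10
}
```

A receiver that closed the channel instead would make the next send panic, which is why the signal flows in only one direction.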

Features

  • The inner buffer queue supports a RejectPolicy
  • Graceful close

If you have any suggestions/questions, open a PR.

Usage

There is a quickstart example; see the GoDoc for details.

License

MIT License

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrDiscard       = errors.New("discard current element")
	ErrDiscardOldest = errors.New("discard oldest element")
)

Functions

This section is empty.

Types

type Callback

type Callback func([]Element, error)

type Config

type Config struct {
	// contains filtered or unexported fields
}

Config defines params for Coordinator

func NewConfig

func NewConfig(c Consumer, opts ...Option) Config

NewConfig returns a Config with well-defined defaults. Warning: the default rejectPolicy is Block.

type Consumer

type Consumer func(lst []Element) error

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator implements a producer-consumer workflow. Put() adds new elements to the inner buffer queue, where they are processed by the Consumer.
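
The SetBatchSize and SetBatchInterval options suggest that workers hand elements to the Consumer in batches. The following is a rough sketch of such a worker loop, not the package's implementation: runWorker, batchSizes and the flush-on-size-or-interval behaviour are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

type Element interface{}

// runWorker is a hypothetical worker loop. It gathers elements from queue into
// batches of at most batchSize, flushes a partial batch on every tick of
// interval, and reports each consumer result to callback. It returns once
// queue is closed and the final partial batch has been flushed.
func runWorker(queue <-chan Element, batchSize int, interval time.Duration,
	consume func([]Element) error, callback func([]Element, error)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	batch := make([]Element, 0, batchSize)
	flush := func() {
		if len(batch) == 0 {
			return
		}
		err := consume(batch)
		callback(batch, err)
		batch = make([]Element, 0, batchSize) // fresh slice; callees may keep the old one
	}

	for {
		select {
		case e, ok := <-queue:
			if !ok { // queue closed: flush what is left and stop
				flush()
				return
			}
			batch = append(batch, e)
			if len(batch) == batchSize {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}

// batchSizes feeds n elements through runWorker and returns the size of each
// batch the consumer saw. The long interval keeps the ticker out of the way.
func batchSizes(n, batchSize int) []int {
	queue := make(chan Element, n)
	for i := 0; i < n; i++ {
		queue <- i
	}
	close(queue)

	var sizes []int
	runWorker(queue, batchSize, time.Hour,
		func(lst []Element) error { sizes = append(sizes, len(lst)); return nil },
		func([]Element, error) {})
	return sizes
}

func main() {
	fmt.Println(batchSizes(5, 2)) // [2 2 1]
}
```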

func NewCoordinator

func NewCoordinator(config Config) Coordinator

func (Coordinator) Close

func (c Coordinator) Close(graceful bool) error

Close closes the Coordinator; no more elements can be put afterwards. It can be graceful, which means: 1. it blocks; 2. all elements remaining in the buffer queue are guaranteed to be consumed.
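
One common way to get the two graceful properties is to close the queue and then wait on a sync.WaitGroup until every worker has drained it. The sketch below illustrates that idea only; the pool type and its methods are hypothetical stand-ins, and the package may implement Close differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical stand-in for a coordinator: a buffered queue plus workers.
type pool struct {
	queue    chan int
	wg       sync.WaitGroup
	consumed int64
}

func newPool(buffer, workers int) *pool {
	p := &pool{queue: make(chan int, buffer)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for range p.queue { // exits once the queue is closed and drained
				atomic.AddInt64(&p.consumed, 1)
			}
		}()
	}
	return p
}

// close stops intake. When graceful is true it blocks until the workers have
// consumed everything still buffered; otherwise it returns immediately and the
// workers finish in the background.
func (p *pool) close(graceful bool) {
	close(p.queue)
	if graceful {
		p.wg.Wait()
	}
}

// drainCount puts n elements, closes gracefully, and returns how many were consumed.
func drainCount(n int) int64 {
	p := newPool(n, 2)
	for i := 0; i < n; i++ {
		p.queue <- i
	}
	p.close(true)
	return atomic.LoadInt64(&p.consumed)
}

func main() {
	fmt.Println(drainCount(10)) // 10
}
```

In this sketch a send after close would panic, so a real implementation also has to guard its put path against a closed queue.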

func (Coordinator) Put

func (c Coordinator) Put(ctx context.Context, e Element) ([]Element, error)

Put adds a new element to the inner buffer queue. It returns an error when the inner buffer queue is full, and the first return value holds the elements that failed to be put into the queue. Depending on the rejectPolicy, multiple elements may be discarded before the current element is put successfully. Common usage pattern:

 discarded, err := c.Put(ctx, e)
 if err != nil {
	  log.Printf("discarded elements %+v for err %v", discarded, err)
 }

func (Coordinator) RemainingCapacity

func (c Coordinator) RemainingCapacity() int

RemainingCapacity returns how many more elements the inner buffer queue can hold.
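
For a queue backed by a buffered channel, that number can be derived from the built-ins cap and len. This is a small sketch assuming such a channel-backed queue; the helper name remaining is hypothetical.

```go
package main

import "fmt"

// remaining reports how many more elements a buffered channel can accept
// right now without blocking the sender.
func remaining(queue chan int) int {
	return cap(queue) - len(queue)
}

func main() {
	q := make(chan int, 3)
	q <- 1
	fmt.Println(remaining(q)) // 2
}
```

With concurrent producers and consumers the value is only a snapshot and may be stale by the time the caller acts on it.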

func (Coordinator) Start

func (c Coordinator) Start()

Start starts the workers that consume elements from the queue.

type Element

type Element interface{}

Consumer processes elements from the queue.

type Option

type Option func(*Config)

Option constructs a Config
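
The signature func(*Config) follows the functional options pattern. Because Config's fields are unexported, the sketch below uses a hypothetical settings struct to show how such options are typically applied over defaults; the field names and default values are assumptions, not the package's.

```go
package main

import (
	"fmt"
	"time"
)

// settings is a hypothetical stand-in for Config.
type settings struct {
	bufferSize    int
	batchSize     int
	batchInterval time.Duration
}

// option mutates a settings value, mirroring type Option func(*Config).
type option func(*settings)

func withBufferSize(n int) option { return func(s *settings) { s.bufferSize = n } }

func withBatchSize(n int) option { return func(s *settings) { s.batchSize = n } }

// newSettings starts from illustrative defaults and applies each option in order.
func newSettings(opts ...option) settings {
	s := settings{bufferSize: 100, batchSize: 1, batchInterval: time.Second}
	for _, opt := range opts {
		opt(&s)
	}
	return s
}

func main() {
	s := newSettings(withBufferSize(8))
	fmt.Println(s.bufferSize, s.batchSize, s.batchInterval) // 8 1 1s
}
```

Callers override only what they need, and new options can be added later without changing the constructor's signature.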

func SetBatchInterval

func SetBatchInterval(interval time.Duration) Option

func SetBatchSize

func SetBatchSize(batchSize int) Option

func SetBufferSize

func SetBufferSize(bufferSize int) Option

SetBufferSize defines inner buffer queue's size

func SetCallback

func SetCallback(cb Callback) Option

SetCallback defines callback invoked with elements and err returned from consumer

func SetNumConsumer

func SetNumConsumer(numConsumer int) Option

func SetRejectPolicy

func SetRejectPolicy(rp RejectPolicy) Option

SetRejectPolicy defines which elements get discarded when the queue is full

type RejectPolicy

type RejectPolicy int

RejectPolicy controls which elements get discarded when the queue is full.

const (
	// Block current goroutine, no elements discarded
	Block RejectPolicy = iota
	// Discard current element
	Discard
	// DiscardOldest removes the oldest element to make room for the new one
	DiscardOldest
)
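
To make the three policies concrete, here is a sketch of how each could behave on a buffered channel. It is not the package's code: the put helper, the lowercase names, and the pairing of each policy with an error are assumptions, and the DiscardOldest branch assumes a single producer and a queue capacity greater than zero.

```go
package main

import (
	"errors"
	"fmt"
)

type policy int

const (
	block policy = iota
	discard
	discardOldest
)

var (
	errDiscard       = errors.New("discard current element")
	errDiscardOldest = errors.New("discard oldest element")
)

// put tries to enqueue e. It returns the elements that were dropped, if any,
// together with an error naming the reason.
func put(queue chan int, p policy, e int) ([]int, error) {
	switch p {
	case discard:
		select {
		case queue <- e:
			return nil, nil
		default: // queue full: drop the new element
			return []int{e}, errDiscard
		}
	case discardOldest:
		var dropped []int
		for {
			select {
			case queue <- e:
				if len(dropped) > 0 {
					return dropped, errDiscardOldest
				}
				return nil, nil
			default:
			}
			select {
			case old := <-queue: // evict the head to make room
				dropped = append(dropped, old)
			default:
			}
		}
	default: // block: wait until a consumer frees a slot
		queue <- e
		return nil, nil
	}
}

func main() {
	q := make(chan int, 2)
	put(q, block, 1)
	put(q, block, 2)
	fmt.Println(put(q, discard, 3))       // [3] discard current element
	fmt.Println(put(q, discardOldest, 4)) // [1] discard oldest element
}
```

After these calls the queue holds 2 and 4: the discard call rejected 3, and the discardOldest call evicted 1.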

Directories

Path Synopsis
examples
