buffer

package module
v0.0.2
Warning

This package is not in the latest version of its module.

Published: May 5, 2022 License: MIT Imports: 4 Imported by: 1

README

async-buffer

The async-buffer package buffers data and flushes it when the buffer reaches a size threshold or when a time interval elapses. It is safe for use by multiple goroutines.

It requires Go 1.18 or later.

Why do you need it?

A use case:

You have a message queue subscriber server.

The server receives messages one by one and inserts them into your database.

However, there is a big performance gap between one-by-one insertion and batch insertion.

So you can use async-buffer to buffer the data and batch insert it at the right time.

Installation

go get -u github.com/woorui/async-buffer

Quick start

The Write, Flush, and Close methods are goroutine-safe.

package main

import (
	"errors"
	"fmt"
	"time"

	buffer "github.com/woorui/async-buffer"
)

type printer struct{}

func (p *printer) Flush(strs ...string) error {
	return print(strs...)
}

func print(strs ...string) error {
	fmt.Printf("printer flush elements: %v, flush size: %d \n", strs, len(strs))
	return nil
}

func main() {
	buf, errch := buffer.New[string](6, 3*time.Second, &printer{})
	// You can also use buffer.FlushFunc to adapt a plain function into a Flusher,
	// as shown below:
	// buf, errch := buffer.New[string](6, 3*time.Second, buffer.FlushFunc[string](print))
	defer buf.Close()

	// If you don't care about flush errors
	// or the elements that failed to flush, you can ignore them.
	go errHandle(errch)

	// 1. flush at threshold
	buf.Write("a", "b", "c", "d", "e", "f")
	// Output
	// printer flush elements: [a b c d e f], flush size: 6

	// 2. time to flush automatically
	buf.Write("aaaaa")
	buf.Write("bbbbb")
	buf.Write("ccccc", "ddddd")
	time.Sleep(5 * time.Second)
	// Output
	// printer flush elements: [aaaaa bbbbb ccccc ddddd], flush size: 4

	// 3. flush manually
	buf.Write("eeeee", "fffff")
	buf.Flush()
	// Output
	// printer flush elements: [eeeee fffff], flush size: 2

	// waiting...
	select {}
}

func errHandle(errch <-chan error) {
	for err := range errch {
		if se := new(buffer.ErrFlush[string]); errors.As(err, se) {
			fmt.Printf("flush err backup %v \n", se.Backup)
		} else {
			fmt.Printf("flush err %v \n", err)
		}
	}
}

License

MIT License

Documentation

Constants

const DefaultDataBackupSize = 128

DefaultDataBackupSize is the default size of the buffer data backup.

Variables

var (
	// ErrClosed represents a closed buffer
	ErrClosed = errors.New("async-buffer: buffer is closed")
)

Functions

func NewErrFlush

func NewErrFlush[T any](err error, elements []T) error

NewErrFlush returns an ErrFlush. err is the flush error, and elements are the elements that were not handled.

Types

type Buffer

type Buffer[T any] struct {
	// contains filtered or unexported fields
}

Buffer represents an async buffer.

The Buffer flushes data automatically at a fixed interval. Flushing is also triggered when the buffered data reaches the specified threshold.

You can also flush data manually.

func New

func New[T any](threshold uint32, flushInterval time.Duration, flusher Flusher[T]) (*Buffer[T], <-chan error)

New returns the async buffer.

threshold is the buffer size that triggers a flush; if threshold is zero, the threshold check is skipped. flushInterval is the interval between automatic flushes. flusher is the Flusher that writes the buffered data to a permanent destination.

The returned channel carries the errors generated during flushing. Subscribe to it if you want to handle flush errors. Use `errors.As` with an `ErrFlush[T]` target to retrieve the elements that were not flushed.

func (*Buffer[T]) Close

func (b *Buffer[T]) Close() error

Close stops flushing and handles the remaining elements.

func (*Buffer[T]) Flush

func (b *Buffer[T]) Flush()

Flush flushes the buffered elements once.

func (*Buffer[T]) Write

func (b *Buffer[T]) Write(elements ...T) (int, error)

Write writes elements to the buffer. It returns the number of elements written, and ErrClosed if the buffer has been closed.

type ErrFlush

type ErrFlush[T any] struct {
	Backup []T
	// contains filtered or unexported fields
}

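ErrFlush is returned from `Write` when automatic flushing fails. In the Quick start example, it is also recovered from the error channel with `errors.As`, and its Backup field is printed.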

func (ErrFlush[T]) Error

func (e ErrFlush[T]) Error() string

type FlushFunc added in v0.0.2

type FlushFunc[T any] func(elements ...T) error

The FlushFunc is an adapter to allow the use of ordinary functions as a Flusher. FlushFunc(f) is a Flusher that calls f.
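The adapter idea can be sketched in isolation. The `Flusher` and `FlushFunc` declarations below are local copies of the signatures documented on this page, included only so the sketch compiles without the module; `joinAndPrint` is a hypothetical function used for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// Flusher is a local copy of the interface documented on this page.
type Flusher[T any] interface {
	Flush(elements ...T) error
}

// FlushFunc is a local copy of the documented adapter type.
type FlushFunc[T any] func(elements ...T) error

// Flush forwards to f, which is what lets an ordinary function satisfy Flusher.
func (f FlushFunc[T]) Flush(elements ...T) error { return f(elements...) }

// joinAndPrint is a hypothetical ordinary function with a matching signature.
func joinAndPrint(strs ...string) error {
	fmt.Println(strings.Join(strs, ","))
	return nil
}

func main() {
	// The conversion FlushFunc[string](joinAndPrint) yields a value that implements Flusher[string].
	var f Flusher[string] = FlushFunc[string](joinAndPrint)
	_ = f.Flush("a", "b", "c") // prints "a,b,c"
}
```

This is the same shape as `http.HandlerFunc` in the standard library: a function type with a method that calls itself.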

func (FlushFunc[T]) Flush added in v0.0.2

func (f FlushFunc[T]) Flush(elements ...T) error

Flush calls f(elements...).

type Flusher

type Flusher[T any] interface {
	Flush(elements ...T) error
}

Flusher holds the Flush method, which tells the Buffer how to flush data.
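As an illustration of the batch-insert use case from the README, a Flusher might look like the sketch below. `memoryStore` is a hypothetical in-memory stand-in for a database, and `Flusher` is a local copy of the interface above so the sketch compiles without the module; a real implementation would issue a batch insert where this one appends to a slice.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Flusher is a local copy of the interface documented above.
type Flusher[T any] interface {
	Flush(elements ...T) error
}

// memoryStore is a hypothetical stand-in for a database that accepts batch inserts.
type memoryStore struct {
	mu      sync.Mutex
	batches [][]string
	down    bool // set to true to simulate an outage
}

// Flush records one batch per call. It copies the slice in case the caller reuses it.
func (s *memoryStore) Flush(rows ...string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.down {
		return errors.New("store unavailable")
	}
	s.batches = append(s.batches, append([]string(nil), rows...))
	return nil
}

func main() {
	store := &memoryStore{}
	var f Flusher[string] = store

	_ = f.Flush("m1", "m2", "m3") // one call, one batch
	_ = f.Flush("m4")

	fmt.Println(len(store.batches), store.batches) // prints "2 [[m1 m2 m3] [m4]]"
}
```

Returning a non-nil error from Flush is how a Flusher signals a failed batch, for example while the store is unavailable.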
