buffer

Version: v1.0.2
Published: Jun 1, 2023 License: MIT Imports: 6 Imported by: 1

README

async-buffer

async-buffer buffers data and flushes it when a size threshold or a time interval is reached. It is safe for concurrent use by multiple goroutines.

It requires Go 1.18 or later.

Why do you need it?

A use case:

You have a message queue subscriber server.

The server receives messages one by one and inserts them into your database.

But there is a big performance gap between one-by-one insertion and batch insertion into your database.

So you can use async-buffer to buffer the data and batch insert it at the right time.

Installation

go get -u github.com/woorui/async-buffer

Documents

The complete documentation is here: https://pkg.go.dev/github.com/woorui/async-buffer

Quick start

The Write, Flush, and Close APIs are goroutine-safe.

package main

import (
	"context"
	"fmt"
	"time"

	buffer "github.com/woorui/async-buffer"
)

// pp implements Flusher interface
type pp struct{}

func (p *pp) Flush(strs []string) error {
	return print(strs)
}

func print(strs []string) error {
	fmt.Printf("print: %v \n", strs)
	return nil
}

func main() {
	// You can also use buffer.FlushFunc to adapt a function to a Flusher.
	buf := buffer.New[string](&pp{}, buffer.Option[string]{
		Threshold:     5,
		FlushInterval: 3 * time.Second,
		WriteTimeout:  time.Second,
		FlushTimeout:  time.Second,
		ErrHandler:    func(err error, t []string) { fmt.Printf("err: %v, ele: %v", err, t) },
	})

	// Data may be lost if Close() is not called.
	defer buf.Close()

	// 1. flush at threshold
	buf.Write("a", "b", "c", "d", "e", "f")

	// 2. flush automatically once FlushInterval elapses
	buf.Write("aaaaa")
	buf.Write("bbbbb")
	buf.Write("ccccc", "ddddd")
	time.Sleep(5 * time.Second)

	// 3. write with WriteWithContext, then flush manually
	buf.WriteWithContext(context.Background(), "eeeee", "fffff")
	buf.Flush()
}

License

MIT License

Documentation

Overview

Example
package main

import (
	"fmt"
	"time"

	buffer "github.com/woorui/async-buffer"
)

// pp implements Flusher interface
type pp struct{}

func (p *pp) Flush(strs []string) error {
	return print(strs)
}

func print(strs []string) error {
	for _, s := range strs {
		fmt.Printf("%s ", s)
	}
	return nil
}

func main() {
	// You can also use buffer.FlushFunc to adapt a function to a Flusher.
	buf := buffer.New[string](&pp{}, buffer.Option[string]{
		Threshold:     1,
		FlushInterval: 3 * time.Second,
		WriteTimeout:  time.Second,
		FlushTimeout:  time.Second,
		ErrHandler:    func(err error, t []string) { fmt.Printf("err: %v, ele: %v", err, t) },
	})
	// Data may be lost if Close() is not called.
	defer buf.Close()

	buf.Write("a", "b", "c", "d", "e", "f")

}
Output:

a b c d e f

Constants

const DefaultDataBackupSize = 128

DefaultDataBackupSize is the default size of buffer data backup

Variables

var (
	// ErrClosed represents a closed buffer
	ErrClosed = errors.New("async-buffer: buffer is closed")
	// ErrWriteTimeout returned if write timeout
	ErrWriteTimeout = errors.New("async-buffer: write timeout")
	// ErrFlushTimeout returned if flush timeout
	ErrFlushTimeout = errors.New("async-buffer: flush timeout")
)

Functions

func DefaultErrHandler added in v0.0.3

func DefaultErrHandler[T any](err error, elements []T)

DefaultErrHandler prints error and the size of elements to stderr.

Types

type Buffer

type Buffer[T any] struct {
	// contains filtered or unexported fields
}

Buffer represents an async buffer.

The Buffer flushes data automatically at a fixed interval. Flushing is also triggered when the buffered data reaches the specified threshold.

If both Threshold and FlushInterval are set to zero, every write flushes immediately.

You can also flush data manually by calling `Flush`.

func New

func New[T any](flusher Flusher[T], option Option[T]) *Buffer[T]

New returns the async buffer based on option

func (*Buffer[T]) Close

func (b *Buffer[T]) Close() error

Close stops flushing and handles the remaining elements.

func (*Buffer[T]) Flush

func (b *Buffer[T]) Flush()

Flush flushes the buffered elements once.

func (*Buffer[T]) Write

func (b *Buffer[T]) Write(elements ...T) (int, error)

Write writes elements to the buffer. It returns the number of elements written, and a closed error if the buffer has been closed.

func (*Buffer[T]) WriteWithContext added in v0.0.3

func (b *Buffer[T]) WriteWithContext(ctx context.Context, elements ...T) (int, error)

WriteWithContext writes elements to the buffer. It returns the number of elements written, and a closed error if the buffer has been closed.

type FlushFunc added in v0.0.2

type FlushFunc[T any] func(elements []T) error

The FlushFunc is an adapter to allow the use of ordinary functions as a Flusher. FlushFunc(f) is a Flusher that calls f.

func (FlushFunc[T]) Flush added in v0.0.2

func (f FlushFunc[T]) Flush(elements []T) error

Flush calls FlushFunc itself.
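
The adapter pattern can be shown in isolation. In this sketch Flusher and FlushFunc are local copies of the declarations documented on this page so that it compiles without the package, and drain is a hypothetical stand-in for whatever calls the Flusher.

```go
package main

import "fmt"

// Flusher and FlushFunc are local copies of the documented declarations.
type Flusher[T any] interface {
	Flush(elements []T) error
}

type FlushFunc[T any] func(elements []T) error

// Flush calls f itself, which is what makes FlushFunc satisfy Flusher.
func (f FlushFunc[T]) Flush(elements []T) error { return f(elements) }

// drain hands one batch to any Flusher (hypothetical stand-in for the Buffer).
func drain[T any](f Flusher[T], batch []T) error {
	return f.Flush(batch)
}

func main() {
	total := 0
	// A closure converted to FlushFunc[int] now satisfies Flusher[int].
	adder := FlushFunc[int](func(elements []int) error {
		for _, e := range elements {
			total += e
		}
		return nil
	})
	if err := drain[int](adder, []int{1, 2, 3}); err != nil {
		fmt.Println("flush error:", err)
	}
	fmt.Println(total)
}
```

With the real package, the same kind of conversion (for example buffer.FlushFunc[string](print)) would be passed as the first argument to New.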

type Flusher

type Flusher[T any] interface {
	Flush(elements []T) error
}

Flusher tells the Buffer how to flush data. FlushFunc adapts an ordinary function to this interface.

type Option added in v0.0.3

type Option[T any] struct {
	// Threshold is the number of buffered elements that triggers a flush.
	// If Threshold is zero, the threshold is not checked.
	Threshold uint32
	// WriteTimeout sets the write timeout. A negative value is set to zero; zero means no timeout.
	WriteTimeout time.Duration
	// FlushTimeout sets the flush timeout. A negative value is set to zero; zero means no timeout.
	FlushTimeout time.Duration
	// FlushInterval is the interval between automatic flushes. A negative value is set to zero.
	// If FlushInterval is zero, there is no automatic flushing.
	FlushInterval time.Duration
	// ErrHandler handles errors. By default it prints the error and the size of elements to stderr.
	ErrHandler func(err error, elements []T)
}

Option configures the Buffer created by New.

If both Threshold and FlushInterval are set to zero, every write flushes immediately.
