buflice

package module
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2021 License: MIT Imports: 2 Imported by: 0

README

buflice

This package need to create buffered slice that can be flushed when reach size or duration limit

When it can be needed?

Example: You have a worker for rabbitmq that receives jobs from queue. You receive them one by one and process it. But sometimes you need to accumulate data from jobs for batch processing in database.

Website | Blog

license GoDoc Coverage Status Build Status Go Report Card

Installation

go get -u github.com/riftbit/buflice

Example (dirty example)

package main

import (
	"log"
	"sync"
	"time"

	"github.com/rb-pkg/buflice"
)

type Book struct {
	Author string
}

func flushProcessor(chFlush chan []interface{}, chDone chan struct{}, wait *sync.WaitGroup) {
	for {
		select {
		case data := <-chFlush:
			wait.Add(1)
			log.Printf("%+v", data)
			wait.Done()
		case <-chDone:
			log.Println("Finished flushProcessor")
			return
		}
	}
}

func main() {
	chFlush := make(chan []interface{})
	chDone := make(chan struct{})
	wait := sync.WaitGroup{}

	bfl := buflice.NewBuflice(10, 1000*time.Millisecond, chFlush)

	go flushProcessor(chFlush, chDone, &wait)
	bfl.Start()

	bfl.Add(Book{Author: "Author #1"})
	bfl.Add(Book{Author: "Author #2"})
	bfl.Add(Book{Author: "Author #3"})
	bfl.Add(Book{Author: "Author #4"})
	bfl.Add(Book{Author: "Author #5"})
	time.Sleep(1111 * time.Millisecond)
	bfl.Add(Book{Author: "Author #6"})
	bfl.Add(Book{Author: "Author #7"})
	bfl.Add(Book{Author: "Author #8"})
	bfl.Add(Book{Author: "Author #9"})
	bfl.Add(Book{Author: "Author #10"})
	
	err := bfl.Close()
	if err != nil {
		log.Fatalln(err)
    }

	wait.Wait()
	chDone <- struct{}{}

}

Will print:

2019/09/03 14:56:28 [Record #1 Record #2 Record #3 Record #4 Record #5 Record #6]
2019/09/03 14:56:28 [Record #7 Record #8 Record #9 Record #10]
2019/09/03 14:56:28 Finished flushProcessor

Credits

Thanks to:

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Buflice

type Buflice struct {
	// contains filtered or unexported fields
}

Buflice main struct that contains configs and methods

func NewBuflice

func NewBuflice(size int, flushDuration time.Duration, notifyChannel chan []interface{}) *Buflice

NewBuflice method to initiate Buflice and get it

func (*Buflice) Add

func (bs *Buflice) Add(element interface{})

Add is for adding elements

func (*Buflice) Close

func (bs *Buflice) Close() error

Close is for close time ticker, clean slice data and slice position

func (*Buflice) Flush

func (bs *Buflice) Flush()

Flush is for manual flush data to channel

func (*Buflice) GetCap

func (bs *Buflice) GetCap() int

GetCap function to get max batch size

func (*Buflice) GetCurrentLen

func (bs *Buflice) GetCurrentLen() int

GetCurrentLen function to get current batch size

func (*Buflice) Start

func (bs *Buflice) Start()

Start starts ticker and serving data

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL