bqueue

Version: v0.5.0 · Published: Jan 28, 2023 · License: MIT · Imports: 2 · Imported by: 0

README

bqueue

A high-performance batch queue that dispatches items at fixed time intervals or as soon as a batch limit is reached.
It is implemented with the Go standard library only and imports no third-party packages.

Features

  • Non-blocking enqueue
    Queue up incoming items without blocking processing.

  • Dispatching by periodic time intervals
    Set a time interval and receive the batched items each time it elapses.

  • Dispatching as soon as a batch limit is met
    If a batch fills up before the time interval elapses, it is dispatched immediately.

  • Supports channel and callback
    Read batches from the OutQueue channel, or supply a callback function through Options.DequeueFunc. See examples/ for details.

  • Plain old Go channels
    Implementation relies heavily on channels and is free of mutexes and other bookkeeping techniques.
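
The interval-or-limit dispatching described above can be sketched with nothing but channels, select, and a time.Ticker. This is a minimal illustration of the technique, not bqueue's actual implementation; runBatcher and collect are hypothetical names invented for this example.

```go
package main

import (
	"fmt"
	"time"
)

// runBatcher is a hypothetical helper, not part of bqueue. It groups items
// read from in and sends a batch on the returned channel when maxItems is
// reached or when interval elapses, whichever happens first.
func runBatcher(in <-chan any, interval time.Duration, maxItems int) <-chan []any {
	out := make(chan []any)
	go func() {
		defer close(out)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		batch := make([]any, 0, maxItems)
		flush := func() {
			if len(batch) == 0 {
				return // nothing to dispatch
			}
			out <- batch
			batch = make([]any, 0, maxItems)
		}

		for {
			select {
			case item, ok := <-in:
				if !ok { // input closed: dispatch the remainder and stop
					flush()
					return
				}
				batch = append(batch, item)
				if len(batch) >= maxItems {
					flush()
					ticker.Reset(interval) // restart the wait after a size-triggered flush
				}
			case <-ticker.C:
				flush()
			}
		}
	}()
	return out
}

// collect feeds items through runBatcher with a one-second interval and
// returns every batch that was dispatched.
func collect(items []any, maxItems int) [][]any {
	in := make(chan any)
	out := runBatcher(in, time.Second, maxItems)
	go func() {
		for _, it := range items {
			in <- it
		}
		close(in)
	}()
	var batches [][]any
	for b := range out {
		batches = append(batches, b)
	}
	return batches
}

func main() {
	for _, b := range collect([]any{"a", "b", "c", "d", "e"}, 2) {
		fmt.Println(len(b), b)
	}
}
```

With a limit of 2 and five items fed in quickly, the size trigger produces two full batches and closing the input flushes the last item. A single goroutine owns the batch slice, so no mutex is needed.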

Install

$ go get -u github.com/wind-c/bqueue

Sample Usage

Dispatch a batch at 1-second intervals or as soon as the batching limit of 64 items is met. If the volume of messages is large, increase MaxQueueSize. See examples/ for working code.

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/wind-c/bqueue"
)

func main() {
	// Initialize: dispatch every second, or sooner once 64 items are batched.
	b := bqueue.NewBatchQueue(&bqueue.Options{
		Interval:      time.Second,
		MaxBatchItems: 64,
		MaxQueueSize:  1024,
	})
	defer b.Stop()
	go b.Start()

	// Produce some messages.
	go func() {
		for i := 0; i < 100; i++ {
			b.Enqueue(fmt.Sprintf("message #%d", i))
		}
	}()

	// Consume batches as they arrive; the loop blocks while waiting for the next one.
	for batch := range b.OutQueue {
		for _, item := range batch {
			// Items arrive as any; only strings were enqueued above.
			s := item.(string)
			// Do whatever is needed with the item.
			log.Print(s)
		}
	}
}
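
Batches are delivered as []any, and a plain type assertion such as item.(string) panics if an item of another type was enqueued. The following sketch shows one way to convert a batch defensively with the comma-ok form; toStrings is a hypothetical helper and not part of bqueue.

```go
package main

import "fmt"

// toStrings is a hypothetical helper, not part of bqueue. It returns the
// string items of a batch in order and counts the items of any other type.
func toStrings(batch []any) (strs []string, skipped int) {
	strs = make([]string, 0, len(batch))
	for _, item := range batch {
		s, ok := item.(string) // comma-ok form: no panic on a type mismatch
		if !ok {
			skipped++
			continue
		}
		strs = append(strs, s)
	}
	return strs, skipped
}

func main() {
	batch := []any{"message #0", 42, "message #1"}
	strs, skipped := toStrings(batch)
	fmt.Println(strs, skipped) // prints: [message #0 message #1] 1
}
```

Whether to skip, log, or reject unexpected items depends on the application; the point is only that the mismatch is handled rather than crashing the consumer.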

Contribute

Improvements, fixes, and feedback are welcome.

MIT license.

Documentation

Types

type BatchQueue

type BatchQueue struct {
	OutQueue chan []any
	// contains filtered or unexported fields
}

BatchQueue coordinates dispatching of queue items by time intervals or immediately after the batching limit is met.

func NewBatchQueue

func NewBatchQueue(config *Options) *BatchQueue

NewBatchQueue returns an initialized instance of BatchQueue.

func (*BatchQueue) Enqueue

func (b *BatchQueue) Enqueue(item any)

func (*BatchQueue) GetDispatchedCount

func (b *BatchQueue) GetDispatchedCount() int

func (*BatchQueue) Start

func (b *BatchQueue) Start()

Start begins item dispatching.

func (*BatchQueue) Stop

func (b *BatchQueue) Stop()

Stop stops the internal dispatch and listen scheduler.

type Options

type Options struct {
	Interval      time.Duration // wait time when batch quantity is insufficient
	MaxBatchItems int           // maximum items per batch
	MaxQueueSize  int           // maximum queue size
	DequeueFunc   func([]any)   // callback function
}

Options configures the dispatch interval, the batch limit, the queue size, and an optional callback.
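
The documentation here does not say what Enqueue does once MaxQueueSize items are waiting. As general background, the sketch below shows how a bounded buffered channel behaves under a non-blocking send, which is one common way to build a fixed-size queue in Go. boundedQueue, newBoundedQueue, and tryEnqueue are hypothetical names for illustration and do not describe bqueue's internals.

```go
package main

import "fmt"

// boundedQueue is a hypothetical stand-in for a fixed-capacity queue.
// It is not bqueue's type.
type boundedQueue struct {
	items chan any
}

// newBoundedQueue allocates a queue that holds at most maxQueueSize items.
func newBoundedQueue(maxQueueSize int) *boundedQueue {
	return &boundedQueue{items: make(chan any, maxQueueSize)}
}

// tryEnqueue adds item without blocking. It reports false when the buffer
// is already full, leaving the caller to decide whether to drop or retry.
func (q *boundedQueue) tryEnqueue(item any) bool {
	select {
	case q.items <- item:
		return true
	default:
		return false
	}
}

func main() {
	q := newBoundedQueue(2)
	for i := 0; i < 3; i++ {
		fmt.Println(i, q.tryEnqueue(i)) // third call reports false: the buffer holds only 2
	}
}
```

A larger capacity absorbs bigger bursts at the cost of memory, which is the trade-off behind raising MaxQueueSize for high message volumes.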

Directories

Path               Synopsis
examples/basic     command
examples/callback  command
