mb

package module
v3.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 14, 2023 License: BSD-2-Clause Imports: 5 Imported by: 7

README

Message batching queue

This package very useful for organizing batch messages.
Can help you create batch inserts to a database for example. Thread safe and well tested.

// create new queue object
batch := mb.New((any)(nil), 0)

// add new message to the queue
batch.Add(msg)

// wait until anybody add message/messages
// will return the slice of all queued messages. ([]T)
messages := batch.Wait(ctx)

// wait until count of messages will be more than 10
// if we have more than 100 messages, will be returned only 100
messages := batch.NewCond().WithMin(10).WithMax(100).Wait(ctx)

// when we have 0 messages returned that means the queue is closed.
if len(messages) == 0 {
	return
}

// close queue
// if the queue has messages all receivers will get remaining data.
batch.Close()
Docs

https://godoc.org/github.com/cheggaaa/mb/v3

Installation

go get -u github.com/cheggaaa/mb/v3

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cheggaaa/mb/v3"
)

func main() {
	ctx := context.Background()

	// create the queue with 10 items capacity
	q := mb.New[int](10)

	// create the channel for showing when all work will be done
	done := make(chan bool)

	// start two workers
	go worker(ctx, "first", q, done)
	go worker(ctx, "second", q, done)

	// start two publishers
	go publisher(ctx, "first", q)
	go publisher(ctx, "second", q)

	// give time to work
	time.Sleep(time.Second)

	// close the queue
	q.Close()

	// and wait until all sent messages will be processed
	for i := 0; i < 2; i++ {
		<-done
	}
}

func publisher(ctx context.Context, name string, q *mb.MB[string]) {
	fmt.Printf("Publisher %s: started\n", name)
	var i int
	for {
		// will sending name and counter
		msg := fmt.Sprintf("%s - %d", name, i)
		// add
		if err := q.Add(ctx, msg); err != nil {
			// non-nil err mean that queue is closed
			break
		}
		// 10 messages per second
		time.Sleep(time.Second / 10)
		i++
	}
	fmt.Printf("Publisher %s: closed\n", name)
}

func worker(ctx context.Context, name string, q *mb.MB[string], done chan bool) {
	fmt.Printf("Worker %s: started\n", name)
	for {
		// getting messages
		msgs, err := q.Wait(ctx)
		if err != nil {
			break
		}

		msgsForPrint := ""
		for _, msg := range msgs {
			msgsForPrint += fmt.Sprintf("\t%s\n", msg)
		}
		fmt.Printf("Worker %s: %d messages received\n%s", name, len(msgs), msgsForPrint)

		// doing working, for example, send messages to remote server
		time.Sleep(time.Second / 3)
	}
	fmt.Printf("Worker %s: closed\n", name)
	done <- true
}

Documentation

Overview

Package mb - queue with message batching feature

Example
var ctx = context.Background()
// bufSize - whole capacity of batcher
var bufSize = 100
// create the new batcher
batcher := mb.New[Item](bufSize)

// start goroutine that will wait items
// it can be a lot of the wait goroutines
var done = make(chan struct{})
go func() {
	defer close(done)
	for {
		// wait items
		items, err := batcher.Wait(context.Background())
		if err != nil {
			fmt.Printf("waiter received error: %v; stop goroutine\n", err)
			return
		}
		// insert batch to db
		// while this func works, the batcher collects new item
		BatchInsert(items)
	}
}()

// add two items to batcher
batcher.Add(ctx, Item{Id: 1}, Item{Id: 2})
time.Sleep(time.Millisecond)
// add more items to batcher
for i := 0; i < 10; i++ {
	// it's safe to call Add from other goroutines
	batcher.Add(ctx, Item{Id: i + 3})
}

// close batcher
batcher.Close()
// and wait until inserter exits
<-done
Output:

inserted 2 items
inserted 10 items
waiter received error: mb: MB closed; stop goroutine
Example (WithTimeLimit)
var ctx = context.Background()
// bufSize - whole capacity of batcher
var bufSize = 100
// create the new batcher
batcher := mb.New[Item](bufSize)

// start goroutine that will wait items
// it can be a lot of the wait goroutines
var done = make(chan struct{})
go func() {
	defer close(done)
	ctxWithTimeLimit := mb.CtxWithTimeLimit(ctx, time.Millisecond*200)
	cond := batcher.NewCond().WithMin(10).WithMax(15)
	for {
		// get at least 10 items or after 200 ms get at least 1 item
		items, err := cond.Wait(ctxWithTimeLimit)
		if err != nil {
			fmt.Printf("waiter received error: %v; stop goroutine\n", err)
			return
		}
		// insert batch to db
		// while this func works, the batcher collects new item
		BatchInsert(items)
	}
}()
// add two items to batcher
batcher.Add(ctx, Item{Id: 1}, Item{Id: 2})
time.Sleep(time.Millisecond * 300)
// add more items to batcher
for i := 0; i < 20; i++ {
	// it's safe to call Add from other goroutines
	batcher.Add(ctx, Item{Id: i + 3})
}
time.Sleep(time.Second)
// close batcher
batcher.Close()
// and wait until inserter exits
<-done
Output:

inserted 2 items
inserted 15 items
inserted 5 items
waiter received error: mb: MB closed; stop goroutine

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrClosed = errors.New("mb: MB closed")

ErrClosed is returned when you add message to closed queue

View Source
var ErrOverflowed = errors.New("mb: overflowed")

ErrOverflowed means new messages can't be added until there is free space in the queue

View Source
var ErrTooManyMessages = errors.New("mb: too many messages")

ErrTooManyMessages means that adding more messages (at one call) than the limit

Functions

func CtxWithTimeLimit

func CtxWithTimeLimit(ctx context.Context, timeLimit time.Duration) context.Context

CtxWithTimeLimit makes child context with given timeLimit duration This context can be passed to all Wait methods. When a given min param can't be achieved within a time limit then a min param will be reseted

Types

type MB

type MB[T any] struct {
	// contains filtered or unexported fields
}

MB - message batching object

func New

func New[T any](size int) *MB[T]

New returns a new MB with given queue size. size <= 0 means unlimited

func (*MB[T]) Add

func (mb *MB[T]) Add(ctx context.Context, msgs ...T) (err error)

Add - adds new messages to queue. When queue is closed - returning ErrClosed When count messages bigger then queue size - returning ErrTooManyMessages When the queue is full - wait until will free place

func (*MB[T]) Close

func (mb *MB[T]) Close() (err error)

Close closes the queue All added messages will be available for active Wait When queue is paused, messages do not be released for Wait (use GetAll for fetching them)

func (*MB[T]) GetAll

func (mb *MB[T]) GetAll() (msgs []T)

GetAll return all messages and flush queue Works on closed queue

func (*MB[T]) Len

func (mb *MB[T]) Len() (l int)

Len returning current size of queue

func (*MB[T]) NewCond

func (mb *MB[T]) NewCond() WaitCond[T]

NewCond creates new condition object

func (*MB[T]) Pause

func (mb *MB[T]) Pause()

Pause lock all "Wait" routines until call Resume

func (*MB[T]) Resume

func (mb *MB[T]) Resume()

Resume release all "Wait" routines

func (*MB[T]) Stats

func (mb *MB[T]) Stats() (addCount, addMsgsCount, getCount, getMsgsCount int64)

Stats returning current statistic of queue usage addCount - count of calls Add addMsgsCount - count of added messages getCount - count of calls Wait getMsgsCount - count of issued messages

func (*MB[T]) TryAdd

func (mb *MB[T]) TryAdd(msgs ...T) (err error)

TryAdd - adds new messages to queue. When queue is closed - returning ErrClosed When count messages bigger then queue size - returning ErrTooManyMessages When the queue is full - returning ErrOverflowed

func (*MB[T]) Wait

func (mb *MB[T]) Wait(ctx context.Context) (msgs []T, err error)

Wait until anybody add message Returning array of accumulated messages

func (*MB[T]) WaitCond

func (mb *MB[T]) WaitCond(ctx context.Context, cond WaitCond[T]) (msgs []T, err error)

WaitCond waits new messages with given conditions

func (*MB[T]) WaitOne

func (mb *MB[T]) WaitOne(ctx context.Context) (msg T, err error)

WaitOne waits one message

type WaitCond

type WaitCond[T any] struct {
	Priority float64
	Min      int
	Max      int
	Filter   func(v T) bool
	// contains filtered or unexported fields
}

WaitCond describes condition for messages

func (WaitCond[T]) Wait

func (wc WaitCond[T]) Wait(ctx context.Context) (msgs []T, err error)

Wait waits messages with defined condition

func (WaitCond[T]) WaitOne

func (wc WaitCond[T]) WaitOne(ctx context.Context) (msg T, err error)

WaitOne waits one message with defined condition

func (WaitCond[T]) WithFilter

func (wc WaitCond[T]) WithFilter(f func(v T) bool) WaitCond[T]

WithFilter adds filter to conditions filter function should return true for acceptable message and false for unacceptable

func (WaitCond[T]) WithMax

func (wc WaitCond[T]) WithMax(max int) WaitCond[T]

WithMax adds max to conditions

func (WaitCond[T]) WithMin

func (wc WaitCond[T]) WithMin(min int) WaitCond[T]

WithMin adds min to conditions

func (WaitCond[T]) WithPriority

func (wc WaitCond[T]) WithPriority(priority float64) WaitCond[T]

WithPriority adds priority to conditions

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL