Published: Jun 14, 2021 License: MIT Imports: 2 Imported by: 0

README

batchqueue


A batchqueue is an in-memory, concurrency-safe message queue that enqueues and dequeues messages in batches.

A batchqueue shouldn't be used when message production is irregular or bursty, like network packets, because locally enqueued messages aren't committed to the batchqueue until the local enqueueing cache fills up. Alternatively, you can flush them with a timer.

Enqueue

Enqueue pushes a value to its local cache.

When its local enqueueing cache is nil, it gets a local cache from batchqueue's freelist.

When its local enqueueing cache is filled up, it commits the local cache to batchqueue's workingq.

Dequeue

Dequeue pops a value from its local cache.

When its local dequeueing cache is nil, it gets a local cache from batchqueue's workingq.

When its local dequeueing cache becomes empty, it returns the local cache to batchqueue's freelist.

Usage

go get github.com/Asphaltt/batchqueue

See examples/pubsub.go:

package main

import (
	"fmt"
	"sync"

	bq "github.com/Asphaltt/batchqueue"
)

func main() {
	b := bq.NewBatch(8)
	var wg sync.WaitGroup
	wg.Add(128)
	// produce 128 messages
	producing(b.GetQueue(), 128)
	// start 8 goroutines to consume 128 messages
	for i := 0; i < 8; i++ {
		go func() {
			consuming(b.GetQueue(), 16)
			wg.Add(-16)
		}()
	}
	wg.Wait()
}

func consuming(q bq.Queue, n int) {
	for i := 0; i < n; i++ {
		v := q.Dequeue()
		fmt.Println(v)
	}
}

func producing(q bq.Queue, n int) {
	for i := 0; i < n; i++ {
		q.Enqueue(i)
	}
	q.Flush()
}

Documentation

Index

Constants

View Source
const (
	// DefaultQueueCapacity is the default capacity of local cache.
	// The specified capacity should not be less than it.
	DefaultQueueCapacity = 8
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch interface {
	// GetQueue creates a new Queue instance for a goroutine.
	//
	// For example:
	//
	//    func consuming(q Queue) {
	//        for {
	//            v := q.Dequeue()
	//            ...
	//        }
	//    }
	//
	//    func producing(q Queue) {
	//        for i := 0; i < 100; i++ {
	//            q.Enqueue(i)
	//        }
	//        q.Flush() // flushes the remaining cached values
	//    }
	//
	//    func main() {
	//        b := NewBatch(8)
	//        go consuming(b.GetQueue())
	//        go producing(b.GetQueue())
	//        ...
	//    }
	GetQueue() Queue
}

A Batch is an in-memory concurrency-safe message queue with local cache.

func NewBatch

func NewBatch(capacity int) Batch

NewBatch creates a batchqueue with the given local cache capacity.

type Queue

type Queue interface {
	// Enqueue pushes a value to the local cache. When the local cache
	// is full, it'll be committed to the batchqueue.
	Enqueue(v interface{})

	// Dequeue pops a value from local cache. When the local cache
	// is empty, it gets one local cache from batchqueue.
	Dequeue() (v interface{})

	// Flush forcibly commits the local cache to the batchqueue.
	Flush()
}

A Queue is a message queue with a local cache. It won't commit cached values to the batchqueue until its local cache is full; alternatively, Flush forcibly commits them.

A Queue must be used by only one goroutine, because it enqueues and dequeues values without locking.

The local cache reduces lock contention on the shared batchqueue.
