bloque

package module
v0.0.0-...-bd88a8b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 4, 2019 License: MIT Imports: 1 Imported by: 0

README

BloQue, a Block Queue of stacked items

Wait, what? What the title means is that this data structure is a block queue, that is, blocks are FIFO (First In First Out), of stacked items, that is, items are a stack LIFO (Last In First Out). A block is just a bunch of items.

Why is this is useful?

A use case I had was required adding items one by one to a data structure to act like a cache, and once enough of them where inserted, or after a pre-specified time had passed, I needed to get the first inserted X elements to batch-process them (send to a data pipeline, or bulk-write to a DB). A normal queue would do, but then I would need to loop over the first X elements, popping them one at a time. Add in concurrency, and you need to hold a lock all that looping time.

With this BloQue data structure you can just add elements one at a time, and once you are ready to consume them you just pop the front block of the queue. That's a very fast operation (just touching a few pointers) so it can be done holding the lock for very little time (or if there's only one producer, no locking needed), and then you can keep adding elements while processing this front block elsewhere.

The other "extra" stack operations such as Pop and Peek items (from the back) are added for convenience, as well.

I haven't added full deque API on blocks (from the back) and items (from the front) because it gets trickier and I don't see the use case for it, but please raise an issue if needed :)

Example usage

package main

import (
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/antoniomo/bloque"
)

const (
	elementsPerBlock = 1024
	// Just to show the last block with less than full size
	itemsToProcess = (elementsPerBlock * 10) - 3
)

func consumer(wg *sync.WaitGroup, readyBlocks <-chan bloque.BlockT) {
	// Lets keep a counter of processed blocks
	processedBlocks := 0
	totalProcessedItems := 0

	for blk := range readyBlocks {
		fmt.Printf("Processing BloQue %d, items %d\n", processedBlocks, len(blk))
		processedBlocks++
		totalProcessedItems += len(blk)
		for _, item := range blk {
			// Is this what we expect?
			_, ok := item.(int)
			if !ok {
				log.Panic("oh no, wrong type assertion!")
			}
			// Simulate consumption/processing of the item
			time.Sleep(time.Nanosecond)
		}
	}

	wg.Done()
}

func producer(wg *sync.WaitGroup, readyBlocks chan bloque.BlockT) {

	// Lets prepare a BloQue with a custom elementsPerBlock
	b := bloque.New(bloque.BlockSize(elementsPerBlock))

	for i := 0; i < itemsToProcess; i++ {
		if isCompleted := b.PushBackItem(i); isCompleted {
			// Block completed, send to the processing channel
			frontBlock, _ := b.PopFrontBlock()
			readyBlocks <- frontBlock
		}
	}
	// Last block wasn't completed but lets say we want to process it right
	// away without waiting for more data
	frontBlock, ok := b.PopFrontBlock()
	if !ok {
		log.Panic("ouch, there should be items here")
	}
	readyBlocks <- frontBlock

	close(readyBlocks) // All done, close channel

	wg.Done()
}

func main() {
	fmt.Printf("Processing %d items, block size %d\n", itemsToProcess, elementsPerBlock)

	// Channel to write ready blocks
	readyBlocks := make(chan bloque.BlockT)

	wg := &sync.WaitGroup{}
	wg.Add(2)
	go producer(wg, readyBlocks)
	go consumer(wg, readyBlocks)

	wg.Wait()
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BloQue

type BloQue struct {
	// contains filtered or unexported fields
}

BloQue represents a block queue with push/pop items on the back interface

It can be thought of as an items stack (as you push items on the back) while allowing FIFO semantics for blocks (that is, it is a block queue allowing you to pop out the first block). See the README for use cases.

Code is inspired in the deque implementation at juju: https://github.com/juju/utils/blob/master/deque/deque.go

func New

func New(opt ...Option) *BloQue

New returns a new BloQue with any options

func NewWithMaxBlocks

func NewWithMaxBlocks(maxBlocks int, opt ...Option) *BloQue

NewWithMaxBlocks returns a new BloQue with a length (blocks) cap

func NewWithMaxLength

func NewWithMaxLength(maxLength int, opt ...Option) *BloQue

NewWithMaxLength returns a new BloQue with a length (items) cap

func (*BloQue) Len

func (b *BloQue) Len() int

Len returns the amount of items on a BloQue

func (*BloQue) NumBlocks

func (b *BloQue) NumBlocks() int

NumBlocks returns the amount of blocks on a BloQue

Note that an empty BloQue has 1 block allocated and ready to use

func (*BloQue) PeekBackItem

func (b *BloQue) PeekBackItem() (interface{}, bool)

PeekBackItem ...

func (*BloQue) PopBackItem

func (b *BloQue) PopBackItem() (interface{}, bool)

PopBackItem ...

func (*BloQue) PopFrontBlock

func (b *BloQue) PopFrontBlock() (BlockT, bool)

PopFrontBlock ...

func (*BloQue) PushBackItem

func (b *BloQue) PushBackItem(item interface{}) bool

PushBackItem ...

type BlockT

type BlockT []interface{}

BlockT is the block type

type Option

type Option func(*BloQue)

Option ...

func BlockSize

func BlockSize(blockSize int) Option

BlockSize ...

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL