batch

package module
v1.0.2 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 28, 2021 License: MIT Imports: 9 Imported by: 0

README

go-batch

go-batch is a batch processing library written in Go. Items pass through several stages before a Batch is released to the client.

Features

  1. Clients can use this library for asynchronous batch processing in their applications.
  2. The library places no restrictions on the batch processing parameters. The client can define the maximum number of items per batch using BatchOptions.
  3. The library has a worker pool that speeds up batch processing in concurrent scenarios.

Stages

  1. Batch Reader receives the resource payload from the client and wraps each payload item in a BatchItems object.

    type BatchItems struct {
        Id      int
        BatchNo int
        Item    interface{}
    } 
    
    
  2. BatchProducer has a Watcher channel that receives the wrapped payload from the Batch Reader. The Watcher marks each BatchItems value with a BatchNo and appends it to the []BatchItems slice. When the item counter reaches MaxItems [DefaultMaxItems: 100], the Batch is released to the Consumer callback function.

  3. BatchConsumer has a ConsumerFunc that BatchProducer invokes as a callback to hand over the prepared []BatchItems slice. The Consumer channel then sends the []BatchItems to the Worker channel.

  4. Workerline is the sync.WaitGroup that synchronizes the workers sending []BatchItems to the supply chain.

  5. BatchSupplyChannel works as a bidirectional channel: it requests []BatchItems from the Workerline and receives them in the response.

  6. ClientSupplyChannel is the delivery channel. It acts as the supply line that sends the []BatchItems, and the client receives them by listening to the channel.
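The grouping rule in stages 1 and 2 can be sketched without channels. The following is a minimal, standard-library-only illustration, not the library's implementation: splitIntoBatches is a hypothetical helper, and the choice to start Id and BatchNo at 1 is an assumption based on DefaultBatchNo.

```go
package main

import "fmt"

// BatchItems mirrors the struct shown in stage 1.
type BatchItems struct {
	Id      int
	BatchNo int
	Item    interface{}
}

// splitIntoBatches is a hypothetical helper, not part of go-batch.
// It wraps each payload in a BatchItems value and starts a new batch
// every maxItems items. The last batch may be smaller.
func splitIntoBatches(payloads []interface{}, maxItems int) [][]BatchItems {
	var batches [][]BatchItems
	var current []BatchItems
	batchNo := 1
	for i, p := range payloads {
		current = append(current, BatchItems{Id: i + 1, BatchNo: batchNo, Item: p})
		if len(current) == maxItems {
			batches = append(batches, current)
			current = nil
			batchNo++
		}
	}
	if len(current) > 0 {
		batches = append(batches, current) // partial final batch
	}
	return batches
}

func main() {
	payloads := make([]interface{}, 0, 250)
	for i := 1; i <= 250; i++ {
		payloads = append(payloads, fmt.Sprintf("R-%d", i))
	}
	for _, b := range splitIntoBatches(payloads, 100) {
		fmt.Printf("batch %d: %d items\n", b[0].BatchNo, len(b))
	}
}
```

With 250 payloads and a limit of 100, the sketch prints two batches of 100 items and one of 50.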

Go Docs

Documentation at pkg.go.dev

Installation

go get github.com/Deeptiman/go-batch

Example

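// Resources is the client's own payload type; any value can be sent on b.Item.
b := batch.NewBatch(batch.WithMaxItems(100))
go b.StartBatchProcessing()

// Send 1000 items; with MaxItems set to 100 they are grouped into batches of up to 100.
for i := 1; i <= 1000; i++ {
	b.Item <- &Resources{
		id:   i,
		name: fmt.Sprintf("%s%d", "R-", i),
		flag: false,
	}
}
b.Close()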

Note

  • This release does not support running concurrent BatchProcessing sessions.

License

This project is licensed under the MIT License

Documentation

Index

Constants

This section is empty.

Variables

var (
	DefaultMaxItems = uint64(100)                     // maximum no of items packed inside a Batch
	DefaultMaxWait  = time.Duration(30) * time.Second //seconds
	DefaultBatchNo  = int32(1)
)
var (
	DefaultWorkerPool = 10
)

Functions

This section is empty.

Types

type Batch

type Batch struct {
	Item      chan interface{}
	Id        int
	Semaphore *Semaphore
	Islocked  bool
	Producer  *BatchProducer
	Consumer  *BatchConsumer
	Log       *log.Logger
}

Batch struct defines the payload structure for a Batch.

	Item: Channel that carries the Resources objects sent by the client.
	Id: Each item that the client sends for processing is marked with an Id.
	Semaphore: The read/write locks are handled by the Semaphore object, which helps synchronize the batch processing session.
	Islocked: When a batch processing session starts, Islocked changes to [true], which prevents concurrent batch processing.
	Producer: The BatchItems object is sent to the Producer for further processing.
	Consumer: The Consumer arranges the prepared []BatchItems for the Workerline.
	Log: The library uses "github.com/sirupsen/logrus" as its logging tool.

func NewBatch

func NewBatch(opts ...BatchOptions) *Batch

NewBatch creates a new Batch object with a BatchProducer and a BatchConsumer. The BatchOptions set MaxItems, the maximum number of items in a batch, and MaxWait, the maximum time to wait for a batch to complete.

func (*Batch) Close

func (b *Batch) Close()

Close is the exit function to terminate the batch processing.

func (*Batch) ReadItems

func (b *Batch) ReadItems()

ReadItems runs indefinitely, listening to the Resource channel. Each received object is wrapped in a BatchItems value and sent to the Producer's Watcher channel for further processing.

func (*Batch) StartBatchProcessing

func (b *Batch) StartBatchProcessing()

StartBatchProcessing begins batch processing and starts the Producer/Consumer listeners. The ReadItems goroutine then listens indefinitely for items from the source.

func (*Batch) Stop

func (b *Batch) Stop()

Stop runs the StopProducer/StopConsumer goroutines to quit the execution.

func (*Batch) StopProducer

func (b *Batch) StopProducer()

StopProducer exits the Producer line.

type BatchConsumer

type BatchConsumer struct {
	ConsumerCh    chan []BatchItems
	BatchWorkerCh chan []BatchItems
	Supply        *BatchSupply
	Workerline    *sync.WaitGroup
	TerminateCh   chan os.Signal
	Quit          chan bool
	Log           *log.Logger
}

BatchConsumer struct defines the Consumer line for the batch processing. It has the Workerline, which manages the concurrent scenarios where a large set of []BatchItems needs to be sent to the client.

	ConsumerCh: Receives the []BatchItems from the Producer line.
	BatchWorkerCh: Feeds the set of workers that handle the concurrent work under the Workerline [sync.WaitGroup].
	Supply: The final link in the batch processing chain, which sends the []BatchItems to the client.
	Workerline: The WaitGroup that synchronizes the workers sending []BatchItems to the supply chain.
	TerminateCh: To support graceful shutdown, this channel listens for os.Signal and terminates processing accordingly.
	Quit: The exit channel that ends the Consumer's processing.
	Log: The library uses "github.com/sirupsen/logrus" as its logging tool.

func NewBatchConsumer

func NewBatchConsumer() *BatchConsumer

NewBatchConsumer defines several production channels that work at different stages to release a Batch to the client. The ConsumerCh receives the Batch and sends it to the Workers channel. The Workerline then arranges the workers under a WaitGroup to release the Batch to the Supply channel.

The BatchSupply has a bidirectional channel that requests a Batch from the Worker channel and receives the Batch on a response channel. BatchSupply also has a Client channel that sends the released Batch to the client. The client needs to listen to the ClientSupplyCh to receive each batch as soon as it is released.

func (*BatchConsumer) ConsumerBatch

func (c *BatchConsumer) ConsumerBatch(ctx context.Context)

ConsumerBatch receives newly created []BatchItems from <-c.ConsumerCh. The []BatchItems are then sent to the WorkerCh, which passes them on to the supply line.

ConsumerBatch also supports terminating the Consumer line, either through a graceful shutdown or through a forced exit from batch processing.

<-ctx.Done(): fires during a graceful shutdown and closes the worker channel.
<-c.Quit: exits the batch processing on a forced request from the client.

func (*BatchConsumer) ConsumerFunc

func (c *BatchConsumer) ConsumerFunc(items []BatchItems)

ConsumerFunc works as the callback through which the Producer line hands the released []BatchItems to the Consumer. The batch items are then sent to the ConsumerCh channel for further processing.

func (*BatchConsumer) GetBatchSupply

func (c *BatchConsumer) GetBatchSupply()

GetBatchSupply requests the released []BatchItems from the WorkerChannel. The BatchSupplyChannel works as a bidirectional request/response channel for the final []BatchItems product. The ClientSupplyChannel then sends the []BatchItems to the client.

func (*BatchConsumer) Shutdown

func (c *BatchConsumer) Shutdown()

func (*BatchConsumer) StartConsumer

func (c *BatchConsumer) StartConsumer()

StartConsumer creates the worker pool [DefaultWorkerPool: 10] to handle the large number of []BatchItems that are created frequently in highly concurrent scenarios. It also starts the ConsumerCh listener for incoming []BatchItems from the Producer line.

signal.Notify(c.TerminateCh, syscall.SIGINT, syscall.SIGTERM)
<-c.TerminateCh

To support graceful shutdown, the BatchConsumer listens for os.Signal. The TerminateCh works as the terminate channel when one of the signals [syscall.SIGINT, syscall.SIGTERM] is received. This lets the Workerline complete the remaining work before shutting down.

func (*BatchConsumer) WorkerFunc

func (c *BatchConsumer) WorkerFunc(index int)

WorkerFunc is the final production step for []BatchItems. Each worker sends its released []BatchItems to the SupplyChannel.
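A worker pool coordinated by a sync.WaitGroup, as described for StartConsumer and WorkerFunc, might look like the sketch below. It uses only the standard library; runWorkers and the []int batch type are hypothetical, and the real library's channel wiring may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers is a hypothetical helper. It starts n workers that drain the
// work channel and forward each batch to the supply channel. It returns the
// number of batches delivered once every worker has finished.
func runWorkers(n int, batches [][]int) int {
	work := make(chan []int)
	supply := make(chan []int)
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := range work {
				supply <- b
			}
		}()
	}

	// Feed the workers, then signal that no more work is coming.
	go func() {
		for _, b := range batches {
			work <- b
		}
		close(work)
	}()

	// Close supply only after all workers have exited.
	go func() {
		wg.Wait()
		close(supply)
	}()

	delivered := 0
	for range supply {
		delivered++
	}
	return delivered
}

func main() {
	batches := [][]int{{1, 2}, {3, 4}, {5}}
	fmt.Println("delivered:", runWorkers(10, batches))
}
```

Waiting on the WaitGroup before closing the supply channel is what lets in-flight batches finish during shutdown; the order in which batches arrive is not guaranteed.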

type BatchItems

type BatchItems struct {
	Id      int         `json:"id"`
	BatchNo int         `json:"batchNo"`
	Item    interface{} `json:"item"`
}

BatchItems struct defines the payload of each batch item, which carries an Id and the BatchNo of the batch it belongs to.
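The json tags on BatchItems determine the field names when an item is serialized. The sketch below copies the struct definition so that it runs on its own; encode is a hypothetical helper, not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// BatchItems is copied from the type definition above.
type BatchItems struct {
	Id      int         `json:"id"`
	BatchNo int         `json:"batchNo"`
	Item    interface{} `json:"item"`
}

// encode is a hypothetical helper that serializes one item to JSON.
func encode(item BatchItems) (string, error) {
	data, err := json.Marshal(item)
	return string(data), err
}

func main() {
	s, err := encode(BatchItems{Id: 1, BatchNo: 1, Item: "R-1"})
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // {"id":1,"batchNo":1,"item":"R-1"}
}
```

Because encoding/json skips unexported struct fields, a payload type with lowercase fields, like the Resources value in the README example, would serialize its Item as an empty object.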

type BatchOptions

type BatchOptions func(b *BatchProducer)

func WithMaxItems

func WithMaxItems(maxItems uint64) BatchOptions

func WithMaxWait

func WithMaxWait(maxWait time.Duration) BatchOptions

type BatchProducer

type BatchProducer struct {
	Watcher      chan *BatchItems
	MaxItems     uint64
	BatchNo      int32
	MaxWait      time.Duration
	ConsumerFunc ConsumerFunc
	Quit         chan bool
	Log          *log.Logger
}

BatchProducer struct defines the Producer fields required to create a []BatchItems object.

	Watcher: The receiver channel that gets the wrapped BatchItems object from the Batch Reader.
	MaxItems: Maximum number of BatchItems that can be packed into a released Batch.
	BatchNo: Every []BatchItems that is released is marked with a BatchNo [integer].
	MaxWait: If a batch takes too long to fill, the MaxWait timeout expires after the interval.
	ConsumerFunc: The callback function, supplied by the Consumer, that the Producer invokes to release a batch.
	Quit: The exit channel that ends the Producer's processing.
	Log: The library uses "github.com/sirupsen/logrus" as its logging tool.

func NewBatchProducer

func NewBatchProducer(callBackFn ConsumerFunc, opts ...BatchOptions) *BatchProducer

NewBatchProducer defines the producer line for creating a Batch. A Watcher channel receives the incoming BatchItems from the source. The ConsumerFunc works as a callback into the Consumer line to release the newly created set of BatchItems.

Each Batch is registered with a BatchNo, which is assigned when the item counter reaches the MaxItems value.

func (*BatchProducer) CheckRemainingItems

func (p *BatchProducer) CheckRemainingItems(done chan bool)

CheckRemainingItems forces a re-check for remaining batch items that are still available for processing.

func (*BatchProducer) WatchProducer

func (p *BatchProducer) WatchProducer()

WatchProducer has the Watcher channel that receives the BatchItems object from the Batch read item channel. The Watcher marks each BatchItems value with a BatchNo and appends it to the []BatchItems slice. When the item counter reaches MaxItems [DefaultMaxItems: 100], the Batch is released to the Consumer callback function.

If batch processing stalls in the Watcher channel, the MaxWait [DefaultMaxWait: 30 sec] timer fires and checks the state so that the pending Batch can be released to the Consumer callback function.

type BatchSupply

type BatchSupply struct {
	BatchSupplyCh  chan chan []BatchItems
	ClientSupplyCh chan []BatchItems
}

BatchSupply struct defines the supply line for the final delivery of []BatchItems to the client.

BatchSupplyCh: The bidirectional channel that requests []BatchItems from the Workerline and receives them in the response.
ClientSupplyCh: The delivery channel. It acts as the supply line that sends the []BatchItems, and the client receives them by listening to the channel.

func NewBatchSupply

func NewBatchSupply() *BatchSupply

NewBatchSupply creates the BatchSupply object, which has two supply channels. The BatchSupplyCh works as a bidirectional channel that requests a []BatchItems from the Workerline and receives the batch items on the response channel. The ClientSupplyCh sends the []BatchItems received from the BatchSupplyCh to the client.
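The type of BatchSupplyCh, chan chan []BatchItems, suggests the common request/response idiom in which the requester sends a reply channel and then waits on it. The sketch below illustrates that idiom with hypothetical serve and request functions and []int batches; how the library's workers answer requests is an assumption.

```go
package main

import "fmt"

// serve is a hypothetical worker. For each reply channel it receives on
// requests, it sends back the next queued batch. It stops when the batches
// run out or requests is closed.
func serve(requests <-chan chan []int, batches [][]int) {
	for _, b := range batches {
		reply, ok := <-requests
		if !ok {
			return
		}
		reply <- b
	}
}

// request asks for one batch by sending a fresh reply channel and waiting on it.
func request(requests chan<- chan []int) []int {
	reply := make(chan []int)
	requests <- reply
	return <-reply
}

func main() {
	requests := make(chan chan []int)
	client := make(chan []int, 2) // plays the role of a client delivery channel
	go serve(requests, [][]int{{1, 2}, {3}})

	for i := 0; i < 2; i++ {
		client <- request(requests)
	}
	close(client)
	for b := range client {
		fmt.Println(b)
	}
}
```

Giving each request its own reply channel means a response always reaches the goroutine that asked for it, even when several requesters share the same request channel.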

type ConsumerFunc

type ConsumerFunc func(items []BatchItems)

ConsumerFunc is the callback type through which the Producer hands a released []BatchItems to the Consumer.

type Semaphore

type Semaphore struct {
	// contains filtered or unexported fields
}

func NewSemaphore

func NewSemaphore(n int) *Semaphore

func (*Semaphore) Acquire

func (s *Semaphore) Acquire(n int)

func (*Semaphore) Lock

func (s *Semaphore) Lock()

func (*Semaphore) RLock

func (s *Semaphore) RLock()

func (*Semaphore) RUnlock

func (s *Semaphore) RUnlock()

func (*Semaphore) Release

func (s *Semaphore) Release(n int)

func (*Semaphore) Unlock

func (s *Semaphore) Unlock()
