Versions in this module Expand all Collapse all v1 v1.0.5 Jan 25, 2022 v1.0.4 Aug 1, 2021 Changes in this version type Batch + func (b *Batch) SetDebugLogLevel() v1.0.3 Jul 28, 2021 Changes in this version type Batch + func (b *Batch) Unlock() v1.0.2 Jul 28, 2021 v1.0.1 Jul 16, 2021 v1.0.0 Apr 18, 2021 Changes in this version + var DefaultBatchNo = int32(1) + var DefaultMaxItems = uint64(100) + var DefaultMaxWait = time.Duration(30) * time.Second + var DefaultWorkerPool = 10 + type Batch struct + Consumer *BatchConsumer + Id int + Islocked bool + Item chan interface{} + Log *log.Logger + Producer *BatchProducer + Semaphore *Semaphore + func NewBatch(opts ...BatchOptions) *Batch + func (b *Batch) Close() + func (b *Batch) ReadItems() + func (b *Batch) StartBatchProcessing() + func (b *Batch) Stop() + func (b *Batch) StopProducer() + type BatchConsumer struct + BatchWorkerCh chan []BatchItems + ConsumerCh chan []BatchItems + Log *log.Logger + Quit chan bool + Supply *BatchSupply + TerminateCh chan os.Signal + Workerline *sync.WaitGroup + func NewBatchConsumer() *BatchConsumer + func (c *BatchConsumer) ConsumerBatch(ctx context.Context) + func (c *BatchConsumer) ConsumerFunc(items []BatchItems) + func (c *BatchConsumer) GetBatchSupply() + func (c *BatchConsumer) Shutdown() + func (c *BatchConsumer) StartConsumer() + func (c *BatchConsumer) WorkerFunc(index int) + type BatchItems struct + BatchNo int + Id int + Item interface{} + type BatchOptions func(b *BatchProducer) + func WithMaxItems(maxItems uint64) BatchOptions + func WithMaxWait(maxWait time.Duration) BatchOptions + type BatchProducer struct + BatchNo int32 + ConsumerFunc ConsumerFunc + Log *log.Logger + MaxItems uint64 + MaxWait time.Duration + Quit chan bool + Watcher chan *BatchItems + func NewBatchProducer(callBackFn ConsumerFunc, opts ...BatchOptions) *BatchProducer + func (p *BatchProducer) CheckRemainingItems(done chan bool) + func (p *BatchProducer) WatchProducer() + type BatchSupply struct + BatchSupplyCh chan chan []BatchItems + ClientSupplyCh chan []BatchItems + func NewBatchSupply() *BatchSupply + type ConsumerFunc func(items []BatchItems) + type Semaphore struct + func NewSemaphore(n int) *Semaphore + func (s *Semaphore) Acquire(n int) + func (s *Semaphore) Lock() + func (s *Semaphore) RLock() + func (s *Semaphore) RUnlock() + func (s *Semaphore) Release(n int) + func (s *Semaphore) Unlock()