Documentation
Overview ¶
batch made easy.
batch is a safe and efficient way to combine the results of multiple parallel tasks into one heavy "commit" (save) operation. batch ensures each task either receives a successful result if its results have been committed (possibly as a part of a batch) or an error otherwise.
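The sketch below illustrates only the general idea of group commit, using plain channels. It swaps in a dedicated committer goroutine, which is not how this package works: here workers coordinate among themselves through Enter, Exit and Commit. The names request, result, committer and run are hypothetical. Every worker that was drained into the same batch receives the same result from a single commit call.

```go
package main

import (
	"fmt"
	"sync"
)

// request is one worker's contribution plus a channel for the shared result.
// Hypothetical type for this sketch; it is not part of the batch package.
type request struct {
	data  int
	reply chan result
}

// result is what every worker in a batch receives.
type result struct {
	sum int
	err error
}

// committer takes one request, drains all requests that are already waiting,
// commits them with a single call, and sends the same result to each of them.
func committer(in <-chan request, commit func(total int) (int, error)) {
	for first := range in {
		pending := []request{first}
		total := first.data
	drain:
		for {
			select {
			case r, ok := <-in:
				if !ok {
					break drain // input closed: commit what we have
				}
				pending = append(pending, r)
				total += r.data
			default:
				break drain // nobody else is waiting right now
			}
		}
		sum, err := commit(total) // one heavy operation per batch
		for _, r := range pending {
			r.reply <- result{sum: sum, err: err} // reply is buffered, never blocks
		}
	}
}

// run starts n workers that each contribute 1 and waits for all of them.
// It returns how many commits happened and how much data they committed.
func run(n int) (commits, committed int) {
	in := make(chan request)
	// commit is only ever called from the committer goroutine.
	commit := func(total int) (int, error) {
		commits++
		committed += total
		return total, nil
	}
	done := make(chan struct{})
	go func() {
		committer(in, commit)
		close(done)
	}()

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r := request{data: 1, reply: make(chan result, 1)}
			in <- r
			if res := <-r.reply; res.err != nil {
				panic(res.err)
			}
		}()
	}
	wg.Wait()
	close(in)
	<-done // committer has finished, so reading the counters is safe
	return commits, committed
}

func main() {
	commits, committed := run(10)
	fmt.Printf("committed %d items in %d commit(s)\n", committed, commits)
}
```

The number of commits depends on scheduling. However, every worker's data is committed exactly once, and every worker in a batch sees that batch's result.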
Logic for Coordinator ¶
	                    -------------------------
	-> worker0 ------> | common cumulative state |           /--> worker0
	-> worker1 ------> | collected from multiple | -> result ---> worker1
	-> worker2 ------> | workers and committed   |           \--> worker2
	-> worker3 ------> | all at once             |            \-> worker3
	                    -------------------------
Logic for Multi ¶
Workers are distributed among multiple coaches, and then each coach works the same as in the Coordinator case.
	                   -------------------------
	              | -> | workers are distributed |           /--> worker0
	-> worker0 -> | -> | among free coaches      | -> result ---> worker1
	-> worker1 -> |    -------------------------
	-> worker2 -> |
	-> worker3 -> |    -------------------------
	              | -> | state is combined in a  |           /--> worker2
	              | -> | few independent coaches | -> result ---> worker3
	                   -------------------------
Full Guide ¶
	bc.Queue().In() // get in the queue

	// Get data ready for the commit.
	// All the operations independent of other workers should be done here.
	data := 1

	if leave() {
		bc.Queue().Out() // Get out of the queue.
		bc.Notify()      // Notify waiting goroutines we won't come.

		return errNoNeed
	}

	idx := bc.Enter(true) // true to block and wait, false for non-blocking return if batch is in the process of committing.
	// Just like with the Mutex there are no guarantees who will enter first.
	if idx < 0 { // In non-blocking mode we didn't Enter the batch, it's always >= 0 in blocking mode.
		return errBusy // No need to do anything here.
	}

	defer bc.Exit() // if we entered we must exit
	_ = 0           // calling it with defer ensures state consistency in case of panic

	// We are inside the locked Mutex between Enter and Exit,
	// so the shared state can be modified safely.
	// That also means that all the long and heavy operations
	// should be done before Enter.

	if idx == 0 { // we are first in the batch, reset the state
		sum = 0
	}

	if full() { // if we didn't change the state we can just leave.
		bc.Trigger() // Optionally we can trigger the batch.
		// Common use case is to flush the data if we won't fit.

		return errRetry // Then we may return and retry in the next batch.
	}

	sum += data // adding our work to the batch

	if spoilt() { // If we spoiled the state and want to abort the commit
		_, err := bc.Cancel(ctx, causeErr) // cancel the batch. Commit isn't done, all workers get causeErr.
		// If causeErr is nil it's set to Canceled.

		return err // only one of return/Cancel/Commit may be called, so we leave here
	}

	if failed() { // Suppose some library panicked here.
		panic("we can safely panic here") // Panic will be propagated to the caller,
		// other goroutines in the batch will get PanicError.
	}

	if urgent() { // If we want to commit immediately
		bc.Trigger() // trigger it.
		// Workers that have already added their job will be committed too.
		// Workers that haven't entered the batch yet will go to the next one.
	}

	res, err := bc.Commit(ctx) // Call Commit.
	// The last goroutine that entered the batch calls the actual commit function (CommitFunc).
	// All the others wait and get the same res and error.
Batch is a safe wrapper around Coordinator. It will call any methods you missed if that makes sense, or panic otherwise.
Multi is a coordinator with N available parallel batches. Suppose you have 3 db replicas, so you can distribute load across them. Multi.Enter will enter the first free coach (replica in our example) and return its index. The rest is similar to Coordinator. Custom logic for choosing a coach can be used by setting Multi.Balancer.
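Here is a minimal sketch of custom balancers. It relies only on the func(bitset uint64) int contract documented on the Multi type: coach n is free when bitset&(1<<n) != 0, and a negative result means "no coach". The names lowestFree and preferring are hypothetical. With the real package you would assign one of them to Multi.Balancer, which is not compiled here.

```go
package main

import (
	"fmt"
	"math/bits"
)

// lowestFree returns the lowest-numbered available coach, or -1 if none is free.
// Coach n is available when bitset&(1<<n) != 0.
func lowestFree(bitset uint64) int {
	if bitset == 0 {
		return -1 // negative: behave as if no coach were available
	}
	return bits.TrailingZeros64(bitset)
}

// preferring returns a balancer that picks coach p (say, a primary replica)
// when it is free and otherwise falls back to lowestFree. Hypothetical helper.
func preferring(p int) func(bitset uint64) int {
	return func(bitset uint64) int {
		if p >= 0 && p < 64 && bitset&(1<<uint(p)) != 0 {
			return p
		}
		return lowestFree(bitset)
	}
}

func main() {
	free := uint64(0b0110) // coaches 1 and 2 are free

	fmt.Println(lowestFree(free))    // 1
	fmt.Println(preferring(2)(free)) // 2
	fmt.Println(preferring(0)(free)) // 1
	fmt.Println(lowestFree(0))       // -1
}
```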
MultiBatch is a safe wrapper around Multi just like Batch wraps Coordinator.
Index ¶
- Variables
- type Batch
- type Coordinator
- func (c *Coordinator[Res]) Cancel(ctx context.Context, err error) (Res, error)
- func (c *Coordinator[Res]) Commit(ctx context.Context) (Res, error)
- func (c *Coordinator[Res]) Enter(blocking bool) int
- func (c *Coordinator[Res]) Exit() int
- func (c *Coordinator[Res]) Init(f func(ctx context.Context) (Res, error))
- func (c *Coordinator[Res]) Notify()
- func (c *Coordinator[Res]) Queue() *Queue
- func (c *Coordinator[Res]) Trigger()
- type Multi
- func (c *Multi[Res]) Cancel(ctx context.Context, coach int, err error) (Res, error)
- func (c *Multi[Res]) Commit(ctx context.Context, coach int) (Res, error)
- func (c *Multi[Res]) Enter(blocking bool) (coach, idx int)
- func (c *Multi[Res]) Exit(coach int) int
- func (c *Multi[Res]) Init(n int, f func(ctx context.Context, coach int) (Res, error))
- func (c *Multi[Res]) Notify()
- func (c *Multi[Res]) Queue() *Queue
- func (c *Multi[Res]) Trigger(coach int)
- type MultiBatch
- func (b *MultiBatch[Res]) Cancel(ctx context.Context, err error) (Res, error)
- func (b *MultiBatch[Res]) Commit(ctx context.Context) (Res, error)
- func (b *MultiBatch[Res]) Enter(blocking bool) (coach, idx int)
- func (b *MultiBatch[Res]) Exit() int
- func (b *MultiBatch[Res]) QueueIn() int
- func (b *MultiBatch[Res]) Trigger()
- type PanicError
- type Queue
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var Canceled = errors.New("batch canceled")
Canceled is the default error returned to workers if Cancel was called with a nil err.
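The defaulting rule can be sketched as follows. Canceled here is a local stand-in that mirrors the package variable, and cancelCause and errDiskFull are hypothetical. Code using the real package would compare against batch.Canceled in the same way, with errors.Is.

```go
package main

import (
	"errors"
	"fmt"
)

// Canceled mirrors batch.Canceled so this sketch is self-contained.
var Canceled = errors.New("batch canceled")

// errDiskFull is a hypothetical cause passed to Cancel.
var errDiskFull = errors.New("disk full")

// cancelCause returns the error workers receive when a batch is canceled with
// err: err itself, or Canceled when err is nil. Hypothetical helper that
// illustrates the documented rule, not the package's implementation.
func cancelCause(err error) error {
	if err == nil {
		return Canceled
	}
	return err
}

func main() {
	// Workers can tell the two cases apart with errors.Is.
	fmt.Println(errors.Is(cancelCause(nil), Canceled))            // true
	fmt.Println(errors.Is(cancelCause(errDiskFull), Canceled))    // false
	fmt.Println(errors.Is(cancelCause(errDiskFull), errDiskFull)) // true
}
```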
Functions ¶
This section is empty.
Types ¶
type Batch ¶
	type Batch[Res any] struct {
		// contains filtered or unexported fields
	}
Example ¶
	package main

	import (
		"context"
		"errors"
		"log"
		"sync"

		"nikand.dev/go/batch"
	)

	type (
		SafeService struct {
			sum int // state we collect to commit together

			bc *batch.Coordinator[int] // [int] is the result value type, set to struct{} if you don't need it
		}

		contextKeySafe struct{}
	)

	func NewSafeService() *SafeService {
		s := &SafeService{}

		s.bc = batch.New(s.commit)

		return s
	}

	func (s *SafeService) commit(ctx context.Context) (int, error) {
		// suppose some heavy operation here
		// update a file or write to db

		log.Printf("* * * commit %2d * * *", s.sum)

		return s.sum, nil
	}

	func (s *SafeService) DoWork(ctx context.Context, data int) (int, error) {
		b := batch.By(s.bc)
		defer b.Exit() // it's like Mutex.Unlock, but safely works even if we didn't enter
		_ = 0          // Must be called with defer to outlive panics

		b.QueueIn() // let others know we are going to join

		_ = data // prepare data

		idx := b.Enter(true) // true for blocking, false if we want to leave instead of waiting
		if idx < 0 {         // we haven't entered the batch in non blocking mode
			return 0, errors.New("not this time") // we have to leave in that case
		}

		if idx == 0 { // we are first in the batch, reset the state
			s.sum = 0
			log.Printf("* * * reset batch * * *")
		}

		log.Printf("worker %2d got in with index %2d", ctx.Value(contextKeySafe{}), idx)

		s.sum += data // add our work to the batch

		// only one of return/Cancel/Commit must be called and only once
		res, err := b.Commit(ctx)
		if err != nil { // batch failed, each worker in it will get the same error
			return 0, err
		}

		log.Printf("worker %2d got result %v %v", ctx.Value(contextKeySafe{}), res, err)

		// if we are here, all of the workers have their work committed

		return res, nil
	}

	func main() {
		const jobs = 5

		s := NewSafeService()

		// let's spin up some workers
		var wg sync.WaitGroup

		for j := 0; j < jobs; j++ {
			j := j
			wg.Add(1)

			go func() {
				defer wg.Done()

				ctx := context.Background() // passed to commit function
				ctx = context.WithValue(ctx, contextKeySafe{}, j)

				res, err := s.DoWork(ctx, 1)
				_, _ = res, err
			}()
		}

		wg.Wait()
	}
func By ¶ added in v0.5.0
func By[Res any](c *Coordinator[Res]) Batch[Res]
type Coordinator ¶ added in v0.5.0
	type Coordinator[Res any] struct {
		// CommitFunc is called to commit shared state.
		//
		// It's already called owning critical section. Enter-Exit cycle must not be called from it.
		//
		// Required.
		CommitFunc func(ctx context.Context) (Res, error)
		// contains filtered or unexported fields
	}
Coordinator coordinates workers to update shared state, commit it, and deliver the result to all participating workers.
Example ¶
	package main

	import (
		"context"
		"errors"
		"log"
		"sync"

		"nikand.dev/go/batch"
	)

	type (
		Service struct {
			sum int // state we collect to commit together

			bc *batch.Coordinator[int] // [int] is the result value type, set to struct{} if you don't need it
		}

		contextKey struct{}
	)

	func NewService() *Service {
		s := &Service{}

		s.bc = batch.New(s.commit)

		return s
	}

	func (s *Service) commit(ctx context.Context) (int, error) {
		// suppose some heavy operation here
		// update a file or write to db

		log.Printf("* * * commit %2d * * *", s.sum)

		return s.sum, nil
	}

	func (s *Service) DoWork(ctx context.Context, data int) (int, error) {
		s.bc.Queue().In() // let others know we are going to join

		_ = data // prepare data

		idx := s.bc.Enter(true) // true for blocking, false if we want to leave instead of waiting
		if idx < 0 {            // we haven't entered the batch in non blocking mode
			return 0, errors.New("not this time") // we have to leave in that case
		}

		defer s.bc.Exit() // it's like Mutex.Unlock. It's a pair to successful Enter.
		_ = 0             // Must be called with defer to outlive panics

		if idx == 0 { // we are first in the batch, reset the state
			s.sum = 0
			log.Printf("* * * reset batch * * *")
		}

		log.Printf("worker %2d got in with index %2d", ctx.Value(contextKey{}), idx)

		s.sum += data // add our work to the batch

		// only one of return/Cancel/Commit must be called and only once
		res, err := s.bc.Commit(ctx)
		if err != nil { // batch failed, each worker in it will get the same error
			return 0, err
		}

		log.Printf("worker %2d got result %v %v", ctx.Value(contextKey{}), res, err)

		// if we are here, all of the workers have their work committed

		return res, nil
	}

	func main() {
		const jobs = 5

		s := NewService()

		// let's spin up some workers
		var wg sync.WaitGroup

		for j := 0; j < jobs; j++ {
			j := j
			wg.Add(1)

			go func() {
				defer wg.Done()

				ctx := context.Background() // passed to commit function
				ctx = context.WithValue(ctx, contextKey{}, j)

				res, err := s.DoWork(ctx, 1)
				_, _ = res, err
			}()
		}

		wg.Wait()
	}
func New ¶
func New[Res any](f func(ctx context.Context) (Res, error)) *Coordinator[Res]
New creates a new Coordinator.
func (*Coordinator[Res]) Cancel ¶ added in v0.5.0
func (c *Coordinator[Res]) Cancel(ctx context.Context, err error) (Res, error)
Cancel aborts the current batch and returns the same error to all workers that have already added their data to the batch. Coordinator.CommitFunc is not called. Workers that are waiting but have not yet Entered the critical section are not affected.
func (*Coordinator[Res]) Commit ¶ added in v0.5.0
func (c *Coordinator[Res]) Commit(ctx context.Context) (Res, error)
Commit waits for the waiting workers to add their data to the batch, calls Coordinator.CommitFunc only once for the batch, and returns the same shared result to all workers.
func (*Coordinator[Res]) Enter ¶ added in v0.5.0
func (c *Coordinator[Res]) Enter(blocking bool) int
Enter enters the batch. When the call returns we are in the critical section, and shared resources can be used safely. It's similar to Mutex.Lock. The pair method Exit must be called if Enter was successful (returned a value >= 0). Enter returns the index of the entered worker: 0 means we are the first in the batch and should reset the shared state. If blocking == false and the batch is not available, a negative value is returned. Enter also removes the worker from the queue.
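The blocking and non-blocking modes, and the meaning of the returned index, can be modeled with a plain sync.Mutex. toyBatch is a hypothetical toy that only mimics the documented Enter/Exit contract: Lock versus TryLock, index 0 for the first worker, and -1 for a non-blocking miss. It is not how Coordinator is implemented, and it commits nothing.

```go
package main

import (
	"fmt"
	"sync"
)

// toyBatch is a hypothetical model of the Enter/Exit contract only: the
// critical section is a plain sync.Mutex and n counts entries into the
// current batch. It does not collect or commit anything.
type toyBatch struct {
	mu sync.Mutex
	n  int // workers that have entered the current batch
}

// enter returns this worker's index in the batch (0 means "first: reset the
// shared state"), or -1 if blocking is false and the section is busy.
func (b *toyBatch) enter(blocking bool) int {
	if blocking {
		b.mu.Lock()
	} else if !b.mu.TryLock() { // TryLock requires Go 1.18+
		return -1
	}
	idx := b.n
	b.n++
	return idx
}

// exit leaves the critical section, like Mutex.Unlock.
func (b *toyBatch) exit() { b.mu.Unlock() }

// nextBatch starts a new batch; in the real package this follows a commit.
func (b *toyBatch) nextBatch() {
	b.mu.Lock()
	b.n = 0
	b.mu.Unlock()
}

func main() {
	var b toyBatch

	first := b.enter(true) // 0: reset shared state here
	busy := b.enter(false) // -1: section is held, non-blocking caller gives up
	b.exit()

	second := b.enter(false) // 1: joined the same batch
	b.exit()

	b.nextBatch()
	fresh := b.enter(true) // 0 again in the new batch
	b.exit()

	fmt.Println(first, busy, second, fresh)
}
```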
func (*Coordinator[Res]) Exit ¶ added in v0.5.0
func (c *Coordinator[Res]) Exit() int
Exit exits the critical section. It should be called with defer right after we have successfully Entered the batch. It's similar to Mutex.Unlock. It returns the number of workers that have not Exited yet. 0 means we are the last to exit the batch, so the state can be reset here. But keep in mind the case when a worker has panicked.
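The "0 means last" rule is a last-one-out counter. The sketch below models it with sync/atomic. lastOut is a hypothetical helper, not the package's implementation. Worker 0 panics on purpose, to show why exiting belongs in a defer: the decrement still runs, so exactly one worker sees 0.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// lastOut models the return value of Exit with an atomic counter: each of n
// workers decrements remaining on exit, and the one that sees 0 is the last
// to leave and does the cleanup. Worker 0 panics to show why exit belongs in
// a defer. Hypothetical helper, not the package's implementation.
func lastOut(n int) (cleanups int32) {
	remaining := int32(n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { _ = recover() }() // keep the demo alive after the simulated panic
			defer func() { // like `defer c.Exit()`: runs even if the worker panics
				if atomic.AddInt32(&remaining, -1) == 0 {
					atomic.AddInt32(&cleanups, 1) // only the last worker out gets here
				}
			}()
			if i == 0 {
				panic("worker failed") // simulated failure
			}
		}(i)
	}
	wg.Wait()
	return cleanups
}

func main() {
	fmt.Println("cleanups:", lastOut(8)) // cleanups: 1
}
```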
func (*Coordinator[Res]) Init ¶ added in v0.5.0
func (c *Coordinator[Res]) Init(f func(ctx context.Context) (Res, error))
Init initializes a zero Coordinator. It can also be used as a Reset, but not concurrently with its usage.
func (*Coordinator[Res]) Notify ¶ added in v0.5.0
func (c *Coordinator[Res]) Notify()
Notify wakes up waiting workers.
Must be called if the worker left the Queue before Enter.
func (*Coordinator[Res]) Queue ¶ added in v0.5.0
func (c *Coordinator[Res]) Queue() *Queue
Queue returns the queue of waiting workers.
A worker can leave the Queue before Enter, but then Notify must be called to wake up waiting workers.
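The following sketch shows why leaving the queue must be paired with Notify. It uses a counter guarded by a sync.Cond. waitQueue is a hypothetical stand-in for the package's Queue. In the real package, Enter also takes a worker off the queue. This sketch models only the "changed my mind" path, in which Leave plays the role of Queue().Out() followed by Notify().

```go
package main

import (
	"fmt"
	"sync"
)

// waitQueue is a hypothetical stand-in for the package's Queue: it counts
// workers that announced they will join, so a batch can hold its commit
// until they have arrived or left.
type waitQueue struct {
	mu   sync.Mutex
	cond *sync.Cond
	n    int // workers announced but not yet arrived
}

func newWaitQueue() *waitQueue {
	q := &waitQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// In announces that a worker is going to join.
func (q *waitQueue) In() {
	q.mu.Lock()
	q.n++
	q.mu.Unlock()
}

// Leave removes a worker and wakes the waiters, playing the role of
// Queue().Out() followed by Notify(). Without the Broadcast, waitEmpty
// could sleep forever even though n has reached 0.
func (q *waitQueue) Leave() {
	q.mu.Lock()
	q.n--
	q.mu.Unlock()
	q.cond.Broadcast()
}

// waitEmpty blocks until every announced worker has left, which is what a
// batch would do before committing.
func (q *waitQueue) waitEmpty() {
	q.mu.Lock()
	for q.n > 0 {
		q.cond.Wait()
	}
	q.mu.Unlock()
}

func main() {
	q := newWaitQueue()
	q.In()
	q.In()

	done := make(chan struct{})
	go func() {
		q.waitEmpty() // the "batch" waits for both announced workers
		close(done)
	}()

	q.Leave() // first worker changes its mind
	q.Leave() // second one too
	<-done
	fmt.Println("queue drained, batch may commit")
}
```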
func (*Coordinator[Res]) Trigger ¶ added in v0.5.0
func (c *Coordinator[Res]) Trigger()
Trigger triggers the batch to Commit. After that we can call either Commit or Exit, depending on whether we added our data to the batch or not, so we either become part of the batch or leave it.
type Multi ¶ added in v0.4.0
	type Multi[Res any] struct {
		// CommitFunc is called to commit coach `coach`.
		//
		// It's already called owning critical section. Enter-Exit cycle must not be called from it.
		CommitFunc func(ctx context.Context, coach int) (Res, error)

		// Balancer chooses a coach among the available ones, or it can choose to wait for more.
		// bitset is a set of available coaches.
		// Coach n is available <=> bitset & (1<<n) != 0.
		// If the returned value >= 0 and that coach is available, it proceeds with it.
		// If the returned value < 0 or that coach is not available,
		// the worker acts as if there were no available coaches.
		Balancer func(bitset uint64) int

		// BigBalancer is used if 64 coaches are not enough.
		// Coach n is available <=> bitset[n/64] & (1<<(n%64)) != 0.
		BigBalancer func(bitset []uint64) int
		// contains filtered or unexported fields
	}
Multi is a coordinator for multiple parallel batches. Multi can't be created as a literal, it must be initialized either by NewMulti or Init.
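For more than 64 coaches, the BigBalancer bitset spans several words. The sketch below is a hypothetical BigBalancer that follows the documented layout (coach n lives in bitset[n/64], bit n%64) and picks the lowest free coach. lowestFreeBig and available are illustrative names, not package functions.

```go
package main

import (
	"fmt"
	"math/bits"
)

// lowestFreeBig is a hypothetical BigBalancer: it returns the lowest-numbered
// available coach, or -1 when none is free. Coach n is available when
// bitset[n/64]&(1<<(n%64)) != 0, as documented for Multi.BigBalancer.
func lowestFreeBig(bitset []uint64) int {
	for i, word := range bitset {
		if word != 0 {
			return i*64 + bits.TrailingZeros64(word)
		}
	}
	return -1 // negative: act as if no coach were available
}

// available reports whether coach n is marked free in bitset.
func available(bitset []uint64, n int) bool {
	return n >= 0 && n/64 < len(bitset) && bitset[n/64]&(1<<uint(n%64)) != 0
}

func main() {
	free := []uint64{0, 1 << 2} // only coach 66 (word 1, bit 2) is free
	c := lowestFreeBig(free)
	fmt.Println(c, available(free, c))         // 66 true
	fmt.Println(lowestFreeBig([]uint64{0, 0})) // -1
}
```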
Example ¶
	package main

	import (
		"context"
		"errors"
		"log"
		"sync"
		"time"

		"nikand.dev/go/batch"
	)

	type (
		ServiceMulti struct {
			sum []int // state we collect to commit together

			bc *batch.Multi[int] // [int] is the result value type, set to struct{} if you don't need it
		}

		contextKey struct{} // repeated from the Coordinator example so this program compiles on its own
	)

	func NewServiceMulti(coaches int) *ServiceMulti {
		s := &ServiceMulti{
			sum: make([]int, coaches),
		}

		s.bc = batch.NewMulti(coaches, s.commit)

		return s
	}

	func (s *ServiceMulti) commit(ctx context.Context, coach int) (int, error) {
		// suppose some heavy operation here
		// update a file or write to db

		log.Printf("* * * coach: %2d, commit %2d * * *", coach, s.sum[coach])

		time.Sleep(time.Millisecond) // let other workers use another coach

		return s.sum[coach], nil
	}

	func (s *ServiceMulti) DoWork(ctx context.Context, data int) (int, error) {
		s.bc.Queue().In() // let others know we are going to join

		_ = data // prepare data

		coach, idx := s.bc.Enter(true) // true for blocking, false if we want to leave instead of waiting
		if idx < 0 {                   // we haven't entered the batch in non blocking mode
			return 0, errors.New("not this time") // we have to leave in that case
		}

		defer s.bc.Exit(coach) // it's like Mutex.Unlock. It's a pair to successful Enter.
		_ = 0                  // Must be called with defer to outlive panics

		if idx == 0 { // we are first in the batch, reset the state
			s.sum[coach] = 0
			log.Printf("* * * coach %2d, reset batch * * *", coach)
		}

		log.Printf("worker %2d got into coach %2d with index %2d", ctx.Value(contextKey{}), coach, idx)

		if data == 0 { // if we didn't spoil the batch state
			return 0, nil // we can leave freely
		}

		s.sum[coach] += data // add our work to the batch

		if s.sum[coach] >= 3 { // if batch is already big
			s.bc.Trigger(coach) // trigger it
		}

		// only one of return/Cancel/Commit must be called and only once
		res, err := s.bc.Commit(ctx, coach)
		if err != nil { // batch failed, each worker in it will get the same error
			return 0, err
		}

		log.Printf("worker %2d got result for coach %2d: %v %v", ctx.Value(contextKey{}), coach, res, err)

		// if we are here, all of the workers have their work committed

		return res, nil
	}

	func main() {
		const jobs = 5

		s := NewServiceMulti(2)

		// let's spin up some workers
		var wg sync.WaitGroup

		for j := 0; j < jobs; j++ {
			j := j
			wg.Add(1)

			go func() {
				defer wg.Done()

				ctx := context.Background() // passed to commit function
				ctx = context.WithValue(ctx, contextKey{}, j)

				res, err := s.DoWork(ctx, 1)
				_, _ = res, err
			}()
		}

		wg.Wait()
	}
func (*Multi[Res]) Cancel ¶ added in v0.5.0
func (c *Multi[Res]) Cancel(ctx context.Context, coach int, err error) (Res, error)
Cancel indicates the current worker is done with the shared data, but the data must not be committed. All the workers from the same batch receive a zero Res and the same error.
It can be used if the batch's shared state has been spoiled as a result of an error or something similar.
func (*Multi[Res]) Commit ¶ added in v0.5.0
func (c *Multi[Res]) Commit(ctx context.Context, coach int) (Res, error)
Commit indicates the current worker is done with the shared state and ready to Commit it. Commit blocks until the batch is committed. The same Res and error are returned to all the workers in the batch. (Res, error) is what Multi.CommitFunc returns.
func (*Multi[Res]) Enter ¶ added in v0.4.0
func (c *Multi[Res]) Enter(blocking bool) (coach, idx int)
Enter enters an available batch. It returns -1, -1 if blocking == false and no batch is available. Enter also removes the worker from the queue.
See also documentation for Coordinator.Enter.
Coach choice can be customized by setting Multi.Balancer.
func (*Multi[Res]) Exit ¶ added in v0.5.0
func (c *Multi[Res]) Exit(coach int) int
Exit exits the batch. It should be called with defer. It works similarly to Mutex.Unlock in the sense that it unlocks shared resources.
func (*Multi[Res]) Init ¶ added in v0.5.0
func (c *Multi[Res]) Init(n int, f func(ctx context.Context, coach int) (Res, error))
Init initializes a zero Multi. It can also be used as a Reset, but not concurrently with its usage.
func (*Multi[Res]) Notify ¶ added in v0.5.0
func (c *Multi[Res]) Notify()
Notify wakes up waiting workers.
Must be called if the worker left the Queue before Enter.
func (*Multi[Res]) Queue ¶ added in v0.5.0
func (c *Multi[Res]) Queue() *Queue
Queue returns the common queue for all coaches. Getting into it means batches that have already started will wait for the worker rather than commit right away.
A worker can leave the Queue before Enter, but then Notify must be called to wake up waiting workers.
func (*Multi[Res]) Trigger ¶ added in v0.5.0
func (c *Multi[Res]) Trigger(coach int)
Trigger triggers the batch to commit. It must be called inside the Enter-Exit section.
It can be used to flush the current batch, with or without our data, after which we can retry. The commit happens when the current worker Exits the critical section, so to retry you need to Exit, get into the Queue again, and Enter again.
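The trigger-and-retry flow can be wrapped in a small loop. In this sketch, errRetry mirrors the sentinel used in the Full Guide, and withRetry is a hypothetical helper; neither is exported by the package. Each call to attempt is assumed to be one complete Queue/Enter/.../Exit cycle, so that the batch we triggered has a chance to commit before we queue up again.

```go
package main

import (
	"errors"
	"fmt"
)

// errRetry mirrors the errRetry sentinel from the Full Guide: "the batch was
// full, try again in the next one". Hypothetical; the package does not export it.
var errRetry = errors.New("retry in next batch")

// withRetry calls attempt until it succeeds, fails with an error other than
// errRetry, or the attempt budget runs out. Each attempt is assumed to be a
// complete Queue/Enter/.../Exit cycle, so the batch we triggered can commit
// before we queue up again.
func withRetry(attempts int, attempt func() (int, error)) (int, error) {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		var res int
		res, err = attempt()
		if !errors.Is(err, errRetry) {
			return res, err
		}
	}
	return 0, fmt.Errorf("gave up after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	res, err := withRetry(3, func() (int, error) {
		calls++
		if calls == 1 {
			return 0, errRetry // first batch was full: we triggered it and left
		}
		return 42, nil // joined the next batch
	})
	fmt.Println(res, err, calls)
}
```

A bounded attempt budget keeps a persistently full coach from spinning a worker forever. The wrapped error still matches errRetry through errors.Is.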
type MultiBatch ¶ added in v0.5.0
	type MultiBatch[Res any] struct {
		// contains filtered or unexported fields
	}
Example ¶
	package main

	import (
		"context"
		"errors"
		"log"
		"sync"
		"time"

		"nikand.dev/go/batch"
	)

	type (
		SafeServiceMulti struct {
			sum []int // state we collect to commit together

			bc *batch.Multi[int] // [int] is the result value type, set to struct{} if you don't need it
		}

		contextKeySafe struct{} // repeated from the Batch example so this program compiles on its own
	)

	func NewSafeServiceMulti(coaches int) *SafeServiceMulti {
		s := &SafeServiceMulti{
			sum: make([]int, coaches),
		}

		s.bc = batch.NewMulti(coaches, s.commit)

		return s
	}

	func (s *SafeServiceMulti) commit(ctx context.Context, coach int) (int, error) {
		// suppose some heavy operation here
		// update a file or write to db

		log.Printf("* * * coach %2d, commit %2d * * *", coach, s.sum[coach])

		time.Sleep(time.Millisecond) // let other workers use another coach

		return s.sum[coach], nil
	}

	func (s *SafeServiceMulti) DoWork(ctx context.Context, data int) (int, error) {
		b := batch.ByMulti(s.bc)
		defer b.Exit() // it's like Mutex.Unlock, but safely works even if we didn't enter
		_ = 0          // Must be called with defer to outlive panics

		b.QueueIn() // let others know we are going to join

		_ = data // prepare data

		coach, idx := b.Enter(true) // true for blocking, false if we want to leave instead of waiting
		if idx < 0 {                // we haven't entered the batch in non blocking mode
			return 0, errors.New("not this time") // we have to leave in that case
		}

		if idx == 0 { // we are first in the batch, reset the state
			s.sum[coach] = 0
			log.Printf("* * * coach %2d, reset batch * * *", coach)
		}

		log.Printf("worker %2d got into coach %2d with index %2d", ctx.Value(contextKeySafe{}), coach, idx)

		if data == 0 { // if we didn't spoil the batch state
			return 0, nil // we can leave freely
		}

		s.sum[coach] += data // add our work to the batch

		if s.sum[coach] >= 3 { // if batch is already big
			b.Trigger() // trigger it
		}

		// only one of return/Cancel/Commit must be called and only once
		res, err := b.Commit(ctx)
		if err != nil { // batch failed, each worker in it will get the same error
			return 0, err
		}

		log.Printf("worker %2d got result for coach %2d: %v %v", ctx.Value(contextKeySafe{}), coach, res, err)

		// if we are here, all of the workers have their work committed

		return res, nil
	}

	func main() {
		const jobs = 5

		s := NewSafeServiceMulti(2)

		// let's spin up some workers
		var wg sync.WaitGroup

		for j := 0; j < jobs; j++ {
			j := j
			wg.Add(1)

			go func() {
				defer wg.Done()

				ctx := context.Background() // passed to commit function
				ctx = context.WithValue(ctx, contextKeySafe{}, j)

				res, err := s.DoWork(ctx, 1)
				_, _ = res, err
			}()
		}

		wg.Wait()
	}
func ByMulti ¶ added in v0.5.0
func ByMulti[Res any](c *Multi[Res]) MultiBatch[Res]
func (*MultiBatch[Res]) Cancel ¶ added in v0.5.0
func (b *MultiBatch[Res]) Cancel(ctx context.Context, err error) (Res, error)
func (*MultiBatch[Res]) Commit ¶ added in v0.5.0
func (b *MultiBatch[Res]) Commit(ctx context.Context) (Res, error)
func (*MultiBatch[Res]) Enter ¶ added in v0.5.0
func (b *MultiBatch[Res]) Enter(blocking bool) (coach, idx int)
func (*MultiBatch[Res]) Exit ¶ added in v0.5.0
func (b *MultiBatch[Res]) Exit() int
func (*MultiBatch[Res]) QueueIn ¶ added in v0.5.0
func (b *MultiBatch[Res]) QueueIn() int
func (*MultiBatch[Res]) Trigger ¶ added in v0.5.0
func (b *MultiBatch[Res]) Trigger()
type PanicError ¶
type PanicError struct {
Panic interface{}
}
PanicError is returned to all the workers in the batch if one of them panicked. The panicking worker itself gets the panic, not an error.
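The split between "the panicking worker re-panics" and "everyone else gets an error" can be sketched with recover. PanicError here is a local stand-in with the same field as the package type. Its value-receiver Error method is an assumption made for this sketch, and runGuarded is a hypothetical helper. Code using the real package would use AsPanicError or errors.As on the error it receives.

```go
package main

import (
	"errors"
	"fmt"
)

// PanicError is a local stand-in with the same field as batch.PanicError.
// Its Error method is an assumption made for this sketch.
type PanicError struct {
	Panic interface{}
}

func (e PanicError) Error() string { return fmt.Sprintf("panic: %v", e.Panic) }

// runGuarded runs fn. If fn panics, it returns the recovered value that the
// panicking worker is expected to re-panic with, and the error the other
// workers in the batch would receive. Hypothetical helper.
func runGuarded(fn func()) (repanic interface{}, errForOthers error) {
	defer func() {
		if p := recover(); p != nil {
			repanic = p
			errForOthers = PanicError{Panic: p}
		}
	}()
	fn()
	return nil, nil
}

func main() {
	p, err := runGuarded(func() { panic("db driver crashed") })

	var pe PanicError
	if errors.As(err, &pe) { // what the other workers would check
		fmt.Println("batch failed because of a panic:", pe.Panic)
	}
	_ = p // the panicking worker itself would call panic(p) here
}
```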
func AsPanicError ¶ added in v0.5.0
AsPanicError unwraps PanicError.