batcher

package
v2.0.3 Latest
Warning

This package is not in the latest version of its module.

Published: May 30, 2023 License: MIT Imports: 7 Imported by: 0

README

Batch

Executing a task immediately for every input is not always the most efficient approach. It can be cheaper to collect inputs into a batch and process them all in one go.

// taskCount is assumed to be defined by the caller: the number of inputs expected.
out := make(chan int, taskCount)

// processBatch handles every pending input in a single call.
processBatch := func(nums []int) error {
    for _, number := range nums {
        out <- number * 10
    }

    return nil
}

// processBatch returns only an error, which matches the processFn signature
// of NewSilentBatcher documented below.

// Auto process batch every 100ms
periodicBatcher := NewSilentBatcher(
    processBatch,
    WithAutoProcessInterval(100*time.Millisecond),
)

// Auto process batch when pending queue reaches 10
sizeBatcher := NewSilentBatcher(
    processBatch,
    WithAutoProcessSize(10),
)

// Auto process batch every 100ms or when pending queue reaches 10
periodicSizeBatcher := NewSilentBatcher(
    processBatch,
    WithAutoProcessInterval(100*time.Millisecond),
    WithAutoProcessSize(10),
)

// No auto processing: the batch runs only when Process is called
manualBatcher := NewSilentBatcher(
    processBatch,
)

manualBatcher.Process(context.Background())

See batch_test.go for more detailed examples on how to use this feature.

Documentation

Constants

This section is empty.

Variables

var (
	ErrBatchProcessorNotActive = errors.New("batch processor has already shut down")
)

Functions

This section is empty.

Types

type Batcher

type Batcher[P any, T any] interface {

	// Append adds a new payload to the batch and returns a task for that particular payload.
	// Clients MUST execute the returned task before blocking and waiting for it to complete
	// to extract result.
	Append(ctx context.Context, payload P) async.Task[T]
	// contains filtered or unexported methods
}

Batcher is a batch processor that sits in the background, collects payloads from callers, executes them all in one go, and then returns an individual result to each caller.

Example
ctx := context.Background()

b := NewBatcher(
	func(input []int) ([]int, error) {
		fmt.Println(input)

		result := make([]int, len(input))
		for idx, number := range input {
			result[idx] = number * 2
		}

		return result, nil
	},
	nil,
)

defer b.Shutdown()

t1 := b.Append(ctx, 1)
t2 := b.Append(ctx, 2)

b.Process(ctx)

async.ContinueWithNoResult(
	t1, func(_ context.Context, v int, err error) error {
		fmt.Println(v)

		return nil
	},
).ExecuteSync(ctx)

async.ContinueWithNoResult(
	t2, func(_ context.Context, v int, err error) error {
		fmt.Println(v)

		return nil
	},
).ExecuteSync(ctx)
Output:

[1 2]
2
4
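
The idea behind this example (queue payloads, run one batch call, hand each caller its own result) can be sketched without this package. The following is a minimal illustration only: `miniBatcher`, its fields, and `double` are hypothetical names, and plain channels stand in for `async.Task`. It is not how this package is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// miniBatcher is a hypothetical, simplified stand-in used for illustration.
type miniBatcher struct {
	mu      sync.Mutex
	pending []int
	waiters []chan int
	process func([]int) []int
}

// Append queues a payload and returns a channel that later receives its result.
func (b *miniBatcher) Append(payload int) <-chan int {
	b.mu.Lock()
	defer b.mu.Unlock()

	ch := make(chan int, 1) // buffered so Process never blocks on a slow reader
	b.pending = append(b.pending, payload)
	b.waiters = append(b.waiters, ch)
	return ch
}

// Process runs the batch function once and delivers each caller's result.
func (b *miniBatcher) Process() {
	b.mu.Lock()
	payloads, waiters := b.pending, b.waiters
	b.pending, b.waiters = nil, nil
	b.mu.Unlock()

	if len(payloads) == 0 {
		return
	}
	results := b.process(payloads)
	for i, ch := range waiters {
		ch <- results[i]
	}
}

// double is the batch function: one call handles every queued payload.
func double(in []int) []int {
	out := make([]int, len(in))
	for i, n := range in {
		out[i] = n * 2
	}
	return out
}

func main() {
	b := &miniBatcher{process: double}
	r1, r2 := b.Append(1), b.Append(2)
	b.Process()
	fmt.Println(<-r1, <-r2) // prints "2 4"
}
```

The sketch assumes the batch function returns exactly one result per payload, in the same order, and it omits error handling and contexts.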

func NewBatcher

func NewBatcher[P any, T any](
	processFn func([]P) ([]T, error),
	payloadKeyExtractor func(P) any,
	options ...BatcherOption,
) Batcher[P, T]

NewBatcher returns a new Batcher that processes the accumulated payloads with processFn.

type BatcherOption

type BatcherOption func(*batcherConfigs)

func WithAutoProcessByTicketBooth

func WithAutoProcessByTicketBooth() BatcherOption

WithAutoProcessByTicketBooth creates a synchronous ticket booth that can track how many ticket owners have arrived in order to start batch processing.
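
One way to read this description is as a counter of outstanding tickets that triggers processing once every ticket owner has arrived. The sketch below illustrates that reading only. The `booth` type, its `Arrive` method, and the `onAllArrived` callback are hypothetical and are not part of this package; how the real ticket booth interacts with Append and DiscardTicket is an assumption here.

```go
package main

import (
	"fmt"
	"sync"
)

// booth is a hypothetical illustration, not this package's implementation.
// It fires onAllArrived once no issued ticket is still outstanding.
type booth struct {
	mu           sync.Mutex
	outstanding  int
	onAllArrived func()
}

// BuyTicket registers one more owner that the booth must wait for.
func (b *booth) BuyTicket() {
	b.mu.Lock()
	b.outstanding++
	b.mu.Unlock()
}

// Arrive marks one ticket as used (or discarded) and triggers processing
// when it was the last outstanding ticket.
func (b *booth) Arrive() {
	b.mu.Lock()
	b.outstanding--
	fire := b.outstanding == 0
	b.mu.Unlock()

	if fire {
		b.onAllArrived()
	}
}

func main() {
	b := &booth{onAllArrived: func() {
		fmt.Println("all ticket owners arrived: process batch")
	}}
	b.BuyTicket()
	b.BuyTicket()
	b.Arrive() // one owner still outstanding, nothing happens
	b.Arrive() // last owner arrives, the callback fires
}
```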

func WithAutoProcessInterval

func WithAutoProcessInterval(autoProcessIntervalInMilliseconds time.Duration) BatcherOption

WithAutoProcessInterval sets the interval at which Batcher will automatically process the pending tasks. If `autoProcessIntervalInMilliseconds <= 0`, the default behavior applies: no periodic auto processing will be done.

Note: if periodic auto processing is ON, clients MUST call Batcher.Shutdown() so that the timer goroutine is cleaned up properly. Otherwise the goroutine leaks.

func WithAutoProcessSize

func WithAutoProcessSize(autoProcessSize int) BatcherOption

WithAutoProcessSize sets the limit at which Batcher will automatically process the pending tasks. If `autoProcessSize <= 0`, the default behavior applies: no auto processing will be done based on size.
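
The size rule can be illustrated with a small standalone sketch. `sizeBuffer` and its fields are hypothetical names; the sketch is single-goroutine only and is not this package's implementation. It flushes as soon as the pending count reaches the limit, and treats a limit of zero or less as "never flush by size".

```go
package main

import "fmt"

// sizeBuffer is a hypothetical illustration of size-triggered batching.
// It is not safe for concurrent use.
type sizeBuffer struct {
	limit   int
	pending []int
	flush   func([]int)
}

// Append queues v and flushes the whole batch once the limit is reached.
func (s *sizeBuffer) Append(v int) {
	s.pending = append(s.pending, v)

	// A limit <= 0 disables size-based flushing.
	if s.limit > 0 && len(s.pending) >= s.limit {
		batch := s.pending
		s.pending = nil
		s.flush(batch)
	}
}

func main() {
	s := &sizeBuffer{limit: 3, flush: func(b []int) { fmt.Println(b) }}
	for i := 1; i <= 7; i++ {
		s.Append(i)
	}
	// Prints [1 2 3] and [4 5 6]; the value 7 stays pending.
	fmt.Println("left pending:", s.pending)
}
```

Anything left below the limit stays pending until another trigger runs, which is why the size option is often combined with an interval or a manual Process call.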

func WithShutdownGraceDuration

func WithShutdownGraceDuration(shutdownDurationInMilliseconds time.Duration) BatcherOption

WithShutdownGraceDuration specifies how long Batcher will wait for the Shutdown operation to complete before returning. If `shutdownDurationInMilliseconds <= 0`, Batcher will block and wait until the shutdown operation fully completes.

type MockBatcher

type MockBatcher[P interface{}, T interface{}] struct {
	mock.Mock
}

MockBatcher is an autogenerated mock type for the Batcher type

func NewMockBatcher

func NewMockBatcher[P interface{}, T interface{}](t mockConstructorTestingTNewMockBatcher) *MockBatcher[P, T]

NewMockBatcher creates a new instance of MockBatcher. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.

func (*MockBatcher[P, T]) Append

func (_m *MockBatcher[P, T]) Append(ctx context.Context, payload P) async.Task[T]

Append provides a mock function with given fields: ctx, payload

func (*MockBatcher[P, T]) BuyTicket

func (_m *MockBatcher[P, T]) BuyTicket(ctx context.Context) context.Context

BuyTicket provides a mock function with given fields: ctx

func (*MockBatcher[P, T]) DiscardTicket

func (_m *MockBatcher[P, T]) DiscardTicket(ctx context.Context)

DiscardTicket provides a mock function with given fields: ctx

func (*MockBatcher[P, T]) Process

func (_m *MockBatcher[P, T]) Process(ctx context.Context)

Process provides a mock function with given fields: ctx

func (*MockBatcher[P, T]) Shutdown

func (_m *MockBatcher[P, T]) Shutdown()

Shutdown provides a mock function with given fields:

func (*MockBatcher[P, T]) Size

func (_m *MockBatcher[P, T]) Size() int

Size provides a mock function with given fields:

type MockSilentBatcher

type MockSilentBatcher[P interface{}] struct {
	mock.Mock
}

MockSilentBatcher is an autogenerated mock type for the SilentBatcher type

func NewMockSilentBatcher

func NewMockSilentBatcher[P interface{}](t mockConstructorTestingTNewMockSilentBatcher) *MockSilentBatcher[P]

NewMockSilentBatcher creates a new instance of MockSilentBatcher. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.

func (*MockSilentBatcher[P]) Append

func (_m *MockSilentBatcher[P]) Append(ctx context.Context, payload P) async.SilentTask

Append provides a mock function with given fields: ctx, payload

func (*MockSilentBatcher[P]) BuyTicket

func (_m *MockSilentBatcher[P]) BuyTicket(ctx context.Context) context.Context

BuyTicket provides a mock function with given fields: ctx

func (*MockSilentBatcher[P]) DiscardTicket

func (_m *MockSilentBatcher[P]) DiscardTicket(ctx context.Context)

DiscardTicket provides a mock function with given fields: ctx

func (*MockSilentBatcher[P]) Process

func (_m *MockSilentBatcher[P]) Process(ctx context.Context)

Process provides a mock function with given fields: ctx

func (*MockSilentBatcher[P]) Shutdown

func (_m *MockSilentBatcher[P]) Shutdown()

Shutdown provides a mock function with given fields:

func (*MockSilentBatcher[P]) Size

func (_m *MockSilentBatcher[P]) Size() int

Size provides a mock function with given fields:

type SilentBatcher

type SilentBatcher[P any] interface {

	// Append adds a new payload to the batch and returns a task for that particular payload.
	// Clients MUST execute the returned task before blocking and waiting for it to complete
	// to extract result.
	Append(ctx context.Context, payload P) async.SilentTask
	// contains filtered or unexported methods
}

SilentBatcher is a batch processor that sits in the background, accumulates payloads, and then executes them all in one go silently, without returning an individual result to each caller.

Example
var wg sync.WaitGroup
wg.Add(2)

ctx := context.Background()

b := NewSilentBatcher(
	func(input []interface{}) error {
		fmt.Println(input)
		return nil
	},
)

async.ContinueInSilence(
	b.Append(ctx, 1), func(_ context.Context, err error) error {
		wg.Done()

		return nil
	},
).Execute(ctx)

async.ContinueInSilence(
	b.Append(ctx, 2), func(_ context.Context, err error) error {
		wg.Done()

		return nil
	},
).Execute(ctx)

b.Process(ctx)

wg.Wait()
Output:

[1 2]

func NewSilentBatcher

func NewSilentBatcher[P any](processFn func([]P) error, options ...BatcherOption) SilentBatcher[P]

NewSilentBatcher returns a new SilentBatcher
