batcher

package module
v1.8.1-0...-3963fac Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2024 License: MIT Imports: 12 Imported by: 0

README

Batcher

This content supports the Rate Limiting Cloud Design Pattern found in the Azure Architecture Center.

Overview

Batcher is a datastore-agnostic batching and rate-limiting implementation for Go.

Datastores have performance limits and the work that is executed against them has costs in terms of memory, CPU, disk, network, and so on (whether you have quantified those costs or not).

Batcher, not only allows you to enqueue operations which are then given back to you in a batch, but can provide an easy way for applications to consume all available resources on a datastore without exceeding their performance limits.

Use Cases

Here are the most common use cases for Batcher:

Rate limiting on datastores

Consider this scenario:

  • You have an Azure Cosmos database that you have provisioned with 20K Request Units (RU).
  • Your service runs in a pod with 4 replicas on Kubernetes that need to share the capacity.
  • Your service gets large jobs for processing, commonly 100K records or more at a time.
  • Each record costs 10 RU.
  • If 2 jobs come in at the same time (1 job to each of 2 replicas), then we have 100K records x 2 jobs x 10 RU = 2M RU of work that needs to be done.

Given that we have capacity of 20K RU per second, we know that we could complete the work in 100 seconds if we could spread it out.

However, each process might try and send their own 100K records in parallel and with no knowledge of each other. This would cause Cosmos to start issuing 429 TooManyRequests error messages and given the volume it might even cut you off with 503 ServiceUnavailable error messages.

Batcher solves this problem by allowing you to share the capacity across multiple replicas and controlling the flow of traffic so you don't exceed the 20K RU per second.

Cost savings - Reserved vs Shared Capacity

Consider this scenario:

  • You have an Azure Cosmos database.
  • Your service runs in a pod with 4 replicas on Kubernetes that need to share the capacity.
  • You want to ensure that each replica can operate at up to 20K RU, but without having to provision 4 * 20K RU = 80K RU.

Using Batcher, you might still reserve capacity per instance, but it can be a small amount. You can then share capacity across the instances. For instance, you might reserve 2K RU for each of the 4 instances and share an addition 18K RU, allowing each instance to have capacity between 2K and 20K RU. This would require provisioning 26K RU in Cosmos instead of 80K RU.

Cost control

Consider this scenario:

  • You have an Azure Cosmos database.
  • You have a lot of expensive reads and writes, but don't want to pay a lot for Cosmos.

Batcher will ensure that you don't exceed this capacity by lengthening the time it takes for your operations to complete. Therefore, if you find that your application takes too long for operations to complete, you can increase the capacity. If you want to save money, you can decrease the capacity.

Rate limiting on other resource targets

Batcher use cases are not limited to datastores. Consider the scenario where you want to limit the number of messages you are allowed to send in a mail API. Batcher can provide the same rate limiting feature.

Batcher Components

  • Batcher: A Batcher is created for each datastore that has capacity you wish to respect. Batchers are long-lived.

  • Watcher: A Watcher is created for each process you wish to manage. Lots of Watchers will share the same Batcher. The Watcher receives the batches as they become available. Watchers are short-lived. For instance, if your solution is an HTTP server, you will probably create a Watcher with each request, send your Operations to a shared Batcher, and then get batches for processing back on your Watcher. If you need to handle different types of Operations that are processed in different ways or if they have different characteristics (such as an optimal batchsize), you might create a separate Watcher for each of those use cases.

  • Operation: Operations are enqueued into the Batcher. An Operation has an associated "Watcher", a "cost", a designation of whether or not it can be batched, a counter for the number of times it has been "attempted", and a "payload" (which can be anything you want).

  • SharedResource: This is the one included rate limiter (though you can write your own) that allows for reserving and/or sharing capacity across instances. When sharing capacity, you must provide a LeaseManager (such as AzureBlobLeaseManager) that is responsible for managing partitions and leases on those partitions.

  • AzureBlobLeaseManager: This is a Azure Blob Storage partition and lease management component that uses an Azure Storage Account. It is used by SharedResource so that capacity can be shared across multiple instances.

topology

Terminology

Some other terms will be used throughout...

  • Instance: Using SharedResource, you can share the capacity of a datastore across different processes, containers, servers, Pod replicas, etc. Hereafter, these separate entities will be refered to as Instances.

  • Target: As Operations are enqueued or marked done in Batcher it updates a Target number which is the total cost Batcher thinks is necessary to process any outstanding Operations. In other words, as Operations are enqueued, the Target grows by the cost of that Operation. When a batch is marked done, the Target is reduced by the cost of all Operations in that batch.

  • Capacity: The capacity that the rate limiter has been able to procure is available via the Capacity() method or the capacity event.

  • MaxCapacity: When using a rate limiter, the MaxCapacity is the maximum capacity that could even be provided. For the rate limiter SharedResource, this is the total of SharedCapacity and ReservedCapacity.

  • SharedCapacity: SharedResource optionally allows you to specify a SharedCapacity which is defined when NewSharedResource() is called. This is the capacity for the datastore that is shared across any number of Instances. In the most simple case, if a Cosmos database had 20K RU and 4 Instances of the service using it, you might specify the SharedCapacity as 20K on each Instance if you want all the capacity shared. SharedCapacity reduces cost.

  • ReservedCapacity: SharedResource optionally allows you to specify a ReservedCapacity that will only be used by this Instance. For example, in the above example, if you wanted to reserve 2K RU for each of your Instances, you might use a ReservedCapacity of 2K (on each of 4 Instances) and then use 12K for the SharedCapacity. ReservedCapacity reduces latency.

  • Partitions: The SharedResource rate limiter divides the SharedCapacity by a factor to determine the number of partitions to provision as blobs. If a process owns the exclusive lease on the partition blob, then it is allowed to use 1 factor of capacity. For example, if the SharedCapacity is 10K and the factor is 1K, then there are 10 partitions, control of each is worth 1K capacity.

  • Inflight: The number of batches that are currently being processed (i.e. the batch has been raised to the callback function on the Watcher, but has not not completed that function yet).

Features

  • Datastore Agnostic: Batcher does not process the Operations it batches, it just notifies the caller when a batch is ready for processing. This design means the solution can work with any datastore.

  • Batching: You may specify that Operations can be batched (ex. writes) and then specify constraints, like how often Operations should be flushed, maximum batch size, restricting the number of batches Inflight, datastore capacity, etc. Batcher will send you batches of Operations ready for you to process within all your constraints.

  • Rate Limiting: You may optionally attach a rate limiter to Batcher that can restrict the Operations so they don't exceed a certain cost per second.

  • Shared Capacity: The provided rate limiter SharedResource allows for sharing capacity across multiple Instances. Sharing capacity in this way can reduce cost.

  • Reserved Capacity: SharedResource also supports a reserved capacity to improve latency. For instance, you might have 4 Instances that need to share 20K RU in a Cosmos database. You might give each 2K reserved capacity and share the remaining 14K RU. This gives each process low latency up to 2K RU but allows each process to request more.

  • Cost per Operation: Each Operation that you enqueue to Batcher will have an associated cost.

  • Limit Retries: Commonly datastores have transient faults. You want to retry those Operations after a short time because they might succeed, but you don't want to retry them forever. Watchers can be set to enforce a maximum number of retries.

  • Pause: When your datastore is getting too much pressure (throwing timeouts or too-many-requests), you can pause the Batcher for a short period of time to give it some time to catch-up.

Workflow

  1. A Batcher is created for a datastore. It may be assigned a rate limiter.

  2. A user creates a Watcher, then enqueues Operations into the Batcher.

  3. In the Batcher processing loop, the CapacityInterval asks for capacity from the rate limiter to process the Operations.

  4. In the Batcher processing loop, the FlushInterval organizes Operations into batches and raises them back to the Watchers.

  5. The user performs the queries as batches are raised.

Usage

This code sample shows the general usage...

  1. If you are going to use rate limiting...
    1. Create a rate limiter for each Batcher via New() methods
    2. Set capacity for the rate limiters and/or attach LeaseManagers if appropriate
    3. Start() those rate limiters
  2. Create one or more Batchers via New() methods
  3. Start() those Batchers
  4. As you need to process data...
    1. Create a Watcher
    2. Enqueue Operations into the appropriate Batcher
import (
    gobatcher "github.com/plasne/go-batcher"
)

func main() {
    ctx := context.Background()

    // start getting shared resource capacity
    leaseMgr := gobatcher.NewAzureBlobLeaseManager(AZBLOB_ACCOUNT, AZBLOB_CONTAINER, AZBLOB_KEY)
    azresource := gobatcher.NewSharedResource().
        WithSharedCapacity(uint32(CAPACITY), leaseMgr).
        WithFactor(1000)
    resourceListener := azresource.AddListener(func(event string, val int, msg string, metadata interface{}) {
        switch event {
        case gobatcher.ErrorsEvent:
            log.Err(errors.New(msg)).Msgf("SharedResource raised the following error...")
        }
    })
    defer azresource.RemoveListener(resourceListener)
    if err := azresource.Provision(ctx); err != nil {
        panic(err)
    }
    if err := azresource.Start(ctx); err != nil {
        panic(err)
    }

    // start the batcher
    batcher := gobatcher.NewBatcher().
        WithRateLimiter(azresource)
    batcherListener := batcher.AddListener(func(event string, val int, msg string, metadata interface{}) {
        switch event {
        case gobatcher.PauseEvent:
            log.Debug().Msgf("batcher paused for %v ms to alleviate pressure on the datastore.", val)
        case gobatcher.AuditFailEvent:
            log.Debug().Msgf("batcher audit-fail: %v", msg)
        }
    })
    defer batcher.RemoveListener(batcherListener)
    if err := batcher.Start(ctx); err != nil {
        panic(err)
    }

    // example of an HTTP call
    http.HandleFunc("/ingest", func(res http.ResponseWriter, req *http.Request) {

        // create a batch watcher
        watcher := gobatcher.NewWatcher(func(batch []gobatcher.Operation) {
            // process the batch
        }).WithMaxAttempts(3)

        // enqueue operations
        for i := 0; i < total; i++ {
            payload := struct{}{}
            op := gobatcher.NewOperation(watcher, 10, payload).AllowBatch()
            if errorOnEnqueue := batcher.Enqueue(op); errorOnEnqueue != nil {
                panic(errorOnEnqueue)
            }
        }
    })
}

ℹ A full code sample that demonstrates usage is available in the sample folder.

There are some additional commands that can be executed on Batcher, including...

  • Pause(): This allows you to pause the main processing loop for Batcher. For instance, no batches will be flushed, no audits will be performed, etc.

  • Flush(): You may call Flush() to queue a manual flush. It will be processed immediately, but the Flush() method is not blocking.

Batcher Configuration

Batcher can be configured depending on your use case and requirements. For example, creating a new Batcher with some configuration items might look like this...

batcher := gobatcher.NewBatcherWithBuffer(buffer).
    WithRateLimiter(rateLimiter)

All configuration options are documented in the Batcher Configuration docs.

Events

Events are raised with a "name" (string), "val" (int), "msg" (string), and "metadata" (interface{}). Some of the events that can be raised by Batcher are shutdown or pause, while the rate limiters can raise events like capacity to indicate capacity changes. The complete list of events is documented in the Batcher Events docs.

Unit Testing

There are several ways to facilitate unit testing when using Batcher. Detailed documentation is available in the Unit Testing docs.

Rate Limiting

The SharedResource rate limiter works like this...

lease

When there is only ReservedCapacity and no SharedCapacity is set, there is no requirement to use a LeaseManager.

Scenarios

There are a couple of scenarios I want to call attention to...

  • ReservedCapacity plus SharedCapacity: Using 100% ReservedCapacity reduces latency and using 100% SharedCapacity is very cost efficient, however, it is generally best to find a happy middle-ground - a small amount of ReservedCapacity and the rest as SharedCapacity.

  • Operations per Second: If you want to limit the number of Operations per second rather than the cost of those operations, you can create all Operations with a cost of 1 and set the capacity appropriately.

Cost Savings

Traditionally if you want to run multiple instances of a service, you might provision capacity in your datastore times the number of instances. For example, in Azure Cosmos, if you need 20K RU and have 4 instances, you might provision 80K RU to ensure that any node could operate at maximum capacity.

Using SharedResource, you might still reserve capacity per instance, but it can be a small amount. You can then share capacity across the instances. For instance, in the same scenario, you might reserve 2K RU for the 4 instances and (minimally) share an addition 18K RU.

reserved-shared-capacity To give a cost comparison with retail pricing in the East US region with 1 TB of capacity:

  • 80K RU is $4,992 per month
  • 26K RU is $1,768 per month
Cost Increase

When using an AzureBlobLeaseManager and default settings, each instance of the SharedResource will make a single storage transaction roughly every 250 milliseconds when it needs additional capacity. Therefore, we can determine the maximum cost for 4 instances on an Azure Storage Account GPv2 (or Blob) as...

(4 processes) x (4 lease operations per second) x (60 seconds per minute) x (60 minutes per hour) x 730 (hours per month) / (10,000 operations per billing unit) * ($0.004 per billing unit) = ~$168 month

However, this is a maximum cost - actual costs in many cases will be much lower as there are only storage operations when additional capacity is needed.

Changing Capacity

The SharedResource rate limiter supports changing capacity after Start(). You can call SetSharedCapacity(newcap) and SetReservedCapacity(newcap). If you change SharedCapacity, to a higher value than previously seen, realize it will need to provision partitions again. For AzureBlobLeaseManager, it will need to provision blobs (per SharedCapacity divided by Factor) before it can return to its normal cycle of procuring partitions for capacity.

Determining Cost

A Batcher with a rate limiter depends on each operation having a cost. The following documents provide you with assistance on determining what values you should use for cost.

Context

When you Start() Batcher or a RateLimiter you must provide a context. If the context is ever "done" (cancelled, deadlined, etc.), the Batcher or RateLimiter is shutdown. Once it is shutdown it cannot be restarted.

Generally, you want Batcher (and any associated RateLimiter) to run for the duration of your process. If that is true, you can simply use context.Background().

However, if you wanted Batcher (and any associated RateLimiter) to explicitly shutdown when some work was done, you could do something like:

func DoWork(workitems chan WorkItem) error {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // when DoWork is done (ie. workitems is closed), cancel the context
    batcher := gobatcher.NewBatcher()
    watcher := gobatcher.NewWatcher(func(batch []gobatcher.Operation) {
        // process the workitems in the batch
    })
    err := batcher.Start(ctx)
    if err != nil {
        return err
    }
    for workitem := range workitems
        op := gobatcher.NewOperation(watcher, 100, workitem, true)
        err := batcher.Enqueue(op)
        if err != nil {
            return err
        }
    }
}

Opportunities for improvement

  • This tool was originally designed to limit transactions against Azure Cosmos which has a cost model expressed as a single composite value (Request Unit). For datastores that might have more granular capacities, it would be nice to be able to provision Batcher with all those capacities and have an enqueue method that supports those costs. For example, memory, CPU, disk, network, etc. might all have separate capacities and individual queries might have individual costs.

  • There is currently not a good way to model a datastore that autoscales but might require some time to increase capacity. Ideally something that allowed for capacity to increase by "no more than x amount over y time" would be helpful. This could be a rate limiter or a feature that is added to existing rate limiters.

  • The pause logic is an existing feature that delays new batches for a fixed amount of time, but it might be nice to have an exponential back-off.

  • Currently the only LeaseManager is for Azure Blob Storage. It would be nice to add support for Zookeeper, Consul, etcd, redis/redsync, or similar.

  • There is currently no way to prioritize Operations so they are released before other Batches, but now that the Buffer is a double linked list, this would be possible.

Contributions

Please see our contributor guide.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Microsoft Patterns & Practices, Azure Architecture Center.

Documentation

Index

Constants

View Source
const (
	BatchEvent             = "batch"
	PauseEvent             = "pause"
	ResumeEvent            = "resume"
	ShutdownEvent          = "shutdown"
	AuditPassEvent         = "audit-pass"
	AuditFailEvent         = "audit-fail"
	AuditSkipEvent         = "audit-skip"
	RequestEvent           = "request"
	CapacityEvent          = "capacity"
	ReleasedEvent          = "released"
	AllocatedEvent         = "allocated"
	TargetEvent            = "target"
	VerifiedContainerEvent = "verified-container"
	CreatedContainerEvent  = "created-container"
	VerifiedBlobEvent      = "verified-blob"
	CreatedBlobEvent       = "created-blob"
	FailedEvent            = "failed"
	ErrorEvent             = "error"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AzureSharedResource

type AzureSharedResource struct {
	// contains filtered or unexported fields
}

func NewAzureSharedResource

func NewAzureSharedResource(accountName, containerName string, sharedCapacity uint32) *AzureSharedResource

This function should be called to create a new AzureSharedResource. The accountName and containerName refer to the details of an Azure Storage Account and container that the lease blobs can be created in. If multiple processes are sharing the same capacity, they should all point to the same container. The sharedCapacity parameter is the maximum shared capacity for the resource. For example, if you provision a Cosmos database with 20k RU, you might set sharedCapacity to 20,000. Capacity is renewed every 1 second. Commonly after calling NewAzureSharedResource() you will chain some WithXXXX methods, for instance... `NewAzureSharedResource().WithMasterKey(key)`.

func (*AzureSharedResource) AddListener

func (r *AzureSharedResource) AddListener(fn func(event string, val int, msg string, metadata interface{})) uuid.UUID

func (*AzureSharedResource) Capacity

func (r *AzureSharedResource) Capacity() uint32

This returns the current allocated capacity. It is `NumberOfPartitionsControlled x Factor + ReservedCapacity`.

func (*AzureSharedResource) GiveMe

func (r *AzureSharedResource) GiveMe(target uint32)

You should call GiveMe() to update the capacity you are requesting. You will always specify the new amount of capacity you require. For instance, if you have a large queue of records to process, you might call GiveMe() every time new records are added to the queue and every time a batch is completed. Another common pattern is to call GiveMe() on a timer to keep it generally consistent with the capacity you need.

func (*AzureSharedResource) MaxCapacity

func (r *AzureSharedResource) MaxCapacity() uint32

This returns the maximum capacity that could ever be obtained by the rate limiter. It is `SharedCapacity + ReservedCapacity`.

func (*AzureSharedResource) Provision

func (r *AzureSharedResource) Provision(ctx context.Context) (err error)

Call this method before calling Start() to provision any needed partition blobs in the configured Azure Storage Account and container.

func (*AzureSharedResource) RemoveListener

func (r *AzureSharedResource) RemoveListener(id uuid.UUID)

func (*AzureSharedResource) Start

func (r *AzureSharedResource) Start(ctx context.Context) (err error)

Call this method to start the processing loop. It must be called after Provision(). The processing loop runs on a random interval not to exceed MaxInterval and attempts to obtain an exclusive lease on blob partitions to fulfill the capacity requests.

func (*AzureSharedResource) Stop

func (r *AzureSharedResource) Stop()

Call this method to stop the processing loop. You may not restart after stopping.

func (*AzureSharedResource) WithFactor

func (r *AzureSharedResource) WithFactor(val uint32) *AzureSharedResource

You may provide a factor that determines how much capacity each partition is worth. For instance, if you provision a Cosmos database with 20k RU, you might use a factor of 1000, meaning 20 partitions would be created, each worth 1k RU. If not provided, the factor defaults to `1`. There is a limit of 500 partitions, so if you have a shared capacity in excess of 500, you must provide a factor.

func (*AzureSharedResource) WithMasterKey

func (r *AzureSharedResource) WithMasterKey(val string) *AzureSharedResource

You must provide credentials for the AzureSharedResource to access the Azure Storage Account. Currently, the only supported method is to provide a read/write key via WithMasterKey(). This method is required unless you calling WithMocks().

func (*AzureSharedResource) WithMaxInterval

func (r *AzureSharedResource) WithMaxInterval(val uint32) *AzureSharedResource

The rate limiter will attempt to obtain an exclusive lease on a partition (when needed) every so often. The interval is random to reduce the number of collisions and to provide an equal opportunity for processes to compete for partitions. This setting determines the maximum amount of time between intervals. It defaults to `500` and is measured in milliseconds.

func (*AzureSharedResource) WithMocks

func (r *AzureSharedResource) WithMocks(container IAzureContainer, blob IAzureBlob) *AzureSharedResource

This allows you to provide mocked objects for container and blob for unit tests.

func (*AzureSharedResource) WithReservedCapacity

func (r *AzureSharedResource) WithReservedCapacity(val uint32) *AzureSharedResource

You may provide a reserved capacity. The capacity is always available to the rate limiter and is in addition to the shared capacity. For instance, if you have 4 processes and provision a Cosmos database with 28k RU, you might give each process 2,000 reserved capacity and 20,000 shared capacity. Any of the processes could obtain a maximum of 22,000 capacity. Capacity is renewed every 1 second. Generally you use reserved capacity to reduce your latency - you no longer have to wait on a partition to be acquired in order to process a small number of records.

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

func (*Batcher) AddListener

func (r *Batcher) AddListener(fn func(event string, val int, msg string, metadata interface{})) uuid.UUID

func (*Batcher) Enqueue

func (r *Batcher) Enqueue(op IOperation) error

Call this method to add an Operation into the buffer.

func (*Batcher) Flush

func (r *Batcher) Flush()

Call this method to manually flush as if the flushInterval were triggered.

func (*Batcher) NeedsCapacity

func (r *Batcher) NeedsCapacity() uint32

This tells you how much capacity the Batcher believes it needs to process everything outstanding. Outstanding operations include those in the buffer and operations and any that have been sent as a batch but not marked done yet.

func (*Batcher) OperationsInBuffer

func (r *Batcher) OperationsInBuffer() uint32

This tells you how many operations are still in the buffer. This does not include operations that have been sent back to the Watcher as part of a batch for processing.

func (*Batcher) Pause

func (r *Batcher) Pause()

Call this method when your datastore is throwing transient errors. This pauses the processing loop to ensure that you are not flooding the datastore with additional data it cannot process making the situation worse.

func (*Batcher) RemoveListener

func (r *Batcher) RemoveListener(id uuid.UUID)

func (*Batcher) Start

func (r *Batcher) Start() (err error)

Call this method to start the processing loop. The processing loop requests capacity at the CapacityInterval, organizes operations into batches at the FlushInterval, and audits the capacity target at the AuditInterval.

func (*Batcher) Stop

func (r *Batcher) Stop()

Call this method to stop the processing loop. You may not restart after stopping.

func (*Batcher) WithAuditInterval

func (r *Batcher) WithAuditInterval(val time.Duration) IBatcher

The AuditInterval determines how often the target capacity is audited to ensure it still seems legitimate. The default is `10s`. The target capacity is the amount of capacity the Batcher thinks it needs to process all outstanding Operations. Only atomic operatios are performed on the target and there are other failsafes such as MaxOperationTime, however, since it is critical that the target capacity be correct, this is one final failsafe to ensure the Batcher isn't asking for the wrong capacity. Generally you should leave this set at the default.

func (*Batcher) WithCapacityInterval

func (r *Batcher) WithCapacityInterval(val time.Duration) IBatcher

The CapacityInterval determines how often the processing loop asks the rate limiter for capacity by calling GiveMe(). The default is `100ms`. The Batcher asks for capacity equal to every Operation's cost that has not been marked done. In other words, when you Enqueue() an Operation it increments a target based on cost. When you call done() on a batch (or the MaxOperationTime is exceeded), the target is decremented by the cost of all Operations in the batch. If there is no rate limiter attached, this interval does nothing.

func (*Batcher) WithEmitBatch

func (r *Batcher) WithEmitBatch() IBatcher

DO NOT SET THIS IN PRODUCTION. For unit tests, it may be beneficial to raise an event for each batch of operations.

func (*Batcher) WithErrorOnFullBuffer

func (r *Batcher) WithErrorOnFullBuffer() IBatcher

Setting this option changes Enqueue() such that it throws an error if the buffer is full. Normal behavior is for the Enqueue() func to block until it is able to add to the buffer.

func (*Batcher) WithFlushInterval

func (r *Batcher) WithFlushInterval(val time.Duration) IBatcher

The FlushInterval determines how often the processing loop attempts to flush buffered Operations. The default is `100ms`. If a rate limiter is being used, the interval determines the capacity that each flush has to work with. For instance, with the default 100ms and 10,000 available capacity, there would be 10 flushes per second, each dispatching one or more batches of Operations that aim for 1,000 total capacity. If no rate limiter is used, each flush will attempt to empty the buffer.

func (*Batcher) WithMaxOperationTime

func (r *Batcher) WithMaxOperationTime(val time.Duration) IBatcher

The MaxOperationTime determines how long Batcher waits until marking a batch done after releasing it to the Watcher. The default is `1m`. You should always call the done() func when your batch has completed processing instead of relying on MaxOperationTime. The MaxOperationTime on Batcher will be superceded by MaxOperationTime on Watcher if provided.

func (*Batcher) WithPauseTime

func (r *Batcher) WithPauseTime(val time.Duration) IBatcher

The PauseTime determines how long Batcher suspends the processing loop once Pause() is called. The default is `500ms`. Typically, Pause() is called because errors are being received from the datastore such as TooManyRequests or Timeout. Pausing hopefully allows the datastore to catch up without making the problem worse.

func (*Batcher) WithRateLimiter

func (r *Batcher) WithRateLimiter(rl RateLimiter) IBatcher

Use AzureSharedResource or ProvisionedResource as a rate limiter with Batcher to throttle the requests made against a datastore. This is optional; the default behavior does not rate limit.

type BatcherImproperOrderError

type BatcherImproperOrderError struct{}

func (BatcherImproperOrderError) Error

type BufferFullError

type BufferFullError struct{}

func (BufferFullError) Error

func (e BufferFullError) Error() string

type BufferNotAllocated

type BufferNotAllocated struct{}

func (BufferNotAllocated) Error

func (e BufferNotAllocated) Error() string

type IAzureContainer

type IAzureContainer interface {
	Create(context.Context, azblob.Metadata, azblob.PublicAccessType) (*azblob.ContainerCreateResponse, error)
	NewBlockBlobURL(string) azblob.BlockBlobURL
}

This interface describes an Azure Storage Container that can be mocked.

type IBatcher

type IBatcher interface {
	WithRateLimiter(rl RateLimiter) IBatcher
	WithFlushInterval(val time.Duration) IBatcher
	WithCapacityInterval(val time.Duration) IBatcher
	WithAuditInterval(val time.Duration) IBatcher
	WithMaxOperationTime(val time.Duration) IBatcher
	WithPauseTime(val time.Duration) IBatcher
	WithErrorOnFullBuffer() IBatcher
	WithEmitBatch() IBatcher
	Enqueue(op IOperation) error
	Pause()
	Flush()
	OperationsInBuffer() uint32
	NeedsCapacity() uint32
	Start() (err error)
	Stop()
	// contains filtered or unexported methods
}

func NewBatcher

func NewBatcher() IBatcher

This method creates a new Batcher. Generally you should have 1 Batcher per datastore. Commonly after calling NewBatcher() you will chain some WithXXXX methods, for instance... `NewBatcher().WithRateLimiter(limiter)`.

func NewBatcherWithBuffer

func NewBatcherWithBuffer(maxBufferSize uint32) IBatcher

type IOperation

type IOperation interface {
	Payload() interface{}
	Attempt() uint32
	Cost() uint32
	Watcher() IWatcher
	IsBatchable() bool
	MakeAttempt()
}

func NewOperation

func NewOperation(watcher IWatcher, cost uint32, payload interface{}, batchable bool) IOperation

type IWatcher

type IWatcher interface {
	WithMaxAttempts(val uint32) IWatcher
	WithMaxBatchSize(val uint32) IWatcher
	WithMaxOperationTime(val time.Duration) IWatcher
	MaxAttempts() uint32
	MaxBatchSize() uint32
	MaxOperationTime() time.Duration
	ProcessBatch(ops []IOperation)
}

func NewWatcher

func NewWatcher(onReady func(batch []IOperation)) IWatcher

type NoOperationError

type NoOperationError struct{}

func (NoOperationError) Error

func (e NoOperationError) Error() string

type NoWatcherError

type NoWatcherError struct{}

func (NoWatcherError) Error

func (e NoWatcherError) Error() string

type Operation

type Operation struct {
	// contains filtered or unexported fields
}

func (*Operation) Attempt

func (o *Operation) Attempt() uint32

func (*Operation) Cost

func (o *Operation) Cost() uint32

func (*Operation) IsBatchable

func (o *Operation) IsBatchable() bool

func (*Operation) MakeAttempt

func (o *Operation) MakeAttempt()

func (*Operation) Payload

func (o *Operation) Payload() interface{}

func (*Operation) Watcher

func (o *Operation) Watcher() IWatcher

type PartitionsOutOfRangeError

type PartitionsOutOfRangeError struct {
	MaxCapacity    uint32
	Factor         uint32
	PartitionCount int
}

func (PartitionsOutOfRangeError) Error

type ProvisionedResource

type ProvisionedResource struct {
	// contains filtered or unexported fields
}

func NewProvisionedResource

func NewProvisionedResource(capacity uint32) *ProvisionedResource

func (*ProvisionedResource) AddListener

func (r *ProvisionedResource) AddListener(fn func(event string, val int, msg string, metadata interface{})) uuid.UUID

func (*ProvisionedResource) Capacity

func (r *ProvisionedResource) Capacity() uint32

func (*ProvisionedResource) GiveMe

func (r *ProvisionedResource) GiveMe(target uint32)

func (*ProvisionedResource) MaxCapacity

func (r *ProvisionedResource) MaxCapacity() uint32

func (*ProvisionedResource) Provision

func (r *ProvisionedResource) Provision(ctx context.Context) error

func (*ProvisionedResource) RemoveListener

func (r *ProvisionedResource) RemoveListener(id uuid.UUID)

func (*ProvisionedResource) Start

func (r *ProvisionedResource) Start(ctx context.Context) error

func (*ProvisionedResource) Stop

func (r *ProvisionedResource) Stop()

type RateLimiter

type RateLimiter interface {
	Provision(ctx context.Context) error
	MaxCapacity() uint32
	Capacity() uint32
	GiveMe(target uint32)
	Start(ctx context.Context) error
	Stop()
}

type RateLimiterImproperOrderError

type RateLimiterImproperOrderError struct{}

func (RateLimiterImproperOrderError) Error

type TooExpensiveError

type TooExpensiveError struct{}

func (TooExpensiveError) Error

func (e TooExpensiveError) Error() string

type TooManyAttemptsError

type TooManyAttemptsError struct{}

func (TooManyAttemptsError) Error

func (e TooManyAttemptsError) Error() string

type UndefinedLeaseManagerError

type UndefinedLeaseManagerError struct{}

func (UndefinedLeaseManagerError) Error

type UndefinedSharedCapacityError

type UndefinedSharedCapacityError struct{}

func (UndefinedSharedCapacityError) Error

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

func (*Watcher) MaxAttempts

func (w *Watcher) MaxAttempts() uint32

func (*Watcher) MaxBatchSize

func (w *Watcher) MaxBatchSize() uint32

func (*Watcher) MaxOperationTime

func (w *Watcher) MaxOperationTime() time.Duration

func (*Watcher) ProcessBatch

func (w *Watcher) ProcessBatch(batch []IOperation)

func (*Watcher) WithMaxAttempts

func (w *Watcher) WithMaxAttempts(val uint32) IWatcher

func (*Watcher) WithMaxBatchSize

func (w *Watcher) WithMaxBatchSize(val uint32) IWatcher

func (*Watcher) WithMaxOperationTime

func (w *Watcher) WithMaxOperationTime(val time.Duration) IWatcher

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL