Documentation ¶
Index ¶
- Constants
- type AzureSharedResource
- func (r *AzureSharedResource) AddListener(fn func(event string, val int, msg string, metadata interface{})) uuid.UUID
- func (r *AzureSharedResource) Capacity() uint32
- func (r *AzureSharedResource) GiveMe(target uint32)
- func (r *AzureSharedResource) MaxCapacity() uint32
- func (r *AzureSharedResource) Provision(ctx context.Context) (err error)
- func (r *AzureSharedResource) RemoveListener(id uuid.UUID)
- func (r *AzureSharedResource) Start(ctx context.Context) (err error)
- func (r *AzureSharedResource) Stop()
- func (r *AzureSharedResource) WithFactor(val uint32) *AzureSharedResource
- func (r *AzureSharedResource) WithMasterKey(val string) *AzureSharedResource
- func (r *AzureSharedResource) WithMaxInterval(val uint32) *AzureSharedResource
- func (r *AzureSharedResource) WithMocks(container IAzureContainer, blob IAzureBlob) *AzureSharedResource
- func (r *AzureSharedResource) WithReservedCapacity(val uint32) *AzureSharedResource
- type Batcher
- func (r *Batcher) AddListener(fn func(event string, val int, msg string, metadata interface{})) uuid.UUID
- func (r *Batcher) Enqueue(op IOperation) error
- func (r *Batcher) Flush()
- func (r *Batcher) NeedsCapacity() uint32
- func (r *Batcher) OperationsInBuffer() uint32
- func (r *Batcher) Pause()
- func (r *Batcher) RemoveListener(id uuid.UUID)
- func (r *Batcher) Start() (err error)
- func (r *Batcher) Stop()
- func (r *Batcher) WithAuditInterval(val time.Duration) IBatcher
- func (r *Batcher) WithCapacityInterval(val time.Duration) IBatcher
- func (r *Batcher) WithEmitBatch() IBatcher
- func (r *Batcher) WithErrorOnFullBuffer() IBatcher
- func (r *Batcher) WithFlushInterval(val time.Duration) IBatcher
- func (r *Batcher) WithMaxOperationTime(val time.Duration) IBatcher
- func (r *Batcher) WithPauseTime(val time.Duration) IBatcher
- func (r *Batcher) WithRateLimiter(rl RateLimiter) IBatcher
- type BatcherImproperOrderError
- type BufferFullError
- type BufferNotAllocated
- type IAzureBlob
- type IAzureContainer
- type IBatcher
- type IOperation
- type IWatcher
- type NoOperationError
- type NoWatcherError
- type Operation
- type PartitionsOutOfRangeError
- type ProvisionedResource
- func (r *ProvisionedResource) AddListener(fn func(event string, val int, msg string, metadata interface{})) uuid.UUID
- func (r *ProvisionedResource) Capacity() uint32
- func (r *ProvisionedResource) GiveMe(target uint32)
- func (r *ProvisionedResource) MaxCapacity() uint32
- func (r *ProvisionedResource) Provision(ctx context.Context) error
- func (r *ProvisionedResource) RemoveListener(id uuid.UUID)
- func (r *ProvisionedResource) Start(ctx context.Context) error
- func (r *ProvisionedResource) Stop()
- type RateLimiter
- type RateLimiterImproperOrderError
- type TooExpensiveError
- type TooManyAttemptsError
- type UndefinedLeaseManagerError
- type UndefinedSharedCapacityError
- type Watcher
- func (w *Watcher) MaxAttempts() uint32
- func (w *Watcher) MaxBatchSize() uint32
- func (w *Watcher) MaxOperationTime() time.Duration
- func (w *Watcher) ProcessBatch(batch []IOperation)
- func (w *Watcher) WithMaxAttempts(val uint32) IWatcher
- func (w *Watcher) WithMaxBatchSize(val uint32) IWatcher
- func (w *Watcher) WithMaxOperationTime(val time.Duration) IWatcher
Constants ¶
const ( BatchEvent = "batch" PauseEvent = "pause" ResumeEvent = "resume" ShutdownEvent = "shutdown" AuditPassEvent = "audit-pass" AuditFailEvent = "audit-fail" AuditSkipEvent = "audit-skip" RequestEvent = "request" CapacityEvent = "capacity" ReleasedEvent = "released" AllocatedEvent = "allocated" TargetEvent = "target" VerifiedContainerEvent = "verified-container" CreatedContainerEvent = "created-container" VerifiedBlobEvent = "verified-blob" CreatedBlobEvent = "created-blob" FailedEvent = "failed" ErrorEvent = "error" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AzureSharedResource ¶
type AzureSharedResource struct {
// contains filtered or unexported fields
}
func NewAzureSharedResource ¶
func NewAzureSharedResource(accountName, containerName string, sharedCapacity uint32) *AzureSharedResource
This function should be called to create a new AzureSharedResource. The accountName and containerName refer to the details of an Azure Storage Account and container that the lease blobs can be created in. If multiple processes are sharing the same capacity, they should all point to the same container. The sharedCapacity parameter is the maximum shared capacity for the resource. For example, if you provision a Cosmos database with 20k RU, you might set sharedCapacity to 20,000. Capacity is renewed every 1 second. Commonly after calling NewAzureSharedResource() you will chain some WithXXXX methods, for instance... `NewAzureSharedResource().WithMasterKey(key)`.
func (*AzureSharedResource) AddListener ¶
func (*AzureSharedResource) Capacity ¶
func (r *AzureSharedResource) Capacity() uint32
This returns the current allocated capacity. It is `NumberOfPartitionsControlled x Factor + ReservedCapacity`.
func (*AzureSharedResource) GiveMe ¶
func (r *AzureSharedResource) GiveMe(target uint32)
You should call GiveMe() to update the capacity you are requesting. You will always specify the new amount of capacity you require. For instance, if you have a large queue of records to process, you might call GiveMe() every time new records are added to the queue and every time a batch is completed. Another common pattern is to call GiveMe() on a timer to keep it generally consistent with the capacity you need.
func (*AzureSharedResource) MaxCapacity ¶
func (r *AzureSharedResource) MaxCapacity() uint32
This returns the maximum capacity that could ever be obtained by the rate limiter. It is `SharedCapacity + ReservedCapacity`.
func (*AzureSharedResource) Provision ¶
func (r *AzureSharedResource) Provision(ctx context.Context) (err error)
Call this method before calling Start() to provision any needed partition blobs in the configured Azure Storage Account and container.
func (*AzureSharedResource) RemoveListener ¶
func (*AzureSharedResource) Start ¶
func (r *AzureSharedResource) Start(ctx context.Context) (err error)
Call this method to start the processing loop. It must be called after Provision(). The processing loop runs on a random interval not to exceed MaxInterval and attempts to obtain an exclusive lease on blob partitions to fulfill the capacity requests.
func (*AzureSharedResource) Stop ¶
func (r *AzureSharedResource) Stop()
Call this method to stop the processing loop. You may not restart after stopping.
func (*AzureSharedResource) WithFactor ¶
func (r *AzureSharedResource) WithFactor(val uint32) *AzureSharedResource
You may provide a factor that determines how much capacity each partition is worth. For instance, if you provision a Cosmos database with 20k RU, you might use a factor of 1000, meaning 20 partitions would be created, each worth 1k RU. If not provided, the factor defaults to `1`. There is a limit of 500 partitions, so if you have a shared capacity in excess of 500, you must provide a factor.
func (*AzureSharedResource) WithMasterKey ¶
func (r *AzureSharedResource) WithMasterKey(val string) *AzureSharedResource
You must provide credentials for the AzureSharedResource to access the Azure Storage Account. Currently, the only supported method is to provide a read/write key via WithMasterKey(). This method is required unless you are calling WithMocks().
func (*AzureSharedResource) WithMaxInterval ¶
func (r *AzureSharedResource) WithMaxInterval(val uint32) *AzureSharedResource
The rate limiter will attempt to obtain an exclusive lease on a partition (when needed) every so often. The interval is random to reduce the number of collisions and to provide an equal opportunity for processes to compete for partitions. This setting determines the maximum amount of time between attempts. It defaults to `500` and is measured in milliseconds.
func (*AzureSharedResource) WithMocks ¶
func (r *AzureSharedResource) WithMocks(container IAzureContainer, blob IAzureBlob) *AzureSharedResource
This allows you to provide mocked objects for container and blob for unit tests.
func (*AzureSharedResource) WithReservedCapacity ¶
func (r *AzureSharedResource) WithReservedCapacity(val uint32) *AzureSharedResource
You may provide a reserved capacity. The capacity is always available to the rate limiter and is in addition to the shared capacity. For instance, if you have 4 processes and provision a Cosmos database with 28k RU, you might give each process 2,000 reserved capacity and 20,000 shared capacity. Any of the processes could obtain a maximum of 22,000 capacity. Capacity is renewed every 1 second. Generally you use reserved capacity to reduce your latency - you no longer have to wait on a partition to be acquired in order to process a small number of records.
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
func (*Batcher) AddListener ¶
func (*Batcher) Enqueue ¶
func (r *Batcher) Enqueue(op IOperation) error
Call this method to add an Operation into the buffer.
func (*Batcher) Flush ¶
func (r *Batcher) Flush()
Call this method to manually flush as if the flushInterval were triggered.
func (*Batcher) NeedsCapacity ¶
This tells you how much capacity the Batcher believes it needs to process everything outstanding. Outstanding operations include those in the buffer and any that have been sent as a batch but not yet marked done.
func (*Batcher) OperationsInBuffer ¶
This tells you how many operations are still in the buffer. This does not include operations that have been sent back to the Watcher as part of a batch for processing.
func (*Batcher) Pause ¶
func (r *Batcher) Pause()
Call this method when your datastore is throwing transient errors. This pauses the processing loop to ensure that you are not flooding the datastore with additional data it cannot process, making the situation worse.
func (*Batcher) RemoveListener ¶
func (*Batcher) Start ¶
Call this method to start the processing loop. The processing loop requests capacity at the CapacityInterval, organizes operations into batches at the FlushInterval, and audits the capacity target at the AuditInterval.
func (*Batcher) Stop ¶
func (r *Batcher) Stop()
Call this method to stop the processing loop. You may not restart after stopping.
func (*Batcher) WithAuditInterval ¶
The AuditInterval determines how often the target capacity is audited to ensure it still seems legitimate. The default is `10s`. The target capacity is the amount of capacity the Batcher thinks it needs to process all outstanding Operations. Only atomic operations are performed on the target and there are other failsafes such as MaxOperationTime; however, since it is critical that the target capacity be correct, this is one final failsafe to ensure the Batcher isn't asking for the wrong capacity. Generally you should leave this set at the default.
func (*Batcher) WithCapacityInterval ¶
The CapacityInterval determines how often the processing loop asks the rate limiter for capacity by calling GiveMe(). The default is `100ms`. The Batcher asks for capacity equal to every Operation's cost that has not been marked done. In other words, when you Enqueue() an Operation it increments a target based on cost. When you call done() on a batch (or the MaxOperationTime is exceeded), the target is decremented by the cost of all Operations in the batch. If there is no rate limiter attached, this interval does nothing.
func (*Batcher) WithEmitBatch ¶
DO NOT SET THIS IN PRODUCTION. For unit tests, it may be beneficial to raise an event for each batch of operations.
func (*Batcher) WithErrorOnFullBuffer ¶
Setting this option changes Enqueue() such that it returns an error if the buffer is full. Normal behavior is for the Enqueue() func to block until it is able to add to the buffer.
func (*Batcher) WithFlushInterval ¶
The FlushInterval determines how often the processing loop attempts to flush buffered Operations. The default is `100ms`. If a rate limiter is being used, the interval determines the capacity that each flush has to work with. For instance, with the default 100ms and 10,000 available capacity, there would be 10 flushes per second, each dispatching one or more batches of Operations that aim for 1,000 total capacity. If no rate limiter is used, each flush will attempt to empty the buffer.
func (*Batcher) WithMaxOperationTime ¶
The MaxOperationTime determines how long Batcher waits until marking a batch done after releasing it to the Watcher. The default is `1m`. You should always call the done() func when your batch has completed processing instead of relying on MaxOperationTime. The MaxOperationTime on Batcher will be superseded by MaxOperationTime on Watcher if provided.
func (*Batcher) WithPauseTime ¶
The PauseTime determines how long Batcher suspends the processing loop once Pause() is called. The default is `500ms`. Typically, Pause() is called because errors are being received from the datastore such as TooManyRequests or Timeout. Pausing hopefully allows the datastore to catch up without making the problem worse.
func (*Batcher) WithRateLimiter ¶
func (r *Batcher) WithRateLimiter(rl RateLimiter) IBatcher
Use AzureSharedResource or ProvisionedResource as a rate limiter with Batcher to throttle the requests made against a datastore. This is optional; the default behavior does not rate limit.
type BatcherImproperOrderError ¶
type BatcherImproperOrderError struct{}
func (BatcherImproperOrderError) Error ¶
func (e BatcherImproperOrderError) Error() string
type BufferFullError ¶
type BufferFullError struct{}
func (BufferFullError) Error ¶
func (e BufferFullError) Error() string
type BufferNotAllocated ¶
type BufferNotAllocated struct{}
func (BufferNotAllocated) Error ¶
func (e BufferNotAllocated) Error() string
type IAzureBlob ¶
type IAzureBlob interface { Upload(context.Context, io.ReadSeeker, azblob.BlobHTTPHeaders, azblob.Metadata, azblob.BlobAccessConditions, azblob.AccessTierType, azblob.BlobTagsMap, azblob.ClientProvidedKeyOptions) (*azblob.BlockBlobUploadResponse, error) AcquireLease(context.Context, string, int32, azblob.ModifiedAccessConditions) (*azblob.BlobAcquireLeaseResponse, error) }
This interface describes an Azure Storage Blob that can be mocked.
type IAzureContainer ¶
type IAzureContainer interface { Create(context.Context, azblob.Metadata, azblob.PublicAccessType) (*azblob.ContainerCreateResponse, error) NewBlockBlobURL(string) azblob.BlockBlobURL }
This interface describes an Azure Storage Container that can be mocked.
type IBatcher ¶
type IBatcher interface { WithRateLimiter(rl RateLimiter) IBatcher WithFlushInterval(val time.Duration) IBatcher WithCapacityInterval(val time.Duration) IBatcher WithAuditInterval(val time.Duration) IBatcher WithMaxOperationTime(val time.Duration) IBatcher WithPauseTime(val time.Duration) IBatcher WithErrorOnFullBuffer() IBatcher WithEmitBatch() IBatcher Enqueue(op IOperation) error Pause() Flush() OperationsInBuffer() uint32 NeedsCapacity() uint32 Start() (err error) Stop() // contains filtered or unexported methods }
func NewBatcher ¶
func NewBatcher() IBatcher
This function creates a new Batcher. Generally you should have 1 Batcher per datastore. Commonly after calling NewBatcher() you will chain some WithXXXX methods, for instance... `NewBatcher().WithRateLimiter(limiter)`.
func NewBatcherWithBuffer ¶
type IOperation ¶
type IOperation interface { Payload() interface{} Attempt() uint32 Cost() uint32 Watcher() IWatcher IsBatchable() bool MakeAttempt() }
func NewOperation ¶
func NewOperation(watcher IWatcher, cost uint32, payload interface{}, batchable bool) IOperation
type IWatcher ¶
type IWatcher interface { WithMaxAttempts(val uint32) IWatcher WithMaxBatchSize(val uint32) IWatcher WithMaxOperationTime(val time.Duration) IWatcher MaxAttempts() uint32 MaxBatchSize() uint32 MaxOperationTime() time.Duration ProcessBatch(ops []IOperation) }
func NewWatcher ¶
func NewWatcher(onReady func(batch []IOperation)) IWatcher
type NoOperationError ¶
type NoOperationError struct{}
func (NoOperationError) Error ¶
func (e NoOperationError) Error() string
type NoWatcherError ¶
type NoWatcherError struct{}
func (NoWatcherError) Error ¶
func (e NoWatcherError) Error() string
type Operation ¶
type Operation struct {
// contains filtered or unexported fields
}
func (*Operation) IsBatchable ¶
func (*Operation) MakeAttempt ¶
func (o *Operation) MakeAttempt()
type PartitionsOutOfRangeError ¶
func (PartitionsOutOfRangeError) Error ¶
func (e PartitionsOutOfRangeError) Error() string
type ProvisionedResource ¶
type ProvisionedResource struct {
// contains filtered or unexported fields
}
func NewProvisionedResource ¶
func NewProvisionedResource(capacity uint32) *ProvisionedResource
func (*ProvisionedResource) AddListener ¶
func (*ProvisionedResource) Capacity ¶
func (r *ProvisionedResource) Capacity() uint32
func (*ProvisionedResource) GiveMe ¶
func (r *ProvisionedResource) GiveMe(target uint32)
func (*ProvisionedResource) MaxCapacity ¶
func (r *ProvisionedResource) MaxCapacity() uint32
func (*ProvisionedResource) Provision ¶
func (r *ProvisionedResource) Provision(ctx context.Context) error
func (*ProvisionedResource) RemoveListener ¶
func (*ProvisionedResource) Stop ¶
func (r *ProvisionedResource) Stop()
type RateLimiter ¶
type RateLimiterImproperOrderError ¶
type RateLimiterImproperOrderError struct{}
func (RateLimiterImproperOrderError) Error ¶
func (e RateLimiterImproperOrderError) Error() string
type TooExpensiveError ¶
type TooExpensiveError struct{}
func (TooExpensiveError) Error ¶
func (e TooExpensiveError) Error() string
type TooManyAttemptsError ¶
type TooManyAttemptsError struct{}
func (TooManyAttemptsError) Error ¶
func (e TooManyAttemptsError) Error() string
type UndefinedLeaseManagerError ¶
type UndefinedLeaseManagerError struct{}
func (UndefinedLeaseManagerError) Error ¶
func (e UndefinedLeaseManagerError) Error() string
type UndefinedSharedCapacityError ¶
type UndefinedSharedCapacityError struct{}
func (UndefinedSharedCapacityError) Error ¶
func (e UndefinedSharedCapacityError) Error() string
type Watcher ¶
type Watcher struct {
// contains filtered or unexported fields
}
func (*Watcher) MaxAttempts ¶
func (*Watcher) MaxBatchSize ¶
func (*Watcher) MaxOperationTime ¶
func (*Watcher) ProcessBatch ¶
func (w *Watcher) ProcessBatch(batch []IOperation)