Documentation
¶
Overview ¶
Package delaybuffer provides a generic, time-based buffer to regulate the flow of data items.
The buffer works by holding items for a minimum specified duration before releasing them to an output channel. This is useful for cushioning against unpredictable processing delays upstream, ensuring a smoother, more regulated data flow downstream.
It is safe for concurrent use.
Usage ¶
To use the buffer, you need a data type that implements the `delaybuffer.DataItem` interface by providing a `CreatedTime() time.Time` method.
Example:
First, define your data type:
type MyItem struct {
ID string
Timestamp time.Time
}
func (i MyItem) CreatedTime() time.Time {
return i.Timestamp
}
Next, create and use the buffer:
// Create an input channel and the buffer.
inChan := make(chan MyItem)
bufferDelay := 2 * time.Second
tickerInterval := 100 * time.Millisecond // Check for items every 100ms
buffer := delaybuffer.NewBuffer(inChan, bufferDelay, tickerInterval)
// Start a goroutine to listen for processed items.
go func() {
for item := range buffer.Out {
fmt.Printf("Processed item: %s\n", item.ID)
}
}()
// Send items to the buffer.
inChan <- MyItem{ID: "item-1", Timestamp: time.Now()}
// To shut down gracefully, close the input channel and call Close.
close(inChan)
buffer.Close()
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Buffer ¶
type Buffer[T DataItem] struct { // Out is the channel from which buffered items can be consumed. Out chan T // contains filtered or unexported fields }
Buffer is a generic, time-delayed buffer that holds items for a minimum duration before releasing them. It is safe for concurrent use.
func NewBuffer ¶
func NewBuffer[T DataItem](in <-chan T, delay time.Duration, tickerDuration time.Duration) *Buffer[T]
NewBuffer creates and starts a new Buffer.
Parameters:
in: The channel to receive items from.
delay: The minimum time an item should stay in the buffer before being released.
tickerDuration: How often the buffer checks to release items. If a non-positive
value (<= 0) is provided, it defaults to 100ms.
func (*Buffer[T]) Close ¶
func (b *Buffer[T]) Close()
Close signals the buffer to immediately release all of its contained items to the Out channel, regardless of their delay. It then waits for the buffer to shut down completely, cleaning up all resources. This provides a graceful shutdown mechanism.