Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AckFunc ¶
AckFunc is a function used to acknowledge receipt of a message batch from a buffer. The provided error indicates whether the message batch was successfully delivered. Returns an error if the acknowledge was not propagated.
type ReaderWriter ¶
type ReaderWriter interface {
// Read the next oldest message batch. If the buffer has a persisted store
// the message is preserved until the returned AckFunc is called. Some
// temporal buffer implementations such as windowers will ignore the ack
// func.
Read(context.Context) (types.Message, AckFunc, error)
// Write a new message batch to the stack.
Write(context.Context, types.Message, AckFunc) error
// EndOfInput indicates to the buffer that the input has ended and that once
// the buffer is depleted it should return types.ErrTypeClosed from Read in
// order to gracefully shut down the pipeline.
//
// EndOfInput should be idempotent as it may be called more than once.
EndOfInput()
// Close the buffer and all resources it has, messages should no longer be
// written or read by the implementation and it should clean up all
// resources.
Close(context.Context) error
}
ReaderWriter is a read/write interface implemented by buffers.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream wraps a read/write buffer implementation with a channel based streaming component that satisfies the internal Benthos Consumer and Producer interfaces.
func (*Stream) CloseAsync ¶
func (m *Stream) CloseAsync()
CloseAsync shuts down the Stream and stops processing messages.
func (*Stream) Consume ¶
func (m *Stream) Consume(msgs <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*Stream) StopConsuming ¶
func (m *Stream) StopConsuming()
StopConsuming instructs the buffer to stop consuming messages and close once the buffer is empty.
func (*Stream) TransactionChan ¶
func (m *Stream) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming messages from this buffer.