Documentation
¶
Index ¶
- Constants
- Variables
- func HandleResponse[T any](send chan<- interface{}, transform func(resp T) (interface{}, error)) func(resp T) error
- func HandleSubscription[T any](sub Subscription, handleResponse func(resp T) error) error
- type GetDataByHeightFunc
- type HeightBasedSubscription
- type Streamable
- type Streamer
- type StreamingData
- type Subscription
- type SubscriptionHandler
- type SubscriptionImpl
- func (sub *SubscriptionImpl) Channel() <-chan interface{}
- func (sub *SubscriptionImpl) Close()
- func (sub *SubscriptionImpl) Err() error
- func (sub *SubscriptionImpl) Fail(err error)
- func (sub *SubscriptionImpl) ID() string
- func (sub *SubscriptionImpl) Send(ctx context.Context, v interface{}, timeout time.Duration) error
Constants ¶
const ( // DefaultSendBufferSize is the default buffer size for the subscription's send channel. // The size is chosen to balance memory overhead from each subscription with performance when // streaming existing data. DefaultSendBufferSize = 10 // DefaultMaxGlobalStreams defines the default max number of streams that can be open at the same time. DefaultMaxGlobalStreams = 1000 // DefaultCacheSize defines the default max number of objects for the execution data cache. DefaultCacheSize = 100 // DefaultSendTimeout is the default timeout for sending a message to the client. After the timeout // expires, the connection is closed. DefaultSendTimeout = 30 * time.Second // DefaultResponseLimit is default max responses per second allowed on a stream. After exceeding // the limit, the stream is paused until more capacity is available. DefaultResponseLimit = float64(0) // DefaultHeartbeatInterval specifies the block interval at which heartbeat messages should be sent. DefaultHeartbeatInterval = 1 )
Variables ¶
var ErrBlockNotReady = errors.New("block not ready")
ErrBlockNotReady represents an error indicating that a block is not yet available or ready.
var ErrEndOfData = errors.New("end of data")
ErrEndOfData represents an error indicating that no more data available for streaming.
Functions ¶
func HandleResponse ¶ added in v0.38.0
func HandleResponse[T any](send chan<- interface{}, transform func(resp T) (interface{}, error)) func(resp T) error
HandleResponse processes a generic response of type and sends it to the provided channel.
Parameters: - send: The channel to which the processed response is sent. - transform: A function to transform the response into the expected interface{} type.
No errors are expected during normal operations.
func HandleSubscription ¶
func HandleSubscription[T any](sub Subscription, handleResponse func(resp T) error) error
HandleSubscription is a generic handler for subscriptions to a specific type. It continuously listens to the subscription channel, handles the received responses, and sends the processed information to the client via the provided stream using handleResponse.
Parameters: - sub: The subscription. - handleResponse: The function responsible for handling the response of the subscribed type.
No errors are expected during normal operations.
Types ¶
type GetDataByHeightFunc ¶
GetDataByHeightFunc is a callback used by subscriptions to retrieve data for a given height. Expected errors: - storage.ErrNotFound - execution_data.BlobNotFoundError All other errors are considered exceptions
type HeightBasedSubscription ¶
type HeightBasedSubscription struct { *SubscriptionImpl // contains filtered or unexported fields }
HeightBasedSubscription is a subscription that retrieves data sequentially by block height
func NewHeightBasedSubscription ¶
func NewHeightBasedSubscription(bufferSize int, firstHeight uint64, getData GetDataByHeightFunc) *HeightBasedSubscription
type Streamable ¶
type Streamable interface { // ID returns the subscription ID // Note: this is not a cryptographic hash ID() string // Close is called when a subscription ends gracefully, and closes the subscription channel Close() // Fail registers an error and closes the subscription channel Fail(error) // Send sends a value to the subscription channel or returns an error // Expected errors: // - context.DeadlineExceeded if send timed out // - context.Canceled if the client disconnected Send(context.Context, interface{}, time.Duration) error // Next returns the value for the next height from the subscription Next(context.Context) (interface{}, error) }
Streamable represents a subscription that can be streamed.
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
Streamer represents a streaming subscription that delivers data to clients.
func NewStreamer ¶
func NewStreamer( log zerolog.Logger, broadcaster *engine.Broadcaster, sendTimeout time.Duration, limit float64, sub Streamable, ) *Streamer
NewStreamer creates a new Streamer instance.
func (*Streamer) Stream ¶
Stream is a blocking method that streams data to the subscription until either the context is cancelled or it encounters an error. This function follows a somewhat unintuitive contract: if the context is canceled, it is treated as an error and written to the subscription. However, you can rely on this behavior in the subscription to handle it as a graceful shutdown.
type StreamingData ¶
StreamingData represents common streaming data configuration for access and state_stream handlers.
func NewStreamingData ¶
func NewStreamingData(maxStreams uint32) StreamingData
type Subscription ¶
type Subscription interface { // ID returns the unique identifier for this subscription used for logging ID() string // Channel returns the channel from which subscription data can be read Channel() <-chan interface{} // Err returns the error that caused the subscription to fail Err() error }
Subscription represents a streaming request, and handles the communication between the grpc handler and the backend implementation.
type SubscriptionHandler ¶
type SubscriptionHandler struct {
// contains filtered or unexported fields
}
SubscriptionHandler represents common streaming data configuration for creating streaming subscription.
func NewSubscriptionHandler ¶
func NewSubscriptionHandler( log zerolog.Logger, broadcaster *engine.Broadcaster, sendTimeout time.Duration, responseLimit float64, sendBufferSize uint, ) *SubscriptionHandler
NewSubscriptionHandler creates a new SubscriptionHandler instance.
Parameters: - log: The logger to use for logging. - broadcaster: The engine broadcaster for publishing notifications. - sendTimeout: The duration after which a send operation will timeout. - responseLimit: The maximum allowed response time for a single stream. - sendBufferSize: The size of the response buffer for sending messages to the client.
Returns a new SubscriptionHandler instance.
func (*SubscriptionHandler) Subscribe ¶
func (h *SubscriptionHandler) Subscribe( ctx context.Context, startHeight uint64, getData GetDataByHeightFunc, ) Subscription
Subscribe creates and starts a new subscription.
Parameters: - ctx: The context for the operation. - startHeight: The height to start subscription from. - getData: The function to retrieve data by height.
type SubscriptionImpl ¶
type SubscriptionImpl struct {
// contains filtered or unexported fields
}
func NewFailedSubscription ¶
func NewFailedSubscription(err error, msg string) *SubscriptionImpl
NewFailedSubscription returns a new subscription that has already failed with the given error and message. This is useful to return an error that occurred during subscription setup.
func NewSubscription ¶
func NewSubscription(bufferSize int) *SubscriptionImpl
func (*SubscriptionImpl) Channel ¶
func (sub *SubscriptionImpl) Channel() <-chan interface{}
Channel returns the channel from which subscription data can be read
func (*SubscriptionImpl) Close ¶
func (sub *SubscriptionImpl) Close()
Close is called when a subscription ends gracefully, and closes the subscription channel
func (*SubscriptionImpl) Err ¶
func (sub *SubscriptionImpl) Err() error
Err returns the error that caused the subscription to fail
func (*SubscriptionImpl) Fail ¶
func (sub *SubscriptionImpl) Fail(err error)
Fail registers an error and closes the subscription channel
func (*SubscriptionImpl) ID ¶
func (sub *SubscriptionImpl) ID() string
ID returns the subscription ID Note: this is not a cryptographic hash