subscription

package
v0.43.1-rc.1.access-me... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 30, 2025 License: AGPL-3.0 Imports: 11 Imported by: 2

Documentation

Index

Constants

View Source
const (
	// DefaultSendBufferSize is the default buffer size for the subscription's send channel.
	// The size is chosen to balance memory overhead from each subscription with performance when
	// streaming existing data.
	DefaultSendBufferSize = 10

	// DefaultMaxGlobalStreams defines the default max number of streams that can be open at the same time.
	DefaultMaxGlobalStreams = 1000

	// DefaultCacheSize defines the default max number of objects for the execution data cache.
	DefaultCacheSize = 100

	// DefaultSendTimeout is the default timeout for sending a message to the client. After the timeout
	// expires, the connection is closed.
	DefaultSendTimeout = 30 * time.Second

	// DefaultResponseLimit is default max responses per second allowed on a stream. After exceeding
	// the limit, the stream is paused until more capacity is available.
	DefaultResponseLimit = float64(0)

	// DefaultHeartbeatInterval specifies the block interval at which heartbeat messages should be sent.
	DefaultHeartbeatInterval = 1
)

Variables

View Source
var ErrBlockNotReady = errors.New("block not ready")

ErrBlockNotReady represents an error indicating that a block is not yet available or ready.

View Source
var ErrEndOfData = errors.New("end of data")

ErrEndOfData represents an error indicating that no more data available for streaming.

Functions

func HandleResponse added in v0.38.0

func HandleResponse[T any](send chan<- interface{}, transform func(resp T) (interface{}, error)) func(resp T) error

HandleResponse processes a generic response of type and sends it to the provided channel.

Parameters: - send: The channel to which the processed response is sent. - transform: A function to transform the response into the expected interface{} type.

No errors are expected during normal operations.

func HandleSubscription

func HandleSubscription[T any](sub Subscription, handleResponse func(resp T) error) error

HandleSubscription is a generic handler for subscriptions to a specific type. It continuously listens to the subscription channel, handles the received responses, and sends the processed information to the client via the provided stream using handleResponse.

Parameters: - sub: The subscription. - handleResponse: The function responsible for handling the response of the subscribed type.

No errors are expected during normal operations.

Types

type GetDataByHeightFunc

type GetDataByHeightFunc func(ctx context.Context, height uint64) (interface{}, error)

GetDataByHeightFunc is a callback used by subscriptions to retrieve data for a given height. Expected errors: - storage.ErrNotFound - execution_data.BlobNotFoundError All other errors are considered exceptions

type HeightBasedSubscription

type HeightBasedSubscription struct {
	*SubscriptionImpl
	// contains filtered or unexported fields
}

HeightBasedSubscription is a subscription that retrieves data sequentially by block height

func NewHeightBasedSubscription

func NewHeightBasedSubscription(bufferSize int, firstHeight uint64, getData GetDataByHeightFunc) *HeightBasedSubscription

func (*HeightBasedSubscription) Next

func (s *HeightBasedSubscription) Next(ctx context.Context) (interface{}, error)

Next returns the value for the next height from the subscription

type Streamable

type Streamable interface {
	// ID returns the subscription ID
	// Note: this is not a cryptographic hash
	ID() string
	// Close is called when a subscription ends gracefully, and closes the subscription channel
	Close()
	// Fail registers an error and closes the subscription channel
	Fail(error)
	// Send sends a value to the subscription channel or returns an error
	// Expected errors:
	// - context.DeadlineExceeded if send timed out
	// - context.Canceled if the client disconnected
	Send(context.Context, interface{}, time.Duration) error
	// Next returns the value for the next height from the subscription
	Next(context.Context) (interface{}, error)
}

Streamable represents a subscription that can be streamed.

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

Streamer represents a streaming subscription that delivers data to clients.

func NewStreamer

func NewStreamer(
	log zerolog.Logger,
	broadcaster *engine.Broadcaster,
	sendTimeout time.Duration,
	limit float64,
	sub Streamable,
) *Streamer

NewStreamer creates a new Streamer instance.

func (*Streamer) Stream

func (s *Streamer) Stream(ctx context.Context)

Stream is a blocking method that streams data to the subscription until either the context is cancelled or it encounters an error. This function follows a somewhat unintuitive contract: if the context is canceled, it is treated as an error and written to the subscription. However, you can rely on this behavior in the subscription to handle it as a graceful shutdown.

type StreamingData

type StreamingData struct {
	MaxStreams  int32
	StreamCount atomic.Int32
}

StreamingData represents common streaming data configuration for access and state_stream handlers.

func NewStreamingData

func NewStreamingData(maxStreams uint32) StreamingData

type Subscription

type Subscription interface {
	// ID returns the unique identifier for this subscription used for logging
	ID() string

	// Channel returns the channel from which subscription data can be read
	Channel() <-chan interface{}

	// Err returns the error that caused the subscription to fail
	Err() error
}

Subscription represents a streaming request, and handles the communication between the grpc handler and the backend implementation.

type SubscriptionHandler

type SubscriptionHandler struct {
	// contains filtered or unexported fields
}

SubscriptionHandler represents common streaming data configuration for creating streaming subscription.

func NewSubscriptionHandler

func NewSubscriptionHandler(
	log zerolog.Logger,
	broadcaster *engine.Broadcaster,
	sendTimeout time.Duration,
	responseLimit float64,
	sendBufferSize uint,
) *SubscriptionHandler

NewSubscriptionHandler creates a new SubscriptionHandler instance.

Parameters: - log: The logger to use for logging. - broadcaster: The engine broadcaster for publishing notifications. - sendTimeout: The duration after which a send operation will timeout. - responseLimit: The maximum allowed response time for a single stream. - sendBufferSize: The size of the response buffer for sending messages to the client.

Returns a new SubscriptionHandler instance.

func (*SubscriptionHandler) Subscribe

func (h *SubscriptionHandler) Subscribe(
	ctx context.Context,
	startHeight uint64,
	getData GetDataByHeightFunc,
) Subscription

Subscribe creates and starts a new subscription.

Parameters: - ctx: The context for the operation. - startHeight: The height to start subscription from. - getData: The function to retrieve data by height.

type SubscriptionImpl

type SubscriptionImpl struct {
	// contains filtered or unexported fields
}

func NewFailedSubscription

func NewFailedSubscription(err error, msg string) *SubscriptionImpl

NewFailedSubscription returns a new subscription that has already failed with the given error and message. This is useful to return an error that occurred during subscription setup.

func NewSubscription

func NewSubscription(bufferSize int) *SubscriptionImpl

func (*SubscriptionImpl) Channel

func (sub *SubscriptionImpl) Channel() <-chan interface{}

Channel returns the channel from which subscription data can be read

func (*SubscriptionImpl) Close

func (sub *SubscriptionImpl) Close()

Close is called when a subscription ends gracefully, and closes the subscription channel

func (*SubscriptionImpl) Err

func (sub *SubscriptionImpl) Err() error

Err returns the error that caused the subscription to fail

func (*SubscriptionImpl) Fail

func (sub *SubscriptionImpl) Fail(err error)

Fail registers an error and closes the subscription channel

func (*SubscriptionImpl) ID

func (sub *SubscriptionImpl) ID() string

ID returns the subscription ID Note: this is not a cryptographic hash

func (*SubscriptionImpl) Send

func (sub *SubscriptionImpl) Send(ctx context.Context, v interface{}, timeout time.Duration) error

Send sends a value to the subscription channel or returns an error Expected errors: - context.DeadlineExceeded if send timed out - context.Canceled if the client disconnected

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL