Documentation ¶
Index ¶
- Constants
- func ParseHeaderFlag(s string) (name, value string, err error)
- func ParseLabelFlag(s string) (key, value string, err error)
- type ClientVersionV1
- type Config
- type EngineClientVersionHandler
- type EngineGetBlobsHandler
- type EngineNewPayloadHandler
- type EventHandler
- type ExecutionMetadata
- type ExecutionMetadataProvider
- type KeepAliveConfig
- type MetadataUpdateFunc
- type OutputConfig
- type PendingGetBlobsCall
- type PendingNewPayloadCall
- type Publisher
- type RPCError
- type RequestEvent
- type ResponseEvent
- type Router
- type Service
Constants ¶
const (
	OutputTypeStdout = "stdout"
	OutputTypeHTTP   = "http"
	OutputTypeXatu   = "xatu"
	OutputTypeKafka  = "kafka"
)
Output type constants.
const (
	// DefaultPublishTimeout is the context timeout for publishing events.
	DefaultPublishTimeout = 5 * time.Second
	// DefaultPendingCapacity is the initial capacity for pending call maps.
	DefaultPendingCapacity = 100
)
Handler constants.
Variables ¶
This section is empty.
Functions ¶
func ParseHeaderFlag ¶
func ParseHeaderFlag(s string) (name, value string, err error)
ParseHeaderFlag parses a header flag value in "name=value" format.
func ParseLabelFlag ¶
func ParseLabelFlag(s string) (key, value string, err error)
ParseLabelFlag parses a label flag value in "key=value" format.
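Both flag parsers accept a single "key=value" style string. The sketch below shows one plausible way to split such a value. The helper parseKeyValue is hypothetical and is not the package's implementation; splitting at the first "=" and rejecting an empty key are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// parseKeyValue is a hypothetical stand-in for a "key=value" flag parser.
// Assumptions: the split happens at the first "=", so the value may itself
// contain "=", and an empty key is treated as an error.
func parseKeyValue(s string) (key, value string, err error) {
	k, v, found := strings.Cut(s, "=")
	if !found {
		return "", "", errors.New("expected key=value format")
	}

	if k == "" {
		return "", "", errors.New("key must not be empty")
	}

	return k, v, nil
}

func main() {
	key, value, err := parseKeyValue("Authorization=Bearer abc=def")
	if err != nil {
		fmt.Println("error:", err)
		return
	}

	fmt.Printf("key=%q value=%q\n", key, value)
}
```

Splitting only at the first "=" keeps values such as base64 tokens intact, which is usually what header values need.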
Types ¶
type ClientVersionV1 ¶
type ClientVersionV1 struct {
Code string `json:"code"` // 2-letter client code (e.g., "GE" for Geth)
Name string `json:"name"` // Human-readable name (e.g., "Geth")
Version string `json:"version"` // Version string (e.g., "v1.14.0")
Commit string `json:"commit"` // 4-byte commit hash
}
ClientVersionV1 represents the response from engine_getClientVersionV1. See: https://github.com/ethereum/execution-apis/blob/main/src/engine/identification.md
func (ClientVersionV1) String ¶ added in v0.0.15
func (c ClientVersionV1) String() string
String returns a web3_clientVersion-style string (Name/Version).
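As a rough illustration, the following sketch decodes an engine_getClientVersionV1 result and formats each entry as Name/Version. The type clientVersion is a local mirror of ClientVersionV1, decodeVersions is a hypothetical helper, and the JSON payload is made-up sample data; the real String method may format its output differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// clientVersion mirrors the documented ClientVersionV1 fields so that this
// example compiles on its own. It is not the package's type.
type clientVersion struct {
	Code    string `json:"code"`
	Name    string `json:"name"`
	Version string `json:"version"`
	Commit  string `json:"commit"`
}

// String joins Name and Version with a slash, following the documented
// "Name/Version" description. The exact format is an assumption.
func (c clientVersion) String() string {
	return c.Name + "/" + c.Version
}

// decodeVersions is a hypothetical helper that decodes the JSON-RPC result,
// which the Engine API specifies as an array of client version objects.
func decodeVersions(result []byte) ([]clientVersion, error) {
	var versions []clientVersion
	if err := json.Unmarshal(result, &versions); err != nil {
		return nil, fmt.Errorf("decode client versions: %w", err)
	}

	return versions, nil
}

func main() {
	// Sample data for illustration only.
	raw := []byte(`[{"code":"GE","name":"Geth","version":"v1.14.0","commit":"0xa1b2c3d4"}]`)

	versions, err := decodeVersions(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}

	for _, v := range versions {
		fmt.Println(v)
	}
}
```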
type Config ¶
type Config struct {
// Enabled controls whether Xatu event publishing is active.
Enabled bool
// Name identifies this rpc-snooper instance in events.
Name string
// NetworkName is the name of the Ethereum network (e.g., "mainnet", "sepolia").
NetworkName string
// NetworkID is the network ID of the Ethereum network.
NetworkID uint64
// Labels are custom key-value pairs added to event metadata.
Labels map[string]string
// Outputs defines where events are published.
Outputs []OutputConfig
// TLS enables TLS for xatu:// outputs.
TLS bool
// Headers are custom headers for HTTP/Xatu outputs.
Headers map[string]string
// MaxQueueSize is the maximum number of events to buffer before dropping.
MaxQueueSize int
// MaxExportBatchSize is the maximum number of events per batch export.
MaxExportBatchSize int
// Workers is the number of concurrent export workers.
Workers int
// BatchTimeout is how long to wait before exporting a partial batch.
BatchTimeout time.Duration
// ExportTimeout is the timeout for each export operation.
ExportTimeout time.Duration
// KeepAlive configures gRPC keepalive settings.
KeepAlive KeepAliveConfig
}
Config holds the Xatu integration configuration.
type EngineClientVersionHandler ¶
type EngineClientVersionHandler struct {
// contains filtered or unexported fields
}
EngineClientVersionHandler handles engine_getClientVersionV1 events. This handler does not publish events to Xatu; it only observes responses and updates the cached execution metadata for use in other events.
func NewEngineClientVersionHandler ¶
func NewEngineClientVersionHandler(log logrus.FieldLogger, updateFn MetadataUpdateFunc) *EngineClientVersionHandler
NewEngineClientVersionHandler creates a new engine_getClientVersionV1 handler.
func (*EngineClientVersionHandler) HandleRequest ¶
func (h *EngineClientVersionHandler) HandleRequest(_ *RequestEvent) bool
HandleRequest processes the request. We don't need to store anything from the request.
func (*EngineClientVersionHandler) HandleResponse ¶
func (h *EngineClientVersionHandler) HandleResponse(event *ResponseEvent)
HandleResponse processes the response and updates the cached execution metadata.
func (*EngineClientVersionHandler) MethodMatcher ¶
func (h *EngineClientVersionHandler) MethodMatcher() func(method string) bool
MethodMatcher returns a function that checks if a method matches engine_getClientVersionV*.
func (*EngineClientVersionHandler) Name ¶
func (h *EngineClientVersionHandler) Name() string
Name returns the handler name.
type EngineGetBlobsHandler ¶
type EngineGetBlobsHandler struct {
// contains filtered or unexported fields
}
EngineGetBlobsHandler handles engine_getBlobs* events.
func NewEngineGetBlobsHandler ¶
func NewEngineGetBlobsHandler(publisher Publisher, log logrus.FieldLogger) *EngineGetBlobsHandler
NewEngineGetBlobsHandler creates a new engine_getBlobs handler.
func (*EngineGetBlobsHandler) HandleRequest ¶
func (h *EngineGetBlobsHandler) HandleRequest(event *RequestEvent) bool
HandleRequest processes the request and stores pending data.
func (*EngineGetBlobsHandler) HandleResponse ¶
func (h *EngineGetBlobsHandler) HandleResponse(event *ResponseEvent)
HandleResponse processes the response, correlates with request, and publishes the event.
func (*EngineGetBlobsHandler) MethodMatcher ¶
func (h *EngineGetBlobsHandler) MethodMatcher() func(method string) bool
MethodMatcher returns a function that checks if a method matches engine_getBlobs*.
func (*EngineGetBlobsHandler) Name ¶
func (h *EngineGetBlobsHandler) Name() string
Name returns the handler name.
type EngineNewPayloadHandler ¶
type EngineNewPayloadHandler struct {
// contains filtered or unexported fields
}
EngineNewPayloadHandler handles engine_newPayload* events.
func NewEngineNewPayloadHandler ¶
func NewEngineNewPayloadHandler(publisher Publisher, log logrus.FieldLogger) *EngineNewPayloadHandler
NewEngineNewPayloadHandler creates a new engine_newPayload handler.
func (*EngineNewPayloadHandler) HandleRequest ¶
func (h *EngineNewPayloadHandler) HandleRequest(event *RequestEvent) bool
HandleRequest processes the request and stores pending data.
func (*EngineNewPayloadHandler) HandleResponse ¶
func (h *EngineNewPayloadHandler) HandleResponse(event *ResponseEvent)
HandleResponse processes the response, correlates with request, and publishes the event.
func (*EngineNewPayloadHandler) MethodMatcher ¶
func (h *EngineNewPayloadHandler) MethodMatcher() func(method string) bool
MethodMatcher returns a function that checks if a method matches engine_newPayload*.
func (*EngineNewPayloadHandler) Name ¶
func (h *EngineNewPayloadHandler) Name() string
Name returns the handler name.
type EventHandler ¶
type EventHandler interface {
// Name returns the handler name for logging and metrics.
Name() string
// MethodMatcher returns a function that checks if a JSON-RPC method matches this handler.
MethodMatcher() func(method string) bool
// HandleRequest processes the request and stores pending data.
// Returns true if this handler should also process the corresponding response.
HandleRequest(ctx *RequestEvent) bool
// HandleResponse processes the response, correlates with the request data, and publishes the event.
HandleResponse(ctx *ResponseEvent)
}
EventHandler defines the interface for handling specific JSON-RPC method events. Each handler is responsible for processing requests and responses for a specific set of methods and publishing corresponding Xatu events.
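To make the request/response contract concrete, here is a minimal sketch of a custom handler. All types are local mirrors with only the fields the example needs, and prefixHandler is hypothetical. Instead of publishing a Xatu event, the sketch records the latency of each correlated call, which stands in for the publishing step.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
	"time"
)

// requestEvent and responseEvent mirror a subset of the documented
// RequestEvent and ResponseEvent fields. They are not the package's types.
type requestEvent struct {
	CallID    uint64
	Timestamp time.Time
	Method    string
}

type responseEvent struct {
	CallID    uint64
	Timestamp time.Time
}

// eventHandler mirrors the documented EventHandler interface.
type eventHandler interface {
	Name() string
	MethodMatcher() func(method string) bool
	HandleRequest(ev *requestEvent) bool
	HandleResponse(ev *responseEvent)
}

// prefixHandler is a hypothetical handler that matches methods by prefix and
// correlates responses with requests through CallID.
type prefixHandler struct {
	prefix    string
	mu        sync.Mutex
	pending   map[uint64]time.Time
	latencies []time.Duration
}

func newPrefixHandler(prefix string) *prefixHandler {
	return &prefixHandler{prefix: prefix, pending: make(map[uint64]time.Time)}
}

func (h *prefixHandler) Name() string { return h.prefix }

func (h *prefixHandler) MethodMatcher() func(method string) bool {
	return func(method string) bool { return strings.HasPrefix(method, h.prefix) }
}

// HandleRequest stores the request timestamp and asks to see the response.
func (h *prefixHandler) HandleRequest(ev *requestEvent) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.pending[ev.CallID] = ev.Timestamp

	return true
}

// HandleResponse looks up the pending request, removes it, and records the
// elapsed time. Responses without a pending request are ignored.
func (h *prefixHandler) HandleResponse(ev *responseEvent) {
	h.mu.Lock()
	defer h.mu.Unlock()

	start, ok := h.pending[ev.CallID]
	if !ok {
		return
	}

	delete(h.pending, ev.CallID)
	h.latencies = append(h.latencies, ev.Timestamp.Sub(start))
}

// Compile-time check that prefixHandler satisfies the mirrored interface.
var _ eventHandler = (*prefixHandler)(nil)

func main() {
	h := newPrefixHandler("engine_forkchoiceUpdated")
	start := time.Now()

	if h.MethodMatcher()("engine_forkchoiceUpdatedV3") {
		h.HandleRequest(&requestEvent{CallID: 1, Timestamp: start, Method: "engine_forkchoiceUpdatedV3"})
		h.HandleResponse(&responseEvent{CallID: 1, Timestamp: start.Add(12 * time.Millisecond)})
	}

	fmt.Println(h.Name(), h.latencies)
}
```

Deleting the pending entry as soon as the response arrives keeps the map from growing when calls complete normally; a real handler would likely also need to evict entries whose responses never arrive.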
type ExecutionMetadata ¶
type ExecutionMetadata struct {
Implementation string
Version string
VersionMajor string
VersionMinor string
VersionPatch string
}
ExecutionMetadata holds cached execution client information.
func (*ExecutionMetadata) ToProto ¶
func (m *ExecutionMetadata) ToProto() *xatu.ClientMeta_Ethereum_Execution
ToProto converts the metadata to the xatu proto format.
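The source does not say how VersionMajor, VersionMinor, and VersionPatch are derived. One plausible approach, sketched below under that caveat, is to split a version string such as "v1.14.0" on dots after stripping a leading "v" and any pre-release or build suffix. The helper splitVersion is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// splitVersion is a hypothetical helper, not part of the package. It assumes
// versions look like "v1.14.0" or "1.14.0-stable" and returns empty strings
// for any component that is missing.
func splitVersion(version string) (major, minor, patch string) {
	v := strings.TrimPrefix(version, "v")

	// Drop a pre-release or build suffix such as "-stable" or "+build5".
	if i := strings.IndexAny(v, "-+"); i >= 0 {
		v = v[:i]
	}

	parts := strings.SplitN(v, ".", 3)
	major = parts[0]

	if len(parts) > 1 {
		minor = parts[1]
	}

	if len(parts) > 2 {
		patch = parts[2]
	}

	return major, minor, patch
}

func main() {
	major, minor, patch := splitVersion("v1.14.0-stable")
	fmt.Println(major, minor, patch)
}
```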
type ExecutionMetadataProvider ¶
type ExecutionMetadataProvider interface {
Get() *ExecutionMetadata
}
ExecutionMetadataProvider provides execution client metadata.
type KeepAliveConfig ¶
type KeepAliveConfig struct {
// Enabled controls whether keepalive is active.
Enabled bool
// Time is the duration after which a keepalive ping is sent.
Time time.Duration
// Timeout is the duration to wait for a keepalive response.
Timeout time.Duration
}
KeepAliveConfig holds gRPC keepalive settings.
type MetadataUpdateFunc ¶
type MetadataUpdateFunc func(versions []ClientVersionV1)
MetadataUpdateFunc is a callback function for updating execution metadata.
type OutputConfig ¶
type OutputConfig struct {
// Type is the output type: "stdout", "http", "xatu", "kafka".
Type string
// Address is the output address (URL, host:port, or brokers/topic).
Address string
}
OutputConfig defines a single output sink configuration.
func ParseOutputFlag ¶
func ParseOutputFlag(s string) (OutputConfig, error)
ParseOutputFlag parses an output flag value in "type:address" or "type" format. Examples:
- "stdout" -> {Type: "stdout", Address: ""}
- "http:https://example.com" -> {Type: "http", Address: "https://example.com"}
- "xatu:xatu.example.com:8080" -> {Type: "xatu", Address: "xatu.example.com:8080"}
- "kafka:broker1:9092,broker2:9092/topic" -> {Type: "kafka", Address: "broker1:9092,broker2:9092/topic"}
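The examples above are consistent with splitting the flag at its first colon only, so that addresses containing further colons stay intact. A minimal sketch of that behaviour follows; parseOutput and outputConfig are hypothetical local stand-ins, and the real ParseOutputFlag may validate more than this.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// outputConfig mirrors the documented OutputConfig fields for this example.
type outputConfig struct {
	Type    string
	Address string
}

// parseOutput is a hypothetical stand-in for ParseOutputFlag. It splits at
// the first ":" so URLs and host:port lists survive in Address. Rejecting an
// empty type is an assumption.
func parseOutput(s string) (outputConfig, error) {
	typ, addr, _ := strings.Cut(s, ":")
	if typ == "" {
		return outputConfig{}, errors.New("output type must not be empty")
	}

	return outputConfig{Type: typ, Address: addr}, nil
}

func main() {
	for _, flag := range []string{
		"stdout",
		"http:https://example.com",
		"xatu:xatu.example.com:8080",
		"kafka:broker1:9092,broker2:9092/topic",
	} {
		cfg, err := parseOutput(flag)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}

		fmt.Printf("%+v\n", cfg)
	}
}
```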
func (*OutputConfig) Validate ¶
func (o *OutputConfig) Validate() error
Validate checks if the output configuration is valid.
type PendingGetBlobsCall ¶
type PendingGetBlobsCall struct {
CallID uint64
RequestTimestamp time.Time
VersionedHashes []string
MethodVersion string
}
PendingGetBlobsCall stores request data awaiting response correlation.
type PendingNewPayloadCall ¶
type PendingNewPayloadCall struct {
CallID uint64
RequestTimestamp time.Time
MethodVersion string
// Execution payload fields
BlockNumber uint64
BlockHash string
ParentHash string
GasUsed uint64
GasLimit uint64
TxCount uint32
BlobCount uint32
}
PendingNewPayloadCall stores request data awaiting response correlation.
type Publisher ¶
type Publisher interface {
// Start initializes all sinks.
Start(ctx context.Context) error
// Stop gracefully shuts down all sinks.
Stop(ctx context.Context) error
// Publish sends a decorated event to all sinks.
Publish(ctx context.Context, event *xatu.DecoratedEvent) error
// ClientMeta returns the base client metadata for events.
ClientMeta() *xatu.ClientMeta
// SetMetadataProvider sets the execution metadata provider.
SetMetadataProvider(provider ExecutionMetadataProvider)
}
Publisher manages event sinks and publishes decorated events.
func NewNoopPublisher ¶
func NewNoopPublisher() Publisher
NewNoopPublisher creates a Publisher that does nothing.
func NewPublisher ¶
func NewPublisher(config *Config, log logrus.FieldLogger) Publisher
NewPublisher creates a new Publisher instance.
type RequestEvent ¶
type RequestEvent struct {
// CallID is the unique identifier for this request/response pair.
CallID uint64
// Timestamp is when the request was received.
Timestamp time.Time
// Method is the JSON-RPC method name (e.g., "engine_getBlobsV1").
Method string
// Params are the JSON-RPC parameters.
Params []any
// BodyBytes contains the raw request body bytes (useful for SSZ-encoded data).
BodyBytes []byte
}
RequestEvent contains data from an intercepted JSON-RPC request.
type ResponseEvent ¶
type ResponseEvent struct {
// CallID is the unique identifier for this request/response pair.
CallID uint64
// Timestamp is when the response was received.
Timestamp time.Time
// Duration is the time taken for the request to complete.
Duration time.Duration
// Result is the JSON-RPC result field (nil if there was an error).
Result any
// Error contains the JSON-RPC error if present.
Error *RPCError
// BodyBytes contains the raw response body bytes.
BodyBytes []byte
}
ResponseEvent contains data from an intercepted JSON-RPC response.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router routes JSON-RPC methods to their corresponding event handlers.
func NewRouter ¶
func NewRouter(log logrus.FieldLogger) *Router
NewRouter creates a new Router instance.
func (*Router) HandlerCount ¶
HandlerCount returns the number of registered handlers.
func (*Router) Register ¶
func (r *Router) Register(handler EventHandler)
Register adds a handler to the router.
func (*Router) RouteRequest ¶
func (r *Router) RouteRequest(event *RequestEvent) (EventHandler, bool)
RouteRequest finds a matching handler for the request and calls HandleRequest. Returns the matched handler (or nil) and whether a handler was matched.
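The routing step can be sketched as a linear scan over registered handlers. Everything below is a local, hypothetical mirror: the first-match-wins order is an assumption, and the source does not say how the real Router uses the boolean returned by HandleRequest, so this sketch simply reports whether any handler matched.

```go
package main

import (
	"fmt"
	"strings"
)

// requestEvent mirrors a subset of the documented RequestEvent fields.
type requestEvent struct {
	CallID uint64
	Method string
}

// handler mirrors the request-side part of the documented EventHandler.
type handler interface {
	Name() string
	MethodMatcher() func(method string) bool
	HandleRequest(ev *requestEvent) bool
}

// router is a hypothetical stand-in for Router.
type router struct {
	handlers []handler
}

func (r *router) register(h handler) { r.handlers = append(r.handlers, h) }

// routeRequest calls HandleRequest on the first handler whose matcher accepts
// the method and returns that handler. It returns (nil, false) if none match.
func (r *router) routeRequest(ev *requestEvent) (handler, bool) {
	for _, h := range r.handlers {
		if h.MethodMatcher()(ev.Method) {
			h.HandleRequest(ev)

			return h, true
		}
	}

	return nil, false
}

// countingHandler is a trivial handler used only to exercise the router.
type countingHandler struct {
	prefix string
	seen   int
}

func (c *countingHandler) Name() string { return c.prefix }

func (c *countingHandler) MethodMatcher() func(method string) bool {
	return func(method string) bool { return strings.HasPrefix(method, c.prefix) }
}

func (c *countingHandler) HandleRequest(_ *requestEvent) bool {
	c.seen++

	return true
}

func main() {
	r := &router{}
	r.register(&countingHandler{prefix: "engine_newPayload"})
	r.register(&countingHandler{prefix: "engine_getBlobs"})

	for _, method := range []string{"engine_getBlobsV1", "eth_call"} {
		if h, ok := r.routeRequest(&requestEvent{CallID: 1, Method: method}); ok {
			fmt.Println(method, "routed to", h.Name())
		} else {
			fmt.Println(method, "has no handler")
		}
	}
}
```

The caller keeps the returned handler so that the matching response can later be passed to the same handler's HandleResponse.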
type Service ¶
type Service interface {
// Start initializes the service and all its components.
Start(ctx context.Context) error
// Stop gracefully shuts down the service.
Stop(ctx context.Context) error
// Router returns the event router for module integration.
Router() *Router
// Publisher returns the event publisher.
Publisher() Publisher
// IsEnabled returns whether the service is enabled.
IsEnabled() bool
// SetMetadataProvider sets the execution metadata provider.
// This should be called before Start() to wire up metadata for events.
SetMetadataProvider(provider ExecutionMetadataProvider)
// RegisterMetadataUpdateCallback registers a callback for engine_getClientVersion responses.
// This allows passive metadata updates when the CL queries the EL.
RegisterMetadataUpdateCallback(callback MetadataUpdateFunc)
}
Service is the main Xatu integration service that coordinates event handlers and publishing.
func NewService ¶
func NewService(config *Config, log logrus.FieldLogger) (Service, error)
NewService creates a new Xatu Service instance.