xatu

package
v0.0.20 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 20, 2026 License: MIT Imports: 20 Imported by: 0

Documentation


Constants

const (
	OutputTypeStdout = "stdout"
	OutputTypeHTTP   = "http"
	OutputTypeXatu   = "xatu"
	OutputTypeKafka  = "kafka"
)

Output type constants.

const (
	// DefaultPublishTimeout is the context timeout for publishing events.
	DefaultPublishTimeout = 5 * time.Second

	// DefaultPendingCapacity is the initial capacity for pending call maps.
	DefaultPendingCapacity = 100
)

Handler constants.

Variables

This section is empty.

Functions

func ParseHeaderFlag

func ParseHeaderFlag(s string) (name, value string, err error)

ParseHeaderFlag parses a header flag value in "name=value" format.

func ParseLabelFlag

func ParseLabelFlag(s string) (key, value string, err error)

ParseLabelFlag parses a label flag value in "key=value" format.
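
As an illustration of the "key=value" and "name=value" formats, the sketch below splits a flag value at the first "=". parseKeyValue is a hypothetical helper written for this example; the package's actual error messages and edge-case handling (for instance, whether an empty key is rejected) are assumptions here.

```go
package main

import (
	"fmt"
	"strings"
)

// parseKeyValue is a hypothetical helper, not the package's implementation.
// It splits s at the first "=" so that the value itself may contain "=".
func parseKeyValue(s string) (string, string, error) {
	k, v, found := strings.Cut(s, "=")
	if !found {
		return "", "", fmt.Errorf("invalid value %q: expected key=value", s)
	}

	// Rejecting an empty key is an assumption made for this sketch.
	if k == "" {
		return "", "", fmt.Errorf("invalid value %q: empty key", s)
	}

	return k, v, nil
}

func main() {
	for _, in := range []string{"env=prod", "Authorization=Bearer a=b", "missing-separator"} {
		k, v, err := parseKeyValue(in)
		if err != nil {
			fmt.Println("error:", err)

			continue
		}

		fmt.Printf("key=%q value=%q\n", k, v)
	}
}
```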

Types

type ClientVersionV1

type ClientVersionV1 struct {
	Code    string `json:"code"`    // 2-letter client code (e.g., "GE" for Geth)
	Name    string `json:"name"`    // Human-readable name (e.g., "Geth")
	Version string `json:"version"` // Version string (e.g., "v1.14.0")
	Commit  string `json:"commit"`  // 4-byte commit hash
}

ClientVersionV1 represents the response from engine_getClientVersionV1. See: https://github.com/ethereum/execution-apis/blob/main/src/engine/identification.md

func (ClientVersionV1) String added in v0.0.15

func (c ClientVersionV1) String() string

String returns a web3_clientVersion-style string (Name/Version).
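
The sketch below decodes a sample engine_getClientVersionV1 result and formats it in the documented Name/Version shape. clientVersion is a local mirror of ClientVersionV1 so the example runs without importing the package, decodeClientVersions is a hypothetical helper, and the payload values are illustrative only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// clientVersion mirrors the documented ClientVersionV1 fields and JSON tags.
type clientVersion struct {
	Code    string `json:"code"`
	Name    string `json:"name"`
	Version string `json:"version"`
	Commit  string `json:"commit"`
}

// String follows the documented "Name/Version" shape.
func (c clientVersion) String() string {
	return c.Name + "/" + c.Version
}

// decodeClientVersions is a hypothetical helper that parses the JSON array
// carried in the result field of an engine_getClientVersionV1 response.
func decodeClientVersions(result []byte) ([]clientVersion, error) {
	var versions []clientVersion
	if err := json.Unmarshal(result, &versions); err != nil {
		return nil, fmt.Errorf("decode client versions: %w", err)
	}

	return versions, nil
}

func main() {
	// Illustrative payload; the commit value is a placeholder.
	result := []byte(`[{"code":"GE","name":"Geth","version":"v1.14.0","commit":"00000000"}]`)

	versions, err := decodeClientVersions(result)
	if err != nil {
		panic(err)
	}

	for _, v := range versions {
		fmt.Println(v) // prints "Geth/v1.14.0"
	}
}
```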

type Config

type Config struct {
	// Enabled controls whether Xatu event publishing is active.
	Enabled bool

	// Name identifies this rpc-snooper instance in events.
	Name string

	// NetworkName is the name of the Ethereum network (e.g., "mainnet", "sepolia").
	NetworkName string

	// NetworkID is the network ID of the Ethereum network.
	NetworkID uint64

	// Labels are custom key-value pairs added to event metadata.
	Labels map[string]string

	// Outputs defines where events are published.
	Outputs []OutputConfig

	// TLS enables TLS for xatu:// outputs.
	TLS bool

	// Headers are custom headers for HTTP/Xatu outputs.
	Headers map[string]string

	// MaxQueueSize is the maximum number of events to buffer before dropping.
	MaxQueueSize int

	// MaxExportBatchSize is the maximum number of events per batch export.
	MaxExportBatchSize int

	// Workers is the number of concurrent export workers.
	Workers int

	// BatchTimeout is how long to wait before exporting a partial batch.
	BatchTimeout time.Duration

	// ExportTimeout is the timeout for each export operation.
	ExportTimeout time.Duration

	// KeepAlive configures gRPC keepalive settings.
	KeepAlive KeepAliveConfig
}

Config holds the Xatu integration configuration.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the configuration is valid.

type EngineClientVersionHandler

type EngineClientVersionHandler struct {
	// contains filtered or unexported fields
}

EngineClientVersionHandler handles engine_getClientVersionV1 events. This handler does not publish events to Xatu; it only observes responses and updates the cached execution metadata for use in other events.

func NewEngineClientVersionHandler

func NewEngineClientVersionHandler(log logrus.FieldLogger, updateFn MetadataUpdateFunc) *EngineClientVersionHandler

NewEngineClientVersionHandler creates a new engine_getClientVersionV1 handler.

func (*EngineClientVersionHandler) HandleRequest

func (h *EngineClientVersionHandler) HandleRequest(_ *RequestEvent) bool

HandleRequest processes the request. We don't need to store anything from the request.

func (*EngineClientVersionHandler) HandleResponse

func (h *EngineClientVersionHandler) HandleResponse(event *ResponseEvent)

HandleResponse processes the response and updates the cached execution metadata.

func (*EngineClientVersionHandler) MethodMatcher

func (h *EngineClientVersionHandler) MethodMatcher() func(method string) bool

MethodMatcher returns a function that checks if a method matches engine_getClientVersionV*.

func (*EngineClientVersionHandler) Name

func (h *EngineClientVersionHandler) Name() string

Name returns the handler name.

type EngineGetBlobsHandler

type EngineGetBlobsHandler struct {
	// contains filtered or unexported fields
}

EngineGetBlobsHandler handles engine_getBlobs* events.

func NewEngineGetBlobsHandler

func NewEngineGetBlobsHandler(publisher Publisher, log logrus.FieldLogger) *EngineGetBlobsHandler

NewEngineGetBlobsHandler creates a new engine_getBlobs handler.

func (*EngineGetBlobsHandler) HandleRequest

func (h *EngineGetBlobsHandler) HandleRequest(event *RequestEvent) bool

HandleRequest processes the request and stores pending data.

func (*EngineGetBlobsHandler) HandleResponse

func (h *EngineGetBlobsHandler) HandleResponse(event *ResponseEvent)

HandleResponse processes the response, correlates with request, and publishes the event.

func (*EngineGetBlobsHandler) MethodMatcher

func (h *EngineGetBlobsHandler) MethodMatcher() func(method string) bool

MethodMatcher returns a function that checks if a method matches engine_getBlobs*.

func (*EngineGetBlobsHandler) Name

func (h *EngineGetBlobsHandler) Name() string

Name returns the handler name.

type EngineNewPayloadHandler

type EngineNewPayloadHandler struct {
	// contains filtered or unexported fields
}

EngineNewPayloadHandler handles engine_newPayload* events.

func NewEngineNewPayloadHandler

func NewEngineNewPayloadHandler(publisher Publisher, log logrus.FieldLogger) *EngineNewPayloadHandler

NewEngineNewPayloadHandler creates a new engine_newPayload handler.

func (*EngineNewPayloadHandler) HandleRequest

func (h *EngineNewPayloadHandler) HandleRequest(event *RequestEvent) bool

HandleRequest processes the request and stores pending data.

func (*EngineNewPayloadHandler) HandleResponse

func (h *EngineNewPayloadHandler) HandleResponse(event *ResponseEvent)

HandleResponse processes the response, correlates with request, and publishes the event.

func (*EngineNewPayloadHandler) MethodMatcher

func (h *EngineNewPayloadHandler) MethodMatcher() func(method string) bool

MethodMatcher returns a function that checks if a method matches engine_newPayload*.

func (*EngineNewPayloadHandler) Name

func (h *EngineNewPayloadHandler) Name() string

Name returns the handler name.

type EventHandler

type EventHandler interface {
	// Name returns the handler name for logging and metrics.
	Name() string

	// MethodMatcher returns a function that checks if a JSON-RPC method matches this handler.
	MethodMatcher() func(method string) bool

	// HandleRequest processes the request and stores pending data.
	// Returns true if this handler should also process the corresponding response.
	HandleRequest(ctx *RequestEvent) bool

	// HandleResponse processes the response, correlates with the request data, and publishes the event.
	HandleResponse(ctx *ResponseEvent)
}

EventHandler defines the interface for handling specific JSON-RPC method events. Each handler is responsible for processing requests and responses for a specific set of methods and publishing corresponding Xatu events.

type ExecutionMetadata

type ExecutionMetadata struct {
	Implementation string
	Version        string
	VersionMajor   string
	VersionMinor   string
	VersionPatch   string
}

ExecutionMetadata holds cached execution client information.

func (*ExecutionMetadata) ToProto

ToProto converts the metadata to the xatu proto format.

type ExecutionMetadataProvider

type ExecutionMetadataProvider interface {
	Get() *ExecutionMetadata
}

ExecutionMetadataProvider provides execution client metadata.

type KeepAliveConfig

type KeepAliveConfig struct {
	// Enabled controls whether keepalive is active.
	Enabled bool

	// Time is the duration after which a keepalive ping is sent.
	Time time.Duration

	// Timeout is the duration to wait for a keepalive response.
	Timeout time.Duration
}

KeepAliveConfig holds gRPC keepalive settings.

type MetadataUpdateFunc

type MetadataUpdateFunc func(versions []ClientVersionV1)

MetadataUpdateFunc is a callback function for updating execution metadata.
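
One way a metadata callback and provider could fit together is sketched below: a small cache whose update method has the shape of MetadataUpdateFunc and whose Get method has the shape of ExecutionMetadataProvider. metadataCache and the local mirror types are hypothetical, and mapping the first reported version's Name and Version onto Implementation and Version is an assumption, not documented behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// clientVersion and executionMetadata are trimmed local mirrors of the
// documented ClientVersionV1 and ExecutionMetadata types.
type clientVersion struct {
	Name    string
	Version string
}

type executionMetadata struct {
	Implementation string
	Version        string
}

// metadataCache is a hypothetical cache guarded by a read/write mutex.
type metadataCache struct {
	mu      sync.RWMutex
	current *executionMetadata
}

// Get returns the most recently cached metadata, or nil if none was seen.
func (c *metadataCache) Get() *executionMetadata {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.current
}

// update has the same shape as the documented callback type. Using only the
// first entry is an assumption made for this sketch.
func (c *metadataCache) update(versions []clientVersion) {
	if len(versions) == 0 {
		return
	}

	first := versions[0]

	c.mu.Lock()
	defer c.mu.Unlock()

	c.current = &executionMetadata{Implementation: first.Name, Version: first.Version}
}

func main() {
	cache := &metadataCache{}

	// The method value can be passed wherever a callback of this shape is expected.
	var updateFn func([]clientVersion) = cache.update
	updateFn([]clientVersion{{Name: "Geth", Version: "v1.14.0"}})

	meta := cache.Get()
	fmt.Println(meta.Implementation, meta.Version)
}
```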

type OutputConfig

type OutputConfig struct {
	// Type is the output type: "stdout", "http", "xatu", "kafka".
	Type string

	// Address is the output address (URL, host:port, or brokers/topic).
	Address string
}

OutputConfig defines a single output sink configuration.

func ParseOutputFlag

func ParseOutputFlag(s string) (OutputConfig, error)

ParseOutputFlag parses an output flag value in "type:address" or "type" format. Examples:

  • "stdout" -> {Type: "stdout", Address: ""}
  • "http:https://example.com" -> {Type: "http", Address: "https://example.com"}
  • "xatu:xatu.example.com:8080" -> {Type: "xatu", Address: "xatu.example.com:8080"}
  • "kafka:broker1:9092,broker2:9092/topic" -> {Type: "kafka", Address: "broker1:9092,broker2:9092/topic"}
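
The examples above are consistent with splitting the flag value at the first colon. The sketch below reproduces them under that assumption; parseOutput and outputConfig are hypothetical local stand-ins, and rejecting an empty flag is a choice made for this example rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// outputConfig is a local mirror of the documented OutputConfig fields.
type outputConfig struct {
	Type    string
	Address string
}

// parseOutput is a hypothetical re-implementation based only on the
// documented examples. It splits at the first ":" so that addresses which
// contain colons (URLs, host:port lists) stay intact.
func parseOutput(s string) (outputConfig, error) {
	if s == "" {
		return outputConfig{}, errors.New("empty output flag")
	}

	typ, addr, _ := strings.Cut(s, ":")

	return outputConfig{Type: typ, Address: addr}, nil
}

func main() {
	inputs := []string{
		"stdout",
		"http:https://example.com",
		"xatu:xatu.example.com:8080",
		"kafka:broker1:9092,broker2:9092/topic",
	}

	for _, in := range inputs {
		cfg, err := parseOutput(in)
		if err != nil {
			fmt.Println("error:", err)

			continue
		}

		fmt.Printf("%q -> {Type: %q, Address: %q}\n", in, cfg.Type, cfg.Address)
	}
}
```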

func (*OutputConfig) Validate

func (o *OutputConfig) Validate() error

Validate checks if the output configuration is valid.

type PendingGetBlobsCall

type PendingGetBlobsCall struct {
	CallID           uint64
	RequestTimestamp time.Time
	VersionedHashes  []string
	MethodVersion    string
}

PendingGetBlobsCall stores request data awaiting response correlation.

type PendingNewPayloadCall

type PendingNewPayloadCall struct {
	CallID           uint64
	RequestTimestamp time.Time
	MethodVersion    string

	// Execution payload fields
	BlockNumber uint64
	BlockHash   string
	ParentHash  string
	GasUsed     uint64
	GasLimit    uint64
	TxCount     uint32
	BlobCount   uint32
}

PendingNewPayloadCall stores request data awaiting response correlation.

type Publisher

type Publisher interface {
	// Start initializes all sinks.
	Start(ctx context.Context) error

	// Stop gracefully shuts down all sinks.
	Stop(ctx context.Context) error

	// Publish sends a decorated event to all sinks.
	Publish(ctx context.Context, event *xatu.DecoratedEvent) error

	// ClientMeta returns the base client metadata for events.
	ClientMeta() *xatu.ClientMeta

	// SetMetadataProvider sets the execution metadata provider.
	SetMetadataProvider(provider ExecutionMetadataProvider)
}

Publisher manages event sinks and publishes decorated events.

func NewNoopPublisher

func NewNoopPublisher() Publisher

NewNoopPublisher creates a Publisher that does nothing.

func NewPublisher

func NewPublisher(config *Config, log logrus.FieldLogger) Publisher

NewPublisher creates a new Publisher instance.

type RPCError

type RPCError struct {
	Code    int
	Message string
}

RPCError represents a JSON-RPC error response.

type RequestEvent

type RequestEvent struct {
	// CallID is the unique identifier for this request/response pair.
	CallID uint64

	// Timestamp is when the request was received.
	Timestamp time.Time

	// Method is the JSON-RPC method name (e.g., "engine_getBlobsV1").
	Method string

	// Params are the JSON-RPC parameters.
	Params []any

	// BodyBytes contains the raw request body bytes (useful for SSZ-encoded data).
	BodyBytes []byte
}

RequestEvent contains data from an intercepted JSON-RPC request.

type ResponseEvent

type ResponseEvent struct {
	// CallID is the unique identifier for this request/response pair.
	CallID uint64

	// Timestamp is when the response was received.
	Timestamp time.Time

	// Duration is the time taken for the request to complete.
	Duration time.Duration

	// Result is the JSON-RPC result field (nil if there was an error).
	Result any

	// Error contains the JSON-RPC error if present.
	Error *RPCError

	// BodyBytes contains the raw response body bytes.
	BodyBytes []byte
}

ResponseEvent contains data from an intercepted JSON-RPC response.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router routes JSON-RPC methods to their corresponding event handlers.

func NewRouter

func NewRouter(log logrus.FieldLogger) *Router

NewRouter creates a new Router instance.

func (*Router) HandlerCount

func (r *Router) HandlerCount() int

HandlerCount returns the number of registered handlers.

func (*Router) Register

func (r *Router) Register(handler EventHandler)

Register adds a handler to the router.

func (*Router) RouteRequest

func (r *Router) RouteRequest(event *RequestEvent) (EventHandler, bool)

RouteRequest finds a matching handler for the request and calls HandleRequest. Returns the matched handler (or nil) and whether a handler was matched.
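
The routing behavior described above can be sketched as a linear scan over registered matchers. This is an assumption-laden illustration, not the package's Router: the local types are mirrors, prefixHandler is hypothetical, first-match-wins ordering is assumed, and the returned boolean follows the documented wording (whether a handler matched) rather than the result of HandleRequest.

```go
package main

import (
	"fmt"
	"strings"
)

// requestEvent is a trimmed local mirror of the documented RequestEvent.
type requestEvent struct {
	CallID uint64
	Method string
}

// handler mirrors the request-side part of the documented EventHandler.
type handler interface {
	Name() string
	MethodMatcher() func(method string) bool
	HandleRequest(ev *requestEvent) bool
}

// route pairs a handler with its matcher, resolved once at registration.
type route struct {
	match   func(method string) bool
	handler handler
}

// router is a hypothetical stand-in for the documented Router.
type router struct {
	routes []route
}

func (r *router) register(h handler) {
	r.routes = append(r.routes, route{match: h.MethodMatcher(), handler: h})
}

// routeRequest calls HandleRequest on the first handler whose matcher accepts
// the method. First-match-wins is an assumption of this sketch.
func (r *router) routeRequest(ev *requestEvent) (handler, bool) {
	for _, rt := range r.routes {
		if rt.match(ev.Method) {
			rt.handler.HandleRequest(ev)

			return rt.handler, true
		}
	}

	return nil, false
}

// prefixHandler is a hypothetical handler that matches by method prefix.
type prefixHandler struct {
	name   string
	prefix string
	calls  int
}

func (h *prefixHandler) Name() string { return h.name }

func (h *prefixHandler) MethodMatcher() func(method string) bool {
	return func(method string) bool { return strings.HasPrefix(method, h.prefix) }
}

func (h *prefixHandler) HandleRequest(_ *requestEvent) bool {
	h.calls++

	return true
}

func main() {
	r := &router{}
	r.register(&prefixHandler{name: "blobs", prefix: "engine_getBlobs"})
	r.register(&prefixHandler{name: "payload", prefix: "engine_newPayload"})

	for _, method := range []string{"engine_newPayloadV4", "eth_call"} {
		if h, ok := r.routeRequest(&requestEvent{CallID: 1, Method: method}); ok {
			fmt.Println(method, "routed to", h.Name())
		} else {
			fmt.Println(method, "has no handler")
		}
	}
}
```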

type Service

type Service interface {
	// Start initializes the service and all its components.
	Start(ctx context.Context) error

	// Stop gracefully shuts down the service.
	Stop(ctx context.Context) error

	// Router returns the event router for module integration.
	Router() *Router

	// Publisher returns the event publisher.
	Publisher() Publisher

	// IsEnabled returns whether the service is enabled.
	IsEnabled() bool

	// SetMetadataProvider sets the execution metadata provider.
	// This should be called before Start() to wire up metadata for events.
	SetMetadataProvider(provider ExecutionMetadataProvider)

	// RegisterMetadataUpdateCallback registers a callback for engine_getClientVersion responses.
	// This allows passive metadata updates when the CL queries the EL.
	RegisterMetadataUpdateCallback(callback MetadataUpdateFunc)
}

Service is the main Xatu integration service that coordinates event handlers and publishing.

func NewService

func NewService(config *Config, log logrus.FieldLogger) (Service, error)

NewService creates a new Xatu Service instance.
