proxyd

package module
v0.0.0-...-5cda5f8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2023 License: MIT Imports: 37 Imported by: 0

README

rpc-proxy

This tool implements proxyd, an RPC request router and proxy. It does the following things:

  1. Whitelists RPC methods.
  2. Routes RPC methods to groups of backend services.
  3. Automatically retries failed backend requests.
  4. Provides metrics that measure request latency, error rates, and the like.

Usage

Run make proxyd to build the binary. No additional dependencies are necessary.

To configure proxyd for use, you'll need to create a configuration file to define your proxy backends and routing rules. Check out example.config.toml for how to do this alongside a full list of all options with commentary.

Once you have a config file, start the daemon via proxyd <path-to-config>.toml.

Metrics

See metrics.go for a list of all available metrics.

The metrics port is configurable via the metrics.port and metrics.host keys in the config.

Adding Backend SSL Certificates in Docker

The Docker image runs on Alpine Linux. If you get SSL errors when connecting to a backend within Docker, you may need to add additional certificates to Alpine's certificate store. To do this, bind mount the certificate bundle into a file in /usr/local/share/ca-certificates. The entrypoint.sh script will then update the store with whatever is in the ca-certificates directory prior to starting proxyd.

Documentation

Index

Constants

View Source
const (
	JSONRPCVersion       = "2.0"
	JSONRPCErrorInternal = -32000
)
View Source
const (
	MetricsNamespace = "proxyd"

	RPCRequestSourceHTTP = "http"
	RPCRequestSourceWS   = "ws"

	BackendProxyd = "proxyd"
	SourceClient  = "client"
	SourceBackend = "backend"
	MethodUnknown = "unknown"
)
View Source
const (
	ContextKeyAuth          = "authorization"
	ContextKeyReqID         = "req_id"
	ContextKeyXForwardedFor = "x_forwarded_for"
	MaxBatchRPCCalls        = 100
)
View Source
const MaxConcurrentWSConnsScript = `` /* 351-byte string literal not displayed */
View Source
const MaxRPSScript = `` /* 130-byte string literal not displayed */

Variables

View Source
var (
	ErrParseErr = &RPCErr{
		Code:          -32700,
		Message:       "parse error",
		HTTPErrorCode: 400,
	}
	ErrInternal = &RPCErr{
		Code:          JSONRPCErrorInternal,
		Message:       "internal error",
		HTTPErrorCode: 500,
	}
	ErrMethodNotWhitelisted = &RPCErr{
		Code:          JSONRPCErrorInternal - 1,
		Message:       "rpc method is not whitelisted",
		HTTPErrorCode: 403,
	}
	ErrBackendOffline = &RPCErr{
		Code:          JSONRPCErrorInternal - 10,
		Message:       "backend offline",
		HTTPErrorCode: 503,
	}
	ErrNoBackends = &RPCErr{
		Code:          JSONRPCErrorInternal - 11,
		Message:       "no backends available for method",
		HTTPErrorCode: 503,
	}
	ErrBackendOverCapacity = &RPCErr{
		Code:          JSONRPCErrorInternal - 12,
		Message:       "backend is over capacity",
		HTTPErrorCode: 429,
	}
	ErrBackendBadResponse = &RPCErr{
		Code:          JSONRPCErrorInternal - 13,
		Message:       "backend returned an invalid response",
		HTTPErrorCode: 500,
	}
	ErrTooManyBatchRequests = &RPCErr{
		Code:    JSONRPCErrorInternal - 14,
		Message: "too many RPC calls in batch request",
	}
	ErrGatewayTimeout = &RPCErr{
		Code:          JSONRPCErrorInternal - 15,
		Message:       "gateway timeout",
		HTTPErrorCode: 504,
	}
	ErrOverRateLimit = &RPCErr{
		Code:          JSONRPCErrorInternal - 16,
		Message:       "rate limited",
		HTTPErrorCode: 429,
	}

	ErrBackendUnexpectedJSONRPC = errors.New("backend returned an unexpected JSON-RPC response")
)
View Source
var MillisecondDurationBuckets = []float64{1, 10, 50, 100, 500, 1000, 5000, 10000, 100000}
View Source
var PayloadSizeBuckets = []float64{10, 50, 100, 500, 1000, 5000, 10000, 100000, 1000000}

Functions

func CreateTLSClient

func CreateTLSClient(ca string) (*tls.Config, error)

func GetAuthCtx

func GetAuthCtx(ctx context.Context) string

func GetReqID

func GetReqID(ctx context.Context) string

func GetXForwardedFor

func GetXForwardedFor(ctx context.Context) string

func IsBatch

func IsBatch(raw []byte) bool

func IsValidID

func IsValidID(id json.RawMessage) bool

func MaybeRecordErrorsInRPCRes

func MaybeRecordErrorsInRPCRes(ctx context.Context, backendName string, reqs []*RPCReq, resBatch []*RPCRes)

func MaybeRecordSpecialRPCError

func MaybeRecordSpecialRPCError(ctx context.Context, backendName, method string, rpcErr *RPCErr)

func ParseBatchRPCReq

func ParseBatchRPCReq(body []byte) ([]json.RawMessage, error)

func ParseKeyPair

func ParseKeyPair(crt, key string) (tls.Certificate, error)

func ReadFromEnvOrConfig

func ReadFromEnvOrConfig(value string) (string, error)

func RecordBatchRPCError

func RecordBatchRPCError(ctx context.Context, backendName string, reqs []*RPCReq, err error)

func RecordBatchRPCForward

func RecordBatchRPCForward(ctx context.Context, backendName string, reqs []*RPCReq, source string)

func RecordCacheHit

func RecordCacheHit(method string)

func RecordCacheMiss

func RecordCacheMiss(method string)

func RecordRPCError

func RecordRPCError(ctx context.Context, backendName, method string, err error)

func RecordRPCForward

func RecordRPCForward(ctx context.Context, backendName, method, source string)

func RecordRedisError

func RecordRedisError(source string)

func RecordRequestPayloadSize

func RecordRequestPayloadSize(ctx context.Context, payloadSize int)

func RecordResponsePayloadSize

func RecordResponsePayloadSize(ctx context.Context, payloadSize int)

func RecordUnserviceableRequest

func RecordUnserviceableRequest(ctx context.Context, source string)

func RecordWSMessage

func RecordWSMessage(ctx context.Context, backendName, source string)

func Start

func Start(config *Config) (func(), error)

func ValidateRPCReq

func ValidateRPCReq(req *RPCReq) error

Types

type Backend

type Backend struct {
	Name string
	// contains filtered or unexported fields
}

func NewBackend

func NewBackend(
	name string,
	rpcURL string,
	wsURL string,
	rateLimiter BackendRateLimiter,
	rpcSemaphore *semaphore.Weighted,
	opts ...BackendOpt,
) *Backend

func (*Backend) Forward

func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]*RPCRes, error)

func (*Backend) IsRateLimited

func (b *Backend) IsRateLimited() bool

func (*Backend) IsWSSaturated

func (b *Backend) IsWSSaturated() bool

func (*Backend) Online

func (b *Backend) Online() bool

func (*Backend) ProxyWS

func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error)

type BackendConfig

type BackendConfig struct {
	Username         string `toml:"username"`
	Password         string `toml:"password"`
	RPCURL           string `toml:"rpc_url"`
	WSURL            string `toml:"ws_url"`
	MaxRPS           int    `toml:"max_rps"`
	MaxWSConns       int    `toml:"max_ws_conns"`
	CAFile           string `toml:"ca_file"`
	ClientCertFile   string `toml:"client_cert_file"`
	ClientKeyFile    string `toml:"client_key_file"`
	StripTrailingXFF bool   `toml:"strip_trailing_xff"`
}

type BackendGroup

type BackendGroup struct {
	Name     string
	Backends []*Backend
}

func (*BackendGroup) Forward

func (b *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error)

func (*BackendGroup) ProxyWS

func (b *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error)

type BackendGroupConfig

type BackendGroupConfig struct {
	Backends []string `toml:"backends"`
}

type BackendGroupsConfig

type BackendGroupsConfig map[string]*BackendGroupConfig

type BackendOpt

type BackendOpt func(b *Backend)

func WithBasicAuth

func WithBasicAuth(username, password string) BackendOpt

func WithMaxRPS

func WithMaxRPS(maxRPS int) BackendOpt

func WithMaxResponseSize

func WithMaxResponseSize(size int64) BackendOpt

func WithMaxRetries

func WithMaxRetries(retries int) BackendOpt

func WithMaxWSConns

func WithMaxWSConns(maxConns int) BackendOpt

func WithOutOfServiceDuration

func WithOutOfServiceDuration(interval time.Duration) BackendOpt

func WithProxydIP

func WithProxydIP(ip string) BackendOpt

func WithStrippedTrailingXFF

func WithStrippedTrailingXFF() BackendOpt

func WithTLSConfig

func WithTLSConfig(tlsConfig *tls.Config) BackendOpt

func WithTimeout

func WithTimeout(timeout time.Duration) BackendOpt

type BackendOptions

type BackendOptions struct {
	ResponseTimeoutSeconds int   `toml:"response_timeout_seconds"`
	MaxResponseSizeBytes   int64 `toml:"max_response_size_bytes"`
	MaxRetries             int   `toml:"max_retries"`
	OutOfServiceSeconds    int   `toml:"out_of_service_seconds"`
}

type BackendRateLimiter

type BackendRateLimiter interface {
	IsBackendOnline(name string) (bool, error)
	SetBackendOffline(name string, duration time.Duration) error
	IncBackendRPS(name string) (int, error)
	IncBackendWSConns(name string, max int) (bool, error)
	DecBackendWSConns(name string) error
	FlushBackendWSConns(names []string) error
}

func NewRedisRateLimiter

func NewRedisRateLimiter(url string) (BackendRateLimiter, error)

type BackendsConfig

type BackendsConfig map[string]*BackendConfig

type Cache

type Cache interface {
	Get(ctx context.Context, key string) (string, error)
	Put(ctx context.Context, key string, value string) error
}

type CacheConfig

type CacheConfig struct {
	Enabled               bool   `toml:"enabled"`
	BlockSyncRPCURL       string `toml:"block_sync_rpc_url"`
	NumBlockConfirmations int    `toml:"num_block_confirmations"`
}

type Config

type Config struct {
	WSBackendGroup    string              `toml:"ws_backend_group"`
	Server            ServerConfig        `toml:"server"`
	Cache             CacheConfig         `toml:"cache"`
	Redis             RedisConfig         `toml:"redis"`
	Metrics           MetricsConfig       `toml:"metrics"`
	RateLimit         RateLimitConfig     `toml:"rate_limit"`
	BackendOptions    BackendOptions      `toml:"backend"`
	Backends          BackendsConfig      `toml:"backends"`
	Authentication    map[string]string   `toml:"authentication"`
	BackendGroups     BackendGroupsConfig `toml:"backend_groups"`
	RPCMethodMappings map[string]string   `toml:"rpc_method_mappings"`
	WSMethodWhitelist []string            `toml:"ws_method_whitelist"`
}

type EthBlockNumberMethodHandler

type EthBlockNumberMethodHandler struct {
	// contains filtered or unexported fields
}

func (*EthBlockNumberMethodHandler) GetRPCMethod

func (e *EthBlockNumberMethodHandler) GetRPCMethod(ctx context.Context, req *RPCReq) (*RPCRes, error)

func (*EthBlockNumberMethodHandler) PutRPCMethod

type EthCallMethodHandler

type EthCallMethodHandler struct {
	// contains filtered or unexported fields
}

func (*EthCallMethodHandler) GetRPCMethod

func (e *EthCallMethodHandler) GetRPCMethod(ctx context.Context, req *RPCReq) (*RPCRes, error)

func (*EthCallMethodHandler) PutRPCMethod

func (e *EthCallMethodHandler) PutRPCMethod(ctx context.Context, req *RPCReq, res *RPCRes) error

type EthGasPriceMethodHandler

type EthGasPriceMethodHandler struct {
	// contains filtered or unexported fields
}

func (*EthGasPriceMethodHandler) GetRPCMethod

func (e *EthGasPriceMethodHandler) GetRPCMethod(ctx context.Context, req *RPCReq) (*RPCRes, error)

func (*EthGasPriceMethodHandler) PutRPCMethod

type EthGetBlockByNumberMethodHandler

type EthGetBlockByNumberMethodHandler struct {
	// contains filtered or unexported fields
}

func (*EthGetBlockByNumberMethodHandler) GetRPCMethod

func (e *EthGetBlockByNumberMethodHandler) GetRPCMethod(ctx context.Context, req *RPCReq) (*RPCRes, error)

func (*EthGetBlockByNumberMethodHandler) PutRPCMethod

func (e *EthGetBlockByNumberMethodHandler) PutRPCMethod(ctx context.Context, req *RPCReq, res *RPCRes) error

type EthGetBlockRangeMethodHandler

type EthGetBlockRangeMethodHandler struct {
	// contains filtered or unexported fields
}

func (*EthGetBlockRangeMethodHandler) GetRPCMethod

func (e *EthGetBlockRangeMethodHandler) GetRPCMethod(ctx context.Context, req *RPCReq) (*RPCRes, error)

func (*EthGetBlockRangeMethodHandler) PutRPCMethod

func (e *EthGetBlockRangeMethodHandler) PutRPCMethod(ctx context.Context, req *RPCReq, res *RPCRes) error

type EthLastValueCache

type EthLastValueCache struct {
	// contains filtered or unexported fields
}

func (*EthLastValueCache) Read

func (h *EthLastValueCache) Read(ctx context.Context) (string, error)

func (*EthLastValueCache) Start

func (h *EthLastValueCache) Start()

func (*EthLastValueCache) Stop

func (h *EthLastValueCache) Stop()

type GetLatestBlockNumFn

type GetLatestBlockNumFn func(ctx context.Context) (uint64, error)

type GetLatestGasPriceFn

type GetLatestGasPriceFn func(ctx context.Context) (uint64, error)

type LimitedHTTPClient

type LimitedHTTPClient struct {
	http.Client
	// contains filtered or unexported fields
}

func (*LimitedHTTPClient) DoLimited

func (c *LimitedHTTPClient) DoLimited(req *http.Request) (*http.Response, error)

type LocalBackendRateLimiter

type LocalBackendRateLimiter struct {
	// contains filtered or unexported fields
}

func NewLocalBackendRateLimiter

func NewLocalBackendRateLimiter() *LocalBackendRateLimiter

func (*LocalBackendRateLimiter) DecBackendWSConns

func (l *LocalBackendRateLimiter) DecBackendWSConns(name string) error

func (*LocalBackendRateLimiter) FlushBackendWSConns

func (l *LocalBackendRateLimiter) FlushBackendWSConns(names []string) error

func (*LocalBackendRateLimiter) IncBackendRPS

func (l *LocalBackendRateLimiter) IncBackendRPS(name string) (int, error)

func (*LocalBackendRateLimiter) IncBackendWSConns

func (l *LocalBackendRateLimiter) IncBackendWSConns(name string, max int) (bool, error)

func (*LocalBackendRateLimiter) IsBackendOnline

func (l *LocalBackendRateLimiter) IsBackendOnline(name string) (bool, error)

func (*LocalBackendRateLimiter) SetBackendOffline

func (l *LocalBackendRateLimiter) SetBackendOffline(name string, duration time.Duration) error

type MethodMappingsConfig

type MethodMappingsConfig map[string]string

type MetricsConfig

type MetricsConfig struct {
	Enabled bool   `toml:"enabled"`
	Host    string `toml:"host"`
	Port    int    `toml:"port"`
}

type NoopRPCCache

type NoopRPCCache struct{}

func (*NoopRPCCache) GetRPC

func (n *NoopRPCCache) GetRPC(context.Context, *RPCReq) (*RPCRes, error)

func (*NoopRPCCache) PutRPC

func (n *NoopRPCCache) PutRPC(context.Context, *RPCReq, *RPCRes) error

type RPCCache

type RPCCache interface {
	GetRPC(ctx context.Context, req *RPCReq) (*RPCRes, error)
	PutRPC(ctx context.Context, req *RPCReq, res *RPCRes) error
}

type RPCErr

type RPCErr struct {
	Code          int    `json:"code"`
	Message       string `json:"message"`
	HTTPErrorCode int    `json:"-"`
}

func ErrInvalidRequest

func ErrInvalidRequest(msg string) *RPCErr

func (*RPCErr) Clone

func (r *RPCErr) Clone() *RPCErr

func (*RPCErr) Error

func (r *RPCErr) Error() string

type RPCMethodHandler

type RPCMethodHandler interface {
	GetRPCMethod(context.Context, *RPCReq) (*RPCRes, error)
	PutRPCMethod(context.Context, *RPCReq, *RPCRes) error
}

type RPCReq

type RPCReq struct {
	JSONRPC string          `json:"jsonrpc"`
	Method  string          `json:"method"`
	Params  json.RawMessage `json:"params"`
	ID      json.RawMessage `json:"id"`
}

func ParseRPCReq

func ParseRPCReq(body []byte) (*RPCReq, error)

type RPCRes

type RPCRes struct {
	JSONRPC string
	Result  interface{}
	Error   *RPCErr
	ID      json.RawMessage
}

func NewRPCErrorRes

func NewRPCErrorRes(id json.RawMessage, err error) *RPCRes

func NewRPCRes

func NewRPCRes(id json.RawMessage, result interface{}) *RPCRes

func ParseRPCRes

func ParseRPCRes(r io.Reader) (*RPCRes, error)

func (*RPCRes) IsError

func (r *RPCRes) IsError() bool

func (*RPCRes) MarshalJSON

func (r *RPCRes) MarshalJSON() ([]byte, error)

type RateLimitConfig

type RateLimitConfig struct {
	RatePerSecond    int      `toml:"rate_per_second"`
	ExemptOrigins    []string `toml:"exempt_origins"`
	ExemptUserAgents []string `toml:"exempt_user_agents"`
	ErrorMessage     string   `toml:"error_message"`
}

type RedisBackendRateLimiter

type RedisBackendRateLimiter struct {
	// contains filtered or unexported fields
}

func (*RedisBackendRateLimiter) DecBackendWSConns

func (r *RedisBackendRateLimiter) DecBackendWSConns(name string) error

func (*RedisBackendRateLimiter) FlushBackendWSConns

func (r *RedisBackendRateLimiter) FlushBackendWSConns(names []string) error

func (*RedisBackendRateLimiter) IncBackendRPS

func (r *RedisBackendRateLimiter) IncBackendRPS(name string) (int, error)

func (*RedisBackendRateLimiter) IncBackendWSConns

func (r *RedisBackendRateLimiter) IncBackendWSConns(name string, max int) (bool, error)

func (*RedisBackendRateLimiter) IsBackendOnline

func (r *RedisBackendRateLimiter) IsBackendOnline(name string) (bool, error)

func (*RedisBackendRateLimiter) SetBackendOffline

func (r *RedisBackendRateLimiter) SetBackendOffline(name string, duration time.Duration) error

type RedisConfig

type RedisConfig struct {
	URL string `toml:"url"`
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(
	backendGroups map[string]*BackendGroup,
	wsBackendGroup *BackendGroup,
	wsMethodWhitelist *StringSet,
	rpcMethodMappings map[string]string,
	maxBodySize int64,
	authenticatedPaths map[string]string,
	timeout time.Duration,
	maxUpstreamBatchSize int,
	cache RPCCache,
	rateLimitConfig RateLimitConfig,
	enableRequestLog bool,
	maxRequestBodyLogLen int,
) (*Server, error)

func (*Server) HandleHealthz

func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request)

func (*Server) HandleRPC

func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request)

func (*Server) HandleWS

func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request)

func (*Server) RPCListenAndServe

func (s *Server) RPCListenAndServe(host string, port int) error

func (*Server) Shutdown

func (s *Server) Shutdown()

func (*Server) WSListenAndServe

func (s *Server) WSListenAndServe(host string, port int) error

type ServerConfig

type ServerConfig struct {
	RPCHost           string `toml:"rpc_host"`
	RPCPort           int    `toml:"rpc_port"`
	WSHost            string `toml:"ws_host"`
	WSPort            int    `toml:"ws_port"`
	MaxBodySizeBytes  int64  `toml:"max_body_size_bytes"`
	MaxConcurrentRPCs int64  `toml:"max_concurrent_rpcs"`

	// TimeoutSeconds specifies the maximum time spent serving an HTTP request. Note that it isn't used for websocket connections.
	TimeoutSeconds int `toml:"timeout_seconds"`

	MaxUpstreamBatchSize int `toml:"max_upstream_batch_size"`

	EnableRequestLog     bool `toml:"enable_request_log"`
	MaxRequestBodyLogLen int  `toml:"max_request_body_log_len"`
}

type ServerRateLimiter

type ServerRateLimiter struct {
}

type StaticMethodHandler

type StaticMethodHandler struct {
	// contains filtered or unexported fields
}

func (*StaticMethodHandler) GetRPCMethod

func (e *StaticMethodHandler) GetRPCMethod(ctx context.Context, req *RPCReq) (*RPCRes, error)

func (*StaticMethodHandler) PutRPCMethod

func (e *StaticMethodHandler) PutRPCMethod(ctx context.Context, req *RPCReq, res *RPCRes) error

type StringSet

type StringSet struct {
	// contains filtered or unexported fields
}

func NewStringSet

func NewStringSet() *StringSet

func NewStringSetFromStrings

func NewStringSetFromStrings(in []string) *StringSet

func (*StringSet) Add

func (s *StringSet) Add(str string)

func (*StringSet) Entries

func (s *StringSet) Entries() []string

func (*StringSet) Extend

func (s *StringSet) Extend(in []string) *StringSet

func (*StringSet) Has

func (s *StringSet) Has(test string) bool

type WSProxier

type WSProxier struct {
	// contains filtered or unexported fields
}

func NewWSProxier

func NewWSProxier(backend *Backend, clientConn, backendConn *websocket.Conn, methodWhitelist *StringSet) *WSProxier

func (*WSProxier) Proxy

func (w *WSProxier) Proxy(ctx context.Context) error

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL