collector

package
v1.2.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2025 License: MIT Imports: 38 Imported by: 0

Documentation

Overview

Package collector contains the mempool collector service.

Index

Constants

View Source
const (
	KeyStatsAll       = "all"
	KeyStatsFirst     = "first"
	KeyStatsUnique    = "unique"
	KeyStatsTxOnChain = "tx-onchain"
	KeyStatsTxTrash   = "tx-trash"
)

Variables

View Source
var ErrNoDSN = fmt.Errorf("Clickhouse DSN is required")

Functions

This section is empty.

Types

type BlxNodeConnection

type BlxNodeConnection struct {
	// contains filtered or unexported fields
}

func NewBlxNodeConnection

func NewBlxNodeConnection(opts BlxNodeOpts) *BlxNodeConnection

func (*BlxNodeConnection) Start

func (nc *BlxNodeConnection) Start()

type BlxNodeConnectionGRPC added in v0.5.2

type BlxNodeConnectionGRPC struct {
	// contains filtered or unexported fields
}

func NewBlxNodeConnectionGRPC added in v0.5.2

func NewBlxNodeConnectionGRPC(opts BlxNodeOpts) *BlxNodeConnectionGRPC

func (*BlxNodeConnectionGRPC) Start added in v0.5.2

func (nc *BlxNodeConnectionGRPC) Start()

type BlxNodeOpts

type BlxNodeOpts struct {
	TxC        chan common.TxIn
	Log        *zap.SugaredLogger
	AuthHeader string
	URL        string // optional override, default: blxDefaultURL
	SourceTag  string // optional override, default: "blx" (common.BloxrouteTag)
}

type ChainboundNodeConnection

type ChainboundNodeConnection struct {
	// contains filtered or unexported fields
}

func NewChainboundNodeConnection

func NewChainboundNodeConnection(opts ChainboundNodeOpts) *ChainboundNodeConnection

func (*ChainboundNodeConnection) Start

func (cbc *ChainboundNodeConnection) Start()

type ChainboundNodeOpts

type ChainboundNodeOpts struct {
	TxC       chan common.TxIn
	Log       *zap.SugaredLogger
	APIKey    string
	URL       string // optional override, default: ChainboundDefaultURL
	SourceTag string // optional override, default: "Chainbound"
}

type Clickhouse added in v1.1.0

type Clickhouse struct {
	// contains filtered or unexported fields
}

func NewClickhouse added in v1.1.0

func NewClickhouse(opts ClickhouseOpts) (*Clickhouse, error)

NewClickhouse creates a new Clickhouse instance with a connection to the database.

func (*Clickhouse) AddSourceLog added in v1.1.0

func (ch *Clickhouse) AddSourceLog(timeReceived time.Time, hash, source, location string)

AddSourceLog adds a source log to the Clickhouse batch. If the batch size exceeds the configured limit, it sends the batch to Clickhouse. This function is thread-safe.

func (*Clickhouse) AddTransaction added in v1.1.0

func (ch *Clickhouse) AddTransaction(tx common.TxIn) error

AddTransaction adds a transaction to the Clickhouse batch. If the batch size exceeds the configured limit, it sends the batch to Clickhouse. This function is thread-safe.

func (*Clickhouse) FlushCurrentBatches added in v1.2.0

func (ch *Clickhouse) FlushCurrentBatches()

type ClickhouseOpts added in v1.1.0

type ClickhouseOpts struct {
	Log *zap.SugaredLogger
	DSN string
}

type Collector added in v1.2.0

type Collector struct {
	// contains filtered or unexported fields
}

func New added in v1.2.0

func New(opts CollectorOpts) *Collector

func (*Collector) Shutdown added in v1.2.0

func (c *Collector) Shutdown()

Shutdown stops the collector and flushes all pending transactions.

func (*Collector) Start added in v1.2.0

func (c *Collector) Start()

Start kicks off all the service components in the background.

func (*Collector) StartAPIServer added in v1.2.0

func (c *Collector) StartAPIServer() *api.Server

func (*Collector) StartMetricsServer added in v1.2.0

func (c *Collector) StartMetricsServer()

type CollectorOpts

type CollectorOpts struct {
	Log      *zap.SugaredLogger
	UID      string
	Location string // location of the collector, will be stored in sourcelogs
	Nodes    []string
	OutDir   string

	CheckNodeURI  string
	ClickhouseDSN string

	BloxrouteAuth  []string
	EdenAuth       []string
	ChainboundAuth []string

	Receivers               []string
	ReceiversAllowedSources []string

	APIListenAddr     string
	MetricsListenAddr string
	EnablePprof       bool // if true, enables pprof on the metrics server
}

type EdenNodeConnection added in v0.6.0

type EdenNodeConnection struct {
	// contains filtered or unexported fields
}

func NewEdenNodeConnection added in v0.6.0

func NewEdenNodeConnection(opts EdenNodeOpts) *EdenNodeConnection

func (*EdenNodeConnection) Start added in v0.6.0

func (nc *EdenNodeConnection) Start()

type EdenNodeConnectionGRPC added in v0.6.0

type EdenNodeConnectionGRPC struct {
	// contains filtered or unexported fields
}

func NewEdenNodeConnectionGRPC added in v0.6.0

func NewEdenNodeConnectionGRPC(opts EdenNodeOpts) *EdenNodeConnectionGRPC

func (*EdenNodeConnectionGRPC) Start added in v0.6.0

func (nc *EdenNodeConnectionGRPC) Start()

type EdenNodeOpts added in v0.6.0

type EdenNodeOpts struct {
	TxC        chan common.TxIn
	Log        *zap.SugaredLogger
	AuthHeader string
	URL        string // optional override, default: edenDefaultURL
	SourceTag  string // optional override, default: "eden" (common.SourceTagEden)
}

type HTTPReceiver added in v0.8.0

type HTTPReceiver struct {
	// contains filtered or unexported fields
}

func NewHTTPReceiver added in v0.8.0

func NewHTTPReceiver(url string) *HTTPReceiver

func (*HTTPReceiver) SendTx added in v0.8.0

func (r *HTTPReceiver) SendTx(ctx context.Context, tx *common.TxIn) error

type NodeConnection

type NodeConnection struct {
	// contains filtered or unexported fields
}

func NewNodeConnection

func NewNodeConnection(log *zap.SugaredLogger, nodeURI string, txC chan common.TxIn) *NodeConnection

func (*NodeConnection) StartInBackground added in v0.5.2

func (nc *NodeConnection) StartInBackground()

type OutFiles added in v0.5.2

type OutFiles struct {
	FTxs       *os.File
	FSourcelog *os.File
	FTrash     *os.File
}

type SourceLogEntry added in v1.1.0

type SourceLogEntry struct {
	ReceivedAt time.Time
	Hash       string
	Source     string
	Location   string
}

type SourceMetrics added in v0.5.2

type SourceMetrics struct {
	// contains filtered or unexported fields
}

func NewMetricsCounter added in v0.5.2

func NewMetricsCounter() SourceMetrics

func (*SourceMetrics) Get added in v0.5.2

func (sc *SourceMetrics) Get(cntType string) map[string]map[string]uint64

func (*SourceMetrics) Inc added in v0.5.2

func (sc *SourceMetrics) Inc(cntType, source string)

func (*SourceMetrics) IncKey added in v0.5.2

func (sc *SourceMetrics) IncKey(cntType, source, key string)

func (*SourceMetrics) Logger added in v0.5.2

func (sc *SourceMetrics) Logger(log *zap.SugaredLogger, cntType string, useLen bool) *zap.SugaredLogger

func (*SourceMetrics) Reset added in v0.5.2

func (sc *SourceMetrics) Reset()

type TxDetail

type TxDetail struct {
	Timestamp int64  `json:"timestamp"`
	Hash      string `json:"hash"`
	RawTx     string `json:"rawTx"`
}

type TxProcessor

type TxProcessor struct {
	// contains filtered or unexported fields
}

func NewTxProcessor

func NewTxProcessor(opts TxProcessorOpts) *TxProcessor

func (*TxProcessor) Shutdown added in v1.2.0

func (p *TxProcessor) Shutdown()

func (*TxProcessor) Start

func (p *TxProcessor) Start()

type TxProcessorOpts added in v0.5.2

type TxProcessorOpts struct {
	Log                     *zap.SugaredLogger
	OutDir                  string // if empty no files will be written
	UID                     string
	Location                string // location of the collector, will be stored in sourcelogs
	CheckNodeURI            string
	ClickhouseDSN           string
	HTTPReceivers           []string
	ReceiversAllowedSources []string
	APIServer               *api.Server
}

type TxReceiver added in v0.8.0

type TxReceiver interface {
	SendTx(ctx context.Context, tx *common.TxIn) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL