events

package
v0.2.0 Latest

Warning: This package is not in the latest version of its module.
Published: Oct 5, 2021 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package events provides data structures and functions to stream and store events (logs) from the Ethereum blockchain.

Messages in the event stream have three possible actions:

Append a Block
Rollback to a given Block (happens on chain reorganization)
SetNext to a given block number.

Depending on the event filter used to retrieve logs, the stream may not contain logs for every block. The SetNext message lets the stream signal that blocks have been read but no matching events were found in them. The Append and Rollback messages are straightforward.
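The three actions can be illustrated with a minimal sketch of a consumer that keeps a list of blocks and the next expected block number. The types below are simplified local stand-ins for the package's Action, Block and Message; the `view` type and its `apply` method are hypothetical, and the Rollback semantics (drop every block numbered n or higher, then resume at n) and the ordering check on Append are assumptions, not confirmed behavior of this package.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented types. The real Block also
// carries a hash and its events.
type Action int

const (
	Append Action = iota
	Rollback
	SetNext
)

type Block struct {
	Number uint64
}

type Message struct {
	Action Action
	Number uint64
	Block  *Block
}

// view is a hypothetical consumer state: the blocks seen so far and the
// next block number the stream is expected to deliver.
type view struct {
	blocks []*Block
	next   uint64
}

// apply updates the view for one message.
func (v *view) apply(m *Message) error {
	switch m.Action {
	case Append:
		if m.Block == nil {
			return fmt.Errorf("append without a block")
		}
		// Assumption: blocks arrive in increasing order.
		if m.Block.Number < v.next {
			return fmt.Errorf("block %d is behind next block %d", m.Block.Number, v.next)
		}
		v.blocks = append(v.blocks, m.Block)
		v.next = m.Block.Number + 1
	case Rollback:
		// Assumption: drop every block numbered m.Number or higher.
		kept := v.blocks[:0]
		for _, b := range v.blocks {
			if b.Number < m.Number {
				kept = append(kept, b)
			}
		}
		v.blocks = kept
		v.next = m.Number
	case SetNext:
		// Blocks up to m.Number-1 were read without matching events.
		v.next = m.Number
	default:
		return fmt.Errorf("unknown action %d", m.Action)
	}
	return nil
}

func main() {
	v := &view{}
	msgs := []*Message{
		{Action: Append, Block: &Block{Number: 100}},
		{Action: SetNext, Number: 105},
		{Action: Append, Block: &Block{Number: 105}},
		{Action: Rollback, Number: 105}, // a reorganization replaced block 105
	}
	for _, m := range msgs {
		if err := v.apply(m); err != nil {
			fmt.Println("error:", err)
			return
		}
	}
	fmt.Println(len(v.blocks), v.next)
}
```

After the four messages the view holds only block 100 and expects block 105 next.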

The central interfaces are Streamer and EventLog (which is also a Streamer). A Streamer knows how to stream events. The ChainStreamer implements this to stream from the Ethereum blockchain by overlapping eth_getLogs calls, sending Rollback messages when a chain reorganization is detected. An EventLog implements both the receiving methods of a stream (Append, Rollback, SetNext), and the Streamer interface to emit the stored events.
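One way to picture overlapping eth_getLogs calls is as a sliding window that starts a few blocks before the first unread block. The sketch below is not taken from the package source: `nextWindow` and its parameters are hypothetical, and the window arithmetic is an assumption. It only borrows the documented defaults (a batch size of 2000 and an overlap of 10).

```go
package main

import "fmt"

// nextWindow returns the inclusive block range for the next poll.
// first is the earliest block of interest, next the first block not yet
// read, head the current chain head. The window starts overlap blocks
// before next (but never before first), so recently read blocks are
// fetched again and can be compared with the earlier result.
func nextWindow(first, next, head, batchSize, overlap uint64) (from, to uint64, ok bool) {
	if batchSize <= overlap {
		return 0, 0, false // the window could never advance
	}
	from = first
	if next > first && next-first > overlap {
		from = next - overlap
	}
	if from > head {
		return 0, 0, false // nothing to read yet
	}
	to = from + batchSize - 1
	if to > head {
		to = head
	}
	return from, to, true
}

func main() {
	const (
		batchSize uint64 = 2000 // DefaultFetchBatchSize
		overlap   uint64 = 10   // DefaultBatchOverlap
	)
	first, head := uint64(1000), uint64(5000)
	for next := first; next <= head; {
		from, to, ok := nextWindow(first, next, head, batchSize, overlap)
		if !ok {
			break
		}
		fmt.Printf("eth_getLogs from %d to %d\n", from, to)
		next = to + 1
	}
}
```

Each window after the first re-reads the last 10 blocks of the previous one; a hash mismatch in that overlap is what would signal a reorganization.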

Constants

const DefaultBatchOverlap uint64 = 10 // overlap between polls
const DefaultFetchBatchSize uint64 = 2000 // size of call to getLogs
const DefaultPollInterval int = 15 // seconds
const MaxEventlogSize uint64 = 1024 // blocks

Variables

This section is empty.

Functions

func AddTransactionData added in v0.2.0

func AddTransactionData(ctx context.Context, client *ethclient.Client, bs *BlockSlice) error

func BigIntFromString

func BigIntFromString(s string) (*big.Int, error)

func BigIntToString

func BigIntToString(x *big.Int) string

func BlockSliceToProto

func BlockSliceToProto(bs *BlockSlice) *epb.BlockSlice

func BlockToProto

func BlockToProto(b *Block) *epb.Block

BlockToProto creates a proto representation of a Block, matching the message:

message Block {
    uint64 number = 1;
    bytes hash = 2;
    repeated Event events = 3;
}

func EventToProto

func EventToProto(e *Event) *epb.Event

EventToProto creates a proto representation of an Event.

func FilterQueryFromProto

func FilterQueryFromProto(pb *epb.FilterQuery) (ethereum.FilterQuery, error)

func FilterQueryToProto

func FilterQueryToProto(q *ethereum.FilterQuery) *epb.FilterQuery

func MatchBlocks

func MatchBlocks(new, old *BlockSlice) (bool, uint64, error)

MatchBlocks compares the new blocks with the old where they overlap. It returns true if all blocks in the overlap have the same hash. The second return value is the number of the latest block that agrees. If no block agrees, it returns (false, 0, nil).
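The comparison described above can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: `matchOverlap` is a hypothetical helper, the `Block` type is a local stand-in with a string hash instead of common.Hash, both inputs are assumed sorted by Number, and the overlap is taken to be the block numbers present in both slices.

```go
package main

import "fmt"

// Block is a simplified stand-in for the package's Block type.
type Block struct {
	Number uint64
	Hash   string
}

// matchOverlap reports whether every block present in both slices has
// the same hash, and the number of the latest block that agrees. It
// stops at the first mismatch. If no block agrees it returns (false, 0).
func matchOverlap(newBlocks, oldBlocks []Block) (bool, uint64) {
	oldHash := make(map[uint64]string, len(oldBlocks))
	for _, b := range oldBlocks {
		oldHash[b.Number] = b.Hash
	}

	var last uint64
	agreed := false
	for _, b := range newBlocks {
		h, inOverlap := oldHash[b.Number]
		if !inOverlap {
			continue
		}
		if h != b.Hash {
			return false, last // last is 0 if nothing agreed before the mismatch
		}
		last = b.Number
		agreed = true
	}
	if !agreed {
		return false, 0
	}
	return true, last
}

func main() {
	old := []Block{{10, "a"}, {11, "b"}, {12, "c"}}
	same := []Block{{11, "b"}, {12, "c"}, {13, "d"}}
	reorg := []Block{{11, "b"}, {12, "x"}, {13, "y"}}

	fmt.Println(matchOverlap(same, old))
	fmt.Println(matchOverlap(reorg, old))
}
```

In the second call block 12 differs, so the result points at block 11 as the last block both sides agree on, which is where a rollback would have to start from.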

Types

type Action

type Action int
const (
	Append Action = iota
	Rollback
	SetNext
)

type Block

type Block struct {
	Number uint64
	Hash   common.Hash
	Events []Event
}

func BlockFromProto

func BlockFromProto(pb *epb.Block) (*Block, error)

type BlockSlice

type BlockSlice struct {
	Start            uint64
	End              uint64
	DistanceFromHead uint64
	Blocks           []*Block
}

func BlockSliceFromProto

func BlockSliceFromProto(pb *epb.BlockSlice) (*BlockSlice, error)

func EmptyBlockSlice

func EmptyBlockSlice(from uint64) *BlockSlice

func GetLogs

func GetLogs(ctx context.Context, client *ethclient.Client, q *ethereum.FilterQuery) (*BlockSlice, error)

GetLogs returns a batch of logs matching a query. The blocks in the returned BlockSlice are guaranteed to be sorted by increasing Number, and the events within each block by Index.
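The ordering guarantee can be illustrated with a small sketch that groups a flat list of events into blocks and sorts both levels. The `groupByBlock` helper and the trimmed-down `Event` and `Block` types are hypothetical; how GetLogs builds its result internally is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins for the package's Event and Block types.
type Event struct {
	BlockNumber uint64
	Index       uint64 // index of log in block
}

type Block struct {
	Number uint64
	Events []Event
}

// groupByBlock groups events by block number, then sorts the blocks by
// Number and the events inside each block by Index.
func groupByBlock(events []Event) []*Block {
	byNumber := make(map[uint64]*Block)
	for _, e := range events {
		b, ok := byNumber[e.BlockNumber]
		if !ok {
			b = &Block{Number: e.BlockNumber}
			byNumber[e.BlockNumber] = b
		}
		b.Events = append(b.Events, e)
	}

	blocks := make([]*Block, 0, len(byNumber))
	for _, b := range byNumber {
		evs := b.Events
		sort.Slice(evs, func(i, j int) bool { return evs[i].Index < evs[j].Index })
		blocks = append(blocks, b)
	}
	sort.Slice(blocks, func(i, j int) bool { return blocks[i].Number < blocks[j].Number })
	return blocks
}

func main() {
	events := []Event{
		{BlockNumber: 12, Index: 3},
		{BlockNumber: 10, Index: 7},
		{BlockNumber: 12, Index: 1},
		{BlockNumber: 10, Index: 2},
	}
	for _, b := range groupByBlock(events) {
		fmt.Println(b.Number, b.Events)
	}
}
```

Blocks without matching events do not appear in the result at all, which is why a stream built on such batches needs the SetNext message.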

func (*BlockSlice) Append

func (b *BlockSlice) Append(blk *Block) error

func (*BlockSlice) Concat

func (b *BlockSlice) Concat(other *BlockSlice) error

func (*BlockSlice) DeleteBeforeBlock

func (b *BlockSlice) DeleteBeforeBlock(n uint64)

func (*BlockSlice) DeleteFromBlock

func (b *BlockSlice) DeleteFromBlock(n uint64)

func (*BlockSlice) Extend

func (b *BlockSlice) Extend(n uint64) error

func (*BlockSlice) Rollback

func (b *BlockSlice) Rollback(n uint64) error

type CanceledError

type CanceledError string
const Canceled CanceledError = CanceledError("")

func (CanceledError) Error

func (CanceledError) Error() string

type ChainStreamer

type ChainStreamer struct {
	Ctx            context.Context
	Url            string
	Filter         ethereum.FilterQuery
	FetchBatchSize uint64
	BatchOverlap   uint64
	FetchTxDetails bool
}

ChainStreamer implements a Streamer for the Ethereum blockchain.

func (*ChainStreamer) Stream

func (cr *ChainStreamer) Stream(done chan struct{}, from uint64) (*Subscription, error)

type Event

type Event struct {
	Address common.Address
	Topics  []common.Hash
	Data    []byte

	BlockNumber uint64
	BlockHash   common.Hash
	Index       uint64 // index of log in block

	TxHash  common.Hash
	TxIndex uint64 // index of tx in block
	TxData  []byte
	TxValue *big.Int
	TxFrom  common.Address
	TxGas   uint64
}

func EventFromProto

func EventFromProto(pb *epb.Event) (*Event, error)

EventFromProto creates an Event from its proto representation.

func (*Event) Log

func (e *Event) Log() *types.Log

type EventLog

type EventLog interface {
	Streamer

	Append(*Block) error
	Rollback(uint64) error
	SetNext(uint64) error
	FirstBlock() uint64
	NextBlock() uint64
	Filter() ethereum.FilterQuery
	Close() error
}

EventLog represents a sequence of events matching a filter.

type InMemoryEventLog

type InMemoryEventLog struct {
	// contains filtered or unexported fields
}

InMemoryEventLog is an in-memory implementation of the EventLog interface.

func InMemoryEventLogFromProto

func InMemoryEventLogFromProto(pb *epb.EventLogFile) (*InMemoryEventLog, error)

func NewInMemoryEventLog

func NewInMemoryEventLog(from uint64, filter ethereum.FilterQuery) *InMemoryEventLog

func (*InMemoryEventLog) Append

func (l *InMemoryEventLog) Append(b *Block) error

func (*InMemoryEventLog) Close

func (l *InMemoryEventLog) Close() error

func (*InMemoryEventLog) Filter

func (l *InMemoryEventLog) Filter() ethereum.FilterQuery

func (*InMemoryEventLog) FirstBlock

func (l *InMemoryEventLog) FirstBlock() uint64

func (*InMemoryEventLog) NextBlock

func (l *InMemoryEventLog) NextBlock() uint64

func (*InMemoryEventLog) Rollback

func (l *InMemoryEventLog) Rollback(n uint64) error

func (*InMemoryEventLog) SetNext

func (l *InMemoryEventLog) SetNext(n uint64) error

func (*InMemoryEventLog) Stream

func (l *InMemoryEventLog) Stream(done chan struct{}, from uint64) (*Subscription, error)

func (*InMemoryEventLog) ToProto

func (l *InMemoryEventLog) ToProto() *epb.EventLogFile

type LiveEventLog

type LiveEventLog struct {
	// contains filtered or unexported fields
}

LiveEventLog combines an EventLog and a ChainStreamer into a new Streamer that streams first from the EventLog and then from the ChainStreamer. While streaming from the ChainStreamer, each message is sent to both the EventLog and the subscriber.
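The replay-then-follow behavior can be sketched with plain channels. Everything here is a simplification for illustration: `replayThenFollow`, the `store` callback and the one-field `Message` type are hypothetical, and the real LiveEventLog works with the package's EventLog, ChainStreamer and Subscription types rather than these stand-ins.

```go
package main

import "fmt"

// Message is a one-field stand-in for the package's Message type.
type Message struct {
	Number uint64
}

// replayThenFollow first sends the stored messages to out, then forwards
// every live message to both the store callback and out. It closes out
// when the live channel is closed or storing fails.
func replayThenFollow(stored []Message, live <-chan Message, store func(Message) error, out chan<- Message) error {
	defer close(out)
	for _, m := range stored {
		out <- m
	}
	for m := range live {
		// Persist before forwarding, so the log never lags the subscriber.
		if err := store(m); err != nil {
			return err
		}
		out <- m
	}
	return nil
}

func main() {
	stored := []Message{{Number: 1}, {Number: 2}}

	live := make(chan Message, 1)
	live <- Message{Number: 3}
	close(live)

	var saved []Message
	store := func(m Message) error {
		saved = append(saved, m)
		return nil
	}

	// Buffered so the sketch can run without a separate goroutine.
	out := make(chan Message, len(stored)+1)
	if err := replayThenFollow(stored, live, store, out); err != nil {
		fmt.Println("error:", err)
		return
	}
	for m := range out {
		fmt.Println("received", m.Number)
	}
	fmt.Println("newly stored:", len(saved))
}
```

The subscriber sees messages 1 to 3 in order, while only the live message 3 is written to the store.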

func NewLiveEventLog

func NewLiveEventLog(e EventLog, s ChainStreamer) *LiveEventLog

func (*LiveEventLog) Stream

func (l *LiveEventLog) Stream(done chan struct{}, from uint64) (*Subscription, error)

type Message

type Message struct {
	Action Action
	Number uint64
	Block  *Block
}

type Streamer

type Streamer interface {
	Stream(done chan struct{}, from uint64) (*Subscription, error)
}

type Subscription

type Subscription struct {
	C    chan *Message
	Err  chan error
	Done chan struct{}
}
