bstream

v0.0.1
Published: Jun 22, 2020 License: Apache-2.0 Imports: 26 Imported by: 120

README

dfuse Blocks Streaming Library

The bstream package manages flows of blocks and forks in a blockchain through a Handler-based interface similar to net/http.

Usage

Flows are composed by assembling Handlers:

type HandlerFunc func(blk *Block, obj interface{}) error

and are kicked off by passing them to a Source.
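
As a rough illustration of the composition described above, here is a minimal sketch that does not import bstream. `Block`, `Handler` and `HandlerFunc` are local stand-ins that mirror the documented signatures; `skipEven` and `feed` are hypothetical helpers, with `feed` playing the role a real Source would play.

```go
package main

import "fmt"

// Block is a local stand-in with only the fields this sketch needs.
type Block struct {
	Id     string
	Number uint64
}

// Handler and HandlerFunc mirror the signatures documented in this package.
type Handler interface {
	ProcessBlock(blk *Block, obj interface{}) error
}

type HandlerFunc func(blk *Block, obj interface{}) error

func (h HandlerFunc) ProcessBlock(blk *Block, obj interface{}) error { return h(blk, obj) }

// skipEven is a hypothetical middleware: it forwards only odd-numbered blocks.
func skipEven(next Handler) Handler {
	return HandlerFunc(func(blk *Block, obj interface{}) error {
		if blk.Number%2 == 0 {
			return nil
		}
		return next.ProcessBlock(blk, obj)
	})
}

// feed is a hypothetical stand-in for a Source: it pushes blocks into h
// and stops at the first error.
func feed(blocks []*Block, h Handler) error {
	for _, blk := range blocks {
		if err := h.ProcessBlock(blk, nil); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var seen []uint64
	sink := HandlerFunc(func(blk *Block, obj interface{}) error {
		seen = append(seen, blk.Number)
		return nil
	})
	blocks := []*Block{{Id: "a", Number: 1}, {Id: "b", Number: 2}, {Id: "c", Number: 3}}
	err := feed(blocks, skipEven(sink))
	fmt.Println(seen, err)
}
```

Each handler wraps the next one, much like `net/http` middleware wraps an `http.Handler`.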

Overview

All streaming features of dfuse use this package.

Sources include:

  • FileSource feeds from 100-block files in some dstore-based location (some object storage, or local filesystem files).
  • LiveSource streams from a gRPC-based block streamer (fed from instrumented blockchain nodes directly).
  • JoiningSource, which bridges a FileSource and a LiveSource transparently, so you can stream from files and then hand off to a real-time stream.

Handlers include:

  • Forkable (in forkable/), which manages chain reorganizations and undos according to the chain's consensus (longest chain, etc.).
  • SubscriptionHub (in hub/), an in-process hub that dispatches blocks from a remote source to all consumers inside a Go process.
  • A few gates that let blocks flow only under certain conditions (BlockNumGate, BlockIDGate, RealtimeGate, RealtimeTripper). The block number and block ID gates can be inclusive or exclusive. See gates.go.
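
To make the inclusive/exclusive distinction concrete, here is a minimal sketch of a block-number gate. It is not the library's implementation: `numGate` and `passed` are hypothetical, and the exact behavior assumed here (the gate opens at the first block whose number reaches the target, and an exclusive gate drops that triggering block) is an assumption rather than something this page states.

```go
package main

import "fmt"

type Block struct{ Number uint64 }

type Handler interface {
	ProcessBlock(blk *Block, obj interface{}) error
}

type HandlerFunc func(blk *Block, obj interface{}) error

func (h HandlerFunc) ProcessBlock(blk *Block, obj interface{}) error { return h(blk, obj) }

type GateType int

const (
	GateInclusive = GateType(iota)
	GateExclusive
)

// numGate is a hypothetical gate keyed on block number.
type numGate struct {
	blockNum uint64
	gateType GateType
	next     Handler
	open     bool
}

func (g *numGate) ProcessBlock(blk *Block, obj interface{}) error {
	if g.open {
		return g.next.ProcessBlock(blk, obj)
	}
	if blk.Number < g.blockNum {
		return nil // gate still closed, block is held back
	}
	g.open = true
	if g.gateType == GateExclusive {
		return nil // assumed: the triggering block itself is not forwarded
	}
	return g.next.ProcessBlock(blk, obj)
}

// passed returns the block numbers that make it through a gate set at blockNum.
func passed(gateType GateType, blockNum uint64, nums ...uint64) []uint64 {
	var out []uint64
	g := &numGate{
		blockNum: blockNum,
		gateType: gateType,
		next: HandlerFunc(func(blk *Block, _ interface{}) error {
			out = append(out, blk.Number)
			return nil
		}),
	}
	for _, n := range nums {
		_ = g.ProcessBlock(&Block{Number: n}, nil)
	}
	return out
}

func main() {
	fmt.Println(passed(GateInclusive, 3, 1, 2, 3, 4)) // [3 4]
	fmt.Println(passed(GateExclusive, 3, 1, 2, 3, 4)) // [4]
}
```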

Contributing

Issues and PRs in this repo should relate strictly to the low-level functionality of bstream.

Report any protocol-specific issues in their respective repositories.

Please first refer to the general dfuse contribution guide, if you wish to contribute to this code base.

Please write and run tests.

License

Apache 2.0

Documentation

Constants

const (
	GateInclusive = GateType(iota)
	GateExclusive
)

Variables

var BlockRefEmpty = NewBlockRef("", 0)
var GetProtocolFirstBlock = uint64(0)
var Metrics = dmetrics.NewSet(dmetrics.PrefixNameWith("bstream"))
var TestBlockReaderFactory = BlockReaderFactoryFunc(testBlockReaderFactory)
var TestProtocol = pbbstream.Protocol(0xEADBEEF)

Hopefully, this block kind value will never be used!

Functions

func DoForProtocol

func DoForProtocol(kind pbbstream.Protocol, mappings map[pbbstream.Protocol]func() error) error

DoForProtocol extracts the worker (a lambda) to invoke based on the received `kind` parameter. If a mapping exists for `kind`, the worker is invoked and its error, if any, is returned; a successful worker yields `nil`. If no mapping exists, an error is returned.
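
The dispatch described above can be sketched as follows. This is a standalone approximation: `Protocol`, `ProtocolA` and `ProtocolB` are hypothetical stand-ins for `pbbstream.Protocol` and its values, and the error message is invented for the example.

```go
package main

import "fmt"

// Protocol is a local stand-in for pbbstream.Protocol.
type Protocol int32

const (
	ProtocolA Protocol = 1 // hypothetical protocol values
	ProtocolB Protocol = 2
)

// doForProtocol looks up the worker registered for kind and runs it.
func doForProtocol(kind Protocol, mappings map[Protocol]func() error) error {
	worker, ok := mappings[kind]
	if !ok {
		return fmt.Errorf("no mapping for protocol %d", kind)
	}
	return worker()
}

func main() {
	mappings := map[Protocol]func() error{
		ProtocolA: func() error {
			fmt.Println("handling protocol A")
			return nil
		},
	}
	fmt.Println(doForProtocol(ProtocolA, mappings)) // worker runs, then <nil>
	fmt.Println(doForProtocol(ProtocolB, mappings)) // no mapping, so an error
}
```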

func MustDoForProtocol

func MustDoForProtocol(kind pbbstream.Protocol, mappings map[pbbstream.Protocol]func())

MustDoForProtocol performs the same work as DoForProtocol, but accepts only non-error lambdas as workers, and panics if no mapping exists for `kind`.

func ValidateRegistry

func ValidateRegistry() error

Types

type BasicBlockRef

type BasicBlockRef struct {
	// contains filtered or unexported fields
}

BasicBlockRef assumes the id and num are completely separate and represent two independent pieces of information. The `ID()` in this case is the `id` field and the `Num()` is the `num` field.

func NewBlockRef

func NewBlockRef(id string, num uint64) *BasicBlockRef

func (*BasicBlockRef) ID

func (b *BasicBlockRef) ID() string

func (*BasicBlockRef) Num

func (b *BasicBlockRef) Num() uint64

func (*BasicBlockRef) String

func (b *BasicBlockRef) String() string

type BasicBlockRefFromID

type BasicBlockRefFromID struct {
	// contains filtered or unexported fields
}

BasicBlockRefFromID is a struct wrapper around `BlockRefFromID` that caches the `num` field extracted from the `BlockRefFromID`. Use this implementation in performance-critical paths where you want the block number extracted once and cached, rather than extracted over and over again.

func NewBlockRefFromID

func NewBlockRefFromID(id BlockRefFromID) *BasicBlockRefFromID

func (*BasicBlockRefFromID) ID

func (b *BasicBlockRefFromID) ID() string

func (*BasicBlockRefFromID) Num

func (b *BasicBlockRefFromID) Num() uint64

func (*BasicBlockRefFromID) String

func (b *BasicBlockRefFromID) String() string

type Block

type Block struct {
	Id         string
	Number     uint64
	PreviousId string
	Timestamp  time.Time
	LibNum     uint64

	PayloadKind    pbbstream.Protocol
	PayloadVersion int32
	PayloadBuffer  []byte
	// contains filtered or unexported fields
}

Block represents a block abstraction across all dfuse systems and, for now, is wide enough to accommodate a variety of implementations. It is the actual structure that flows all around `bstream`.

func BlockFromBytes

func BlockFromBytes(bytes []byte) (*Block, error)

func BlockFromProto

func BlockFromProto(b *pbbstream.Block) (*Block, error)

func MustBlockFromProto

func MustBlockFromProto(b *pbbstream.Block) *Block

func TestBlock

func TestBlock(id, prev string) *Block

func TestBlockFromJSON

func TestBlockFromJSON(jsonContent string) *Block

func TestBlockWithLIBNum

func TestBlockWithLIBNum(id, previousID string, newLIB uint64) *Block

func TestBlockWithTimestamp

func TestBlockWithTimestamp(id, prev string, timestamp time.Time) *Block

func (*Block) ID

func (b *Block) ID() string

func (*Block) Kind

func (b *Block) Kind() pbbstream.Protocol

func (*Block) LIBNum

func (b *Block) LIBNum() uint64

func (*Block) Num

func (b *Block) Num() uint64

func (*Block) Payload

func (b *Block) Payload() []byte

func (*Block) PreviousID

func (b *Block) PreviousID() string

func (*Block) String

func (b *Block) String() string

func (*Block) Time

func (b *Block) Time() time.Time

func (*Block) ToNative

func (b *Block) ToNative() interface{}

func (*Block) ToProto

func (b *Block) ToProto() (*pbbstream.Block, error)

func (*Block) Version

func (b *Block) Version() int32

type BlockDecoder

type BlockDecoder interface {
	Decode(blk *Block) (interface{}, error)
}
var GetBlockDecoder BlockDecoder

type BlockDecoderFunc

type BlockDecoderFunc func(blk *Block) (interface{}, error)

func (BlockDecoderFunc) Decode

func (f BlockDecoderFunc) Decode(blk *Block) (interface{}, error)

type BlockIDGate

type BlockIDGate struct {
	Name string

	MaxHoldOff int
	// contains filtered or unexported fields
}

func NewBlockIDGate

func NewBlockIDGate(blockID string, gateType GateType, h Handler) *BlockIDGate

func (*BlockIDGate) ProcessBlock

func (g *BlockIDGate) ProcessBlock(blk *Block, obj interface{}) error

type BlockNumGate

type BlockNumGate struct {
	Name string

	MaxHoldOff int
	// contains filtered or unexported fields
}

func NewBlockNumGate

func NewBlockNumGate(blockNum uint64, gateType GateType, h Handler) *BlockNumGate

func (*BlockNumGate) ProcessBlock

func (g *BlockNumGate) ProcessBlock(blk *Block, obj interface{}) error

type BlockNumberGator

type BlockNumberGator struct {
	// contains filtered or unexported fields
}

func NewBlockNumberGator

func NewBlockNumberGator(blockNum uint64) *BlockNumberGator

func NewExclusiveBlockNumberGator

func NewExclusiveBlockNumberGator(blockNum uint64) *BlockNumberGator

func (*BlockNumberGator) Pass

func (g *BlockNumberGator) Pass(block *Block) bool

func (*BlockNumberGator) SetName

func (g *BlockNumberGator) SetName(name string)

type BlockReader

type BlockReader interface {
	Read() (*Block, error)
}

BlockReader is a reader protocol that reads bstream `Block` values out of a stream source. The reader respects the `io.Reader` contract with respect to `io.EOF`, i.e. it is possible for the reader to return both `block, io.EOF`.

Treat a non-nil block as present regardless of `err`; it is guaranteed to be valid. The subsequent call will still return `nil, io.EOF`.
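
A read loop that honors this contract might look like the sketch below. `Block` and `BlockReader` are local stand-ins for the documented types; `sliceReader` and `readAll` are hypothetical, and `sliceReader` deliberately returns its last block together with `io.EOF` to exercise the case described above.

```go
package main

import (
	"fmt"
	"io"
)

type Block struct{ Number uint64 }

type BlockReader interface {
	Read() (*Block, error)
}

// sliceReader is a hypothetical reader that returns its last block
// together with io.EOF.
type sliceReader struct{ blocks []*Block }

func (r *sliceReader) Read() (*Block, error) {
	if len(r.blocks) == 0 {
		return nil, io.EOF
	}
	blk := r.blocks[0]
	r.blocks = r.blocks[1:]
	if len(r.blocks) == 0 {
		return blk, io.EOF
	}
	return blk, nil
}

// readAll consumes the reader, handling a non-nil block before looking at err.
func readAll(r BlockReader) ([]uint64, error) {
	var nums []uint64
	for {
		blk, err := r.Read()
		if blk != nil {
			nums = append(nums, blk.Number) // valid even when err == io.EOF
		}
		if err == io.EOF {
			return nums, nil
		}
		if err != nil {
			return nums, err
		}
	}
}

func main() {
	r := &sliceReader{blocks: []*Block{{Number: 1}, {Number: 2}, {Number: 3}}}
	fmt.Println(readAll(r)) // [1 2 3] <nil>
}
```

Checking `blk != nil` before inspecting `err` is what keeps the final block from being dropped.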

type BlockReaderFactory

type BlockReaderFactory interface {
	New(reader io.Reader) (BlockReader, error)
}
var GetBlockReaderFactory BlockReaderFactory

type BlockReaderFactoryFunc

type BlockReaderFactoryFunc func(reader io.Reader) (BlockReader, error)

func (BlockReaderFactoryFunc) New

type BlockRef

type BlockRef interface {
	ID() string
	Num() uint64
	String() string
}

BlockRef represents a reference to a block and is mainly defined as the pair `<BlockID, BlockNum>`. A `Block` interface should always implement the `BlockRef` interface.

The interface also enforces the creation of a `Stringer` object. All implementations are expected to render in the form `#<BlockNum> (<Id>)`. This eases formatted output when using `zap.Stringer(...)`.
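
For illustration, a type satisfying this interface and rendering in the expected form could look like this. `blockRef` and `newRef` are hypothetical names used only in this sketch; the format string follows the form given above.

```go
package main

import "fmt"

// BlockRef mirrors the interface documented above.
type BlockRef interface {
	ID() string
	Num() uint64
	String() string
}

// blockRef is a hypothetical implementation holding id and num separately.
type blockRef struct {
	id  string
	num uint64
}

func newRef(id string, num uint64) BlockRef { return blockRef{id: id, num: num} }

func (r blockRef) ID() string  { return r.id }
func (r blockRef) Num() uint64 { return r.num }

// String renders the reference as "#<BlockNum> (<Id>)".
func (r blockRef) String() string { return fmt.Sprintf("#%d (%s)", r.num, r.id) }

func main() {
	fmt.Println(newRef("00000007ab", 7)) // #7 (00000007ab)
}
```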

type BlockRefFromID

type BlockRefFromID string

BlockRefFromID is a simple wrapper around a string that assumes the block number is in the first 8 characters of the id as a big-endian encoded hexadecimal number, and that the full string represents the ID.
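
The extraction described above can be sketched with the standard library alone. `blockNumFromID` is a hypothetical helper, and returning an error for ids shorter than 8 characters is a choice made for this sketch; this page does not say how the library handles that case.

```go
package main

import (
	"fmt"
	"strconv"
)

// blockNumFromID parses the first 8 hexadecimal characters of id
// as the block number.
func blockNumFromID(id string) (uint64, error) {
	if len(id) < 8 {
		return 0, fmt.Errorf("id %q is shorter than 8 characters", id)
	}
	// 8 hex characters encode at most 32 bits.
	return strconv.ParseUint(id[:8], 16, 32)
}

func main() {
	num, err := blockNumFromID("0000002adeadbeef")
	fmt.Println(num, err) // 42 <nil>
}
```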

func (BlockRefFromID) ID

func (b BlockRefFromID) ID() string

func (BlockRefFromID) Num

func (b BlockRefFromID) Num() uint64

func (BlockRefFromID) String

func (b BlockRefFromID) String() string

type BlockWriter

type BlockWriter interface {
	Write(block *Block) error
}

type BlockWriterFactory

type BlockWriterFactory interface {
	New(writer io.Writer) (BlockWriter, error)
}
var GetBlockWriterFactory BlockWriterFactory

type BlockWriterFactoryFunc

type BlockWriterFactoryFunc func(writer io.Writer) (BlockWriter, error)

func (BlockWriterFactoryFunc) New

type Buffer

type Buffer struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewBuffer

func NewBuffer(name string) *Buffer

func (*Buffer) AllBlocks

func (b *Buffer) AllBlocks() (out []BlockRef)

func (*Buffer) AppendHead

func (b *Buffer) AppendHead(blk BlockRef)

func (*Buffer) Contains

func (b *Buffer) Contains(blockNum uint64) bool

func (*Buffer) Delete

func (b *Buffer) Delete(blk BlockRef)

func (*Buffer) Exists

func (b *Buffer) Exists(id string) bool

func (*Buffer) GetByID

func (b *Buffer) GetByID(id string) (blk BlockRef)

func (*Buffer) Head

func (b *Buffer) Head() (blk BlockRef)

func (*Buffer) HeadBlocks

func (b *Buffer) HeadBlocks(count int) []BlockRef

func (*Buffer) Len

func (b *Buffer) Len() int

Len() locks the buffer and returns its length. Watch out for deadlocks between buffer.lock and promises.lock if using this internally.

func (*Buffer) PopTail

func (b *Buffer) PopTail() (blockRef BlockRef)

func (*Buffer) Tail

func (b *Buffer) Tail() (blk BlockRef)

func (*Buffer) TruncateTail

func (b *Buffer) TruncateTail(lowBlockNumInclusive uint64) (truncated []BlockRef)

type EternalSource

type EternalSource struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func NewDelegatingEternalSource

func NewDelegatingEternalSource(sf SourceFromRefFactory, startBackAt EternalSourceStartBackAtBlock, h Handler) *EternalSource

func NewEternalSource

func NewEternalSource(sf SourceFromRefFactory, h Handler) *EternalSource

func (*EternalSource) Run

func (s *EternalSource) Run()

type EternalSourceStartBackAtBlock

type EternalSourceStartBackAtBlock func() (BlockRef, error)

type FileSource

type FileSource struct {
	Name string

	*shutter.Shutter
	// contains filtered or unexported fields
}

func NewFileSource

func NewFileSource(
	blocksStore dstore.Store,
	startBlockNum uint64,
	parallelDownloads int,
	preprocFunc PreprocessFunc,
	h Handler,
	options ...FileSourceOption,
) *FileSource

NewFileSource may stream up to 99 blocks before the given `startBlockNum`.
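
The figure of 99 follows from the 100-block files mentioned in the README, if one assumes that files are aligned on multiples of 100 and always read from their first block. That alignment is an assumption of this sketch, and `fileBase` and `leadingBlocks` are hypothetical helpers.

```go
package main

import "fmt"

const blocksPerFile = 100 // from the README: sources feed from 100-block files

// fileBase returns the first block number of the file that would contain
// startBlockNum, assuming files are aligned on multiples of blocksPerFile.
func fileBase(startBlockNum uint64) uint64 {
	return startBlockNum - startBlockNum%blocksPerFile
}

// leadingBlocks is how many blocks precede startBlockNum in that file.
func leadingBlocks(startBlockNum uint64) uint64 {
	return startBlockNum - fileBase(startBlockNum)
}

func main() {
	for _, start := range []uint64{1000, 1042, 1099} {
		fmt.Printf("start=%d file base=%d leading blocks=%d\n",
			start, fileBase(start), leadingBlocks(start))
	}
}
```

Under that assumption, a handler that must not see earlier blocks would need a gate or its own check on the block number.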

func (*FileSource) Run

func (s *FileSource) Run()

func (*FileSource) SetLogger

func (s *FileSource) SetLogger(logger *zap.Logger)

func (*FileSource) SetNotFoundCallback

func (s *FileSource) SetNotFoundCallback(f func(missingBlockNum uint64))

SetNotFoundCallback sets a callback function to be triggered when a blocks file is not found. Useful for joining with unmerged blocks.

type FileSourceOption

type FileSourceOption = func(s *FileSource)

func FileSourceWithTimeThresholdGator

func FileSourceWithTimeThresholdGator(threshold time.Duration) FileSourceOption

type GateType

type GateType int

func (GateType) String

func (g GateType) String() string

type Gator

type Gator interface {
	Pass(block *Block) bool
}

type Handler

type Handler interface {
	ProcessBlock(blk *Block, obj interface{}) error
}

type HandlerFunc

type HandlerFunc func(blk *Block, obj interface{}) error

func (HandlerFunc) ProcessBlock

func (h HandlerFunc) ProcessBlock(blk *Block, obj interface{}) error

type JoiningSource

type JoiningSource struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func NewJoiningSource

func NewJoiningSource(fileSourceFactory, liveSourceFactory SourceFactory, h Handler, options ...JoiningSourceOption) *JoiningSource

func (*JoiningSource) Run

func (s *JoiningSource) Run()

func (*JoiningSource) SetName

func (s *JoiningSource) SetName(name string)

type JoiningSourceOption

type JoiningSourceOption = func(s *JoiningSource)

func JoiningSourceMergerAddr

func JoiningSourceMergerAddr(mergerAddr string) JoiningSourceOption

func JoiningSourceName

func JoiningSourceName(name string) JoiningSourceOption

func JoiningSourceRateLimit

func JoiningSourceRateLimit(rampLength int, sleepBetweenBlocks time.Duration) JoiningSourceOption

func JoiningSourceTargetBlockID

func JoiningSourceTargetBlockID(id string) JoiningSourceOption

JoiningSourceTargetBlockID is an option for when we know right away the ID of the block where we want to start. In this case, we accept that block from the live stream right away (if we have not started processing blocks from files), which avoids waiting for a file to be merged when the live stream is already serving our start block ID. This is not recommended with a block number, because we could have missed a "version" of that block number from the live source, whereas a file source always starts from the first occurrence of a block number.

func JoiningSourceTargetBlockNum

func JoiningSourceTargetBlockNum(num uint64) JoiningSourceOption

JoiningSourceTargetBlockNum is like JoiningSourceTargetBlockID, but allows starting immediately from a block number (for example block num == 2 on EOS or similar). Beware: you must be absolutely sure that you won't get a forked block at this number.

type MockSource

type MockSource struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func NewMockSource

func NewMockSource(blocks []*Block, handler Handler) *MockSource

func (*MockSource) Run

func (s *MockSource) Run()

type MultiplexedSource

type MultiplexedSource struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

MultiplexedSource contains a gator based on realtime.

func NewMultiplexedSource

func NewMultiplexedSource(sourceFactories []SourceFactory, h Handler) *MultiplexedSource

func (*MultiplexedSource) Run

func (s *MultiplexedSource) Run()

type Pipeline

type Pipeline interface {
	ProcessBlock(blk *Block, obj interface{}) error
}

Pipeline processes all blocks through the `ProcessBlock()` function. Unless the pipeline also implements `PipelinePreprocessor`, `obj` will always be nil.

If the pipeline was initialized with `irreversibleOnly`, only blk.Irreversible blocks will be processed (and post-processed). This is the case when used in conjunction with the irreversible index in parallel operations.

type PipelineFunc

type PipelineFunc func(blk *Block, obj interface{}) error

func (PipelineFunc) ProcessBlock

func (f PipelineFunc) ProcessBlock(blk *Block, obj interface{}) error

type PipelinePreprocessor

type PipelinePreprocessor interface {
	Pipeline
	PreprocessBlock(blk *Block) (obj interface{}, err error)
}

PipelinePreprocessor pre-processes the `blk` in parallel. If the returned `obj` is non-nil, it is passed to the `ProcessBlock()` function. If it is nil, the `ProcessBlock` call is skipped for this block.

Even if `PreprocessBlock` is called in parallel, the calls to `ProcessBlock()` are guaranteed to be run linearly in the order received from the logs. Note that some blocks can arrive out of order when forked. You should be certain of a linear order if the Afterburner is configured to feed only irreversible blocks.
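
One way to get parallel pre-processing with strictly ordered processing is to give each block its own result channel and drain the channels in input order. The sketch below shows that pattern only; it is not the library's scheduler, and `runOrdered` and `doubledInOrder` are hypothetical names.

```go
package main

import "fmt"

type Block struct{ Number uint64 }

type PreprocessFunc func(blk *Block) (interface{}, error)

// runOrdered pre-processes every block in its own goroutine, then calls
// process sequentially, in input order. A nil obj skips the process call.
func runOrdered(blocks []*Block, pre PreprocessFunc, process func(blk *Block, obj interface{}) error) error {
	type result struct {
		obj interface{}
		err error
	}
	results := make([]chan result, len(blocks))
	for i, blk := range blocks {
		ch := make(chan result, 1) // buffered so goroutines never block on send
		results[i] = ch
		go func(blk *Block) {
			obj, err := pre(blk)
			ch <- result{obj: obj, err: err}
		}(blk)
	}
	for i, blk := range blocks {
		res := <-results[i]
		if res.err != nil {
			return res.err
		}
		if res.obj == nil {
			continue
		}
		if err := process(blk, res.obj); err != nil {
			return err
		}
	}
	return nil
}

// doubledInOrder doubles each block number in parallel, skipping block 2,
// and returns the processed values in the order they were handled.
func doubledInOrder(nums ...uint64) ([]uint64, error) {
	var blocks []*Block
	for _, n := range nums {
		blocks = append(blocks, &Block{Number: n})
	}
	var out []uint64
	err := runOrdered(blocks,
		func(blk *Block) (interface{}, error) {
			if blk.Number == 2 {
				return nil, nil // nil obj: ProcessBlock is skipped for this block
			}
			return blk.Number * 2, nil
		},
		func(blk *Block, obj interface{}) error {
			out = append(out, obj.(uint64))
			return nil
		})
	return out, err
}

func main() {
	fmt.Println(doubledInOrder(1, 2, 3, 4)) // [2 6 8] <nil>
}
```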

type PipelineStateFlusher

type PipelineStateFlusher interface {
	FlushState(exitError error) error
}

PipelineStateFlusher implements the FlushState() method, called when shutting down the Pipeliner.

type PreprocessFunc

type PreprocessFunc func(blk *Block) (interface{}, error)

type PreprocessedBlock

type PreprocessedBlock struct {
	Block *Block
	Obj   interface{}
}

func (*PreprocessedBlock) ID

func (p *PreprocessedBlock) ID() string

func (*PreprocessedBlock) Num

func (p *PreprocessedBlock) Num() uint64

func (*PreprocessedBlock) String

func (p *PreprocessedBlock) String() string

type Preprocessor

type Preprocessor struct {
	// contains filtered or unexported fields
}

func NewPreprocessor

func NewPreprocessor(preprocFunc PreprocessFunc, next Handler) *Preprocessor

func (*Preprocessor) ProcessBlock

func (p *Preprocessor) ProcessBlock(blk *Block, obj interface{}) (err error)

type Publisher

type Publisher interface {
	Publish(*Block) (relayed bool)
	Listen() error
}

type RealtimeGate

type RealtimeGate struct {
	Name string
	// contains filtered or unexported fields
}

func NewRealtimeGate

func NewRealtimeGate(timeToRealtime time.Duration, h Handler) *RealtimeGate

func (*RealtimeGate) ProcessBlock

func (g *RealtimeGate) ProcessBlock(blk *Block, obj interface{}) error

func (*RealtimeGate) SetName

func (g *RealtimeGate) SetName(name string)

type RealtimeTripper

type RealtimeTripper struct {
	// contains filtered or unexported fields
}

RealtimeTripper is a pass-through handler that executes a function before the first block goes through.
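
A pass-through handler of this kind can be sketched as below. The trigger condition is an assumption: this sketch fires `tripFunc` once, when the first block whose timestamp is within `timeToRealtime` of the current time arrives, which is inferred from the constructor's parameters rather than stated on this page. `tripper`, its injectable `now` clock, and `trace` are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

type Block struct {
	Number    uint64
	Timestamp time.Time
}

type Handler interface {
	ProcessBlock(blk *Block, obj interface{}) error
}

type HandlerFunc func(blk *Block, obj interface{}) error

func (h HandlerFunc) ProcessBlock(blk *Block, obj interface{}) error { return h(blk, obj) }

// tripper is a hypothetical stand-in for RealtimeTripper.
type tripper struct {
	timeToRealtime time.Duration
	tripFunc       func()
	next           Handler
	now            func() time.Time // injectable clock keeps the sketch deterministic
	tripped        bool
}

func (t *tripper) ProcessBlock(blk *Block, obj interface{}) error {
	if !t.tripped && t.now().Sub(blk.Timestamp) < t.timeToRealtime {
		t.tripped = true
		t.tripFunc() // runs once, before the near-realtime block is forwarded
	}
	return t.next.ProcessBlock(blk, obj) // every block passes through
}

// trace feeds blocks with the given ages (in seconds) through a tripper
// and records what happened, in order.
func trace(thresholdSeconds int, agesSeconds ...int) []string {
	now := time.Date(2020, 6, 22, 12, 0, 0, 0, time.UTC)
	var events []string
	h := &tripper{
		timeToRealtime: time.Duration(thresholdSeconds) * time.Second,
		tripFunc:       func() { events = append(events, "trip") },
		next: HandlerFunc(func(blk *Block, _ interface{}) error {
			events = append(events, fmt.Sprintf("block %d", blk.Number))
			return nil
		}),
		now: func() time.Time { return now },
	}
	for i, age := range agesSeconds {
		blk := &Block{Number: uint64(i + 1), Timestamp: now.Add(-time.Duration(age) * time.Second)}
		_ = h.ProcessBlock(blk, nil)
	}
	return events
}

func main() {
	fmt.Println(trace(15, 3600, 5, 1)) // [block 1 trip block 2 block 3]
}
```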

func NewRealtimeTripper

func NewRealtimeTripper(timeToRealtime time.Duration, tripFunc func(), h Handler) *RealtimeTripper

func (*RealtimeTripper) ProcessBlock

func (t *RealtimeTripper) ProcessBlock(blk *Block, obj interface{}) error

type RecentBlockGetter

type RecentBlockGetter struct {
	// contains filtered or unexported fields
}

RecentBlockGetter requires a source that shuts down when ProcessBlock fails.

func NewRecentBlockGetter

func NewRecentBlockGetter(sampleSize int) *RecentBlockGetter

func (*RecentBlockGetter) LatestBlock

func (g *RecentBlockGetter) LatestBlock() *Block

func (*RecentBlockGetter) ProcessBlock

func (g *RecentBlockGetter) ProcessBlock(blk *Block, obj interface{}) error

type ShortPipelineFunc

type ShortPipelineFunc func(blk *Block) error

func (ShortPipelineFunc) ProcessBlock

func (f ShortPipelineFunc) ProcessBlock(blk *Block, obj interface{}) error

type Shutterer

type Shutterer interface {
	Shutdown(error)
	Terminating() <-chan struct{}
	IsTerminating() bool
	Terminated() <-chan struct{}
	IsTerminated() bool
	OnTerminating(f func(error))
	OnTerminated(f func(error))
	Err() error
}

type SimpleTailManager

type SimpleTailManager struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewSimpleTailManager

func NewSimpleTailManager(buffer *Buffer, bufferSize int) *SimpleTailManager

func (*SimpleTailManager) Launch

func (m *SimpleTailManager) Launch()

func (*SimpleTailManager) TailLock

func (m *SimpleTailManager) TailLock(blockNum uint64) (releaseFunc func(), err error)

type Source

type Source interface {
	Run()
	Shutterer // now an interface! WOW!
}

type SourceFactory

type SourceFactory func(h Handler) Source

type SourceFromNumFactory

type SourceFromNumFactory func(startBlockNum uint64, h Handler) Source

type SourceFromNumFactoryWithErr

type SourceFromNumFactoryWithErr func(startBlockNum uint64, h Handler) (Source, error)

type SourceFromRefFactory

type SourceFromRefFactory func(startBlockRef BlockRef, h Handler) Source

type StartBlockGetter

type StartBlockGetter func() (blockNum uint64)

type StartBlockResolver

type StartBlockResolver interface {
	Resolve(ctx context.Context, targetBlockNum uint64) (startBlockNum uint64, previousIrreversibleID string, err error)
}

StartBlockResolver should give you a start block number that will guarantee covering all necessary blocks to handle forks before the block that you want. This requires chain-specific implementations.

A StartBlockResolver helps determine the lowest block that you have to fetch from your block source to ensure that you can handle forks for a given target start block.

ex: I want to start at block 1000, but I may have to start at block 700 if I don't know which block 1000 is "irreversible".

  • the DumbStartBlockResolver may simply tell you to start at block 500 and be done with it.
  • a StartBlockResolver based on more data could tell you that you can start at block 1000 but that you need to set the irreversible ID to "00001000deadbeef" in your `forkable` (InclusiveLIB) so that you don't start on a forked block that can't be resolved.
  • a StartBlockResolver based on a blocksource for EOSIO could fetch the "dposLIBNum" of your targetStartBlock, and tell you to start at that block (ex: 727).

type StartBlockResolverFunc

type StartBlockResolverFunc func(context.Context, uint64) (uint64, string, error)

func DumbStartBlockResolver

func DumbStartBlockResolver(precedingBlocks uint64) StartBlockResolverFunc

DumbStartBlockResolver will help you start x blocks before your target start block.
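
A resolver of this shape can be sketched in a few lines. `dumbResolver` and `resolveFrom` are hypothetical, `ResolveFunc` is a local stand-in for `StartBlockResolverFunc`, and clamping to block 0 when the target is smaller than the offset is an assumption of this sketch.

```go
package main

import (
	"context"
	"fmt"
)

// ResolveFunc mirrors the documented StartBlockResolverFunc signature.
type ResolveFunc func(ctx context.Context, targetBlockNum uint64) (uint64, string, error)

// dumbResolver returns a resolver that always starts precedingBlocks
// before the target, with no irreversible ID.
func dumbResolver(precedingBlocks uint64) ResolveFunc {
	return func(_ context.Context, targetBlockNum uint64) (uint64, string, error) {
		if targetBlockNum <= precedingBlocks {
			return 0, "", nil // assumed: never go below block 0
		}
		return targetBlockNum - precedingBlocks, "", nil
	}
}

// resolveFrom is a small convenience wrapper for the example.
func resolveFrom(precedingBlocks, target uint64) uint64 {
	start, _, _ := dumbResolver(precedingBlocks)(context.Background(), target)
	return start
}

func main() {
	fmt.Println(resolveFrom(500, 1000)) // 500
	fmt.Println(resolveFrom(500, 100))  // 0
}
```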

func ParallelStartResolver

func ParallelStartResolver(resolvers []StartBlockResolver, attempts int) StartBlockResolverFunc

ParallelStartResolver will call multiple resolvers to get the fastest answer. It retries each resolver 'attempts' times before bailing out. If attempts<0, it will retry forever.
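
The "fastest answer wins" behavior can be approximated as follows. This is a sketch of the general pattern, not the library's code: `parallelResolve`, `fixed`, `failing` and `resolveNow` are hypothetical, there is no backoff between retries, and which error is reported when every resolver fails is a choice made here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type ResolveFunc func(ctx context.Context, target uint64) (uint64, string, error)

// parallelResolve runs all resolvers concurrently and returns the first
// successful answer. Each resolver is retried up to attempts times;
// a negative attempts retries until the context is cancelled.
func parallelResolve(resolvers []ResolveFunc, attempts int) ResolveFunc {
	return func(ctx context.Context, target uint64) (uint64, string, error) {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel() // stops the remaining retries once we return

		type answer struct {
			num uint64
			id  string
			err error
		}
		out := make(chan answer, len(resolvers)) // each goroutine sends exactly once

		for _, r := range resolvers {
			go func(r ResolveFunc) {
				err := errors.New("no attempt made")
				for i := 0; attempts < 0 || i < attempts; i++ {
					if ctx.Err() != nil {
						err = ctx.Err()
						break
					}
					num, id, rerr := r(ctx, target)
					if rerr == nil {
						out <- answer{num: num, id: id}
						return
					}
					err = rerr
				}
				out <- answer{err: err}
			}(r)
		}

		lastErr := errors.New("no resolvers configured")
		for range resolvers {
			a := <-out
			if a.err == nil {
				return a.num, a.id, nil
			}
			lastErr = a.err
		}
		return 0, "", lastErr
	}
}

// fixed and failing are hypothetical resolvers used only for the demo.
func fixed(num uint64, id string) ResolveFunc {
	return func(context.Context, uint64) (uint64, string, error) { return num, id, nil }
}

func failing(msg string) ResolveFunc {
	return func(context.Context, uint64) (uint64, string, error) { return 0, "", errors.New(msg) }
}

func resolveNow(r ResolveFunc, target uint64) (uint64, string, error) {
	return r(context.Background(), target)
}

func main() {
	r := parallelResolve([]ResolveFunc{failing("backend down"), fixed(700, "000002bcaa")}, 2)
	fmt.Println(resolveNow(r, 1000)) // 700 000002bcaa <nil>
}
```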

func (StartBlockResolverFunc) Resolve

func (s StartBlockResolverFunc) Resolve(ctx context.Context, targetBlockNum uint64) (uint64, string, error)

type Subscriber

type Subscriber interface {
	Read() (*Block, error)
	StartAtBlockID(ID string) bool
	GetBlockIDInBuffer(blockNum uint64) string
	Start(channelSize int)
	Started() bool
	WaitFor(ID string) <-chan interface{}

	Shutterer
}

Subscriber is a live blocks subscriber implementation.

type TailLock

type TailLock struct {
	sync.Mutex
	// contains filtered or unexported fields
}

TailLock manages in-flight block queries and feeds the truncation mechanism, so that truncation happens only when the buffer is full and no one is querying the blocks.
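
The idea of tracking in-flight queries so that truncation stays below them can be sketched with a counted map. `inflight` is a hypothetical type, not the library's TailLock; in particular, its `lowerBound` returns an extra `ok` flag for the empty case, which differs from the documented `LowerBound() uint64` signature.

```go
package main

import (
	"fmt"
	"sync"
)

// inflight is a hypothetical tracker of block numbers currently being queried.
type inflight struct {
	mu     sync.Mutex
	counts map[uint64]int // block number -> number of active queries
}

func newInflight() *inflight { return &inflight{counts: map[uint64]int{}} }

// lock registers a query at blockNum and returns a function that releases it.
// Calling the release function more than once has no further effect.
func (t *inflight) lock(blockNum uint64) (release func()) {
	t.mu.Lock()
	t.counts[blockNum]++
	t.mu.Unlock()

	var once sync.Once
	return func() {
		once.Do(func() {
			t.mu.Lock()
			defer t.mu.Unlock()
			t.counts[blockNum]--
			if t.counts[blockNum] == 0 {
				delete(t.counts, blockNum)
			}
		})
	}
}

// lowerBound returns the lowest block number still being queried;
// ok is false when nothing is in flight.
func (t *inflight) lowerBound() (num uint64, ok bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for n := range t.counts {
		if !ok || n < num {
			num, ok = n, true
		}
	}
	return num, ok
}

func main() {
	tracker := newInflight()
	releaseA := tracker.lock(120)
	releaseB := tracker.lock(100)
	fmt.Println(tracker.lowerBound()) // 100 true
	releaseB()
	fmt.Println(tracker.lowerBound()) // 120 true
	releaseA()
	fmt.Println(tracker.lowerBound()) // 0 false
}
```

A truncation routine built on this would only drop blocks below the reported lower bound.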

func NewTailLock

func NewTailLock() *TailLock

func (*TailLock) LowerBound

func (g *TailLock) LowerBound() uint64

func (*TailLock) TailLock

func (g *TailLock) TailLock(blockNum uint64) (releaseFunc func())

type TestAfterProcessBlockFunc

type TestAfterProcessBlockFunc func(blk *Block, obj interface{}, result error)

type TestBlockReader

type TestBlockReader struct {
	// contains filtered or unexported fields
}

func (*TestBlockReader) Read

func (r *TestBlockReader) Read() (*Block, error)

type TestBlockReaderBin

type TestBlockReaderBin struct {
	DBinReader *dbin.Reader
}

func (*TestBlockReaderBin) Read

func (l *TestBlockReaderBin) Read() (*Block, error)

type TestBlockWriterBin

type TestBlockWriterBin struct {
	DBinWriter *dbin.Writer
}

func (*TestBlockWriterBin) Write

func (w *TestBlockWriterBin) Write(block *Block) error

type TestPipeline

type TestPipeline struct {
	// contains filtered or unexported fields
}

TestPipeline is an instrumented Pipeline object.

func NewTestPipeline

func NewTestPipeline() *TestPipeline

func (*TestPipeline) Error

func (p *TestPipeline) Error(err error) (blk *Block, obj interface{}, readErr error)

Error consumes the next ProcessBlock and returns the provided error.

func (*TestPipeline) Next

func (p *TestPipeline) Next() (blk *Block, obj interface{}, err error)

Next consumes the next block and provides a `nil` error.

func (*TestPipeline) ProcessBlock

func (p *TestPipeline) ProcessBlock(blk *Block, obj interface{}) error

ProcessBlock implements the `Pipeline` interface.

type TestPipelineMiddleware

type TestPipelineMiddleware struct {
	Pipeline
	// contains filtered or unexported fields
}

TestPipelineMiddleware is a simplistic middleware with support only for ProcessBlock. It does *not* handle FlushState (yet), nor PreprocessBlock.
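
A middleware of this kind can be sketched by embedding the child pipeline and calling a hook with the child's result. `Pipeline`, `PipelineFunc` and `AfterFunc` are local stand-ins for the documented types, while `middleware` and `errBoom` are hypothetical; calling the hook after every `ProcessBlock`, including failed ones, is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

type Block struct{ Number uint64 }

type Pipeline interface {
	ProcessBlock(blk *Block, obj interface{}) error
}

type PipelineFunc func(blk *Block, obj interface{}) error

func (f PipelineFunc) ProcessBlock(blk *Block, obj interface{}) error { return f(blk, obj) }

// AfterFunc mirrors the documented TestAfterProcessBlockFunc signature.
type AfterFunc func(blk *Block, obj interface{}, result error)

// middleware is a hypothetical wrapper: it delegates to the embedded child
// and then reports the outcome to the after hook.
type middleware struct {
	Pipeline
	after AfterFunc
}

func (m *middleware) ProcessBlock(blk *Block, obj interface{}) error {
	err := m.Pipeline.ProcessBlock(blk, obj)
	if m.after != nil {
		m.after(blk, obj, err)
	}
	return err // the child's result is passed through unchanged
}

var errBoom = errors.New("boom")

func main() {
	child := PipelineFunc(func(blk *Block, _ interface{}) error {
		if blk.Number == 2 {
			return errBoom
		}
		return nil
	})
	m := &middleware{Pipeline: child, after: func(blk *Block, _ interface{}, result error) {
		fmt.Printf("block %d -> %v\n", blk.Number, result)
	}}
	_ = m.ProcessBlock(&Block{Number: 1}, nil)
	_ = m.ProcessBlock(&Block{Number: 2}, nil)
}
```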

func NewTestPipelineMiddleware

func NewTestPipelineMiddleware(child Pipeline, afterProcessBlock TestAfterProcessBlockFunc) *TestPipelineMiddleware

func (*TestPipelineMiddleware) ProcessBlock

func (p *TestPipelineMiddleware) ProcessBlock(blk *Block, obj interface{}) error

type TestPublisher

type TestPublisher struct {
	Blocks []*Block
}

func NewTestPublisher

func NewTestPublisher() *TestPublisher

func (TestPublisher) Listen

func (TestPublisher) Listen() error

func (*TestPublisher) Publish

func (p *TestPublisher) Publish(blk *Block) (relayed bool)

type TestSource

type TestSource struct {
	*shutter.Shutter

	StartBlockID  string
	StartBlockNum uint64
	// contains filtered or unexported fields
}

func NewTestSource

func NewTestSource(h Handler) *TestSource

func (*TestSource) Push

func (t *TestSource) Push(b *Block, obj interface{}) error

func (*TestSource) Run

func (t *TestSource) Run()

type TestSourceFactory

type TestSourceFactory struct {
	Created chan *TestSource
}

func NewTestSourceFactory

func NewTestSourceFactory() *TestSourceFactory

func (*TestSourceFactory) NewSource

func (t *TestSourceFactory) NewSource(h Handler) Source

func (*TestSourceFactory) NewSourceFromNum

func (t *TestSourceFactory) NewSourceFromNum(blockNum uint64, h Handler) Source

func (*TestSourceFactory) NewSourceFromRef

func (t *TestSourceFactory) NewSourceFromRef(ref BlockRef, h Handler) Source

type TestSubscriber

type TestSubscriber struct {
	*shutter.Shutter

	WeAreThereYet bool
	// contains filtered or unexported fields
}

TestSubscriber instruments a Subscriber, implementing `Read()` and `Shutdown()`.

func NewTestSubscriber

func NewTestSubscriber() *TestSubscriber

func (*TestSubscriber) GetBlockIDInBuffer

func (s *TestSubscriber) GetBlockIDInBuffer(blockNum uint64) string

func (*TestSubscriber) PushBlock

func (s *TestSubscriber) PushBlock(blk *Block)

func (*TestSubscriber) PushError

func (s *TestSubscriber) PushError(err error)

func (*TestSubscriber) Read

func (s *TestSubscriber) Read() (*Block, error)

func (*TestSubscriber) Start

func (s *TestSubscriber) Start(channelSize int)

func (*TestSubscriber) StartAtBlockID

func (s *TestSubscriber) StartAtBlockID(ID string) bool

func (*TestSubscriber) Started

func (s *TestSubscriber) Started() bool

func (*TestSubscriber) WaitFor

func (s *TestSubscriber) WaitFor(ID string) <-chan interface{}

type TimeThresholdGator

type TimeThresholdGator struct {
	// contains filtered or unexported fields
}

func NewTimeThresholdGator

func NewTimeThresholdGator(threshold time.Duration) *TimeThresholdGator

func (*TimeThresholdGator) Pass

func (g *TimeThresholdGator) Pass(block *Block) bool

func (*TimeThresholdGator) SetName

func (g *TimeThresholdGator) SetName(name string)
