stream

package
v1.2.1-0...-f94cca8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2021 License: BSD-3-Clause Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// ErrUnknownProcessorType is returned when encountering a client type with no
	// known implementation
	ErrUnknownProcessorType = errors.New("unknown processor type")

	// ErrNoMessage indicates that no message is available
	ErrNoMessage = errors.New("no message")
)
View Source
var (
	ErrUnknownVM = errors.New("unknown VM")

	ErrInvalidTopicName    = errors.New("invalid topic name")
	ErrWrongTopicEventType = errors.New("wrong topic event type")
	ErrWrongTopicNetworkID = errors.New("wrong topic networkID")
)

Functions

func GetTopicName

func GetTopicName(networkID uint32, chainID string, eventType EventType) string

func NewMessage

func NewMessage(id string,
	chainID string,
	body []byte,
	timestamp int64,
	nanosecond int64,
) services.Consumable

func NewMessageWithKafka

func NewMessageWithKafka(id string,
	chainID string,
	body []byte,
	timestamp int64,
	nanosecond int64,
	kafkaMessage *kafkaMessage.Message,
) services.Consumable

func NewProducerCChain

func NewProducerCChain() utils.ListenCloserFactory

Types

type BufferContainer

type BufferContainer struct {
	// contains filtered or unexported fields
}

type EventType

type EventType string
const (
	EventTypeConsensus EventType = "consensus"
	EventTypeDecisions EventType = "decisions"
)

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message is a message on the event stream

func (*Message) Body

func (m *Message) Body() []byte

func (*Message) ChainID

func (m *Message) ChainID() string

func (*Message) ID

func (m *Message) ID() string

func (*Message) KafkaMessage

func (m *Message) KafkaMessage() *kafkaMessage.Message

func (*Message) Nanosecond

func (m *Message) Nanosecond() int64

func (*Message) Timestamp

func (m *Message) Timestamp() int64

type Processor

type Processor interface {
	ProcessNextMessage() error
	Close() error
	Failure()
	Success()
	ID() string
}

Processor handles writing and reading to/from the event stream

func NewConsensusProducerProcessor

func NewConsensusProducerProcessor(sc *services.Control, conf cfg.Config, chainVM string, chainID string, _ int, _ int) (Processor, error)

NewConsensusProducerProcessor creates a producer for consensus events

func NewDecisionsProducerProcessor

func NewDecisionsProducerProcessor(sc *services.Control, conf cfg.Config, chainVM string, chainID string, _ int, _ int) (Processor, error)

NewDecisionsProducerProcessor creates a producer for decision events

type ProcessorDB

type ProcessorDB interface {
	Process(*services.Connections, *services.TxPool) error
	Close() error
	ID() string
	Topic() []string
}

type ProcessorFactory

type ProcessorFactory func(*services.Control, cfg.Config, string, string, int, int) (Processor, error)

ProcessorFactory takes in configuration and returns a stream Processor

type ProcessorFactoryChainDB

type ProcessorFactoryChainDB func(*services.Control, cfg.Config, string, string) (ProcessorDB, error)

func NewConsumerDBFactory

func NewConsumerDBFactory(factory serviceConsumerFactory, eventType EventType) ProcessorFactoryChainDB

NewConsumerDBFactory returns a ProcessorFactoryChainDB for the given service consumer factory and event type

type ProcessorFactoryInstDB

type ProcessorFactoryInstDB func(*services.Control, cfg.Config) (ProcessorDB, error)

func NewConsumerCChainDB

func NewConsumerCChainDB() ProcessorFactoryInstDB

type ProcessorManager

type ProcessorManager struct {
	// contains filtered or unexported fields
}

ProcessorManager supervises the Processor lifecycle; it will use the given configuration and ProcessorFactory to keep a Processor active

func NewProcessorManager

func NewProcessorManager(sc *services.Control, conf cfg.Config, factory ProcessorFactory, idx int, maxIdx int) *ProcessorManager

NewProcessorManager creates a new *ProcessorManager ready for listening

func (*ProcessorManager) Close

func (c *ProcessorManager) Close() error

Close tells the workers to shut down and waits for all of them to stop

func (*ProcessorManager) Listen

func (c *ProcessorManager) Listen() error

Listen sets a client to listen for and handle incoming messages

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer reads from the socket and writes to the event stream

func NewProducer

func NewProducer(sc *services.Control, conf cfg.Config, _ string, chainID string, eventType EventType) (*Producer, error)

NewProducer creates a producer using the given config

func (*Producer) Close

func (p *Producer) Close() error

Close shuts down the producer

func (*Producer) Failure

func (p *Producer) Failure()

func (*Producer) ID

func (p *Producer) ID() string

func (*Producer) ProcessNextMessage

func (p *Producer) ProcessNextMessage() error

ProcessNextMessage takes in a Message from the IPC socket and writes it to the db

func (*Producer) Success

func (p *Producer) Success()

type ProducerCChain

type ProducerCChain struct {
	// contains filtered or unexported fields
}

func (*ProducerCChain) Close

func (p *ProducerCChain) Close() error

Close shuts down the producer

func (*ProducerCChain) Failure

func (p *ProducerCChain) Failure()

func (*ProducerCChain) ID

func (p *ProducerCChain) ID() string

func (*ProducerCChain) Listen

func (p *ProducerCChain) Listen() error

func (*ProducerCChain) ProcessNextMessage

func (p *ProducerCChain) ProcessNextMessage() error

func (*ProducerCChain) Success

func (p *ProducerCChain) Success()

type TracerParam

type TracerParam struct {
	Tracer string `json:"tracer"`
}

type WorkPacket

type WorkPacket struct {
	// contains filtered or unexported fields
}

type WorkPacketCChain

type WorkPacketCChain struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL