ingest

package
v0.0.0-...-a27ef0f
Warning

This package is not in the latest version of its module.
Published: Aug 29, 2019 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type LedgerReporter

type LedgerReporter interface {
	// OnNewLedger is called when the session begins processing
	// a ledger at the given sequence number.
	OnNewLedger(sequence uint32)
	// OnLedgerTransaction is called when the session processes a transaction
	// from the io.LedgerReader.
	OnLedgerTransaction()
	// OnEndLedger is called when the session finishes processing
	// a ledger.
	// If err is not nil, the session stopped processing the
	// ledger because of an error.
	// If shutdown is true, the session stopped processing the
	// ledger because it received a shutdown signal.
	OnEndLedger(err error, shutdown bool)
}

LedgerReporter can be used by a session to log progress or update metrics as the session runs its ledger pipelines.
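
As an illustration of the callback order described above, here is a minimal sketch of a reporter that counts transactions per ledger. The interface is copied locally so the example compiles on its own; `countingLedgerReporter`, its fields, and the summary format are hypothetical names chosen for this sketch, and `main` simulates the calls a session would make rather than running a real session.

```go
package main

import (
	"errors"
	"fmt"
)

// LedgerReporter mirrors the interface documented above so that this
// sketch compiles without importing the ingest package.
type LedgerReporter interface {
	OnNewLedger(sequence uint32)
	OnLedgerTransaction()
	OnEndLedger(err error, shutdown bool)
}

// countingLedgerReporter is a hypothetical reporter that counts the
// transactions seen in the current ledger and records one summary line
// each time a ledger ends.
type countingLedgerReporter struct {
	sequence     uint32
	transactions int
	summaries    []string
}

// OnNewLedger remembers the sequence and resets the per-ledger counter.
func (r *countingLedgerReporter) OnNewLedger(sequence uint32) {
	r.sequence = sequence
	r.transactions = 0
}

// OnLedgerTransaction increments the per-ledger counter.
func (r *countingLedgerReporter) OnLedgerTransaction() {
	r.transactions++
}

// OnEndLedger records why processing of the ledger ended.
func (r *countingLedgerReporter) OnEndLedger(err error, shutdown bool) {
	status := "ok"
	switch {
	case err != nil:
		status = "error: " + err.Error()
	case shutdown:
		status = "shutdown"
	}
	r.summaries = append(r.summaries,
		fmt.Sprintf("ledger=%d transactions=%d status=%s", r.sequence, r.transactions, status))
}

// Compile-time check that the sketch satisfies the interface.
var _ LedgerReporter = (*countingLedgerReporter)(nil)

func main() {
	r := &countingLedgerReporter{}

	// Simulate a ledger that is processed successfully.
	r.OnNewLedger(100)
	r.OnLedgerTransaction()
	r.OnLedgerTransaction()
	r.OnEndLedger(nil, false)

	// Simulate a ledger whose processing stops with an error.
	r.OnNewLedger(101)
	r.OnLedgerTransaction()
	r.OnEndLedger(errors.New("boom"), false)

	for _, s := range r.summaries {
		fmt.Println(s)
	}
}
```

In a real program a value like this would presumably be assigned to the `LedgerReporter` field of a session, with the summary lines replaced by log calls or metric updates.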

type LiveSession

type LiveSession struct {
	Archive           historyarchive.ArchiveInterface
	LedgerBackend     ledgerbackend.LedgerBackend
	StellarCoreClient *stellarcore.Client
	// StellarCoreCursor defines cursor name used in `setcursor` command of
	// stellar-core. If you run multiple sessions against a single stellar-core
	// instance the cursor name needs to be different in each session.
	StellarCoreCursor string
	StatePipeline     *pipeline.StatePipeline
	StateReporter     StateReporter
	LedgerPipeline    *pipeline.LedgerPipeline
	LedgerReporter    LedgerReporter
	// TempSet is a store used to hold temporary objects generated during
	// state processing. If nil, defaults to io.MemoryTempSet.
	TempSet io.TempSet
	// contains filtered or unexported fields
}

LiveSession initializes the ledger state using `Archive` and `StatePipeline`, then starts processing ledger data using `LedgerBackend` and `LedgerPipeline`.

func (*LiveSession) GetLatestSuccessfullyProcessedLedger

func (s *LiveSession) GetLatestSuccessfullyProcessedLedger() uint32

func (*LiveSession) QueryLock

func (s *LiveSession) QueryLock()

func (*LiveSession) QueryUnlock

func (s *LiveSession) QueryUnlock()

func (*LiveSession) Resume

func (s *LiveSession) Resume(ledgerSequence uint32) error

func (*LiveSession) Run

func (s *LiveSession) Run() error

func (*LiveSession) Shutdown

func (s *LiveSession) Shutdown()

func (*LiveSession) UpdateLock

func (s *LiveSession) UpdateLock()

func (*LiveSession) UpdateUnlock

func (s *LiveSession) UpdateUnlock()

type Session

type Session interface {
	// Run starts the session and runs for as long as the session is active.
	// To resume a session, use Resume().
	Run() error
	// Resume resumes the session at the ledger with the given sequence.
	// Because a Session is stateless, it is up to the caller to determine
	// the last ledger the Session processed (or whether Run() should be
	// called first).
	Resume(ledgerSequence uint32) error
	// GetLatestSuccessfullyProcessedLedger returns the ledger sequence of the
	// latest successfully processed ledger in the session. It returns 0 if no
	// ledgers have been processed yet.
	// Please note that this value is not synchronized with pipeline hooks.
	GetLatestSuccessfullyProcessedLedger() uint32
	// Shutdown gracefully stops a running session and all of its internal
	// objects.
	// Calling Shutdown() does not trigger post-processing hooks.
	Shutdown()
	// QueryLock locks the session for sending a query. This ensures a consistent
	// view when data is saved to multiple stores (e.g. postgres/redis/memory).
	// TODO this is fine for a demo, but we need to check whether it works for
	// systems that don't provide strong consistency. It may also slow down
	// readers if committing data to stores takes a long time. It probably
	// doesn't work in distributed apps.
	QueryLock()
	// QueryUnlock releases the session's read lock.
	QueryUnlock()
	// UpdateLock locks the session for updating data. This ensures a consistent
	// view when data is saved to multiple stores (e.g. postgres/redis/memory).
	// TODO this is fine for a demo, but we need to check whether it works for
	// systems that don't provide strong consistency. It may also slow down
	// readers if committing data to stores takes a long time. It probably
	// doesn't work in distributed apps.
	UpdateLock()
	// UpdateUnlock releases the session's write lock.
	UpdateUnlock()
}

Session is an implementation of an ingestion scenario. Some useful sessions can be found in this package.
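
The locking methods of Session are meant to give readers a consistent view while several stores are being updated. The sketch below illustrates that idea with a stand-in type. `fakeSession`, `credit`, and `snapshot` are hypothetical names, and backing the four lock methods with a `sync.RWMutex` is an assumption of this sketch, not a statement about how the sessions in this package are implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeSession is a hypothetical stand-in for a session. It only models
// the four lock methods of the Session interface documented above.
type fakeSession struct {
	mu       sync.RWMutex
	balances map[string]int64 // stands in for one store
	total    int64            // stands in for a second store
}

// Assumption: read and write locks map onto a sync.RWMutex.
func (s *fakeSession) QueryLock()    { s.mu.RLock() }
func (s *fakeSession) QueryUnlock()  { s.mu.RUnlock() }
func (s *fakeSession) UpdateLock()   { s.mu.Lock() }
func (s *fakeSession) UpdateUnlock() { s.mu.Unlock() }

// credit writes to both stores while holding the update lock, so no
// reader can observe one store updated and the other not.
func (s *fakeSession) credit(account string, amount int64) {
	s.UpdateLock()
	defer s.UpdateUnlock()
	s.balances[account] += amount
	s.total += amount
}

// snapshot reads both stores while holding the query lock and returns
// the sum of all balances together with the recorded total.
func (s *fakeSession) snapshot() (sum, total int64) {
	s.QueryLock()
	defer s.QueryUnlock()
	for _, b := range s.balances {
		sum += b
	}
	return sum, s.total
}

func main() {
	s := &fakeSession{balances: map[string]int64{}}

	// Run several writers concurrently.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.credit(fmt.Sprintf("account-%d", i%3), 5)
		}(i)
	}
	wg.Wait()

	sum, total := s.snapshot()
	fmt.Println("consistent:", sum == total, "total:", total)
}
```

Pairing each lock call with a deferred unlock keeps the lock from leaking if the guarded code returns early or panics.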

type SingleLedgerSession

type SingleLedgerSession struct {
	Archive        *historyarchive.Archive
	LedgerSequence uint32
	StatePipeline  *pipeline.StatePipeline
	StateReporter  StateReporter
	// TempSet is a store used to hold temporary objects generated during
	// state processing. If nil, defaults to io.MemoryTempSet.
	TempSet io.TempSet
	// contains filtered or unexported fields
}

SingleLedgerSession initializes the ledger state using `Archive` and `StatePipeline` and terminates. Useful for finding stats for a single ledger. Set `LedgerSequence` to `0` to process the latest checkpoint.

func (*SingleLedgerSession) GetLatestSuccessfullyProcessedLedger

func (s *SingleLedgerSession) GetLatestSuccessfullyProcessedLedger() uint32

func (*SingleLedgerSession) QueryLock

func (s *SingleLedgerSession) QueryLock()

func (*SingleLedgerSession) QueryUnlock

func (s *SingleLedgerSession) QueryUnlock()

func (*SingleLedgerSession) Resume

func (s *SingleLedgerSession) Resume(ledgerSequence uint32) error

func (*SingleLedgerSession) Run

func (s *SingleLedgerSession) Run() error

func (*SingleLedgerSession) Shutdown

func (s *SingleLedgerSession) Shutdown()

func (*SingleLedgerSession) UpdateLock

func (s *SingleLedgerSession) UpdateLock()

func (*SingleLedgerSession) UpdateUnlock

func (s *SingleLedgerSession) UpdateUnlock()

type StateReporter

type StateReporter interface {
	// OnStartState is called when the session begins processing
	// a history archive snapshot at the given sequence number.
	OnStartState(sequence uint32)
	// OnStateEntry is called when the session processes an entry
	// from the io.StateReader
	OnStateEntry()
	// OnEndState is called when the session finishes processing
	// a history archive snapshot.
	// If err is not nil, the session stopped processing the
	// history archive snapshot because of an error.
	// If shutdown is true, the session stopped processing the
	// history archive snapshot because it received a shutdown signal.
	OnEndState(err error, shutdown bool)
}

StateReporter can be used by a session to log progress or update metrics as the session runs its state pipelines.
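
A state snapshot can contain many entries, so a reporter may want to report progress only periodically. The following is a minimal sketch of that idea. The interface is copied locally so the example compiles on its own; `progressStateReporter`, its `every` field, and the message format are hypothetical, and `main` simulates the session's calls.

```go
package main

import "fmt"

// StateReporter mirrors the interface documented above so that this
// sketch compiles without importing the ingest package.
type StateReporter interface {
	OnStartState(sequence uint32)
	OnStateEntry()
	OnEndState(err error, shutdown bool)
}

// progressStateReporter is a hypothetical reporter that records a
// progress line after every `every` entries of a snapshot.
type progressStateReporter struct {
	every    int
	sequence uint32
	entries  int
	lines    []string
}

// OnStartState resets the entry counter for the new snapshot.
func (r *progressStateReporter) OnStartState(sequence uint32) {
	r.sequence = sequence
	r.entries = 0
	r.lines = append(r.lines, fmt.Sprintf("state %d: started", sequence))
}

// OnStateEntry counts the entry and records progress periodically.
func (r *progressStateReporter) OnStateEntry() {
	r.entries++
	if r.every > 0 && r.entries%r.every == 0 {
		r.lines = append(r.lines,
			fmt.Sprintf("state %d: %d entries processed", r.sequence, r.entries))
	}
}

// OnEndState records how processing of the snapshot ended.
func (r *progressStateReporter) OnEndState(err error, shutdown bool) {
	switch {
	case err != nil:
		r.lines = append(r.lines,
			fmt.Sprintf("state %d: failed after %d entries: %v", r.sequence, r.entries, err))
	case shutdown:
		r.lines = append(r.lines,
			fmt.Sprintf("state %d: shut down after %d entries", r.sequence, r.entries))
	default:
		r.lines = append(r.lines,
			fmt.Sprintf("state %d: finished after %d entries", r.sequence, r.entries))
	}
}

// Compile-time check that the sketch satisfies the interface.
var _ StateReporter = (*progressStateReporter)(nil)

func main() {
	r := &progressStateReporter{every: 2}

	// Simulate a snapshot with five entries that completes normally.
	r.OnStartState(63)
	for i := 0; i < 5; i++ {
		r.OnStateEntry()
	}
	r.OnEndState(nil, false)

	for _, line := range r.lines {
		fmt.Println(line)
	}
}
```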

Directories

Path Synopsis
processors package contains the most commonly used processors.