ingest

package
v0.0.0-...-fb9541f
Published: Mar 27, 2020 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type LedgerReporter

type LedgerReporter interface {
	// OnNewLedger is called when the session begins processing
	// a ledger at the given sequence number.
	OnNewLedger(sequence uint32)
	// OnLedgerTransaction is called when the session processes a transaction
	// from the io.LedgerReader.
	OnLedgerTransaction()
	// OnEndLedger is called when the session finishes processing
	// a ledger.
	// If err is not nil, the session stopped processing the ledger
	// because of an error.
	// If shutdown is true, the session stopped processing the ledger
	// because it received a shutdown signal.
	OnEndLedger(err error, shutdown bool)
}

LedgerReporter can be used by a session to log progress or update metrics as the session runs its ledger pipelines.
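As an illustration of the reporter contract, here is a minimal sketch of a reporter that counts transactions per ledger. The interface is re-declared locally so the example compiles on its own; `countingReporter` and its `summaries` field are hypothetical names, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// LedgerReporter mirrors the interface documented above so that this
// sketch compiles without importing the ingest package.
type LedgerReporter interface {
	OnNewLedger(sequence uint32)
	OnLedgerTransaction()
	OnEndLedger(err error, shutdown bool)
}

// countingReporter is a hypothetical reporter that counts the
// transactions seen in the ledger currently being processed.
type countingReporter struct {
	sequence     uint32
	transactions int
	summaries    []string // one line per finished ledger
}

func (r *countingReporter) OnNewLedger(sequence uint32) {
	r.sequence = sequence
	r.transactions = 0
}

func (r *countingReporter) OnLedgerTransaction() {
	r.transactions++
}

func (r *countingReporter) OnEndLedger(err error, shutdown bool) {
	status := "ok"
	switch {
	case err != nil:
		status = "error: " + err.Error()
	case shutdown:
		status = "shutdown"
	}
	r.summaries = append(r.summaries,
		fmt.Sprintf("ledger %d: %d transactions (%s)", r.sequence, r.transactions, status))
}

// Compile-time check that countingReporter satisfies the interface.
var _ LedgerReporter = (*countingReporter)(nil)

func main() {
	r := &countingReporter{}

	// Simulate the calls a session would make for two ledgers.
	r.OnNewLedger(100)
	r.OnLedgerTransaction()
	r.OnLedgerTransaction()
	r.OnEndLedger(nil, false)

	r.OnNewLedger(101)
	r.OnLedgerTransaction()
	r.OnEndLedger(errors.New("reader failed"), false)

	for _, line := range r.summaries {
		fmt.Println(line)
	}
}
```

In a real program the reporter would be assigned to the `LedgerReporter` field of a session; a metrics client could replace the `summaries` slice.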

type LiveSession

type LiveSession struct {
	Archive          historyarchive.ArchiveInterface
	LedgerBackend    ledgerbackend.LedgerBackend
	PaydexCoreClient *paydexcore.Client
	// PaydexCoreCursor defines cursor name used in `setcursor` command of
	// Paydex-core. If you run multiple sessions against a single paydex-core
	// instance the cursor name needs to be different in each session.
	PaydexCoreCursor string
	StatePipeline    *pipeline.StatePipeline
	StateReporter    StateReporter
	LedgerPipeline   *pipeline.LedgerPipeline
	LedgerReporter   LedgerReporter
	// TempSet is a store used to hold temporary objects generated during
	// state processing. If nil, defaults to io.MemoryTempSet.
	TempSet io.TempSet
	// MaxStreamRetries determines how many times the reader will retry when encountering
	// errors while streaming xdr bucket entries from the history archive.
	// Default MaxStreamRetries value (0) means that there should be no retry attempts
	MaxStreamRetries int
	// contains filtered or unexported fields
}

LiveSession initializes the ledger state using `Archive` and `StatePipeline`, then starts processing ledger data using `LedgerBackend` and `LedgerPipeline`.
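The `MaxStreamRetries` comment says that 0 means no retry attempts. One plausible reading, sketched below, is that the reader makes one initial attempt plus at most `MaxStreamRetries` further attempts. This is an assumption about the semantics, not the package's actual implementation; `streamWithRetries`, `flakyReader`, and `errStream` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errStream is a hypothetical error standing in for a streaming failure.
var errStream = errors.New("stream error")

// streamWithRetries calls read once and then retries up to maxRetries
// more times while read keeps failing. It returns the number of attempts
// made and the last error (nil on success).
func streamWithRetries(maxRetries int, read func() error) (attempts int, err error) {
	for attempts = 1; ; attempts++ {
		if err = read(); err == nil || attempts > maxRetries {
			return attempts, err
		}
	}
}

// flakyReader returns a read function that fails the first `failures`
// times it is called and succeeds afterwards.
func flakyReader(failures int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errStream
		}
		return nil
	}
}

func main() {
	// With the default of 0 the first failure is returned immediately.
	attempts, err := streamWithRetries(0, flakyReader(2))
	fmt.Println(attempts, err)

	// With 3 retries allowed, two failures are absorbed.
	attempts, err = streamWithRetries(3, flakyReader(2))
	fmt.Println(attempts, err)
}
```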

func (*LiveSession) GetArchive

GetArchive returns the archive configured for the current session.

func (*LiveSession) GetLatestSuccessfullyProcessedLedger

func (s *LiveSession) GetLatestSuccessfullyProcessedLedger() (ledgerSequence uint32, processed bool)

GetLatestSuccessfullyProcessedLedger returns the sequence of the last successfully processed ledger. It returns (0, false) if no ledger has been successfully processed yet. The boolean guards against loops that use the returned sequence without checking it and end up ingesting ledger 0+1=1. See the `Resume` godoc for the possible implications.

func (*LiveSession) QueryLock

func (s *LiveSession) QueryLock()

func (*LiveSession) QueryUnlock

func (s *LiveSession) QueryUnlock()

func (*LiveSession) Resume

func (s *LiveSession) Resume(ledgerSequence uint32) error

Resume resumes the session from `ledgerSequence`. Returns nil when the session has been shut down.

WARNING: developers are likely to call `GetLatestSuccessfullyProcessedLedger()` to get the latest successfully processed ledger after `Resume` returns an error. It is critical to understand that `GetLatestSuccessfullyProcessedLedger()` returns `(0, false)` when no ledgers have been successfully processed, e.g. when an error occurs while processing the first ledger after an application restart. Always check whether the second returned value is `false` before overwriting your local variable.
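The warning above can be made concrete with a small retry loop. This is a sketch under stated assumptions: `resumable`, `fakeSession`, `outcome`, `resumeLoop`, and `errFake` are hypothetical names, and the fake session resets its "latest processed" state on every `Resume` call, which may differ from how `LiveSession` behaves.

```go
package main

import (
	"errors"
	"fmt"
)

// errFake is a hypothetical processing error used by the fake session.
var errFake = errors.New("processing failed")

// resumable is the subset of LiveSession's documented methods that this
// sketch needs. It is declared locally so the example is self-contained.
type resumable interface {
	Resume(ledgerSequence uint32) error
	GetLatestSuccessfullyProcessedLedger() (ledgerSequence uint32, processed bool)
}

// outcome scripts the result of a single Resume call on fakeSession.
type outcome struct {
	processedUpTo uint32 // 0 means nothing was processed in this attempt
	err           error
}

// fakeSession is a stand-in used only for this sketch. Each call to
// Resume consumes one scripted outcome.
type fakeSession struct {
	outcomes []outcome
	last     uint32
	ok       bool
	resumed  []uint32 // sequences passed to Resume, kept for inspection
}

func (f *fakeSession) Resume(seq uint32) error {
	f.resumed = append(f.resumed, seq)
	o := f.outcomes[0]
	f.outcomes = f.outcomes[1:]
	f.last, f.ok = o.processedUpTo, o.processedUpTo != 0
	return o.err
}

func (f *fakeSession) GetLatestSuccessfullyProcessedLedger() (uint32, bool) {
	return f.last, f.ok
}

// resumeLoop keeps resuming until Resume returns nil (shutdown) or
// maxAttempts is reached. The next sequence only advances when the
// session reports that it actually processed a ledger.
func resumeLoop(s resumable, start uint32, maxAttempts int) (next uint32, err error) {
	next = start
	for i := 0; i < maxAttempts; i++ {
		if err = s.Resume(next); err == nil {
			return next, nil
		}
		if last, processed := s.GetLatestSuccessfullyProcessedLedger(); processed {
			next = last + 1
		}
		// When processed is false, next is left unchanged so the loop
		// does not fall back to ledger 0+1=1.
	}
	return next, err
}

func main() {
	s := &fakeSession{outcomes: []outcome{
		{processedUpTo: 0, err: errFake},   // nothing processed
		{processedUpTo: 502, err: errFake}, // processed up to 502
		{processedUpTo: 510, err: nil},     // shutdown
	}}
	next, err := resumeLoop(s, 500, 5)
	fmt.Println(s.resumed, next, err)
}
```

The important line is the `processed` check: without it, the first failed attempt would overwrite `next` with 1.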

func (*LiveSession) Run

func (s *LiveSession) Run() error

Run runs the session starting from the last checkpoint ledger. Returns nil when the session has been shut down.

func (*LiveSession) Shutdown

func (s *LiveSession) Shutdown()

Shutdown gracefully stops the pipelines and the session. This method blocks until the pipelines have shut down gracefully.
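Because `Run` blocks and `Shutdown` blocks until the pipelines stop, a caller typically runs the session in a goroutine and calls `Shutdown` from elsewhere. The sketch below shows one way to wire that up; `runner`, `fakeSession`, and `runUntilStopped` are hypothetical names, and the trigger channel stands in for something like an OS signal handler.

```go
package main

import (
	"fmt"
	"time"
)

// runner is the subset of the Session interface this sketch needs.
type runner interface {
	Run() error
	Shutdown()
}

// fakeSession is a stand-in whose Run blocks until Shutdown is called
// and then returns nil, matching the documented behaviour.
type fakeSession struct {
	stop chan struct{}
	done chan struct{}
}

func newFakeSession() *fakeSession {
	return &fakeSession{stop: make(chan struct{}), done: make(chan struct{})}
}

func (s *fakeSession) Run() error {
	defer close(s.done)
	<-s.stop
	return nil
}

// Shutdown signals Run to stop and blocks until it has returned.
func (s *fakeSession) Shutdown() {
	close(s.stop)
	<-s.done
}

// runUntilStopped runs the session in a goroutine. If trigger fires
// first, it shuts the session down and returns Run's result; if Run
// fails on its own, that error is returned directly.
func runUntilStopped(s runner, trigger <-chan struct{}) error {
	errCh := make(chan error, 1)
	go func() { errCh <- s.Run() }()

	select {
	case err := <-errCh:
		return err
	case <-trigger:
		s.Shutdown()
		return <-errCh
	}
}

func main() {
	trigger := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(trigger) // in a real program: a signal or context cancellation
	}()

	err := runUntilStopped(newFakeSession(), trigger)
	fmt.Println("session stopped, err =", err)
}
```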

func (*LiveSession) UpdateLock

func (s *LiveSession) UpdateLock()

func (*LiveSession) UpdateUnlock

func (s *LiveSession) UpdateUnlock()

type Session

type Session interface {
	// Run starts the session and works as long as the session is active. If
	// you want to resume a session use Resume().
	Run() error
	// Resume resumes the session at ledger with a given sequence. It's up to
	// Session user to determine what was the last ledger processed by a
	// Session as it's stateless (or if Run() should be called first).
	Resume(ledgerSequence uint32) error
	// Shutdown gracefully stops running session and stops all internal
	// objects.
	// Calling Shutdown() does not trigger post processing hooks.
	Shutdown()
	// QueryLock locks the session for sending a query. This ensures a consistent
	// view when data is saved to multiple stores (ex. postgres/redis/memory).
	// TODO this is fine for a demo but we need to check if it works for systems
	// that don't provide strong consistency. This may also slow down readers
	// if committing data to stores takes a long time. This probably doesn't work
	// in distributed apps.
	QueryLock()
	// QueryUnlock unlocks read lock of the session.
	QueryUnlock()
	// UpdateLock locks the session for updating data. This ensures a consistent
	// view when data is saved to multiple stores (ex. postgres/redis/memory).
	// TODO this is fine for a demo but we need to check if it works for systems
	// that don't provide strong consistency. This may also slow down readers
	// if committing data to stores takes a long time. This probably doesn't work
	// in distributed apps.
	UpdateLock()
	// UpdateUnlock unlocks write lock of the session.
	UpdateUnlock()
}

Session is an implementation of an ingestion scenario. Some useful sessions can be found in this package.
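The query and update locks described in the interface comments behave like a read/write lock. The following sketch assumes an implementation backed by `sync.RWMutex`, which this documentation does not confirm; `locker`, `rwSession`, `stores`, `applyPayment`, and `consistentView` are hypothetical names used to show how a writer and a reader could keep two stores consistent.

```go
package main

import (
	"fmt"
	"sync"
)

// locker is the locking subset of the Session interface documented above.
type locker interface {
	QueryLock()
	QueryUnlock()
	UpdateLock()
	UpdateUnlock()
}

// rwSession is a hypothetical implementation backed by sync.RWMutex.
type rwSession struct{ mu sync.RWMutex }

func (s *rwSession) QueryLock()    { s.mu.RLock() }
func (s *rwSession) QueryUnlock()  { s.mu.RUnlock() }
func (s *rwSession) UpdateLock()   { s.mu.Lock() }
func (s *rwSession) UpdateUnlock() { s.mu.Unlock() }

// stores stands in for two separate data stores that must agree:
// per-account balances and a running total.
type stores struct {
	balances map[string]int64
	total    int64
}

// applyPayment updates both stores while holding the update lock, so
// readers never observe one store updated without the other.
func applyPayment(l locker, st *stores, account string, amount int64) {
	l.UpdateLock()
	defer l.UpdateUnlock()
	st.balances[account] += amount
	st.total += amount
}

// consistentView reads both stores under the query lock and returns the
// sum of balances alongside the stored total.
func consistentView(l locker, st *stores) (sum, total int64) {
	l.QueryLock()
	defer l.QueryUnlock()
	for _, b := range st.balances {
		sum += b
	}
	return sum, st.total
}

func main() {
	s := &rwSession{}
	st := &stores{balances: map[string]int64{}}

	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			applyPayment(s, st, fmt.Sprintf("acct-%d", i%5), 10)
		}(i)
	}
	wg.Wait()

	sum, total := consistentView(s, st)
	fmt.Println(sum == total, total)
}
```

As the TODO in the interface comments notes, a single in-process lock like this does not extend to distributed deployments.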

type SingleLedgerSession

type SingleLedgerSession struct {
	Archive        *historyarchive.Archive
	LedgerSequence uint32
	StatePipeline  *pipeline.StatePipeline
	StateReporter  StateReporter
	// TempSet is a store used to hold temporary objects generated during
	// state processing. If nil, defaults to io.MemoryTempSet.
	TempSet io.TempSet
	// MaxStreamRetries determines how many times the reader will retry when encountering
	// errors while streaming xdr bucket entries from the history archive.
	// Set MaxStreamRetries to 0 if there should be no retry attempts
	MaxStreamRetries int
	// contains filtered or unexported fields
}

SingleLedgerSession initializes the ledger state using `Archive` and `StatePipeline` and terminates. Useful for finding stats for a single ledger. Set `LedgerSequence` to `0` to process the latest checkpoint.

func (*SingleLedgerSession) QueryLock

func (s *SingleLedgerSession) QueryLock()

func (*SingleLedgerSession) QueryUnlock

func (s *SingleLedgerSession) QueryUnlock()

func (*SingleLedgerSession) Resume

func (s *SingleLedgerSession) Resume(ledgerSequence uint32) error

func (*SingleLedgerSession) Run

func (s *SingleLedgerSession) Run() error

func (*SingleLedgerSession) Shutdown

func (s *SingleLedgerSession) Shutdown()

func (*SingleLedgerSession) UpdateLock

func (s *SingleLedgerSession) UpdateLock()

func (*SingleLedgerSession) UpdateUnlock

func (s *SingleLedgerSession) UpdateUnlock()

type StateReporter

type StateReporter interface {
	// OnStartState is called when the session begins processing
	// a history archive snapshot at the given sequence number.
	OnStartState(sequence uint32)
	// OnStateEntry is called when the session processes an entry
	// from the io.StateReader
	OnStateEntry()
	// OnEndState is called when the session finishes processing
	// a history archive snapshot.
	// If err is not nil, the session stopped processing the
	// history archive snapshot because of an error.
	// If shutdown is true, the session stopped processing the
	// history archive snapshot because it received a shutdown signal.
	OnEndState(err error, shutdown bool)
}

StateReporter can be used by a session to log progress or update metrics as the session runs its state pipelines.
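State processing can involve many entries, so a reporter may want to emit progress only every N entries. The sketch below re-declares the interface locally and collects progress lines in memory; `progressReporter` and its fields are hypothetical names, not part of this package.

```go
package main

import "fmt"

// StateReporter mirrors the interface documented above so that this
// sketch compiles without importing the ingest package.
type StateReporter interface {
	OnStartState(sequence uint32)
	OnStateEntry()
	OnEndState(err error, shutdown bool)
}

// progressReporter is a hypothetical reporter that records a progress
// line every `every` entries, plus a start and an end line.
type progressReporter struct {
	every   int
	entries int
	lines   []string
}

func (p *progressReporter) OnStartState(sequence uint32) {
	p.entries = 0
	p.lines = append(p.lines, fmt.Sprintf("state %d: started", sequence))
}

func (p *progressReporter) OnStateEntry() {
	p.entries++
	if p.every > 0 && p.entries%p.every == 0 {
		p.lines = append(p.lines, fmt.Sprintf("processed %d entries", p.entries))
	}
}

func (p *progressReporter) OnEndState(err error, shutdown bool) {
	switch {
	case err != nil:
		p.lines = append(p.lines, fmt.Sprintf("failed after %d entries: %v", p.entries, err))
	case shutdown:
		p.lines = append(p.lines, fmt.Sprintf("shut down after %d entries", p.entries))
	default:
		p.lines = append(p.lines, fmt.Sprintf("finished: %d entries", p.entries))
	}
}

// Compile-time check that progressReporter satisfies the interface.
var _ StateReporter = (*progressReporter)(nil)

func main() {
	p := &progressReporter{every: 2}

	// Simulate the calls a session would make for one snapshot.
	p.OnStartState(63)
	for i := 0; i < 5; i++ {
		p.OnStateEntry()
	}
	p.OnEndState(nil, false)

	for _, line := range p.lines {
		fmt.Println(line)
	}
}
```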

Directories

Path Synopsis
processors package contains the most commonly used processors.
Package verify provides helpers used for verifying if the ingested data is correct.
