Documentation ¶
Index ¶
- type LedgerReporter
- type LiveSession
- func (s *LiveSession) GetArchive() historyarchive.ArchiveInterface
- func (s *LiveSession) GetLatestSuccessfullyProcessedLedger() (ledgerSequence uint32, processed bool)
- func (s *LiveSession) QueryLock()
- func (s *LiveSession) QueryUnlock()
- func (s *LiveSession) Resume(ledgerSequence uint32) error
- func (s *LiveSession) Run() error
- func (s *LiveSession) Shutdown()
- func (s *LiveSession) UpdateLock()
- func (s *LiveSession) UpdateUnlock()
- type Session
- type SingleLedgerSession
- func (s *SingleLedgerSession) QueryLock()
- func (s *SingleLedgerSession) QueryUnlock()
- func (s *SingleLedgerSession) Resume(ledgerSequence uint32) error
- func (s *SingleLedgerSession) Run() error
- func (s *SingleLedgerSession) Shutdown()
- func (s *SingleLedgerSession) UpdateLock()
- func (s *SingleLedgerSession) UpdateUnlock()
- type StateReporter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type LedgerReporter ¶
type LedgerReporter interface {
	// OnNewLedger is called when the session begins processing
	// a ledger at the given sequence number.
	OnNewLedger(sequence uint32)
	// OnLedgerTransaction is called when the session processes a transaction
	// from the io.LedgerReader.
	OnLedgerTransaction()
	// OnEndLedger is called when the session finishes processing
	// a ledger.
	// If err is not nil, the session stopped processing the
	// ledger because of an error.
	// If shutdown is true, the session stopped processing the
	// ledger because it received a shutdown signal.
	OnEndLedger(err error, shutdown bool)
}
LedgerReporter can be used by a session to log progress or update metrics as the session runs its ledger pipelines.
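A minimal sketch of a reporter that keeps simple counters, which could later be exported as metrics. The interface is copied from the declaration above so the example compiles on its own; `CountingLedgerReporter`, its fields, and `errExample` are hypothetical names, and the sketch assumes the callbacks are invoked from a single goroutine (the documentation does not say either way).

```go
package main

import (
	"errors"
	"fmt"
)

// LedgerReporter is copied from the documentation above so the sketch
// compiles on its own; real code would use the package's interface.
type LedgerReporter interface {
	OnNewLedger(sequence uint32)
	OnLedgerTransaction()
	OnEndLedger(err error, shutdown bool)
}

// CountingLedgerReporter is a hypothetical reporter that counts the
// transactions of the current ledger and the ledgers that ended cleanly.
// It is not safe for concurrent use.
type CountingLedgerReporter struct {
	Sequence     uint32 // ledger currently (or most recently) processed
	Transactions int    // transactions seen in that ledger
	Completed    int    // ledgers that ended without error or shutdown
	LastErr      error  // error passed to the most recent OnEndLedger
	Interrupted  bool   // true if the most recent ledger ended on shutdown
}

// Compile-time check that the reporter satisfies the interface.
var _ LedgerReporter = (*CountingLedgerReporter)(nil)

func (r *CountingLedgerReporter) OnNewLedger(sequence uint32) {
	r.Sequence = sequence
	r.Transactions = 0
}

func (r *CountingLedgerReporter) OnLedgerTransaction() {
	r.Transactions++
}

func (r *CountingLedgerReporter) OnEndLedger(err error, shutdown bool) {
	r.LastErr = err
	r.Interrupted = shutdown
	if err == nil && !shutdown {
		r.Completed++
	}
}

// errExample stands in for a processing failure in the demo below.
var errExample = errors.New("simulated pipeline error")

func main() {
	r := &CountingLedgerReporter{}

	r.OnNewLedger(100)
	r.OnLedgerTransaction()
	r.OnLedgerTransaction()
	r.OnEndLedger(nil, false)
	fmt.Printf("ledger %d: %d transactions, completed=%d\n", r.Sequence, r.Transactions, r.Completed)

	r.OnNewLedger(101)
	r.OnLedgerTransaction()
	r.OnEndLedger(errExample, false)
	fmt.Printf("ledger %d: %d transactions, err=%v\n", r.Sequence, r.Transactions, r.LastErr)
}
```

A value like this would be assigned to the `LedgerReporter` field of a session before the session is started.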
type LiveSession ¶
type LiveSession struct {
	Archive          historyarchive.ArchiveInterface
	LedgerBackend    ledgerbackend.LedgerBackend
	PaydexCoreClient *paydexcore.Client
	// PaydexCoreCursor defines cursor name used in `setcursor` command of
	// Paydex-core. If you run multiple sessions against a single paydex-core
	// instance the cursor name needs to be different in each session.
	PaydexCoreCursor string
	StatePipeline    *pipeline.StatePipeline
	StateReporter    StateReporter
	LedgerPipeline   *pipeline.LedgerPipeline
	LedgerReporter   LedgerReporter
	// TempSet is a store used to hold temporary objects generated during
	// state processing. If nil, defaults to io.MemoryTempSet.
	TempSet io.TempSet
	// MaxStreamRetries determines how many times the reader will retry when encountering
	// errors while streaming xdr bucket entries from the history archive.
	// Default MaxStreamRetries value (0) means that there should be no retry attempts.
	MaxStreamRetries int
	// contains filtered or unexported fields
}
LiveSession initializes the ledger state using `Archive` and `StatePipeline`, then starts processing ledger data using `LedgerBackend` and `LedgerPipeline`.
func (*LiveSession) GetArchive ¶
func (s *LiveSession) GetArchive() historyarchive.ArchiveInterface
GetArchive returns the archive configured for the current session.
func (*LiveSession) GetLatestSuccessfullyProcessedLedger ¶
func (s *LiveSession) GetLatestSuccessfullyProcessedLedger() (ledgerSequence uint32, processed bool)
GetLatestSuccessfullyProcessedLedger returns the last successfully processed ledger. It returns (0, false) if no ledgers have been successfully processed yet. The boolean guards against callers that use the `GetLatestSuccessfullyProcessedLedger()` value in a loop without checking it and end up ingesting ledger 0+1=1. See the `Resume` godoc for the possible implications.
func (*LiveSession) QueryUnlock ¶
func (s *LiveSession) QueryUnlock()
func (*LiveSession) Resume ¶
func (s *LiveSession) Resume(ledgerSequence uint32) error
Resume resumes the session from `ledgerSequence`. Returns nil when the session has been shut down.
WARNING: developers are likely to call `GetLatestSuccessfullyProcessedLedger()` to get the latest successfully processed ledger after `Resume` returns an error. It is critical to understand that `GetLatestSuccessfullyProcessedLedger()` returns `(0, false)` when no ledgers have been successfully processed, e.g. when an error occurs while processing the first ledger after an application restart. Always check whether the second returned value is `false` before overwriting your local variable.
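The check described in the warning can be sketched as a retry loop. This is an illustration under stated assumptions, not the package's own code: `resumable`, `resumeWithRetries`, and `fakeSession` are hypothetical names, and the fake session only simulates failures so that the loop can run without a real backend.

```go
package main

import (
	"errors"
	"fmt"
)

// resumable lists the two LiveSession methods the loop relies on.
type resumable interface {
	Resume(ledgerSequence uint32) error
	GetLatestSuccessfullyProcessedLedger() (ledgerSequence uint32, processed bool)
}

// resumeWithRetries (hypothetical helper) calls Resume until it returns nil
// or maxAttempts is reached. It returns the last sequence it computed for
// Resume together with the final error.
func resumeWithRetries(s resumable, start uint32, maxAttempts int) (uint32, error) {
	next := start
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = s.Resume(next); err == nil {
			return next, nil // nil means the session was shut down
		}
		// Only move forward if a ledger was actually processed. When
		// processed is false, latest is 0 and must not overwrite next.
		if latest, processed := s.GetLatestSuccessfullyProcessedLedger(); processed {
			next = latest + 1
		}
	}
	return next, err
}

// fakeSession is a stand-in used only to exercise the loop.
type fakeSession struct {
	failures     int      // how many Resume calls should fail
	makeProgress bool     // whether a failing call first processes its ledger
	latest       uint32   // last ledger the fake claims to have processed
	processed    bool     // whether any ledger was processed
	calls        []uint32 // sequences passed to Resume, in order
}

func (f *fakeSession) Resume(seq uint32) error {
	f.calls = append(f.calls, seq)
	if f.failures == 0 {
		return nil
	}
	f.failures--
	if f.makeProgress {
		f.latest, f.processed = seq, true
	}
	return errors.New("simulated failure")
}

func (f *fakeSession) GetLatestSuccessfullyProcessedLedger() (uint32, bool) {
	return f.latest, f.processed
}

func main() {
	// No ledger is ever processed: the loop must keep retrying from 500.
	stuck := &fakeSession{failures: 2}
	next, err := resumeWithRetries(stuck, 500, 5)
	fmt.Println(stuck.calls, next, err)

	// Each failing call processes its ledger first: the loop advances.
	moving := &fakeSession{failures: 2, makeProgress: true}
	next, err = resumeWithRetries(moving, 500, 5)
	fmt.Println(moving.calls, next, err)
}
```

Without the `processed` check, the first scenario would overwrite `next` with 0+1=1 and restart ingestion from ledger 1, which is the failure mode the warning describes.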
func (*LiveSession) Run ¶
func (s *LiveSession) Run() error
Run runs the session starting from the last checkpoint ledger. Returns nil when the session has been shut down.
func (*LiveSession) Shutdown ¶
func (s *LiveSession) Shutdown()
Shutdown gracefully stops the pipelines and the session. This method blocks until the pipelines have shut down gracefully.
func (*LiveSession) UpdateLock ¶
func (s *LiveSession) UpdateLock()
func (*LiveSession) UpdateUnlock ¶
func (s *LiveSession) UpdateUnlock()
type Session ¶
type Session interface {
	// Run starts the session and works as long as the session is active. If
	// you want to resume a session use Resume().
	Run() error
	// Resume resumes the session at ledger with a given sequence. It's up to
	// Session user to determine what was the last ledger processed by a
	// Session as it's stateless (or if Run() should be called first).
	Resume(ledgerSequence uint32) error
	// Shutdown gracefully stops running session and stops all internal
	// objects.
	// Calling Shutdown() does not trigger post processing hooks.
	Shutdown()
	// QueryLock locks the session for sending a query. This ensures a consistent
	// view when data is saved to multiple stores (e.g. postgres/redis/memory).
	// TODO this is fine for a demo but we need to check if it works for systems
	// that don't provide strong consistency. This may also slow down readers
	// if committing data to stores takes longer. This probably doesn't work
	// in distributed apps.
	QueryLock()
	// QueryUnlock unlocks read lock of the session.
	QueryUnlock()
	// UpdateLock locks the session for updating data. This ensures a consistent
	// view when data is saved to multiple stores (e.g. postgres/redis/memory).
	// TODO this is fine for a demo but we need to check if it works for systems
	// that don't provide strong consistency. This may also slow down readers
	// if committing data to stores takes longer. This probably doesn't work
	// in distributed apps.
	UpdateLock()
	// UpdateUnlock unlocks write lock of the session.
	UpdateUnlock()
}
Session is an implementation of an ingestion scenario. Some useful sessions can be found in this package.
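The interface comments describe `QueryLock`/`QueryUnlock` as a read lock and `UpdateLock`/`UpdateUnlock` as a write lock. The sketch below shows how such a pair yields a consistent view across two stores. It assumes the locks behave like a `sync.RWMutex`, which the documentation does not confirm; `lockedStores`, `commit`, `snapshot`, and `countTornReads` are hypothetical names, and the two integer fields stand in for real stores.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// lockedStores models two stores that must always reflect the same ledger.
// Assumption: the session locks map onto a sync.RWMutex.
type lockedStores struct {
	mu     sync.RWMutex
	storeA uint32 // latest ledger written to the first store
	storeB uint32 // latest ledger written to the second store
}

func (l *lockedStores) QueryLock()    { l.mu.RLock() }
func (l *lockedStores) QueryUnlock()  { l.mu.RUnlock() }
func (l *lockedStores) UpdateLock()   { l.mu.Lock() }
func (l *lockedStores) UpdateUnlock() { l.mu.Unlock() }

// commit writes one ledger to both stores under the update lock, so no
// reader can observe the first store updated and the second one stale.
func (l *lockedStores) commit(seq uint32) {
	l.UpdateLock()
	defer l.UpdateUnlock()
	l.storeA = seq
	l.storeB = seq
}

// snapshot reads both stores under the query lock.
func (l *lockedStores) snapshot() (a, b uint32) {
	l.QueryLock()
	defer l.QueryUnlock()
	return l.storeA, l.storeB
}

// countTornReads runs concurrent readers while ledgers are committed and
// counts snapshots in which the two stores disagree.
func countTornReads(ledgers uint32, readers, readsPerReader int) int64 {
	l := &lockedStores{}
	var torn int64
	var wg sync.WaitGroup
	for i := 0; i < readers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < readsPerReader; j++ {
				if a, b := l.snapshot(); a != b {
					atomic.AddInt64(&torn, 1)
				}
			}
		}()
	}
	for seq := uint32(1); seq <= ledgers; seq++ {
		l.commit(seq)
	}
	wg.Wait()
	return atomic.LoadInt64(&torn)
}

func main() {
	fmt.Println("torn reads:", countTornReads(1000, 4, 1000))
}
```

As the TODO in the interface notes, an in-process lock like this one does not extend to distributed deployments, and long commits hold up readers for their full duration.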
type SingleLedgerSession ¶
type SingleLedgerSession struct {
	Archive        *historyarchive.Archive
	LedgerSequence uint32
	StatePipeline  *pipeline.StatePipeline
	StateReporter  StateReporter
	// TempSet is a store used to hold temporary objects generated during
	// state processing. If nil, defaults to io.MemoryTempSet.
	TempSet io.TempSet
	// MaxStreamRetries determines how many times the reader will retry when encountering
	// errors while streaming xdr bucket entries from the history archive.
	// Set MaxStreamRetries to 0 if there should be no retry attempts
	MaxStreamRetries int
	// contains filtered or unexported fields
}
SingleLedgerSession initializes the ledger state using `Archive` and `StatePipeline` and terminates. Useful for finding stats for a single ledger. Set `LedgerSequence` to `0` to process the latest checkpoint.
func (*SingleLedgerSession) QueryUnlock ¶
func (s *SingleLedgerSession) QueryUnlock()
func (*SingleLedgerSession) Resume ¶
func (s *SingleLedgerSession) Resume(ledgerSequence uint32) error
func (*SingleLedgerSession) Run ¶
func (s *SingleLedgerSession) Run() error
func (*SingleLedgerSession) UpdateLock ¶
func (s *SingleLedgerSession) UpdateLock()
func (*SingleLedgerSession) UpdateUnlock ¶
func (s *SingleLedgerSession) UpdateUnlock()
type StateReporter ¶
type StateReporter interface {
	// OnStartState is called when the session begins processing
	// a history archive snapshot at the given sequence number.
	OnStartState(sequence uint32)
	// OnStateEntry is called when the session processes an entry
	// from the io.StateReader.
	OnStateEntry()
	// OnEndState is called when the session finishes processing
	// a history archive snapshot.
	// If err is not nil, the session stopped processing the
	// history archive snapshot because of an error.
	// If shutdown is true, the session stopped processing the
	// history archive snapshot because it received a shutdown signal.
	OnEndState(err error, shutdown bool)
}
StateReporter can be used by a session to log progress or update metrics as the session runs its state pipelines.
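As an illustration of logging progress, here is a sketch of a reporter that emits a line every N entries. The interface is copied from the declaration above so the example is self-contained; `ProgressStateReporter`, `Every`, and `Logf` are hypothetical names, and the sketch assumes callbacks arrive from a single goroutine.

```go
package main

import "fmt"

// StateReporter is copied from the documentation above so the sketch
// compiles on its own; real code would use the package's interface.
type StateReporter interface {
	OnStartState(sequence uint32)
	OnStateEntry()
	OnEndState(err error, shutdown bool)
}

// ProgressStateReporter is a hypothetical reporter that logs the start and
// end of state processing, plus one progress line every Every entries.
// It is not safe for concurrent use.
type ProgressStateReporter struct {
	Every int                                      // progress interval; <= 0 disables progress lines
	Logf  func(format string, args ...interface{}) // log sink, must not be nil

	sequence uint32 // snapshot currently being processed
	entries  int    // entries seen in that snapshot
}

// Compile-time check that the reporter satisfies the interface.
var _ StateReporter = (*ProgressStateReporter)(nil)

func (r *ProgressStateReporter) OnStartState(sequence uint32) {
	r.sequence = sequence
	r.entries = 0
	r.Logf("state %d: started", sequence)
}

func (r *ProgressStateReporter) OnStateEntry() {
	r.entries++
	if r.Every > 0 && r.entries%r.Every == 0 {
		r.Logf("state %d: %d entries processed", r.sequence, r.entries)
	}
}

func (r *ProgressStateReporter) OnEndState(err error, shutdown bool) {
	switch {
	case err != nil:
		r.Logf("state %d: failed after %d entries: %v", r.sequence, r.entries, err)
	case shutdown:
		r.Logf("state %d: interrupted by shutdown after %d entries", r.sequence, r.entries)
	default:
		r.Logf("state %d: finished, %d entries", r.sequence, r.entries)
	}
}

func main() {
	r := &ProgressStateReporter{
		Every: 2,
		Logf: func(format string, args ...interface{}) {
			fmt.Printf(format+"\n", args...)
		},
	}
	r.OnStartState(63)
	for i := 0; i < 5; i++ {
		r.OnStateEntry()
	}
	r.OnEndState(nil, false)
}
```

Taking the log sink as a function keeps the reporter independent of any particular logging library and makes it easy to count or capture lines in tests.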
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
processors | processors package contains the most commonly used processors. |
verify | Package verify provides helpers used for verifying if the ingested data is correct. |