persistence

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 14, 2023 License: MIT Imports: 18 Imported by: 1

Documentation

Index

Constants

View Source
const (
	TimestampLength   = 8
	HashLength        = 32
	DigestLength      = HashLength
	PubsubTopicLength = HashLength
	DBKeyLength       = TimestampLength + PubsubTopicLength + DigestLength
)
View Source
const (
	UndefinedDriver = iota
	PostgresDriver
	SQLiteDriver
)
View Source
const MaxTimeVariance = time.Duration(20) * time.Second

MaxTimeVariance is the maximum duration in the future allowed for a message timestamp

View Source
const WALMode = "wal"

WALMode for sqlite.

Variables

View Source
var ErrFutureMessage = errors.New("message timestamp in the future")

ErrFutureMessage indicates that a message with a timestamp in the future was requested to be stored.

View Source
var (
	// ErrInvalidByteSize is returned when DBKey can't be created
	// from a byte slice because it has invalid length.
	ErrInvalidByteSize = errors.New("byte slice has invalid length")
)
View Source
var ErrInvalidCursor = errors.New("invalid cursor")

ErrInvalidCursor indicates that an invalid cursor has been passed to access the store.

View Source
var ErrMessageTooOld = errors.New("message too old")

ErrMessageTooOld indicates that a message that was too old was requested to be stored.

Functions

func GetDriverType added in v0.9.0

func GetDriverType(db *sql.DB) int

Types

type ConnectionPoolOptions

type ConnectionPoolOptions struct {
	MaxOpenConnections    int
	MaxIdleConnections    int
	ConnectionMaxLifetime time.Duration
	ConnectionMaxIdleTime time.Duration
}

ConnectionPoolOptions is the options to be used for DB connection pooling

type DBKey

type DBKey struct {
	// contains filtered or unexported fields
}

DBKey is a key to be stored in a db.

func NewDBKey

func NewDBKey(senderTimestamp uint64, receiverTimestamp uint64, pubsubTopic string, digest []byte) *DBKey

NewDBKey creates a new DBKey with the given values.

func (*DBKey) Bytes

func (k *DBKey) Bytes() []byte

Bytes returns a bytes representation of the DBKey.

type DBOption

type DBOption func(*DBStore) error

DBOption is an optional setting that can be used to configure the DBStore

func DefaultOptions

func DefaultOptions() []DBOption

DefaultOptions returns the default DBOptions to be used.

func WithDB

func WithDB(db *sql.DB) DBOption

WithDB is a DBOption that lets you use any custom *sql.DB with a DBStore.

func WithDriver

func WithDriver(driverName string, datasourceName string, connectionPoolOptions ...ConnectionPoolOptions) DBOption

WithDriver is a DBOption that will open a *sql.DB connection

func WithMigrations added in v0.4.0

func WithMigrations(migrationFn MigrationFn) DBOption

WithMigrations is a DBOption used to determine if migrations should be executed, and what driver to use

func WithRetentionPolicy

func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption

WithRetentionPolicy is a DBOption that specifies the maximum number of messages to be stored and the maximum duration they are kept before being removed from the message store.

type DBStore

type DBStore struct {
	MessageProvider
	// contains filtered or unexported fields
}

DBStore is a MessageProvider that has a *sql.DB connection

func NewDBStore

func NewDBStore(reg prometheus.Registerer, log *zap.Logger, options ...DBOption) (*DBStore, error)

NewDBStore creates a new DB store using the db specified via options. It will create a messages table if it does not exist and clean up records according to the retention policy used.

func (*DBStore) Count

func (d *DBStore) Count() (int, error)

Count returns the number of rows in the message table

func (*DBStore) GetAll

func (d *DBStore) GetAll() ([]StoredMessage, error)

GetAll returns all the stored WakuMessages

func (*DBStore) GetStoredMessage

func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error)

GetStoredMessage is a helper function used to convert a `*sql.Rows` into a `StoredMessage`

func (*DBStore) MostRecentTimestamp

func (d *DBStore) MostRecentTimestamp() (int64, error)

MostRecentTimestamp returns a Unix timestamp with the most recent senderTimestamp in the message table.

func (*DBStore) Put

func (d *DBStore) Put(env *protocol.Envelope) error

Put inserts a WakuMessage into the DB

func (*DBStore) Query

func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, error)

Query retrieves messages from the DB

func (*DBStore) Start

func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) error

Start starts the store server functionality

func (*DBStore) Stop

func (d *DBStore) Stop()

Stop closes a DB connection

func (*DBStore) Validate added in v0.6.0

func (d *DBStore) Validate(env *protocol.Envelope) error

Validate validates the message to be stored against possible fraudulent conditions.

type Hash

type Hash [HashLength]byte

type MessageProvider

type MessageProvider interface {
	GetAll() ([]StoredMessage, error)
	Validate(env *protocol.Envelope) error
	Put(env *protocol.Envelope) error
	Query(query *pb.HistoryQuery) ([]StoredMessage, error)
	MostRecentTimestamp() (int64, error)
	Start(ctx context.Context, timesource timesource.Timesource) error
	Stop()
}

MessageProvider is an interface that provides access to store/retrieve messages from a persistence store.

type Metrics added in v0.8.0

type Metrics interface {
	RecordMessage(num int)
	RecordError(err metricsErrCategory)
	RecordInsertDuration(duration time.Duration)
	RecordQueryDuration(duration time.Duration)
}

Metrics exposes the functions required to update prometheus metrics for archive protocol

type MigrationFn added in v0.8.0

type MigrationFn func(db *sql.DB) error

type Queries added in v0.8.0

type Queries struct {
	// contains filtered or unexported fields
}

Queries are the SQL queries for a given table.

func CreateQueries added in v0.8.0

func CreateQueries(tbl string) *Queries

CreateQueries creates a set of queries for an SQL table. Note: do not use this function directly to create queries for a table; instead use <rdb>.NewQueries, which creates the table as well as the queries.

func (Queries) Delete added in v0.8.0

func (q Queries) Delete() string

Delete returns the query for deleting a row.

func (Queries) Exists added in v0.8.0

func (q Queries) Exists() string

Exists returns the query for determining if a row exists.

func (Queries) Get added in v0.8.0

func (q Queries) Get() string

Get returns the query for getting a row.

func (Queries) GetSize added in v0.8.0

func (q Queries) GetSize() string

GetSize returns the query for determining the size of a value.

func (Queries) Limit added in v0.8.0

func (q Queries) Limit() string

Limit returns the query fragment for limiting results.

func (Queries) Offset added in v0.8.0

func (q Queries) Offset() string

Offset returns the query fragment for returning rows from a given offset.

func (Queries) Prefix added in v0.8.0

func (q Queries) Prefix() string

Prefix returns the query fragment for getting rows with a key prefix.

func (Queries) Put added in v0.8.0

func (q Queries) Put() string

Put returns the query for putting a row.

func (Queries) Query added in v0.8.0

func (q Queries) Query() string

Query returns the query for getting multiple rows.

type StoredMessage

type StoredMessage struct {
	ID           []byte
	PubsubTopic  string
	ReceiverTime int64
	Message      *wpb.WakuMessage
}

StoredMessage is the format of the message stored in the persistence store.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL