worker

package
v0.0.0-...-020e20f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2022 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Overview

Package worker defines miner node logic for database storage and sql-chain accounting integration.

Index

Constants

View Source
const (
	// StorageFileName defines the storage file name of a database instance.
	StorageFileName = "storage.db3"

	// KayakWalFileName defines the log pool name of a database instance.
	KayakWalFileName = "kayak.ldb"

	// SQLChainFileName defines the sqlchain storage file name.
	SQLChainFileName = "chain.db"

	// MaxRecordedConnectionSequences defines the max connection slots used to guard against replay attacks.
	MaxRecordedConnectionSequences = 1000

	// PrepareThreshold defines the prepare complete threshold.
	PrepareThreshold = 1.0

	// CommitThreshold defines the commit complete threshold.
	CommitThreshold = 0.0

	// PrepareTimeout defines the prepare timeout config.
	PrepareTimeout = 10 * time.Second

	// CommitTimeout defines the commit timeout config.
	CommitTimeout = time.Minute

	// LogWaitTimeout defines the missing log wait timeout config.
	LogWaitTimeout = 10 * time.Second

	// SlowQuerySampleSize defines the maximum slow query log size (default: 1KB, i.e. 1 << 10).
	SlowQuerySampleSize = 1 << 10
)
View Source
const (
	// DBKayakRPCName defines the rpc service name of the database internal consensus.
	DBKayakRPCName = "DBC" // aka. database consensus

	// DBMetaFileName defines the dbms meta file name.
	DBMetaFileName = "db.meta"

	// DefaultSlowQueryTime defines the default slow query log time (5 seconds).
	DefaultSlowQueryTime = time.Second * 5
)
View Source
// RPC method names for the database kayak apply and fetch calls.
const (
	// DBKayakApplyMethodName defines the database kayak apply rpc method name.
	DBKayakApplyMethodName = "Apply"
	// DBKayakFetchMethodName defines the database kayak fetch rpc method name.
	DBKayakFetchMethodName = "Fetch"
)

Variables

View Source
var (
	// ErrInvalidRequest indicates an invalid request structure was supplied.
	ErrInvalidRequest = errors.New("invalid request supplied")
	// ErrInvalidRequestSeq indicates an invalid sequence number in a request.
	ErrInvalidRequestSeq = errors.New("invalid request sequence applied")
	// ErrAlreadyExists is the error for re-creating an existing database instance.
	ErrAlreadyExists = errors.New("database instance already exists")
	// ErrNotExists is the error for manipulating a non-existent database instance.
	ErrNotExists = errors.New("database instance not exists")
	// ErrInvalidDBConfig is the error for an invalid db config received from the block producer.
	ErrInvalidDBConfig = errors.New("invalid database configuration")
	// ErrSpaceLimitExceeded is the error for disk space exceeding the limit.
	ErrSpaceLimitExceeded = errors.New("space limit exceeded")
	// ErrUnknownMuxRequest indicates that a multiplexing request endpoint is not found.
	ErrUnknownMuxRequest = errors.New("unknown multiplexing request")
	// ErrPermissionDeny indicates that the requester has no permission to send a read or write query.
	ErrPermissionDeny = errors.New("permission deny")
	// ErrInvalidPermission indicates that the requester sent an unrecognized permission.
	ErrInvalidPermission = errors.New("invalid permission")
	// ErrInvalidTransactionType indicates that the transaction type is invalid.
	ErrInvalidTransactionType = errors.New("invalid transaction type")
)
View Source
var (
	// DefaultMaxReqTimeGap defines the default max time gap between request and server (one minute).
	DefaultMaxReqTimeGap = time.Minute
)

Functions

This section is empty.

Types

type BusService

type BusService struct {
	chainbus.Bus
	// contains filtered or unexported fields
}

BusService defines the main chain bus service type.

func NewBusService

func NewBusService(
	ctx context.Context, addr proto.AccountAddress, checkInterval time.Duration) (_ *BusService,
)

NewBusService creates a new chain bus instance.

func (*BusService) GetCurrentDBMapping

func (bs *BusService) GetCurrentDBMapping() (dbMap map[proto.DatabaseID]*types.SQLChainProfile)

GetCurrentDBMapping returns current cached db mapping.

func (*BusService) RequestPermStat

func (bs *BusService) RequestPermStat(
	dbID proto.DatabaseID, user proto.AccountAddress) (permStat *types.PermStat, ok bool,
)

RequestPermStat fetches permission state from bus service.

func (*BusService) RequestSQLProfile

func (bs *BusService) RequestSQLProfile(dbID proto.DatabaseID) (p *types.SQLChainProfile, ok bool)

RequestSQLProfile gets the specified database profile.

func (*BusService) Start

func (bs *BusService) Start()

Start starts a chain bus service.

func (*BusService) Stop

func (bs *BusService) Stop()

Stop stops the chain bus service.

type DBConfig

// DBConfig defines the database config.
//
// Its fields cover the database ID, root/data directories, the kayak and
// chain mux services, and per-database settings such as space limit,
// consistency/isolation levels and slow query time.
type DBConfig struct {
	DatabaseID             proto.DatabaseID
	RootDir                string
	DataDir                string
	KayakMux               *DBKayakMuxService
	ChainMux               *sqlchain.MuxService
	MaxWriteTimeGap        time.Duration
	EncryptionKey          string
	SpaceLimit             uint64 // NOTE(review): presumably the limit behind ErrSpaceLimitExceeded; unit not visible here — confirm
	UpdateBlockCount       uint64
	LastBillingHeight      int32
	UseEventualConsistency bool
	ConsistencyLevel       float64
	IsolationLevel         int
	SlowQueryTime          time.Duration // NOTE(review): presumably falls back to DefaultSlowQueryTime when unset — confirm
}

DBConfig defines the database config.

type DBKayakMuxService

type DBKayakMuxService struct {
	// contains filtered or unexported fields
}

DBKayakMuxService defines a mux service for sqlchain kayak.

func NewDBKayakMuxService

func NewDBKayakMuxService(serviceName string, server *rpc.Server) (s *DBKayakMuxService, err error)

NewDBKayakMuxService returns a new kayak mux service.

func (*DBKayakMuxService) Apply

func (s *DBKayakMuxService) Apply(req *kt.ApplyRequest, _ *interface{}) (err error)

Apply handles kayak apply call.

func (*DBKayakMuxService) Fetch

func (s *DBKayakMuxService) Fetch(req *kt.FetchRequest, resp *kt.FetchResponse) (err error)

Fetch handles kayak fetch call.

type DBMS

type DBMS struct {
	// contains filtered or unexported fields
}

DBMS defines a database management instance.

func NewDBMS

func NewDBMS(cfg *DBMSConfig) (dbms *DBMS, err error)

NewDBMS returns new database management instance.

func (*DBMS) Ack

func (dbms *DBMS) Ack(ack *types.Ack) (err error)

Ack handles ack of previous response.

func (*DBMS) Create

func (dbms *DBMS) Create(instance *types.ServiceInstance, cleanup bool) (err error)

Create adds a new database to the miner dbms.

func (*DBMS) Drop

func (dbms *DBMS) Drop(dbID proto.DatabaseID) (err error)

Drop removes a database from the miner dbms.

func (*DBMS) Init

func (dbms *DBMS) Init() (err error)

Init defines dbms init logic.

func (*DBMS) Query

func (dbms *DBMS) Query(req *types.Request) (res *types.Response, err error)

Query handles query request in dbms.

func (*DBMS) Shutdown

func (dbms *DBMS) Shutdown() (err error)

Shutdown defines dbms shutdown logic.

func (*DBMS) Update

func (dbms *DBMS) Update(instance *types.ServiceInstance) (err error)

Update applies the new peers config to dbms.

func (*DBMS) UpdatePermission

func (dbms *DBMS) UpdatePermission(dbID proto.DatabaseID, user proto.AccountAddress, permStat *types.PermStat) (err error)

UpdatePermission exports the update permission interface for test.

type DBMSConfig

// DBMSConfig defines the local multi-database management system config.
type DBMSConfig struct {
	RootDir          string
	Server           *mux.Server
	DirectServer     *rpc.Server // optional server to provide DBMS service
	MaxReqTimeGap    time.Duration // NOTE(review): presumably defaults to DefaultMaxReqTimeGap when unset — confirm
	OnCreateDatabase func() // NOTE(review): callback, presumably invoked when a database is created — confirm
}

DBMSConfig defines the local multi-database management system config.

type DBMSMeta

// DBMSMeta defines the meta structure.
type DBMSMeta struct {
	// DBS is keyed by database ID; the meaning of the bool value is not
	// visible from this declaration.
	DBS map[proto.DatabaseID]bool
}

DBMSMeta defines the meta structure.

func NewDBMSMeta

func NewDBMSMeta() (meta *DBMSMeta)

NewDBMSMeta returns new DBMSMeta struct.

type DBMSRPCService

type DBMSRPCService struct {
	// contains filtered or unexported fields
}

DBMSRPCService is the rpc endpoint of database management.

func NewDBMSRPCService

func NewDBMSRPCService(
	serviceName string, server *mux.Server, direct *rpc.Server, dbms *DBMS,
) (
	service *DBMSRPCService,
)

NewDBMSRPCService returns new dbms rpc service endpoint.

func (*DBMSRPCService) Ack

func (rpc *DBMSRPCService) Ack(ack *types.Ack, _ *types.AckResponse) (err error)

Ack rpc, called by client to confirm read request.

func (*DBMSRPCService) Deploy

func (rpc *DBMSRPCService) Deploy(req *types.UpdateService, _ *types.UpdateServiceResponse) (err error)

Deploy rpc, called by BP to create/drop database and update peers.

func (*DBMSRPCService) ObserverFetchBlock

func (rpc *DBMSRPCService) ObserverFetchBlock(req *ObserverFetchBlockReq, resp *ObserverFetchBlockResp) (err error)

ObserverFetchBlock handles observer fetch block logic.

func (*DBMSRPCService) Query

func (rpc *DBMSRPCService) Query(req *types.Request, res *types.Response) (err error)

Query rpc, called by client to issue read/write query.

type Database

type Database struct {
	// contains filtered or unexported fields
}

Database defines a single database instance in worker runtime.

func NewDatabase

func NewDatabase(cfg *DBConfig, peers *proto.Peers,
	genesis *types.Block) (db *Database, err error)

NewDatabase creates a single database instance using config.

func (*Database) Ack

func (db *Database) Ack(ack *types.Ack) (err error)

Ack defines client response ack interface.

func (*Database) Check

func (db *Database) Check(rawReq interface{}) (err error)

Check implements kayak.types.Handler.Check.

func (*Database) Commit

func (db *Database) Commit(rawReq interface{}, isLeader bool) (result interface{}, err error)

Commit implements kayak.types.Handler.Commit.

func (*Database) DecodePayload

func (db *Database) DecodePayload(data []byte) (request interface{}, err error)

DecodePayload implements kayak.types.Handler.DecodePayload.

func (*Database) Destroy

func (db *Database) Destroy() (err error)

Destroy stops the database instance and destroys all data/meta.

func (*Database) EncodePayload

func (db *Database) EncodePayload(request interface{}) (data []byte, err error)

EncodePayload implements kayak.types.Handler.EncodePayload.

func (*Database) Query

func (db *Database) Query(request *types.Request) (response *types.Response, err error)

Query defines database query interface.

func (*Database) Shutdown

func (db *Database) Shutdown() (err error)

Shutdown stops the database handles and stops serving the database.

func (*Database) UpdatePeers

func (db *Database) UpdatePeers(peers *proto.Peers) (err error)

UpdatePeers defines peers update query interface.

type ObserverFetchBlockReq

// ObserverFetchBlockReq defines the request for an observer to fetch a block.
// It embeds the request envelope and the target database ID.
type ObserverFetchBlockReq struct {
	proto.Envelope
	proto.DatabaseID
	Count int32 // sqlchain block serial number since genesis block (0)
}

ObserverFetchBlockReq defines the request for observer to fetch block.

type ObserverFetchBlockResp

// ObserverFetchBlockResp defines the response for an observer to fetch a block.
type ObserverFetchBlockResp struct {
	Count int32 // sqlchain block serial number since genesis block (0)
	Block *types.Block
}

ObserverFetchBlockResp defines the response for observer to fetch block.

type TrackerAndResponse

// TrackerAndResponse defines a query tracker used by xenomint and an unsigned response.
type TrackerAndResponse struct {
	Tracker  *x.QueryTracker // query tracker used by xenomint
	Response *types.Response // unsigned response
}

TrackerAndResponse defines a query tracker used by xenomint and an unsigned response.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL