indexer

package
v0.15.1
Warning

This package is not in the latest version of its module.

Published: Apr 22, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

README

Node Indexers Manager

Initially, each node periodically reports its data (capacity, uptime, location, etc.) to the chain. The chain events are then processed by graphql-processor, which dumps this data, along with other data for farms, contracts, and twins, into a postgres database that serves both graphql-api and proxy-api. This works well for small data, but the chain is not the best place to store larger data such as gpu/dmi. The Node-Indexers solve this by periodically calling the nodes, at a configurable interval, to fetch the data and store it in the same postgres database, from which it can be served to the apis (only proxy-api for now).

The indexer structure

Each indexer has two clients:

  • Database: a client to the postgres db.
  • RmbClient: an rmb client used to make the node calls.

Three channels:

  • IdChan: collects the twin ids of the nodes the indexer will call.
  • ResultChan: collects the results returned by the rmb calls to the nodes.
  • BatchChan: transfers batches of results that are ready to be upserted directly.

Four types of workers:

  • Finder: queries the database to filter nodes and pushes their twin ids to the IdChan.
  • Getter: pops twin ids from the IdChan, calls each node with the RmbClient to get its data, and pushes the result to the ResultChan.
  • Batcher: collects results from the ResultChan into batches and sends them to the BatchChan.
  • Upserter: takes batches from the BatchChan and updates/inserts them into the Database.
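The flow between the four workers and three channels can be sketched as below. This is a simplified, single-pass illustration, not the package's implementation: `runPipeline`, `demo`, the fixed twin id list, and the `get`/`upsert` callbacks are all hypothetical names, and the real indexer runs continuously on intervals rather than once.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// runPipeline (hypothetical) wires Finder -> Getter -> Batcher -> Upserter
// for a single pass over twinIDs and returns once every batch is upserted.
func runPipeline[T any](
	ctx context.Context,
	twinIDs []uint32,
	getters, batchSize int,
	get func(context.Context, uint32) ([]T, error),
	upsert func(context.Context, []T) error,
) {
	idChan := make(chan uint32)
	resultChan := make(chan T)
	batchChan := make(chan []T)

	// Finder: pushes twin ids to idChan (a fixed list stands in for the db query).
	go func() {
		defer close(idChan)
		for _, id := range twinIDs {
			select {
			case idChan <- id:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Getters: pop ids, call the node, push each result to resultChan.
	var wg sync.WaitGroup
	for n := 0; n < getters; n++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range idChan {
				results, err := get(ctx, id)
				if err != nil {
					continue // node did not answer: skip it in this pass
				}
				for _, r := range results {
					select {
					case resultChan <- r:
					case <-ctx.Done():
						return
					}
				}
			}
		}()
	}
	go func() { wg.Wait(); close(resultChan) }()

	// Batcher: groups results into batches of at most batchSize.
	go func() {
		defer close(batchChan)
		batch := make([]T, 0, batchSize)
		for r := range resultChan {
			batch = append(batch, r)
			if len(batch) == batchSize {
				batchChan <- batch
				batch = make([]T, 0, batchSize)
			}
		}
		if len(batch) > 0 {
			batchChan <- batch // flush the final partial batch
		}
	}()

	// Upserter: writes every batch (errors are ignored in this sketch).
	for batch := range batchChan {
		_ = upsert(ctx, batch)
	}
}

// demo runs the pipeline over five fake twins; twin 3 fails to answer.
func demo() []string {
	var stored []string
	get := func(_ context.Context, id uint32) ([]string, error) {
		if id == 3 {
			return nil, fmt.Errorf("twin %d unreachable", id)
		}
		return []string{fmt.Sprintf("node-%d", id)}, nil
	}
	upsert := func(_ context.Context, batch []string) error {
		stored = append(stored, batch...) // only the upserter goroutine writes here
		return nil
	}
	runPipeline(context.Background(), []uint32{1, 2, 3, 4, 5}, 2, 3, get, upsert)
	sort.Strings(stored)
	return stored
}

func main() {
	fmt.Println(demo()) // [node-1 node-2 node-4 node-5]
}
```

Closing each channel from its producing stage lets the downstream stage finish with a plain `range` loop, so shutdown propagates from the Finder to the Upserter without extra signalling.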

The indexer struct is generic, and each indexer's functionality differs from the others based on its Work. A Work is a struct that implements the Work interface, which has three methods:

  • Finders: returns a map from finder name to interval, which decides the finders this indexer should use.
  • Get: prepares the payload for the rmb call and parses the response into ready-to-store db model data.
  • Upsert: calls the equivalent db upsert method, with the ability to remove old, expired data.
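A minimal sketch of a custom Work, assuming stand-in types so that it compiles on its own. `RpcClient`, `Database`, `Ping`, `PingWork`, the `example.ping` command, the finder names `up` and `new`, and the reading of the constructor argument as minutes are all hypothetical; the real `peer.RpcClient` and `db.Database` have their own APIs.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// RpcClient is a stand-in (assumption) for peer.RpcClient.
type RpcClient struct {
	// Call fakes a node call and returns the raw answer for a twin and command.
	Call func(ctx context.Context, twin uint32, cmd string) (string, error)
}

// Database is a stand-in (assumption) for db.Database.
type Database interface {
	UpsertPing(ctx context.Context, batch []Ping) error
}

// Work mirrors the three-method interface described above.
type Work[T any] interface {
	Finders() map[string]time.Duration
	Get(ctx context.Context, rmb *RpcClient, id uint32) ([]T, error)
	Upsert(ctx context.Context, db Database, batch []T) error
}

// Ping is a hypothetical db model.
type Ping struct {
	TwinID uint32
	Answer string
}

// PingWork is a hypothetical Work implementation.
type PingWork struct{ interval time.Duration }

// NewPingWork treats its argument as minutes (an assumption of this sketch).
func NewPingWork(intervalMinutes uint) *PingWork {
	return &PingWork{interval: time.Duration(intervalMinutes) * time.Minute}
}

// Finders maps finder names to intervals; the names are made up for this sketch.
func (w *PingWork) Finders() map[string]time.Duration {
	return map[string]time.Duration{
		"up":  w.interval,      // periodic pass over known nodes
		"new": 5 * time.Minute, // check for newly added nodes
	}
}

// Get calls the node and converts the answer into db model data.
func (w *PingWork) Get(ctx context.Context, rmb *RpcClient, id uint32) ([]Ping, error) {
	answer, err := rmb.Call(ctx, id, "example.ping")
	if err != nil {
		return nil, fmt.Errorf("twin %d: %w", id, err)
	}
	return []Ping{{TwinID: id, Answer: answer}}, nil
}

// Upsert delegates to the matching database method.
func (w *PingWork) Upsert(ctx context.Context, db Database, batch []Ping) error {
	return db.UpsertPing(ctx, batch)
}

// Compile-time check that PingWork satisfies Work[Ping].
var _ Work[Ping] = (*PingWork)(nil)

// memDB is an in-memory Database used only for the demo.
type memDB map[uint32]string

func (m memDB) UpsertPing(_ context.Context, batch []Ping) error {
	for _, p := range batch {
		m[p.TwinID] = p.Answer
	}
	return nil
}

// demoWork runs one Get followed by one Upsert for twin 42.
func demoWork() (memDB, error) {
	ctx := context.Background()
	rmb := &RpcClient{Call: func(_ context.Context, twin uint32, cmd string) (string, error) {
		return fmt.Sprintf("%s@%d", cmd, twin), nil
	}}
	db := memDB{}
	var w Work[Ping] = NewPingWork(60)
	batch, err := w.Get(ctx, rmb, 42)
	if err != nil {
		return nil, err
	}
	return db, w.Upsert(ctx, db, batch)
}

func main() {
	db, err := demoWork()
	fmt.Println(db, err)
}
```

Because the indexer only depends on these three methods, a new data source needs a new Work type rather than changes to the worker pipeline.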

Registered Indexers

  1. Gpu indexer:

    • Function: queries the gpu list on the node.
    • Interval: 60 min
    • Other triggers: new node is added (check every 5m).
    • Default caller worker number: 5
    • Dump table: node_gpu
  2. Health indexer:

    • Function: decide the node health based on its internal state.
    • Interval: 5 min
    • Default caller worker number: 100
    • Dump table: health_report
  3. Dmi indexer:

    • Function: collect some hardware data from the node.
    • Interval: 1 day
    • Other triggers: new node is added (check every 5m).
    • Default caller worker number: 1
    • Dump table: dmi
  4. Speed indexer:

    • Function: measures the node's network upload/download speed, tested against an iperf server.
    • Interval: 5 min
    • Default caller worker number: 100
    • Dump table: speed

Documentation

Index

Constants

const (
	DmiCallCmd = "zos.system.dmi"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DMIWork

type DMIWork struct {
	// contains filtered or unexported fields
}

func NewDMIWork

func NewDMIWork(interval uint) *DMIWork

func (*DMIWork) Finders

func (w *DMIWork) Finders() map[string]time.Duration

func (*DMIWork) Get

func (w *DMIWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.Dmi, error)

func (*DMIWork) Upsert

func (w *DMIWork) Upsert(ctx context.Context, db db.Database, batch []types.Dmi) error

type Finder

type Finder func(context.Context, time.Duration, db.Database, chan uint32)
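A function of this shape could look like the sketch below, which pushes every known twin id once per interval. It is an illustration under stated assumptions: `Database` is a stand-in for `db.Database`, `NodeTwinIDs` is a hypothetical query method, and `allNodesFinder` is not one of the package's finders.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Database is a stand-in (assumption) for db.Database.
type Database interface {
	// NodeTwinIDs is a hypothetical query returning the twin ids to index.
	NodeTwinIDs(ctx context.Context) ([]uint32, error)
}

// Finder has the same shape as the package's Finder type, using the stand-in Database.
type Finder func(context.Context, time.Duration, Database, chan uint32)

// allNodesFinder pushes all twin ids immediately, then again on every tick,
// until the context is cancelled.
func allNodesFinder(ctx context.Context, interval time.Duration, db Database, idChan chan uint32) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		ids, err := db.NodeTwinIDs(ctx)
		if err == nil { // on a query error, wait for the next tick and retry
			for _, id := range ids {
				select {
				case idChan <- id:
				case <-ctx.Done():
					return
				}
			}
		}
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}

// Compile-time check that the function matches the Finder shape.
var _ Finder = allNodesFinder

// fakeDB always returns the same twin ids.
type fakeDB []uint32

func (f fakeDB) NodeTwinIDs(context.Context) ([]uint32, error) { return f, nil }

// firstIDs starts the finder and returns the first n ids it emits.
func firstIDs(n int) []uint32 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops the finder goroutine
	idChan := make(chan uint32)
	go allNodesFinder(ctx, 10*time.Millisecond, fakeDB{7, 8}, idChan)
	got := make([]uint32, 0, n)
	for len(got) < n {
		got = append(got, <-idChan)
	}
	return got
}

func main() {
	fmt.Println(firstIDs(4)) // [7 8 7 8]
}
```

Selecting on `ctx.Done()` around both the channel send and the ticker wait keeps the finder from blocking forever when no Getter is reading.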

type GPUWork

type GPUWork struct {
	// contains filtered or unexported fields
}

func NewGPUWork

func NewGPUWork(interval uint) *GPUWork

func (*GPUWork) Finders

func (w *GPUWork) Finders() map[string]time.Duration

func (*GPUWork) Get

func (w *GPUWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.NodeGPU, error)

func (*GPUWork) Upsert

func (w *GPUWork) Upsert(ctx context.Context, db db.Database, batch []types.NodeGPU) error

type HealthWork

type HealthWork struct {
	// contains filtered or unexported fields
}

func NewHealthWork

func NewHealthWork(interval uint) *HealthWork

func (*HealthWork) Finders

func (w *HealthWork) Finders() map[string]time.Duration

func (*HealthWork) Get

func (w *HealthWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.HealthReport, error)

func (*HealthWork) Upsert

func (w *HealthWork) Upsert(ctx context.Context, db db.Database, batch []types.HealthReport) error

type Indexer

type Indexer[T any] struct {
	// contains filtered or unexported fields
}

func NewIndexer

func NewIndexer[T any](
	work Work[T],
	name string,
	db db.Database,
	rmb *peer.RpcClient,
	worker uint,
) *Indexer[T]

func (*Indexer[T]) Start

func (i *Indexer[T]) Start(ctx context.Context)

type Ipv6Work added in v0.15.1

type Ipv6Work struct {
	// contains filtered or unexported fields
}

func NewIpv6Work added in v0.15.1

func NewIpv6Work(interval uint) *Ipv6Work

func (*Ipv6Work) Finders added in v0.15.1

func (w *Ipv6Work) Finders() map[string]time.Duration

func (*Ipv6Work) Get added in v0.15.1

func (w *Ipv6Work) Get(ctx context.Context, rmb *peer.RpcClient, id uint32) ([]types.HasIpv6, error)

func (*Ipv6Work) Upsert added in v0.15.1

func (w *Ipv6Work) Upsert(ctx context.Context, db db.Database, batch []types.HasIpv6) error

type SpeedWork

type SpeedWork struct {
	// contains filtered or unexported fields
}

func NewSpeedWork

func NewSpeedWork(interval uint) *SpeedWork

func (*SpeedWork) Finders

func (w *SpeedWork) Finders() map[string]time.Duration

func (*SpeedWork) Get

func (w *SpeedWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.Speed, error)

func (*SpeedWork) Upsert

func (w *SpeedWork) Upsert(ctx context.Context, db db.Database, batch []types.Speed) error

type Work

type Work[T any] interface {
	Finders() map[string]time.Duration
	Get(ctx context.Context, rmb *peer.RpcClient, id uint32) ([]T, error)
	Upsert(ctx context.Context, db db.Database, batch []T) error
}
