aggregator

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2024 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Error constructors and sentinel errors defined by this package.
var (
	// TaskAlreadyInitializedErrorFn builds the error for a task index that
	// has already been initialized.
	TaskAlreadyInitializedErrorFn = func(taskIndex types.TaskIndex) error {
		return fmt.Errorf("task %d already initialized", taskIndex)
	}
	// TaskExpiredError builds the error for an expired task. The message
	// embeds the given per-quorum threshold information, printed with %+v.
	TaskExpiredError = func(taskIndex types.TaskIndex, thresholdInfo map[types.QuorumNum]ThresholdInfo) error {
		return fmt.Errorf("task#%v expired: %+v", taskIndex, thresholdInfo)
	}
	// TaskNotFoundErrorFn builds the error for a task index that is not
	// initialized or has already completed.
	TaskNotFoundErrorFn = func(taskIndex types.TaskIndex) error {
		return fmt.Errorf("task %d not initialized or already completed", taskIndex)
	}
	// OperatorNotPartOfTaskQuorumErrorFn builds the error for an operator
	// (printed in hex) that is not part of the task's quorum.
	OperatorNotPartOfTaskQuorumErrorFn = func(operatorId types.OperatorId) error {
		return fmt.Errorf("operator %x not part of task's quorum", operatorId)
	}
	// SignatureVerificationError wraps err with the id of the operator whose
	// signature could not be verified. The cause is wrapped with %w, so it
	// stays reachable through errors.Is and errors.As.
	SignatureVerificationError = func(operatorId types.OperatorId, err error) error {
		return fmt.Errorf("operator %x Failed to verify signature: %w", operatorId, err)
	}
	// OperatorG2KeyNotFound builds the error for an operator (printed in
	// hex) whose g2 key was not found.
	OperatorG2KeyNotFound = func(operatorId types.OperatorId) error {
		return fmt.Errorf("operator %x g2 key not found", operatorId)
	}
	// IncorrectSignatureError is a sentinel error for a signature that failed
	// verification; compare against it with errors.Is.
	// NOTE(review): the message is capitalized and punctuated, contrary to Go
	// convention; it is left unchanged because callers or logs may match it.
	IncorrectSignatureError = errors.New("Signature verification failed. Incorrect Signature.")
)

Functions

This section is empty.

Types

type Aggregator

// Aggregator is the aggregator type built by NewAggregator. Only its task
// manager and metrics collector are exported; the remaining state is hidden.
type Aggregator struct {
	// TaskManager is the xtask.TaskManager held by this aggregator.
	TaskManager *xtask.TaskManager

	// Collector is the xmetric.AggregatorCollector held by this aggregator.
	Collector *xmetric.AggregatorCollector
	// contains filtered or unexported fields
}

func NewAggregator

func NewAggregator(ctx context.Context, cfg *Config) (*Aggregator, error)

func (*Aggregator) Start

func (agg *Aggregator) Start(ctx context.Context) error

type AggregatorApi

type AggregatorApi struct {
	// contains filtered or unexported fields
}

func (*AggregatorApi) FetchTask added in v0.2.0

func (a *AggregatorApi) FetchTask(ctx context.Context, req *FetchTaskReq) (*FetchTaskResp, error)

func (*AggregatorApi) SubmitMetrics added in v0.2.0

func (a *AggregatorApi) SubmitMetrics(ctx context.Context, req *SubmitMetricsReq) error

func (*AggregatorApi) SubmitTask

func (a *AggregatorApi) SubmitTask(ctx context.Context, req *TaskRequest) error

type AttestationLayer added in v0.5.0

// AttestationLayer describes one attestation layer entry; Config carries a
// slice of these in its AttestationLayer field.
type AttestationLayer struct {
	// Version is an integer version for this entry.
	// NOTE(review): the set of valid values is not visible here.
	Version int
	// Address is the address associated with this entry — presumably the
	// attestation contract address; TODO confirm.
	Address common.Address
	// RpcUrl is presumably the RPC endpoint used to reach this layer — TODO confirm.
	RpcUrl  string
}

func (*AttestationLayer) Build added in v0.5.0

type AttestationLayerClient added in v0.5.0

type AttestationLayerClient struct {
	// contains filtered or unexported fields
}

type BlsAggregationServiceResponse added in v0.2.0

// BlsAggregationServiceResponse is the response from the bls aggregation
// service. Values of this type are delivered on the channel returned by
// BlsAggregatorService.GetResponseChannel.
type BlsAggregationServiceResponse struct {
	Err                error                    // if Err is not nil, the other fields are not valid
	TaskIndex          types.TaskIndex          // unique identifier of the task
	TaskResponseDigest types.TaskResponseDigest // digest of the task response that was signed
	// The 8 fields below are the data needed to build the
	// IBLSSignatureChecker.NonSignerStakesAndSignature struct.
	// Users of this service need to build that struct themselves by converting the bls points
	// into the BN254.G1/G2Point structs that the IBLSSignatureChecker expects,
	// given that those differ for each AVS service manager that individually inherits BLSSignatureChecker.
	NonSignersPubkeysG1          []*bls.G1Point
	QuorumApksG1                 []*bls.G1Point
	SignersApkG2                 *bls.G2Point
	SignersAggSigG1              *bls.Signature
	NonSignerQuorumBitmapIndices []uint32
	QuorumApkIndices             []uint32
	TotalStakeIndices            []uint32
	NonSignerStakeIndices        [][]uint32
}

BlsAggregationServiceResponse is the response from the bls aggregation service

type BlsAggregatorService added in v0.2.0

type BlsAggregatorService struct {
	// contains filtered or unexported fields
}

BlsAggregatorService is a service that performs BLS signature aggregation for an AVS's tasks. Assumptions:

  1. BlsAggregatorService only verifies digest signatures, so avs code needs to verify that the digest passed to ProcessNewSignature is indeed the digest of a valid taskResponse (see the comment above checkSignature for more details)
  2. BlsAggregatorService is VERY generic and makes very few assumptions about the tasks structure or the time at which operators will send their signatures. It is mostly suitable for offchain computation oracle (a la truebit) type of AVS, where tasks are sent onchain by users sporadically, and where new tasks can start even before the previous ones have finished aggregation. AVSs like eigenDA that have a much more controlled task submission schedule and where new tasks are only submitted after the previous one's response has been aggregated and responded onchain, could have a much simpler AggregationService without all the complicated parallel goroutines.

func NewBlsAggregatorService added in v0.2.0

func NewBlsAggregatorService(avsRegistryService avsregistry.AvsRegistryService, logger logging.Logger) *BlsAggregatorService

func (*BlsAggregatorService) GetResponseChannel added in v0.2.0

func (a *BlsAggregatorService) GetResponseChannel() <-chan *BlsAggregationServiceResponse

func (*BlsAggregatorService) InitializeNewTask added in v0.2.0

func (a *BlsAggregatorService) InitializeNewTask(
	ctx context.Context,
	taskIndex types.TaskIndex,
	taskCreatedBlock uint32,
	quorumNumbers types.QuorumNums,
	minQuorumThresholdPercentages types.QuorumThresholdPercentages,
	minWait time.Duration,
	timeToExpiry time.Duration,
) error

InitializeNewTask creates a new task goroutine meant to process new signed task responses for that task (that are sent via ProcessNewSignature) and adds a channel to a.taskChans to send the signed task responses to it. quorumNumbers and minQuorumThresholdPercentages set the requirements for this task to be considered complete, which happens when a particular TaskResponseDigest (received via a.taskChans[taskIndex]) has been signed by signers whose stake in each of the listed quorums adds up to at least minQuorumThresholdPercentages[i] of the total stake in that quorum.

func (*BlsAggregatorService) ProcessNewSignature added in v0.2.0

func (a *BlsAggregatorService) ProcessNewSignature(
	ctx context.Context,
	taskIndex types.TaskIndex,
	taskResponseDigest types.TaskResponseDigest,
	blsSignature *bls.Signature,
	operatorId types.OperatorId,
) error

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(endpoint string) (*Client, error)

func (*Client) FetchTask added in v0.2.0

func (c *Client) FetchTask(ctx context.Context, task *FetchTaskReq) (*FetchTaskResp, error)

func (*Client) SubmitMetrics added in v0.2.0

func (c *Client) SubmitMetrics(ctx context.Context, req *SubmitMetricsReq) error

func (*Client) SubmitTask

func (c *Client) SubmitTask(ctx context.Context, task *TaskRequest) error

type Config

// Config holds the aggregator settings; NewAggregator takes a *Config and
// Config.Init is available to initialize it.
// NOTE(review): field semantics below are inferred from names and types only —
// confirm against the code that consumes them.
type Config struct {
	// ListenAddr is presumably the address the aggregator API listens on — TODO confirm.
	ListenAddr       string
	// TimeToExpirySecs and MinWaitSecs are integer second counts judging by
	// their names; presumably they map to the timeToExpiry and minWait
	// arguments of BlsAggregatorService.InitializeNewTask — TODO confirm.
	TimeToExpirySecs int
	MinWaitSecs      int

	// Key material and Ethereum endpoints (HTTP and websocket).
	EcdsaPrivateKey string
	EthHttpEndpoint string
	EthWsEndpoint   string
	// AttestationLayerRpcURL     string
	MultiProverContractAddress common.Address

	// AttestationLayer lists the configured attestation layer entries.
	AttestationLayer []AttestationLayer

	AVSRegistryCoordinatorAddress common.Address
	OperatorStateRetrieverAddress common.Address
	EigenMetricsIpPortAddress     string
	ScanStartBlock                uint64
	Threshold                     uint64
	Sampling                      uint64

	GenTaskSampling  uint64
	ExecTaskSampling uint64

	LineaMaxBlock int64

	// OpenTelemetry is the optional xmetric OpenTelemetry configuration (pointer, may be nil).
	OpenTelemetry *xmetric.OpenTelemetryConfig

	// TaskFetcher holds one xtask.TaskManagerConfig per configured task fetcher.
	TaskFetcher []*xtask.TaskManagerConfig

	// Simulation is a boolean switch — presumably enabling a dry-run mode; TODO confirm.
	Simulation bool
}

func (*Config) Init added in v0.3.0

func (cfg *Config) Init() error

type FetchTaskReq added in v0.2.0

// FetchTaskReq is the request payload accepted by AggregatorApi.FetchTask and
// sent by Client.FetchTask. Fields are encoded in JSON under the keys given
// in their struct tags.
type FetchTaskReq struct {
	// PrevTaskID is presumably the id of the last task the caller already saw — TODO confirm.
	PrevTaskID  int            `json:"prev_task_id"`
	// TaskType is the xtask.TaskType being requested.
	TaskType    xtask.TaskType `json:"task_type"`
	// MaxWaitSecs is presumably the longest time, in seconds, the caller is willing to wait — TODO confirm.
	MaxWaitSecs int            `json:"max_wait_secs"`
	// WithContext presumably asks for FetchTaskResp.Context to be populated — TODO confirm.
	WithContext bool           `json:"with_context"`
}

type FetchTaskResp added in v0.2.0

// FetchTaskResp is the response payload returned by AggregatorApi.FetchTask
// and Client.FetchTask.
type FetchTaskResp struct {
	// Ok is presumably false when no task is available — TODO confirm.
	Ok       bool            `json:"ok"`
	TaskID   int             `json:"task_id"`
	TaskType xtask.TaskType  `json:"task_type"`
	// Ext is left as raw JSON; its schema is not visible here.
	Ext      json.RawMessage `json:"ext"`
	// Context is raw JSON and is omitted from the encoding when empty.
	Context  json.RawMessage `json:"context,omitempty"`
}

type Metadata

// Metadata carries a batch id and a start/end block pair, encoded as JSON.
// NOTE(review): presumably this is the decoded form of StateHeader.Metadata — confirm.
type Metadata struct {
	// BatchId is omitted from the JSON encoding when zero.
	BatchId    uint64 `json:"batch_id,omitempty"`
	// StartBlock and EndBlock delimit a block range; whether the bounds are
	// inclusive is not visible here.
	StartBlock uint64 `json:"start_block"`
	EndBlock   uint64 `json:"end_block"`
}

type OperatorPubkeysService

type OperatorPubkeysService struct {
	// contains filtered or unexported fields
}

OperatorPubkeysService is a stateful goroutine (see https://gobyexample.com/stateful-goroutines) implementation of an operator pubkeys service that listens for NewPubkeyRegistration events using a websocket connection to an eth client and stores the pubkeys in memory. Another possible implementation is using a mutex (https://gobyexample.com/mutexes) instead. We can switch to that if we ever find a good reason to.

Warning: this service should probably not be used in production. We haven't done a thorough analysis of all the clients, but there is still an open PR about an issue with ws subscriptions on geth: https://github.com/ethereum/go-ethereum/issues/23845. Another point for the infra/devops engineers who would put this into production: this service crashes on websocket connection errors or when failing to query past events. The philosophy here is that hard crashing is better than silently failing, since it will be easier to debug. Naturally, this means that an aggregator using this service needs to be replicated and load-balanced, so that when it fails traffic can be switched to another aggregator.

func NewOperatorPubkeysService

func NewOperatorPubkeysService(
	ctx context.Context,
	client *ethclient.Client,
	avsRegistrySubscriber avsregistry.AvsRegistrySubscriber,
	avsRegistryReader avsregistry.AvsRegistryReader,
	logger logging.Logger,
	fileCache string,
	startBlock uint64,
	maxBlock uint64,
) (*OperatorPubkeysService, error)

NewOperatorPubkeysService constructs an OperatorPubkeysService and starts it in a goroutine. It takes a context as argument because the "backfilling" of the database is done inside this constructor, so we wait for all past NewPubkeyRegistration events to be queried and the db to be filled before returning the service. The constructor thus follows a RAII-like pattern of initializing the service during construction. Using a separate initialize() function might lead to some users forgetting to call it and the service not behaving properly.

func (*OperatorPubkeysService) GetOperatorPubkeys

func (pkcs *OperatorPubkeysService) GetOperatorPubkeys(ctx context.Context, operator common.Address) (operatorPubkeys types.OperatorPubkeys, operatorFound bool)

TODO(samlaf): we might want to also add an async version of this method that returns a channel of operator pubkeys?

type OperatorStates added in v0.3.0

type OperatorStates struct {
	// contains filtered or unexported fields
}

func NewOperatorStates added in v0.3.0

func NewOperatorStates(
	ctx context.Context,
	avsRegistryService avsregistry.AvsRegistryService,
	quorumNumbers types.QuorumNums,
	minQuorumThresholdPercentages []types.QuorumThresholdPercentage,
	taskIndex uint32,
	taskCreatedBlock uint32,
) (*OperatorStates, error)

func (*OperatorStates) NonSignerOperatorIds added in v0.3.0

func (o *OperatorStates) NonSignerOperatorIds(oprs *aggregatedOperators) []types.OperatorId

func (*OperatorStates) QuorumApksG1 added in v0.3.0

func (o *OperatorStates) QuorumApksG1() []*bls.G1Point

type SignedTaskResponse

// SignedTaskResponse bundles a task response (in its contract-binding state
// header form) with the BLS signature over it and the id of the signing operator.
// NOTE(review): what exactly is signed (presumably the response digest) is not visible here.
type SignedTaskResponse struct {
	TaskResponse MultiProverServiceManager.IMultiProverServiceManagerStateHeader
	BlsSignature bls.Signature
	OperatorId   types.OperatorId
}

type StateHeader

// StateHeader is the JSON form of a task's state header. Byte and big-integer
// fields use hexutil types. Its Digest method returns a
// types.TaskResponseDigest and ToAbi returns the *bindings.StateHeader form.
type StateHeader struct {
	// Identifier is a pointer and may therefore be nil.
	Identifier                 *hexutil.Big  `json:"identifier"`
	// Metadata is an opaque byte string here — presumably an encoded Metadata value; TODO confirm.
	Metadata                   hexutil.Bytes `json:"metadata"`
	State                      hexutil.Bytes `json:"state"`
	// QuorumNumbers and QuorumThresholdPercentages are byte strings —
	// presumably one byte per quorum, index-aligned; TODO confirm.
	QuorumNumbers              hexutil.Bytes `json:"quorum_numbers"`
	QuorumThresholdPercentages hexutil.Bytes `json:"quorum_threshold_percentages"`
	ReferenceBlockNumber       uint32        `json:"reference_blocknumber"`
}

func (*StateHeader) Digest

func (s *StateHeader) Digest() (types.TaskResponseDigest, error)

func (*StateHeader) ToAbi

func (s *StateHeader) ToAbi() *bindings.StateHeader

type SubmitMetricsReq added in v0.2.0

// SubmitMetricsReq is the request payload accepted by
// AggregatorApi.SubmitMetrics and sent by Client.SubmitMetrics.
type SubmitMetricsReq struct {
	// Name presumably identifies the submitter of the metrics — TODO confirm.
	Name    string                  `json:"name"`
	// Metrics is the list of xmetric metric families being submitted.
	Metrics []*xmetric.MetricFamily `json:"metrics"`
}

type Task

type Task struct {
	// contains filtered or unexported fields
}

type TaskRequest

// TaskRequest is the request payload accepted by AggregatorApi.SubmitTask and
// sent by Client.SubmitTask.
type TaskRequest struct {
	// Task is the state header being submitted (pointer, may be nil).
	Task            *StateHeader
	// Signature is the BLS signature accompanying the task — presumably
	// over the task's digest; TODO confirm.
	Signature       *bls.Signature
	// OperatorId identifies the submitting operator.
	OperatorId      types.OperatorId
	// ProverSignature is an additional signature carried as raw bytes; its
	// scheme is not visible here.
	ProverSignature hexutil.Bytes
}

type ThresholdInfo added in v0.2.3

// ThresholdInfo is the per-quorum value of the map that TaskExpiredError
// prints in its message.
// NOTE(review): field meanings below are inferred from names — confirm.
type ThresholdInfo struct {
	// Signed and Total are presumably the signed stake and the total stake of the quorum.
	Signed    *big.Int
	Total     *big.Int
	// Threshold is the quorum threshold percentage.
	Threshold types.QuorumThresholdPercentage
	// Percent is presumably the signed share actually reached.
	Percent   *big.Float
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL