bridge_core

package module
v0.0.0-...-c40edbd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2023 License: Apache-2.0 Imports: 24 Imported by: 0

README

Bridge-core

Bridge core is an event fetching library. This library automatically fetch logs from chain then invoke the perspective events. Callback functions will be provided in application side.

How to create an applicationn

Preparation

Firstly, we need to clone ronin for go-ethereum replacement later.

git clone https://github.com/axieinfinity/ronin.git

Create a go application

mkdir app
cd app
go mod init app

Inside go.mod, we add this line. By doing this, we replace go-ethereum module by ronin module which cloned from above

replace github.com/ethereum/go-ethereum => ../ronin

Get migration package. We need it for migration.

go get -v github.com/axieinfinity/bridge-migrations

Create main.go, we need to migrate to a postgres database by using gorm

import (
	migration "github.com/axieinfinity/bridge-migrations"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)


func main() {
    connectionString := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d sslmode=disable", "localhost", "user", "password", "dbname", 5432)
	db, err := gorm.Open(postgres.Open(connectionString), &gorm.Config{})
	if err != nil {
		panic(err)
	}

	if err := migration.Migrate(db, config); err != nil {
		panic(err)
	}
}
	
Implementation

First, we need to provide an implementation of Listener interface. Then, we add methods determining event callbacks to the implementation struct. Type of callback method

type Callback func(fromChainId *big.Int, tx bridgeCore.Transaction, data []byte) error

For delegating a callback on event, we implement callback methods following the above type

func (l *ConreteListener) WithdrewCallback(fromChainId *big.Int, tx bridge_core.Transaction, data []byte) error {
	// implementation here
}

Next, we create an instance of controller. Before creating, we need to call AddListener method which receives ChainName and an initalizing function returning an instance of Listener.

Type of init function:

type Init func(ctx context.Context, lsConfig *bridge_core.LsConfig, store stores.MainStore, helpers utils.Utils) bridge_core.Listener
func CreateController(cfg *bridge_core.Config, db *gorm.DB) *bridge_core.Controller {
	bridge_core.AddListener("Ethereum", InitEthereum)
	bridge_core.AddListener("Ronin", InitRonin)
	controller, err := bridge_core.New(cfg, db, nil)
	if err != nil {
		panic(err)
	}
	return controller
}

func InitEthereum(ctx context.Context, lsConfig *bridge_core.LsConfig, store stores.MainStore, helpers utils.Utils) bridge_core.Listener {
	// implementation here
}

func InitRonin(ctx context.Context, lsConfig *bridge_core.LsConfig, store stores.MainStore, helpers utils.Utils) bridge_core.Listener {
	// implementation here
}
Configuration

we need a configuration:

	config := &bridge_core.Config{
		Listeners: map[string]*bridge_core.LsConfig{
			"Ethereum": {
				ChainId: "0x03",
				Name:    "Ethereum",
				RpcUrl:  "url",
				Subscriptions: map[string]*bridge_core.Subscribe{
					"WithdrewSubscription": {
						To:   "0x4E4D9B21B157CCD52b978C3a3BCd1dc5eBAE7167",
						Type: 1, // 0 for listening, 1 for callback
						CallBacks: map[string]string{
							"Ethereum": "WithdrewCallback", // Key: Value is Chain name: method name
						},
						Handler: &bridge_core.Handler{
							Contract: "EthereumGateway", // contract name
							Name:     "Withdrew",        // Event name
						},
					},
				},
			},
			"Ronin": {
				ChainId: "0x7e5",
				Name:    "Ronin",
				RpcUrl:  "url",
				Subscriptions: map[string]*bridge_core.Subscribe{
					"DepositedCallback": {
						To:   "0xA8D61A5427a778be28Bd9bb5990956b33385c738",
						Type: 1, // 0 for listening, 1 for callback
						CallBacks: map[string]string{
							"Ronin": "DepositedCallback", // Key: Value is Chain name: method name
						},
						Handler: &bridge_core.Handler{
							Contract: "RoninGateway", // contract name
							Name:     "Deposited",    // Event name
						},
					},
				},
			},
		},
	}

Config includes a map representing the configuration on this chain. This configuration includes chain id, a map of subscription provides information about event listener and callback, these information are provided inside Handler and Callbacks perspectively.

Examples

See Example for sample use cases.

Documentation

Index

Constants

View Source
const (
	ListenHandler = iota
	CallbackHandler
)
View Source
const (
	TxEvent = iota
	LogEvent
)

Variables

This section is empty.

Functions

func AddListener

func AddListener(name string, initFunc func(ctx context.Context, lsConfig *LsConfig, store stores.MainStore, helpers utils.Utils) Listener)

Types

type BaseJob

type BaseJob struct {
	// contains filtered or unexported fields
}

func NewBaseJob

func NewBaseJob(listener Listener, job *models.Job, transaction Transaction) (*BaseJob, error)

func (*BaseJob) CreatedAt

func (e *BaseJob) CreatedAt() time.Time

func (*BaseJob) FromChainID

func (e *BaseJob) FromChainID() *big.Int

func (*BaseJob) GetBackOff

func (e *BaseJob) GetBackOff() int

func (*BaseJob) GetData

func (e *BaseJob) GetData() []byte

func (*BaseJob) GetID

func (e *BaseJob) GetID() int32

func (*BaseJob) GetListener

func (e *BaseJob) GetListener() Listener

func (*BaseJob) GetMaxTry

func (e *BaseJob) GetMaxTry() int

func (*BaseJob) GetNextTry

func (e *BaseJob) GetNextTry() int64

func (*BaseJob) GetRetryCount

func (e *BaseJob) GetRetryCount() int

func (*BaseJob) GetSubscriptionName

func (e *BaseJob) GetSubscriptionName() string

func (*BaseJob) GetTransaction

func (e *BaseJob) GetTransaction() Transaction

func (*BaseJob) GetType

func (e *BaseJob) GetType() int

func (*BaseJob) GetValue

func (e *BaseJob) GetValue() *big.Int

func (*BaseJob) Hash

func (e *BaseJob) Hash() common.Hash

func (*BaseJob) IncreaseRetryCount

func (e *BaseJob) IncreaseRetryCount()

func (*BaseJob) Process

func (e *BaseJob) Process() ([]byte, error)

func (*BaseJob) Save

func (e *BaseJob) Save() error

func (*BaseJob) SetID

func (e *BaseJob) SetID(id int32)

func (*BaseJob) String

func (e *BaseJob) String() string

func (*BaseJob) Update

func (e *BaseJob) Update(status string) error

func (*BaseJob) UpdateNextTry

func (e *BaseJob) UpdateNextTry(nextTry int64)

func (*BaseJob) Utils

func (e *BaseJob) Utils() utils.Utils

type Block

type Block interface {
	GetHash() common.Hash
	GetHeight() uint64
	GetTransactions() []Transaction
	GetLogs() []Log
	GetTimestamp() uint64
}

type BridgeWorker

type BridgeWorker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(ctx context.Context, id int, mainChan, failedChan chan<- JobHandler, queue chan chan JobHandler, size int, listeners map[string]Listener) *BridgeWorker

func (*BridgeWorker) Channel

func (w *BridgeWorker) Channel() chan JobHandler

func (*BridgeWorker) Close

func (w *BridgeWorker) Close()

func (*BridgeWorker) Context

func (w *BridgeWorker) Context() context.Context

func (*BridgeWorker) FailedChannel

func (w *BridgeWorker) FailedChannel() chan<- JobHandler

func (*BridgeWorker) IsClose

func (w *BridgeWorker) IsClose() bool

func (*BridgeWorker) PoolChannel

func (w *BridgeWorker) PoolChannel() chan<- JobHandler

func (*BridgeWorker) ProcessJob

func (w *BridgeWorker) ProcessJob(job JobHandler) error

func (*BridgeWorker) String

func (w *BridgeWorker) String() string

func (*BridgeWorker) WorkersQueue

func (w *BridgeWorker) WorkersQueue() chan chan JobHandler

type Config

type Config struct {
	Listeners       map[string]*LsConfig `json:"listeners"`
	SlackUrl        string               `json:"slackUrl"`
	ScanUrl         string               `json:"scanUrl"`
	NumberOfWorkers int                  `json:"numberOfWorkers"`
	MaxQueueSize    int                  `json:"maxQueueSize"`
	MaxRetry        int32                `json:"maxRetry"`
	BackOff         int32                `json:"backoff"`
	DB              *stores.Database     `json:"database"`

	// this field is used for testing purpose
	Testing bool
}

type Controller

type Controller struct {
	HandlerABIs map[string]*abi.ABI

	Pool *Pool
	// contains filtered or unexported fields
}

func New

func New(cfg *Config, db *gorm.DB, helpers utils.Utils) (*Controller, error)

func (*Controller) Close

func (c *Controller) Close()

func (*Controller) LoadABIsFromConfig

func (c *Controller) LoadABIsFromConfig(lsConfig *LsConfig) (err error)

LoadABIsFromConfig loads all ABIPath and add results to Handler.ABI

func (*Controller) LoadAbi

func (c *Controller) LoadAbi(path string) (*abi.ABI, error)

func (*Controller) Start

func (c *Controller) Start() error

type EmptyTransaction

type EmptyTransaction struct {
	// contains filtered or unexported fields
}

func NewEmptyTransaction

func NewEmptyTransaction(chainId *big.Int, tx common.Hash, data []byte, from, to *common.Address) *EmptyTransaction

func (*EmptyTransaction) GetData

func (b *EmptyTransaction) GetData() []byte

func (*EmptyTransaction) GetFromAddress

func (b *EmptyTransaction) GetFromAddress() string

func (*EmptyTransaction) GetHash

func (b *EmptyTransaction) GetHash() common.Hash

func (*EmptyTransaction) GetToAddress

func (b *EmptyTransaction) GetToAddress() string

func (*EmptyTransaction) GetValue

func (b *EmptyTransaction) GetValue() *big.Int

type Handler

type Handler struct {
	// Contract Name that will be used to get ABI
	Contract string `json:"contract"`

	// Name is method/event name
	Name string `json:"name"`

	// ContractAddress is used in callback case
	ContractAddress string `json:"contractAddress"`

	// Listener who triggers callback event
	Listener string `json:"listener"`

	ABI *abi.ABI `json:"-"`

	// HandleMethod is used when processing listened job, do nothing if it is empty
	HandleMethod string `json:"handleMethod"`
}

type Job

type Job struct {
	ID         int32
	Type       int
	Message    []interface{}
	RetryCount int32
	NextTry    int32
	MaxTry     int32
	BackOff    int32
	Listener   Listener
}

func (Job) Hash

func (job Job) Hash() common.Hash

type JobHandler

type JobHandler interface {
	GetID() int32
	GetType() int
	GetRetryCount() int
	GetNextTry() int64
	GetMaxTry() int
	GetData() []byte
	GetValue() *big.Int
	GetBackOff() int

	Process() ([]byte, error)
	Hash() common.Hash

	IncreaseRetryCount()
	UpdateNextTry(int64)

	GetListener() Listener
	GetSubscriptionName() string
	GetTransaction() Transaction

	FromChainID() *big.Int

	Save() error
	Update(string) error

	CreatedAt() time.Time
	String() string
}

type Listener

type Listener interface {
	GetName() string
	GetStore() stores.MainStore
	Config() *LsConfig

	Period() time.Duration
	GetSafeBlockRange() uint64
	GetPreventOmissionRange() uint64
	GetCurrentBlock() Block
	GetLatestBlock() (Block, error)
	GetLatestBlockHeight() (uint64, error)
	GetBlock(height uint64) (Block, error)
	GetBlockWithLogs(height uint64) (Block, error)
	GetChainID() (*big.Int, error)
	GetReceipt(common.Hash) (*types.Receipt, error)
	Context() context.Context

	GetSubscriptions() map[string]*Subscribe

	UpdateCurrentBlock(block Block) error

	SaveCurrentBlockToDB() error
	SaveTransactionsToDB(txs []Transaction) error

	GetListenHandleJob(subscriptionName string, tx Transaction, eventId string, data []byte) JobHandler
	SendCallbackJobs(listeners map[string]Listener, subscriptionName string, tx Transaction, inputData []byte)

	NewJobFromDB(job *models.Job) (JobHandler, error)

	Start()
	Close()

	IsDisabled() bool
	SetInitHeight(uint64)
	GetInitHeight() uint64

	GetEthClient() utils.EthClient

	GetTasks() []TaskHandler
	GetTask(index int) TaskHandler
	AddTask(handler TaskHandler)

	IsUpTodate() bool
	SetPrepareJobChan(chan JobHandler)

	GetValidatorSign() utils.ISign

	AddListeners(map[string]Listener)

	// GetListener returns listener by name
	GetListener(string) Listener
}

type Log

type Log interface {
	GetContractAddress() string
	GetTopics() []string
	GetData() []byte
	GetIndex() uint
	GetTxIndex() uint
	GetTransactionHash() string
}

type LsConfig

type LsConfig struct {
	ChainId              string            `json:"chainId"`
	Name                 string            `json:"name"`
	RpcUrl               string            `json:"rpcUrl"`
	SlackUrl             string            `json:"slackUrl"`
	ScanUrl              string            `json:"scanUrl"`
	LoadInterval         time.Duration     `json:"blockTime"`
	SafeBlockRange       uint64            `json:"safeBlockRange"`
	PreventOmissionRange uint64            `json:"preventOmissionRange"`
	FromHeight           uint64            `json:"fromHeight"`
	DomainSeparators     map[uint64]string `json:"domainSeparators"`
	Decimals             map[uint64]uint64 `json:"decimals"`
	TaskInterval         time.Duration     `json:"taskInterval"`
	Disabled             bool              `json:"disabled"`

	// TODO: apply more ways to get privatekey. such as: PLAINTEXT, KMS, etc.
	Secret                 *Secret               `json:"secret"`
	Subscriptions          map[string]*Subscribe `json:"subscriptions"`
	TransactionCheckPeriod time.Duration         `json:"transactionCheckPeriod"`
	Contracts              map[string]string     `json:"contracts"`
	ProcessWithinBlocks    uint64                `json:"processWithinBlocks"`

	MaxTasksQuery int `json:"maxTasksQuery"`
	MinTasksQuery int `json:"minTasksQuery"`

	// GetLogsBatchSize is used at batch size when calling processBatchLogs
	GetLogsBatchSize int `json:"getLogsBatchSize"`

	// MaxProcessingTasks is used to specify max processing tasks allowed while processing tasks
	// if number of tasks reaches this number, it waits until this number decrease
	MaxProcessingTasks int `json:"maxProcessingTasks"`
}

type Pool

type Pool struct {
	Workers []Worker

	// message backoff
	MaxRetry int32
	BackOff  int32

	// Queue holds a list of worker
	Queue chan chan JobHandler

	// JobChan receives new job
	JobChan        chan JobHandler
	RetryJobChan   chan JobHandler
	FailedJobChan  chan JobHandler
	PrepareJobChan chan JobHandler

	MaxQueueSize int
	// contains filtered or unexported fields
}

func NewPool

func NewPool(ctx context.Context, cfg *Config, db *gorm.DB, workers []Worker) *Pool

func (*Pool) AddWorkers

func (p *Pool) AddWorkers(workers []Worker)

func (*Pool) IsClosed

func (p *Pool) IsClosed() bool

func (*Pool) PrepareJob

func (p *Pool) PrepareJob(job JobHandler) error

PrepareJob saves new job to database

func (*Pool) PrepareRetryableJob

func (p *Pool) PrepareRetryableJob(job JobHandler)

func (*Pool) Start

func (p *Pool) Start(closeFunc func())

func (*Pool) Stats

func (p *Pool) Stats() Stats

func (*Pool) Wait

func (p *Pool) Wait()

type Receipt

type Receipt interface {
	GetTransaction() Transaction
	GetStatus() bool
	GetLogs() []Log
}

type Secret

type Secret struct {
	Validator *utils.SignMethodConfig `json:"validator"`
}

type Stats

type Stats struct {
	PendingQueue int
	Queue        int
}

type Subscribe

type Subscribe struct {
	From string `json:"from"`
	To   string `json:"to"`

	// Type can be either TxEvent or LogEvent
	Type int `json:"type"`

	Handler   *Handler          `json:"handler"`
	CallBacks map[string]string `json:"callbacks"`
}

type TaskHandler

type TaskHandler interface {
	Start()
	Close()
	GetListener() Listener
	SetLimitQuery(limit int)
}

type Transaction

type Transaction interface {
	GetHash() common.Hash
	GetFromAddress() string
	GetToAddress() string
	GetData() []byte
	GetValue() *big.Int
}

type Worker

type Worker interface {
	Context() context.Context
	Close()
	ProcessJob(job JobHandler) error
	IsClose() bool
	Channel() chan JobHandler
	PoolChannel() chan<- JobHandler
	WorkersQueue() chan chan JobHandler
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL