relay

package
v0.29.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2023 License: AGPL-3.0 Imports: 6 Imported by: 0

Documentation

Overview

Example
package main

import (
	"fmt"
	"math/rand"

	"github.com/rs/zerolog"

	"github.com/koko1123/flow-go-1/engine/access/relay"
	splitterNetwork "github.com/koko1123/flow-go-1/engine/common/splitter/network"
	"github.com/koko1123/flow-go-1/model/flow"
	"github.com/koko1123/flow-go-1/network/channels"

	testnet "github.com/koko1123/flow-go-1/utils/unittest/network"
)

func main() {
	// create a mock network
	net := testnet.NewNetwork()

	// create splitter network
	logger := zerolog.Nop()
	splitterNet := splitterNetwork.NewNetwork(net, logger)

	// generate a random origin ID
	var id flow.Identifier
	rand.Seed(0)
	rand.Read(id[:])

	// create engines
	engineProcessFunc := func(engineName string) testnet.EngineProcessFunc {
		return func(channel channels.Channel, originID flow.Identifier, event interface{}) error {
			fmt.Printf("Engine %v received message: channel=%v, originID=%v, event=%v\n", engineName, channel, originID, event)
			return nil
		}
	}
	fooEngine := testnet.NewEngine().OnProcess(engineProcessFunc("Foo"))
	barEngine := testnet.NewEngine().OnProcess(engineProcessFunc("Bar"))

	// register engines on the splitter network
	fooChannel := channels.Channel("foo-channel")
	barChannel := channels.Channel("bar-channel")
	_, err := splitterNet.Register(fooChannel, fooEngine)
	if err != nil {
		fmt.Println(err)
	}
	_, err = splitterNet.Register(barChannel, barEngine)
	if err != nil {
		fmt.Println(err)
	}

	// create another network that messages will be relayed to
	relayNet := testnet.NewNetwork().OnPublish(func(channel channels.Channel, event interface{}, targetIDs ...flow.Identifier) error {
		fmt.Printf("Message published to relay network: channel=%v, event=%v, targetIDs=%v\n", channel, event, targetIDs)
		return nil
	})

	// create relay engine
	channels := channels.ChannelList{fooChannel, barChannel}
	_, err = relay.New(logger, channels, splitterNet, relayNet)
	if err != nil {
		fmt.Println(err)
	}

	// send messages to network
	err = net.Send(fooChannel, id, "foo")
	if err != nil {
		fmt.Println(err)
	}
	err = net.Send(barChannel, id, "bar")
	if err != nil {
		fmt.Println(err)
	}

}
Output:

Message published to relay network: channel=foo-channel, event=foo, targetIDs=[0000000000000000000000000000000000000000000000000000000000000000]
Engine Foo received message: channel=foo-channel, originID=0194fdc2fa2ffcc041d3ff12045b73c86e4ff95ff662a5eee82abdf44a2d0b75, event=foo
Message published to relay network: channel=bar-channel, event=bar, targetIDs=[0000000000000000000000000000000000000000000000000000000000000000]
Engine Bar received message: channel=bar-channel, originID=0194fdc2fa2ffcc041d3ff12045b73c86e4ff95ff662a5eee82abdf44a2d0b75, event=bar

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Relay engine relays all the messages that are received to the given network for the corresponding channel

func New

func New(
	log zerolog.Logger,
	channelList channels.ChannelList,
	net network.Network,
	unstakedNet network.Network,
) (*Engine, error)

func (*Engine) Done

func (e *Engine) Done() <-chan struct{}

Done returns a done channel that is closed once the engine has fully stopped.

func (*Engine) Process

func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error

Process processes the given event from the node with the given origin ID in a blocking manner. It returns the potential processing error when done.

func (*Engine) ProcessLocal

func (e *Engine) ProcessLocal(event interface{}) error

ProcessLocal processes an event originating on the local node.

func (*Engine) Ready

func (e *Engine) Ready() <-chan struct{}

Ready returns a ready channel that is closed once the engine has fully started.

func (*Engine) Submit

func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{})

Submit submits the given event from the node with the given origin ID for processing in a non-blocking manner. It returns instantly and logs a potential processing error internally when done.

func (*Engine) SubmitLocal

func (e *Engine) SubmitLocal(event interface{})

SubmitLocal submits an event originating on the local node.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL