stream

Version: v1.0.1
Published: Jan 30, 2021 | License: MIT | Imports: 7 | Imported by: 0

README

go-hyperion-stream

This is a (minimal) library for the Hyperion Stream API.

Example

Prints a stream of rewards paid to players of Alien Worlds on WAX:

package main

import (
	stream "github.com/blockpane/go-hyperion-stream"
	"log"
)

var (
	url      = "ws://wax.eosusa.news"
	contract = "m.federation"
	action   = "logmine"
	account  = ""
)

func main() {
	results := make(chan stream.HyperionResponse)
	errors := make(chan error)

	client, err := stream.NewClient(url, results, errors)
	if err != nil {
		panic(err)
	}

	err = client.StreamActions(stream.NewActionsReq(contract, account, action))
	if err != nil {
		panic(err)
	}

	act := &stream.ActionTrace{}
	for {
		select {
		case <-client.Ctx.Done():
			return
			
		case e := <-errors:
			switch e.(type) {
			case stream.ExitError:
				panic(e)
			default:
				log.Println(e)
			}

		case response := <-results:
			switch response.Type() {
			case stream.RespActionType:
				act, err = response.Action()
				if err != nil {
					log.Println(err)
					continue
				}
				log.Printf("%13s <- %11v %-13s - %v\n", act.Act.Data["miner"], act.Act.Data["bounty"], act.Act.Data["planet_name"], act.Act.Data["land_id"])
			}
		}
	}
}

Outputs:

2021/01/28 14:06:21     pgqqy.wam <-  1.4961 TLM eyeke.world   - 1099512960814
2021/01/28 14:06:21     mnk4g.wam <-  1.0674 TLM neri.world    - 1099512958948
2021/01/28 14:06:21     v5bqy.wam <-  1.6872 TLM magor.world   - 1099512960536
2021/01/28 14:06:22     a.nqy.wam <-  0.1773 TLM kavian.world  - 1099512961065
2021/01/28 14:06:22     jgxqy.wam <-  0.9895 TLM eyeke.world   - 1099512961378
2021/01/28 14:06:22     ppjay.wam <-  2.4352 TLM magor.world   - 1099512959814
2021/01/28 14:06:22     i5jay.wam <-  0.7850 TLM kavian.world  - 1099512959254
2021/01/28 14:06:22     r53qy.wam <-  0.6678 TLM kavian.world  - 1099512958463
2021/01/28 14:06:23     x1gay.wam <-  0.4707 TLM neri.world    - 1099512958632
...

Documentation

Index

Constants

const (
	// RespActionType denotes an action record sent by Hyperion
	RespActionType ResponseType = "action"
	// RespDeltaType denotes a delta (table update) record sent by Hyperion
	RespDeltaType ResponseType = "delta"

	// RespModeLive denotes data is being received in near-real-time
	RespModeLive ResponseMode = "live"
	// RespModeHist denotes data is being replayed from history
	RespModeHist ResponseMode = "history"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ActionTrace

type ActionTrace struct {
	ActionOrdinal        uint32            `json:"action_ordinal"`
	CreatorActionOrdinal uint32            `json:"creator_action_ordinal"`
	ContextFree          bool              `json:"context_free"`
	Elapsed              string            `json:"elapsed"`
	TS                   string            `json:"@timestamp"`
	BlockNum             uint32            `json:"block_num"`
	Producer             eos.AccountName   `json:"producer"`
	TrxId                eos.HexBytes      `json:"trx_id"`
	GlobalSequence       uint64            `json:"global_sequence"`
	CodeSequence         uint32            `json:"code_sequence"`
	AbiSequence          uint32            `json:"abi_sequence"`
	Notified             []eos.AccountName `json:"notified"`

	Act struct {
		Account       eos.AccountName        `json:"account"`
		Name          eos.ActionName         `json:"name"`
		Authorization []eos.PermissionLevel  `json:"authorization"`
		Data          map[string]interface{} `json:"data"`
	} `json:"act"`

	Receipts []struct {
		Receiver       eos.AccountName       `json:"receiver"`
		GlobalSequence string                `json:"global_sequence"`
		RecvSequence   string                `json:"recv_sequence"`
		AuthSequence   []eos.PermissionLevel `json:"auth_sequence"`
	} `json:"receipts"`
	// contains filtered or unexported fields
}

ActionTrace holds a trace response. It differs somewhat from the standard EOSIO structures. Note that the ActionTrace.Act.Data field is a map[string]interface{} that mirrors the raw JSON sent by Hyperion.
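Because Act.Data mirrors raw JSON, each value has to be type-asserted before use. The following is a minimal sketch using only the standard library, not code from this package: the helper names dataString and dataFloat are hypothetical, the payload is made up to resemble the README output, and the "nonce" field is invented for illustration. It relies on encoding/json decoding every JSON number into a float64 when the target is interface{}.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// dataString returns the value stored under key as a string. It reports
// false when the key is missing or holds a different JSON type.
func dataString(data map[string]interface{}, key string) (string, bool) {
	s, ok := data[key].(string)
	return s, ok
}

// dataFloat returns the value stored under key as a float64, the Go type
// encoding/json uses for JSON numbers decoded into interface{}.
func dataFloat(data map[string]interface{}, key string) (float64, bool) {
	f, ok := data[key].(float64)
	return f, ok
}

func main() {
	// Hypothetical payload; it was not captured from a live Hyperion node.
	raw := []byte(`{"miner":"pgqqy.wam","bounty":"1.4961 TLM","nonce":42}`)

	var data map[string]interface{}
	if err := json.Unmarshal(raw, &data); err != nil {
		panic(err)
	}

	if miner, ok := dataString(data, "miner"); ok {
		fmt.Println("miner:", miner)
	}
	if nonce, ok := dataFloat(data, "nonce"); ok {
		fmt.Println("nonce:", nonce)
	}
	if _, ok := dataString(data, "nonce"); !ok {
		fmt.Println("nonce is not a string")
	}
}
```

The comma-ok form of the assertion avoids a panic when a contract changes the type of a field or omits it.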

func (*ActionTrace) Action

func (act *ActionTrace) Action() (*ActionTrace, error)

Action satisfies the HyperionResponse interface. It returns a stream.ActionTrace if this is an action; otherwise it returns an error.

func (*ActionTrace) Delta

func (act *ActionTrace) Delta() (*DeltaTrace, error)

Delta satisfies the HyperionResponse interface. It returns a stream.DeltaTrace if this is a delta; otherwise it returns an error.

func (*ActionTrace) Mode

func (act *ActionTrace) Mode() ResponseMode

Mode satisfies the HyperionResponse interface and reports whether the event was streamed live or replayed from history.

func (*ActionTrace) ToJson

func (act *ActionTrace) ToJson() []byte

ToJson marshals an ActionTrace to JSON

func (*ActionTrace) Type

func (act *ActionTrace) Type() ResponseType

Type satisfies the HyperionResponse interface and will return what type of trace this is.

type ActionsReq

type ActionsReq struct {
	Contract  eos.AccountName `json:"contract"`
	Account   eos.AccountName `json:"account"`
	Action    eos.ActionName  `json:"action"`
	Filters   []*ReqFilter    `json:"filters"`
	StartFrom interface{}     `json:"start_from"`
	ReadUntil interface{}     `json:"read_until"`
}

ActionsReq is the query sent to Hyperion requesting it to stream action traces.

func NewActionsReq

func NewActionsReq(contract string, account string, action string) *ActionsReq

NewActionsReq is a request for action traces starting at the current head block.

func NewActionsReqByBlock

func NewActionsReqByBlock(contract string, account string, action string, first int64, last int64) *ActionsReq

NewActionsReqByBlock is a request for action traces within a specific block range. If last == 0, Hyperion will continue streaming data once it has caught up to the head block.

func NewActionsReqByTime

func NewActionsReqByTime(contract string, account string, action string, startRFC3339 string, endRFC3339 string) *ActionsReq

NewActionsReqByTime is a request for action traces within a specific time range. Times are passed as RFC3339 strings; one way to produce this format is `time.Now().Format(time.RFC3339)`. Passing an empty string for the end time instructs Hyperion to continue streaming once it has caught up to the current head block.

func (*ActionsReq) AddFilter

func (ar *ActionsReq) AddFilter(f *ReqFilter) (ok bool)

AddFilter assists in appending a ReqFilter to the request.

func (*ActionsReq) ToJson

func (ar *ActionsReq) ToJson() ([]byte, error)

ToJson marshals an ActionsReq to JSON

type BusyError

type BusyError struct{}

BusyError is used when a socket already has a subscription

func (BusyError) Error

func (b BusyError) Error() string

Error satisfies the error interface

type Client

type Client struct {
	Ctx context.Context

	LibNum  uint32
	LibId   string
	ChainId string
	// contains filtered or unexported fields
}

Client is a streaming client that uses a websocket to connect to Hyperion. Client.Ctx is canceled when the websocket is terminated.

func NewClient

func NewClient(url string, results chan HyperionResponse, errors chan error) (*Client, error)

NewClient immediately connects to Hyperion, handles ping/pongs, and stores state information such as the last irreversible block number in Client.LibNum. It expects two channels, one for results and one for errors. Once connected, a query must be sent before any output arrives on the results channel. If no request is sent within the first 25 seconds, Hyperion closes the websocket.

func (*Client) StreamActions

func (c *Client) StreamActions(req *ActionsReq) error

StreamActions will emit an action stream request to Hyperion. Note that only one stream subscription is supported in this library to keep things simple.

func (*Client) StreamDeltas

func (c *Client) StreamDeltas(req *DeltasReq) error

StreamDeltas will emit a delta stream request to Hyperion.

type DeltaTrace

type DeltaTrace struct {
	Code       eos.AccountName `json:"code"`
	Scope      eos.Name        `json:"scope"`
	Table      eos.Name        `json:"table"`
	PrimaryKey string          `json:"primary_key"`
	Payer      eos.AccountName `json:"payer"`
	TS         string          `json:"@timestamp"`
	Present    bool            `json:"present"`
	BlockNum   uint32          `json:"block_num"`
	BlockId    eos.HexBytes    `json:"block_id"`
	Data       interface{}     `json:"data"` // most likely map[string]interface{} or string
	// contains filtered or unexported fields
}

DeltaTrace is the struct returned for table updates from Hyperion
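Since DeltaTrace.Data is declared as interface{}, callers need to inspect its dynamic type before reading it. A minimal standard-library sketch of that check follows; describeData is a hypothetical helper and the sample payloads are invented, because real table rows depend on the contract's ABI.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// describeData reports which shape a decoded "data" value has.
func describeData(data interface{}) string {
	switch v := data.(type) {
	case map[string]interface{}:
		return fmt.Sprintf("object with %d field(s)", len(v))
	case string:
		return fmt.Sprintf("string of %d byte(s)", len(v))
	case nil:
		return "no data"
	default:
		return fmt.Sprintf("unexpected type %T", v)
	}
}

func main() {
	// Hypothetical payloads: a decoded row, an undecoded string, and null.
	for _, raw := range []string{`{"balance":"1.0000 TLM"}`, `"0a1b2c"`, `null`} {
		var data interface{}
		if err := json.Unmarshal([]byte(raw), &data); err != nil {
			panic(err)
		}
		fmt.Println(describeData(data))
	}
}
```

Handling the default case explicitly keeps an unexpected payload from being silently dropped.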

func (*DeltaTrace) Action

func (d *DeltaTrace) Action() (*ActionTrace, error)

Action satisfies the HyperionResponse interface. It returns a stream.ActionTrace if this is an action; otherwise it returns an error.

func (*DeltaTrace) Delta

func (d *DeltaTrace) Delta() (*DeltaTrace, error)

Delta satisfies the HyperionResponse interface. It returns a stream.DeltaTrace if this is a delta; otherwise it returns an error.

func (*DeltaTrace) Mode

func (d *DeltaTrace) Mode() ResponseMode

Mode satisfies the HyperionResponse interface and reports whether the event was streamed live or replayed from history.

func (*DeltaTrace) ToJson

func (d *DeltaTrace) ToJson() []byte

ToJson marshals a DeltaTrace to JSON

func (*DeltaTrace) Type

func (d *DeltaTrace) Type() ResponseType

Type satisfies the HyperionResponse interface and will return what type of trace this is.

type DeltasReq

type DeltasReq struct {
	Code      eos.AccountName `json:"code"`
	Table     eos.Name        `json:"table"`
	Scope     eos.Name        `json:"scope"`
	Payer     eos.AccountName `json:"payer"`
	StartFrom interface{}     `json:"start_from"` // number or string
	ReadUntil interface{}     `json:"read_until"` // number or string
}

DeltasReq is the query sent to Hyperion requesting a stream of table updates.

func NewDeltasReq

func NewDeltasReq(code string, table string, scope string, payer string) *DeltasReq

NewDeltasReq is a request for table updates starting at the current head block.

func NewDeltasReqByBlock

func NewDeltasReqByBlock(code string, table string, scope string, payer string, first int64, last int64) *DeltasReq

NewDeltasReqByBlock is a request for table updates with a specific block range. If last == 0 Hyperion will continue streaming data once it has caught up to the head block.

func NewDeltasReqByTime

func NewDeltasReqByTime(code string, table string, scope string, payer string, startRFC3339, endRFC3339 string) *DeltasReq

NewDeltasReqByTime is a request for table updates within a specific time range. Times are passed as RFC3339 strings; one way to produce this format is `time.Now().Format(time.RFC3339)`. Passing an empty string for the end time instructs Hyperion to continue streaming once it has caught up to the current head block.

func (*DeltasReq) ToJson

func (dr *DeltasReq) ToJson() ([]byte, error)

ToJson marshals a DeltasReq to JSON

type ExitError

type ExitError struct{}

ExitError is used when the socket.io-specific exit message is received

func (ExitError) Error

func (e ExitError) Error() string

Error satisfies the error interface

type HyperionResponse

type HyperionResponse interface {
	Type() ResponseType
	Mode() ResponseMode
	Action() (*ActionTrace, error)
	Delta() (*DeltaTrace, error)
}

HyperionResponse is the data streamed over the results channel of the stream.Client. It can currently be one of two types: an ActionTrace or a DeltaTrace.

type NotActionError

type NotActionError struct{}

NotActionError is used when HyperionResponse.Action() is used on a DeltaTrace

func (NotActionError) Error

func (NotActionError) Error() string

Error satisfies the error interface

type NotDeltaError

type NotDeltaError struct{}

NotDeltaError is used when HyperionResponse.Delta() is used on an ActionTrace

func (NotDeltaError) Error

func (NotDeltaError) Error() string

Error satisfies the error interface

type ReqFilter

type ReqFilter struct {
	Field string `json:"field"`
	Value string `json:"value"`
}

ReqFilter instructs Hyperion to perform further filtering. More information is available at:

https://github.com/eosrio/hyperion-stream-client/tree/master#211-act-data-filters

Filters should be added to an ActionsReq using the ActionsReq.AddFilter function.
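To see what a filter looks like on the wire, the sketch below marshals a local stand-in type that copies the field names and JSON tags shown for ReqFilter above. It is an illustration under assumptions, not this package's code: reqFilter and encodeFilters are hypothetical names, and the field path "act.data.miner" is a guess chosen to match the README data, so the Hyperion document linked above remains the authority on valid paths.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// reqFilter is a local stand-in mirroring the documented stream.ReqFilter
// fields and tags, so the sketch compiles without the module.
type reqFilter struct {
	Field string `json:"field"`
	Value string `json:"value"`
}

// encodeFilters marshals a list of filters into a JSON array, the shape a
// "filters" value takes inside a request body.
func encodeFilters(filters []reqFilter) (string, error) {
	b, err := json.Marshal(filters)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// "act.data.miner" is an assumed field path, not taken from the package.
	out, err := encodeFilters([]reqFilter{{Field: "act.data.miner", Value: "pgqqy.wam"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // [{"field":"act.data.miner","value":"pgqqy.wam"}]
}
```

With the real package, the equivalent step would be passing a *stream.ReqFilter holding the same two fields to ActionsReq.AddFilter and checking the returned ok value.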

type ResponseMode

type ResponseMode string

ResponseMode represents whether the data being streamed is live or historical

type ResponseType

type ResponseType string

ResponseType indicates if the HyperionResponse is an action trace or delta trace

type UnknownTypeError

type UnknownTypeError struct{}

UnknownTypeError is used when an unknown trace message is received

func (UnknownTypeError) Error

func (UnknownTypeError) Error() string

Error satisfies the error interface

Directories

Path Synopsis
_example
