stream

package
v0.0.0-...-83dca6d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2022 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// WebsocketNotEnabled alerts of a disabled websocket
	WebsocketNotEnabled = "exchange_websocket_not_enabled"

	// Message templates and keep-alive tokens:
	//   - WebsocketNotAuthenticatedUsingRest is a format string with a single
	//     %v verb and a trailing newline.
	//   - Ping and Pong are the literal "ping" and "pong" strings.
	//   - UnhandledMessage is a message fragment that starts with " - ";
	//     presumably callers prefix it with an exchange name and append the
	//     raw payload (TODO confirm against call sites).
	WebsocketNotAuthenticatedUsingRest = "%v - Websocket not authenticated, using REST\n"
	Ping                               = "ping"
	Pong                               = "pong"
	UnhandledMessage                   = " - Unhandled websocket message: "
)

Websocket functionality list and state consts

Variables

View Source
var (
	// ErrSubscriptionFailure defines an error when a subscription fails.
	// It is a sentinel created with errors.New; compare against it with
	// errors.Is rather than by message text.
	ErrSubscriptionFailure = errors.New("subscription failure")
)

Functions

This section is empty.

Types

type ChannelSubscription

// ChannelSubscription is a container for streaming subscriptions.
//
// It carries a channel name, a currency pair, an asset item and a free-form
// Params map. NOTE(review): the meaning of individual Params keys is not
// visible here — presumably exchange specific; confirm with the exchange
// implementations.
type ChannelSubscription struct {
	Channel  string
	Currency currency.Pair
	Asset    asset.Item
	Params   map[string]interface{}
}

ChannelSubscription container for streaming subscriptions

func (*ChannelSubscription) Equal

Equal compares two ChannelSubscription values to determine equality

type Connection

// Connection defines a streaming services connection.
//
// WebsocketConnection in this package declares methods with these same
// signatures; the per-method notes below follow that type's documentation.
type Connection interface {
	// Dial connects using the supplied dialer and HTTP headers.
	Dial(*websocket.Dialer, http.Header) error
	// ReadMessage reads the next message from the connection.
	ReadMessage() Response
	// SendJSONMessage sends a JSON encoded message over the connection.
	SendJSONMessage(interface{}) error
	// SetupPingHandler configures automatic ping or pong messages from the
	// supplied PingHandler settings.
	SetupPingHandler(PingHandler)
	// GenerateMessageID creates a message ID.
	// NOTE(review): the effect of highPrecision is not visible here — confirm.
	GenerateMessageID(highPrecision bool) int64
	// SendMessageReturnResponse sends request and waits for a response.
	// signature is presumably the key used to match the reply (see Match) —
	// TODO confirm.
	SendMessageReturnResponse(signature interface{}, request interface{}) ([]byte, error)
	// SendRawMessage sends message without JSON encoding it.
	SendRawMessage(messageType int, message []byte) error
	// SetURL sets the connection URL.
	SetURL(string)
	// SetProxy sets the connection proxy.
	SetProxy(string)
	// GetURL returns the connection URL.
	GetURL() string
	// Shutdown shuts down and closes the connection.
	Shutdown() error
}

Connection defines a streaming services connection

type ConnectionSetup

// ConnectionSetup defines variables for an individual stream connection.
// It is the argument type of Websocket.SetupNewConnection.
//
// NOTE(review): the units of RateLimit and the exact roles of
// ResponseCheckTimeout and ResponseMaxLimit are not visible here — confirm
// against the connection implementation. Authenticated presumably selects
// between the authenticated and standard connection — TODO confirm.
type ConnectionSetup struct {
	ResponseCheckTimeout time.Duration
	ResponseMaxLimit     time.Duration
	RateLimit            int64
	URL                  string
	Authenticated        bool
}

ConnectionSetup defines variables for an individual stream connection

type FundingData

// FundingData defines funding data.
//
// NOTE(review): the unit of Period and whether Rate is a fraction or a
// percentage are not visible here — confirm with the producing exchange code.
type FundingData struct {
	Timestamp    time.Time
	CurrencyPair currency.Pair
	AssetType    asset.Item
	Exchange     string
	Amount       float64
	Rate         float64
	Period       int64
	Side         order.Side
}

FundingData defines funding data

type KlineData

// KlineData defines kline feed.
//
// Interval is carried as a string, and the open/close/high/low prices and
// volume as float64. NOTE(review): how Timestamp relates to StartTime and
// CloseTime, and the format of Interval, are not visible here — confirm with
// the producing exchange code.
type KlineData struct {
	Timestamp  time.Time
	Pair       currency.Pair
	AssetType  asset.Item
	Exchange   string
	StartTime  time.Time
	CloseTime  time.Time
	Interval   string
	OpenPrice  float64
	ClosePrice float64
	HighPrice  float64
	LowPrice   float64
	Volume     float64
}

KlineData defines kline feed

type Match

// Match is a distributed subtype that handles the matching of requests and
// responses in a timely manner, reducing the need to differentiate between
// connections. Stream systems fan in all incoming payloads to one routine for
// processing.
//
// Match embeds sync.Mutex, so Lock and Unlock are part of its method set and
// a Match value must not be copied after first use.
type Match struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Match is a distributed subtype that handles the matching of requests and responses in a timely manner, reducing the need to differentiate between connections. Stream systems fan in all incoming payloads to one routine for processing.

func NewMatch

func NewMatch() *Match

NewMatch returns a new matcher

func (*Match) Incoming

func (m *Match) Incoming(signature interface{}) bool

Incoming matches with request, disregarding the returned payload

func (*Match) IncomingWithData

func (m *Match) IncomingWithData(signature interface{}, data []byte) bool

IncomingWithData matches with requests and takes in the returned payload, to be processed outside of a stream processing routine

type PingHandler

// PingHandler is a container for ping handler settings. It is the argument
// type of Connection.SetupPingHandler.
//
// NOTE(review): MessageType presumably holds a websocket message type
// constant and Delay the interval between pings; the roles of Websocket and
// UseGorillaHandler are not visible here — confirm against SetupPingHandler.
type PingHandler struct {
	Websocket         bool
	UseGorillaHandler bool
	MessageType       int
	Message           []byte
	Delay             time.Duration
}

PingHandler container for ping handler settings

type Response

// Response defines generalised data from the stream connection. It is the
// return type of Connection.ReadMessage.
//
// NOTE(review): Type presumably carries the websocket message type of the
// frame and Raw its payload — confirm against ReadMessage.
type Response struct {
	Type int
	Raw  []byte
}

Response defines generalised data from the stream connection

type UnhandledMessageWarning

// UnhandledMessageWarning defines a container for unhandled message warnings.
// It wraps a single Message string.
type UnhandledMessageWarning struct {
	Message string
}

UnhandledMessageWarning defines a container for unhandled message warnings

type Websocket

// Websocket defines a return type for websocket connections via the interface
// wrapper for routine processing.
//
// NOTE(review): the struct also has unexported fields that are not visible
// here; construct values with New rather than a literal — confirm.
type Websocket struct {
	Init bool

	Subscribe   chan []ChannelSubscription
	Unsubscribe chan []ChannelSubscription

	// Subscriber function for package defined websocket subscriber
	// functionality
	Subscriber func([]ChannelSubscription) error
	// Unsubscriber function for package defined websocket unsubscriber
	// functionality
	Unsubscriber func([]ChannelSubscription) error
	// GenerateSubs function for package defined websocket generate
	// subscriptions functionality
	GenerateSubs func() ([]ChannelSubscription, error)

	DataHandler chan interface{}
	ToRoutine   chan interface{}

	// Match pairs outgoing requests with their incoming responses
	Match *Match

	// ShutdownC synchronises the shutdown event across routines
	ShutdownC chan struct{}
	Wg        *sync.WaitGroup

	// Orderbook is a local buffer of orderbooks
	Orderbook buffer.Orderbook

	// Trade is a notifier of occurring trades
	Trade trade.Trade

	// Fills is a notifier of occurring fills
	Fills fill.Fills

	// TrafficAlert monitors if there is a halt in traffic throughput
	TrafficAlert chan struct{}
	// ReadMessageErrors will receive all errors from ws.ReadMessage() and
	// verify if it's a disconnection
	ReadMessageErrors chan error

	// Standard stream connection
	Conn Connection
	// Authenticated stream connection
	AuthConn Connection
	// contains filtered or unexported fields
}

Websocket defines a return type for websocket connections via the interface wrapper for routine processing

func New

func New() *Websocket

New initialises the websocket struct

func (*Websocket) AddSuccessfulSubscriptions

func (w *Websocket) AddSuccessfulSubscriptions(channels ...ChannelSubscription)

AddSuccessfulSubscriptions adds subscriptions that have been successfully subscribed to the subscription list

func (*Websocket) CanUseAuthenticatedEndpoints

func (w *Websocket) CanUseAuthenticatedEndpoints() bool

CanUseAuthenticatedEndpoints gets canUseAuthenticatedEndpoints val in a thread safe manner

func (*Websocket) CanUseAuthenticatedWebsocketForWrapper

func (w *Websocket) CanUseAuthenticatedWebsocketForWrapper() bool

CanUseAuthenticatedWebsocketForWrapper handles a common check to verify whether a wrapper can use an authenticated websocket endpoint

func (*Websocket) Connect

func (w *Websocket) Connect() error

Connect initiates a websocket connection by using a package defined connection function

func (*Websocket) Disable

func (w *Websocket) Disable() error

Disable disables the exchange websocket protocol

func (*Websocket) Enable

func (w *Websocket) Enable() error

Enable enables the exchange websocket protocol

func (*Websocket) FlushChannels

func (w *Websocket) FlushChannels() error

FlushChannels flushes channel subscriptions when there is a pair/asset change

func (*Websocket) GetChannelDifference

func (w *Websocket) GetChannelDifference(genSubs []ChannelSubscription) (sub, unsub []ChannelSubscription)

GetChannelDifference finds the difference between the subscribed channels and the new subscription list when pairs are disabled or enabled.

func (*Websocket) GetName

func (w *Websocket) GetName() string

GetName returns exchange name

func (*Websocket) GetProxyAddress

func (w *Websocket) GetProxyAddress() string

GetProxyAddress returns the current websocket proxy

func (*Websocket) GetSubscriptions

func (w *Websocket) GetSubscriptions() []ChannelSubscription

GetSubscriptions returns a copied list of subscriptions. The subscriptions list is a private member and cannot be manipulated directly

func (*Websocket) GetWebsocketURL

func (w *Websocket) GetWebsocketURL() string

GetWebsocketURL returns the running websocket URL

func (*Websocket) IsConnected

func (w *Websocket) IsConnected() bool

IsConnected returns status of connection

func (*Websocket) IsConnecting

func (w *Websocket) IsConnecting() bool

IsConnecting returns status of connecting

func (*Websocket) IsConnectionMonitorRunning

func (w *Websocket) IsConnectionMonitorRunning() bool

IsConnectionMonitorRunning returns status of connection monitor

func (*Websocket) IsDataMonitorRunning

func (w *Websocket) IsDataMonitorRunning() bool

IsDataMonitorRunning returns status of data monitor

func (*Websocket) IsEnabled

func (w *Websocket) IsEnabled() bool

IsEnabled returns status of enabled

func (*Websocket) IsInit

func (w *Websocket) IsInit() bool

IsInit returns status of init

func (*Websocket) IsTrafficMonitorRunning

func (w *Websocket) IsTrafficMonitorRunning() bool

IsTrafficMonitorRunning returns status of the traffic monitor

func (*Websocket) RemoveSuccessfulUnsubscriptions

func (w *Websocket) RemoveSuccessfulUnsubscriptions(channels ...ChannelSubscription)

RemoveSuccessfulUnsubscriptions removes subscriptions that have been successfully unsubscribed from the subscription list

func (*Websocket) ResubscribeToChannel

func (w *Websocket) ResubscribeToChannel(subscribedChannel *ChannelSubscription) error

ResubscribeToChannel resubscribes to channel

func (*Websocket) SetCanUseAuthenticatedEndpoints

func (w *Websocket) SetCanUseAuthenticatedEndpoints(val bool)

SetCanUseAuthenticatedEndpoints sets canUseAuthenticatedEndpoints val in a thread safe manner

func (*Websocket) SetProxyAddress

func (w *Websocket) SetProxyAddress(proxyAddr string) error

SetProxyAddress sets websocket proxy address

func (*Websocket) SetWebsocketURL

func (w *Websocket) SetWebsocketURL(url string, auth, reconnect bool) error

SetWebsocketURL sets websocket URL and can refresh underlying connections

func (*Websocket) Setup

func (w *Websocket) Setup(s *WebsocketSetup) error

Setup sets main variables for websocket connection

func (*Websocket) SetupNewConnection

func (w *Websocket) SetupNewConnection(c ConnectionSetup) error

SetupNewConnection sets up an auth or unauth streaming connection

func (*Websocket) Shutdown

func (w *Websocket) Shutdown() error

Shutdown attempts to shut down a websocket connection and associated routines by using a package defined shutdown function

func (*Websocket) SubscribeToChannels

func (w *Websocket) SubscribeToChannels(channels []ChannelSubscription) error

SubscribeToChannels appends supplied channels to channelsToSubscribe

func (*Websocket) UnsubscribeChannels

func (w *Websocket) UnsubscribeChannels(channels []ChannelSubscription) error

UnsubscribeChannels unsubscribes from a websocket channel

type WebsocketConnection

// WebsocketConnection contains all the data needed to send a message to a WS
// connection. Its methods have the same signatures as those declared by the
// Connection interface.
//
// NOTE(review): the units of RateLimit and the exact use of ResponseMaxLimit
// and Traffic are not visible here — confirm against the method bodies.
type WebsocketConnection struct {
	Verbose bool

	RateLimit    int64
	ExchangeName string
	URL          string
	ProxyURL     string
	Wg           *sync.WaitGroup
	Connection   *websocket.Conn
	ShutdownC    chan struct{}

	Match            *Match
	ResponseMaxLimit time.Duration
	Traffic          chan struct{}
	// contains filtered or unexported fields
}

WebsocketConnection contains all the data needed to send a message to a WS connection

func (*WebsocketConnection) Dial

func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header) error

Dial sets proxy urls and then connects to the websocket

func (*WebsocketConnection) GenerateMessageID

func (w *WebsocketConnection) GenerateMessageID(highPrec bool) int64

GenerateMessageID creates a message ID to check out

func (*WebsocketConnection) GetURL

func (w *WebsocketConnection) GetURL() string

GetURL returns the connection URL

func (*WebsocketConnection) IsConnected

func (w *WebsocketConnection) IsConnected() bool

IsConnected exposes websocket connection status

func (*WebsocketConnection) ReadMessage

func (w *WebsocketConnection) ReadMessage() Response

ReadMessage reads messages, can handle text, gzip and binary

func (*WebsocketConnection) SendJSONMessage

func (w *WebsocketConnection) SendJSONMessage(data interface{}) error

SendJSONMessage sends a JSON encoded message over the connection

func (*WebsocketConnection) SendMessageReturnResponse

func (w *WebsocketConnection) SendMessageReturnResponse(signature, request interface{}) ([]byte, error)

SendMessageReturnResponse will send a WS message to the connection and wait for response

func (*WebsocketConnection) SendRawMessage

func (w *WebsocketConnection) SendRawMessage(messageType int, message []byte) error

SendRawMessage sends a message over the connection without JSON encoding it

func (*WebsocketConnection) SetProxy

func (w *WebsocketConnection) SetProxy(proxy string)

SetProxy sets connection proxy

func (*WebsocketConnection) SetURL

func (w *WebsocketConnection) SetURL(url string)

SetURL sets connection URL

func (*WebsocketConnection) SetupPingHandler

func (w *WebsocketConnection) SetupPingHandler(handler PingHandler)

SetupPingHandler will automatically send ping or pong messages based on the PingHandler configuration

func (*WebsocketConnection) Shutdown

func (w *WebsocketConnection) Shutdown() error

Shutdown shuts down and closes specific connection

type WebsocketPositionUpdated

// WebsocketPositionUpdated reflects a change in orders/contracts on an
// exchange. It identifies the change by timestamp, currency pair, asset type
// and exchange name only; no order details are carried in this struct.
type WebsocketPositionUpdated struct {
	Timestamp time.Time
	Pair      currency.Pair
	AssetType asset.Item
	Exchange  string
}

WebsocketPositionUpdated reflects a change in orders/contracts on an exchange

type WebsocketSetup

// WebsocketSetup defines variables for setting up a websocket connection.
// It is the argument type of Websocket.Setup.
//
// Subscriber, Unsubscriber and GenerateSubscriptions have the same function
// signatures as the Subscriber, Unsubscriber and GenerateSubs fields of
// Websocket. NOTE(review): presumably Setup copies them across, and
// Connector is the function invoked by Websocket.Connect — confirm against
// Setup's body.
type WebsocketSetup struct {
	ExchangeConfig        *config.Exchange
	DefaultURL            string
	RunningURL            string
	RunningURLAuth        string
	Connector             func() error
	Subscriber            func([]ChannelSubscription) error
	Unsubscriber          func([]ChannelSubscription) error
	GenerateSubscriptions func() ([]ChannelSubscription, error)
	Features              *protocol.Features
	// Local orderbook buffer config values
	SortBuffer            bool
	SortBufferByUpdateIDs bool
	UpdateEntriesByID     bool
	TradeFeed             bool
	// Fill data config values
	FillsFeed bool
}

WebsocketSetup defines variables for setting up a websocket connection

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL