stream

package
v2.8.0 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 18, 2022 License: Apache-2.0 Imports: 14 Imported by: 15

Documentation

Constants

This section is empty.

Variables

var (
	// ErrConnectCalledMultipleTimes is returned when Connect has been called multiple times on a single client
	ErrConnectCalledMultipleTimes = errors.New("tried to call Connect multiple times")
	// ErrNoConnected is returned when the client did not receive the welcome
	// message from the server
	ErrNoConnected = errors.New("did not receive connected message")
	// ErrBadAuthResponse is returned when the client could not successfully authenticate
	ErrBadAuthResponse = errors.New("did not receive authenticated message")
	// ErrSubResponse is returned when the client's subscription request was not
	// acknowledged
	ErrSubResponse = errors.New("did not receive subscribed message")
	// ErrSubscriptionChangeBeforeConnect is returned when the client attempts to change subscriptions before
	// calling Connect
	ErrSubscriptionChangeBeforeConnect = errors.New("subscription change attempted before calling Connect")
	// ErrSubscriptionChangeAfterTerminated is returned when client attempts to change subscriptions after
	// the client has been terminated
	ErrSubscriptionChangeAfterTerminated = errors.New("subscription change after client termination")
	// ErrSubscriptionChangeAlreadyInProgress is returned when a subscription change is called concurrently
	// with another
	ErrSubscriptionChangeAlreadyInProgress = errors.New("subscription change already in progress")
	// ErrSubscriptionChangeInterrupted is returned when a subscription change was in progress when the client
	// has terminated
	ErrSubscriptionChangeInterrupted = errors.New("subscription change interrupted by client termination")
	// ErrSubscriptionChangeTimeout is returned when the server does not return a proper
	// subscription response after a subscription change request.
	ErrSubscriptionChangeTimeout = errors.New("subscription change timeout")
)
var (
	// ErrInvalidCredentials is returned when invalid credentials have been sent by the user.
	ErrInvalidCredentials error = errorMessage{/* contains filtered or unexported fields */}
	// ErrSymbolLimitExceeded is returned when the client has subscribed to too many symbols
	ErrSymbolLimitExceeded error = errorMessage{/* contains filtered or unexported fields */}
	// ErrConnectionLimitExceeded is returned when the client has exceeded their connection limit
	ErrConnectionLimitExceeded error = errorMessage{/* contains filtered or unexported fields */}
	// ErrSlowClient is returned when the server has detected a slow client. In this case there's no guarantee
	// that all prior messages are sent to the server so a subscription acknowledgement may not arrive
	ErrSlowClient error = errorMessage{/* contains filtered or unexported fields */}
	// ErrInsufficientSubscription is returned when the user does not have proper
	// subscription for the requested feed (e.g. SIP)
	ErrInsufficientSubscription error = errorMessage{/* contains filtered or unexported fields */}
	// ErrSubscriptionChangeInvalidForFeed is returned when a subscription change is invalid for the feed.
	ErrSubscriptionChangeInvalidForFeed error = errorMessage{/* contains filtered or unexported fields */}
	// ErrInsufficientScope is returned when the token used by the user doesn't have proper scopes
	// for data stream
	ErrInsufficientScope error = errorMessage{/* contains filtered or unexported fields */}
)

Functions

This section is empty.

Types

type Bar

type Bar struct {
	Symbol     string
	Open       float64
	High       float64
	Low        float64
	Close      float64
	Volume     uint64
	Timestamp  time.Time
	TradeCount uint64
	VWAP       float64
}

Bar is an aggregate of trades

type CryptoBar

type CryptoBar struct {
	Symbol     string
	Exchange   string
	Open       float64
	High       float64
	Low        float64
	Close      float64
	Volume     float64
	Timestamp  time.Time
	TradeCount uint64
	VWAP       float64
}

type CryptoClient

type CryptoClient interface {
	StreamClient
	SubscribeToTrades(handler func(trade CryptoTrade), symbols ...string) error
	UnsubscribeFromTrades(symbols ...string) error
	SubscribeToQuotes(handler func(quote CryptoQuote), symbols ...string) error
	UnsubscribeFromQuotes(symbols ...string) error
	SubscribeToBars(handler func(bar CryptoBar), symbols ...string) error
	UnsubscribeFromBars(symbols ...string) error
	SubscribeToUpdatedBars(handler func(bar CryptoBar), symbols ...string) error
	UnsubscribeFromUpdatedBars(symbols ...string) error
	SubscribeToDailyBars(handler func(bar CryptoBar), symbols ...string) error
	UnsubscribeFromDailyBars(symbols ...string) error
	SubscribeToOrderbooks(handler func(ob CryptoOrderbook), symbols ...string) error
	UnsubscribeFromOrderbooks(symbols ...string) error
}

CryptoClient is a client that connects to an Alpaca Data V2 stream server and handles communication both ways.

After constructing, Connect() must be called before any subscription changes are made. Connect keeps the connection alive and reestablishes it as long as the configured number of retries has not been exceeded.

Terminated() returns a channel that the client sends an error to when it has terminated. A client cannot be reused once it has terminated!

SubscribeTo... and UnsubscribeFrom... can be used to modify subscriptions and the handler used to process incoming trades/quotes/bars. These calls block until they succeed or an irrecoverable error occurs.

Note that subscription changes cannot be called concurrently.
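
Because subscription changes must not overlap (ErrSubscriptionChangeAlreadyInProgress is documented for that case), code that changes subscriptions from several goroutines may want to serialize the calls itself. The following is a rough sketch of one way to do that with a mutex. The subscriber interface, the serialized wrapper, the fakeClient and the symbol are all hypothetical and exist only so the sketch runs without the module.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a hypothetical, trimmed-down view of a stream client that
// exposes only the one method this sketch needs.
type subscriber interface {
	UnsubscribeFromTrades(symbols ...string) error
}

// serialized wraps a subscriber and lets only one subscription change run at
// a time.
type serialized struct {
	mu    sync.Mutex
	inner subscriber
}

func (s *serialized) UnsubscribeFromTrades(symbols ...string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.inner.UnsubscribeFromTrades(symbols...)
}

// fakeClient stands in for a real client. It counts calls and records any
// call that starts while another one is still running.
type fakeClient struct {
	active, calls, overlaps int
}

func (f *fakeClient) UnsubscribeFromTrades(symbols ...string) error {
	f.active++
	if f.active > 1 {
		f.overlaps++
	}
	f.calls++
	f.active--
	return nil
}

// runConcurrently issues n subscription changes from n goroutines through
// the wrapper and returns what the fake client observed.
func runConcurrently(n int) (calls, overlaps int) {
	fake := &fakeClient{}
	s := &serialized{inner: fake}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = s.UnsubscribeFromTrades("BTCUSD") // placeholder symbol
		}()
	}
	wg.Wait()
	return fake.calls, fake.overlaps
}

func main() {
	calls, overlaps := runConcurrently(8)
	fmt.Printf("calls=%d overlaps=%d\n", calls, overlaps)
}
```

The mutex makes each caller wait its turn instead of failing fast. Whether waiting or returning an error is preferable depends on the application.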

func NewCryptoClient

func NewCryptoClient(opts ...CryptoOption) CryptoClient

NewCryptoClient returns a new CryptoClient that will connect to the crypto feed and whose default configurations are modified by opts.

type CryptoOption

type CryptoOption interface {
	// contains filtered or unexported methods
}

CryptoOption is a configuration option for the CryptoClient

func WithCryptoBars

func WithCryptoBars(handler func(CryptoBar), symbols ...string) CryptoOption

WithCryptoBars configures initial bar symbols to subscribe to and the handler

func WithCryptoDailyBars

func WithCryptoDailyBars(handler func(CryptoBar), symbols ...string) CryptoOption

WithCryptoDailyBars configures initial daily bar symbols to subscribe to and the handler

func WithCryptoOrderbooks added in v2.5.0

func WithCryptoOrderbooks(handler func(CryptoOrderbook), symbols ...string) CryptoOption

WithCryptoOrderbooks configures initial orderbook symbols to subscribe to and the handler

func WithCryptoQuotes

func WithCryptoQuotes(handler func(CryptoQuote), symbols ...string) CryptoOption

WithCryptoQuotes configures initial quote symbols to subscribe to and the handler

func WithCryptoTrades

func WithCryptoTrades(handler func(CryptoTrade), symbols ...string) CryptoOption

WithCryptoTrades configures initial trade symbols to subscribe to and the handler

func WithCryptoUpdatedBars added in v2.3.0

func WithCryptoUpdatedBars(handler func(CryptoBar), symbols ...string) CryptoOption

WithCryptoUpdatedBars configures initial updated bar symbols to subscribe to and the handler

func WithExchanges

func WithExchanges(exchanges ...string) CryptoOption

WithExchanges configures the set of crypto exchanges to listen to

type CryptoOrderbook added in v2.5.0

type CryptoOrderbook struct {
	Symbol    string
	Exchange  string
	Timestamp time.Time
	Bids      []CryptoOrderbookEntry
	Asks      []CryptoOrderbookEntry
	Reset     bool
}

type CryptoOrderbookEntry added in v2.5.0

type CryptoOrderbookEntry struct {
	Price float64
	Size  float64
}

type CryptoQuote

type CryptoQuote struct {
	Symbol    string
	Exchange  string
	BidPrice  float64
	BidSize   float64
	AskPrice  float64
	AskSize   float64
	Timestamp time.Time
}

type CryptoTrade

type CryptoTrade struct {
	Symbol    string
	Exchange  string
	Price     float64
	Size      float64
	Timestamp time.Time
	Id        int64
	// TakerSide is the taker's side: one of B, S or -.
	// B is buy, S is sell and - is unknown.
	TakerSide string
}
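
The TakerSide codes described in the comment above can be decoded with a small switch. This is only a sketch: takerSideName is a hypothetical helper, and the "unrecognized" fallback for values outside the documented set is an assumption.

```go
package main

import "fmt"

// takerSideName maps the documented TakerSide codes (B, S, -) to readable
// names. It is a hypothetical helper, not part of the package.
func takerSideName(code string) string {
	switch code {
	case "B":
		return "buy"
	case "S":
		return "sell"
	case "-":
		return "unknown"
	default:
		// Anything else is outside the documented set of codes.
		return "unrecognized"
	}
}

func main() {
	for _, code := range []string{"B", "S", "-"} {
		fmt.Printf("%s => %s\n", code, takerSideName(code))
	}
}
```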

type LULD

type LULD struct {
	Symbol         string
	LimitUpPrice   float64
	LimitDownPrice float64
	Indicator      string
	Timestamp      time.Time
	Tape           string
}

LULD is a Limit Up Limit Down message

type Logger

type Logger interface {
	Infof(format string, v ...interface{})
	Warnf(format string, v ...interface{})
	Errorf(format string, v ...interface{})
}

Logger wraps methods for leveled, formatted logging.

func DefaultLogger

func DefaultLogger() Logger

DefaultLogger returns a Logger that uses the standard Go log package to print leveled logs to the standard error.

func ErrorOnlyLogger

func ErrorOnlyLogger() Logger

ErrorOnlyLogger returns a Logger that only logs errors to the standard error.

type News added in v2.2.0

type News struct {
	ID        int
	Author    string
	CreatedAt time.Time
	UpdatedAt time.Time
	Headline  string
	Summary   string
	Content   string
	URL       string
	Symbols   []string
}

type NewsClient added in v2.2.0

type NewsClient interface {
	StreamClient
	SubscribeToNews(handler func(news News), symbols ...string) error
	UnsubscribeFromNews(symbols ...string) error
}

func NewNewsClient added in v2.2.0

func NewNewsClient(opts ...NewsOption) NewsClient

NewNewsClient returns a new NewsClient that will connect to the news stream.

type NewsOption added in v2.2.0

type NewsOption interface {
	// contains filtered or unexported methods
}

func WithNews added in v2.2.0

func WithNews(handler func(News), symbols ...string) NewsOption

WithNews configures initial symbols to subscribe to and the handler

type Option

type Option interface {
	StockOption
	CryptoOption
	NewsOption
}

Option is a configuration option that can be used with StocksClient, CryptoClient and NewsClient

func WithBaseURL

func WithBaseURL(url string) Option

WithBaseURL configures the base URL

func WithBufferSize

func WithBufferSize(size int) Option

WithBufferSize sets the size for the buffer that is used for messages received from the server

func WithConnectCallback added in v2.8.0

func WithConnectCallback(callback func()) Option

WithConnectCallback runs the callback function after the streaming connection is set up. If the stream terminates and can't reconnect, the connect callback will time out one second after reaching the end of the stream's maintenance (if it is still running). This is to avoid the callback blocking the parent thread.

func WithCredentials

func WithCredentials(key, secret string) Option

WithCredentials configures the key and secret to use

func WithDisconnectCallback added in v2.8.0

func WithDisconnectCallback(callback func()) Option

WithDisconnectCallback runs the callback function after the streaming connection disconnects. If the stream is terminated and can't reconnect, the disconnect callback will time out one second after reaching the end of the stream's maintenance (if it is still running). This is to avoid the callback blocking the parent thread.

func WithLogger

func WithLogger(logger Logger) Option

WithLogger configures the logger

func WithProcessors

func WithProcessors(count int) Option

WithProcessors configures how many goroutines should process incoming messages. Increasing this past 1 means that the order of processing is not necessarily the same as the order of arrival from the server.

func WithReconnectSettings

func WithReconnectSettings(limit int, delay time.Duration) Option

WithReconnectSettings configures how many consecutive connection errors should be accepted and the delay (that is multiplied by the number of consecutive errors) between retries. limit = 0 means the client will try restarting indefinitely unless it runs into an irrecoverable error (such as invalid credentials).

type Quote

type Quote struct {
	Symbol      string
	BidExchange string
	BidPrice    float64
	BidSize     uint32
	AskExchange string
	AskPrice    float64
	AskSize     uint32
	Timestamp   time.Time
	Conditions  []string
	Tape        string
	// contains filtered or unexported fields
}

Quote is a stock quote from the market

func (Quote) Internal added in v2.1.0

func (q Quote) Internal() quoteInternal

Internal contains internal fields. There aren't any behavioural or backward compatibility promises for them: they can be empty or removed in the future. You should not use them at all.

type StockOption

type StockOption interface {
	// contains filtered or unexported methods
}

StockOption is a configuration option for the StocksClient

func WithBars

func WithBars(handler func(Bar), symbols ...string) StockOption

WithBars configures initial bar symbols to subscribe to and the handler

func WithCancelErrors added in v2.1.0

func WithCancelErrors(handler func(TradeCancelError)) StockOption

WithCancelErrors configures initial trade cancel errors handler. This does not create any new subscriptions because cancel errors are subscribed automatically together with trades. No need to pass in symbols.

func WithCorrections added in v2.1.0

func WithCorrections(handler func(TradeCorrection)) StockOption

WithCorrections configures initial trade corrections handler. This does not create any new subscriptions because corrections are subscribed automatically together with trades. No need to pass in symbols.

func WithDailyBars

func WithDailyBars(handler func(Bar), symbols ...string) StockOption

WithDailyBars configures initial daily bar symbols to subscribe to and the handler

func WithLULDs

func WithLULDs(handler func(LULD), symbols ...string) StockOption

WithLULDs configures initial LULD symbols to subscribe to and the handler

func WithQuotes

func WithQuotes(handler func(Quote), symbols ...string) StockOption

WithQuotes configures initial quote symbols to subscribe to and the handler

func WithStatuses

func WithStatuses(handler func(TradingStatus), symbols ...string) StockOption

WithStatuses configures initial trading status symbols to subscribe to and the handler

func WithTrades

func WithTrades(handler func(Trade), symbols ...string) StockOption

WithTrades configures initial trade symbols to subscribe to and the handler

func WithUpdatedBars added in v2.3.0

func WithUpdatedBars(handler func(Bar), symbols ...string) StockOption

WithUpdatedBars configures initial updated bar symbols to subscribe to and the handler

type StocksClient

type StocksClient interface {
	StreamClient
	SubscribeToTrades(handler func(trade Trade), symbols ...string) error
	UnsubscribeFromTrades(symbols ...string) error
	SubscribeToQuotes(handler func(quote Quote), symbols ...string) error
	UnsubscribeFromQuotes(symbols ...string) error
	SubscribeToBars(handler func(bar Bar), symbols ...string) error
	UnsubscribeFromBars(symbols ...string) error
	SubscribeToUpdatedBars(handler func(bar Bar), symbols ...string) error
	UnsubscribeFromUpdatedBars(symbols ...string) error
	SubscribeToDailyBars(handler func(bar Bar), symbols ...string) error
	UnsubscribeFromDailyBars(symbols ...string) error
	SubscribeToStatuses(handler func(ts TradingStatus), symbols ...string) error
	UnsubscribeFromStatuses(symbols ...string) error
	SubscribeToLULDs(handler func(luld LULD), symbols ...string) error
	UnsubscribeFromLULDs(symbols ...string) error
	RegisterCancelErrors(handler func(tce TradeCancelError))
	UnregisterCancelErrors()
	RegisterCorrections(handler func(tc TradeCorrection))
	UnregisterCorrections()
}

StocksClient is a client that connects to an Alpaca Data V2 stream server and handles communication both ways.

After constructing, Connect() must be called before any subscription changes are made. Connect keeps the connection alive and reestablishes it as long as the configured number of retries has not been exceeded.

Terminated() returns a channel that the client sends an error to when it has terminated. A client cannot be reused once it has terminated!

SubscribeTo... and UnsubscribeFrom... can be used to modify subscriptions and the handler used to process incoming trades/quotes/bars/etc. These calls block until they succeed or an irrecoverable error occurs.

Note that subscription changes cannot be called concurrently.

func NewStocksClient

func NewStocksClient(feed string, opts ...StockOption) StocksClient

NewStocksClient returns a new StocksClient that will connect to the data feed named by feed and whose default configurations are modified by opts.

type StreamClient

type StreamClient interface {
	// Connect establishes a connection and **reestablishes it when errors occur**
	// as long as the configured number of retries has not been exceeded.
	//
	// It blocks until the connection has been established for the first time (or it failed to do so).
	//
	// **Should only be called once!**
	Connect(ctx context.Context) error
	// Terminated returns a channel that the client sends an error to when it has terminated.
	// The channel is also closed upon termination.
	Terminated() <-chan error
}

type Trade

type Trade struct {
	ID         int64
	Symbol     string
	Exchange   string
	Price      float64
	Size       uint32
	Timestamp  time.Time
	Conditions []string
	Tape       string
	// contains filtered or unexported fields
}

Trade is a stock trade that happened on the market

func (Trade) Internal added in v2.1.0

func (t Trade) Internal() tradeInternal

Internal contains internal fields. There aren't any behavioural or backward compatibility promises for them: they can be empty or removed in the future. You should not use them at all.

type TradeCancelError added in v2.1.0

type TradeCancelError struct {
	Symbol            string
	ID                int64
	Exchange          string
	Price             float64
	Size              uint32
	CancelErrorAction string
	Tape              string
	Timestamp         time.Time
}

type TradeCorrection added in v2.1.0

type TradeCorrection struct {
	Symbol              string
	Exchange            string
	OriginalID          int64
	OriginalPrice       float64
	OriginalSize        uint32
	OriginalConditions  []string
	CorrectedID         int64
	CorrectedPrice      float64
	CorrectedSize       uint32
	CorrectedConditions []string
	Tape                string
	Timestamp           time.Time
}

type TradingStatus

type TradingStatus struct {
	Symbol     string
	StatusCode string
	StatusMsg  string
	ReasonCode string
	ReasonMsg  string
	Timestamp  time.Time
	Tape       string
}

TradingStatus is a halt or resume status for a security
