centrifuge

package module
v0.7.4
Published: Jul 1, 2021 License: MIT Imports: 15 Imported by: 5

README

Websocket client for Centrifuge library and Centrifugo server.

There is no v1 release of this library yet, so the API is still evolving. At the moment, patch version updates contain only backwards-compatible changes, while minor version updates can introduce backwards-incompatible API changes.

Feature matrix

  • connect to server using JSON protocol format
  • connect to server using Protobuf protocol format
  • connect with token (JWT)
  • connect with custom header
  • automatic reconnect in case of errors, network problems etc
  • an exponential backoff for reconnect
  • connect and disconnect events
  • handle disconnect reason
  • subscribe on a channel and handle asynchronous Publications
  • handle Join and Leave messages
  • handle Unsubscribe notifications
  • reconnect on subscribe timeout
  • publish method of Subscription
  • unsubscribe method of Subscription
  • presence method of Subscription
  • presence stats method of Subscription
  • history method of Subscription
  • top-level publish method
  • top-level presence method
  • top-level presence stats method
  • top-level history method
  • top-level unsubscribe method
  • send asynchronous messages to server
  • handle asynchronous messages from server
  • send RPC commands
  • publish to channel without being subscribed
  • subscribe to private channels with token (JWT)
  • connection token (JWT) refresh
  • private channel subscription token (JWT) refresh
  • handle connection expired error
  • handle subscription expired error
  • ping/pong to find broken connection
  • message recovery mechanism for client-side subscriptions
  • server-side subscriptions
  • message recovery mechanism for server-side subscriptions
  • history stream pagination
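
The feature list mentions exponential backoff for reconnect but does not state the schedule. The following is a minimal sketch of the general technique, not the library's implementation: the `minDelay` and `maxDelay` bounds and the `backoffDelay` helper are assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// Assumed bounds for illustration only; the library's real values
// are not given in this document.
const (
	minDelay = 200 * time.Millisecond
	maxDelay = 20 * time.Second
)

// backoffDelay returns the wait before reconnect attempt number
// `attempt` (0-based): minDelay doubled once per attempt, capped at maxDelay.
func backoffDelay(attempt int) time.Duration {
	d := minDelay
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= maxDelay {
			return maxDelay
		}
	}
	return d
}

func main() {
	for attempt := 0; attempt < 9; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, backoffDelay(attempt))
	}
}
```

Production reconnect loops often add random jitter to each delay so that many clients do not reconnect at the same instant; that is left out here to keep the sketch short.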

Run tests

First run Centrifugo instance:

docker run -d -p 8000:8000 centrifugo/centrifugo:latest centrifugo --client_insecure

Then run go test

Documentation

Index

Constants

const (
	DISCONNECTED = iota
	CONNECTING
	CONNECTED
	CLOSED
)

Describe client connection statuses.
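
Because the statuses are plain iota constants, they print as bare integers. A small sketch of mapping them to readable names for logging is shown below; the constants are copied locally so the example compiles on its own, and `statusName` is a hypothetical helper that is not part of the package.

```go
package main

import "fmt"

// Local copy of the documented status constants.
const (
	DISCONNECTED = iota
	CONNECTING
	CONNECTED
	CLOSED
)

// statusName is a hypothetical helper that turns a status value
// into a human-readable label.
func statusName(status int) string {
	switch status {
	case DISCONNECTED:
		return "disconnected"
	case CONNECTING:
		return "connecting"
	case CONNECTED:
		return "connected"
	case CLOSED:
		return "closed"
	default:
		return "unknown"
	}
}

func main() {
	for _, s := range []int{DISCONNECTED, CONNECTING, CONNECTED, CLOSED} {
		fmt.Println(s, statusName(s))
	}
}
```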

const (
	DefaultHandshakeTimeout     = time.Second
	DefaultReadTimeout          = 5 * time.Second
	DefaultWriteTimeout         = time.Second
	DefaultPingInterval         = 25 * time.Second
	DefaultPrivateChannelPrefix = "$"
	DefaultName                 = "go"
)

Config defaults.

const (
	UNSUBSCRIBED = iota
	SUBSCRIBING
	SUBSCRIBED
	SUBERROR
	SUBCLOSED
)

Different states of Subscription.

Variables

var (
	// ErrTimeout returned if operation timed out.
	ErrTimeout = errors.New("timeout")
	// ErrClientClosed can be returned if client already closed.
	ErrClientClosed = errors.New("client closed")
	// ErrClientDisconnected can be returned if client goes to
	// disconnected state while operation in progress.
	ErrClientDisconnected = errors.New("client disconnected")
	// ErrReconnectFailed returned when reconnect to server failed (never
	// happens by default since client keeps reconnecting forever).
	ErrReconnectFailed = errors.New("reconnect failed")
	// ErrDuplicateSubscription returned if subscription to the same channel
	// already registered in current client instance. This is due to the fact
	// that server does not allow subscribing to the same channel twice for
	// the same connection.
	ErrDuplicateSubscription = errors.New("duplicate subscription")
	// ErrSubscriptionClosed returned if Subscription was closed.
	ErrSubscriptionClosed = errors.New("subscription closed")
)
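
Sentinel errors like these are usually matched with `errors.Is`, which also works when a caller has wrapped the error with extra context. The sketch below uses local stand-ins for two of the variables; `classify` and `wrap` are hypothetical helpers, and the retry policy they express is an assumption for illustration, not behavior of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for two of the documented sentinel errors.
var (
	ErrTimeout      = errors.New("timeout")
	ErrClientClosed = errors.New("client closed")
)

// wrap adds operation context while keeping the original error
// reachable through errors.Is.
func wrap(op string, err error) error {
	return fmt.Errorf("%s: %w", op, err)
}

// classify is a hypothetical helper deciding how a caller might react.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrTimeout):
		return "retry"
	case errors.Is(err, ErrClientClosed):
		return "give up"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(classify(wrap("publish", ErrTimeout)))
	fmt.Println(classify(ErrClientClosed))
}
```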

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents client connection to Centrifugo or Centrifuge library based server. It provides methods to set various event handlers, subscribe to channels, call RPC commands etc. Call client Connect method to trigger actual connection with server. Call client Close method to clean up state when you don't need client instance anymore.

func New

func New(u string, config Config) *Client

New initializes Client. After the client is initialized, call its Connect method to establish the connection with the server.

func (*Client) Close

func (c *Client) Close() error

Close closes Client forever and cleans up state.

func (*Client) Connect

func (c *Client) Connect() error

Connect dials the server and sends a connect message. It returns an error if the first dial to the server fails. In case of failure the client will automatically reconnect with exponential backoff.

func (*Client) Disconnect

func (c *Client) Disconnect() error

Disconnect client from server.

func (*Client) History added in v0.7.4

func (c *Client) History(channel string) (HistoryResult, error)

History for a channel without being subscribed.

func (*Client) NamedRPC added in v0.7.4

func (c *Client) NamedRPC(method string, data []byte) (RPCResult, error)

NamedRPC makes an RPC call: it sends data to the server and waits for the response. An RPC handler must be registered on the server. Unlike the RPC method, it lets you pass a method name.

func (*Client) NewSubscription added in v0.2.0

func (c *Client) NewSubscription(channel string) (*Subscription, error)

NewSubscription allocates a new Subscription on a channel. As soon as the Subscription is successfully created, the Client keeps a reference to it in an internal map registry to manage automatic resubscribe on reconnect. When you are done with a Subscription, you can free its resources by calling the Subscription.Close method.

func (*Client) OnConnect added in v0.2.0

func (c *Client) OnConnect(handler ConnectHandler)

OnConnect is a function to handle connect event.

func (*Client) OnDisconnect added in v0.2.0

func (c *Client) OnDisconnect(handler DisconnectHandler)

OnDisconnect is a function to handle disconnect event.

func (*Client) OnError added in v0.2.0

func (c *Client) OnError(handler ErrorHandler)

OnError is a function that will receive unhandled errors for logging.

func (*Client) OnMessage added in v0.2.0

func (c *Client) OnMessage(handler MessageHandler)

OnMessage allows to process async message from server to client.

func (*Client) OnPrivateSub added in v0.2.0

func (c *Client) OnPrivateSub(handler PrivateSubHandler)

OnPrivateSub needed to handle private channel subscriptions.

func (*Client) OnRefresh added in v0.2.0

func (c *Client) OnRefresh(handler RefreshHandler)

OnRefresh handles refresh event when client's credentials expired and must be refreshed.

func (*Client) OnServerJoin added in v0.7.4

func (c *Client) OnServerJoin(handler ServerJoinHandler)

OnServerJoin sets function to handle Join event from server-side subscriptions.

func (*Client) OnServerLeave added in v0.7.4

func (c *Client) OnServerLeave(handler ServerLeaveHandler)

OnServerLeave sets function to handle Leave event from server-side subscriptions.

func (*Client) OnServerPublish added in v0.7.4

func (c *Client) OnServerPublish(handler ServerPublishHandler)

OnServerPublish sets function to handle Publications from server-side subscriptions.

func (*Client) OnServerSubscribe added in v0.7.4

func (c *Client) OnServerSubscribe(handler ServerSubscribeHandler)

OnServerSubscribe sets function to handle server-side subscription subscribe events.

func (*Client) OnServerUnsubscribe added in v0.7.4

func (c *Client) OnServerUnsubscribe(handler ServerUnsubscribeHandler)

OnServerUnsubscribe sets function to handle unsubscribe from server-side subscriptions.

func (*Client) Presence added in v0.7.4

func (c *Client) Presence(channel string) (PresenceResult, error)

Presence for a channel without being subscribed.

func (*Client) PresenceStats added in v0.7.4

func (c *Client) PresenceStats(channel string) (PresenceStatsResult, error)

PresenceStats for a channel without being subscribed.

func (*Client) Publish

func (c *Client) Publish(channel string, data []byte) (PublishResult, error)

Publish data into channel.

func (*Client) RPC

func (c *Client) RPC(data []byte) (RPCResult, error)

RPC allows to make RPC – send data to server and wait for response. RPC handler must be registered on server.

func (*Client) Send

func (c *Client) Send(data []byte) error

Send message to server without waiting for response. Message handler must be registered on server.

func (*Client) SetConnectData

func (c *Client) SetConnectData(data []byte)

SetConnectData allows to set data to send in connect command.

func (*Client) SetHeader

func (c *Client) SetHeader(key, value string)

SetHeader allows to set custom header to be sent in Upgrade HTTP request.

func (*Client) SetToken

func (c *Client) SetToken(token string)

SetToken allows to set connection token to let client authenticate itself on connect.

func (*Client) SetURL added in v0.7.4

func (c *Client) SetURL(url string)

SetURL sets the connection URL. This allows a different URL to be assigned dynamically, for example after a disconnect.

type ClientInfo

type ClientInfo struct {
	// Client is a client unique id.
	Client string
	// User is an ID of authenticated user. Zero value means anonymous user.
	User string
	// ConnInfo is an additional information about connection.
	ConnInfo []byte
	// ChanInfo is an additional information about connection in context of
	// channel subscription.
	ChanInfo []byte
}

ClientInfo contains information about client connection.

type Config

type Config struct {
	// NetDialContext specifies the dial function for creating TCP connections. If
	// NetDialContext is nil, net.DialContext is used.
	NetDialContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// PrivateChannelPrefix is private channel prefix.
	PrivateChannelPrefix string
	// ReadTimeout is how long to wait for read operations to complete.
	ReadTimeout time.Duration
	// WriteTimeout is Websocket write timeout.
	WriteTimeout time.Duration
	// PingInterval is how often to send ping commands to server.
	PingInterval time.Duration
	// HandshakeTimeout specifies the duration for the handshake to complete.
	HandshakeTimeout time.Duration
	// TLSConfig specifies the TLS configuration to use with tls.Client.
	// If nil, the default configuration is used.
	TLSConfig *tls.Config
	// EnableCompression specifies if the client should attempt to negotiate
	// per message compression (RFC 7692). Setting this value to true does not
	// guarantee that compression will be supported. Currently only "no context
	// takeover" modes are supported.
	EnableCompression bool
	// CookieJar specifies the cookie jar.
	// If CookieJar is nil, cookies are not sent in requests and ignored
	// in responses.
	CookieJar http.CookieJar
	// Header specifies custom HTTP Header to send.
	Header http.Header
	// Name allows setting client name. You should only use a limited
	// amount of client names throughout your applications – i.e. don't
	// make it unique per user for example, this name semantically represents
	// an environment from which client connects.
	Name string
	// Version allows setting client version. This is an application
	// specific information. By default no version set.
	Version string
}

Config contains various client options.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns Config with default options.
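
A common pattern with this kind of API is to start from the defaults and override only the fields that matter. The sketch below uses a trimmed local stand-in for Config with just the timing fields; filling them from the documented Default* constants is an assumption about what DefaultConfig does, and `slowNetworkConfig` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// Config is a trimmed local stand-in holding only the timing fields.
type Config struct {
	ReadTimeout      time.Duration
	WriteTimeout     time.Duration
	PingInterval     time.Duration
	HandshakeTimeout time.Duration
}

// defaultConfig mirrors the documented Default* constants. That the
// real DefaultConfig uses exactly these values is an assumption.
func defaultConfig() Config {
	return Config{
		ReadTimeout:      5 * time.Second,
		WriteTimeout:     time.Second,
		PingInterval:     25 * time.Second,
		HandshakeTimeout: time.Second,
	}
}

// slowNetworkConfig is a hypothetical helper that relaxes two
// timeouts and leaves every other field at its default.
func slowNetworkConfig() Config {
	cfg := defaultConfig()
	cfg.ReadTimeout *= 2
	cfg.HandshakeTimeout *= 3
	return cfg
}

func main() {
	cfg := slowNetworkConfig()
	fmt.Println(cfg.ReadTimeout, cfg.HandshakeTimeout, cfg.PingInterval)
}
```

Since Config is passed to New by value, the copy made here can be adjusted freely before the client is constructed.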

type ConnectEvent

type ConnectEvent struct {
	ClientID string
	Version  string
	Data     []byte
}

ConnectEvent is a connect event context passed to OnConnect callback.

type ConnectHandler

type ConnectHandler interface {
	OnConnect(*Client, ConnectEvent)
}

ConnectHandler is an interface describing how to handle connect event.

type DisconnectEvent

type DisconnectEvent struct {
	Reason    string
	Reconnect bool
}

DisconnectEvent is a disconnect event context passed to OnDisconnect callback.

type DisconnectHandler

type DisconnectHandler interface {
	OnDisconnect(*Client, DisconnectEvent)
}

DisconnectHandler is an interface describing how to handle disconnect event.
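
Since each handler is a single-method interface, one type can implement several of them and be registered for multiple events. The sketch below uses local stand-ins that mirror the documented shapes so it compiles without the library; `eventLog` is a hypothetical handler type.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented types.
type Client struct{}

type ConnectEvent struct {
	ClientID string
	Version  string
	Data     []byte
}

type DisconnectEvent struct {
	Reason    string
	Reconnect bool
}

type ConnectHandler interface {
	OnConnect(*Client, ConnectEvent)
}

type DisconnectHandler interface {
	OnDisconnect(*Client, DisconnectEvent)
}

// eventLog is a hypothetical handler that records events as text lines.
type eventLog struct {
	lines []string
}

func (l *eventLog) OnConnect(_ *Client, e ConnectEvent) {
	l.lines = append(l.lines, "connected as "+e.ClientID)
}

func (l *eventLog) OnDisconnect(_ *Client, e DisconnectEvent) {
	l.lines = append(l.lines,
		fmt.Sprintf("disconnected: %s (reconnect=%t)", e.Reason, e.Reconnect))
}

// Compile-time checks that *eventLog satisfies both interfaces.
var (
	_ ConnectHandler    = (*eventLog)(nil)
	_ DisconnectHandler = (*eventLog)(nil)
)

func main() {
	h := &eventLog{}
	h.OnConnect(nil, ConnectEvent{ClientID: "abc"})
	h.OnDisconnect(nil, DisconnectEvent{Reason: "stale", Reconnect: true})
	for _, line := range h.lines {
		fmt.Println(line)
	}
}
```

With the real package, the same value would be passed to both OnConnect and OnDisconnect on the Client. If callbacks can run on a different goroutine than the code reading `lines`, the slice would need a mutex; that is omitted here.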

type Error

type Error struct {
	Code    uint32
	Message string
}

Error represents protocol-level error.

func (Error) Error added in v0.7.4

func (e Error) Error() string
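
Because Error implements the error interface, a caller can recover the protocol-level code from a returned error with `errors.As`. In the sketch below the Error type is a local stand-in with the documented fields; the message format of `Error()`, the `protocolCode` and `failingCall` helpers, and the code value 103 are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Error is a local stand-in with the documented fields.
type Error struct {
	Code    uint32
	Message string
}

// The format used here is an assumption; the source does not show it.
func (e Error) Error() string {
	return fmt.Sprintf("%d: %s", e.Code, e.Message)
}

// protocolCode reports the code when err is, or wraps, a protocol Error.
func protocolCode(err error) (uint32, bool) {
	var perr Error
	if errors.As(err, &perr) {
		return perr.Code, true
	}
	return 0, false
}

// failingCall is a hypothetical operation returning a wrapped protocol error.
func failingCall() error {
	return fmt.Errorf("rpc failed: %w", Error{Code: 103, Message: "permission denied"})
}

func main() {
	if code, ok := protocolCode(failingCall()); ok {
		fmt.Println("protocol error code:", code)
	}
}
```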

type ErrorEvent

type ErrorEvent struct {
	// TODO: return error type here instead of string
	// so user code could distinguish various types of possible errors?
	Message string
}

ErrorEvent is an error event context passed to OnError callback.

type ErrorHandler

type ErrorHandler interface {
	OnError(*Client, ErrorEvent)
}

ErrorHandler is an interface describing how to handle error event.

type HistoryResult added in v0.7.4

type HistoryResult struct {
	Publications []Publication
}

HistoryResult contains the result of history op.

type JoinEvent

type JoinEvent struct {
	ClientInfo
}

JoinEvent has info about user who joined channel.
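
JoinEvent embeds ClientInfo, so the ClientInfo fields are promoted and can be read directly on the event. A short sketch with local stand-in types follows; `describe` is a hypothetical helper, and it relies on the documented rule that an empty User means an anonymous user.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented types.
type ClientInfo struct {
	Client   string
	User     string
	ConnInfo []byte
	ChanInfo []byte
}

type JoinEvent struct {
	ClientInfo
}

// describe is a hypothetical helper that formats a join event.
func describe(e JoinEvent) string {
	user := e.User // promoted from the embedded ClientInfo
	if user == "" {
		user = "anonymous"
	}
	return fmt.Sprintf("client %s (user %s) joined", e.Client, user)
}

func main() {
	fmt.Println(describe(JoinEvent{ClientInfo: ClientInfo{Client: "c1", User: "42"}}))
	fmt.Println(describe(JoinEvent{ClientInfo: ClientInfo{Client: "c2"}}))
}
```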

type JoinHandler

type JoinHandler interface {
	OnJoin(*Subscription, JoinEvent)
}

JoinHandler is an interface describing how to handle join messages.

type LeaveEvent

type LeaveEvent struct {
	ClientInfo
}

LeaveEvent has info about user who left channel.

type LeaveHandler

type LeaveHandler interface {
	OnLeave(*Subscription, LeaveEvent)
}

LeaveHandler is an interface describing how to handle leave messages.

type MessageEvent

type MessageEvent struct {
	Data []byte
}

MessageEvent is an event for async message from server to client.

type MessageHandler

type MessageHandler interface {
	OnMessage(*Client, MessageEvent)
}

MessageHandler is an interface describing how to handle async message from server.
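
When a handler needs no state, defining a named type just to satisfy the interface can feel heavy. One option is a function adapter in the style of `http.HandlerFunc`. The sketch below uses local stand-in types; `MessageHandlerFunc` is hypothetical and is not provided by the package.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented types.
type Client struct{}

type MessageEvent struct {
	Data []byte
}

type MessageHandler interface {
	OnMessage(*Client, MessageEvent)
}

// MessageHandlerFunc is a hypothetical adapter that lets an ordinary
// function be used wherever a MessageHandler is expected.
type MessageHandlerFunc func(*Client, MessageEvent)

// OnMessage calls f(c, e), satisfying MessageHandler.
func (f MessageHandlerFunc) OnMessage(c *Client, e MessageEvent) {
	f(c, e)
}

func main() {
	var h MessageHandler = MessageHandlerFunc(func(_ *Client, e MessageEvent) {
		fmt.Printf("async message: %s\n", e.Data)
	})
	h.OnMessage(&Client{}, MessageEvent{Data: []byte("hello")})
}
```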

type PresenceResult added in v0.7.4

type PresenceResult struct {
	Presence map[string]ClientInfo
}

PresenceResult contains the result of presence op.

type PresenceStats

type PresenceStats struct {
	NumClients int
	NumUsers   int
}

PresenceStats represents short presence information.

type PresenceStatsResult added in v0.7.4

type PresenceStatsResult struct {
	PresenceStats
}

PresenceStatsResult wraps presence stats.

type PrivateSubEvent

type PrivateSubEvent struct {
	ClientID string
	Channel  string
}

PrivateSubEvent contains info required to create PrivateSign when client wants to subscribe on private channel.

type PrivateSubHandler

type PrivateSubHandler interface {
	OnPrivateSub(*Client, PrivateSubEvent) (string, error)
}

PrivateSubHandler is an interface describing how to handle private subscription request.

type Publication

type Publication struct {
	// Offset is an incremental position number inside history stream.
	// Zero value means that channel does not maintain Publication stream.
	Offset uint64
	// Data published to channel.
	Data []byte
	// Info is an optional information about client connection published
	// this data to channel.
	Info *ClientInfo
}

Publication is a data sent to channel.

type PublishEvent

type PublishEvent struct {
	Publication
}

PublishEvent has info about received channel Publication.

type PublishHandler

type PublishHandler interface {
	OnPublish(*Subscription, PublishEvent)
}

PublishHandler is an interface describing how to handle messages published in channels.

type PublishResult added in v0.7.4

type PublishResult struct{}

PublishResult contains the result of publish.

type RPCResult added in v0.7.4

type RPCResult struct {
	Data []byte
}

RPCResult contains data returned from server as RPC result.

type RefreshHandler

type RefreshHandler interface {
	OnRefresh(*Client) (string, error)
}

RefreshHandler is an interface describing how to handle token refresh event.

type ServerJoinEvent added in v0.7.4

type ServerJoinEvent struct {
	Channel string
	ClientInfo
}

ServerJoinEvent has info about user who joined channel.

type ServerJoinHandler added in v0.7.4

type ServerJoinHandler interface {
	OnServerJoin(*Client, ServerJoinEvent)
}

ServerJoinHandler is an interface describing how to handle Join events from server-side subscriptions.

type ServerLeaveEvent added in v0.7.4

type ServerLeaveEvent struct {
	Channel string
	ClientInfo
}

ServerLeaveEvent has info about user who left channel.

type ServerLeaveHandler added in v0.7.4

type ServerLeaveHandler interface {
	OnServerLeave(*Client, ServerLeaveEvent)
}

ServerLeaveHandler is an interface describing how to handle Leave events from server-side subscriptions.

type ServerPublishEvent added in v0.7.4

type ServerPublishEvent struct {
	Channel string
	Publication
}

ServerPublishEvent has info about received channel Publication.

type ServerPublishHandler added in v0.7.4

type ServerPublishHandler interface {
	OnServerPublish(*Client, ServerPublishEvent)
}

ServerPublishHandler is an interface describing how to handle Publication from server-side subscriptions.

type ServerSubscribeEvent added in v0.7.4

type ServerSubscribeEvent struct {
	Channel      string
	Resubscribed bool
	Recovered    bool
}

type ServerSubscribeHandler added in v0.7.4

type ServerSubscribeHandler interface {
	OnServerSubscribe(*Client, ServerSubscribeEvent)
}

ServerSubscribeHandler is an interface describing how to handle subscribe events from server-side subscriptions.

type ServerUnsubscribeEvent added in v0.7.4

type ServerUnsubscribeEvent struct {
	Channel string
}

ServerUnsubscribeEvent is an event passed to unsubscribe event handler.

type ServerUnsubscribeHandler added in v0.7.4

type ServerUnsubscribeHandler interface {
	OnServerUnsubscribe(*Client, ServerUnsubscribeEvent)
}

ServerUnsubscribeHandler is an interface describing how to handle unsubscribe events from server-side subscriptions.

type SubscribeErrorEvent

type SubscribeErrorEvent struct {
	Error string
}

SubscribeErrorEvent is a subscribe error event context passed to event callback.

type SubscribeErrorHandler

type SubscribeErrorHandler interface {
	OnSubscribeError(*Subscription, SubscribeErrorEvent)
}

SubscribeErrorHandler is an interface describing how to handle subscribe error event.

type SubscribeSuccessEvent

type SubscribeSuccessEvent struct {
	Resubscribed bool
	Recovered    bool
}

SubscribeSuccessEvent is a subscribe success event context passed to event callback.

type SubscribeSuccessHandler

type SubscribeSuccessHandler interface {
	OnSubscribeSuccess(*Subscription, SubscribeSuccessEvent)
}

SubscribeSuccessHandler is an interface describing how to handle subscribe success event.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription represents client subscription to channel.

func (*Subscription) Channel

func (s *Subscription) Channel() string

Channel returns subscription channel.

func (*Subscription) Close added in v0.7.4

func (s *Subscription) Close() error

Close unsubscribes from channel and removes Subscription from Client's subscription map.

func (*Subscription) History

func (s *Subscription) History() (HistoryResult, error)

History allows to extract channel history.

func (*Subscription) OnJoin added in v0.2.0

func (s *Subscription) OnJoin(handler JoinHandler)

OnJoin allows to set JoinHandler to SubEventHandler.

func (*Subscription) OnLeave added in v0.2.0

func (s *Subscription) OnLeave(handler LeaveHandler)

OnLeave allows to set LeaveHandler to SubEventHandler.

func (*Subscription) OnPublish added in v0.2.0

func (s *Subscription) OnPublish(handler PublishHandler)

OnPublish allows to set PublishHandler to SubEventHandler.

func (*Subscription) OnSubscribeError added in v0.2.0

func (s *Subscription) OnSubscribeError(handler SubscribeErrorHandler)

OnSubscribeError allows to set SubscribeErrorHandler to SubEventHandler.

func (*Subscription) OnSubscribeSuccess added in v0.2.0

func (s *Subscription) OnSubscribeSuccess(handler SubscribeSuccessHandler)

OnSubscribeSuccess allows to set SubscribeSuccessHandler to SubEventHandler.

func (*Subscription) OnUnsubscribe added in v0.2.0

func (s *Subscription) OnUnsubscribe(handler UnsubscribeHandler)

OnUnsubscribe allows to set UnsubscribeHandler to SubEventHandler.

func (*Subscription) Presence

func (s *Subscription) Presence() (PresenceResult, error)

Presence allows to extract channel presence.

func (*Subscription) PresenceStats

func (s *Subscription) PresenceStats() (PresenceStatsResult, error)

PresenceStats allows to extract channel presence stats.

func (*Subscription) Publish

func (s *Subscription) Publish(data []byte) (PublishResult, error)

Publish allows to publish data to channel.

func (*Subscription) Subscribe

func (s *Subscription) Subscribe() error

Subscribe allows to subscribe again after unsubscribing.

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe() error

Unsubscribe allows to unsubscribe from channel.

type UnsubscribeEvent

type UnsubscribeEvent struct{}

UnsubscribeEvent is an event passed to unsubscribe event handler.

type UnsubscribeHandler

type UnsubscribeHandler interface {
	OnUnsubscribe(*Subscription, UnsubscribeEvent)
}

UnsubscribeHandler is an interface describing how to handle unsubscribe event.

type UnsubscribeResult added in v0.7.4

type UnsubscribeResult struct{}

Directories

Path Synopsis
examples module
