phx

package module
v0.0.0-...-20e4138 Latest
Published: Sep 10, 2022 License: MIT Imports: 13 Imported by: 0

README

Phx - A Phoenix Channels client for Go

This is a comprehensive client for Phoenix Channels written for Go applications. The goal of this project is to be a reliable, resilient, full-featured client library for connecting to Phoenix servers over websockets and for pushing and receiving events on one or more channels in a performant and concurrent manner.

This module is based on the official JavaScript client implementation, deviating only where Go required a different approach. If you're familiar with connecting to Phoenix Channels in JS, you should feel right at home with this library.

Installation

This module requires Go 1.18 or later.

go get github.com/nshafer/phx

Documentation

API documentation can be viewed via godoc at https://pkg.go.dev/github.com/nshafer/phx.

Examples are in examples/.

Features

  • Supports websockets as the transport method. Longpoll is not currently supported, nor are there plans to implement it.
  • Supports the JSONSerializerV2 serializer. (JSONSerializerV1 also available if preferred.)
  • All event handlers are simple functions that are registered with the Socket, Channels or Pushes. No complicated interfaces to implement.
  • Completely concurrent using many goroutines in the background so that your main thread is not blocked. All callbacks will run in separate goroutines.
  • Supports setting connection parameters, headers, proxy, etc. on the main websocket connection.
  • Supports passing parameters when joining a Channel.
  • Pluggable Transport, TransportHandler, Logger if needed.
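
Because callbacks run in separate goroutines, any state they share has to be synchronized. The following is a minimal sketch of that idea and does not use phx itself: `dispatch` is a hypothetical stand-in that runs a handler in a new goroutine per event, and `counter` and `countEvents` are names invented for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is shared state that several callbacks update concurrently.
type counter struct {
	mu sync.Mutex
	n  int
}

// add increments the counter while holding the lock.
func (c *counter) add() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

// value returns the current count while holding the lock.
func (c *counter) value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

// dispatch is a hypothetical stand-in for the library: it runs the handler
// once per event, each time in its own goroutine, and waits for all of them.
func dispatch(events int, handler func(payload any)) {
	var wg sync.WaitGroup
	for i := 0; i < events; i++ {
		wg.Add(1)
		go func(p int) {
			defer wg.Done()
			handler(p)
		}(i)
	}
	wg.Wait()
}

// countEvents dispatches the given number of events to a handler that
// updates a mutex-protected counter, then returns the final count.
func countEvents(events int) int {
	var c counter
	dispatch(events, func(payload any) { c.add() })
	return c.value()
}

func main() {
	fmt.Println(countEvents(100))
}
```

The same pattern would apply to a handler registered with a Channel or Push: guard shared maps, slices and counters with a mutex, or hand the payload to a channel that a single goroutine drains.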

Simple example

For a more complete example, see examples/simple/simple.go. This example does not include error handling.

// Connect to socket
endPoint, _ := url.Parse("ws://localhost:4000/socket?foo=bar")
socket := phx.NewSocket(endPoint)
_ = socket.Connect()

// Join a channel
channel := socket.Channel("test:lobby", nil)
join, _ := channel.Join()
join.Receive("ok", func(response any) {
  log.Println("Joined channel:", channel.Topic(), response)
})

// Send an event
ping, _ := channel.Push("ping", "pong")
ping.Receive("ok", func(response any) {
  log.Println("Ping:", response)
})

// Listen for pushes or broadcasts from the server
channel.On("shout", func(payload any) {
  log.Println("received shout:", payload)
})

CLI example

There is also a simple CLI example for interactively using the library to connect to any server/channel in examples/cli/cli.go.

Not implemented currently:

  • Longpoll transport.
  • Binary messages.

Documentation

Overview

Package phx is a comprehensive client for Phoenix Channels written for Go applications.

Constants

const (
	LogDebug   LoggerLevel = 0
	LogInfo                = 1
	LogWarning             = 2
	LogError               = 3
)

Types

type Channel

type Channel struct {
	// PushTimeout is the time that a Push waits before considering it defunct and triggering a "timeout" event.
	PushTimeout time.Duration

	// RejoinAfterFunc is a function that returns the duration to wait before rejoining based on given tries
	RejoinAfterFunc func(tries int) time.Duration
	// contains filtered or unexported fields
}

A Channel is a unique connection to the given Topic on the server. You can have many Channels connected over one Socket, each handling messages independently.
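
As an illustration of the `func(tries int) time.Duration` shape that RejoinAfterFunc expects, here is a small sketch of a stepped backoff. The helper name `steppedBackoff`, the delay values, and the assumption that `tries` starts at 1 are all choices made for this example, not documented behavior of the library.

```go
package main

import (
	"fmt"
	"time"
)

// steppedBackoff builds a function with the signature used by
// RejoinAfterFunc: it maps the number of tries to a delay. The first
// len(steps) tries use the listed delays; later tries use fallback.
func steppedBackoff(steps []time.Duration, fallback time.Duration) func(tries int) time.Duration {
	return func(tries int) time.Duration {
		// Assumption: tries starts at 1. Smaller values are treated as 1.
		if tries < 1 {
			tries = 1
		}
		if tries <= len(steps) {
			return steps[tries-1]
		}
		return fallback
	}
}

// rejoinAfter uses illustrative delays chosen for this sketch; they are not
// claimed to be the library's defaults.
var rejoinAfter = steppedBackoff(
	[]time.Duration{time.Second, 2 * time.Second, 5 * time.Second},
	10*time.Second,
)

func main() {
	for tries := 1; tries <= 5; tries++ {
		fmt.Println(tries, rejoinAfter(tries))
	}
}
```

A function built this way could be assigned to a Channel's RejoinAfterFunc field before calling Join.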

func NewChannel

func NewChannel(topic string, params map[string]string, socket *Socket) *Channel

NewChannel creates a new Channel attached to the Socket. If there is already a Channel for the given topic, that channel is returned instead of creating a new one.

func (*Channel) Clear

func (c *Channel) Clear(event string)

Clear removes all bindings for the given event.

func (*Channel) IsClosed

func (c *Channel) IsClosed() bool

func (*Channel) IsErrored

func (c *Channel) IsErrored() bool

func (*Channel) IsJoined

func (c *Channel) IsJoined() bool

func (*Channel) IsJoining

func (c *Channel) IsJoining() bool

func (*Channel) IsLeaving

func (c *Channel) IsLeaving() bool

func (*Channel) IsRemoved

func (c *Channel) IsRemoved() bool

func (*Channel) Join

func (c *Channel) Join() (*Push, error)

Join will send a JoinEvent to the server and attempt to join the topic of this Channel. A Push is returned, to which you can attach event handlers with Receive for statuses such as "ok", "error" and "timeout".

func (*Channel) JoinRef

func (c *Channel) JoinRef() Ref

JoinRef returns the JoinRef for this channel, which is the Ref of the Push returned by Join.

func (*Channel) Leave

func (c *Channel) Leave() (*Push, error)

Leave will send a LeaveEvent to the server to leave the topic of this Channel. A Push is returned, to which you can attach event handlers with Receive for statuses such as "ok", "error" and "timeout".

func (*Channel) Off

func (c *Channel) Off(bindingRef Ref)

Off removes the callback for the given bindingRef, as returned by On, OnRef, OnJoin, OnClose, OnError.

func (*Channel) On

func (c *Channel) On(event string, callback func(payload any)) (bindingRef Ref)

On will register the given callback for all matching events received on this Channel. Returns a unique Ref that can be used to cancel this callback via Off.
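
The On/Off contract (register a callback, get back a Ref, cancel with that Ref) can be sketched with a small registry. This is a hypothetical, single-goroutine illustration of the pattern; the `bindings` type and its methods are invented here and are not the library's internals.

```go
package main

import "fmt"

// Ref mirrors the documented type: type Ref uint64.
type Ref uint64

// binding ties one callback to an event name under a unique Ref.
type binding struct {
	ref      Ref
	event    string
	callback func(payload any)
}

// bindings is a hypothetical registry. It is not safe for concurrent use.
type bindings struct {
	next Ref
	list []binding
}

// on registers a callback and returns the Ref that identifies it.
func (b *bindings) on(event string, cb func(payload any)) Ref {
	b.next++
	b.list = append(b.list, binding{ref: b.next, event: event, callback: cb})
	return b.next
}

// off removes the binding with the given Ref, if there is one.
func (b *bindings) off(ref Ref) {
	kept := b.list[:0]
	for _, bd := range b.list {
		if bd.ref != ref {
			kept = append(kept, bd)
		}
	}
	b.list = kept
}

// trigger calls every callback bound to event and returns how many ran.
func (b *bindings) trigger(event string, payload any) int {
	n := 0
	for _, bd := range b.list {
		if bd.event == event {
			bd.callback(payload)
			n++
		}
	}
	return n
}

func main() {
	var b bindings
	first := b.on("shout", func(p any) { fmt.Println("first:", p) })
	b.on("shout", func(p any) { fmt.Println("second:", p) })
	b.trigger("shout", "hello")
	b.off(first) // only the second callback remains
	b.trigger("shout", "again")
}
```

Keeping the returned Ref is what makes it possible to remove one specific callback while leaving other callbacks for the same event in place.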

func (*Channel) OnClose

func (c *Channel) OnClose(callback func(payload any)) (bindingRef Ref)

OnClose will register the given callback for whenever this Channel is closed. Returns a unique Ref that can be used to cancel this callback via Off.

func (*Channel) OnError

func (c *Channel) OnError(callback func(payload any)) (bindingRef Ref)

OnError will register the given callback for whenever this channel gets an ErrorEvent message, such as the channel process crashing. Returns a unique Ref that can be used to cancel this callback via Off.

func (*Channel) OnJoin

func (c *Channel) OnJoin(callback func(payload any)) (bindingRef Ref)

OnJoin will register the given callback for whenever this Channel joins successfully to the server. Returns a unique Ref that can be used to cancel this callback via Off.

func (*Channel) OnRef

func (c *Channel) OnRef(ref Ref, event string, callback func(payload any)) (bindingRef Ref)

OnRef will register the given callback for all matching events only if the ref also matches. This is mostly used by Push so that it can get ReplyEvents for its messages. Returns a unique Ref that can be used to cancel this callback via Off.

func (*Channel) Push

func (c *Channel) Push(event string, payload any) (*Push, error)

Push will send the given Event and Payload to the server. A Push is returned, to which you can attach event handlers with Receive() so you can process replies.

func (*Channel) Remove

func (c *Channel) Remove() error

Remove will remove this channel from the Socket. Once this is done, the channel will no longer receive any kind of messages and will be essentially orphaned, so it is important that you also remove all references to the Channel so that it can be garbage collected.

func (*Channel) State

func (c *Channel) State() ChannelState

State returns the current ChannelState of this channel. The Is*() methods can be used instead to check for a specific state.

func (*Channel) Topic

func (c *Channel) Topic() string

Topic returns the topic for this channel.

type ChannelState

type ChannelState int
const (
	ChannelClosed ChannelState = iota
	ChannelErrored
	ChannelJoined
	ChannelJoining
	ChannelLeaving
	ChannelRemoved
)

func (ChannelState) String

func (c ChannelState) String() string

type ConnectionState

type ConnectionState int
const (
	ConnectionConnecting ConnectionState = iota
	ConnectionOpen
	ConnectionClosing
	ConnectionClosed
)

func (ConnectionState) String

func (s ConnectionState) String() string

type CustomLogger

type CustomLogger struct {
	// contains filtered or unexported fields
}

CustomLogger is a logger that logs to the given log.Logger if the message is >= logLevel.

func NewCustomLogger

func NewCustomLogger(level LoggerLevel, logger *log.Logger) *CustomLogger

func NewSimpleLogger

func NewSimpleLogger(logLevel LoggerLevel) *CustomLogger

NewSimpleLogger returns a CustomLogger that uses the 'log' package's default Logger to log messages at or above the given logLevel.

func (*CustomLogger) Print

func (l *CustomLogger) Print(level LoggerLevel, kind string, v ...any)

func (*CustomLogger) Printf

func (l *CustomLogger) Printf(level LoggerLevel, kind string, format string, v ...any)

func (*CustomLogger) Println

func (l *CustomLogger) Println(level LoggerLevel, kind string, v ...any)

type Event

type Event string

Event represents a Phoenix channel event for a message, and can be almost anything the user wants, such as "ping", "shout", "talk", etc. Several events are reserved for protocol control messages.

const (
	// JoinEvent is sent when we join a channel.
	JoinEvent Event = "phx_join"

	// CloseEvent is sent by the server when a channel is closed, such as before shutting down the socket.
	// This event is also generated by the client whenever `channel.Leave()` is called. Triggers channel.OnClose().
	CloseEvent Event = "phx_close"

	// ErrorEvent is sent by the server whenever something catastrophic happens on the server side, such as the channel
	// process crashing, or attempting to join a channel already joined.
	ErrorEvent Event = "phx_error"

	// ReplyEvent is sent by the server in reply to any event sent by the client.
	ReplyEvent Event = "phx_reply"

	// LeaveEvent is sent by the client when we leave a channel and unsubscribe to a topic.
	LeaveEvent Event = "phx_leave"

	// HeartBeatEvent is a special message for heartbeats on the special topic "phoenix"
	HeartBeatEvent Event = "heartbeat"
)

Special/reserved Events

type JSONMessage

type JSONMessage struct {
	JoinRef string `json:"join_ref,omitempty"`
	Ref     string `json:"ref"`
	Topic   string `json:"topic"`
	Event   string `json:"event"`
	Payload any    `json:"payload"`
}

JSONMessage is a JSON representation of a Message.
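
To show what the struct tags above produce, the sketch below marshals a local copy of the struct with encoding/json. The type `jsonMessage` and the helper `encode` are stand-ins defined for this example so that it runs without importing phx; only the field names and tags are taken from the documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonMessage is a local copy of the documented JSONMessage struct.
type jsonMessage struct {
	JoinRef string `json:"join_ref,omitempty"`
	Ref     string `json:"ref"`
	Topic   string `json:"topic"`
	Event   string `json:"event"`
	Payload any    `json:"payload"`
}

// encode marshals the message and returns the JSON text.
func encode(m jsonMessage) (string, error) {
	b, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// With an empty JoinRef, omitempty drops the join_ref key entirely.
	s, err := encode(jsonMessage{Ref: "1", Topic: "test:lobby", Event: "ping", Payload: "pong"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)

	// With a JoinRef set, join_ref appears as the first key.
	s, err = encode(jsonMessage{JoinRef: "1", Ref: "2", Topic: "test:lobby", Event: "ping", Payload: "pong"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
}
```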

func NewJSONMessage

func NewJSONMessage(msg Message) *JSONMessage

func (*JSONMessage) Message

func (jm *JSONMessage) Message() (*Message, error)

type JSONSerializerV1

type JSONSerializerV1 struct{}

func NewJSONSerializerV1

func NewJSONSerializerV1() *JSONSerializerV1

func (*JSONSerializerV1) Decode

func (s *JSONSerializerV1) Decode(data []byte) (*Message, error)

func (*JSONSerializerV1) Encode

func (s *JSONSerializerV1) Encode(msg *Message) ([]byte, error)

func (*JSONSerializerV1) Version

func (s *JSONSerializerV1) Version() string

type JSONSerializerV2

type JSONSerializerV2 struct{}

func NewJSONSerializerV2

func NewJSONSerializerV2() *JSONSerializerV2

func (*JSONSerializerV2) Decode

func (s *JSONSerializerV2) Decode(data []byte) (*Message, error)

func (*JSONSerializerV2) Encode

func (s *JSONSerializerV2) Encode(msg *Message) ([]byte, error)

func (*JSONSerializerV2) Version

func (s *JSONSerializerV2) Version() string
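
For context, Phoenix's V2 JSON serializer sends each message as a five-element array, `[join_ref, ref, topic, event, payload]`, rather than as an object. The following is a sketch of that wire format only; `wireMessage`, `encodeV2` and `decodeV2` are names invented here, refs are simplified to strings, and none of this is claimed to be how JSONSerializerV2 is implemented in this package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// wireMessage is a simplified local stand-in for a message. Refs are kept
// as strings; an empty string is encoded as JSON null.
type wireMessage struct {
	JoinRef string
	Ref     string
	Topic   string
	Event   string
	Payload any
}

// refOrNil maps an empty ref to nil so that it is encoded as null.
func refOrNil(ref string) any {
	if ref == "" {
		return nil
	}
	return ref
}

// encodeV2 renders the message as [join_ref, ref, topic, event, payload].
func encodeV2(m wireMessage) (string, error) {
	b, err := json.Marshal([]any{refOrNil(m.JoinRef), refOrNil(m.Ref), m.Topic, m.Event, m.Payload})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decodeV2 parses a five-element array back into a wireMessage.
func decodeV2(data string) (wireMessage, error) {
	var parts []any
	if err := json.Unmarshal([]byte(data), &parts); err != nil {
		return wireMessage{}, err
	}
	if len(parts) != 5 {
		return wireMessage{}, errors.New("expected a 5-element array")
	}
	// A null or non-string element leaves the corresponding field empty.
	str := func(v any) string {
		s, _ := v.(string)
		return s
	}
	return wireMessage{
		JoinRef: str(parts[0]),
		Ref:     str(parts[1]),
		Topic:   str(parts[2]),
		Event:   str(parts[3]),
		Payload: parts[4],
	}, nil
}

func main() {
	s, err := encodeV2(wireMessage{Ref: "3", Topic: "phoenix", Event: "heartbeat", Payload: map[string]any{}})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
}
```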

type Logger

type Logger interface {
	Print(level LoggerLevel, kind string, v ...any)
	Println(level LoggerLevel, kind string, v ...any)
	Printf(level LoggerLevel, kind string, format string, v ...any)
}
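
Any type with these three methods can serve as a Logger. As a rough sketch, the example below implements the interface with an in-memory logger that keeps only messages at or above a minimum level. `LoggerLevel`, its constants and the `Logger` interface are redeclared locally so the example runs without importing phx; `bufferLogger` and `demo` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Local copies of the documented level type, constants and interface.
type LoggerLevel int

const (
	LogDebug   LoggerLevel = 0
	LogInfo    LoggerLevel = 1
	LogWarning LoggerLevel = 2
	LogError   LoggerLevel = 3
)

type Logger interface {
	Print(level LoggerLevel, kind string, v ...any)
	Println(level LoggerLevel, kind string, v ...any)
	Printf(level LoggerLevel, kind string, format string, v ...any)
}

// bufferLogger is a hypothetical Logger that collects lines in memory.
// The mutex matters because log calls may come from many goroutines.
type bufferLogger struct {
	mu    sync.Mutex
	min   LoggerLevel
	lines []string
}

// add stores one formatted line if level is at or above the minimum.
func (l *bufferLogger) add(level LoggerLevel, kind, msg string) {
	if level < l.min {
		return
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	l.lines = append(l.lines, fmt.Sprintf("[%d] %s: %s", level, kind, strings.TrimSpace(msg)))
}

func (l *bufferLogger) Print(level LoggerLevel, kind string, v ...any) {
	l.add(level, kind, fmt.Sprint(v...))
}

func (l *bufferLogger) Println(level LoggerLevel, kind string, v ...any) {
	l.add(level, kind, fmt.Sprintln(v...))
}

func (l *bufferLogger) Printf(level LoggerLevel, kind string, format string, v ...any) {
	l.add(level, kind, fmt.Sprintf(format, v...))
}

// Compile-time check that bufferLogger satisfies the interface.
var _ Logger = (*bufferLogger)(nil)

// demo logs three messages through a logger that keeps warnings and errors.
func demo() []string {
	l := &bufferLogger{min: LogWarning}
	l.Printf(LogDebug, "socket", "dropped %d", 1)
	l.Println(LogError, "socket", "connection lost")
	l.Print(LogWarning, "channel", "rejoining")
	return l.lines
}

func main() {
	for _, line := range demo() {
		fmt.Println(line)
	}
}
```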

type LoggerLevel

type LoggerLevel int

func (LoggerLevel) String

func (level LoggerLevel) String() string

type Message

type Message struct {
	// JoinRef is the unique Ref sent when a JoinEvent is sent to join a channel. JoinRef can also be thought of as
	// a Channel ref. If present, this message is tied to the given instance of a Channel.
	JoinRef Ref

	// Ref is the unique Ref for a given message. When sending a new Message, a Ref should be generated. When a reply
	// is sent back from the server, it will have Ref set to match the Message it is a reply to.
	Ref Ref

	// Topic is the Channel topic this message is in relation to, as defined on the server side.
	Topic string

	// Event is a string description of what this message is about, and can be set to anything the user desires.
	// Some Events are reserved for specific protocol messages as defined in event.go.
	Event string

	// Payload is any arbitrary data attached to the message.
	Payload any
}

Message is a message sent or received via the socket after encoding/decoding.

func (Message) MarshalJSON

func (m Message) MarshalJSON() ([]byte, error)

func (*Message) UnmarshalJSON

func (m *Message) UnmarshalJSON(data []byte) error

type NoopLogger

type NoopLogger int

NoopLogger is a logger that does nothing.

func NewNoopLogger

func NewNoopLogger() *NoopLogger

func (*NoopLogger) Print

func (l *NoopLogger) Print(_ LoggerLevel, _ string, _ ...any)

func (*NoopLogger) Printf

func (l *NoopLogger) Printf(_ LoggerLevel, _ string, _ string, _ ...any)

func (*NoopLogger) Println

func (l *NoopLogger) Println(_ LoggerLevel, _ string, _ ...any)

type Push

type Push struct {
	// Event is the string event you want to push to the server.
	Event string

	// Payload is whatever payload you want to attach to the push. Must be JSON serializable.
	Payload any

	// Timeout is the time to wait for a reply before triggering a "timeout" event.
	Timeout time.Duration

	Ref Ref
	// contains filtered or unexported fields
}

Push allows you to send an Event to the server and easily monitor for replies, errors or timeouts. A Push is typically created by Channel.Join, Channel.Leave and Channel.Push.

func NewPush

func NewPush(channel *Channel, event string, payload any, timeout time.Duration) *Push

NewPush gets a new Push ready to send and allows you to attach event handlers for replies, errors, timeouts.

func (*Push) IsSent

func (p *Push) IsSent() bool

func (*Push) Receive

func (p *Push) Receive(status string, callback pushCallback)

Receive registers the given event handler for the given status. Built-in events such as Join and Leave will respond with "ok", "error" or "timeout". Custom event handlers (handle_in/3) in your Channel on the server can respond with any status string they want. If a custom event handler (handle_in/3) does not reply (returns :noreply), then the only statuses that will trigger here are "error" and "timeout".

func (*Push) Send

func (p *Push) Send() error

Send will actually push the event to the server.

type Ref

type Ref uint64

Ref is a unique reference integer that is atomically incremented and wraps around after exceeding the maximum 64-bit value.
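
One way such a counter could be built is with sync/atomic, as in the sketch below. `refCounter`, `uniqueRefs` and `wrapped` are hypothetical names for this example, and the library's own generator may work differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Ref mirrors the documented type: type Ref uint64.
type Ref uint64

// refCounter is a hypothetical generator of unique refs.
type refCounter struct {
	last uint64
}

// next atomically increments the counter and returns the new value.
// In this sketch, unsigned overflow wraps around to 0.
func (c *refCounter) next() Ref {
	return Ref(atomic.AddUint64(&c.last, 1))
}

// uniqueRefs requests n refs from n goroutines and reports how many
// distinct values were handed out.
func uniqueRefs(n int) int {
	var (
		c    refCounter
		mu   sync.Mutex
		seen = make(map[Ref]bool)
		wg   sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r := c.next()
			mu.Lock()
			seen[r] = true
			mu.Unlock()
		}()
	}
	wg.Wait()
	return len(seen)
}

// wrapped starts the counter at the maximum uint64 and takes one more ref.
func wrapped() Ref {
	c := refCounter{last: ^uint64(0)}
	return c.next()
}

func main() {
	fmt.Println(uniqueRefs(50), wrapped())
}
```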

func ParseRef

func ParseRef(ref any) (Ref, error)

type Serializer

type Serializer interface {
	Version() string
	Encode(*Message) ([]byte, error)
	Decode([]byte) (*Message, error)
}

A Serializer describes the required interface for serializers.

type Socket

type Socket struct {
	// EndPoint is the URL to connect to. Include parameters here.
	EndPoint *url.URL

	// RequestHeader is an http.Header map to send in the initial connection.
	RequestHeader http.Header

	// Transport is the main transport mechanism to use to connect to the server. Only Websocket is implemented currently.
	Transport Transport

	// Specify a logger for Errors, Warnings, Info and Debug messages. Defaults to phx.NoopLogger.
	Logger Logger

	// Timeout for initial handshake with server.
	ConnectTimeout time.Duration

	// ReconnectAfterFunc is a function that returns the time to delay reconnections based on the given tries
	ReconnectAfterFunc func(tries int) time.Duration

	// HeartbeatInterval is the duration between heartbeats sent to the server to keep the connection alive.
	HeartbeatInterval time.Duration

	// Serializer encodes/decodes messages to/from the server. Must work with a Serializer on the server.
	// Defaults to JSONSerializerV2.
	Serializer Serializer
	// contains filtered or unexported fields
}

A Socket represents a connection to the server via the given Transport. Many Channels can be connected over a single Socket.
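
ReconnectAfterFunc takes the number of tries and returns a delay, so a capped exponential backoff fits its signature. The sketch below is one possible policy, not the library's default; `exponentialBackoff`, the 100ms base and the 5s limit are assumptions made for illustration, as is the idea that `tries` starts at 1.

```go
package main

import (
	"fmt"
	"time"
)

// exponentialBackoff returns a function that doubles the delay on each try,
// starting at base for the first try and never exceeding limit.
func exponentialBackoff(base, limit time.Duration) func(tries int) time.Duration {
	return func(tries int) time.Duration {
		d := base
		for i := 1; i < tries; i++ {
			d *= 2
			// Stop early once the cap is reached to avoid overflow.
			if d >= limit {
				return limit
			}
		}
		if d > limit {
			return limit
		}
		return d
	}
}

// reconnectAfter uses illustrative values chosen for this sketch.
var reconnectAfter = exponentialBackoff(100*time.Millisecond, 5*time.Second)

func main() {
	for tries := 1; tries <= 8; tries++ {
		fmt.Println(tries, reconnectAfter(tries))
	}
}
```

A function like this could be assigned to a Socket's ReconnectAfterFunc field after NewSocket and before Connect.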

func NewSocket

func NewSocket(endPoint *url.URL) *Socket

NewSocket creates a Socket that connects to the given endPoint using the default websocket Transport. After creating the socket, several options can be set, such as Transport, Logger, Serializer and timeouts.

If a custom websocket.Dialer is needed, such as to set up a proxy, then create a custom Websocket Transport with that Dialer and assign it to the Socket's Transport field.

func (*Socket) Channel

func (s *Socket) Channel(topic string, params map[string]string) *Channel

Channel creates a new instance of phx.Channel, or returns an existing instance if it had already been created.

func (*Socket) Connect

func (s *Socket) Connect() error

Connect will start connection attempts with the server until successful or canceled with Disconnect.

func (*Socket) ConnectionState

func (s *Socket) ConnectionState() ConnectionState

func (*Socket) Disconnect

func (s *Socket) Disconnect() error

Disconnect closes the connection to the server, or stops an in-progress attempt to connect.

func (*Socket) IsConnected

func (s *Socket) IsConnected() bool

func (*Socket) IsConnectedOrConnecting

func (s *Socket) IsConnectedOrConnecting() bool

func (*Socket) MakeRef

func (s *Socket) MakeRef() Ref

MakeRef returns a unique Ref for this Socket.

func (*Socket) Off

func (s *Socket) Off(ref Ref)

Off cancels the given callback from being called.

func (*Socket) OnClose

func (s *Socket) OnClose(callback func()) Ref

OnClose registers the given callback to be called whenever the Socket is closed. Returns a unique Ref that can be used to cancel this callback via Off.

func (*Socket) OnError

func (s *Socket) OnError(callback func(error)) Ref

OnError registers the given callback to be called whenever the Socket has an error. Returns a unique Ref that can be used to cancel this callback via Off.

func (*Socket) OnMessage

func (s *Socket) OnMessage(callback func(Message)) Ref

OnMessage registers the given callback to be called whenever the server sends a message. Returns a unique Ref that can be used to cancel this callback via Off.

func (*Socket) OnOpen

func (s *Socket) OnOpen(callback func()) Ref

OnOpen registers the given callback to be called whenever the Socket is opened successfully. Returns a unique Ref that can be used to cancel this callback via Off.

func (*Socket) Push

func (s *Socket) Push(topic string, event string, payload any, joinRef Ref) (Ref, error)

func (*Socket) PushMessage

func (s *Socket) PushMessage(msg Message) error

func (*Socket) Reconnect

func (s *Socket) Reconnect() error

Reconnect with the server.

type Transport

type Transport interface {
	Connect(endPoint *url.URL, requestHeader http.Header, connectTimeout time.Duration) error
	Disconnect() error
	Reconnect() error
	IsConnected() bool
	ConnectionState() ConnectionState
	Send([]byte) error
}

Transport is used by a Socket to actually connect to the server.
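
To make the method set concrete, here is a sketch of an in-memory type that satisfies the interface, the sort of thing that might be useful in tests. `ConnectionState`, its constants and `Transport` are redeclared locally so the example runs without importing phx; `memoryTransport` is hypothetical. A real Transport would also need to report incoming messages and connection events back to its handler, which is omitted here because the TransportHandler methods are not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"sync"
	"time"
)

// Local copies of the documented ConnectionState and Transport.
type ConnectionState int

const (
	ConnectionConnecting ConnectionState = iota
	ConnectionOpen
	ConnectionClosing
	ConnectionClosed
)

type Transport interface {
	Connect(endPoint *url.URL, requestHeader http.Header, connectTimeout time.Duration) error
	Disconnect() error
	Reconnect() error
	IsConnected() bool
	ConnectionState() ConnectionState
	Send([]byte) error
}

// memoryTransport never opens a network connection; it only tracks state
// and records what was sent.
type memoryTransport struct {
	mu    sync.Mutex
	state ConnectionState
	sent  [][]byte
}

func newMemoryTransport() *memoryTransport {
	return &memoryTransport{state: ConnectionClosed}
}

func (t *memoryTransport) setState(s ConnectionState) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.state = s
}

func (t *memoryTransport) Connect(_ *url.URL, _ http.Header, _ time.Duration) error {
	t.setState(ConnectionOpen)
	return nil
}

func (t *memoryTransport) Disconnect() error { t.setState(ConnectionClosed); return nil }
func (t *memoryTransport) Reconnect() error  { t.setState(ConnectionOpen); return nil }

func (t *memoryTransport) ConnectionState() ConnectionState {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.state
}

func (t *memoryTransport) IsConnected() bool { return t.ConnectionState() == ConnectionOpen }

// Send stores a copy of msg, or fails when the transport is not open.
func (t *memoryTransport) Send(msg []byte) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.state != ConnectionOpen {
		return errors.New("not connected")
	}
	t.sent = append(t.sent, append([]byte(nil), msg...))
	return nil
}

// sentCount reports how many messages were recorded.
func (t *memoryTransport) sentCount() int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.sent)
}

// Compile-time check that memoryTransport satisfies the interface.
var _ Transport = (*memoryTransport)(nil)

func main() {
	tr := newMemoryTransport()
	fmt.Println(tr.Send([]byte("too early")))
	_ = tr.Connect(nil, nil, time.Second)
	fmt.Println(tr.Send([]byte("hello")), tr.sentCount())
}
```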

type TransportHandler

type TransportHandler interface {
	// contains filtered or unexported methods
}

TransportHandler defines the interface that handles the activity of the Transport. This is usually just a Socket, but a custom TransportHandler can be implemented to stand in between a Transport and Socket.

type Websocket

type Websocket struct {
	Dialer  *websocket.Dialer
	Handler TransportHandler
	// contains filtered or unexported fields
}

Websocket is a Transport that connects to the server via Websockets.

func NewWebsocket

func NewWebsocket(handler TransportHandler) *Websocket

func (*Websocket) Connect

func (w *Websocket) Connect(endPoint *url.URL, requestHeader http.Header, connectTimeout time.Duration) error

func (*Websocket) ConnectionState

func (w *Websocket) ConnectionState() ConnectionState

func (*Websocket) Disconnect

func (w *Websocket) Disconnect() error

func (*Websocket) IsConnected

func (w *Websocket) IsConnected() bool

func (*Websocket) Reconnect

func (w *Websocket) Reconnect() error

func (*Websocket) Send

func (w *Websocket) Send(msg []byte) error
