pubsub

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2023 License: MIT Imports: 9 Imported by: 0

README

pubsub

godoc

golang websocket pubsub server and client

Built using the gorilla/websocket library.

Example

parentCtx, parentCancel := context.WithCancel(context.Background())
defer parentCancel()

// Variables
pubSubAuthToken := "abc"
pubAuthToken := "abc123"
subAuthToken := "abc123456"
wsHost := "localhost:8003"
isSecure := false
wsPath := "/ws"
id := "1"

// Create mux router
r := mux.NewRouter().StrictSlash(true)

// Start websockets server with integer channels
s := NewServer(parentCtx)
r.HandleFunc(fmt.Sprintf("%s/{id:[0-9]+}", wsPath), func(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    id, ok := vars["id"]
    if !ok {
        panic("no websockets id")
    }
  
    // Auth
    bearerToken := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
    switch bearerToken {
    case pubSubAuthToken:
        err := s.HandlePubSub(id, w, r)
        if err != nil {
            fmt.Println(err.Error())
            panic("websockets error: %s", err.Error())
        }
    case pubAuthToken:
        err := s.HandlePub(id, w, r)
        if err != nil {
            fmt.Println(err.Error())
            panic("websockets error: %s", err.Error())
        }
    case subAuthToken:
        err := s.HandleSub(id, w, r)
        if err != nil {
            fmt.Println(err.Error())
            panic("websockets error: %s", err.Error())
        }
    default:
        panic("auth failure: (actual: %s)", bearerToken)
    }
})

// Create http server
httpServer := http.Server{
    Handler: r,
    Addr:    wsHost,
    //WriteTimeout: 15 * time.Second,
    //ReadTimeout:  15 * time.Second,
}
defer httpServer.Close()

go func() {
    ctx, cancel := context.WithCancel(parentCtx)
    defer cancel()
  
    // Create subscription only websockets client
    // Wait for single pub and 10 successful client pub messages
    openClientPubCount := 0
    newClientPubCount := 0
    subClient := NewClientWithBearerToken(
        wsHost,
        subAuthToken,
        isSecure,
        wsPath,
        id,
    )
    go func() {
      err := subClient.Start(ctx, func(message []byte) (err error) {
          switch string(message) {
          case "abc":
              openClientPubCount += 1
          case "def":
              newClientPubCount += 1
          default:
              panic("message error: %s", string(message))
          }
          if openClientPubCount > 10 && newClientPubCount >= 10 {
              httpServer.Close()
          }
          return nil
      })
      if err != nil {
          panic("sub client start error: %s", err.Error())
      }
  }()

  // Send 1000 messages to the sub client - should not be broadcast
  for i := 0; i < 1000; i++ {
      err := subClient.WriteMessage(websocket.TextMessage, []byte("invalid"))
      if err != nil {
          panic(err)
      }
  }

  // Create publish only websockets client
  // Send messages with the open client
  pubClient := NewClientWithBearerToken(
      wsHost,
      pubAuthToken,
      isSecure,
      wsPath,
      id,
  )
  go func() {
      err := pubClient.Start(ctx, func(message []byte) (err error) {
          return fmt.Errorf("message should not be received using a pub client: %s", string(message))
      })
      if err != nil {
          panic("pub client start error: %s", err.Error())
      }
  }()
  ticker := time.NewTicker(50 * time.Millisecond)
  defer ticker.Stop()
  for {
      select {
      case <-ctx.Done():
          return
      case <-ticker.C:
  
          // Send pub message with open client
          err := pubClient.WriteMessage(websocket.TextMessage, []byte("abc"))
          if err != nil {
              panic(err)
          }
    
          // Send pub message with a new client
          err = PubWithBearerToken(
              ctx,
              wsHost,
              pubAuthToken,
              isSecure,
              wsPath,
              id,
              []byte("def"),
          )
          if err != nil {
              panic(err)
          }
       }
    }
}()

// Start http server
err := httpServer.ListenAndServe()
if err != nil {
    if !strings.Contains(err.Error(), "http: Server closed") {
        panic(err)
    }
}

Documentation

Index

Constants

View Source
const (
	BYTE     = 1 << (iota * 10) // = 1 << 0
	KILOBYTE                    // = 1 << 10
	MEGABYTE                    // = 1 << 20
	GIGABYTE                    // = 1 << 30

	// Gorilla Message types.
	TextMessage   = websocket.TextMessage
	BinaryMessage = websocket.BinaryMessage
	CloseMessage  = websocket.CloseMessage
	PingMessage   = websocket.PingMessage
	PongMessage   = websocket.PongMessage
)

Variables

View Source
var BroadcastLimitExceededCloseMessage = websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "client broadcast message limit exceeded")
View Source
var ErrStoreLimitExceeded = errors.New("websockets store limit exceeded")

Functions

func Pub

func Pub(
	ctx context.Context,
	host string,
	isSecure bool,
	path string,
	id string,
	message []byte,
) (err error)

Pub creates a new websockets client. Pub publishes a message and then closes the client connection.

func PubWithBearerToken

func PubWithBearerToken(
	ctx context.Context,
	host string,
	token string,
	isSecure bool,
	path string,
	id string,
	message []byte,
) (err error)

PubWithBearerToken creates a new websockets client with a brearer token. PubWithBearerToken publishes a message and then closes the client connection.

func PubWithHeader

func PubWithHeader(
	parentCtx context.Context,
	host string,
	header http.Header,
	isSecure bool,
	path string,
	id string,
	message []byte,
) (err error)

PubWithHeader creates a new websockets client with a custom header. PubWithHeader publishes a message and then closes the client connection.

Types

type Client

type Client struct {

	// Settings
	WriteWait        time.Duration // Time allowed to write a message to the peer.
	PongWait         time.Duration // Time allowed to read the next pong message from the peer.
	PingPeriod       time.Duration // Send pings to peer with this period. Must be less than PongWait.
	MaxMessageSize   int64         // Maximum message size allowed from peer.
	CloseGracePeriod time.Duration // Grace period after closing websocket.

	Dialer *websocket.Dialer
	// contains filtered or unexported fields
}

Client is the websockets client.

func NewClient

func NewClient(
	host string,
	isSecure bool,
	path string,
	id string,
) *Client

NewClient creates a new websockets client.

func NewClientWithBearerToken

func NewClientWithBearerToken(
	host string,
	token string,
	isSecure bool,
	path string,
	id string,
) *Client

NewClientWithBearerToken creates a new websockets client with a brearer token.

func NewClientWithHeader

func NewClientWithHeader(
	host string,
	header http.Header,
	isSecure bool,
	path string,
	id string,
) *Client

NewClientWithHeader creates a new websockets client with a custom header.

func (*Client) AllowInsecureConnections

func (c *Client) AllowInsecureConnections()

AllowInsecureConnections allows the client to make insecure connections (i.e. to servers with self-signed certificates).

func (*Client) ReadMessage

func (c *Client) ReadMessage() (messageType int, p []byte, err error)

ReadMessage reads messageType int and p []byte from the websocket connection.

func (*Client) Start

func (c *Client) Start(parentCtx context.Context, messageFunc func(message []byte) (err error)) (err error)

Start starts the websocket client.

func (*Client) WriteJSON

func (c *Client) WriteJSON(v interface{}) (err error)

WriteJSON writes marshalls v interface{} as JSON and sends the json []byte to the websocket connection.

func (*Client) WriteMessage

func (c *Client) WriteMessage(messageType int, data []byte) (err error)

WriteMessage writes b []byte to the websocket connection.

func (*Client) WriteMessageSlice

func (c *Client) WriteMessageSlice(dataSlice [][]byte) (err error)

WriteMessageSlice writes each set of bytes in the dataSlice [][]byte to one websocket connection.

func (*Client) WriteTextMessage

func (c *Client) WriteTextMessage(data []byte) (err error)

WriteTextMessage writesa websocket.TextMessage b []byte message to the websocket connection.

type Server

type Server struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Server is the websockets server struct containing m (a map[string] of map[*Store]bool entries).

func NewServer

func NewServer(ctx context.Context) *Server

NewServer returns a new websockets server with default settings.

func NewServerWithSettings

func NewServerWithSettings(
	ctx context.Context,
	writeWait time.Duration,
	pongWait time.Duration,
	pingPeriod time.Duration,
	maxMessageSize int64,
) *Server

NewServerWithSettings returns a new websockets server.

func (*Server) HandlePub

func (s *Server) HandlePub(channel string, w http.ResponseWriter, r *http.Request) (err error)

HandlePub is HandlePub but is Pub only (i.e. broadcast messages sent by other clients are not received by this websocket connection).

func (*Server) HandlePubSub

func (s *Server) HandlePubSub(channel string, w http.ResponseWriter, r *http.Request) (err error)

HandlePubSub is a handler that creates a websocket connection with a channel string identifier. All messages sent via this websocket connection are broadcast to all other clients subscribed with this channel identifier. All broadcast messages sent by other clients using this identifier are received by this websocket connection.

func (*Server) HandlePubSubWithHeader

func (s *Server) HandlePubSubWithHeader(channel string, header http.Header, w http.ResponseWriter, r *http.Request) (err error)

HandlePubSubWithHeader is HandlePubSub but uses a custom header.

func (*Server) HandlePubSubWithHeaderAndLimits

func (s *Server) HandlePubSubWithHeaderAndLimits(channel string, header http.Header, storeLimit int64, broadcastLimit time.Duration, w http.ResponseWriter, r *http.Request) (err error)

HandlePubSubWithHeaderAndLimits is HandlePubSub but uses a custom header and has store and/or broadcast limits. storeLimit is the maximum number of concurrent Stores (i.e. connections) that is allowed for each channel (default is 0 - no limit). broadcastLimit is the minimum delay required between broadcasts for each connection (default is 0*time.Second - no limit). The connection is closed if the broadcast limit is exceeded

func (*Server) HandlePubWithHeader

func (s *Server) HandlePubWithHeader(channel string, header http.Header, w http.ResponseWriter, r *http.Request) (err error)

HandlePubWithHeader is HandlePub but uses a custom header

func (*Server) HandlePubWithHeaderAndLimits

func (s *Server) HandlePubWithHeaderAndLimits(channel string, header http.Header, storeLimit int64, broadcastLimit time.Duration, w http.ResponseWriter, r *http.Request) (err error)

HandlePubWithHeaderAndLimits is HandlePub but uses a custom header and has store and/or broadcast limits. storeLimit is the maximum number of concurrent Stores (i.e. connections) that is allowed for each channel (default is 0 - no limit). broadcastLimit is the minimum delay required between broadcasts for each connection (default is 0*time.Second - no limit). The connection is closed if the broadcast limit is exceeded.

func (*Server) HandleSub

func (s *Server) HandleSub(channel string, w http.ResponseWriter, r *http.Request) (err error)

HandleSub is HandlePubSub but is Sub only (i.e. messages sent via this websocket connection are not broadcast to other subscribed clients).

func (*Server) HandleSubWithHeader

func (s *Server) HandleSubWithHeader(channel string, header http.Header, w http.ResponseWriter, r *http.Request) (err error)

HandleSubWithHeader is HandleSub but uses a custom header.

func (*Server) HandleSubWithHeaderAndLimits

func (s *Server) HandleSubWithHeaderAndLimits(channel string, header http.Header, storeLimit int64, broadcastLimit time.Duration, w http.ResponseWriter, r *http.Request) (err error)

HandleSubWithHeaderAndLimits is HandleSub but uses a custom header and has store and/or broadcast limits. storeLimit is the maximum number of concurrent Stores (i.e. connections) that is allowed for each channel (default is 0 - no limit). broadcastLimit is the minimum delay required between broadcasts for each connection (default is 0*time.Second - no limit). The connection is closed if the broadcast limit is exceeded.

type Store

type Store struct {
	sync.Mutex // write lock
	// contains filtered or unexported fields
}

Store stores a conn variable (the *websocket.Conn) and a sendCh variable (a buffered channel of outbound []byte messages).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL