hubx

package module
v1.0.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 16, 2020 License: MIT Imports: 12 Imported by: 0

README

hubx

golang webocket + redis, using actor + command pattern.
No consistency guarantee, for performance.

Install

go get -u github.com/RocksonZeta/hubx

Example

package hubx_test

import (
	"fmt"
	"log"
	"net/http"
	"testing"
	"time"

	"github.com/RocksonZeta/hubx"
	"github.com/go-redis/redis/v7"
)

func serveHome(w http.ResponseWriter, r *http.Request) {
	log.Println(r.URL)
	if r.URL.Path != "/" {
		http.Error(w, "Not found", http.StatusNotFound)
		return
	}
	if r.Method != "GET" {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	http.ServeFile(w, r, "play.html")
}

var h *hubx.Hubx

func newHub() *hubx.Hubx {
	fmt.Println("newHub")
	hub, err := hubx.New(hubx.Options{
		WriteTimeout: 3 * time.Second,
		Logger:       log.Writer(),
		ChannelSize:  100,
	})
	if err != nil {
		fmt.Println(err)
	}
	redisOptions := hubx.RedisOptions{
		RedisChannel: "hello",
		Redis: redis.Options{
			Addr:         "localhost:6379",
			PoolSize:     10,
			MinIdleConns: 2,
		},
		ChannelSize: 10,
	}
	bs, err := hubx.NewRedisBroadcaster(hub, redisOptions)
	if err != nil {
		fmt.Println(err)
	}
	hub.SetBroadcaster(bs)
	hub.AfterJoin = func(client *hubx.Client) {
		uid, _ := client.Props.Load("uid")
		fmt.Println("user join uid:" + uid.(string))
	}
	hub.UseWs(func(client *hubx.Client, msg hubx.PartialMessage, next func()) {
		fmt.Println("before 1" + msg.String())
		next()
		fmt.Println("after 1" + msg.String())
	})
	hub.UseWs(func(client *hubx.Client, msg hubx.PartialMessage, next func()) {
		fmt.Println("before 2" + msg.String())
		next()
		fmt.Println("after 2" + msg.String())
	})
	hub.Use(func(msg hubx.PartialMessage, next func()) {
		fmt.Println("before r 1" + msg.String())
		next()
		fmt.Println("after r 1" + msg.String())
	})
	hub.OnWs("play", func(client *hubx.Client, msg hubx.PartialMessage) {
		fmt.Println("OnWs msg:" + msg.String())
		hub.Broadcast(msg.Subject, msg.Data)
	})
	hub.On("play", func(msg hubx.PartialMessage) {
		fmt.Println("On msg:" + msg.String())
		hub.BroadcastWs(msg.Subject, msg.Data)
	})
	hub.OnWs("close", func(client *hubx.Client, msg hubx.PartialMessage) {
		fmt.Println("OnWs close:" + msg.String())
		hub.CloseAsync()
		h = nil
	})
	hub.Ticker = func(tickCount int64) {
	}
	bs.Start()
	hub.Start()
	return hub
}

func TestMain(t *testing.T) {
	http.HandleFunc("/", serveHome)
	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		if h == nil {
			h = newHub()
		}
		hubx.ServeWs(h, w, r, hubx.DefaultUpgrader(), map[string]interface{}{"uid": "1"})
	})
	err := http.ListenAndServe(":9000", nil)
	if err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}


Documentation

Index

Constants

View Source
const (
	Trace = 0
	Debug = 1
	Warn  = 2
	Error = 3
)

Variables

View Source
var DefaultMarhaller = func(obj interface{}) ([]byte, error) {
	return json.Marshal(obj)
}
View Source
var DefaultUnmarhaller = func(dataBs []byte, obj interface{}) error {
	return json.Unmarshal(dataBs, obj)
}

Functions

func DefaultUpgrader

func DefaultUpgrader() websocket.Upgrader

func Go

func Go(fn func())

func ServeWs

func ServeWs(hub *Hubx, w http.ResponseWriter, r *http.Request, upgrader websocket.Upgrader, clientProps map[string]interface{})

serveWs handles websocket requests from the peer.

Types

type BcFilter

type BcFilter func(msg PartialMessage, next func())

BcFilter broadcast message filter

type BcListener

type BcListener func(msg PartialMessage)

BcListener broadcast message listener

type Broadcaster

type Broadcaster interface {
	Send() chan<- []byte
	Close() chan<- bool
}

type Client

type Client struct {
	Props sync.Map
	// contains filtered or unexported fields
}

Client is a middleman between the websocket connection and the hub.

func (*Client) Send added in v1.0.5

func (c *Client) Send() chan<- []byte

type Hubx

type Hubx struct {
	BcListeners map[string]BcListener // subcribe redis message

	BeforeJoin            func(client *Client) error
	AfterJoin             func(client *Client)
	BeforeLeave           func(client *Client)
	AfterLeave            func(client *Client)
	AfterClose            func()
	OnLocalMessage        func(msg interface{})
	Unmarshaller          func(dataBs []byte, obj interface{}) error
	Marshaller            func(obj interface{}) ([]byte, error)
	RawMessageUnmarhaller func(bs []byte) (PartialMessage, error)
	Ticker                func(tickerCount int64)
	// contains filtered or unexported fields
}

func New

func New(options Options) (*Hubx, error)

func (*Hubx) Broadcast added in v1.0.4

func (h *Hubx) Broadcast(subject string, data interface{})

Broadcast message to broadcaster

func (*Hubx) BroadcastMessage

func (h *Hubx) BroadcastMessage() chan<- []byte

BroadcastMessage recevie message from network broadcast like redis pubsub

func (*Hubx) BroadcastWs

func (h *Hubx) BroadcastWs(subject string, msg interface{})

BroadcastWs send msg to all websocket client

func (*Hubx) Close

func (h *Hubx) Close() chan<- bool

Close chan

func (*Hubx) CloseAsync

func (h *Hubx) CloseAsync()

func (*Hubx) GetClients added in v1.0.9

func (h *Hubx) GetClients() map[*Client]bool

func (*Hubx) GetOptions added in v1.0.9

func (h *Hubx) GetOptions() Options

func (*Hubx) IsEmpty added in v1.0.9

func (h *Hubx) IsEmpty() bool

func (*Hubx) LocalChan added in v1.0.10

func (h *Hubx) LocalChan() chan<- interface{}

func (*Hubx) On

func (h *Hubx) On(subject string, cb BcListener)

On set broadcast message listener

func (*Hubx) OnDefault added in v1.0.5

func (h *Hubx) OnDefault(cb BcListener)

func (*Hubx) OnWs

func (h *Hubx) OnWs(subject string, cb WsListener)

OnWs set websocket message listener

func (*Hubx) OnWsDefault added in v1.0.5

func (h *Hubx) OnWsDefault(cb WsListener)

func (*Hubx) SendWs

func (h *Hubx) SendWs(subject string, data interface{}, clients ...*Client)

SendWs send msg to specified websocket client

func (*Hubx) SendWsWithCtx

func (h *Hubx) SendWsWithCtx(ctx context.Context, subject string, data interface{}, clients ...*Client)

SendWsWithCtx send msg to specified websocket client with context

func (*Hubx) SetBroadcaster

func (h *Hubx) SetBroadcaster(broadcaster Broadcaster)

SetBroadcast we should set broadcaster before start

func (*Hubx) Start

func (h *Hubx) Start()

Start hubx actor

func (*Hubx) Use

func (h *Hubx) Use(filter BcFilter)

Userfilter for broadcaster's message

func (*Hubx) UseWs

func (h *Hubx) UseWs(filter WsFilter)

UseWs filter for websocket's message

type Logger

type Logger struct {
	// contains filtered or unexported fields
}

func NewLogger

func NewLogger(writer io.Writer, module string, level int) *Logger

func (*Logger) Debug

func (l *Logger) Debug(msg string)

func (*Logger) Error

func (l *Logger) Error(msg string)

func (*Logger) Trace

func (l *Logger) Trace(msg string)

func (*Logger) Warn

func (l *Logger) Warn(msg string)

type Message

type Message struct {
	Subject string
	Data    interface{}
}

func (Message) Marshal

func (m Message) Marshal() ([]byte, error)

func (Message) Unmarshal

func (m Message) Unmarshal(result interface{}) error

type Options

type Options struct {
	WriteTimeout     time.Duration
	PongTimeout      time.Duration
	PingPeriod       time.Duration
	TickerPeriod     time.Duration
	MaxMessageSize   int64
	Logger           io.Writer
	LoggerLevel      int
	ChannelSize      int
	LocalChannelSize int
}

type PartialMessage

type PartialMessage struct {
	Subject string
	Data    json.RawMessage
}

func DefaultRawMessageUnmarhaller

func DefaultRawMessageUnmarhaller(bs []byte) (PartialMessage, error)

func (*PartialMessage) Marshal

func (p *PartialMessage) Marshal() ([]byte, error)

func (*PartialMessage) String

func (p *PartialMessage) String() string

func (*PartialMessage) Unmarshal

func (p *PartialMessage) Unmarshal(bs []byte) error

func (*PartialMessage) UnmarshalData

func (p *PartialMessage) UnmarshalData(result interface{}) error

type RawMessage

type RawMessage struct {
	Client *Client
	Msg    []byte
}

type RedisBroadcaster

type RedisBroadcaster struct {
	// contains filtered or unexported fields
}

func NewRedisBroadcaster

func NewRedisBroadcaster(hub *Hubx, options RedisOptions) (*RedisBroadcaster, error)

func (*RedisBroadcaster) Close

func (r *RedisBroadcaster) Close() chan<- bool

func (*RedisBroadcaster) Send

func (r *RedisBroadcaster) Send() chan<- []byte

func (*RedisBroadcaster) Start

func (r *RedisBroadcaster) Start()

type RedisOptions

type RedisOptions struct {
	// Url         string
	Redis        redis.Options
	RedisChannel string //redis pubsub channel
	// PoolSize    int    //default is 2
	ChannelSize int
}

type WsFilter

type WsFilter func(client *Client, msg PartialMessage, next func())

WsFilter Websocket message filter

type WsListener

type WsListener func(client *Client, msg PartialMessage)

WsListener Websocket message listener

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL