webds

package module
v0.2.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2020 License: Apache-2.0 Imports: 12 Imported by: 0

README

webds

Distributed System with websocket

设计

  • 整体结构

Server层为核心服务层,通过选举产生一个中心节点.
SubServer层实例不可被选举为中心节点,但可以跨网段部署,其余与server节点等效
client层节点可以连接任意server节点
每个server节点都有维护 superior/lateral master 列表, 用以记录上级和平级的节点.
每个节点最多只有一个实际往出连通的平级或上级的连接,用以维护数据广播的最终一致性
平级连接之间共享 superior/lateral master 列表
出口连接共享 topics 1级订阅列表
出口连接当成一个特殊的入口连接, 出口连接不存在则为整个广播系统最核心的master节点,出口连接断掉则根据选举算法重新确定连接
  • 通信方式

      websocket
    
  • 通信协议

      主要以多级topic设计消息目标地址, 父子topic订阅节点会递归触发 其余参考ip层设计协议
      参考iris  序列化成字节流,格式: prefix(n)type(1)random_tag(4)source_idx(4)target_topic;msg
      type: 1个字节 msg type
      random_tag: 5个byte
      source_idx: 四个字节长度的数字 利用源地址映射 避免该地址长度线性增长 
      target_topic: 目标topic
      msg: json or protobuf
      保留下列1级topic,其余topic用于分发
          /sys   用于系统指令, 消息不广播, 用以两个节点之间连接维护状态和共享信息
          /inner 用于client 连接的Server直接处理,不进行广播, 响应函数由 conn.On 函数指定
          /self  用于回复消息时使用, 回复消息统一目标地址为/self/source_idx
                 source_idx 定长 4个字节, 根据source_idx 映射表找到实际的上一跳地址
          /srv   用以注册同步类型服务 暂未开发
    
  • 命令行工具设计

      参照ros命令行设计:形如
          - webds topic list/pub/sub
          - webds node list/stop
    
  • 选举步骤

    • 交换id
    • 交换信息
    • 请求是否跳转 不跳转则建立成功

TODO

  • client/go/py/js

  • command tools

  • 分布式,选举产生中心通信节点

  • 服务模式(同步消息机制)

  • 消息广播机制 优化和benchmark

  • 是否进行目标地址映射,减小协议编码长度

  • 变量逃逸分析 是否要大量采用interface来组织代码 评估性能损失

update

  • v0.2.5 重新组织代码,将重要对象拆分成interface, 优化代码逻辑,完善节点选举
  • v0.2.4 同级节点选举过程完成
  • v0.2.3 修改了消息协议 完善分布式设计思路
  • v0.2.2 修改了OnConnection的回调函数, 若回调函数返回error, 则 conn 直接关闭, 该功能可用于鉴定权限. 优化消息解析代码. 初步开始设计分布式结构.
  • v0.2.1 添加webds node list/stop 指令
  • v0.2.0 基于topic订阅机制初步重构完 server, go.client,command tool
  • v0.1.0 old version just a websocket server

Documentation

Index

Constants

View Source
const (
	// DefaultWebsocketWriteTimeout 0, no timeout
	DefaultWebsocketWriteTimeout = 0
	// DefaultWebsocketReadTimeout 0, no timeout
	DefaultWebsocketReadTimeout = 0
	// DefaultWebsocketPongTimeout 60 * time.Second
	DefaultWebsocketPongTimeout = 60 * time.Second
	// DefaultWebsocketPingPeriod (DefaultPongTimeout * 9) / 10
	DefaultWebsocketPingPeriod = (DefaultWebsocketPongTimeout * 9) / 10
	// DefaultWebsocketMaxMessageSize 1024
	DefaultWebsocketMaxMessageSize = 1024
	// DefaultWebsocketReadBufferSize 4096
	DefaultWebsocketReadBufferSize = 4096
	// DefaultWebsocketWriterBufferSize 4096
	DefaultWebsocketWriterBufferSize = 4096
)
View Source
const (
	Version = "v0.2.5"
)

Variables

View Source
var (
	ErrDuplicatedClient = errors.New("duplicated client")
	ErrID               = errors.New("id is not exist")
	ErrOrigin           = errors.New("error origin")
)

Functions

func DefaultIDGenerator added in v0.2.0

func DefaultIDGenerator(r *http.Request) string

DefaultIDGenerator returns a random unique for a new conn. Used when cfg.IDGenerator is nil.

func New added in v0.2.0

func New(cfg *Config) *webds

Types

type Config added in v0.2.0

type Config struct {
	// the server id
	ID string
	// IDGenerator used to create (and later on, set)
	// an ID for each incoming websocket connections (clients).
	// The request is an input parameter which you can use to generate the ID (from headers for example).
	// If empty then the ID is generated by DefaultIDGenerator: randomString(64)
	IDGenerator func(r *http.Request) string
	// record the url address of the superior cluster
	SuperiorMaster []string
	// record the url address of the lateral cluster
	LateralMaster []string
	EnableCluster bool
	// MsgPrefix is the prefix of the underline websocket events that are being established under the hoods.
	// This prefix is visible only to the javascript side (code) and it has nothing to do
	// with the message that the end-user receives.
	// Do not change it unless it is absolutely necessary.
	//
	// If empty then defaults to []byte("ws").
	MsgPrefix []byte
	// Error is the function that will be fired if any client couldn't upgrade the HTTP conn
	// to a websocket conn, a handshake error.
	Error func(w http.ResponseWriter, r *http.Request, status int, reason error)
	// CheckOrigin a function that is called right before the handshake,
	// if returns false then that client is not allowed to connect with the websocket server.
	CheckOrigin func(r *http.Request) bool
	// HandshakeTimeout specifies the duration for the handshake to complete.
	HandshakeTimeout time.Duration
	// WriteTimeout time allowed to write a message to the conn.
	// 0 means no timeout.
	// Default value is 0
	WriteTimeout time.Duration

	// EnableCompression specify if the server should attempt to negotiate per
	// message compression (RFC 7692). Setting this value to true does not
	// guarantee that compression will be supported. Currently only "no context
	// takeover" modes are supported.
	//
	// Defaults to false and it should be remain as it is, unless special requirements.
	EnableCompression bool

	// Subprotocols specifies the server's supported protocols in order of
	// preference. If this field is set, then the Upgrade method negotiates a
	// subprotocol by selecting the first match in this list with a protocol
	// requested by the client.
	Subprotocols []string
	// contains filtered or unexported fields
}

Config the websocket server configuration all of these are optional.

func (*Config) BinaryMessages added in v0.2.0

func (c *Config) BinaryMessages() bool

func (*Config) Ctx added in v0.2.5

func (c *Config) Ctx() context.Context

func (*Config) MsgSerializer added in v0.2.5

func (c *Config) MsgSerializer() *message.Serializer

func (*Config) PingPeriod added in v0.2.0

func (c *Config) PingPeriod() time.Duration

func (*Config) ReadBufferSize added in v0.2.0

func (c *Config) ReadBufferSize() int64

func (*Config) ReadTimeout added in v0.2.0

func (c *Config) ReadTimeout() time.Duration

func (*Config) Validate added in v0.2.0

func (c *Config) Validate()

Validate validates the configuration

func (*Config) Webds added in v0.2.5

func (c *Config) Webds() core.Webds

type ConnectionFunc added in v0.2.0

type ConnectionFunc func(core.Connection) error

Directories

Path Synopsis
client
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL