xconn

package module
v0.0.0-...-52d5c43 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2025 License: MIT Imports: 23 Imported by: 4

README

xconn

WAMP v2 Router and Client for go.

Installation

To install xconn, use the following command:

go get github.com/xconnio/xconn-go

Server

Setting up a basic server is straightforward:

package main

import (
	"log"

	"github.com/xconnio/xconn-go"
)

func main() {
	r := xconn.NewRouter()
	r.AddRealm("realm1")

	server := xconn.NewServer(r, nil)
	err := server.Start("localhost", 8080)
	if err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

For more advanced usage, such as integrating an authenticator, refer to the sample tool available in the cmd folder of the project.

Client

Creating a client:

package main

import (
	"context"
	"log"

	"github.com/xconnio/xconn-go"
)

func main() {
	client := xconn.Client{}
	session, err := client.Connect(context.Background(), "ws://localhost:8080/ws", "realm1")
	if err != nil {
		log.Fatal(err)
	}
}

Once the session is established, you can perform WAMP actions. Below are examples of all 4 WAMP operations:

Subscribe to a topic
func exampleSubscribe(session *xconn.Session) {
    subscription, err := session.Subscribe("io.xconn.example", eventHandler, map[string]any{})
    if err != nil {
    log.Fatalf("Failed to subscribe: %v", err)
    }
    log.Printf("Subscribed to topic io.xconn.example: %v", subscription)
}

func eventHandler(evt *xconn.Event) {
    fmt.Printf("Event Received: args=%s, kwargs=%s, details=%s", evt.Args, evt.KwArgs, evt.Details)
}
Publish to a topic
func examplePublish(session *xconn.Session) {
    err := session.Publish("io.xconn.example", []any{}, map[string]any{}, map[string]any{})
    if err != nil {
        log.Fatalf("Failed to publish: %v", err)
    }
    log.Printf("Publsihed to topic io.xconn.example")
}
Register a procedure
func exampleRegister(session *xconn.Session) {
    registration, err := session.Register("io.xconn.example", invocationHandler, map[string]any{})
    if err != nil {
        log.Fatalf("Failed to register: %v", err)
    }
    log.Printf("Registered procedure io.xconn.example: %v", registration)
}

func invocationHandler(ctx context.Context, inv *xconn.Invocation) *xconn.Result {
    return &xconn.Result{Args: inv.Args, KwArgs: inv.KwArgs, Details: inv.Details}
}
Call a procedure
func exampleCall(session *xconn.Session) {
    result, err := session.Call(context.Background(), "io.xconn.example", []any{"Hello World!"}, map[string]any{}, map[string]any{})
    if err != nil {
        log.Fatalf("Failed to call: %v", err)
    }
    log.Printf("Call result: args=%s, kwargs=%s, details=%s", result.Args, result.KwArgs, result.Details)
}
Authentication

Authentication is straightforward. Simply create the desired authenticator and pass it to the Client.

Ticket Auth

ticketAuthenticator := auth.NewTicketAuthenticator(authID, map[string]any{}, ticket)
client := xconn.Client{Authenticator: ticketAuthenticator}
session, err := client.Connect(context.Background(), "ws://localhost:8080/ws", "realm1")
if err != nil {
    log.Fatalf("Failed to connect: %v", err)
}

Challenge Response Auth

craAuthenticator := auth.NewCRAAuthenticator(authID, map[string]any{}, secret)
client := xconn.Client{Authenticator: craAuthenticator}
session, err := client.Connect(context.Background(), "ws://localhost:8080/ws", "realm1")
if err != nil {
	log.Fatalf("Failed to connect: %v", err)
}

Cryptosign Auth

cryptoSignAuthenticator, err := auth.NewCryptoSignAuthenticator(authID, map[string]any{}, secret)
if err != nil {
    log.Fatal(err)
}
client := xconn.Client{Authenticator: cryptoSignAuthenticator}
session, err := client.Connect(context.Background(), "ws://localhost:8080/ws", "realm1")
if err != nil {
    log.Fatalf("Failed to connect: %v", err)
}
Serializers

XConn supports various serializers for different data formats. To use, just pass chosen serializer spec to the client.

JSON Serializer

client := xconn.Client{SerializerSpec: xconn.JSONSerializerSpec}
session, err := client.Connect(context.Background(), "ws://localhost:8080/ws", "realm1")
if err != nil {
	log.Fatalf("Failed to connect: %v", err)
}

CBOR Serializer

client := xconn.Client{SerializerSpec: xconn.CBORSerializerSpec}
session, err := client.Connect(context.Background(), "ws://localhost:8080/ws", "realm1")
if err != nil {
	log.Fatalf("Failed to connect: %v", err)
}

MsgPack Serializer

client := xconn.Client{SerializerSpec: xconn.MsgPackSerializerSpec}
session, err := client.Connect(context.Background(), "ws://localhost:8080/ws", "realm1")
if err != nil {
	log.Fatalf("Failed to connect: %v", err)
}

For more detailed examples or usage, refer to the examples folder of the project.

Documentation

Index

Constants

View Source
const (
	JsonWebsocketProtocol    = "wamp.2.json"
	MsgpackWebsocketProtocol = "wamp.2.msgpack"
	CborWebsocketProtocol    = "wamp.2.cbor"
	ProtobufSubProtocol      = "wamp.2.protobuf"

	CloseGoodByeAndOut  = "wamp.close.goodbye_and_out"
	CloseCloseRealm     = "wamp.close.close_realm"
	CloseSystemShutdown = "wamp.close.system_shutdown"
)
View Source
const ErrNoResult = "io.xconn.no_result"

Variables

Functions

func ReadHello

func ReadHello(peer Peer, serializer serializers.Serializer) (*messages.Hello, error)

func ReadMessage

func ReadMessage(peer Peer, serializer serializers.Serializer) (messages.Message, error)

func WriteMessage

func WriteMessage(peer Peer, message messages.Message, serializer serializers.Serializer) error

Types

type BaseSession

type BaseSession interface {
	ID() int64
	Realm() string
	AuthID() string
	AuthRole() string

	Serializer() serializers.Serializer
	NetConn() net.Conn
	Read() ([]byte, error)
	Write([]byte) error
	ReadMessage() (messages.Message, error)
	WriteMessage(messages.Message) error
	Close() error
}

func Accept

func Accept(peer Peer, hello *messages.Hello, serializer serializers.Serializer,
	authenticator auth.ServerAuthenticator) (BaseSession, error)

func Join

func Join(cl Peer, realm string, serializer serializers.Serializer,
	authenticator auth.ClientAuthenticator) (BaseSession, error)

func NewBaseSession

func NewBaseSession(id int64, realm, authID, authRole string, cl Peer, serializer serializers.Serializer) BaseSession

type CallResponse

type CallResponse struct {
	// contains filtered or unexported fields
}

type Client

type Client struct {
	Authenticator  auth.ClientAuthenticator
	SerializerSpec WSSerializerSpec
	NetDial        func(ctx context.Context, network, addr string) (net.Conn, error)

	DialTimeout       time.Duration
	KeepAliveInterval time.Duration
	KeepAliveTimeout  time.Duration
}

func (*Client) Connect

func (c *Client) Connect(ctx context.Context, url string, realm string) (*Session, error)

type Error

type Error struct {
	URI         string
	Arguments   []any
	KwArguments map[string]any
}

func (*Error) Error

func (e *Error) Error() string

type Event

type Event struct {
	Topic       string
	Arguments   []any
	KwArguments map[string]any
	Details     map[string]any
}

type EventHandler

type EventHandler func(event *Event)

type GoodBye

type GoodBye struct {
	Details map[string]any
	Reason  string
}

type Invocation

type Invocation struct {
	Procedure   string
	Arguments   []any
	KwArguments map[string]any
	Details     map[string]any

	SendProgress SendProgress
}

type InvocationHandler

type InvocationHandler func(ctx context.Context, invocation *Invocation) *Result

type Peer

type Peer interface {
	Type() TransportType
	NetConn() net.Conn
	Read() ([]byte, error)
	Write([]byte) error
}

func DialWebSocket

func DialWebSocket(ctx context.Context, url *netURL.URL, config *WSDialerConfig) (Peer, error)

func NewWebSocketPeer

func NewWebSocketPeer(conn net.Conn, peerConfig WSPeerConfig) (Peer, error)

func UpgradeWebSocket

func UpgradeWebSocket(conn net.Conn, config *WebSocketServerConfig) (Peer, error)

type Progress

type Progress struct {
	Arguments   []any
	KwArguments map[string]any
	Options     map[string]any
	Err         error
}

type ProgressHandler

type ProgressHandler func(result *Result)

type PublishResponse

type PublishResponse struct {
	// contains filtered or unexported fields
}

type ReaderFunc

type ReaderFunc func(rw io.ReadWriter) ([]byte, error)

type Realm

type Realm struct {
	// contains filtered or unexported fields
}

func NewRealm

func NewRealm() *Realm

func (*Realm) AttachClient

func (r *Realm) AttachClient(base BaseSession) error

func (*Realm) Close

func (r *Realm) Close()

func (*Realm) DetachClient

func (r *Realm) DetachClient(base BaseSession) error

func (*Realm) ReceiveMessage

func (r *Realm) ReceiveMessage(sessionID int64, msg messages.Message) error

type RegisterResponse

type RegisterResponse struct {
	// contains filtered or unexported fields
}

type Registration

type Registration struct {
	ID int64
}

type Result

type Result struct {
	Arguments   []any
	KwArguments map[string]any
	Details     map[string]any
	Err         string
}

type Router

type Router struct {
	// contains filtered or unexported fields
}

func NewRouter

func NewRouter() *Router

func (*Router) AddRealm

func (r *Router) AddRealm(name string)

func (*Router) AttachClient

func (r *Router) AttachClient(base BaseSession) error

func (*Router) Close

func (r *Router) Close()

func (*Router) DetachClient

func (r *Router) DetachClient(base BaseSession) error

func (*Router) HasRealm

func (r *Router) HasRealm(name string) bool

func (*Router) ReceiveMessage

func (r *Router) ReceiveMessage(base BaseSession, msg messages.Message) error

func (*Router) RemoveRealm

func (r *Router) RemoveRealm(name string)

type SendProgress

type SendProgress func(arguments []any, kwArguments map[string]any) error

type SendProgressive

type SendProgressive func(ctx context.Context) *Progress

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(router *Router, authenticator auth.ServerAuthenticator, config *ServerConfig) *Server

func (*Server) HandleClient

func (s *Server) HandleClient(conn net.Conn)

func (*Server) RegisterSpec

func (s *Server) RegisterSpec(spec WSSerializerSpec) error

func (*Server) Start

func (s *Server) Start(host string, port int) (io.Closer, error)

func (*Server) StartUnixServer

func (s *Server) StartUnixServer(udsPath string) (io.Closer, error)

type ServerConfig

type ServerConfig struct {
	Throttle          *internal.Throttle
	KeepAliveInterval time.Duration
	KeepAliveTimeout  time.Duration
}

type Session

type Session struct {
	// contains filtered or unexported fields
}

func Connect

func Connect(ctx context.Context, url string, realm string) (*Session, error)

func NewSession

func NewSession(base BaseSession, serializer serializers.Serializer) *Session

func (*Session) Call

func (s *Session) Call(ctx context.Context, procedure string, args []any, kwArgs map[string]any,
	options map[string]any) (*Result, error)

func (*Session) CallProgress

func (s *Session) CallProgress(ctx context.Context, procedure string, args []any, kwArgs map[string]any,
	options map[string]any, progressHandler ProgressHandler) (*Result, error)

func (*Session) CallProgressive

func (s *Session) CallProgressive(ctx context.Context, procedure string,
	progressFunc SendProgressive) (*Result, error)

func (*Session) CallProgressiveProgress

func (s *Session) CallProgressiveProgress(ctx context.Context, procedure string,
	progressFunc SendProgressive, progressHandler ProgressHandler) (*Result, error)

func (*Session) Connected

func (s *Session) Connected() bool

func (*Session) GoodBye

func (s *Session) GoodBye() *GoodBye

func (*Session) ID

func (s *Session) ID() int64

func (*Session) Leave

func (s *Session) Leave() error

func (*Session) LeaveChan

func (s *Session) LeaveChan() <-chan struct{}

func (*Session) Publish

func (s *Session) Publish(topic string, args []any, kwArgs map[string]any,
	options map[string]any) error

func (*Session) Register

func (s *Session) Register(procedure string, handler InvocationHandler,
	options map[string]any) (*Registration, error)

func (*Session) SetOnLeaveListener

func (s *Session) SetOnLeaveListener(listener func())

func (*Session) Subscribe

func (s *Session) Subscribe(topic string, handler EventHandler, options map[string]any) (*Subscription, error)

func (*Session) Unregister

func (s *Session) Unregister(registrationID int64) error

func (*Session) Unsubscribe

func (s *Session) Unsubscribe(subscription *Subscription) error

type SubscribeResponse

type SubscribeResponse struct {
	// contains filtered or unexported fields
}

type Subscription

type Subscription struct {
	ID int64
}

type TransportType

type TransportType int
const (
	TransportNone TransportType = iota
	TransportWebSocket
)

type UnregisterResponse

type UnregisterResponse struct {
	// contains filtered or unexported fields
}

type UnsubscribeResponse

type UnsubscribeResponse struct {
	// contains filtered or unexported fields
}

type WSDialerConfig

type WSDialerConfig struct {
	SubProtocol       string
	DialTimeout       time.Duration
	NetDial           func(ctx context.Context, network, addr string) (net.Conn, error)
	KeepAliveInterval time.Duration
	KeepAliveTimeout  time.Duration
}

type WSPeerConfig

type WSPeerConfig struct {
	Protocol          string
	Binary            bool
	Server            bool
	KeepAliveInterval time.Duration
	KeepAliveTimeout  time.Duration
}

type WSSerializerSpec

type WSSerializerSpec interface {
	SubProtocol() string
	Serializer() serializers.Serializer
}

func NewWSSerializerSpec

func NewWSSerializerSpec(subProtocol string, serializer serializers.Serializer) WSSerializerSpec

type WebSocketAcceptor

type WebSocketAcceptor struct {
	Authenticator auth.ServerAuthenticator
	// contains filtered or unexported fields
}

func (*WebSocketAcceptor) Accept

func (w *WebSocketAcceptor) Accept(conn net.Conn, config *WebSocketServerConfig) (BaseSession, error)

func (*WebSocketAcceptor) RegisterSpec

func (w *WebSocketAcceptor) RegisterSpec(spec WSSerializerSpec) error

func (*WebSocketAcceptor) Spec

func (w *WebSocketAcceptor) Spec(subProtocol string) (serializers.Serializer, error)

type WebSocketJoiner

type WebSocketJoiner struct {
	SerializerSpec WSSerializerSpec
	Authenticator  auth.ClientAuthenticator
}

func (*WebSocketJoiner) Join

func (w *WebSocketJoiner) Join(ctx context.Context, url, realm string, config *WSDialerConfig) (BaseSession, error)

type WebSocketPeer

type WebSocketPeer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*WebSocketPeer) NetConn

func (c *WebSocketPeer) NetConn() net.Conn

func (*WebSocketPeer) Protocol

func (c *WebSocketPeer) Protocol() string

func (*WebSocketPeer) Read

func (c *WebSocketPeer) Read() ([]byte, error)

func (*WebSocketPeer) Type

func (c *WebSocketPeer) Type() TransportType

func (*WebSocketPeer) Write

func (c *WebSocketPeer) Write(bytes []byte) error

type WebSocketServerConfig

type WebSocketServerConfig struct {
	SubProtocols      []string
	KeepAliveInterval time.Duration
	KeepAliveTimeout  time.Duration
}

func DefaultWebSocketServerConfig

func DefaultWebSocketServerConfig() *WebSocketServerConfig

type WriterFunc

type WriterFunc func(w io.Writer, p []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL