messageloop

package module
v0.1.1 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 17, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

README

MessageLoop

A realtime messaging platform server written in Go, providing pub/sub messaging capabilities over WebSocket and gRPC streaming transports, using a CloudEvents-based protocol.

Features

  • Multiple Transports - WebSocket and gRPC streaming support
  • Flexible Encoding - JSON and Protobuf message encoding
  • CloudEvents Protocol - Standardized event format for message passing
  • Pub/Sub Messaging - Channel-based publish/subscribe with wildcard support
  • RPC Integration - Request/response messaging with backend proxy support
  • Distributed Broker - Pluggable broker backend (in-memory or Redis Streams)
  • Sharded Architecture - High-performance connection and subscription management
  • Client SDKs - Go and TypeScript/JavaScript SDKs available

Quick Start

Installation

git clone https://github.com/messageloopio/messageloop.git
cd messageloop
go mod download

Running the Server

go run cmd/server/main.go --config-dir ./configs

The server will start with:

  • WebSocket transport on ws://localhost:9080/ws
  • gRPC transport on localhost:9090

Configuration

Create a configuration file (see config-example.yaml) or use the default settings:

transport:
  websocket:
    address: ":9080"
  grpc:
    address: ":9090"

broker:
  type: "memory"  # or "redis"
  redis:
    addr: "localhost:6379"

Client SDKs

Go SDK
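package main

import (
	"context"
	"log"

	messageloop "github.com/messageloopio/messageloop/sdks/go"
)

func main() {
	ctx := context.Background()

	// Connect over the WebSocket transport.
	client, err := messageloop.Dial(ctx,
		"ws://localhost:9080/ws",
		messageloop.WithClientId("my-client"),
	)
	if err != nil {
		log.Fatalf("dial: %v", err)
	}

	// Subscribe to channels
	if err := client.Subscribe(ctx, "chat.messages"); err != nil {
		log.Fatalf("subscribe: %v", err)
	}

	// Publish a message
	event := messageloop.NewCloudEvent("/client", "chat.message", []byte(`{"text":"Hello!"}`))
	if err := client.Publish(ctx, "chat.messages", event); err != nil {
		log.Fatalf("publish: %v", err)
	}
}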

See sdks/go/example/ for more examples.

TypeScript/JavaScript SDK
import { dial, createCloudEvent, withClientId } from "@messageloop/sdk";

const client = await dial("ws://localhost:9080/ws", [
  withClientId("my-client"),
]);

client.onMessage((events) => {
  events.forEach((msg) => console.log("Message:", msg.event.type));
});

const event = createCloudEvent({
  source: "/client",
  type: "chat.message",
  data: { text: "Hello!" },
});
await client.publish("chat.messages", event);

See sdks/ts/ for more details and examples.

Development

Build

go build ./...

Generate Protocol Buffers

# Install buf first
go install github.com/bufbuild/buf/cmd/buf@v1.63.0

# Generate
task generate-protocol

Run Tests

# All tests
go test ./...

# Specific package
go test ./pkg/topics/...

# Verbose output
go test -v ./pkg/topics/...

TypeScript SDK

cd sdks/ts
npm install
npm run build
npm test

Architecture

MessageLoop is built with a modular, sharded architecture for high performance:

  • Node - Central coordinator managing Hub, Broker, and Proxy
  • Hub - 64-sharded connection registry for efficient client management
  • Broker - Pluggable pub/sub backend (memory or Redis Streams)
  • Transport - Abstracted connection handling (WebSocket/gRPC)
  • Proxy - Backend service integration for RPC routing
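
To illustrate what a "64-sharded connection registry" can look like, here is a minimal sketch. It is not MessageLoop's Hub implementation: the `registry` and `shard` types, the FNV-1a hash, and the session-to-client map are all assumptions chosen for illustration. The idea is that each session ID hashes to one of 64 independently locked maps, so unrelated sessions rarely contend on the same lock.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// numShards mirrors the "64-sharded" figure from the component list.
const numShards = 64

// shard holds one slice of the session map behind its own lock.
type shard struct {
	mu       sync.RWMutex
	sessions map[string]string // session ID -> client ID (illustrative payload)
}

// registry is a hypothetical stand-in for a sharded connection registry.
type registry struct {
	shards [numShards]*shard
}

func newRegistry() *registry {
	r := &registry{}
	for i := range r.shards {
		r.shards[i] = &shard{sessions: make(map[string]string)}
	}
	return r
}

// shardIndex hashes the session ID with FNV-1a and maps it to a shard.
func shardIndex(sessionID string) int {
	h := fnv.New32a()
	h.Write([]byte(sessionID))
	return int(h.Sum32() % numShards)
}

// add stores a session under the lock of its own shard only.
func (r *registry) add(sessionID, clientID string) {
	s := r.shards[shardIndex(sessionID)]
	s.mu.Lock()
	defer s.mu.Unlock()
	s.sessions[sessionID] = clientID
}

// lookup takes a read lock on a single shard.
func (r *registry) lookup(sessionID string) (string, bool) {
	s := r.shards[shardIndex(sessionID)]
	s.mu.RLock()
	defer s.mu.RUnlock()
	clientID, ok := s.sessions[sessionID]
	return clientID, ok
}

func main() {
	r := newRegistry()
	r.add("session-1", "my-client")
	clientID, ok := r.lookup("session-1")
	fmt.Println(clientID, ok) // prints "my-client true"
}
```
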

CloudEvents Protocol

All messaging uses CloudEvents format:

  • Publish and RpcRequest wrap data in CloudEvents
  • Supports BinaryData and TextData fields
  • Standardized event metadata (type, source, id, time)

Documentation

  • CLAUDE.md - Detailed architecture and development guide
  • sdks/go/ - Go SDK documentation
  • sdks/ts/ - TypeScript SDK documentation

License

Apache-2.0 (as listed in the published module metadata).

Documentation

Index

Constants

const (
	SystemMethodAuthenticate = "$authenticate"
)

Variables

var (
	// DisconnectInvalidToken issued when client came with invalid token.
	DisconnectInvalidToken = Disconnect{
		Code:   3500,
		Reason: "invalid token",
	}
	// DisconnectBadRequest issued when client uses malformed protocol frames.
	DisconnectBadRequest = Disconnect{
		Code:   3501,
		Reason: "bad request",
	}
	// DisconnectStale issued to close connection that did not become
	// authenticated in configured interval after dialing.
	DisconnectStale = Disconnect{
		Code:   3502,
		Reason: "stale",
	}
	// DisconnectForceNoReconnect issued when server disconnects connection
	// and asks it to not reconnect again.
	DisconnectForceNoReconnect = Disconnect{
		Code:   3503,
		Reason: "force disconnect",
	}
	// DisconnectConnectionLimit can be issued when client connection exceeds a
	// configured connection limit (per user ID or due to other rule).
	DisconnectConnectionLimit = Disconnect{
		Code:   3504,
		Reason: "connection limit",
	}
	// DisconnectChannelLimit can be issued when client connection exceeds a
	// configured channel limit.
	DisconnectChannelLimit = Disconnect{
		Code:   3505,
		Reason: "channel limit",
	}
	// DisconnectInappropriateProtocol can be issued when client connection format can not
	// handle incoming data. For example, this happens when JSON-based clients receive
	// binary data in a channel. This is usually an indicator of programmer error, JSON
	// clients can not handle binary.
	DisconnectInappropriateProtocol = Disconnect{
		Code:   3506,
		Reason: "inappropriate protocol",
	}
	// DisconnectPermissionDenied may be issued when client attempts accessing a server without
	// enough permissions.
	DisconnectPermissionDenied = Disconnect{
		Code:   3507,
		Reason: "permission denied",
	}
	// DisconnectNotAvailable may be issued when ErrorNotAvailable does not fit message type, for example
	// we issue DisconnectNotAvailable when client sends asynchronous message without MessageHandler set
	// on server side.
	DisconnectNotAvailable = Disconnect{
		Code:   3508,
		Reason: "not available",
	}
	// DisconnectTooManyErrors may be issued when client generates too many errors.
	DisconnectTooManyErrors = Disconnect{
		Code:   3509,
		Reason: "too many errors",
	}
	// DisconnectIdleTimeout may be issued when client connection is idle for too long.
	DisconnectIdleTimeout = Disconnect{
		Code:   3511,
		Reason: "idle timeout",
	}
)

The codes above are built-in terminal codes.

var DisconnectConnectionClosed = Disconnect{
	Code:   3000,
	Reason: "connection closed",
}

DisconnectConnectionClosed is a special Disconnect object used when client connection was closed without any advice from a server side. This can be a clean disconnect, or temporary disconnect of the client due to internet connection loss. Server can not distinguish the actual reason of disconnect.

Marshalers is a list of available marshalers.

var ProtoJSONMarshaler = &protoJSONMarshaler{
	Marshaler: protojson.MarshalOptions{
		UseProtoNames:   true,
		EmitUnpopulated: false,
	},
	Unmarshaler: protojson.UnmarshalOptions{
		DiscardUnknown: true,
	},
}

ProtoJSONMarshaler is a JSON marshaler that uses protobuf JSON encoding.

Functions

func MakeOutboundMessage

func MakeOutboundMessage(in *clientpb.InboundMessage, bodyFunc func(out *clientpb.OutboundMessage)) *clientpb.OutboundMessage

func NewClientSession

func NewClientSession(ctx context.Context, node *Node, t Transport, marshaler Marshaler) (*ClientSession, ClientCloseFunc, error)

Types

type Broker

type Broker interface {
	// RegisterEventHandler called once on start when Broker already set to Node. At
	// this moment node is ready to process broker events.
	RegisterEventHandler(BrokerEventHandler) error

	// Subscribe node on channel to listen all messages coming from channel.
	Subscribe(ch string) error
	// Unsubscribe node from channel to stop listening messages from it.
	Unsubscribe(ch string) error

	// Publish allows sending data into channel. Data should be
	// delivered to all clients subscribed to this channel at moment on any
	// Centrifuge node (with at most once delivery guarantee).
	//
	// Broker can optionally maintain publication history inside channel according
	// to PublishOptions provided. See History method for rules that should be implemented
	// for accessing publications from history stream.
	//
	// Saving message to a history stream and publish to PUB/SUB should be an atomic
	// operation per channel. If this is not true – then publication to one channel
	// must be serialized on the caller side, i.e. publish requests must be issued one
	// after another. Otherwise, the order of publications and stable behaviour of
	// subscribers with positioning/recovery enabled can't be guaranteed.
	//
	// StreamPosition returned here describes stream epoch and offset assigned to
	// the publication. For channels without history this StreamPosition should be
	// zero value.
	// Second bool value returned here means whether Publish was suppressed due to
	// the use of PublishOptions.IdempotencyKey. In this case StreamPosition is
	// returned from the cache maintained by Broker.
	Publish(ch string, data []byte, opts PublishOptions) (StreamPosition, bool, error)
	// PublishJoin publishes Join Push message into channel.
	PublishJoin(ch string, info *ClientDesc) error
	// PublishLeave publishes Leave Push message into channel.
	PublishLeave(ch string, info *ClientDesc) error

	// History used to extract Publications from history stream.
	// Publications returned according to HistoryFilter which allows to set several
	// filtering options. StreamPosition returned describes current history stream
	// top offset and epoch.
	History(ch string, opts HistoryOptions) ([]*Publication, StreamPosition, error)
	// RemoveHistory removes history from channel. This is in general not
	// needed as history expires automatically (based on history_lifetime)
	// but sometimes can be useful for application logic.
	RemoveHistory(ch string) error
}

func NewMemoryBroker

func NewMemoryBroker() Broker

type BrokerEventHandler

type BrokerEventHandler interface {
	// HandlePublication to handle received Publications.
	HandlePublication(ch string, pub *Publication) error
	// HandleJoin to handle received Join messages.
	HandleJoin(ch string, info *ClientDesc) error
	// HandleLeave to handle received Leave messages.
	HandleLeave(ch string, info *ClientDesc) error
}

BrokerEventHandler can handle messages received from PUB/SUB system.

type ClientCloseFunc

type ClientCloseFunc func() error

type ClientDesc

type ClientDesc struct {
	ClientID  string `json:"client_id"`
	SessionID string `json:"session_id"`
	UserID    string `json:"user_id"`
}

type ClientSession

type ClientSession struct {
	// contains filtered or unexported fields
}

func (*ClientSession) Authenticated

func (c *ClientSession) Authenticated() bool

func (*ClientSession) Channels

func (c *ClientSession) Channels() []string

func (*ClientSession) ClientID

func (c *ClientSession) ClientID() string

func (*ClientSession) ClientInfo

func (c *ClientSession) ClientInfo() *ClientDesc

func (*ClientSession) Close

func (c *ClientSession) Close(disconnect Disconnect) error

Close closes the client session with a disconnect reason. This is an exported method for use by the server-side API.

func (*ClientSession) HandleMessage

func (c *ClientSession) HandleMessage(ctx context.Context, in *clientpb.InboundMessage) error

func (*ClientSession) LastSurveyRequestID

func (c *ClientSession) LastSurveyRequestID() string

LastSurveyRequestID returns the last received survey request ID. This is useful for testing purposes.

func (*ClientSession) ResetActivity

func (c *ClientSession) ResetActivity()

ResetActivity resets the last activity timestamp to now.

func (*ClientSession) Send

func (*ClientSession) SessionID

func (c *ClientSession) SessionID() string

func (*ClientSession) UserID

func (c *ClientSession) UserID() string

type Disconnect

type Disconnect struct {
	// Code is a disconnect code.
	Code uint32 `json:"code,omitempty"`
	// Reason is a short description of disconnect code for humans.
	Reason string `json:"reason"`
}

func (Disconnect) Error

func (d Disconnect) Error() string

Error to use Disconnect as a callback handler error to signal Centrifuge that client must be disconnected with corresponding Code and Reason.
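
Because Disconnect implements the error interface, a handler can return it like any other error and the caller can recover the code with errors.As. The sketch below redeclares a local Disconnect so that it runs on its own. The Error message format and the `authenticate` and `disconnectFor` helpers are assumptions for illustration, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Disconnect mirrors the documented struct. The Error format is assumed.
type Disconnect struct {
	Code   uint32
	Reason string
}

func (d Disconnect) Error() string {
	return fmt.Sprintf("disconnected: code: %d, reason: %s", d.Code, d.Reason)
}

// DisconnectInvalidToken uses the code and reason listed under Variables.
var DisconnectInvalidToken = Disconnect{Code: 3500, Reason: "invalid token"}

// authenticate is a hypothetical handler that rejects empty tokens by
// returning a wrapped Disconnect.
func authenticate(token string) error {
	if token == "" {
		return fmt.Errorf("authenticate: %w", DisconnectInvalidToken)
	}
	return nil
}

// disconnectFor reports the Disconnect carried anywhere in err's chain.
func disconnectFor(err error) (Disconnect, bool) {
	var d Disconnect
	ok := errors.As(err, &d)
	return d, ok
}

func main() {
	if d, ok := disconnectFor(authenticate("")); ok {
		fmt.Println(d.Code, d.Reason) // prints "3500 invalid token"
	}
}
```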

func (Disconnect) String

func (d Disconnect) String() string

String representation.

type EncodingType

type EncodingType int
const (
	EncodingTypeJSON     EncodingType = 1
	EncodingTypeProtobuf EncodingType = 2
)

type HeartbeatConfig

type HeartbeatConfig struct {
	IdleTimeout time.Duration
}

HeartbeatConfig contains parsed heartbeat configuration durations.

type HeartbeatManager

type HeartbeatManager struct {
	// contains filtered or unexported fields
}

HeartbeatManager manages client heartbeat monitoring.

func NewHeartbeatManager

func NewHeartbeatManager(cfg HeartbeatConfig) *HeartbeatManager

NewHeartbeatManager creates a new HeartbeatManager with the given config.

func (*HeartbeatManager) Config

func (hm *HeartbeatManager) Config() HeartbeatConfig

Config returns the heartbeat configuration.

func (*HeartbeatManager) Start

func (hm *HeartbeatManager) Start(ctx context.Context, client *ClientSession)

Start starts the heartbeat goroutine for a client session.

type HistoryFilter

type HistoryFilter struct {
	// Since used to extract publications from stream since provided StreamPosition.
	Since *StreamPosition
	// Limit number of publications to return.
	// -1 means no limit - i.e. return all publications currently in stream.
	// 0 means that caller only interested in current stream top position so
	// Broker should not return any publications.
	Limit int
	// Reverse direction.
	Reverse bool
}

HistoryFilter allows filtering history according to fields set.
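
The Limit rules in the comments above (-1 for everything, 0 for position only) can be sketched as follows. This is an illustration over a slice of offsets, not a Broker implementation. Two points are assumptions: Since is treated as "offsets strictly greater than Since.Offset", and Reverse simply flips the order of the forward result.

```go
package main

import "fmt"

// Local copies of the documented types so the sketch is self-contained.
type StreamPosition struct {
	Offset uint64
	Epoch  string
}

type HistoryFilter struct {
	Since   *StreamPosition
	Limit   int
	Reverse bool
}

// applyFilter selects offsets from an ascending stream according to f.
func applyFilter(stream []uint64, f HistoryFilter) []uint64 {
	// Limit 0: the caller only wants the stream top position.
	if f.Limit == 0 {
		return nil
	}
	var out []uint64
	for _, off := range stream {
		// Assumed semantics: skip everything up to and including Since.
		if f.Since != nil && off <= f.Since.Offset {
			continue
		}
		out = append(out, off)
	}
	if f.Reverse {
		for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
			out[i], out[j] = out[j], out[i]
		}
	}
	// A negative Limit means no limit; a positive one truncates.
	if f.Limit > 0 && len(out) > f.Limit {
		out = out[:f.Limit]
	}
	return out
}

func main() {
	stream := []uint64{1, 2, 3, 4, 5}
	fmt.Println(applyFilter(stream, HistoryFilter{Since: &StreamPosition{Offset: 2}, Limit: 2})) // [3 4]
	fmt.Println(applyFilter(stream, HistoryFilter{Limit: -1, Reverse: true}))                    // [5 4 3 2 1]
	fmt.Println(len(applyFilter(stream, HistoryFilter{Limit: 0})))                               // 0
}
```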

type HistoryOptions

type HistoryOptions struct {
	// Filter for history publications.
	Filter HistoryFilter
	// MetaTTL allows overriding default (set in Config.HistoryMetaTTL) history
	// meta information expiration time.
	MetaTTL time.Duration
}

HistoryOptions define some fields to alter History method behaviour.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

func (*Hub) GetSubscribers

func (h *Hub) GetSubscribers(ch string) []*ClientSession

GetSubscribers returns a copy of all subscribers for a given channel.

func (*Hub) LookupSession

func (h *Hub) LookupSession(sessionID string) *ClientSession

LookupSession returns a client session by session ID. Returns nil if session not found.

func (*Hub) NumSubscribers

func (h *Hub) NumSubscribers(ch string) int

NumSubscribers returns number of current subscribers for a given channel.

func (*Hub) RemoveSession

func (h *Hub) RemoveSession(sessionID string)

RemoveSession removes a session from the sessions map and connShards.

type JSONMarshaler

type JSONMarshaler struct{}

JSONMarshaler implements JSON marshaling for protocol messages.

func (JSONMarshaler) Marshal

func (JSONMarshaler) Marshal(msg any) ([]byte, error)

func (JSONMarshaler) Name

func (JSONMarshaler) Name() string

func (JSONMarshaler) Unmarshal

func (JSONMarshaler) Unmarshal(data []byte, msg any) error

func (JSONMarshaler) UseBytes

func (JSONMarshaler) UseBytes() bool

type MarshalTypeError

type MarshalTypeError struct {
	Type any
}

MarshalTypeError is returned when Marshal receives an unexpected type.

func (*MarshalTypeError) Error

func (e *MarshalTypeError) Error() string

type Marshaler

type Marshaler interface {
	// Marshal converts a message to bytes.
	Marshal(msg any) ([]byte, error)
	// Unmarshal converts bytes to a message.
	Unmarshal(data []byte, msg any) error
	// Name returns the marshaler name.
	Name() string
	// UseBytes returns true if this marshaler uses binary encoding.
	UseBytes() bool
}

Marshaler defines the interface for marshaling protocol messages.
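
As a sketch of what satisfying this interface involves, the example below implements it with encoding/json. The `stdJSON` type and the `frameKind` helper are hypothetical; in particular, using UseBytes to choose between text and binary frames is an assumption about how a transport might consume it.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Marshaler is a local copy of the documented interface.
type Marshaler interface {
	Marshal(msg any) ([]byte, error)
	Unmarshal(data []byte, msg any) error
	Name() string
	UseBytes() bool
}

// stdJSON is a hypothetical implementation backed by encoding/json.
type stdJSON struct{}

func (stdJSON) Marshal(msg any) ([]byte, error)      { return json.Marshal(msg) }
func (stdJSON) Unmarshal(data []byte, msg any) error { return json.Unmarshal(data, msg) }
func (stdJSON) Name() string                         { return "json" }
func (stdJSON) UseBytes() bool                       { return false }

// Compile-time check that stdJSON satisfies the interface.
var _ Marshaler = stdJSON{}

// frameKind shows one assumed use of UseBytes: picking the frame type.
func frameKind(m Marshaler) string {
	if m.UseBytes() {
		return "binary"
	}
	return "text"
}

func main() {
	var m Marshaler = stdJSON{}

	data, err := m.Marshal(map[string]string{"text": "Hello!"})
	if err != nil {
		panic(err)
	}

	var decoded map[string]string
	if err := m.Unmarshal(data, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(m.Name(), frameKind(m), decoded["text"]) // prints "json text Hello!"
}
```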

type Node

type Node struct {
	// contains filtered or unexported fields
}

func NewNode

func NewNode(cfg *config.Server) *Node

func (*Node) AddClient

func (n *Node) AddClient(c *ClientSession)

AddClient adds a client session to the node's hub.

func (*Node) AddProxy

func (n *Node) AddProxy(p proxy.Proxy, channelPattern, methodPattern string) error

AddProxy adds a proxy to the router.

func (*Node) AddSubscription

func (n *Node) AddSubscription(ctx context.Context, ch string, sub Subscriber) error

AddSubscription adds a subscription for a client to a channel. This is an exported method for use by the server-side API.

func (*Node) AddSurveyResponse

func (n *Node) AddSurveyResponse(ctx context.Context, sessionID string, requestID string, payload []byte, err error)

AddSurveyResponse adds a client response to the appropriate survey.

func (*Node) Broker

func (n *Node) Broker() Broker

func (*Node) FindProxy

func (n *Node) FindProxy(channel, method string) proxy.Proxy

FindProxy finds a proxy for the given channel and method. Returns nil if no matching proxy is found.

func (*Node) GetHeartbeatIdleTimeout

func (n *Node) GetHeartbeatIdleTimeout() time.Duration

GetHeartbeatIdleTimeout returns the configured heartbeat idle timeout. Returns 0 if heartbeat manager is not configured.

func (*Node) GetRPCTimeout

func (n *Node) GetRPCTimeout() time.Duration

GetRPCTimeout returns the configured RPC timeout.

func (*Node) HandleJoin

func (n *Node) HandleJoin(ch string, info *ClientDesc) error

func (*Node) HandleLeave

func (n *Node) HandleLeave(ch string, info *ClientDesc) error

func (*Node) HandlePublication

func (n *Node) HandlePublication(ch string, pub *Publication) error

func (*Node) Hub

func (n *Node) Hub() *Hub

func (*Node) ProxyRPC

func (n *Node) ProxyRPC(ctx context.Context, channel, method string, req *proxy.RPCProxyRequest) (*proxy.RPCProxyResponse, error)

ProxyRPC proxies an RPC request to the configured backend.

func (*Node) Publish

func (n *Node) Publish(channel string, data []byte, opts ...PublishOption) error

func (*Node) RemoveSubscription

func (n *Node) RemoveSubscription(ch string, c *ClientSession) error

RemoveSubscription removes a subscription for a client from a channel. This is an exported method for use by the server-side API.

func (*Node) Run

func (n *Node) Run() error

func (*Node) SetBroker

func (n *Node) SetBroker(broker Broker)

func (*Node) SetupProxy

func (n *Node) SetupProxy(cfgs []*proxy.ProxyConfig) error

SetupProxy configures the proxy router with the given proxy configurations.

func (*Node) Survey

func (n *Node) Survey(ctx context.Context, channel string, payload []byte, timeout time.Duration) ([]*SurveyResult, error)

Survey sends a request to all subscribers of a channel and collects responses. Returns collected results or error if the channel has no subscribers.

type ProtobufMarshaler

type ProtobufMarshaler struct{}

ProtobufMarshaler implements protobuf marshaling for protocol messages.

func (ProtobufMarshaler) Marshal

func (ProtobufMarshaler) Marshal(msg any) ([]byte, error)

func (ProtobufMarshaler) Name

func (ProtobufMarshaler) Name() string

func (ProtobufMarshaler) Unmarshal

func (ProtobufMarshaler) Unmarshal(data []byte, msg any) error

func (ProtobufMarshaler) UseBytes

func (ProtobufMarshaler) UseBytes() bool

type Publication

type Publication struct {
	Channel   string
	Offset    uint64
	Metadata  map[string]interface{}
	IsBlob    bool
	IsText    bool   // true if original data was text_data, false if binary_data
	EventType string // original CloudEvent type (e.g., "hello")
	Payload   []byte
	Time      int64
}

type PublishOption

type PublishOption func(*PublishOptions)

func WithAsBytes

func WithAsBytes(asBytes bool) PublishOption

func WithClientDesc

func WithClientDesc(info *ClientDesc) PublishOption

WithClientDesc adds ClientDesc to Publication.

func WithEventType

func WithEventType(eventType string) PublishOption

func WithIsText

func WithIsText(isText bool) PublishOption

type PublishOptions

type PublishOptions struct {
	ClientDesc *ClientDesc
	AsBytes    bool
	IsText     bool
	EventType  string
}
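
PublishOption follows the functional options pattern: each option is a function that mutates a PublishOptions value. The sketch below shows how such options are typically defined and folded together. The option bodies and the `resolve` helper are assumptions; the package's own implementations are not shown on this page.

```go
package main

import "fmt"

// Local copies of the documented types so the sketch is self-contained.
type ClientDesc struct {
	ClientID  string
	SessionID string
	UserID    string
}

type PublishOptions struct {
	ClientDesc *ClientDesc
	AsBytes    bool
	IsText     bool
	EventType  string
}

type PublishOption func(*PublishOptions)

// Assumed implementations: each option sets exactly one field.
func WithEventType(eventType string) PublishOption {
	return func(o *PublishOptions) { o.EventType = eventType }
}

func WithIsText(isText bool) PublishOption {
	return func(o *PublishOptions) { o.IsText = isText }
}

func WithClientDesc(info *ClientDesc) PublishOption {
	return func(o *PublishOptions) { o.ClientDesc = info }
}

// resolve is a hypothetical helper that applies options in order,
// starting from the zero value.
func resolve(opts ...PublishOption) PublishOptions {
	var o PublishOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := resolve(
		WithEventType("chat.message"),
		WithIsText(true),
		WithClientDesc(&ClientDesc{ClientID: "my-client"}),
	)
	fmt.Println(o.EventType, o.IsText, o.ClientDesc.ClientID) // prints "chat.message true my-client"
}
```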

type StreamPosition

type StreamPosition struct {
	// Offset defines publication incremental offset inside a stream.
	Offset uint64
	// Epoch allows handling situations when storage
	// lost stream entirely for some reason (expired or lost after restart) and we
	// want to track this fact to prevent successful recovery from another stream.
	// I.e. for example we have a stream [1, 2, 3], then it's lost and new stream
	// contains [1, 2, 3, 4], client that recovers from position 3 will only receive
	// publication 4 missing 1, 2, 3 from new stream. With epoch, we can tell client
	// that correct recovery is not possible.
	Epoch string
}
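
The epoch comment above describes a recovery check that can be sketched in a few lines. The `canRecover` function is hypothetical: it assumes recovery is only valid when the client's epoch matches the current stream's epoch and the client is not ahead of the stream.

```go
package main

import "fmt"

// StreamPosition is a local copy of the documented struct.
type StreamPosition struct {
	Offset uint64
	Epoch  string
}

// canRecover reports whether a client at position client can resume
// from a stream whose top position is current.
func canRecover(client, current StreamPosition) bool {
	// A different epoch means the original stream was lost and replaced.
	if client.Epoch != current.Epoch {
		return false
	}
	return client.Offset <= current.Offset
}

func main() {
	// The scenario from the comment: stream [1, 2, 3] is lost, and a new
	// stream [1, 2, 3, 4] exists under a new epoch.
	client := StreamPosition{Offset: 3, Epoch: "epoch-a"}
	current := StreamPosition{Offset: 4, Epoch: "epoch-b"}
	fmt.Println(canRecover(client, current)) // false

	// Same epoch: the client only needs publication 4.
	current.Epoch = "epoch-a"
	fmt.Println(canRecover(client, current)) // true
}
```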

type Subscriber

type Subscriber struct {
	Client    *ClientSession
	Ephemeral bool
}

Subscriber represents a client that can subscribe to channels.

func NewSubscriber

func NewSubscriber(client *ClientSession, ephemeral bool) Subscriber

NewSubscriber creates a new Subscriber.

type Survey

type Survey struct {
	// contains filtered or unexported fields
}

Survey manages the lifecycle of a survey request and response collection.

func NewSurvey

func NewSurvey(id, channel string, payload []byte, timeout time.Duration) *Survey

NewSurvey creates a new Survey instance.

func (*Survey) AddResponse

func (s *Survey) AddResponse(sessionID string, payload []byte, err error)

AddResponse adds a client response to the survey.

func (*Survey) Channel

func (s *Survey) Channel() string

Channel returns the target channel for this survey.

func (*Survey) Close

func (s *Survey) Close()

Close cleans up survey resources.

func (*Survey) ID

func (s *Survey) ID() string

ID returns the survey request ID.

func (*Survey) Payload

func (s *Survey) Payload() []byte

Payload returns the survey request payload.

func (*Survey) Results

func (s *Survey) Results() []*SurveyResult

Results returns a copy of the current collected results.

func (*Survey) Timeout

func (s *Survey) Timeout() time.Duration

Timeout returns the survey timeout duration.

func (*Survey) Wait

func (s *Survey) Wait(ctx context.Context) []*SurveyResult

Wait waits for responses until timeout or context cancellation. Returns collected results.

type SurveyResult

type SurveyResult struct {
	SessionID string
	Payload   []byte
	Error     error
}

SurveyResult represents a response from a client to a survey request.

type Transport

type Transport interface {
	Write([]byte) error
	WriteMany(...[]byte) error
	Close(Disconnect) error
}

type UnmarshalTypeError

type UnmarshalTypeError struct {
	Type any
}

UnmarshalTypeError is returned when Unmarshal receives an unexpected type.

func (*UnmarshalTypeError) Error

func (e *UnmarshalTypeError) Error() string

Directories

Path        Synopsis
cmd
  server    command
pkg
sdks
  go        module
  shared    module
