artemis

package module
v0.0.0-...-f538a29
Published: Feb 14, 2017 License: MIT Imports: 7 Imported by: 0

README

Artemis is an RTMES (Real Time Message & Event Server) with a Go API

Built on top of gorilla/websocket; support for other real-time protocols is planned.

Use Artemis to manage server-side WS (for now) connections. Create agents to receive both ws messages and artemis-generated events within an isolated hub. Respond to these events with the agents themselves or with delegated objects of your own.

This information is outdated (since always) - leaving in case some ideas need to be reviewed/used

Updated README coming soon.

The app is responsible for handling the HTTP request (authentication and any other desired operation). It is also responsible for creating any hubs or families that the client should belong to, and for adding the client to them.

  • rtmes.NewHub
  • rtmes.NewFamily

rtmes will manage the persistence of hubs and families. It will expose an API to access hubs and families if necessary.

  • rtmes.GetHub(id) *Hub
  • rtmes.Hubs []*Hub
  • Hub.GetFamily(id) *Family
  • Hub.Families []*Family

The app then hands the request to rtmes to upgrade the connection to WS. From there, rtmes manages the connection and any interconnectivity to other clients. It does this through:

  • rtmes.NewClient(r, w, families, hub)

A client is created when the connection is opened. It remains open until the client (device) closes the connection.

The client is responsible for adding itself to 0 or more hubs and 0 or more families. If the client does not specify a hub, then it will be added to the default hub. If the client does not specify a family, then it will not be a member of any families.

Families and Clients both implement an interface for responding to events. (EventResponder)

Families and Clients both implement an interface for responding to messages. (MessageResponder)

Families and Clients both implement an interface for pushing messages from the server. (MessagePusher)

Events are specific to a Hub - only EventResponders within the Hub that fires the event will respond. Are there some default events? Maybe, probably.

Requirements for messages:

Messages must be parsable into an object with a type/name.
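To make the requirement above concrete, here is a minimal sketch of a message that parses into an object with a name. It uses only the standard library; the `envelope` type and the `kind`/`data` field names are assumptions made for illustration, not a format this package documents.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// envelope is a hypothetical wire format. The "kind" and "data" field
// names are assumptions for this sketch only.
type envelope struct {
	Kind string          `json:"kind"`
	Data json.RawMessage `json:"data"`
}

// errNoKind is returned when a message decodes but carries no name.
var errNoKind = errors.New("message has no kind")

// parseEnvelope decodes raw bytes and rejects messages without a kind.
func parseEnvelope(raw []byte) (*envelope, error) {
	var e envelope
	if err := json.Unmarshal(raw, &e); err != nil {
		return nil, err
	}
	if e.Kind == "" {
		return nil, errNoKind
	}
	return &e, nil
}

func main() {
	e, err := parseEnvelope([]byte(`{"kind":"chat","data":{"text":"hi"}}`))
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(e.Kind, string(e.Data))
}
```

Keeping the payload as `json.RawMessage` defers decoding until a handler for that kind knows which concrete type to expect.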

Documentation

Overview

Package artemis manages Socket connections and provides an API for handling incoming messages, sending messages back to the client, and subscribing to and firing server-side events.

artemis is a wordification of rtmes, which stands for Real Time Message & Event Server

Constants

This section is empty.

Variables

var (
	// DefaultTextParser can be overridden to implement text parsing for Client without
	// providing a custom parser
	DefaultTextParser = ParseJSONMessage

	// Timeout is the time allowed to write messages
	Timeout = 10 * time.Second

	// Default WS configs - can be set at package level
	// TODO, update for multiple protocols
	ReadLimit                       int64 = 4096
	HandshakeTimeout                      = 10 * time.Second
	ReadBufferSize, WriteBufferSize int

	// Errors sends errors encountered during send and receive and is meant to be consumed by a logger
	// TODO provide default logger to stdout
	Errors   = make(chan error, 256)
	Warnings = make(chan error, 256)

	// ErrHubMismatch occurs when trying to add a client to family with a different hub.
	ErrHubMismatch = errors.New("Unable to add a client to a family in a different hub.")

	// ErrDuplicateDelegate occurs when a delegate being added already belongs to the family.
	ErrDuplicateDelegate = errors.New("Tried to add a duplicate delegate to family.")

	ErrNoDelegates = errors.New("Tried to remove a delegate from a family, but it wasn't a member.")

	ErrNoSubscriptions = errors.New("Tried to remove a subscription to a message agent with no subscriptions of that kind.")

	// ErrAlreadySubscribed occurs when trying to add an event handler to a Responder that already has one.
	ErrAlreadySubscribed = errors.New("Trying to add duplicate event to responder.")

	// ErrUnparseableMessage indicates that a message does not contain some expected data.
	ErrUnparseableMessage = errors.New("The message parser does not recognize the message format.")

	// TODO enumerate what type etc.
	ErrBadMessageType = errors.New("Tried to send message with unrecognized type.")

	// ErrDuplicateHandler means that a MessageResponder is already listening to perform the same action
	// in response to the same message
	ErrDuplicateHandler = errors.New("An action already exists for that event or message name.")

	ErrIllegalPingTimeout = errors.New("pingPeriod must be shorter than pongTimeout")

	ErrEventChannelHasClosed = errors.New("This client is no longer receiving events.")

	// TODO ID agent, provide IsLostConnError()
	ErrMessageConnectionLost = errors.New("A message agent has lost its connection.")

	ErrNoSubscribers = errors.New("Hub fired event but no one is listening.")
)
var (

	// ErrDuplicateHubID indicates that hub creation failed because the name is already in use.
	ErrDuplicateHubID = errors.New("A hub with that ID already exists.")
)

Functions

func SetPingPeriod

func SetPingPeriod(n time.Duration) error

SetPingPeriod sets the period between ping messages sent to clients.

func SetPongTimeout

func SetPongTimeout(n time.Duration) error

SetPongTimeout sets how long the server waits to receive a pong message from a client.

Types

type Client

type Client struct {
	ID string

	Messages *MessageAgent
	Events   *EventAgent
}

func NewClient

func NewClient(w http.ResponseWriter, r *http.Request) (*Client, error)

func (*Client) BelongsTo

func (c *Client) BelongsTo(f *Family) bool

func (*Client) EventAgent

func (c *Client) EventAgent() *EventAgent

func (*Client) Join

func (c *Client) Join(families ...*Family)

func (*Client) Leave

func (c *Client) Leave(f *Family)

func (*Client) MessageAgent

func (c *Client) MessageAgent() *MessageAgent

func (*Client) PushMessage

func (c *Client) PushMessage(m []byte, mtype int)

func (*Client) Trigger

func (c *Client) Trigger(eventKind string, data DataGetter)

type DataGetter

type DataGetter interface {
	Data() interface{}
}
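DataGetter is the type accepted by Client.Trigger and Hub.Broadcast for event payloads. The following sketch shows a value satisfying it. The interface is copied locally so the example compiles alone, and `chatPayload` and `describe` are hypothetical names.

```go
package main

import "fmt"

// DataGetter is copied from the documentation above so that this sketch
// is self-contained.
type DataGetter interface {
	Data() interface{}
}

// chatPayload is a hypothetical event payload.
type chatPayload struct {
	Room string
	Text string
}

// Data returns the payload itself, which satisfies DataGetter.
func (p chatPayload) Data() interface{} { return p }

// describe shows how a receiver might recover the concrete type.
func describe(d DataGetter) string {
	switch v := d.Data().(type) {
	case chatPayload:
		return v.Room + ": " + v.Text
	default:
		return fmt.Sprintf("%v", v)
	}
}

func main() {
	fmt.Println(describe(chatPayload{Room: "lobby", Text: "hello"}))
}
```

Because Data returns interface{}, handlers need a type switch or assertion to get back to the concrete payload.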

type Delegate

type Delegate interface {
	EventDelegate
	MessageDelegate
}

type Event

type Event struct {
	Kind      string
	Data      interface{}
	Recipient interface{}
	Source    interface{}
}

type EventAgent

type EventAgent struct {
	Hub *Hub

	// Delegate will become the recipient on Event objects received if set.
	Delegate interface{}
	// contains filtered or unexported fields
}

func NewEventAgent

func NewEventAgent() *EventAgent

func (*EventAgent) EventAgent

func (agent *EventAgent) EventAgent() *EventAgent

func (*EventAgent) Subscribe

func (agent *EventAgent) Subscribe(kind string, do EventHandler)

func (*EventAgent) Unsubscribe

func (agent *EventAgent) Unsubscribe(kind string, do EventHandler)

type EventData

type EventData struct {
	// contains filtered or unexported fields
}

func (*EventData) Data

func (ed *EventData) Data() interface{}

type EventDelegate

type EventDelegate interface {
	EventAgent() *EventAgent
}

type EventHandler

type EventHandler func(*Event)

EventHandler is a function that handles events.

type EventHandlerSet

type EventHandlerSet map[string]EventHandler

func (EventHandlerSet) Add

func (ehs EventHandlerSet) Add(h EventHandler)

func (EventHandlerSet) Remove

func (ehs EventHandlerSet) Remove(h EventHandler)

type Family

type Family struct {
	ID  string
	Hub *Hub

	Messages messageSubscriber
	Events   eventSubscriber
}

Family is a group of Agents and AgentDelegates (both Message and Event type). Families can subscribe all of their members to handle messages and/or events. The family is "dumb" - no handling happens here.

func NewFamily

func NewFamily(id string) *Family

NewFamily creates a new instance of Family and adds it to a hub.

func (*Family) Add

func (f *Family) Add(d Delegate)

func (*Family) PushMessage

func (f *Family) PushMessage(m []byte, messageType int)

PushMessage implements MessagePusher

func (*Family) Remove

func (f *Family) Remove(d Delegate)

type Hub

type Hub struct {
	ID string
	// contains filtered or unexported fields
}

Hub is an isolated system for communication among member EventResponders. An EventResponder should only belong to a single Hub at any given time. Hub does not interact with messages at all.

func DefaultHub

func DefaultHub() *Hub

DefaultHub can be used in situations where all EventResponders in the app share the same namespace and are allowed to communicate with one another. It is loaded lazily the first time this function is called.

func NewHub

func NewHub(id string) (*Hub, error)

NewHub creates a new Hub with a unique name. If the ID is already in use, NewHub returns the existing hub with that ID along with ErrDuplicateHubID.
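The contract described for NewHub is unusual in that a non-nil error can come with a usable value. Here is a small sketch of that behavior using a local registry; the `registry` and `hub` types are hypothetical stand-ins, not the package's internals.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDuplicateHubID stands in for ErrDuplicateHubID.
var errDuplicateHubID = errors.New("a hub with that ID already exists")

// hub is a hypothetical stand-in for Hub.
type hub struct{ ID string }

// registry is a hypothetical store of hubs keyed by ID.
type registry struct {
	mu   sync.Mutex
	hubs map[string]*hub
}

func newRegistry() *registry {
	return &registry{hubs: make(map[string]*hub)}
}

// newHub returns the existing hub together with errDuplicateHubID when the
// ID is taken, and a freshly registered hub with a nil error otherwise.
func (r *registry) newHub(id string) (*hub, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if h, ok := r.hubs[id]; ok {
		return h, errDuplicateHubID
	}
	h := &hub{ID: id}
	r.hubs[id] = h
	return h, nil
}

func main() {
	r := newRegistry()
	first, _ := r.newHub("lobby")
	second, err := r.newHub("lobby")
	fmt.Println(first == second, errors.Is(err, errDuplicateHubID))
}
```

A caller that wants get-or-create semantics can check for the duplicate error specifically and keep using the returned hub, rather than treating every non-nil error as fatal.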

func (*Hub) Broadcast

func (h *Hub) Broadcast(eventKind string, data DataGetter, source interface{})

Broadcast informs all listeners subscribed to eventKind of the event. source optionally identifies the origin of the event and can be nil.
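As a rough illustration of broadcasting by event kind, the sketch below dispatches an event to every handler registered for that kind and ignores the rest. It calls handlers synchronously for simplicity, whereas the SubscriptionSet type suggests the package delivers events over channels. All names here (`miniHub`, `event`, `handler`) are hypothetical.

```go
package main

import "fmt"

// event is a trimmed, hypothetical version of Event.
type event struct {
	Kind   string
	Data   interface{}
	Source interface{}
}

type handler func(*event)

// miniHub maps an event kind to the handlers subscribed to it.
type miniHub struct {
	subs map[string][]handler
}

func newMiniHub() *miniHub {
	return &miniHub{subs: make(map[string][]handler)}
}

func (h *miniHub) subscribe(kind string, do handler) {
	h.subs[kind] = append(h.subs[kind], do)
}

// broadcast calls every handler subscribed to kind and returns how many
// were notified. source may be nil.
func (h *miniHub) broadcast(kind string, data interface{}, source interface{}) int {
	for _, do := range h.subs[kind] {
		do(&event{Kind: kind, Data: data, Source: source})
	}
	return len(h.subs[kind])
}

func main() {
	h := newMiniHub()
	h.subscribe("joined", func(e *event) {
		fmt.Println("joined:", e.Data)
	})
	fmt.Println(h.broadcast("joined", "alice", nil))
	fmt.Println(h.broadcast("left", "bob", nil)) // no subscribers for this kind
}
```

A return value of zero corresponds to the situation ErrNoSubscribers describes: the hub fired an event but no one was listening.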

func (*Hub) NewClient

func (h *Hub) NewClient(w http.ResponseWriter, r *http.Request) (c *Client, err error)

func (*Hub) NewEventAgent

func (h *Hub) NewEventAgent() *EventAgent

func (*Hub) NewFamily

func (h *Hub) NewFamily(id string) *Family

func (*Hub) NewMessageAgent

func (h *Hub) NewMessageAgent(w http.ResponseWriter, r *http.Request) (*MessageAgent, error)

TODO tj - this should be protocol agnostic - for now, just pass in the http params

type Message

type Message struct {
	Kind      string
	Data      interface{}
	Recipient interface{}
	Source    *MessageAgent

	Raw []byte
}

type MessageAgent

type MessageAgent struct {
	Hub *Hub

	// Parser overrides the default message parsing behavior if defined.  Default is nil
	Parser MessageParser
	// Delegate allows another object to act as the Recipient of messages from this agent if defined.
	// Default is nil.
	Delegate interface{}
	// contains filtered or unexported fields
}

func NewMessageAgent

func NewMessageAgent(w http.ResponseWriter, r *http.Request) (*MessageAgent, error)

func (*MessageAgent) MessageAgent

func (agent *MessageAgent) MessageAgent() *MessageAgent

MessageAgent implements MessageDelegate

func (*MessageAgent) ParseBinary

func (agent *MessageAgent) ParseBinary(m []byte) (*ParsedMessage, error)

func (*MessageAgent) ParseText

func (agent *MessageAgent) ParseText(m []byte) (*ParsedMessage, error)

func (*MessageAgent) PushMessage

func (agent *MessageAgent) PushMessage(m []byte, mtype int)

func (*MessageAgent) StopListening

func (agent *MessageAgent) StopListening(kind string)

func (*MessageAgent) Subscribe

func (agent *MessageAgent) Subscribe(kind string, do MessageHandler)

func (*MessageAgent) Unsubscribe

func (agent *MessageAgent) Unsubscribe(kind string, do MessageHandler)

type MessageDelegate

type MessageDelegate interface {
	MessageAgent() *MessageAgent
}

type MessageHandler

type MessageHandler func(*Message)

MessageHandler is a function that is executed in response to a message.

type MessageHandlerSet

type MessageHandlerSet map[string]MessageHandler

MessageHandlerSet stores a set of unique handlers. Comparison is based on function pointer identity.

func (MessageHandlerSet) Add

func (mhs MessageHandlerSet) Add(h MessageHandler)

Add puts a new MessageHandler into the set. Warns asynchronously if h is already in the set.

func (MessageHandlerSet) Remove

func (mhs MessageHandlerSet) Remove(h MessageHandler)

Remove ensures that MessageHandler h is no longer present in the MessageHandlerSet.
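Go function values cannot be compared with `==` or used as map keys, so a set keyed by "function pointer identity" needs some derived key. One plausible approach, sketched below, is to key the map by the code pointer obtained through reflection. This is an assumption about how such a set could work, not a confirmed description of MessageHandlerSet, and all names are hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
)

type handler func(string)

// handlerSet keys each handler by a string derived from its code pointer.
type handlerSet map[string]handler

// key derives a map key from the function's underlying code pointer.
func key(h handler) string {
	return fmt.Sprintf("%x", reflect.ValueOf(h).Pointer())
}

// add inserts h and reports false if the same function is already present.
func (s handlerSet) add(h handler) bool {
	k := key(h)
	if _, dup := s[k]; dup {
		return false
	}
	s[k] = h
	return true
}

// remove deletes h from the set if present.
func (s handlerSet) remove(h handler) { delete(s, key(h)) }

func onChat(m string) { fmt.Println("chat:", m) }
func onJoin(m string) { fmt.Println("join:", m) }

func main() {
	s := handlerSet{}
	fmt.Println(s.add(onChat), s.add(onChat), s.add(onJoin))
	s.remove(onChat)
	fmt.Println(len(s))
}
```

One caveat of this approach: the reflect documentation notes that a code pointer is not necessarily enough to identify a function uniquely. In particular, closures created from the same function literal share a code pointer, so they would be treated as duplicates here.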

type MessageParser

type MessageParser interface {
	ParseText([]byte) (*ParsedMessage, error)
	ParseBinary([]byte) (*ParsedMessage, error)
}

MessageParser parses bytes into ParsedMessages
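MessageAgent.Parser accepts any MessageParser, so an application can swap in its own wire format. The sketch below implements the interface for a hypothetical text protocol in which the first word of a frame is the kind. ParsedMessage and MessageParser are copied locally so the example compiles alone; `prefixParser` and `errUnparseable` are illustrative names.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// ParsedMessage and MessageParser are copied from the documentation so
// that this sketch is self-contained.
type ParsedMessage struct {
	Value interface{}
	Raw   []byte
	Kind  string
}

type MessageParser interface {
	ParseText([]byte) (*ParsedMessage, error)
	ParseBinary([]byte) (*ParsedMessage, error)
}

// errUnparseable plays the role of ErrUnparseableMessage.
var errUnparseable = errors.New("unrecognized message format")

// prefixParser is hypothetical: it treats a text frame as "kind payload".
type prefixParser struct{}

// ParseText splits the frame at the first space into kind and payload.
func (prefixParser) ParseText(m []byte) (*ParsedMessage, error) {
	i := bytes.IndexByte(m, ' ')
	if i <= 0 {
		return nil, errUnparseable
	}
	return &ParsedMessage{Kind: string(m[:i]), Value: string(m[i+1:]), Raw: m}, nil
}

// ParseBinary is unsupported in this sketch.
func (prefixParser) ParseBinary(m []byte) (*ParsedMessage, error) {
	return nil, errUnparseable
}

// Compile-time check that prefixParser satisfies MessageParser.
var _ MessageParser = prefixParser{}

func main() {
	pm, err := prefixParser{}.ParseText([]byte("chat hello there"))
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(pm.Kind, pm.Value)
}
```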

type MessagePusher

type MessagePusher interface {
	PushMessage([]byte, int)
}

MessagePusher can send a message over an existing WS connection

type ParsedMessage

type ParsedMessage struct {
	Value interface{}
	Raw   []byte
	Kind  string
}

func NewParsedMessage

func NewParsedMessage(kind string, data interface{}, raw []byte) *ParsedMessage

func ParseJSONMessage

func ParseJSONMessage(m []byte) (*ParsedMessage, error)

ParseJSONMessage parses a ParsedMessage containing JSON data from bytes if possible.

type SubscriptionSet

type SubscriptionSet map[chan *Event]struct{}

func (SubscriptionSet) Add

func (ss SubscriptionSet) Add(c chan *Event)

func (SubscriptionSet) Remove

func (ss SubscriptionSet) Remove(c chan *Event)
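SubscriptionSet uses channels as map keys, which works because channel values are comparable in Go. The following sketch shows set semantics for Add and Remove on a local copy of the type. The `publish` helper and its non-blocking send are assumptions added for illustration; the package does not document how events are delivered.

```go
package main

import "fmt"

// event is a trimmed, hypothetical version of Event.
type event struct{ Kind string }

// subscriptionSet mirrors the shape of SubscriptionSet.
type subscriptionSet map[chan *event]struct{}

// add inserts c; adding the same channel twice has no further effect.
func (ss subscriptionSet) add(c chan *event) { ss[c] = struct{}{} }

// remove deletes c; removing an absent channel is a no-op.
func (ss subscriptionSet) remove(c chan *event) { delete(ss, c) }

// publish attempts a non-blocking send to every channel and returns the
// number of channels that accepted the event.
func (ss subscriptionSet) publish(e *event) int {
	delivered := 0
	for c := range ss {
		select {
		case c <- e:
			delivered++
		default:
			// The subscriber's buffer is full; skip it rather than block.
		}
	}
	return delivered
}

func main() {
	ss := subscriptionSet{}
	c := make(chan *event, 1)
	ss.add(c)
	ss.add(c) // duplicate, set size stays at one
	fmt.Println(len(ss), ss.publish(&event{Kind: "joined"}))
	fmt.Println((<-c).Kind)
	ss.remove(c)
	fmt.Println(len(ss))
}
```

The empty struct value type costs no memory per entry, which is the usual Go idiom for a set.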
