mqhub

package
Version: v0.0.0-...-3c92e55 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2017 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

View Source
const Protocol = "mqhub"

Protocol defines the protocol in the URL A few examples of URL acceptable:

- mqhub+mqtt://server:port/topic
- mqhub+mqtt+ws://server:port/topic
- mqtt://server:port/topic
- mqtt+ws://server:port/topic

Variables

View Source
var (
	// ErrNoMessageSink is reported by DataPoint when message sink is not yet
	// connected
	ErrNoMessageSink = fmt.Errorf("message sink unavailable")
)

Functions

func RegisterConnectorFactory

func RegisterConnectorFactory(protocol string, factory ConnectorFactory)

RegisterConnectorFactory registers a ConnectorFactory for the specified protocol

Types

type ChanMsgSink

type ChanMsgSink struct {
	C chan Message
}

ChanMsgSink is a MessageSink and emits message to a chan

func NewChanMsgSink

func NewChanMsgSink() *ChanMsgSink

NewChanMsgSink creates a ChanMsgSink

func (*ChanMsgSink) ConsumeMessage

func (c *ChanMsgSink) ConsumeMessage(msg Message) Future

ConsumeMessage implements MessageSink

type Component

type Component interface {
	Identity
	// Endpoints enumerates the endpoints this component exposes
	Endpoints() []Endpoint
}

Component is the unit of an object which can be exposed to hub

type ComponentBase

type ComponentBase struct {
	IdentityImpl
}

ComponentBase implements Component

type Composer

type Composer interface {
	AddComponent(Component)
}

Composer defines composition operations

type Composite

type Composite interface {
	Components() []Component
}

Composite is a collection of components

type CompositeBase

type CompositeBase struct {
	// contains filtered or unexported fields
}

CompositeBase implements Composite + Composer

func (*CompositeBase) AddComponent

func (c *CompositeBase) AddComponent(comp Component)

AddComponent implements Composer

func (*CompositeBase) AddComponents

func (c *CompositeBase) AddComponents(comps ...Component)

AddComponents add components

func (*CompositeBase) Components

func (c *CompositeBase) Components() []Component

Components implements Composite

type Connector

type Connector interface {
	io.Closer
	Watchable
	Connect() Future
	Publish(Component) (Publication, error)
	Describe(componentID string) Descriptor
}

Connector defines a general connector to message source/bus

func NewConnector

func NewConnector(URL string) (Connector, error)

NewConnector creates a connector by parsing URL

func NewConnectorURL

func NewConnectorURL(connURL url.URL) (Connector, error)

NewConnectorURL creates a connector from a parsed URL

type ConnectorFactory

type ConnectorFactory func(URL url.URL) (Connector, error)

ConnectorFactory creates a connector from a URL

type ContextRunner

type ContextRunner interface {
	Run(context.Context)
}

ContextRunner defines a runner accepts a context the runner should be started using go runner.Run(ctx)

type DataPoint

type DataPoint struct {
	Name   string
	Retain bool
	Sink   MessageSink
}

DataPoint implements Endpoint for a data point

func NewDataPoint

func NewDataPoint(name string) *DataPoint

NewDataPoint creates a new datapoint

func NewDataPointFromEndpointRef

func NewDataPointFromEndpointRef(ref EndpointRef) *DataPoint

NewDataPointFromEndpointRef creates a datapoint using an EndpointRef

func NewRetainDataPoint

func NewRetainDataPoint(name string) *DataPoint

NewRetainDataPoint creates a new retain datapoint

func (*DataPoint) ID

func (p *DataPoint) ID() string

ID implements Endpoint

func (*DataPoint) SinkMessage

func (p *DataPoint) SinkMessage(sink MessageSink)

SinkMessage implements MessageSource

func (*DataPoint) Update

func (p *DataPoint) Update(state interface{}) Future

Update updates the state

type Descriptor

type Descriptor interface {
	Watchable
	ID() string
	SubComponent(id ...string) Descriptor
	Endpoint(name string) EndpointRef
}

Descriptor describes a publication

type EncodedPayload

type EncodedPayload interface {
	Payload() ([]byte, error)
}

EncodedPayload represent an already encoded message

type Endpoint

type Endpoint interface {
	Identity
}

Endpoint defines the endpoints exposed from a component an endpoint can be either a datapoint or a reactor

type EndpointRef

type EndpointRef interface {
	Watchable
	MessageSink
}

EndpointRef references remote endpoints

type Future

type Future interface {
	Wait() error
}

Future represent an async operation

type Identity

type Identity interface {
	ID() string
}

Identity represents an object with ID

type IdentityImpl

type IdentityImpl struct {
	// contains filtered or unexported fields
}

IdentityImpl implements Identity

func (*IdentityImpl) ID

func (i *IdentityImpl) ID() string

ID implements Identity

func (*IdentityImpl) SetID

func (i *IdentityImpl) SetID(id string)

SetID specifies the ID

type ImmediateFuture

type ImmediateFuture struct {
	Error error
}

ImmediateFuture implements a future with immediate result

func (*ImmediateFuture) Wait

func (f *ImmediateFuture) Wait() error

Wait implements Future

type Message

type Message interface {
	// Component is the ID of component emitted this message
	Component() string
	// Endpoint is the name of the endpoint
	Endpoint() string
	// Value gets the original value which is not encoded
	Value() (interface{}, bool)
	// IsState indicate this is a datapoint as state
	IsState() bool
	// As decodes the message as the specified type
	As(interface{}) error
}

Message is the abstraction of data entity passing through the hub

type MessageSink

type MessageSink interface {
	ConsumeMessage(Message) Future
}

MessageSink defines the consumer of a message

func MessageSinkAs

func MessageSinkAs(handler interface{}) MessageSink

MessageSinkAs converts a func with arbitrary parameter to MessageSink

type MessageSinkFunc

type MessageSinkFunc func(Message) Future

MessageSinkFunc is func form of MessageSink

func (MessageSinkFunc) ConsumeMessage

func (f MessageSinkFunc) ConsumeMessage(msg Message) Future

ConsumeMessage implements MessageSink

type MessageSource

type MessageSource interface {
	SinkMessage(MessageSink)
}

MessageSource emits messages

type OriginMsg

type OriginMsg struct {
	ComponentID  string
	EndpointName string
	V            interface{}
	State        bool
}

OriginMsg wraps existing value

func MakeMsg

func MakeMsg(v interface{}, state bool) *OriginMsg

MakeMsg creates an OriginMsg

func MsgFrom

func MsgFrom(v interface{}) *OriginMsg

MsgFrom makes a stateless message

func StateFrom

func StateFrom(v interface{}) *OriginMsg

StateFrom makes a stateful message

func (*OriginMsg) As

func (m *OriginMsg) As(interface{}) error

As implements Message

func (*OriginMsg) Component

func (m *OriginMsg) Component() string

Component implements Message

func (*OriginMsg) Endpoint

func (m *OriginMsg) Endpoint() string

Endpoint implements Message

func (*OriginMsg) IsState

func (m *OriginMsg) IsState() bool

IsState implements Message

func (*OriginMsg) Value

func (m *OriginMsg) Value() (interface{}, bool)

Value implements Message

type Publication

type Publication interface {
	io.Closer
	Component() Component
}

Publication is the expose of the component

type Publisher

type Publisher interface {
	Publish(Component) (Publication, error)
}

Publisher exposes components to hub

type Reactor

type Reactor struct {
	Name    string
	Handler MessageSink
}

Reactor implements Endpoint for a reactor to an update

func ReactorAs

func ReactorAs(name string, handler interface{}) *Reactor

ReactorAs accepts a func with arbitrary parameter

func ReactorFunc

func ReactorFunc(name string, handler MessageSinkFunc) *Reactor

ReactorFunc creates a Reactor from MessageSinkFunc

func (*Reactor) ConsumeMessage

func (a *Reactor) ConsumeMessage(msg Message) Future

ConsumeMessage implements MessageSink

func (*Reactor) Do

func (a *Reactor) Do(sink MessageSink) *Reactor

Do sets the message handler

func (*Reactor) DoFunc

func (a *Reactor) DoFunc(handler MessageSinkFunc) *Reactor

DoFunc is same as Do but accepts a func

func (*Reactor) ID

func (a *Reactor) ID() string

ID implements Endpoint

type StreamMessage

type StreamMessage []byte

StreamMessage implements Message with raw bytes

func (StreamMessage) As

func (m StreamMessage) As(out interface{}) error

As implements Message

func (StreamMessage) Component

func (m StreamMessage) Component() string

Component implements Message

func (StreamMessage) Endpoint

func (m StreamMessage) Endpoint() string

Endpoint implements Message

func (StreamMessage) IsState

func (m StreamMessage) IsState() bool

IsState implements Message

func (StreamMessage) Payload

func (m StreamMessage) Payload() ([]byte, error)

Payload implements EncodedPayload

func (StreamMessage) Value

func (m StreamMessage) Value() (interface{}, bool)

Value implements mqhub.Message

type Watchable

type Watchable interface {
	Watch(MessageSink) (Watcher, error)
}

Watchable is an object which others can watch for changes

type Watcher

type Watcher interface {
	io.Closer
	Watched() Watchable
}

Watcher is an established watching state

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL