Documentation ¶
Index ¶
- Constants
- Variables
- func RegisterConnectorFactory(protocol string, factory ConnectorFactory)
- type ChanMsgSink
- type Component
- type ComponentBase
- type Composer
- type Composite
- type CompositeBase
- type Connector
- type ConnectorFactory
- type ContextRunner
- type DataPoint
- type Descriptor
- type EncodedPayload
- type Endpoint
- type EndpointRef
- type Future
- type Identity
- type IdentityImpl
- type ImmediateFuture
- type Message
- type MessageSink
- type MessageSinkFunc
- type MessageSource
- type OriginMsg
- type Publication
- type Publisher
- type Reactor
- type StreamMessage
- type Watchable
- type Watcher
Constants ¶
const Protocol = "mqhub"
Protocol defines the protocol in the URL A few examples of URL acceptable:
- mqhub+mqtt://server:port/topic
- mqhub+mqtt+ws://server:port/topic
- mqtt://server:port/topic
- mqtt+ws://server:port/topic
Variables ¶
var ( // ErrNoMessageSink is reported by DataPoint when message sink is not yet // connected ErrNoMessageSink = fmt.Errorf("message sink unavailable") )
Functions ¶
func RegisterConnectorFactory ¶
func RegisterConnectorFactory(protocol string, factory ConnectorFactory)
RegisterConnectorFactory registers a ConnectorFactory for the specified protocol
Types ¶
type ChanMsgSink ¶
type ChanMsgSink struct {
C chan Message
}
ChanMsgSink is a MessageSink and emits message to a chan
func (*ChanMsgSink) ConsumeMessage ¶
func (c *ChanMsgSink) ConsumeMessage(msg Message) Future
ConsumeMessage implements MessageSink
type Component ¶
type Component interface { Identity // Endpoints enumerates the endpoints this component exposes Endpoints() []Endpoint }
Component is the unit of an object which can be exposed to hub
type Composer ¶
type Composer interface {
AddComponent(Component)
}
Composer defines composition operations
type Composite ¶
type Composite interface {
Components() []Component
}
Composite is a collection of components
type CompositeBase ¶
type CompositeBase struct {
// contains filtered or unexported fields
}
CompositeBase implements Composite + Composer
func (*CompositeBase) AddComponent ¶
func (c *CompositeBase) AddComponent(comp Component)
AddComponent implements Composer
func (*CompositeBase) AddComponents ¶
func (c *CompositeBase) AddComponents(comps ...Component)
AddComponents add components
func (*CompositeBase) Components ¶
func (c *CompositeBase) Components() []Component
Components implements Composite
type Connector ¶
type Connector interface { io.Closer Watchable Connect() Future Publish(Component) (Publication, error) Describe(componentID string) Descriptor }
Connector defines a general connector to message source/bus
func NewConnector ¶
NewConnector creates a connector by parsing URL
type ConnectorFactory ¶
ConnectorFactory creates a connector from a URL
type ContextRunner ¶
ContextRunner defines a runner accepts a context the runner should be started using go runner.Run(ctx)
type DataPoint ¶
type DataPoint struct { Name string Retain bool Sink MessageSink }
DataPoint implements Endpoint for a data point
func NewDataPointFromEndpointRef ¶
func NewDataPointFromEndpointRef(ref EndpointRef) *DataPoint
NewDataPointFromEndpointRef creates a datapoint using an EndpointRef
func NewRetainDataPoint ¶
NewRetainDataPoint creates a new retain datapoint
func (*DataPoint) SinkMessage ¶
func (p *DataPoint) SinkMessage(sink MessageSink)
SinkMessage implements MessageSource
type Descriptor ¶
type Descriptor interface { Watchable ID() string SubComponent(id ...string) Descriptor Endpoint(name string) EndpointRef }
Descriptor describes a publication
type EncodedPayload ¶
EncodedPayload represent an already encoded message
type Endpoint ¶
type Endpoint interface { Identity }
Endpoint defines the endpoints exposed from a component an endpoint can be either a datapoint or a reactor
type EndpointRef ¶
type EndpointRef interface { Watchable MessageSink }
EndpointRef references remote endpoints
type IdentityImpl ¶
type IdentityImpl struct {
// contains filtered or unexported fields
}
IdentityImpl implements Identity
type ImmediateFuture ¶
type ImmediateFuture struct {
Error error
}
ImmediateFuture implements a future with immediate result
type Message ¶
type Message interface { // Component is the ID of component emitted this message Component() string // Endpoint is the name of the endpoint Endpoint() string // Value gets the original value which is not encoded Value() (interface{}, bool) // IsState indicate this is a datapoint as state IsState() bool // As decodes the message as the specified type As(interface{}) error }
Message is the abstraction of data entity passing through the hub
type MessageSink ¶
MessageSink defines the consumer of a message
func MessageSinkAs ¶
func MessageSinkAs(handler interface{}) MessageSink
MessageSinkAs converts a func with arbitrary parameter to MessageSink
type MessageSinkFunc ¶
MessageSinkFunc is func form of MessageSink
func (MessageSinkFunc) ConsumeMessage ¶
func (f MessageSinkFunc) ConsumeMessage(msg Message) Future
ConsumeMessage implements MessageSink
type MessageSource ¶
type MessageSource interface {
SinkMessage(MessageSink)
}
MessageSource emits messages
type Publication ¶
Publication is the expose of the component
type Publisher ¶
type Publisher interface {
Publish(Component) (Publication, error)
}
Publisher exposes components to hub
type Reactor ¶
type Reactor struct { Name string Handler MessageSink }
Reactor implements Endpoint for a reactor to an update
func ReactorFunc ¶
func ReactorFunc(name string, handler MessageSinkFunc) *Reactor
ReactorFunc creates a Reactor from MessageSinkFunc
func (*Reactor) ConsumeMessage ¶
ConsumeMessage implements MessageSink
func (*Reactor) DoFunc ¶
func (a *Reactor) DoFunc(handler MessageSinkFunc) *Reactor
DoFunc is same as Do but accepts a func
type StreamMessage ¶
type StreamMessage []byte
StreamMessage implements Message with raw bytes
func (StreamMessage) Component ¶
func (m StreamMessage) Component() string
Component implements Message
func (StreamMessage) Endpoint ¶
func (m StreamMessage) Endpoint() string
Endpoint implements Message
func (StreamMessage) Payload ¶
func (m StreamMessage) Payload() ([]byte, error)
Payload implements EncodedPayload
func (StreamMessage) Value ¶
func (m StreamMessage) Value() (interface{}, bool)
Value implements mqhub.Message
type Watchable ¶
type Watchable interface {
Watch(MessageSink) (Watcher, error)
}
Watchable is an object which others can watch for changes