Documentation
¶
Overview ¶
Package bridge contains everything related to the bridge / broker connector, which is used to connect to a message broker over TCP or WebSocket.
Example (ConnectUsingBrokerViaTCP) ¶
package main
import (
"fmt"
"github.com/pb33f/ranch/bridge"
"github.com/pb33f/ranch/bus"
)
// main connects to a STOMP broker over TCP, sends five messages to
// /queue/sample and prints each one as it arrives on a subscription to that
// same destination.
func main() {
	// get a reference to the event bus.
	b := bus.GetBus()

	// create a broker connector configuration, using plain TCP (UseWS is left unset).
	// Make sure you have a STOMP TCP server running like RabbitMQ
	config := &bridge.BrokerConnectorConfig{
		Username:   "guest",
		Password:   "guest",
		ServerAddr: ":61613",
		STOMPHeader: map[string]string{
			"access-token": "test",
		},
	}

	// connect to broker; nothing below can work without a connection, so
	// stop here instead of dereferencing an unusable connection.
	c, err := b.ConnectBroker(config)
	if err != nil {
		fmt.Printf("unable to connect, error: %v\n", err)
		return
	}
	defer c.Disconnect()

	// subscribe to the destination the producer below publishes to.
	s, err := c.Subscribe("/queue/sample")
	if err != nil {
		fmt.Printf("unable to subscribe, error: %v\n", err)
		return
	}

	// number of messages to send and to wait for.
	const total = 5

	// create a control chan
	done := make(chan bool)

	// listen for messages
	consumer := func() {
		// listen for `total` messages then stop.
		for n := 0; n < total; n++ {
			// listen for incoming messages from subscription.
			m := <-s.GetMsgChannel()

			// get byte array; guard the assertion so an unexpected payload
			// type is reported instead of panicking the goroutine.
			d, ok := m.Payload.([]byte)
			if !ok {
				fmt.Printf("unexpected payload type: %T\n", m.Payload)
				continue
			}
			fmt.Printf("Message Received: %s\n", string(d))
		}
		done <- true
	}

	// send messages
	producer := func() {
		for i := 0; i < total; i++ {
			payload := []byte(fmt.Sprintf("message: %d", i))
			if err := c.SendMessage("/queue/sample", "text/plain", payload); err != nil {
				fmt.Printf("unable to send message %d, error: %v\n", i, err)
			}
		}
	}

	// listen for incoming messages on subscription for destination /queue/sample
	go consumer()

	// send some messages to the broker on destination /queue/sample
	go producer()

	// wait for messages to be processed.
	<-done
}
Example (ConnectUsingBrokerViaWebSocket) ¶
package main
import (
"encoding/json"
"fmt"
"github.com/pb33f/ranch/bridge"
"github.com/pb33f/ranch/bus"
"github.com/pb33f/ranch/model"
)
// main connects to a broker over WebSocket, subscribes to
// /topic/simple-stream and prints the payload of the first five messages
// received before disconnecting.
func main() {
	// get a reference to the event bus.
	b := bus.GetBus()

	// create a broker connector configuration, using WebSockets.
	config := &bridge.BrokerConnectorConfig{
		Username:        "guest",
		Password:        "guest",
		ServerAddr:      "appfabric.vmware.com",
		WebSocketConfig: &bridge.WebSocketConfig{WSPath: "/fabric"},
		UseWS:           true,
		STOMPHeader: map[string]string{
			"access-token": "test",
		},
	}

	// connect to broker; nothing below can work without a connection, so
	// stop here instead of dereferencing an unusable connection.
	c, err := b.ConnectBroker(config)
	if err != nil {
		fmt.Printf("unable to connect, error: %v\n", err)
		return
	}
	// deferred so the connection is also released on the early-return paths.
	defer c.Disconnect()

	// subscribe to our demo simple-stream
	s, err := c.Subscribe("/topic/simple-stream")
	if err != nil {
		fmt.Printf("unable to subscribe, error: %v\n", err)
		return
	}

	// number of messages to wait for.
	const total = 5

	// create a control chan
	done := make(chan bool)

	listener := func() {
		// listen for `total` messages then stop.
		for n := 0; n < total; n++ {
			// listen for incoming messages from subscription.
			m := <-s.GetMsgChannel()

			// guard the assertion so an unexpected payload type is reported
			// instead of panicking the goroutine.
			d, ok := m.Payload.([]byte)
			if !ok {
				fmt.Printf("unexpected payload type: %T\n", m.Payload)
				continue
			}

			// unmarshal message; r is already a pointer, so pass it directly.
			r := &model.Response{}
			if err := json.Unmarshal(d, r); err != nil {
				fmt.Printf("unable to unmarshal message, error: %v\n", err)
				continue
			}

			text, ok := r.Payload.(string)
			if !ok {
				fmt.Printf("unexpected response payload type: %T\n", r.Payload)
				continue
			}
			fmt.Printf("Message Received: %s\n", text)
		}
		done <- true
	}

	// listen for incoming messages on subscription.
	go listener()

	// wait for messages to be processed.
	<-done
}
Index ¶
- type BridgeClient
- func (ws *BridgeClient) Connect(url *url.URL, config *BrokerConnectorConfig) error
- func (ws *BridgeClient) Disconnect() error
- func (ws *BridgeClient) Send(destination, contentType string, payload []byte, ...)
- func (ws *BridgeClient) SendFrame(f *frame.Frame)
- func (ws *BridgeClient) Subscribe(destination string) *BridgeClientSub
- type BridgeClientSub
- type BrokerConnector
- type BrokerConnectorConfig
- type Connection
- type Subscription
- type WebSocketConfig
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BridgeClient ¶
// BridgeClient encapsulates all subscriptions and io to and from brokers.
// It carries both a WebSocket connection field and a STOMP TCP connection
// field; which one is populated presumably depends on the transport in
// use — TODO confirm.
type BridgeClient struct {
WSc *websocket.Conn // WebSocket connection
TCPc *stomp.Conn // STOMP TCP Connection
// NOTE(review): presumably signals connection state — confirm against Connect.
ConnectedChan chan bool
// Subscriptions known to this client; the string key is presumably the
// destination or the subscription id — TODO confirm.
Subscriptions map[string]*BridgeClientSub
// contains filtered or unexported fields
}
BridgeClient encapsulates all subscriptions and io to and from brokers.
func NewBridgeWsClient ¶
func NewBridgeWsClient(enableLogging bool) *BridgeClient
NewBridgeWsClient creates a new WebSocket client.
func (*BridgeClient) Connect ¶
func (ws *BridgeClient) Connect(url *url.URL, config *BrokerConnectorConfig) error
Connect to broker endpoint.
func (*BridgeClient) Disconnect ¶
func (ws *BridgeClient) Disconnect() error
Disconnect from broker endpoint
func (*BridgeClient) Send ¶
func (ws *BridgeClient) Send(destination, contentType string, payload []byte, opts ...func(fr *frame.Frame) error)
Send sends a payload to a destination.
func (*BridgeClient) SendFrame ¶
func (ws *BridgeClient) SendFrame(f *frame.Frame)
SendFrame fires a STOMP frame down the WebSocket.
func (*BridgeClient) Subscribe ¶
func (ws *BridgeClient) Subscribe(destination string) *BridgeClientSub
Subscribe subscribes to a destination.
type BridgeClientSub ¶
// BridgeClientSub is a client subscription that encapsulates message and
// error channels for a subscription.
type BridgeClientSub struct {
C chan *model.Message // MESSAGE payloads
E chan *model.Message // ERROR payloads.
// Id identifies this subscription (presumably unique per subscription — TODO confirm).
Id *uuid.UUID
// Destination is the destination string associated with this subscription.
Destination string
// Client is the BridgeClient this subscription refers to (presumably the
// client that created it — confirm).
Client *BridgeClient
// contains filtered or unexported fields
}
BridgeClientSub is a client subscription that encapsulates message and error channels for a subscription
func (*BridgeClientSub) Unsubscribe ¶
func (cs *BridgeClientSub) Unsubscribe()
Unsubscribe sends an UNSUBSCRIBE frame for the subscription's destination.
type BrokerConnector ¶
// BrokerConnector is used to connect to a message broker over TCP or WebSocket.
type BrokerConnector interface {
// Connect takes a connector configuration and a logging flag and returns
// a Connection, or an error.
Connect(config *BrokerConnectorConfig, enableLogging bool) (Connection, error)
}
BrokerConnector is used to connect to a message broker over TCP or WebSocket.
type BrokerConnectorConfig ¶
// BrokerConnectorConfig is a configuration used when connecting to a message broker.
type BrokerConnectorConfig struct {
// Username and Password are presumably the credentials presented to the
// broker during the handshake — TODO confirm.
Username string
Password string
// ServerAddr is the broker address; the package examples use both a
// port-only form (":61613") and a bare host name ("appfabric.vmware.com").
ServerAddr string
UseWS bool // use WebSocket instead of TCP
WebSocketConfig *WebSocketConfig // WebSocket configuration for when UseWS is true
// NOTE(review): presumably the value of the STOMP "host" header — confirm.
HostHeader string
HeartBeatOut time.Duration // outbound heartbeat interval (from client to server)
HeartBeatIn time.Duration // inbound heartbeat interval (from server to client)
STOMPHeader map[string]string // additional STOMP headers for handshake
HttpHeader http.Header // additional HTTP headers for WebSocket Upgrade
}
BrokerConnectorConfig is a configuration used when connecting to a message broker
type Connection ¶
// Connection is the interface returned by BrokerConnector.Connect. It exposes
// subscribing, sending and disconnecting on an established broker connection.
type Connection interface {
// GetId returns the identifier of this connection.
GetId() *uuid.UUID
// Subscribe returns a Subscription for the given destination, or an error.
Subscribe(destination string) (Subscription, error)
// SubscribeReplyDestination returns a Subscription for the given destination,
// or an error. NOTE(review): presumably intended for reply destinations used
// with SendMessageWithReplyDestination — confirm.
SubscribeReplyDestination(destination string) (Subscription, error)
// Disconnect closes the connection and reports any error.
Disconnect() (err error)
// SendJSONMessage sends payload to destination; presumably with a JSON
// content type — TODO confirm. opts can modify the outgoing frame.
SendJSONMessage(destination string, payload []byte, opts ...func(*frame.Frame) error) error
// SendMessage sends payload to destination with the given content type.
// opts can modify the outgoing frame.
SendMessage(destination, contentType string, payload []byte, opts ...func(*frame.Frame) error) error
// SendMessageWithReplyDestination sends payload to destination with the given
// content type, additionally taking a reply destination.
SendMessageWithReplyDestination(destination, replyDestination, contentType string, payload []byte, opts ...func(*frame.Frame) error) error
// Conversation sends payload to destination and returns a Subscription, or an
// error. NOTE(review): presumably the subscription carries the replies — confirm.
Conversation(destination string, payload []byte, opts ...func(*frame.Frame) error) (Subscription, error)
// RequestResponse takes a context and a payload and returns a single message,
// or an error. NOTE(review): presumably waits for a reply until ctx is
// done — confirm.
RequestResponse(ctx context.Context, payload []byte, opts ...func(*frame.Frame) error) (*model.Message, error)
}
type Subscription ¶
type WebSocketConfig ¶
// WebSocketConfig holds the WebSocket-specific settings referenced by
// BrokerConnectorConfig.WebSocketConfig when UseWS is true.
type WebSocketConfig struct {
WSPath string // if UseWS is true, set this to your websocket path (e.g. '/fabric')
UseTLS bool // use TLS encryption with WebSocket connection
TLSConfig *tls.Config // TLS config for WebSocket connection
CertFile string // X509 certificate for TLS
KeyFile string // matching key file for the X509 certificate
}
func (*WebSocketConfig) LoadX509KeyPairFromFiles ¶
func (b *WebSocketConfig) LoadX509KeyPairFromFiles(certFile, keyFile string) error
LoadX509KeyPairFromFiles loads an X509 certificate and its matching key from the given file paths and initializes the Certificates field of the TLS config instance with their contents, but only if Certificates is an empty slice and GetCertificate is nil.