Documentation ¶
Constants ¶
const AlertTopic = "hub.subscription.messageslost"
AlertTopic is the topic used to notify when a nonblocking subscriber loses a message. You can subscribe to this topic to log the loss or send metrics.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Fields ¶
type Fields map[string]interface{}
Fields is a key/value store for Message values.
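Because the values are stored as `interface{}`, reading one back requires a type assertion. The following is a minimal sketch of that pattern. It redeclares `Fields` locally so it runs on its own, and `intField` is a hypothetical helper written for illustration, not a function of the hub package.

```go
package main

import "fmt"

// Fields mirrors the declaration shown above so the sketch is self-contained.
type Fields map[string]interface{}

// intField is a hypothetical helper (not part of the hub package). It returns
// the value stored under key as an int, and false when the key is missing or
// holds a value of another type.
func intField(f Fields, key string) (int, bool) {
	v, ok := f[key].(int)
	return v, ok
}

func main() {
	f := Fields{"id": 123, "user": "alice"}

	if id, ok := intField(f, "id"); ok {
		fmt.Println("id:", id)
	}
	if _, ok := intField(f, "user"); !ok {
		fmt.Println("user is not an int")
	}
}
```

The comma-ok form of the assertion avoids a panic when the key is absent or the stored type differs.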
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub is a component that provides publish and subscribe capabilities for messages. Every message has a Name that is used to route it to subscribers, in the same way as RabbitMQ topic exchanges: the words of a name are separated by dots (`.`), and you can use `*` as a wildcard.
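The routing rule can be illustrated with a small standalone matcher. This is only a sketch and not the matcher the hub package uses internally: `matchTopic` is a hypothetical function, and it assumes that `*` stands for exactly one word, as it does in RabbitMQ topic exchanges.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether name matches pattern. Both are dot-separated.
// Assumption: a "*" word in the pattern matches exactly one word of the name.
func matchTopic(pattern, name string) bool {
	p := strings.Split(pattern, ".")
	n := strings.Split(name, ".")
	if len(p) != len(n) {
		return false
	}
	for i, word := range p {
		if word != "*" && word != n[i] {
			return false
		}
	}
	return true
}

func main() {
	patterns := []string{"account.login.*", "account.changepassword.*"}
	names := []string{"account.login.failed", "account.foo.failed"}

	for _, name := range names {
		matched := false
		for _, p := range patterns {
			if matchTopic(p, name) {
				matched = true
				break
			}
		}
		fmt.Printf("%s routed: %t\n", name, matched)
	}
}
```

Under this assumption, `account.login.failed` matches `account.login.*`, while `account.foo.failed` matches neither pattern.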
Example ¶
package main

import (
	"fmt"
	"sync"

	"github.com/leandro-lugaresi/hub"
)

func main() {
	h := hub.New()
	var wg sync.WaitGroup

	// The cap param creates a buffered channel with capacity 10.
	// For an unbuffered channel, pass a cap of 0.
	sub := h.Subscribe(10, "account.login.*", "account.changepassword.*")
	wg.Add(1)
	go func(s hub.Subscription) {
		// The loop ends when the hub closes the subscription's channel.
		for msg := range s.Receiver {
			fmt.Printf("receive msg with topic %s and id %d\n", msg.Name, msg.Fields["id"])
		}
		wg.Done()
	}(sub)

	h.Publish(hub.Message{
		Name:   "account.login.failed",
		Fields: hub.Fields{"id": 123},
	})
	h.Publish(hub.Message{
		Name:   "account.changepassword.failed",
		Fields: hub.Fields{"id": 456},
	})
	h.Publish(hub.Message{
		Name:   "account.login.success",
		Fields: hub.Fields{"id": 123},
	})
	// This message is not routed to the subscriber above.
	h.Publish(hub.Message{
		Name:   "account.foo.failed",
		Fields: hub.Fields{"id": 789},
	})

	// Close all the subscribers.
	h.Close()
	// Wait until all buffered messages have been processed.
	wg.Wait()
}
Output:
receive msg with topic account.login.failed and id 123
receive msg with topic account.changepassword.failed and id 456
receive msg with topic account.login.success and id 123
func (*Hub) Close ¶
func (h *Hub) Close()
Close unsubscribes all subscriptions and closes them.
func (*Hub) NonBlockingSubscribe ¶
func (h *Hub) NonBlockingSubscribe(cap int, topics ...string) Subscription
NonBlockingSubscribe creates a nonblocking subscription to receive events for the given topics. This subscriber will lose messages if the buffer reaches its maximum capacity.
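The message loss described here follows the usual Go pattern for a nonblocking send: a `select` with a `default` branch. The sketch below shows that pattern in isolation. It is not the package's implementation, and `trySend` is a hypothetical name chosen for illustration.

```go
package main

import "fmt"

// trySend attempts to deliver msg without blocking. It returns false when
// the channel's buffer is full, in which case the message is dropped.
func trySend(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 2) // buffer holds two messages
	lost := 0

	// Nobody is receiving, so the third send finds the buffer full.
	for _, m := range []string{"a", "b", "c"} {
		if !trySend(ch, m) {
			lost++
		}
	}
	fmt.Println("buffered:", len(ch), "lost:", lost)
}
```

A publisher built this way never stalls on a slow consumer, at the cost of dropping messages once the buffer fills, which is the situation a lost-message counter or alert is meant to surface.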
func (*Hub) Subscribe ¶
func (h *Hub) Subscribe(cap int, topics ...string) Subscription
Subscribe creates a blocking subscription to receive events for the given topics. The cap param is passed to the subscriber, which in this case uses it as the capacity of its channel. A cap of 0 creates an unbuffered channel.
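How the capacity affects a publisher can be seen with plain Go channels. The sketch below assumes the cap value ends up as the capacity argument of `make(chan ..., cap)`, which this page does not confirm. `canSendWithoutReceiver` is a hypothetical helper used only for this illustration.

```go
package main

import "fmt"

// canSendWithoutReceiver reports whether one value can be sent on a fresh
// channel of the given capacity while no goroutine is receiving from it.
func canSendWithoutReceiver(capacity int) bool {
	ch := make(chan int, capacity)
	select {
	case ch <- 1:
		return true
	default:
		// A blocking send would wait here until a receiver shows up.
		return false
	}
}

func main() {
	for _, c := range []int{0, 1, 10} {
		fmt.Printf("cap=%d send without receiver succeeds: %t\n", c, canSendWithoutReceiver(c))
	}
}
```

With a capacity of 0 the send cannot proceed until a receiver is ready, so a blocking publisher waits for the subscriber. With any positive capacity, sends succeed immediately until the buffer is full.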
func (*Hub) Unsubscribe ¶
func (h *Hub) Unsubscribe(sub Subscription)
Unsubscribe removes and closes the Subscription.
type Message ¶
Message represents a message or event passed into the hub. It also contains some helper functions to convert the fields to primitive types.
type Subscription ¶
type Subscription struct {
Topics []string
Receiver <-chan Message
// contains filtered or unexported fields
}
Subscription represents a topic subscription.