Documentation
Overview
NotifyDispatcher attempts to make working with a single Listener easy for a dynamic set of independent listeners.
Usage
For example:
    import (
    	"github.com/lib/pq"
    	"github.com/johto/notifyutils/notifydispatcher"
    	"fmt"
    	"time"
    )

    func listener(dispatcher *notifydispatcher.NotifyDispatcher) {
    	ch := make(chan *pq.Notification, 8)
    	err := dispatcher.Listen("listenerchannel", ch)
    	if err != nil {
    		panic(err)
    	}
    	for n := range ch {
    		if n == nil {
    			fmt.Println("lost connection, but we're fine now!")
    			continue
    		}

    		fmt.Println("received notification!")
    		// do something with notification
    	}
    	panic("could not keep up!")
    }

    func main() {
    	dispatcher := notifydispatcher.NewNotifyDispatcher(pq.NewListener("", time.Second, time.Minute, nil))
    	for i := 0; i < 8; i++ {
    		go listener(dispatcher)
    	}
    	select {}
    }
Index
- Variables
- type BroadcastChannel
- type Listener
- type NotifyDispatcher
- func (d *NotifyDispatcher) Close() error
- func (d *NotifyDispatcher) CloseBroadcastChannel(ch BroadcastChannel)
- func (d *NotifyDispatcher) Listen(channel string, ch chan<- *pq.Notification) error
- func (d *NotifyDispatcher) NumActiveChannels() int
- func (d *NotifyDispatcher) OpenBroadcastChannel() BroadcastChannel
- func (d *NotifyDispatcher) SetBroadcastOnConnectionLoss(value bool)
- func (d *NotifyDispatcher) SetSlowReaderEliminationStrategy(strategy SlowReaderEliminationStrategy)
- func (d *NotifyDispatcher) Unlisten(channel string, ch chan<- *pq.Notification) error
- type SlowReaderEliminationStrategy
Constants
This section is empty.
Variables
    var (
    	ErrChannelAlreadyActive = errors.New("channel is already active")
    	ErrChannelNotActive     = errors.New("channel is not active")
    )
Functions
This section is empty.
Types
type BroadcastChannel
    type BroadcastChannel struct {
    	Channel chan struct{}
    	// contains filtered or unexported fields
    }
type Listener
    type Listener interface {
    	Listen(channel string) error
    	Unlisten(channel string) error
    	NotificationChannel() <-chan *pq.Notification
    }
This is the part of *pq.Listener's interface we're using. pq itself doesn't provide such an interface, but we can just roll our own.
type NotifyDispatcher
type NotifyDispatcher struct {
// contains filtered or unexported fields
}
func NewNotifyDispatcher
func NewNotifyDispatcher(l Listener) *NotifyDispatcher
NewNotifyDispatcher creates a new NotifyDispatcher that uses the supplied Listener (typically a *pq.Listener) underneath. Ownership of the Listener is transferred to the NotifyDispatcher; do not use the Listener directly after calling NewNotifyDispatcher.
func (*NotifyDispatcher) Close
func (d *NotifyDispatcher) Close() error
func (*NotifyDispatcher) CloseBroadcastChannel
func (d *NotifyDispatcher) CloseBroadcastChannel(ch BroadcastChannel)
Closes the broadcast channel ch.
func (*NotifyDispatcher) Listen
func (d *NotifyDispatcher) Listen(channel string, ch chan<- *pq.Notification) error
Listen adds ch to the set of listeners for notification channel channel. ch should be a buffered channel. If ch is already in the set of listeners for channel, ErrChannelAlreadyActive is returned. After Listen has returned, the notification channel is open and the dispatcher will attempt to deliver all notifications received for that channel to ch.
If SlowReaderEliminationStrategy is CloseSlowReaders, reusing the same ch for multiple notification channels is not allowed.
func (*NotifyDispatcher) NumActiveChannels
func (d *NotifyDispatcher) NumActiveChannels() int
Returns the number of channels which were active at some point in time during the call to NumActiveChannels.
func (*NotifyDispatcher) OpenBroadcastChannel
func (d *NotifyDispatcher) OpenBroadcastChannel() BroadcastChannel
Opens a new "broadcast channel". A broadcast channel is sent to by the NotifyDispatcher every time the underlying Listener has re-established its database connection.
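A consumer would typically wait on both its notification channel and the broadcast channel's Channel field. The sketch below shows one possible shape for such a loop, using only the standard library. `Notification` is a local stand-in for `pq.Notification`, and `consume`, `handle` and `resync` are hypothetical names. In real use, `notes` would be the channel passed to Listen and `reconnected` would be the Channel field of the value returned by OpenBroadcastChannel.

```go
package main

import "fmt"

// Notification is a local stand-in for pq.Notification.
type Notification struct {
	Channel string
	Extra   string
}

// consume handles notifications until notes is closed. Every signal on
// reconnected triggers resync, on the assumption that notifications may have
// been missed while the database connection was down.
func consume(notes <-chan *Notification, reconnected <-chan struct{},
	handle func(*Notification), resync func()) {
	for {
		select {
		case n, ok := <-notes:
			if !ok {
				return // channel closed, stop consuming
			}
			handle(n)
		case <-reconnected:
			resync()
		}
	}
}

func main() {
	// Unbuffered channels keep this demonstration deterministic.
	notes := make(chan *Notification)
	reconnected := make(chan struct{})
	done := make(chan struct{})

	go func() {
		defer close(done)
		consume(notes, reconnected,
			func(n *Notification) { fmt.Println("notification on", n.Channel) },
			func() { fmt.Println("reconnected: re-reading state") })
	}()

	reconnected <- struct{}{} // simulate the dispatcher's broadcast
	notes <- &Notification{Channel: "listenerchannel"}
	close(notes)
	<-done
}
```

What `resync` should do depends on the application; re-reading the state that the notifications describe is one common choice.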
func (*NotifyDispatcher) SetBroadcastOnConnectionLoss
func (d *NotifyDispatcher) SetBroadcastOnConnectionLoss(value bool)
Controls whether a nil notification from the underlying Listener is broadcast to all channels in the set.
func (*NotifyDispatcher) SetSlowReaderEliminationStrategy
func (d *NotifyDispatcher) SetSlowReaderEliminationStrategy(strategy SlowReaderEliminationStrategy)
Sets the strategy for mitigating the adverse effects slow readers might have on the dispatcher. See SlowReaderEliminationStrategy.
func (*NotifyDispatcher) Unlisten
func (d *NotifyDispatcher) Unlisten(channel string, ch chan<- *pq.Notification) error
Removes ch from the set of listeners for notification channel channel. If ch is not in the set of listeners for channel, ErrChannelNotActive is returned.
type SlowReaderEliminationStrategy
type SlowReaderEliminationStrategy int
SlowReaderEliminationStrategy controls the behaviour of the dispatcher in case the buffer of a listener's channel is full and attempting to send to it would block the dispatcher, preventing it from delivering notifications for unrelated listeners. The default is CloseSlowReaders, but it can be changed at any point during a dispatcher's lifespan using SetSlowReaderEliminationStrategy.
    const (
    	// When a send would block, the listener's channel is removed from the set
    	// of listeners for that notification channel, and the channel is closed.
    	// This is the default strategy.
    	CloseSlowReaders SlowReaderEliminationStrategy = iota
    	// When a send would block, the notification is not delivered. Delivery is
    	// not reattempted.
    	NeglectSlowReaders
    )
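Both strategies can be understood in terms of Go's non-blocking send idiom, a `select` with a `default` case. The sketch below illustrates that idiom with only the standard library. It is not the dispatcher's actual code: `Notification` stands in for `pq.Notification`, and `Strategy`, `CloseSlow`, `NeglectSlow` and `deliver` are hypothetical names that mirror the two documented strategies.

```go
package main

import "fmt"

// Notification is a local stand-in for pq.Notification.
type Notification struct {
	Channel string
	Extra   string
}

// Strategy mirrors the two documented strategies; the names are local to
// this sketch.
type Strategy int

const (
	CloseSlow   Strategy = iota // close the channel when a send would block
	NeglectSlow                 // drop the notification when a send would block
)

// deliver attempts a non-blocking send of n to ch. It reports whether the
// notification was delivered and whether ch was closed as a result.
func deliver(ch chan *Notification, n *Notification, s Strategy) (delivered, closed bool) {
	select {
	case ch <- n:
		return true, false
	default:
		// The buffer is full. A blocking send here would stall delivery
		// to every other listener, so apply the strategy instead.
		if s == CloseSlow {
			close(ch)
			return false, true
		}
		return false, false
	}
}

func main() {
	n := &Notification{Channel: "listenerchannel"}
	slow := make(chan *Notification, 1) // nobody is reading from this channel

	delivered, closed := deliver(slow, n, NeglectSlow)
	fmt.Println(delivered, closed) // the buffer had room

	delivered, closed = deliver(slow, n, NeglectSlow)
	fmt.Println(delivered, closed) // buffer full: notification dropped

	delivered, closed = deliver(slow, n, CloseSlow)
	fmt.Println(delivered, closed) // buffer full: channel closed
}
```

A reader ranging over a channel closed this way still receives any notifications that were already buffered before the loop ends, which is why the usage example treats leaving the `range` loop as a sign that the listener could not keep up.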