notifydispatcher

package
v0.0.0-...-a8b71d7
Published: Jun 15, 2015 License: MIT Imports: 5 Imported by: 1

Documentation

Overview

NotifyDispatcher makes it easy for a dynamic set of independent listeners to share a single Listener.

Usage

For example:

package main

import (
    "fmt"
    "time"

    "github.com/johto/notifyutils/notifydispatcher"
    "github.com/lib/pq"
)

func listener(dispatcher *notifydispatcher.NotifyDispatcher) {
    ch := make(chan *pq.Notification, 8)
    err := dispatcher.Listen("listenerchannel", ch)
    if err != nil {
        panic(err)
    }
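    for n := range ch {
        if n == nil {
            // A nil notification means the underlying Listener lost and
            // re-established its connection; notifications may have been missed.
            fmt.Println("lost connection, but we're fine now!")
            continue
        }

        fmt.Println("received notification!")
        // do something with notification
    }
    // The loop ends only once ch has been closed, which the default
    // CloseSlowReaders strategy does when a send to ch would block.
    panic("could not keep up!")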
}

func main() {
    dispatcher := notifydispatcher.NewNotifyDispatcher(pq.NewListener("", time.Second, time.Minute, nil))
    for i := 0; i < 8; i++ {
        go listener(dispatcher)
    }
    select {} // block forever; the listeners run in their own goroutines
}

Index

Constants

This section is empty.

Variables

var (
	ErrChannelAlreadyActive = errors.New("channel is already active")
	ErrChannelNotActive     = errors.New("channel is not active")
)

Functions

This section is empty.

Types

type BroadcastChannel

type BroadcastChannel struct {
	Channel chan struct{}
	// contains filtered or unexported fields
}

type Listener

type Listener interface {
	Listen(channel string) error
	Unlisten(channel string) error
	NotificationChannel() <-chan *pq.Notification
}

This is the part of *pq.Listener's interface we're using. pq itself doesn't provide such an interface, but we can just roll our own.

type NotifyDispatcher

type NotifyDispatcher struct {
	// contains filtered or unexported fields
}

func NewNotifyDispatcher

func NewNotifyDispatcher(l Listener) *NotifyDispatcher

NewNotifyDispatcher creates a new NotifyDispatcher that uses the supplied Listener (typically a *pq.Listener) underneath. Ownership of the Listener is transferred to the NotifyDispatcher; do not use the Listener directly after calling NewNotifyDispatcher.

func (*NotifyDispatcher) Close

func (d *NotifyDispatcher) Close() error

func (*NotifyDispatcher) CloseBroadcastChannel

func (d *NotifyDispatcher) CloseBroadcastChannel(ch BroadcastChannel)

Closes the broadcast channel ch.

func (*NotifyDispatcher) Listen

func (d *NotifyDispatcher) Listen(channel string, ch chan<- *pq.Notification) error

Listen adds ch to the set of listeners for notification channel channel. ch should be a buffered channel. If ch is already in the set of listeners for channel, ErrChannelAlreadyActive is returned. After Listen has returned, the notification channel is open and the dispatcher will attempt to deliver all notifications received for that channel to ch.

If SlowReaderEliminationStrategy is CloseSlowReaders, reusing the same ch for multiple notification channels is not allowed.

func (*NotifyDispatcher) NumActiveChannels

func (d *NotifyDispatcher) NumActiveChannels() int

Returns the number of channels which were active at some point in time during the call to NumActiveChannels.

func (*NotifyDispatcher) OpenBroadcastChannel

func (d *NotifyDispatcher) OpenBroadcastChannel() BroadcastChannel

Opens a new "broadcast channel". The NotifyDispatcher sends on a broadcast channel every time the underlying Listener re-establishes its database connection.
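
A consumer of a broadcast channel typically reloads whatever state it derived from notifications, since some may have been missed while the connection was down. The following is a minimal sketch of such a consumer. The function resyncOnReconnect and its resync callback are hypothetical names, not part of this package, and the sketch takes a plain <-chan struct{} so that it compiles without the package; in real use one would presumably pass the Channel field of the value returned by OpenBroadcastChannel.

```go
package main

import "fmt"

// resyncOnReconnect calls resync once for every value received on
// reconnected and returns the number of calls made once the channel is
// closed. Both names are hypothetical helpers for this sketch.
func resyncOnReconnect(reconnected <-chan struct{}, resync func()) int {
	calls := 0
	for range reconnected {
		resync()
		calls++
	}
	return calls
}

func main() {
	// Stand-in for BroadcastChannel.Channel; the sketch fills and closes
	// it by hand to simulate two reconnects followed by shutdown.
	reconnected := make(chan struct{}, 2)
	reconnected <- struct{}{}
	reconnected <- struct{}{}
	close(reconnected)

	n := resyncOnReconnect(reconnected, func() {
		fmt.Println("connection re-established, reloading state")
	})
	fmt.Println("resyncs:", n)
}
```

The sketch closes its stand-in channel itself. Whether CloseBroadcastChannel closes the underlying Go channel is not stated in this documentation, so a real consumer may need its own shutdown signal.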

func (*NotifyDispatcher) SetBroadcastOnConnectionLoss

func (d *NotifyDispatcher) SetBroadcastOnConnectionLoss(value bool)

Controls whether a nil notification from the underlying Listener is broadcast to all channels in the set.

func (*NotifyDispatcher) SetSlowReaderEliminationStrategy

func (d *NotifyDispatcher) SetSlowReaderEliminationStrategy(strategy SlowReaderEliminationStrategy)

Sets the strategy for mitigating the adverse effects slow readers might have on the dispatcher. See SlowReaderEliminationStrategy.

func (*NotifyDispatcher) Unlisten

func (d *NotifyDispatcher) Unlisten(channel string, ch chan<- *pq.Notification) error

Removes ch from the set of listeners for notification channel channel. If ch is not in the set of listeners for channel, ErrChannelNotActive is returned.

type SlowReaderEliminationStrategy

type SlowReaderEliminationStrategy int

SlowReaderEliminationStrategy controls the behaviour of the dispatcher in case the buffer of a listener's channel is full and attempting to send to it would block the dispatcher, preventing it from delivering notifications for unrelated listeners. The default is CloseSlowReaders, but it can be changed at any point during a dispatcher's lifespan using SetSlowReaderEliminationStrategy.

const (
	// When a send would block, the listener's channel is removed from the set
	// of listeners for that notification channel, and the channel is closed.
	// This is the default strategy.
	CloseSlowReaders SlowReaderEliminationStrategy = iota

	// When a send would block, the notification is not delivered.  Delivery is
	// not reattempted.
	NeglectSlowReaders
)
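
Both strategies rest on a send that never blocks, which in Go is usually written as a select with a default case. The following is a minimal sketch of that idea under each strategy. It is an illustration, not the package's implementation: the deliver function, the local strategy type and its constants, and the use of string in place of *pq.Notification are all assumptions, and the real dispatcher also removes a closed channel from its listener set.

```go
package main

import "fmt"

// strategy mirrors the two documented options with names local to this sketch.
type strategy int

const (
	closeSlowReaders strategy = iota
	neglectSlowReaders
)

// deliver attempts a non-blocking send of msg to ch. It reports whether the
// message was delivered and whether ch was closed as a result.
func deliver(ch chan string, msg string, s strategy) (delivered, closed bool) {
	select {
	case ch <- msg:
		return true, false
	default:
		// The buffer is full, so a plain send would block the sender.
		if s == closeSlowReaders {
			close(ch)
			return false, true
		}
		// neglectSlowReaders: drop the message and move on.
		return false, false
	}
}

func main() {
	slow := make(chan string, 1) // buffer of one, and nobody is reading

	d, c := deliver(slow, "first", neglectSlowReaders)
	fmt.Println("first:", d, c)
	d, c = deliver(slow, "second", neglectSlowReaders)
	fmt.Println("second:", d, c)
	d, c = deliver(slow, "third", closeSlowReaders)
	fmt.Println("third:", d, c)

	// A closed channel can still be drained; the range loop then ends,
	// which is how a reader notices that it was too slow.
	for msg := range slow {
		fmt.Println("read:", msg)
	}
}
```

This also shows why Listen asks for a buffered channel: with an unbuffered channel, a non-blocking send succeeds only if the reader happens to be waiting at that exact moment.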
