var (
	// ErrClosed is returned from calls to a service or interface in the event
	// that the Close() function has already been called.
	ErrClosed = errors.New("monitor is closing or already closed")
)




type Monitor

type Monitor struct {
	// contains filtered or unexported fields
}


Monitor represents a DFSR backlog monitor for a domain.

func New

func New(source Source, interval, timeout, cache time.Duration, limit uint) *Monitor

New creates a new Monitor with the given source and polling interval.

If cache is nonzero then the monitor will cache version vectors for the given duration.

If limit is nonzero then the monitor will limit the number of active queries to an individual DFSR member to the given value.

The returned monitor will not function until Start is called.
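
The per-member limit described above can be pictured as a counting semaphore keyed by member. The following is a minimal sketch of that idea only, not the package's implementation: memberLimiter, acquire, peakConcurrency, and the member name are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// memberLimiter is a hypothetical helper, not part of this package. It caps
// the number of concurrent queries per member by using one buffered channel
// per member as a counting semaphore.
type memberLimiter struct {
	mu    sync.Mutex
	limit uint
	slots map[string]chan struct{}
}

func newMemberLimiter(limit uint) *memberLimiter {
	return &memberLimiter{limit: limit, slots: make(map[string]chan struct{})}
}

// acquire blocks until a query slot for member is free and returns a
// function that releases the slot. A limit of zero means no limit.
func (l *memberLimiter) acquire(member string) (release func()) {
	if l.limit == 0 {
		return func() {}
	}
	l.mu.Lock()
	sem, ok := l.slots[member]
	if !ok {
		sem = make(chan struct{}, l.limit)
		l.slots[member] = sem
	}
	l.mu.Unlock()
	sem <- struct{}{}
	return func() { <-sem }
}

// peakConcurrency runs n simulated queries against a single member and
// reports the highest number that were active at the same time.
func peakConcurrency(limit uint, n int) int {
	l := newMemberLimiter(limit)
	var (
		mu     sync.Mutex
		active int
		peak   int
		wg     sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			release := l.acquire("dc1.example.com") // assumed member name
			defer release()
			mu.Lock()
			active++
			if active > peak {
				peak = active
			}
			mu.Unlock()
			time.Sleep(time.Millisecond) // simulated query
			mu.Lock()
			active--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak concurrent queries:", peakConcurrency(2, 10))
}
```

With a limit of 2, the reported peak never exceeds 2 regardless of how many queries are queued.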

func (*Monitor) Close

func (m *Monitor) Close()

Close will release resources consumed by the monitor. It should be called when finished with the monitor.

func (*Monitor) Listen

func (m *Monitor) Listen(chanSize int) <-chan *Update

Listen returns a channel on which DFSR backlog updates will be broadcast. The channel will be closed when the monitor is closed or when Unlisten is called for the returned channel.

The returned channel will use the provided channel buffer size.

When the monitor starts a new round of polling it sends an update to all of the currently registered listeners. This lets the listeners know that a polling session has started and allows them to interact with the update in the way that is best suited to their use case, either through update.Listen() or update.Values().

If any of the listeners have stalled out or are being processed slowly enough that their channel buffer is full, the monitor will block until the update can be received. The monitor will not begin to execute queries until all of the listeners are ready to receive data. This is an intentional design decision that avoids exhaustion of system resources when the consumers of the monitor's updates are unable to function normally.
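
The blocking behavior described above can be illustrated with a small stand-in broadcaster. This is a sketch under assumptions, not the monitor's actual code: the update and broadcaster types and the blockedMillis helper are hypothetical. It shows that a blocking send to a listener whose buffer is full holds up the whole broadcast until that listener receives.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// update is a stand-in for the package's *Update type.
type update struct{ round int }

// broadcaster is a hypothetical, simplified model of the listener set.
type broadcaster struct {
	mu        sync.Mutex
	listeners []chan *update
}

// listen registers a listener with the given channel buffer size.
func (b *broadcaster) listen(chanSize int) <-chan *update {
	ch := make(chan *update, chanSize)
	b.mu.Lock()
	b.listeners = append(b.listeners, ch)
	b.mu.Unlock()
	return ch
}

// broadcast delivers u to every listener using blocking sends, so it does
// not return until each listener has room for the update.
func (b *broadcaster) broadcast(u *update) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.listeners {
		ch <- u
	}
}

// blockedMillis fills a listener's one-slot buffer, then measures how long
// the next broadcast takes when the listener only receives after delayMs.
func blockedMillis(delayMs int) int64 {
	b := &broadcaster{}
	ch := b.listen(1)
	b.broadcast(&update{round: 1}) // fills the buffer

	start := time.Now()
	go func() {
		time.Sleep(time.Duration(delayMs) * time.Millisecond)
		<-ch // the slow listener finally makes room
	}()
	b.broadcast(&update{round: 2}) // blocks until the receive above
	return time.Since(start).Milliseconds()
}

func main() {
	fmt.Printf("second broadcast blocked for about %d ms\n", blockedMillis(50))
}
```

A listener that cannot keep up should either use a larger chanSize or call Unlisten so that it does not delay polling for everyone else.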

func (*Monitor) Start

func (m *Monitor) Start() error

Start starts the monitor. If the monitor is already running, Start does nothing and returns nil. If it is unable to initialize a DFSR client, Start returns an error. If the monitor is already closed, ErrClosed is returned.
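
The three outcomes of Start can be modeled with a small state machine. The monitor type below is a hypothetical stand-in that mirrors only the documented return values (it skips DFSR client initialization entirely); ErrClosed is redeclared locally so the sketch is self-contained.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrClosed mirrors the package-level error documented above.
var ErrClosed = errors.New("monitor is closing or already closed")

// monitor is a hypothetical stand-in that tracks only lifecycle state.
type monitor struct {
	mu      sync.Mutex
	running bool
	closed  bool
}

// Start returns ErrClosed after Close, and nil if already running.
func (m *monitor) Start() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return ErrClosed
	}
	if m.running {
		return nil // already running: nothing to do
	}
	m.running = true
	return nil
}

// Close marks the monitor as permanently closed.
func (m *monitor) Close() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.running = false
	m.closed = true
}

func main() {
	m := &monitor{}
	fmt.Println(m.Start()) // first start
	fmt.Println(m.Start()) // already running
	m.Close()
	if err := m.Start(); errors.Is(err, ErrClosed) {
		fmt.Println("cannot restart a closed monitor:", err)
	}
}
```

Callers can compare the returned error against ErrClosed with errors.Is to tell a closed monitor apart from a client initialization failure.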

func (*Monitor) Stop

func (m *Monitor) Stop()

Stop stops the monitor and prevents further polling of DFSR backlogs until Start is called again.

func (*Monitor) Unlisten

func (m *Monitor) Unlisten(c <-chan *Update) (found bool)

Unlisten closes the given listener's channel and removes it from the set of listeners that receive DFSR backlog values.

Unlisten returns false if the listener was not present.

func (*Monitor) Update

func (m *Monitor) Update()

Update requests immediate retrieval of DFSR backlogs. It does not wait for the retrieval to complete.

If the monitor has not been started Update will do nothing. If an update is already running a second update will not be started.
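
One way to get the "at most one update at a time" behavior described above is a guarded running flag. The sketch below is an assumption about how such coalescing could work, not the package's code: poller, start, update, and wait are hypothetical names, and update returns a bool only so the outcome is observable.

```go
package main

import (
	"fmt"
	"sync"
)

// poller is a hypothetical stand-in for the monitor's update scheduling.
type poller struct {
	mu      sync.Mutex
	started bool
	running bool
	wg      sync.WaitGroup
}

func (p *poller) start() {
	p.mu.Lock()
	p.started = true
	p.mu.Unlock()
}

// update launches work in the background and returns immediately. It
// reports false, and launches nothing, if the poller has not been started
// or if a previous update is still running.
func (p *poller) update(work func()) bool {
	p.mu.Lock()
	if !p.started || p.running {
		p.mu.Unlock()
		return false
	}
	p.running = true
	p.mu.Unlock()

	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		work()
		p.mu.Lock()
		p.running = false
		p.mu.Unlock()
	}()
	return true
}

// wait blocks until any in-flight update has finished.
func (p *poller) wait() { p.wg.Wait() }

func main() {
	p := &poller{}
	fmt.Println("before start:", p.update(func() {}))

	p.start()
	block := make(chan struct{})
	fmt.Println("first request:", p.update(func() { <-block }))
	fmt.Println("second request while running:", p.update(func() {}))

	close(block)
	p.wait()
	fmt.Println("request after completion:", p.update(func() {}))
	p.wait()
}
```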

type Source

type Source interface {
	Value() (*dfsr.Domain, time.Time, error)
}

Source represents a domain-wide configuration source.

type Update

type Update struct {
	Domain *dfsr.Domain // Domain configuration at the time of the update

	// contains filtered or unexported fields
}


Update represents an in-progress DFSR update performed by the monitor.

func (*Update) Duration

func (u *Update) Duration() time.Duration

Duration will wait until the update has finished, then return the total wall time of the update.

func (*Update) End

func (u *Update) End() time.Time

End will wait until the update has finished, then return the completion time of the update.

func (*Update) Listen

func (u *Update) Listen() <-chan *dfsr.Backlog

Listen returns a channel that receives the DFSR backlog values as they are retrieved for this update, in no particular order.

The returned channel will be buffered with sufficient capacity to hold the maximum number of values that could be returned. Once all of the values for this update have been sent the channel will be closed. This makes the channel suitable for consumption with range:

for backlog := range update.Listen() {
	log.Printf("Backlog retrieved: %d", backlog.Sum())
}
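
The channel contract described above (buffered to full capacity, closed once every value has been sent) can be reproduced with a short stand-in. This is a sketch under assumptions rather than the package's implementation: backlog stands in for *dfsr.Backlog, and listen and total are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sync"
)

// backlog is a stand-in for *dfsr.Backlog.
type backlog struct{ count int }

// listen starts one simulated query per entry in counts. The channel has
// capacity for every result, so senders never block, and it is closed once
// all senders have finished, which ends a range loop over it.
func listen(counts []int) <-chan *backlog {
	out := make(chan *backlog, len(counts))
	var wg sync.WaitGroup
	for _, n := range counts {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			out <- &backlog{count: n}
		}(n)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// total ranges over the channel until it is closed and returns the number
// of values received along with their sum. Arrival order is not defined.
func total(counts []int) (values, sum int) {
	for b := range listen(counts) {
		values++
		sum += b.count
	}
	return values, sum
}

func main() {
	values, sum := total([]int{3, 0, 7})
	fmt.Printf("received %d values, total backlog %d\n", values, sum)
}
```

Because the buffer can hold every value, a consumer that starts ranging late, or stops early, does not stall the goroutines producing the values.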

func (*Update) Size

func (u *Update) Size() int

Size returns the number of values that will be returned in the update if it completes successfully. If the update is canceled the actual number of values returned may be less than this number.

func (*Update) Start

func (u *Update) Start() time.Time

Start will return the start time of the update.

func (*Update) Values

func (u *Update) Values() (values []*dfsr.Backlog)

Values will block until the update has finished, then return a slice of the retrieved backlog data.
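
Values, Duration, and End all wait for the update to finish. A common way to express that in Go is a done channel that is closed on completion; the sketch below assumes that approach and is not taken from the package source. The update type, newUpdate, and finish are hypothetical, and int values stand in for *dfsr.Backlog.

```go
package main

import (
	"fmt"
	"time"
)

// update is a hypothetical stand-in for the package's Update type.
type update struct {
	start  time.Time
	end    time.Time
	values []int
	done   chan struct{} // closed when the update has finished
}

func newUpdate() *update {
	return &update{start: time.Now(), done: make(chan struct{})}
}

// finish records the results and completion time, then releases all waiters.
func (u *update) finish(values []int) {
	u.values = values
	u.end = time.Now()
	close(u.done)
}

// Values blocks until the update has finished, then returns the results.
func (u *update) Values() []int {
	<-u.done
	return u.values
}

// Duration blocks until the update has finished, then returns its wall time.
func (u *update) Duration() time.Duration {
	<-u.done
	return u.end.Sub(u.start)
}

func main() {
	u := newUpdate()
	go func() {
		time.Sleep(10 * time.Millisecond) // simulated polling
		u.finish([]int{3, 1, 4})
	}()
	fmt.Println("values:", u.Values())
	fmt.Println("took:", u.Duration())
}
```

Closing the channel, rather than sending on it, lets any number of callers wait on the same update and return as soon as it completes.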

