pubsub

package
v0.2.0
Warning

This package is not in the latest version of its module.
Published: Apr 17, 2023 License: BSD-3-Clause Imports: 4 Imported by: 4

Documentation

Overview

Package pubsub defines interfaces for accessing dynamically changing process configuration information.

Settings represent configuration parameters and their values. Settings are published to named Streams. Streams are forked to add additional consumers, i.e. readers of the Settings published to the Stream.

Settings are represented by an interface type that wraps the data and provides a name and description for each Setting. Streams are similarly named and also have a description. When a Stream is 'forked', the latest values of all Settings that have been sent over it are made available to the caller. This allows for a rendezvous between the single producer of Settings and multiple consumers of those Settings that may be added at arbitrary points in time.

Streams are hosted by a Publisher type, which in addition to the methods required for managing Streams provides a means to shut down all of the Streams it hosts.

Functions

func Format

func Format(s Setting) string

Format formats a Setting in a consistent manner; it is intended to be used when implementing the Setting interface.

Types

type Any

type Any struct {
	// contains filtered or unexported fields
}

Type Any can be used to represent or implement a Setting of any type.

func (*Any) Description

func (s *Any) Description() string

func (*Any) Name

func (s *Any) Name() string

func (*Any) String

func (s *Any) String() string

func (*Any) Value

func (s *Any) Value() interface{}

type DurationFlag

type DurationFlag struct{ time.Duration }

DurationFlag implements flag.Value in order to provide validation of duration values in the flag package.

func (DurationFlag) Get

func (d DurationFlag) Get() interface{}

Implements flag.Getter.Get

func (*DurationFlag) Set

func (d *DurationFlag) Set(s string) error

Implements flag.Value.Set

func (DurationFlag) String

func (d DurationFlag) String() string

Implements flag.Value.String

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

A Publisher provides a mechanism for communicating Settings from a set of producers to multiple consumers. Each producer, together with its associated consumers, is called a Stream. Operations are provided for creating streams (CreateStream) and adding new consumers (ForkStream). Communication is implemented using channels, with the producer and consumers providing channels to send and receive Settings over. A Stream remembers the last value of all Settings that were sent over it; these can be retrieved via ForkStream or the Latest method.

The Publisher may be shut down by calling its Shutdown method. The producers are notified via the channel returned by CreateStream, at which point they should close the channel they use for publishing Settings. If producers fail to close this channel then the Publisher will leak goroutines (one per stream) when it is shut down.

Example
package main

import (
	"fmt"
	"sync"
	"time"

	"v.io/x/ref/lib/pubsub"
)

func main() {
	in := make(chan pubsub.Setting)
	pub := pubsub.NewPublisher()
	pub.CreateStream("net", "network settings", in)

	// Publish an initial Setting to the Stream.
	// Note that this, and the call to ForkStream below are in a race since
	// although the send below will not complete until the value has been read
	// from it by the internal goroutine in the publisher, that goroutine may
	// get descheduled before it can update the internal state in the publisher
	// that ForkStream uses to capture the latest value. That is, the latest
	// value returned by ForkStream below may be nil, hence the retry loops below.
	in <- pubsub.NewString("ip", "address", "1.2.3.4")

	// A simple producer of IP address settings.
	producer := func() {
		in <- pubsub.NewString("ip", "address", "1.2.3.5")
	}

	var waiter sync.WaitGroup
	waiter.Add(2)

	// A simple consumer of IP address Settings.
	consumer := func(ch chan pubsub.Setting) {
		fmt.Printf("consumer: %v\n", <-ch)
		waiter.Done()
	}

	// Fork the stream twice, and read the latest value.
	ch1 := make(chan pubsub.Setting)
	var st *pubsub.Stream
	for st == nil { // See comment above.
		time.Sleep(time.Millisecond)
		st, _ = pub.ForkStream("net", ch1)
	}
	fmt.Printf("1: %v\n", st.Latest["ip"])
	st = nil
	ch2 := make(chan pubsub.Setting)
	for st == nil { // See comment above.
		time.Sleep(time.Millisecond)
		st, _ = pub.ForkStream("net", ch2)
	}
	fmt.Printf("2: %v\n", st.Latest["ip"])

	// Now we can read new Settings as they are generated.
	go producer()
	go consumer(ch1)
	go consumer(ch2)

	waiter.Wait()

}
Output:

1: ip: address: (string: 1.2.3.4)
2: ip: address: (string: 1.2.3.4)
consumer: ip: address: (string: 1.2.3.5)
consumer: ip: address: (string: 1.2.3.5)

func NewPublisher

func NewPublisher() *Publisher

NewPublisher creates a Publisher.

func (*Publisher) CloseFork

func (p *Publisher) CloseFork(name string, ch chan<- Setting) error

CloseFork removes the specified channel from the named stream. The caller must drain the channel before closing it. TODO(cnicolaou): add tests for this.

func (*Publisher) CreateStream

func (p *Publisher) CreateStream(name, description string, ch <-chan Setting) (<-chan struct{}, error)

CreateStream creates a Stream with the provided name and description. Note that these belong to the stream; Settings have their own names and descriptions. In general, no buffering is required for the supplied channel since the Publisher itself reads from it. However, if the consumers are slow then the Publisher may be slow in draining the channel, and the producer should provide additional buffering if this is a concern. Consequently, this mechanism is best suited to rarely changing Settings, such as network address changes forced by DHCP, for which no buffering will be required. The channel returned by CreateStream is closed when the Publisher is shut down; the caller should wait for this to occur and then close the channel it passed to CreateStream.
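
The shutdown handshake described here can be sketched with plain channels. In the sketch below, stop stands in for the channel returned by CreateStream and out for the channel passed to it; the produce function and the use of string values instead of Settings are assumptions made to keep the example self-contained. Selecting on both channels means the producer notices shutdown even while a send is pending.

```go
package main

import "fmt"

// produce sends values on out until stop is closed. It always closes out
// before returning and reports how many values were actually sent.
func produce(stop <-chan struct{}, out chan<- string, values []string) int {
	defer close(out)
	sent := 0
	for _, v := range values {
		select {
		case <-stop:
			// Shutdown was requested while waiting to send.
			return sent
		case out <- v:
			sent++
		}
	}
	<-stop // nothing left to send; wait for shutdown before closing out
	return sent
}

func main() {
	stop := make(chan struct{})
	out := make(chan string)
	done := make(chan int)
	go func() { done <- produce(stop, out, []string{"1.2.3.4", "1.2.3.5"}) }()

	fmt.Println(<-out) // 1.2.3.4
	close(stop)        // stands in for the effect of Publisher.Shutdown
	n := <-done
	_, open := <-out
	fmt.Println(n, open) // 1 false
}
```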

func (*Publisher) ForkStream

func (p *Publisher) ForkStream(name string, ch chan<- Setting) (*Stream, error)

ForkStream 'forks' the named stream to add a new consumer. The channel provided is to be used to read Settings sent down the stream. This channel will be closed by the Publisher when it is asked to shut down. The reader on this channel must be able to keep up with the flow of Settings through the Stream in order to avoid blocking all other readers and hence should set an appropriate amount of buffering for the channel it passes in. ForkStream returns the most recent values of all Settings previously sent over the stream, thus allowing its caller to synchronise with the stream.
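
To make the rendezvous concrete, the following is a simplified, hypothetical model of forking written with the standard library only. The stream type, its publish and fork methods, and the use of strings in place of Settings are all assumptions for illustration; the real Publisher uses an internal goroutine rather than sending under a lock.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a hypothetical, simplified model of a Stream: it remembers
// the latest value per setting name and fans new values out to every fork.
type stream struct {
	mu     sync.Mutex
	latest map[string]string
	forks  []chan<- string
}

func newStream() *stream { return &stream{latest: map[string]string{}} }

// publish records the value and forwards it to all forked channels.
// Sends happen while the lock is held, so one slow reader delays all the
// others; this is why consumers are advised to buffer their channels.
func (s *stream) publish(name, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.latest[name] = value
	for _, ch := range s.forks {
		ch <- value
	}
}

// fork registers ch as a new consumer and returns a snapshot of the
// latest values, taken atomically with the registration.
func (s *stream) fork(ch chan<- string) map[string]string {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.forks = append(s.forks, ch)
	snapshot := make(map[string]string, len(s.latest))
	for k, v := range s.latest {
		snapshot[k] = v
	}
	return snapshot
}

func main() {
	s := newStream()
	s.publish("ip", "1.2.3.4") // sent before any consumer exists

	ch := make(chan string, 1)    // buffered so publish does not block
	fmt.Println(s.fork(ch)["ip"]) // a late joiner still sees 1.2.3.4

	s.publish("ip", "1.2.3.5")
	fmt.Println(<-ch) // 1.2.3.5
}
```

Because the snapshot and the registration happen in one step in this model, a consumer neither misses a value nor sees one twice between reading the snapshot and reading its channel.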

func (*Publisher) Latest

func (p *Publisher) Latest(name string) *Stream

Latest returns information on the requested stream, including the last instance of all Settings, if any, that flowed over it.

func (*Publisher) Shutdown

func (p *Publisher) Shutdown()

Shutdown initiates the process of stopping the operation of the Publisher. All of the channels passed to CreateStream must be closed by their owner to ensure that all goroutines are garbage collected.

Example
package main

import (
	"fmt"
	"sync"

	"v.io/x/ref/lib/pubsub"
)

func main() {
	in := make(chan pubsub.Setting)
	pub := pubsub.NewPublisher()
	stop, _ := pub.CreateStream("net", "network settings", in)

	var producerReady sync.WaitGroup
	producerReady.Add(1)
	var consumersReady sync.WaitGroup
	consumersReady.Add(2)

	// A producer that writes at least 100 Settings before signalling that
	// it's ready to be shut down. This is purely to demonstrate how to use
	// Shutdown.
	producer := func() {
		for i := 0; ; i++ {
			select {
			case <-stop:
				close(in)
				return
			default:
				in <- pubsub.NewString("ip", "address", "1.2.3.4")
				if i == 100 {
					producerReady.Done()
				}
			}
		}
	}

	var waiter sync.WaitGroup
	waiter.Add(2)

	consumer := func() {
		ch := make(chan pubsub.Setting, 10)
		pub.ForkStream("net", ch) //nolint:errcheck
		consumersReady.Done()
		i := 0
		for {
			if _, ok := <-ch; !ok {
				// The channel is closed when the publisher is asked
				// to shut down.
				break
			}
			i++
		}
		if i >= 100 {
			// We've received at least 100 Settings as per the producer above.
			fmt.Println("done")
		}
		waiter.Done()
	}

	go consumer()
	go consumer()
	consumersReady.Wait()
	go producer()
	producerReady.Wait()
	pub.Shutdown()
	waiter.Wait()
}
Output:

done
done

func (*Publisher) String

func (p *Publisher) String() string

String returns a string representation of the publisher, including the names and descriptions of all the streams it currently supports.

type Setting

type Setting interface {
	String() string
	// Name returns the name of the Setting
	Name() string
	// Description returns the description of the Setting
	Description() string
	// Value returns the value of the Setting.
	Value() interface{}
}

Setting must be implemented by all data types to be sent over Publisher streams.
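
As a rough sketch of what implementing this interface might look like, the code below declares a local copy of the Setting interface together with a hypothetical portSetting type. The formatSetting helper is an assumption standing in for Format; its layout is modelled on the example output shown in this documentation (name: description: (type: value)), not taken from the package source.

```go
package main

import "fmt"

// Setting mirrors the interface documented above so that this sketch
// compiles without importing the pubsub package.
type Setting interface {
	String() string
	Name() string
	Description() string
	Value() interface{}
}

// formatSetting is a hypothetical stand-in for pubsub.Format. The layout
// is an assumption based on the example output in this documentation.
func formatSetting(s Setting) string {
	return fmt.Sprintf("%s: %s: (%T: %v)", s.Name(), s.Description(), s.Value(), s.Value())
}

// portSetting is a hypothetical Setting that carries a TCP port number.
type portSetting struct {
	name, description string
	port              int
}

func (p *portSetting) Name() string        { return p.name }
func (p *portSetting) Description() string { return p.description }
func (p *portSetting) Value() interface{}  { return p.port }
func (p *portSetting) String() string      { return formatSetting(p) }

func main() {
	var s Setting = &portSetting{name: "port", description: "listen port", port: 8080}
	fmt.Println(s) // port: listen port: (int: 8080)
}
```

Delegating String to a shared formatting helper keeps every Setting type printing in the same layout, which is the purpose the documentation gives for Format.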

func NewAny

func NewAny(name, description string, value interface{}) Setting

func NewBool

func NewBool(name, description string, value bool) Setting

func NewDuration

func NewDuration(name, description string, value time.Duration) Setting

func NewFloat64

func NewFloat64(name, description string, value float64) Setting

func NewInt

func NewInt(name, description string, value int) Setting

func NewInt64

func NewInt64(name, description string, value int64) Setting

func NewString

func NewString(name, description string, value string) Setting

type Stream

type Stream struct {
	Name, Description string
	// Latest is a map of Setting names to the Setting itself.
	Latest map[string]Setting
}

Stream is returned by Latest and ForkStream. It includes the name and description of the stream and the most recent values of the Settings that flowed through it.
