Documentation ¶
Overview ¶
Package pubsub defines interfaces for accessing dynamically changing process configuration information.
Settings represent configuration parameters and their values. Settings are published to named Streams. Streams are forked to add additional consumers, i.e. readers of the Settings published to the Stream.
Settings are represented by an interface type that wraps the data and provides a name and description for each Setting. Streams are similarly named and also have a description. When a Stream is 'forked', the latest value of every Setting that has been sent over the Stream is made available to the caller. This allows for a rendezvous between the single producer of Settings and multiple consumers of those Settings that may be added at arbitrary points in time.
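The rendezvous described above can be illustrated with a small standard-library sketch. It is not the package's implementation: the setting and stream types and the publish and fork functions are hypothetical stand-ins, chosen only to show how remembering the latest value per name lets a late consumer catch up.

```go
package main

import (
	"fmt"
	"sync"
)

// setting is a hypothetical stand-in for a named configuration value.
type setting struct {
	name  string
	value string
}

// stream is a hypothetical, simplified model of a Stream: it remembers the
// most recent value sent for each setting name and forwards new values to
// every forked consumer.
type stream struct {
	mu        sync.Mutex
	latest    map[string]setting
	consumers []chan<- setting
}

func newStream() *stream {
	return &stream{latest: make(map[string]setting)}
}

// publish records s as the latest value for its name and forwards it to all
// consumers. The send happens while the lock is held, so a slow consumer
// blocks everyone; consumers should buffer their channels.
func (st *stream) publish(s setting) {
	st.mu.Lock()
	defer st.mu.Unlock()
	st.latest[s.name] = s
	for _, ch := range st.consumers {
		ch <- s
	}
}

// fork registers ch as a new consumer and returns a copy of the latest
// values, so a late joiner can catch up with what was already published.
func (st *stream) fork(ch chan<- setting) map[string]setting {
	st.mu.Lock()
	defer st.mu.Unlock()
	st.consumers = append(st.consumers, ch)
	snapshot := make(map[string]setting, len(st.latest))
	for k, v := range st.latest {
		snapshot[k] = v
	}
	return snapshot
}

func main() {
	st := newStream()
	st.publish(setting{"ip", "1.2.3.4"})

	// The consumer joins after the first value was published.
	ch := make(chan setting, 1)
	snapshot := st.fork(ch)
	fmt.Println("latest on fork:", snapshot["ip"].value)

	st.publish(setting{"ip", "1.2.3.5"})
	fmt.Println("received after fork:", (<-ch).value)
}
```

Running the sketch prints "latest on fork: 1.2.3.4" followed by "received after fork: 1.2.3.5": the snapshot covers what was sent before the fork, and the channel delivers everything sent afterwards.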
Streams are hosted by a Publisher type, which in addition to the methods required for managing Streams provides a means to shut down all of the Streams it hosts.
Index ¶
- func Format(s Setting) string
- type Any
- type DurationFlag
- type Publisher
- func (p *Publisher) CloseFork(name string, ch chan<- Setting) error
- func (p *Publisher) CreateStream(name, description string, ch <-chan Setting) (<-chan struct{}, error)
- func (p *Publisher) ForkStream(name string, ch chan<- Setting) (*Stream, error)
- func (p *Publisher) Latest(name string) *Stream
- func (p *Publisher) Shutdown()
- func (p *Publisher) String() string
- type Setting
- func NewAny(name, description string, value interface{}) Setting
- func NewBool(name, description string, value bool) Setting
- func NewDuration(name, description string, value time.Duration) Setting
- func NewFloat64(name, description string, value float64) Setting
- func NewInt(name, description string, value int) Setting
- func NewInt64(name, description string, value int64) Setting
- func NewString(name, description string, value string) Setting
- type Stream
Functions ¶
Types ¶
type Any ¶
type Any struct {
// contains filtered or unexported fields
}
Type Any can be used to represent or implement a Setting of any type.
func (*Any) Description ¶
type DurationFlag ¶
DurationFlag implements flag.Value in order to provide validation of duration values in the flag package.
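As a rough illustration of what implementing flag.Value for durations involves, the sketch below uses only the standard library. The durationValue type and the parseTimeout helper are hypothetical and are not the package's DurationFlag; the sketch assumes validation is done with time.ParseDuration.

```go
package main

import (
	"flag"
	"fmt"
	"io"
	"time"
)

// durationValue is a hypothetical flag.Value that validates its input with
// time.ParseDuration; it is not the package's DurationFlag.
type durationValue struct {
	d time.Duration
}

// Set parses s and rejects anything that is not a valid duration.
func (v *durationValue) Set(s string) error {
	d, err := time.ParseDuration(s)
	if err != nil {
		return err
	}
	v.d = d
	return nil
}

// String returns the current value in time.Duration's own format.
func (v *durationValue) String() string {
	return v.d.String()
}

// parseTimeout parses args with a private FlagSet that has a single
// -timeout flag backed by durationValue.
func parseTimeout(args []string) (time.Duration, error) {
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep usage text out of the example output
	v := &durationValue{}
	fs.Var(v, "timeout", "timeout as a duration, e.g. 1m30s")
	if err := fs.Parse(args); err != nil {
		return 0, err
	}
	return v.d, nil
}

func main() {
	d, err := parseTimeout([]string{"-timeout=1m30s"})
	fmt.Println(d, err)

	_, err = parseTimeout([]string{"-timeout=soon"})
	fmt.Println("rejected:", err != nil)
}
```

The first call prints "1m30s <nil>"; the second prints "rejected: true", because Set returns an error and the flag package reports it from Parse.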
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
A Publisher provides a mechanism for communicating Settings from a set of producers to multiple consumers. Each producer and its associated consumers are called a Stream. Operations are provided for creating streams (CreateStream) and adding new consumers (ForkStream). Communication is implemented using channels, with the producer and consumers providing channels to send and receive Settings over. A Stream remembers the last value of all Settings that were sent over it; these can be retrieved via ForkStream or the Latest method.
The Publisher may be shut down by calling its Shutdown method. The producers are notified via the channel returned by CreateStream, at which point they should close the channel they use for publishing Settings. If producers fail to close this channel then the Publisher will leak goroutines (one per stream) when it is shut down.
Example ¶
package main

import (
	"fmt"
	"sync"
	"time"

	"v.io/x/ref/lib/pubsub"
)

func main() {
	in := make(chan pubsub.Setting)
	pub := pubsub.NewPublisher()
	pub.CreateStream("net", "network settings", in)

	// Publish an initial Setting to the Stream.
	// Note that this, and the call to ForkStream below are in a race since
	// although the send below will not complete until the value has been read
	// from it by the internal goroutine in the publisher, that goroutine may
	// get descheduled before it can update the internal state in the publisher
	// that ForkStream uses to capture the latest value. That is, the latest
	// value returned by ForkStream below may be nil, hence the while loop below.
	in <- pubsub.NewString("ip", "address", "1.2.3.4")

	// A simple producer of IP address settings.
	producer := func() {
		in <- pubsub.NewString("ip", "address", "1.2.3.5")
	}

	var waiter sync.WaitGroup
	waiter.Add(2)

	// A simple consumer of IP address Settings.
	consumer := func(ch chan pubsub.Setting) {
		fmt.Printf("consumer: %v\n", <-ch)
		waiter.Done()
	}

	// Fork the stream twice, and read the latest value.
	ch1 := make(chan pubsub.Setting)
	var st *pubsub.Stream
	for st == nil { // See comment above.
		time.Sleep(time.Millisecond)
		st, _ = pub.ForkStream("net", ch1)
	}
	fmt.Printf("1: %v\n", st.Latest["ip"])

	st = nil
	ch2 := make(chan pubsub.Setting)
	for st == nil { // See comment above.
		time.Sleep(time.Millisecond)
		st, _ = pub.ForkStream("net", ch2)
	}
	fmt.Printf("2: %v\n", st.Latest["ip"])

	// Now we can read new Settings as they are generated.
	go producer()
	go consumer(ch1)
	go consumer(ch2)

	waiter.Wait()
}
Output:
1: ip: address: (string: 1.2.3.4)
2: ip: address: (string: 1.2.3.4)
consumer: ip: address: (string: 1.2.3.5)
consumer: ip: address: (string: 1.2.3.5)
func (*Publisher) CloseFork ¶
func (p *Publisher) CloseFork(name string, ch chan<- Setting) error
CloseFork removes the specified channel from the named stream. The caller must drain the channel before closing it. TODO(cnicolaou): add tests for this.
func (*Publisher) CreateStream ¶
func (p *Publisher) CreateStream(name, description string, ch <-chan Setting) (<-chan struct{}, error)
CreateStream creates a Stream with the provided name and description. Note that Settings have their own names and descriptions; these are for the Stream. In general, no buffering is required for the channel passed in, since the Publisher itself reads from it. However, if the consumers are slow then the Publisher may be slow in draining the channel, and the producer should provide additional buffering if this is a concern. Consequently, this mechanism is best suited to rarely changing Settings, such as network address changes forced by DHCP, for which no buffering is needed. The channel returned by CreateStream is closed when the Publisher is shut down; the caller should wait for this to occur and then close the channel it passed to CreateStream.
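The producer side of this handshake can be sketched with plain channels. The runProducer function below is hypothetical, and the stop and in channels merely stand in for the channel returned by CreateStream and the channel passed to it; nothing here calls the package itself.

```go
package main

import "fmt"

// runProducer sends values on out until stop is closed, then closes out so
// that whoever is reading from it can finish. It returns the number sent.
func runProducer(stop <-chan struct{}, out chan<- string, values []string) int {
	defer close(out)
	sent := 0
	for _, v := range values {
		select {
		case <-stop:
			return sent
		case out <- v:
			sent++
		}
	}
	<-stop // Nothing left to send; wait for the shutdown signal.
	return sent
}

func main() {
	stop := make(chan struct{}) // stands in for the channel CreateStream returns
	in := make(chan string)     // stands in for the channel passed to CreateStream
	done := make(chan int)

	go func() { done <- runProducer(stop, in, []string{"1.2.3.4", "1.2.3.5"}) }()

	// Stand-in for the publisher's internal reader.
	fmt.Println(<-in)
	fmt.Println(<-in)

	close(stop) // stands in for Shutdown closing the returned channel
	_, open := <-in
	fmt.Println("input still open:", open)
	fmt.Println("sent:", <-done)
}
```

Placing the send inside the select lets the producer notice the shutdown signal even while it is blocked waiting for a reader. The sketch prints the two addresses, then "input still open: false" and "sent: 2".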
func (*Publisher) ForkStream ¶
func (p *Publisher) ForkStream(name string, ch chan<- Setting) (*Stream, error)
ForkStream 'forks' the named stream to add a new consumer. The channel provided is used to read Settings sent down the stream. This channel will be closed by the Publisher when it is asked to shut down. The reader on this channel must be able to keep up with the flow of Settings through the Stream in order to avoid blocking all other readers, and hence should set an appropriate amount of buffering for the channel it passes in. ForkStream returns the most recent values of all Settings previously sent over the stream, thus allowing its caller to synchronise with the stream.
func (*Publisher) Latest ¶
func (p *Publisher) Latest(name string) *Stream
Latest returns information on the requested stream, including the last instance of all Settings, if any, that flowed over it.
func (*Publisher) Shutdown ¶
func (p *Publisher) Shutdown()
Shutdown initiates the process of stopping the operation of the Publisher. All of the channels passed to CreateStream must be closed by their owner to ensure that all goroutines are garbage collected.
Example ¶
package main

import (
	"fmt"
	"sync"

	"v.io/x/ref/lib/pubsub"
)

func main() {
	in := make(chan pubsub.Setting)
	pub := pubsub.NewPublisher()
	stop, _ := pub.CreateStream("net", "network settings", in)

	var producerReady sync.WaitGroup
	producerReady.Add(1)
	var consumersReady sync.WaitGroup
	consumersReady.Add(2)

	// A producer to write 100 Settings before signalling that it's
	// ready to be shutdown. This is purely to demonstrate how to use
	// Shutdown.
	producer := func() {
		for i := 0; ; i++ {
			select {
			case <-stop:
				close(in)
				return
			default:
				in <- pubsub.NewString("ip", "address", "1.2.3.4")
				if i == 100 {
					producerReady.Done()
				}
			}
		}
	}

	var waiter sync.WaitGroup
	waiter.Add(2)

	consumer := func() {
		ch := make(chan pubsub.Setting, 10)
		pub.ForkStream("net", ch) //nolint:errcheck
		consumersReady.Done()
		i := 0
		for {
			if _, ok := <-ch; !ok {
				// The channel has been closed when the publisher
				// is asked to shut down.
				break
			}
			i++
		}
		if i >= 100 {
			// We've received at least 100 Settings as per the producer above.
			fmt.Println("done")
		}
		waiter.Done()
	}

	go consumer()
	go consumer()
	consumersReady.Wait()
	go producer()
	producerReady.Wait()
	pub.Shutdown()
	waiter.Wait()
}
Output:
done
done
type Setting ¶
type Setting interface {
	String() string
	// Name returns the name of the Setting
	Name() string
	// Description returns the description of the Setting
	Description() string
	// Value returns the value of the Setting.
	Value() interface{}
}
Setting must be implemented by all data types to be sent over Publisher streams.
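A custom type that satisfies this interface might look like the sketch below. To keep it compilable without the package, the interface is redeclared locally; portSetting is a hypothetical type, and the layout used by its String method is an assumption modeled on the example output shown earlier in this document, not a format the package is known to require.

```go
package main

import "fmt"

// Setting mirrors the interface shown above; it is redeclared locally only
// so that this sketch compiles without importing the package.
type Setting interface {
	String() string
	Name() string
	Description() string
	Value() interface{}
}

// portSetting is a hypothetical Setting carrying a TCP port number.
type portSetting struct {
	name, description string
	port              int
}

func (p *portSetting) Name() string        { return p.name }
func (p *portSetting) Description() string { return p.description }
func (p *portSetting) Value() interface{}  { return p.port }

// String uses a "name: description: (type: value)" layout, an assumption
// modeled on the example output earlier in this document.
func (p *portSetting) String() string {
	return fmt.Sprintf("%s: %s: (%T: %v)", p.name, p.description, p.port, p.port)
}

// Compile-time check that *portSetting satisfies Setting.
var _ Setting = (*portSetting)(nil)

func main() {
	var s Setting = &portSetting{name: "port", description: "listen port", port: 8080}
	fmt.Println(s)

	// Consumers recover the concrete value with a type assertion.
	if port, ok := s.Value().(int); ok {
		fmt.Println(port + 1)
	}
}
```

This prints "port: listen port: (int: 8080)" and then "8081". Because Value returns interface{}, each consumer needs a type assertion (or a type switch) to get back to the concrete type.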