README

Parked: PubSub with long polling in Go

The package longpoll provides an implementation of the long-polling mechanism of the PubSub pattern. Although the primary purpose of the library is to aid the development of web applications, the library provides no specific web handlers and can be used in other distributed applications.

Long polling is a technique for notifying client applications about updates on the server. It is often used in web applications as a substitute for server push, but it applies equally to other distributed applications.

Clients initiate subscriptions to the server specifying topics they are interested in. Given a subscription Id a client makes a request for new data. The request is held open until data becomes available on the server (published to a matching topic). As soon as this happens the request is answered immediately. If no data arrives over a predefined time window (the long-polling interval) the request returns empty. A new connection is then established between the client and the server to receive further updates.
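
The hold-and-return behaviour described above can be sketched with a plain select over a data channel and a timer. This is a minimal illustration using only the standard library, not the package's actual implementation; holdRequest and demo are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// holdRequest blocks until a value arrives on updates or the long-polling
// window elapses. The boolean reports whether data was received.
// (Hypothetical helper, not part of the longpoll package.)
func holdRequest(updates <-chan string, window time.Duration) (string, bool) {
	timer := time.NewTimer(window)
	defer timer.Stop()
	select {
	case v := <-updates:
		return v, true
	case <-timer.C:
		return "", false
	}
}

// demo runs one request that finds data waiting and one that times out.
func demo() (string, bool, bool) {
	updates := make(chan string, 1)
	updates <- "hello" // data already waiting: the request returns immediately
	v, ok1 := holdRequest(updates, time.Second)
	_, ok2 := holdRequest(updates, 20*time.Millisecond) // nothing published
	return v, ok1, ok2
}

func main() {
	v, ok1, ok2 := demo()
	fmt.Println(v, ok1, ok2)
}
```

After an empty return the client would simply issue the next request, which is what keeps the subscription alive.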

The following points are often listed as the benefits of long-polling over the push mechanism in web applications:

  • does not require a persistent connection to the server
  • works for slow clients as they receive information at the speed they can process, although maybe in large chunks which are accumulated at the server between requests
  • friendly to proxies blocking streaming

Implementation details and examples

Subscriptions time out and are closed if no client request is received within a given timeout interval. Every request resets the timeout counter. The timeout interval is a property of the subscription itself, so different subscriptions may have different timeout intervals.

The long-polling interval, within which the request is held, is specified per request. Web application wrappers might provide defaults.
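
As an illustration of such a wrapper, the sketch below shows an HTTP handler that supplies a default long-polling interval when the client does not send one. Everything in it is an assumption made for the example: the query parameters "id" and "poll", the 30 second default, and the stubPoller type, which stands in for the real manager so that the example runs without the library. Only the Get signature in the poller interface is taken from the API documentation.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strconv"
	"time"
)

// poller mirrors the documented signature of (*LongPoll).Get.
type poller interface {
	Get(id string, polltime time.Duration) (chan []interface{}, error)
}

const defaultPollSeconds = 30 // hypothetical default chosen for this sketch

// pollSeconds parses the hypothetical "poll" query value and falls back to
// the default when it is missing or invalid.
func pollSeconds(raw string) int {
	n, err := strconv.Atoi(raw)
	if err != nil || n <= 0 {
		return defaultPollSeconds
	}
	return n
}

// pollHandler holds the HTTP request open for the duration of one Get call.
func pollHandler(p poller) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		q := r.URL.Query()
		window := time.Duration(pollSeconds(q.Get("poll"))) * time.Second
		ch, err := p.Get(q.Get("id"), window)
		if err != nil {
			http.Error(w, err.Error(), http.StatusNotFound)
			return
		}
		data := <-ch // assumed to be nil or empty when no data arrived
		if data == nil {
			data = []interface{}{}
		}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(data)
	}
}

// stubPoller is a stand-in for the subscription manager.
type stubPoller struct{}

func (stubPoller) Get(id string, _ time.Duration) (chan []interface{}, error) {
	if id != "abc" {
		return nil, errors.New("unknown subscription")
	}
	ch := make(chan []interface{}, 1)
	ch <- []interface{}{"update"}
	return ch, nil
}

// demo issues one request against the handler and returns status and body.
func demo(target string) (int, string) {
	rec := httptest.NewRecorder()
	pollHandler(stubPoller{})(rec, httptest.NewRequest("GET", target, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	fmt.Println(demo("/poll?id=abc&poll=5"))
}
```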

The library supports concurrent long-polling requests on the same subscription Id, but no data will be duplicated across request responses. No specific distribution of data across responses is guaranteed: new requests signal the existing one to return immediately.
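
One possible way to obtain this behaviour is to give every request its own release channel and let each new request close the channel of its predecessor. The sketch below is an assumption about how such preemption could be built, not a description of the library's internals; slot, enter and wait are hypothetical names. Because a value sent on a Go channel is received by exactly one receiver, no data sample is duplicated.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// slot tracks the request that is currently allowed to wait for data.
type slot struct {
	mu      sync.Mutex
	release chan struct{} // closed to make the current waiter return
	data    chan string
}

// enter registers a new request and releases the previous waiter, if any.
func (s *slot) enter() chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.release != nil {
		close(s.release)
	}
	s.release = make(chan struct{})
	return s.release
}

// wait holds the request until data arrives, a newer request takes over,
// or the long-polling window elapses.
func (s *slot) wait(mine chan struct{}, window time.Duration) (string, bool) {
	select {
	case v := <-s.data:
		return v, true
	case <-mine:
		return "", false
	case <-time.After(window):
		return "", false
	}
}

// demo reports whether an older request returned empty promptly once a
// newer request arrived.
func demo() bool {
	s := &slot{data: make(chan string)}
	first := s.enter()
	done := make(chan bool, 1)
	go func() {
		_, ok := s.wait(first, 10*time.Second)
		done <- !ok
	}()
	s.enter() // a second request arrives and preempts the first
	select {
	case empty := <-done:
		return empty
	case <-time.After(2 * time.Second):
		return false
	}
}

func main() {
	fmt.Println(demo())
}
```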

At the moment the library does not support persisting published data before it is collected by subscribers. All published data is held in the memory of the backend.

Long-polling with subscription management:

Handling of long-polling subscriptions, publishing and receiving data is done by the longpoll.LongPoll type:

ps := longpoll.New()
id1, _ := ps.Subscribe(time.Minute, "TopicA", "TopicB")
id2, _ := ps.Subscribe(time.Minute, "TopicB", "TopicC")

// poll on behalf of id1; stop once Get returns an error
go func() {
  for {
    if datach, err := ps.Get(id1, 30*time.Second); err == nil {
      fmt.Printf("%v received %v\n", id1, <-datach)
    } else {
      break
    }
  }
}()

// poll on behalf of id2 with a shorter long-polling interval
go func() {
  for {
    if datach, err := ps.Get(id2, 20*time.Second); err == nil {
      fmt.Printf("%v received %v\n", id2, <-datach)
    } else {
      break
    }
  }
}()

for {
  // data published on TopicB will be received by id1 and id2, TopicC by id2 only
  ps.Publish(map[string]interface{}{"random": rand.Float64()}, "TopicB", "TopicC")

  // sleep for up to 50s
  time.Sleep(time.Duration(rand.Int63n(int64(50 * time.Second))))
}

Long-polling on a single subscription channel:

Handling of single-channel long-polling pubsub is done by the longpoll.Channel type:

ch := longpoll.MustNewChannel(time.Minute, func (id string) {
  // action on exit
}, "TopicA", "TopicB")

// poll the channel; stop once Get returns an error
go func() {
  for {
    if datach, err := ch.Get(20*time.Second); err == nil {
      fmt.Printf("received %v\n", <-datach)
    } else {
      break
    }
  }
}()

for {
  ch.Publish(map[string]interface{}{"random": rand.Float64()}, "TopicB")
  // the subscription above will not receive this data
  ch.Publish(map[string]interface{}{"random": rand.Float64()}, "TopicC")

  // sleep for up to 50s
  time.Sleep(time.Duration(rand.Int63n(int64(50 * time.Second))))
}

Copyright (c) 2015-2017. Oleg Sklyar and teris.io. MIT license applies. All rights reserved.

Documentation

Overview

    Package longpoll provides an implementation of the long polling mechanism of the PubSub pattern. Although the primary purpose of the package is to aid the development of web applications, it provides no specific web handlers and can be used in other distributed applications.

    The package provides the Channel type to manage publishing and retrieval of information for each individual subscription, and the LongPoll type to manage subscription channels allowing for adding, removing and publishing to all.

    Constants

    const Version = 1.2

      Version of the library.

      Types

      type Channel

      type Channel struct {
      	// contains filtered or unexported fields
      }

        Channel represents a single channel for publishing and receiving data over a long-polling subscription. Data published to any of the topics subscribed to will be received by the client asking for new data. The receiving is not split by topic.

        The subscription is set up to time out if no Get request is made before the end of the timeout period provided at construction. Every Get request extends the lifetime of the subscription by the duration of the timeout.

        func MustNewChannel

        func MustNewChannel(timeout time.Duration, onClose func(id string), topics ...string) *Channel

          MustNewChannel acts just like NewChannel, but it panics instead of returning an error.

          func NewChannel

          func NewChannel(timeout time.Duration, onClose func(id string), topics ...string) (*Channel, error)

            NewChannel constructs a new long-polling pubsub channel with the given timeout, optional exit handler, and subscribing to given topics. Every new channel gets a unique channel/subscription Id assigned based on UUID.v4.
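
For reference, a version 4 (random) UUID can be produced with the standard library alone, as in the sketch below. How the package itself generates Ids is not stated here (it may well rely on a third-party UUID package), so newID is a hypothetical helper that only illustrates the format.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// newID returns a random (version 4) UUID in canonical text form.
// (Hypothetical helper, not part of the longpoll package.)
func newID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set the version field to 4
	b[8] = (b[8] & 0x3f) | 0x80 // set the RFC 4122 variant bits
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

func main() {
	id, err := newID()
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```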

            Constructing a channel with NewChannel starts a timeout timer. The first Get request must follow within the timeout window.

            func (*Channel) Drop

            func (ch *Channel) Drop()

              Drop terminates any publishing and receiving on the channel, signals the currently waiting Get request to return empty, terminates the timeout timer and runs the exit handler if supplied.

              func (*Channel) Get

              func (ch *Channel) Get(polltime time.Duration) (chan []interface{}, error)

                Get requests data published on all of the channel topics. The function returns a channel to receive the data set on.

                The request is held until data becomes available (published to a matching topic). Upon new data, or if data has been waiting at the time of the call, the request returns immediately. Otherwise it waits for the `polltime` duration and returns empty if no new data arrives. It is expected that a new Get request is made immediately afterwards to receive further data and prevent channel timeout.

                Multiple Get requests to the channel can be made concurrently, however, every data sample will be delivered to only one request issuer. It is not guaranteed to which one, although every new incoming request will trigger a return of any earlier one.

                func (*Channel) ID

                func (ch *Channel) ID() string

                  ID returns the channel/subscription Id assigned at construction.

                  func (*Channel) IsAlive

                  func (ch *Channel) IsAlive() bool

                    IsAlive tests if the channel is up and running.

                    func (*Channel) IsGetWaiting

                    func (ch *Channel) IsGetWaiting() bool

                      IsGetWaiting reports if there is a Get request waiting for data.

                      func (*Channel) Publish

                      func (ch *Channel) Publish(data interface{}, topic string) error

                        Publish publishes data on the channel in a non-blocking manner if the topic corresponds to one of those provided at construction. Data published to other topics will be silently ignored. No topic information is persisted and retrieved with the data.

                        func (*Channel) QueueSize

                        func (ch *Channel) QueueSize() int

                          QueueSize returns the size of the queue of currently waiting data (non-empty only when no Get request is waiting).

                          func (*Channel) Topics

                          func (ch *Channel) Topics() []string

                            Topics returns the list of topics the channel is subscribed to.

                            type LongPoll

                            type LongPoll struct {
                            	// contains filtered or unexported fields
                            }

                              The LongPoll type represents a subscription manager. It manages multiple long-polling subscriptions: adding and removing subscriptions, publishing to all subscriptions, and receiving data by subscription Id.

                              func New

                              func New() *LongPoll

                                New creates a new long-polling subscription manager.

                                func (*LongPoll) Channel

                                func (lp *LongPoll) Channel(id string) (*Channel, bool)

                                  Channel returns a pointer to the subscription channel behind the given id.

                                  func (*LongPoll) Channels

                                  func (lp *LongPoll) Channels() []*Channel

                                    Channels returns the list of all currently up and running subscription channels. For performance reasons when dealing with a large number of subscription channels, every operation that spans all of them first retrieves the list with this method and releases the lock as soon as possible. If a subscription channel is removed after the list was retrieved, the operation will still run on that channel. If a channel is added after that point, the operation will not apply to it.
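
The snapshot semantics described above follow from copying the collection while holding a lock and then working on the copy. The sketch below illustrates the pattern with a hypothetical registry type in which strings stand in for *Channel values; it is not the package's code.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the subscription manager.
type registry struct {
	mu   sync.RWMutex
	subs map[string]string // id -> label, standing in for *Channel values
}

func (r *registry) add(id, label string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.subs[id] = label
}

func (r *registry) remove(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.subs, id)
}

// snapshot copies the current values under the read lock, so callers can
// iterate over the copy without blocking writers.
func (r *registry) snapshot() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, len(r.subs))
	for _, v := range r.subs {
		out = append(out, v)
	}
	return out
}

// demo shows that a snapshot is unaffected by later additions and removals.
func demo() (int, int) {
	r := &registry{subs: map[string]string{}}
	r.add("1", "a")
	r.add("2", "b")
	snap := r.snapshot()
	r.remove("1") // still present in snap
	r.add("3", "c")
	r.add("4", "d") // not present in snap
	return len(snap), len(r.snapshot())
}

func main() {
	fmt.Println(demo())
}
```

The trade-off is the one stated in the text: the copy may be slightly stale, but long-running operations never hold the lock.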

                                    func (*LongPoll) Drop

                                    func (lp *LongPoll) Drop(id string)

                                      Drop terminates a subscription channel for the given Id and removes it from the list of subscription channels.

                                      func (*LongPoll) Get

                                      func (lp *LongPoll) Get(id string, polltime time.Duration) (chan []interface{}, error)

                                        Get requests data published on all of the topics for the given subscription channel. See further info in (*Channel).Get.

                                        func (*LongPoll) Ids

                                        func (lp *LongPoll) Ids() []string

                                          Ids returns the list of Ids of all currently up and running subscription channels.

                                          func (*LongPoll) IsAlive

                                          func (lp *LongPoll) IsAlive() bool

                                            IsAlive tests if the pubsub service is up and running.

                                            func (*LongPoll) MustSubscribe

                                            func (lp *LongPoll) MustSubscribe(timeout time.Duration, topics ...string) string

                                              MustSubscribe acts in the same manner as Subscribe, but it panics instead of returning an error.

                                              func (*LongPoll) Publish

                                              func (lp *LongPoll) Publish(data interface{}, topics ...string) error

                                                Publish publishes data on all subscription channels with minimal blocking. Data is published separately for each topic. Closed subscription channels and mismatching topics are ignored silently.
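
A minimal sketch of this fan-out follows, under the assumption that "published separately for each topic" means one delivery per matching topic, so that a subscription matching two of the given topics receives the data twice. The library may behave differently in that detail; sub and publishAll are hypothetical names.

```go
package main

import "fmt"

// sub is a hypothetical stand-in for a subscription channel.
type sub struct {
	topics map[string]bool
	queue  []interface{}
}

func newSub(topics ...string) *sub {
	s := &sub{topics: map[string]bool{}}
	for _, t := range topics {
		s.topics[t] = true
	}
	return s
}

// publish queues data if the topic matches; other topics are ignored silently.
func (s *sub) publish(data interface{}, topic string) {
	if s.topics[topic] {
		s.queue = append(s.queue, data)
	}
}

// publishAll delivers data once per topic to every subscription.
func publishAll(subs []*sub, data interface{}, topics ...string) {
	for _, topic := range topics {
		for _, s := range subs {
			s.publish(data, topic)
		}
	}
}

// demo returns the queue sizes of two subscriptions after one publish call.
func demo() (int, int) {
	a := newSub("TopicA", "TopicB")
	b := newSub("TopicB", "TopicC")
	publishAll([]*sub{a, b}, 42, "TopicB", "TopicC")
	return len(a.queue), len(b.queue)
}

func main() {
	fmt.Println(demo())
}
```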

                                                func (*LongPoll) Shutdown

                                                func (lp *LongPoll) Shutdown()

                                                  Shutdown terminates the pubsub service and drops all subscription channels.

                                                  func (*LongPoll) Subscribe

                                                  func (lp *LongPoll) Subscribe(timeout time.Duration, topics ...string) (string, error)

                                                    Subscribe creates a new subscription channel and returns its Id (and an error if the subscription channel could not be created). The subscription channel is automatically open to publishing.

                                                    func (*LongPoll) Topics

                                                    func (lp *LongPoll) Topics() []string

                                                      Topics constructs the set of all topics, for which there are currently open subscription channels.

                                                      type Timeout

                                                      type Timeout struct {
                                                      	// contains filtered or unexported fields
                                                      }

                                                        Timeout implements a callback mechanism on timeout (along with reporting on a buffered channel), which is extendable in time via pinging the object. An alive timeout can be dropped at any time, in which case the callback will not be executed, but the exit will still be reported on the channel.

                                                        This extendable Timeout is used here for monitoring long-polling subscriptions, which expire if no client asks for data within a defined timeout (unless the timeout is extended by other means).
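
The idea of a timeout that is extended by pinging can be sketched with time.AfterFunc and Timer.Reset from the standard library. This is a different and simpler technique than whatever the Timeout type uses internally, and it omits the reporting channel; extendable, ping and drop are hypothetical names.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// extendable runs a callback once d has elapsed without a ping.
type extendable struct {
	d     time.Duration
	timer *time.Timer
}

func newExtendable(d time.Duration, onTimeout func()) *extendable {
	return &extendable{d: d, timer: time.AfterFunc(d, onTimeout)}
}

// ping pushes the deadline out by another full timeout duration.
func (e *extendable) ping() { e.timer.Reset(e.d) }

// drop cancels the timeout; the callback will not run if it has not yet.
func (e *extendable) drop() { e.timer.Stop() }

// demo reports whether the callback had fired shortly after the original
// deadline (it should not have, because of the ping) and whether it had
// fired well after the extended deadline.
func demo() (bool, bool) {
	var fired int32
	e := newExtendable(500*time.Millisecond, func() { atomic.StoreInt32(&fired, 1) })
	defer e.drop()
	time.Sleep(300 * time.Millisecond)
	e.ping()                           // deadline moves to about 800ms after start
	time.Sleep(350 * time.Millisecond) // about 650ms: past the original deadline
	early := atomic.LoadInt32(&fired) == 1
	time.Sleep(500 * time.Millisecond) // about 1150ms: past the extended deadline
	late := atomic.LoadInt32(&fired) == 1
	return early, late
}

func main() {
	fmt.Println(demo())
}
```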

                                                        func MustNewTimeout

                                                        func MustNewTimeout(timeout time.Duration, onTimeout func()) *Timeout

                                                          MustNewTimeout acts just like NewTimeout, but it panics instead of returning an error.

                                                          func NewTimeout

                                                          func NewTimeout(timeout time.Duration, onTimeout func()) (*Timeout, error)

                                                            NewTimeout creates and starts a new timeout timer accepting an optional exit handler.

                                                            func (*Timeout) Drop

                                                            func (tor *Timeout) Drop()

                                                              Drop drops the timeout handler and reports the exit on the reporting channel. The drop will take place at most after 1/100th of the timeout and the onTimeout handler will not get called.

                                                              func (*Timeout) IsAlive

                                                              func (tor *Timeout) IsAlive() bool

                                                                IsAlive verifies if the timeout handler is up and running.

                                                                func (*Timeout) Ping

                                                                func (tor *Timeout) Ping()

                                                                  Ping pings the timeout handler extending it for another timeout duration.

                                                                  func (*Timeout) ReportChan

                                                                  func (tor *Timeout) ReportChan() chan bool

                                                                    ReportChan retrieves the timeout reporting channel, which will get a true reported on exit (in case of timeout or drop).