gopolling

package module
v0.0.0-...-8928e6c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2020 License: MIT Imports: 9 Imported by: 0

README

GoPolling

Github Action codecov Go Report Card

A simple tool for handling long-polling requests on the server side.

Install
go get github.com/stanlry/gopolling
Example
package main

import (
    "encoding/json"
    "github.com/stanlry/gopolling"
    "log"
    "net/http"
)

var channel = "test"

var mgr = gopolling.New(gopolling.DefaultOption)

// main registers the long-polling endpoint (/message) and the notification
// endpoint (/notify) on the default mux and serves them on :80.
func main() {
    // /message blocks until a notice arrives on the channel or the wait fails.
    http.HandleFunc("/message", func(w http.ResponseWriter, r *http.Request) {
        resp, err := mgr.WaitForNotice(r.Context(), channel, nil)
        if err != nil {
            // Report the failure instead of replying 200 with a "null" body.
            // NOTE(review): assumes WaitForNotice returns gopolling.ErrTimeout
            // when the wait expires — confirm against the implementation.
            status := http.StatusInternalServerError
            if err == gopolling.ErrTimeout {
                status = http.StatusRequestTimeout
            }
            http.Error(w, err.Error(), status)
            return
        }

        st, err := json.Marshal(resp)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        w.Header().Set("Content-Type", "application/json")
        if _, err := w.Write(st); err != nil {
            // The client most likely went away; nothing more can be sent.
            log.Printf("write response: %v", err)
        }
    })

    // /notify publishes the "data" query parameter to everyone waiting on the channel.
    http.HandleFunc("/notify", func(w http.ResponseWriter, r *http.Request) {
        data := r.URL.Query().Get("data")
        if err := mgr.Notify(channel, data, nil); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
    })

    log.Println("start serve on :80")
    log.Fatal(http.ListenAndServe(":80", nil))
}

wait for message

curl -s localhost/message

notify clients

curl -s localhost/notify?data=[your message here]
Usage
Create Polling Manager
var mgr = gopolling.New(gopolling.Option{ 
    // message retention time in seconds, default is 60s
    Retention: 60,

    // timeout for each long-polling request, default is 120s
    Timeout: 1 * time.Minute,  

    // message bus, by default goroutines are used
    Bus: adapter.NewRedisAdapter(pool),

    // message buffer, by default memory is used
    Buffer: adapter.NewRedisAdapter(pool),

    // logger interface; zap and logrus are currently supported, by default no errors are logged
    Logger: zap.NewExample().Sugar(), 
})
Wait For Notice

wait for notice from listener or notifier

// this call blocks until a notice is received or the request times out
resp, err := mgr.WaitForNotice(
    // request context
    r.Context(),
    // channel
    channel,
    // data sent to the listener; it is discarded if no listener exists
    "data",
)

only wait for notice with matched selector

resp, err := mgr.WaitForSelectedNotice(
    // request context
    r.Context(),
    // channel
    channel,
    // data sent to the listener
    "data",
    // specify the identity within the channel; the selector is essentially a string map
    gopolling.S{
        "id": "xxx",
    },
)
Direct Notify

Notify everyone who is waiting in the channel

mgr.Notify(
    // channel
    channel,
    // data being sent
    "data",
    // selector that specify the receiving side, if no one match the selector, message will be discarded
    gopolling.S{
        "id": "xxx",
    },
)
Event Listener

Listen for the event raised when a request is made and reply immediately. The reply message only notifies the client that made the request.

// subscribe a listener
mgr.SubscribeListener(channel, func(ev gopolling.Event, cb *gopolling.Callback) {
    // event data; use the checked assertion so a non-string payload cannot panic the listener
    st, ok := ev.Data.(string)
    if !ok {
        return
    }

    // reply immediately; you can skip this part if no reply is needed
    cb.Reply("reply to " + st)
})
Adapters
Redis

Redis is supported for both message bus and message buffer

// pool is the redis connection pool shared by the adapters; every new
// connection dials localhost:6379 and authenticates with a password.
pool := &redis.Pool{
	MaxIdle:     1,
	MaxActive:   100,
	IdleTimeout: 5 * time.Minute,
	Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", "localhost:6379", redis.DialPassword("password"))
	},
}

adapter := adapter.NewRedisAdapter(pool)
mgr := gopolling.New(gopolling.Option{
    bus:    adapter,
    buffer: adapter,
})
GCP Pub/Sub

GCP Pub/Sub is supported for message bus

// pubsub.NewClient returns both a client and an error
client, err := pubsub.NewClient(context.Background(), "project-id")
if err != nil {
    // handle the error in whatever way suits your application
    panic(err)
}

// the variable is not named "adapter" so that it does not shadow the adapter package
gcpAdapter := adapter.NewGCPPubSubAdapter(client)
mgr := gopolling.New(gopolling.Option{
    Bus: gcpAdapter,
})

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the package.
var (
	// ErrTimeout carries the message "request timeout".
	// NOTE(review): presumably returned by the WaitFor* calls when the wait expires — confirm.
	ErrTimeout   = errors.New("request timeout")
	// ErrCancelled carries the message "request cancelled".
	// NOTE(review): presumably returned when the request context is cancelled — confirm.
	ErrCancelled = errors.New("request cancelled")
)
View Source
// DefaultOption is a ready-made Option: a Retention of 600, a 120-second
// Timeout, the package's goroutine bus and memory buffer, the package-level
// pub/sub and queue prefixes, and a NoOpLog logger.
// NOTE(review): Retention is 600 here, while the Option field comment and the
// README state a default of 60s — confirm which value is intended.
var DefaultOption = Option{
	Retention:    600,
	Timeout:      120 * time.Second,
	Bus:          newGoroutineBus(),
	Buffer:       newMemoryBuffer(),
	PubSubPrefix: pubsubPrefix,
	QueuePrefix:  queuePrefix,
	Logger:       &NoOpLog{},
}
View Source
var (
	ErrNotSubscriber = errors.New("no subscriber")
)

Functions

This section is empty.

Types

type Callback

type Callback struct {
	// contains filtered or unexported fields
}

func NewCallback

func NewCallback() Callback

func (*Callback) Reply

func (r *Callback) Reply(data interface{})

type DefaultSubscription

type DefaultSubscription struct {
	Channel string
	ID      string
	// contains filtered or unexported fields
}

func NewDefaultSubscription

func NewDefaultSubscription(channel, id string, ch chan Message) *DefaultSubscription

func (*DefaultSubscription) Receive

func (g *DefaultSubscription) Receive() <-chan Message

type Event

// Event is the payload sent from a client to the listening subscriber.
type Event struct {
	// Channel is a channel name (presumably the channel the request was made on — confirm).
	Channel  string
	// Data is the arbitrary payload carried by the event.
	Data     interface{}
	// Selector is a string map (see S).
	Selector S
}

Event struct is the payload being sent from client to the listening subscriber

type EventQueue

// EventQueue is the event-queue part of a MessageBus.
type EventQueue interface {
	// Enqueue adds an Event under the given name (GoroutineBus names this parameter "channel").
	Enqueue(string, Event)
	// Dequeue returns a receive-only channel of Events for the given name;
	// it is described as a blocking method that waits until a task is received.
	Dequeue(string) <-chan Event
}

type GoPolling

type GoPolling struct {
	// contains filtered or unexported fields
}

func New

func New(opt Option) *GoPolling

func (*GoPolling) Notify

func (g *GoPolling) Notify(channel string, data interface{}, selector S) error

func (*GoPolling) SubscribeListener

func (g *GoPolling) SubscribeListener(roomID string, lf ListenerFunc)

func (*GoPolling) WaitForNotice

func (g *GoPolling) WaitForNotice(ctx context.Context, channel string, data interface{}) (interface{}, error)

func (*GoPolling) WaitForSelectedNotice

func (g *GoPolling) WaitForSelectedNotice(ctx context.Context, channel string, data interface{}, selector S) (interface{}, error)

type GoroutineBus

type GoroutineBus struct {
	// contains filtered or unexported fields
}

func (*GoroutineBus) Dequeue

func (g *GoroutineBus) Dequeue(channel string) <-chan Event

func (*GoroutineBus) Enqueue

func (g *GoroutineBus) Enqueue(channel string, t Event)

func (*GoroutineBus) Publish

func (g *GoroutineBus) Publish(channel string, msg Message) error

func (*GoroutineBus) SetLog

func (g *GoroutineBus) SetLog(l Log)

func (*GoroutineBus) Subscribe

func (g *GoroutineBus) Subscribe(channel string) (Subscription, error)

func (*GoroutineBus) Unsubscribe

func (g *GoroutineBus) Unsubscribe(sub Subscription) error

type ListenerFunc

type ListenerFunc func(Event, *Callback)

type ListenerManager

type ListenerManager struct {
	// contains filtered or unexported fields
}

func NewListenerManager

func NewListenerManager(adapter MessageBus, queuePrefix, pubsubPrefix string) ListenerManager

func (*ListenerManager) Subscribe

func (m *ListenerManager) Subscribe(channel string, lf ListenerFunc)

type Log

type Log interface {
	Errorf(string, ...interface{})
}

Log is the logger interface; it supports error logging.

type Loggable

type Loggable interface {
	SetLog(Log)
}

type Message

// Message is the payload sent through the message bus from a notifier to the
// waiting client.
type Message struct {
	// Channel is a channel name (presumably the channel being notified — confirm).
	Channel  string
	// Data is the arbitrary payload carried by the message.
	Data     interface{}
	// Selector is a string map (see S).
	Selector S
}

The Message struct defines the payload that is sent through the message bus from the notifier to the waiting client.

type MessageBuffer

// MessageBuffer is the interface used to save and fetch messages.
type MessageBuffer interface {
	// Find looks up a Message by a string key; the bool presumably reports
	// whether a message was found — confirm against implementations.
	Find(string) (Message, bool)
	// Save stores a Message under a string key.
	// NOTE(review): the int is presumably the retention (TTL) in seconds — confirm.
	Save(string, Message, int)
}

MessageBuffer defines the interface used to save and fetch messages.

type MessageBus

type MessageBus interface {
	PubSub
	EventQueue
	Loggable
}

type NoOpLog

type NoOpLog struct{}

No Operation Logger

func (*NoOpLog) Errorf

func (n *NoOpLog) Errorf(st string, args ...interface{})

Log error but no action will be taken

type Option

// Option holds the settings passed to New.
type Option struct {
	// Retention is the TTL of a message in seconds (default 60s).
	// NOTE(review): DefaultOption sets this to 600 — confirm the intended default.
	Retention int

	// Timeout is how long a long-polling client is retained (default 120s).
	Timeout time.Duration

	// Bus is the message bus.
	Bus MessageBus

	// Buffer is the message buffer.
	Buffer MessageBuffer

	// Logger is the logging implementation.
	Logger Log

	// PubSubPrefix is presumably prepended to pub/sub channel names — confirm.
	PubSubPrefix string

	// QueuePrefix is presumably prepended to event queue names — confirm.
	QueuePrefix string
}

type PollingManager

type PollingManager struct {
	// contains filtered or unexported fields
}

func NewPollingManager

func NewPollingManager(adapter MessageBus, t time.Duration, queuePrefix, pubsubPrefix string) PollingManager

func (*PollingManager) WaitForNotice

func (m *PollingManager) WaitForNotice(ctx context.Context, channel string, data interface{}, sel S) (interface{}, error)

type PubSub

// PubSub is the publish/subscribe part of a MessageBus.
type PubSub interface {
	// Publish sends a Message under the given name (GoroutineBus names this parameter "channel").
	Publish(string, Message) error
	// Subscribe returns a Subscription for the given name (GoroutineBus names this parameter "channel").
	Subscribe(string) (Subscription, error)
	// Unsubscribe takes a Subscription, presumably one obtained from Subscribe — confirm.
	Unsubscribe(Subscription) error
}

type S

type S map[string]string

S is a shortcut for defining a string map.

type Subscription

type Subscription interface {
	Receive() <-chan Message
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL