caster

v0.0.0-...-3736c44 Latest
Published: Nov 4, 2019 License: MIT Imports: 1 Imported by: 2

README

caster

caster is a dead simple and performant message broadcaster for Go with context support. It uses the publisher-subscriber pattern (pubsub) to broadcast messages from one or more source channels to multiple subscriber channels. Subscribers can join and leave dynamically.
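To make the fan-out idea concrete, here is a minimal sketch that uses only the standard library. It is not caster's implementation: `fanOut` is a hypothetical helper, the subscriber count is fixed up front, and there is no dynamic join or leave. It only shows one source channel being copied to several subscriber channels that are closed when the source closes.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut is a hypothetical helper (not part of caster). It copies every
// message from src to n new subscriber channels, each with the given buffer
// capacity, and closes them all once src is closed.
func fanOut(src <-chan interface{}, n, capacity int) []chan interface{} {
	subs := make([]chan interface{}, n)
	for i := range subs {
		subs[i] = make(chan interface{}, capacity)
	}

	go func() {
		for m := range src {
			for _, ch := range subs {
				ch <- m // blocks until this subscriber has room
			}
		}
		for _, ch := range subs {
			close(ch)
		}
	}()

	return subs
}

func main() {
	src := make(chan interface{})
	subs := fanOut(src, 2, 1)

	var wg sync.WaitGroup
	for i, ch := range subs {
		wg.Add(1)
		go func(id int, ch <-chan interface{}) {
			defer wg.Done()
			for m := range ch {
				fmt.Printf("subscriber %d got %v\n", id, m)
			}
		}(i, ch)
	}

	for i := 0; i < 3; i++ {
		src <- i
	}
	close(src)
	wg.Wait()
}
```

Every subscriber sees every message, but the interleaving of the printed lines between subscribers is not deterministic.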

Usage

Broadcast a Go channel

Suppose the Go channel is:

var srcCh <-chan interface{}

We can broadcast the messages coming out of it to multiple subscribers:

c := caster.New(nil)

go func() {
    // subscriber #1
    ch, _ := c.Sub(nil, 1)

    for m := range ch {
        // do anything with the broadcast message
        _ = m
    }
}()

go func() {
    // subscriber #2
    ch, _ := c.Sub(nil, 1)

    for m := range ch {
        // do anything with the broadcast message
        _ = m
    }
}()

go func() {
    // publisher
    for m := range srcCh {
        c.Pub(m)
    }

    c.Close()
}()

Subscribers can join and leave at any time:

// join
ch1, _ := c.Sub(nil, 1)

// leave
c.Unsub(ch1)

// join with context and automatically leave when the context is canceled
ch2, _ := c.Sub(ctx, 1)

// join with a subscriber channel buffer of 10
ch3, _ := c.Sub(ctx, 10)

A caster can be associated with a context as well:

// `c` will be closed when `ctx` is canceled
c := caster.New(ctx)

Each operation returns a boolean value that indicates whether the caster is still running:

_, ok := c.Sub(nil, 1)
if !ok {
    // the caster has been closed, do something else
}

License

MIT

Documentation

Overview

Package caster implements a dead simple and performant message broadcaster (pubsub) library.

Example
/*
 * @Author: guiguan
 * @Date:   2019-09-19T23:52:54+10:00
 * @Last modified by:   guiguan
 * @Last modified time: 2019-09-20T10:26:20+10:00
 */

package main

import (
	"fmt"
	"sync"

	"github.com/guiguan/caster"
)

func main() {
	wg := new(sync.WaitGroup)

	c := caster.New(nil)

	// register subscribers
	for i := 0; i < 5; i++ {
		ch, _ := c.Sub(nil, 1)
		// count the goroutine before starting it, so that wg.Wait cannot
		// return before the subscriber has been counted
		wg.Add(1)
		go receiveFromChannel(ch, wg)
	}

	// broadcast
	for i := 0; i < 100; i++ {
		c.Pub(i)
	}

	// close caster and all subscriber channels
	c.Close()

	wg.Wait()
}

func receiveFromChannel(ch <-chan interface{}, wg *sync.WaitGroup) {
	defer wg.Done()

	counter := 0

	// the loop ends when the caster closes the subscriber channel
	for range ch {
		counter++
	}

	fmt.Printf("received %v messages\n", counter)
}
Output:

received 100 messages
received 100 messages
received 100 messages
received 100 messages
received 100 messages


Types

type Caster

type Caster struct {
	// contains filtered or unexported fields
}

Caster represents a message broadcaster.

func New

func New(ctx context.Context) *Caster

New creates a new caster.

func (*Caster) Close

func (c *Caster) Close() (ok bool)

Close closes the caster and all subscriber channels. The ok value reports whether the operation was performed. Once the caster is closed, it stops accepting operations, so a further call is not performed and ok is false.

func (*Caster) Done

func (c *Caster) Done() <-chan struct{}

Done returns a channel that is closed when the caster is closed.
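The relationship between Close, its ok result, and the Done channel can be sketched with the standard library alone. The `closer` type below is a hypothetical stand-in, not caster's actual implementation; using `sync.Once` is an assumption made for the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical stand-in for a caster-like type. It is not the
// library's implementation.
type closer struct {
	once sync.Once
	done chan struct{}
}

func newCloser() *closer {
	return &closer{done: make(chan struct{})}
}

// Close closes the done channel on the first call and reports true.
// Later calls do nothing and report false.
func (c *closer) Close() (ok bool) {
	c.once.Do(func() {
		close(c.done)
		ok = true
	})
	return ok
}

// Done returns a channel that is closed once Close has run.
func (c *closer) Done() <-chan struct{} {
	return c.done
}

func main() {
	c := newCloser()

	fmt.Println(c.Close()) // true: the close was performed
	fmt.Println(c.Close()) // false: already closed, nothing to do

	select {
	case <-c.Done():
		fmt.Println("done channel is closed")
	default:
		fmt.Println("still running")
	}
}
```

Because a closed channel never blocks a receive, any number of goroutines can wait on the same Done channel and all of them are released at once.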

func (*Caster) Pub

func (c *Caster) Pub(msg interface{}) (ok bool)

Pub publishes the given message to the caster, which in turn broadcasts it to all subscriber channels. The ok value reports whether the operation was performed. Once the caster is closed, it stops accepting operations, so the message is not published and ok is false.

func (*Caster) Sub

func (c *Caster) Sub(ctx context.Context, capacity uint) (sCh chan interface{}, ok bool)

Sub subscribes to the caster and returns a new channel with the given buffer capacity, on which the subscriber receives broadcast messages. When the given ctx is canceled, the caster unsubscribes the subscriber channel and closes it. The ok value reports whether the operation was performed. Once the caster is closed, it stops accepting operations, so the subscription is not made; in that case ok is false and a closed channel is returned.
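Returning a closed channel when ok is false means a caller that ignores ok and ranges over the channel does not hang. The sketch below shows this with plain Go channels only; `closedChan` and `drain` are hypothetical helpers for illustration and do not call caster.

```go
package main

import "fmt"

// closedChan is a hypothetical helper that mimics what a caller receives
// when a subscription is refused: a channel that is already closed.
func closedChan() chan interface{} {
	ch := make(chan interface{})
	close(ch)
	return ch
}

// drain counts the messages received until the channel is closed.
func drain(ch <-chan interface{}) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	ch := closedChan()

	// A receive on a closed channel returns at once with the zero value.
	v, open := <-ch
	fmt.Println(v, open) // <nil> false

	// A range loop over a closed, empty channel exits immediately.
	fmt.Println(drain(ch)) // 0
}
```

Checking ok is still the clearer way to tell a refused subscription apart from one that was closed later.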

func (*Caster) TryPub

func (c *Caster) TryPub(msg interface{}) (ok bool)

TryPub publishes the given message to the caster, which in turn broadcasts it to all subscriber channels without blocking to wait for channels to be ready to receive. The ok value reports whether the operation was performed. Once the caster is closed, it stops accepting operations, so the message is not published and ok is false.
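A non-blocking send in Go is usually written as a `select` with a `default` case. The sketch below shows that idiom with a hypothetical `trySend` helper. The text above does not say what caster does with a subscriber that is not ready, so dropping the message here is an assumption of the sketch, not documented library behavior.

```go
package main

import "fmt"

// trySend is a hypothetical helper (not part of caster). It attempts to
// deliver msg to ch without blocking and reports whether it was delivered.
// Dropping the message when ch is not ready is an assumption of this sketch.
func trySend(ch chan<- interface{}, msg interface{}) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan interface{}, 1)

	fmt.Println(trySend(ch, "a")) // true: the buffer had room
	fmt.Println(trySend(ch, "b")) // false: the buffer is full and nobody is receiving
	fmt.Println(<-ch)             // a
}
```

A larger subscriber buffer gives a slow subscriber more slack before a non-blocking send starts to fail.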

func (*Caster) Unsub

func (c *Caster) Unsub(subCh chan interface{}) (ok bool)

Unsub unsubscribes the given subscriber channel from the caster and closes it. The ok value reports whether the operation was performed. Once the caster is closed, it stops accepting operations, so the call is not performed and ok is false.
