XRedisPubSub

package, v1.2.8

Warning: This package is not in the latest version of its module.
Published: Jan 30, 2021 License: MIT Imports: 7 Imported by: 0

README

XRedisPubSub Starter

Built on the github.com/gomodule/redigo/redis package.

redis Documentation

Documentation https://godoc.org/github.com/gomodule/redigo/redis#hdr-Publish_and_Subscribe

Example

Publish and Subscribe

Use the Send, Flush and Receive methods to implement Pub/Sub subscribers.

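if err := c.Send("SUBSCRIBE", "example"); err != nil {
    return err
}
if err := c.Flush(); err != nil {
    return err
}
for {
    reply, err := c.Receive()
    if err != nil {
        return err
    }
    // process the pushed message held in reply
    _ = reply
}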

The PubSubConn type wraps a Conn with convenience methods for implementing subscribers. The Subscribe, PSubscribe, Unsubscribe and PUnsubscribe methods send and flush a subscription management command. The Receive method converts a pushed message to convenient types for use in a type switch.


psc := redis.PubSubConn{Conn: c}
psc.Subscribe("example")
for {
    switch v := psc.Receive().(type) {
    case redis.Message:
        fmt.Printf("%s: message: %s\n", v.Channel, v.Data)
    case redis.Subscription:
        fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
    case error:
        return v
    }
}

XRedisPubSub Starter Usage

goinfras.RegisterStarter(X.NewStarter())

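XRedisPubSub Config Setting

Switch      bool   // on/off switch
DbHost      string // host address
DbPort      int    // host port
DbAuth      bool   // whether authentication is enabled
DbPasswd    string // authentication password
MaxActive   int64  // maximum number of active connections; 0 means unlimited
MaxIdle     int64  // maximum number of idle connections; 0 means unlimited
IdleTimeout int64  // idle timeout; 0 means unlimited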

XRedisPubSub Usage

// Publish...
err := XRedisPubSub.XRedisPublisher().Pulbish("channel", "msg")
// handle err...
// Subscribe...
recSubMsgFuncs := make(map[string]XRedisPubSub.RecSubMsgFunc)
// Handler for messages on the subscribed channel ChannelName1
recSubMsgFuncs[ChannelName1] = func(channel string, msg interface{}) error {
    logger.Info("Receive Message:", zap.String("channel", channel), zap.Any("message", msg))
    fmt.Println(msg)
    return nil
}
// Handler for messages on the subscribed channel ChannelName2
recSubMsgFuncs[ChannelName2] = func(channel string, msg interface{}) error {
    logger.Info("Receive Message:", zap.String("channel", channel), zap.Any("message", msg))
    fmt.Println(msg)
    return nil
}

// Unsubscribe signal channel: send the name of each channel to unsubscribe from
unSubCh := make(chan string, 1)

go func() {
    // Send the unsubscribe signals after 10s
    time.Sleep(10 * time.Second)
    unSubCh <- ChannelName1
    unSubCh <- ChannelName2
}()

// Start subscribing; this call blocks the current goroutine
err = XRedisPubSub.XRedisSubscriber().Subscribe(recSubMsgFuncs, unSubCh)
// handle err...
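
The usage above passes a map of per-channel handlers and an unsubscribe channel to a blocking Subscribe call. As a rough illustration of that pattern only, the sketch below dispatches messages to handlers and returns once every channel has been unsubscribed. It uses only the standard library: handlerFunc, pushed and dispatch are hypothetical names, the in-memory msgs channel stands in for messages pushed by Redis, and none of this is claimed to be how the package implements Subscribe.

```go
package main

import "errors"

// handlerFunc has the same shape as RecSubMsgFunc in this package.
type handlerFunc func(channel string, msg interface{}) error

// pushed is a hypothetical stand-in for a message pushed by Redis.
type pushed struct {
	channel string
	data    interface{}
}

// dispatch blocks until every handler has been unsubscribed, the message
// source is closed, or a handler returns an error.
func dispatch(handlers map[string]handlerFunc, msgs <-chan pushed, unSub <-chan string) error {
	// Copy the map so the caller's handlers are not mutated.
	active := make(map[string]handlerFunc, len(handlers))
	for name, h := range handlers {
		active[name] = h
	}
	for len(active) > 0 {
		select {
		case m, ok := <-msgs:
			if !ok {
				return errors.New("message source closed")
			}
			// Messages for channels without a handler are ignored.
			if h, found := active[m.channel]; found {
				if err := h(m.channel, m.data); err != nil {
					return err
				}
			}
		case name, ok := <-unSub:
			if !ok {
				// A closed signal channel can no longer unsubscribe anything;
				// setting it to nil disables this case.
				unSub = nil
				continue
			}
			delete(active, name)
		}
	}
	return nil
}
```

One design point this sketch makes visible: because the loop ends only when no subscriptions remain, the caller has to send one unsubscribe signal per subscribed channel, which matches the two sends in the usage above.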

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateDefaultPool

func CreateDefaultPool(config *Config)

func NewRedisPubsubPool

func NewRedisPubsubPool(cfg *Config) *redigo.Pool

func NewStarter

func NewStarter() *starter

func XF

func XF(f func(c redigo.Conn) error) error

Runs the closure f against the resource component (a Redis connection).

func XPool

func XPool() *redigo.Pool

Returns the resource component instance (the connection pool).

func XRedisPublisher

func XRedisPublisher() *redisPublisher

Returns the general-purpose Publisher instance.

Types

type Config

type Config struct {
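	DbHost      string // host address
	DbPort      int    // host port
	DbAuth      bool   // whether authentication is enabled
	DbPasswd    string // authentication password
	MaxActive   int64  // maximum number of active connections; 0 means unlimited
	MaxIdle     int64  // maximum number of idle connections; 0 means unlimited
	IdleTimeout int64  // idle timeout; 0 means unlimited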
}

func DefaultConfig

func DefaultConfig() *Config

type RecSubMsgFunc

type RecSubMsgFunc func(channel string, msg interface{}) error

Function type for handling messages in subscribe mode.

type RedisSubscriber

type RedisSubscriber struct {
	// contains filtered or unexported fields
}

func XRedisSubscriber

func XRedisSubscriber() *RedisSubscriber

Returns the general-purpose Subscriber instance.

func (*RedisSubscriber) Subscribe

func (c *RedisSubscriber) Subscribe(recMsgFuncs map[string]RecSubMsgFunc, unSubChannel <-chan string) error

Subscribes and receives messages. This method blocks.
