gonsq

package module
v0.0.0-...-504d03c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 2, 2020 License: MIT Imports: 8 Imported by: 0

README

一个简单的基于官方 nsq 的 go client 的封装

使用

一. 安装

go get github.com/sakishen/go-nsq-wrapper

二. 配置

# app.ini
# 生产者配置
[producer]
nsqd=127.0.0.1:4151

# 消费者配置
[consumer]
# 直接连接上 nsqd,支持多个,用半角逗号隔开
nsqd=127.0.0.1:4150
# 连接上 nsqlookupd, 支持多个,用半角逗号隔开
nsqlookupd=127.0.0.1:4161,127.0.0.2:4161
# 最大并行数量
maxInFlight=100
# 并行数量,该值不能超过 maxInFlight
concurrent=100

三. quick start

package main

import(
    "go.zhuzi.me/config"	
    "github.com/scofieldpeng/gonsq"
    "github.com/nsqio/go-nsq"
)

func main(){
    producerConfig := config.Data("nsq").Section("producer")
    consumerConfig := config.Data("nsq").Section("consumer")
    debug := false
    run := make(chan bool)

    // 初始化 producer
    if err := gonsq.Producer.Init(producerConfig,debug);err != nil {
	    panic(err)
    }
    // 初始化 consumer
    if err := gonsq.Consumer.Init(consumerConfig,debug);err != nil {
    	panic(err)
    }
    // 添加消费者处理函数
    gonsq.Consumer.AddHandler("testTopic",defaultHandler())
    gonsq.Consumer.AddHandler("testTopic2",defaultHandler())
    
    // run 起来
    if err := gonsq.RunAll();err != nil {
        panic(err)	
    }
    // 不要忘记退出时关闭
    defer gonsq.StopAll()
    
    <- run
}

// defaultHandler 编写消费者处理函数
func defaultHandler() (handler nsq.HandlerFunc) {
    return func (nm *nsq.Message) error {
        // 具体 consumer 逻辑	
    	return nil
    }
}

四. 单独使用producer或者consumer

单独使用producer

// 单独使用producer
// 初始化
if err := gonsq.Producer.Init(config,debug);err != nil {
	panic(err)
}
// run起来
if err := gonsq.Producer.Run();err != nil {
	panic(err)
}
// 别忘了defer
defer gonsq.Producer.Stop()

// 调用
// msg支持字符型,数字型,或者直接struct
if err := gonsq.Producer.Publish(topic,msg);err != nil {
	log.Error(err)
}
// 批量发布
if err := gonsq.Producer.MultiPublish(topic,msgArr);err != nil {
	log.Error(err)
}

单独使用consumer

// 初始化
gonsq.Consumer.Init(config,debug)
if err := gonsq.Consumer.Run();err != nil {
	panic(err)
}
// 添加handler
gonsq.Consumer.AddHandler(topic,handler)
gonsq.Consumer.AddHandler(topic2,handler2)

// run起来
if err := gonsq.Consumer.Run();err != nil {
	panic(err)
}
// 别忘了defer
defer gonsq.Consumer.Stop()

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	Consumer = newConsumer()
)
View Source
var (
	Producer = producer{
		// contains filtered or unexported fields
	}
)

Functions

func InitAll

func InitAll(producerConfig, consumerConfig ini.Section, debug bool) (err error)

InitAll 初始化producer 和 consumer

func RunAll

func RunAll() (err error)

RunAll 运行 nsq 服务

func StopAll

func StopAll()

StopAll 停止nsq 服务

Types

type FailMessage

type FailMessage struct {
	Body      []byte
	Attempt   uint16
	Timestamp int64
	MessageID string
	FailMsg   string
}

type FailMessageFunc

type FailMessageFunc func(message FailMessage) (err error)

失败消息处理函数类型

func (FailMessageFunc) HandleFailMessage

func (f FailMessageFunc) HandleFailMessage(message FailMessage) (err error)

type FailMessageHandler

type FailMessageHandler interface {
	HandleFailMessage(message FailMessage) (err error)
}

失败消息处理接口,继承了该接口的接口都会调用该接口

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL