que

package
v0.0.0-...-0cb7091 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2021 License: BSD-3-Clause-Clear Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DEF_UNIT_MSIZE is a package-level default with value 10.
	// NOTE(review): the name suggests a default capacity for the consumer
	// unit collection; its use is not visible in this listing — confirm.
	DEF_UNIT_MSIZE = 10
)

Variables

View Source
// Package-level error values.
var (
	// ErrOK is a nil error value.
	ErrOK      = error(nil)
	// ErrRequeue is a sentinel error whose message is "ErrRequeue".
	// NOTE(review): presumably returned by a message handler to request that
	// the message be requeued — confirm against the consumer implementation.
	ErrRequeue = errors.New("ErrRequeue")
)

Functions

func InstallExt

func InstallExt(ctrlName string, mc IMCGeter, conf *ConnConf)

安装扩展ETCD控制器

func PreClose

func PreClose(name string) error

PreClose 预关闭消费者

func Publish

func Publish(nsqdAddr, topic string, value interface{}) error

Publish 生产消息

Types

type CParam

// CParam holds the parameters used to create a consumer.
type CParam struct {
	// Topic and Channel name what the consumer subscribes to.
	// NOTE(review): assumed to be the NSQ topic/channel pair — confirm.
	Topic, Channel   string
	// NOTE(review): GoNum is presumably the number of handler goroutines and
	// MaxFlight the maximum number of in-flight messages — confirm.
	GoNum, MaxFlight int
}

消费者创建参数

type ConnConf

// ConnConf is the connection configuration; both fields are addresses and are
// (de)serialized from JSON under the keys given in the struct tags.
type ConnConf struct {
	NsqdAddr    string `json:"nsqd_addr"`
	LookupHAddr string `json:"lookup_http_addr"` // preferred: this address is used first
}

type Consum

// Consum is the NSQ consumer controller. It embeds cto.ControlBase.
type Consum struct {
	cto.ControlBase
	// contains filtered or unexported fields
}

 => nsq consum control

func Install

func Install(ctrlName string, conf *ConnConf) *Consum

安装默认Consum控制器

func (*Consum) Count

func (this *Consum) Count() int

Consum.Count 消费者数量

func (*Consum) HandleInit

func (this *Consum) HandleInit()

func (*Consum) HandleTerm

func (this *Consum) HandleTerm()

func (*Consum) HasUnit

func (this *Consum) HasUnit(name string) int

Consum.HasUnit 查找消费者

func (*Consum) Mount

func (this *Consum) Mount(source TUnits)

Consum.Mount 挂载初始化源，在控制器启动的时候一起创建

func (*Consum) Overview

func (this *Consum) Overview() string

Consum.Overview 所有存储概览

func (*Consum) Ping

func (this *Consum) Ping() error

func (*Consum) Start

func (this *Consum) Start(name string, handle IConsumer) error

Consum.Start 创建并添加Consumer

type HandlerMsgFunc

type HandlerMsgFunc func(msg *Msg) error

HandlerMsgFunc is a function type whose HandleMessage method implements the nsq.Handler interface.

func (HandlerMsgFunc) HandleMessage

func (h HandlerMsgFunc) HandleMessage(m *nsq.Message) error

type IConsumer

// IConsumer is the consumer data interface: it supplies a consumer's creation
// parameters and handles the messages delivered to it.
type IConsumer interface {

	// HandleParam returns the CParam for this consumer.
	// (The original comment read "consumer name"; the method returns a CParam.)
	HandleParam() CParam

	// HandleMessage processes one consumer message.
	HandleMessage(msg *Msg) error
}

 => 消费者数据接口

func PreStart

func PreStart(name string, handle IConsumer) IConsumer

PreStart 预创建消费者

type IMCGeter

// IMCGeter is the consumer-manager interface. It extends cto.ICtrlHandler
// with access to the underlying Consum.
type IMCGeter interface {
	cto.ICtrlHandler

	// HandleConsum returns the base Consum object.
	HandleConsum() *Consum
}

 => 消费管理器接口

type Logger

// Logger is an empty struct type that carries the package's Output method.
// NOTE(review): the Output(calldepth, s) signature looks like the logger
// interface expected by go-nsq's SetLogger — confirm.
type Logger struct{}

---------- Logger

func (Logger) Output

func (l Logger) Output(calldepth int, s string) error

Output formats and prints the log message.

type Msg

// Msg is the NSQ message handed to consumer handlers
// (see IConsumer.HandleMessage and HandlerMsgFunc).
type Msg struct {
	// Body is the raw message payload.
	Body      []byte
	// NsqdAddr is an nsqd address associated with the message.
	// NOTE(review): presumably the nsqd the message was received from — confirm.
	NsqdAddr  string
	// ProduceAt is a timestamp for when the message was produced.
	// NOTE(review): unit (seconds vs. nanoseconds) is not shown here — confirm.
	ProduceAt int64
}

=> Nsq Message

type TUnits

// TUnits is a list of Unit values that can be sorted via its Len, Less and Swap methods.
type TUnits []Unit // supports sorting

func (TUnits) Len

func (l TUnits) Len() int

func (TUnits) Less

func (l TUnits) Less(i, j int) bool

func (TUnits) Swap

func (l TUnits) Swap(i, j int)

type To

// To is a producer helper describing where a message is published:
// a target nsqd address and a topic.
type To struct {
	NsqdAddr string // target nsqd address
	Topic    string
}

****************************************************************************** Copyright:cloud Author:cloudapex@126.com Version:1.0 Date:2020-05-13 Description: 生产者助手 ******************************************************************************

func (To) Publish

func (p To) Publish(value interface{}) error

To.Publish 发送消息；value 如果是 struct 则进行 JSON 序列化

type Unit

// Unit is a consumer unit; construct one with NewUnit(name, cser).
type Unit struct {
	// contains filtered or unexported fields
}

func NewUnit

func NewUnit(name string, cser IConsumer) Unit

消费者结构单元

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL