pchelper

package
v2.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 27, 2022 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

生产者消费者模式(Producer-consumer problem)帮助工具.

Index

Constants

This section is empty.

Variables

View Source
var DefaultListenOpt = ListenOptions{
	Parser:      DefaultParser,
	TopicStarts: map[string]string{},
}
View Source
var DefaultPublishOpt = PublishOptions{}
View Source
var Defaultopt = Options{
	SerializeProtocol: SerializeProtocol_JSON,
	UUIDType:          idgener.IDGEN_UUIDV4,
}
View Source
var ErrMapPayloadCanNotCast = errors.New("payload can not cast map as map[string]interface{}")

ErrMapPayloadCanNotCast map数据不能被转换

View Source
var ErrNotSupportChanAsPayload = errors.New("not support chan as payload")

ErrNotSupportChanAsPayload chan数据不能作为payload

View Source
var ErrNotSupportSliceAsPayload = errors.New("not support slice as payload")

ErrNotSupportSliceAsPayload 不支持slice作为负载

View Source
var ErrUnSupportSerializeProtocol = errors.New("unsupported serialize protocol")

ErrUnSupportSerializeProtocol 未支持的序列化协议

Functions

func SerializeWithJSON

func SerializeWithJSON() optparams.Option[Options]

SerializeWithJSON 使用JSON作为序列化反序列化的协议

func SerializeWithMsgpack

func SerializeWithMsgpack() optparams.Option[Options]

SerializeWithMsgpack 使用JSON作为序列化反序列化的协议

func ToBytes

func ToBytes(spt SerializeProtocolType, payload interface{}) ([]byte, error)

func ToXAddArgsValue

func ToXAddArgsValue(spt SerializeProtocolType, payload interface{}) (interface{}, error)

func WithEventParser

func WithEventParser(fn EventParser) optparams.Option[ListenOptions]

WithParallelHanddler 并行执行注册的handdler

func WithID

WithID stream专用

func WithLimit

func WithLimit(limit int64) optparams.Option[PublishOptions]

WithMinID stream专用

func WithMaxlen

func WithMaxlen(n int64) optparams.Option[PublishOptions]

func WithMinID

func WithMinID(minid string) optparams.Option[PublishOptions]

WithMinID stream专用

func WithNoMkStream

func WithNoMkStream() optparams.Option[PublishOptions]

WithNoMkStream stream专用

func WithParallelHanddler

func WithParallelHanddler() optparams.Option[ListenOptions]

WithParallelHanddler 并行执行注册的handdler

func WithStrictMode

func WithStrictMode() optparams.Option[PublishOptions]

func WithTopicStartAt

func WithTopicStartAt(topic string, t time.Time) optparams.Option[ListenOptions]

WithTopicStartAt stream消费者专用,用于设定指定topic消费起始时间

func WithTopicStartPosition

func WithTopicStartPosition(topic, flag string) optparams.Option[ListenOptions]

WithTopicStartPosition stream消费者专用,用于设定指定topic消费起始位置

func WithTopicsStartPositionMap

func WithTopicsStartPositionMap(setting map[string]string) optparams.Option[ListenOptions]

WithTopicsStartPositionMap stream消费者专用,用于设定指定复数topic消费起始位置

func WithUUIDSnowflake

func WithUUIDSnowflake() optparams.Option[Options]

WithUUIDSnowflake 使用snowflake作为uuid的生成器

func WithUUIDSonyflake

func WithUUIDSonyflake() optparams.Option[Options]

WithUUIDSonyflake 使用sonyflake作为uuid的生成器

func WithUUIDv4

func WithUUIDv4() optparams.Option[Options]

WithUUIDv4 使用uuid4作为uuid的生成器

Types

type ConsumerABC

type ConsumerABC struct {
	Handdlers     map[string][]EventHanddler
	Handdlerslock sync.RWMutex
	*ProducerConsumerABC
}

ConsumerABC 消费者的基类 定义了回调函数的注册操作和执行操作

func NewConsumerABC

func NewConsumerABC(opts ...optparams.Option[Options]) *ConsumerABC

func (*ConsumerABC) HanddlerEvent

func (c *ConsumerABC) HanddlerEvent(asyncHanddler bool, evt *Event)

HanddlerEvent 调用回调函数处理消息 @params asyncHanddler bool 是否异步执行回调函数 @params evt *Event 待处理的消息

func (*ConsumerABC) RegistHandler

func (c *ConsumerABC) RegistHandler(topic string, fn EventHanddler) error

RegistHandler 将回调函数注册到指定topic上 @params topic string 注册的topic,topic可以是具体的key也可以是*,*表示监听所有消息 @params fn EventHanddler 注册到topic上的回调函数

func (*ConsumerABC) UnRegistHandler

func (c *ConsumerABC) UnRegistHandler(topic string) error

UnRegistHandler 删除特定topic上注册的回调函数 @params topic string 要取消注册回调的topic,注意`*`取消的只是`*`类型的回调并不是全部取消,要全部取消请使用空字符串

type ConsumerInterface

type ConsumerInterface interface {
	//RegistHandler 注册特定topic的执行函数
	//@params topic string 指定目标topic,`*`为全部topic
	//@params fn EventHanddler 事件触发时执行的函数
	RegistHandler(topic string, fn EventHanddler) error
	//UnRegistHandler 取消注册特定topic的执行函数
	//@params topic string 指定目标topic,`*`为全部topic
	UnRegistHandler(topic string) error
	//Listen 开始监听
	//@params topics string 指定监听的目标topic,使用`,`分隔表示多个topic
	//@params opts ...optparams.Option[ListenOptions] 监听时的一些配置,具体看listenoption.go说明
	Listen(topics string, opts ...optparams.Option[ListenOptions]) error
	//StopListening 停止监听
	StopListening() error
}

ConsumerInterface 消费者对象的接口

type Event

type Event struct {
	Topic     string      `json:"topic,omitempty" msgpack:"topic,omitempty"`
	Sender    string      `json:"sender,omitempty" msgpack:"sender,omitempty"`
	EventTime int64       `json:"event_time,omitempty" msgpack:"event_time,omitempty"` //毫秒级时间戳
	EventID   string      `json:"event_id,omitempty" msgpack:"event_id,omitempty"`
	Payload   interface{} `json:"payload" msgpack:"payload"`
}

Event 消息对象

func DefaultParser

func DefaultParser(SerializeProtocol SerializeProtocolType, topic, eventID, payloadstr string, payload map[string]interface{}) (*Event, error)

DefaultParser 默认的消息处理函数负载会被解析为 m,ap[string]interface{}

type EventHanddler

type EventHanddler func(msg *Event) error

EventHanddler 处理消息的回调函数 @params msg *Event Event对象

type EventParser

type EventParser func(SerializeProtocol SerializeProtocolType, topic, eventID, payloadstr string, payload map[string]interface{}) (*Event, error)

EventParser 用于将负载字符串转化为event的函数 规定eventID不为""时解析流的消息,用到topic, eventID, payload 规定eventID为""时解析除流之外的消息,用到SerializeProtocol,topic, payloadstr

type ListenOptions

type ListenOptions struct {
	ParallelHanddler bool
	Parser           EventParser
	TopicStarts      map[string]string //用于指定特定topic的监听起始位置
}

ListenOptions 消费端listen方法的配置

type Options

type Options struct {
	SerializeProtocol SerializeProtocolType
	// ClientID          string
	UUIDType idgener.IDGENAlgorithm
}

Options broker的配置

type ProducerConsumerABC

type ProducerConsumerABC struct {
	Opt Options
}

ProducerConsumerABC 消费者的基类 定义了回调函数的注册操作和执行操作

func New

type ProducerInterface

type ProducerInterface interface {
	//Publish 发布消息
	//@params ctx context.Context 发送的上下文配置
	//@params topic string 指定发送去的topic
	//@params payload interface{} 消息负载
	Publish(ctx context.Context, topic string, payload interface{}, opts ...optparams.Option[PublishOptions]) error
	//PubEvent 发布事件,事件会包含除负载外的一些其他元信息
	//@params ctx context.Context 发送的上下文配置
	//@params topic string 指定发送去的topic
	//@params payload interface{} 消息负载
	PubEvent(ctx context.Context, topic string, payload interface{}, opts ...optparams.Option[PublishOptions]) (*Event, error)
}

ProducerInterface 生产者对象的接口

type PublishOptions

type PublishOptions struct {
	NoMkStream bool
	MinID      string
	ID         string
	Limit      int64
	MaxLen     int64 //stream生产者专用,用于设置流的最大长度
	Strict     bool  //stream生产者专用,用于设置流是否为严格模式
}

PublishOptions 消费端listen方法的配置

type SerializeProtocolType

type SerializeProtocolType uint8

AckModeType stream的Ack模式

const (

	//SerializeProtocol_JSON json作为序列化协议
	SerializeProtocol_JSON SerializeProtocolType = iota
	//SerializeProtocol_MSGPACK messagepack作为序列化协议
	SerializeProtocol_MSGPACK
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL