message_channel

package module
v0.0.0-...-a423905 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2023 License: MIT Imports: 4 Imported by: 0

README

Message Channel通信模型

一、这是什么,解决了什么问题?

TODO 通信模型详解以及一些坑和注意点

当我们的系统中组件比较多的时候,就会出现互相调用的关系,比如

多个组件之间实时通信的问题

二、Example

三、API详解

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel

type Channel[Message any] struct {

	// 全局唯一的ID,每个信道的ID都不同,用于区分不同的信道
	ID uint64
	// contains filtered or unexported fields
}

Channel 用于把多个Channel连接为一个channel,这样就可以用基于channel构建更复杂的通信模型 每个Channel对象是对go原生的channel的一个包装,同时增加了一些功能

func NewChannel

func NewChannel[Message any](options *ChannelOptions[Message]) *Channel[Message]

NewChannel 创建一个信道

func (*Channel[Message]) MakeChildChannel

func (x *Channel[Message]) MakeChildChannel() *Channel[Message]

MakeChildChannel 创建一条新的消息队列,对接到当前的消息队列上作为一个子队列 当前队列关闭之前需要等待所有的孩子队列关闭

func (*Channel[Message]) ReceiverWait

func (x *Channel[Message]) ReceiverWait(ctx context.Context)

ReceiverWait 消息的接收方调用的,消息的接收方需要同步等待此消息信道被处理完毕时调用

func (*Channel[Message]) Send

func (x *Channel[Message]) Send(ctx context.Context, message Message) error

Send 往当前的消息队列中发送一条消息,消息放入之后会被异步处理

func (*Channel[Message]) SenderWaitAndClose

func (x *Channel[Message]) SenderWaitAndClose(f ...MapRunFunc[Message])

SenderWaitAndClose 消息的发送方调用,消息的发送方需要同步等待消息被处理完时调用

func (*Channel[Message]) TopologyAscii

func (x *Channel[Message]) TopologyAscii(f ...MapRunFunc[Message]) string

TopologyAscii 把拓扑逻辑转为ASCII图形,这样就能比较方便的观察依赖关系了

type ChannelConsumerFunc

type ChannelConsumerFunc[Message any] func(index int, message Message)

ChannelConsumerFunc 用于消费channel中的元素

type ChannelOptions

type ChannelOptions[Message any] struct {

	// 每个channel都可以有一个名字
	Name string

	// 关闭Channel时的回调函数
	CloseEventListener CloseEventListener

	// 用于消费channel中的元素
	ChannelConsumerFunc ChannelConsumerFunc[Message]

	// channel的缓存大小
	ChannelBuffSize uint64
}

ChannelOptions 创建Channel时的选项

func NewChannelOptions

func NewChannelOptions[Message any]() *ChannelOptions[Message]

func (*ChannelOptions[Message]) WithChannelBuffSize

func (x *ChannelOptions[Message]) WithChannelBuffSize(channelBuffSize uint64) *ChannelOptions[Message]

func (*ChannelOptions[Message]) WithChannelConsumerFunc

func (x *ChannelOptions[Message]) WithChannelConsumerFunc(channelConsumerFunc ChannelConsumerFunc[Message]) *ChannelOptions[Message]

func (*ChannelOptions[Message]) WithCloseEventListener

func (x *ChannelOptions[Message]) WithCloseEventListener(closeEventListener CloseEventListener) *ChannelOptions[Message]

func (*ChannelOptions[Message]) WithName

func (x *ChannelOptions[Message]) WithName(name string) *ChannelOptions[Message]

type ChildrenMap

type ChildrenMap[Message any] struct {
	// contains filtered or unexported fields
}

ChildrenMap 线程安全的map,用于存放当前信道的子信道

func NewChildrenMap

func NewChildrenMap[Message any]() *ChildrenMap[Message]

NewChildrenMap 创建一个存放子channel的map

func (*ChildrenMap[Message]) BlockUtilEmpty

func (x *ChildrenMap[Message]) BlockUtilEmpty(ctx context.Context, f MapRunFunc[Message], interval ...time.Duration) error

BlockUtilEmpty 阻塞住直到当前map为空,期间会每隔给定的间隔尝试获取map的情况

func (*ChildrenMap[Message]) ChildrenSlice

func (x *ChildrenMap[Message]) ChildrenSlice(ctx context.Context) ([]*Channel[Message], error)

ChildrenSlice 把map中所有的子channel都转为切片形式返回

func (*ChildrenMap[Message]) Remove

func (x *ChildrenMap[Message]) Remove(ctx context.Context, id uint64) error

Remove 从map中删除值

func (*ChildrenMap[Message]) Run

func (x *ChildrenMap[Message]) Run(ctx context.Context, f MapRunFunc[Message]) error

Run 在map上执行函数,此函数的执行是串行互斥并发安全的

func (*ChildrenMap[Message]) Set

func (x *ChildrenMap[Message]) Set(ctx context.Context, id uint64, childChannel *Channel[Message]) error

Set 设置map的值

func (*ChildrenMap[Message]) Size

func (x *ChildrenMap[Message]) Size(ctx context.Context) (int, error)

Size 统计map中元素的数量

type CloseEventListener

type CloseEventListener func()

CloseEventListener channel被关闭时的监听器

type MapRunFunc

type MapRunFunc[Message any] func(ctx context.Context, m map[uint64]*Channel[Message]) error

MapRunFunc 用来处理map函数 ctx: 用来做超时控制 m: 传入的函数,可以并发安全的去操作 error: 如果发生错误时会返回错误,返回nil为无错误

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL