eventbus

package module
Version: v0.0.0-...-4593672 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 5, 2021 License: MIT Imports: 13 Imported by: 2

README

eventbus (实验性支持)

EventBus 事件总线

- RabbitMQ支持

- Redis Stream支持

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultRabbitMQOptions holds the default RabbitMQ options: a "topic"
	// exchange named "nilorg.eventbus", JSON serialization, the StdLogger,
	// and pool bounds of 1 (min open) and 10 (max open).
	// NOTE(review): QueueMessageExpires is 864000000; that is 10 days if the
	// unit is milliseconds — the unit is not visible here, confirm.
	DefaultRabbitMQOptions = RabbitMQOptions{
		ExchangeName:        "nilorg.eventbus",
		ExchangeType:        "topic",
		QueueMessageExpires: 864000000,
		Serialize:           &JSONSerialize{},
		Logger:              &StdLogger{},
		PoolMinOpen:         1,
		PoolMaxOpen:         10,
	}
)
View Source
var (
	// DefaultRedisOptions holds the default Redis options: a read count of 1,
	// JSON serialization and the StdLogger. ReadBlock is not set here, so it
	// keeps its zero value.
	DefaultRedisOptions = RedisOptions{
		ReadCount: 1,
		Serialize: &JSONSerialize{},
		Logger:    &StdLogger{},
	}
)
View Source
var (
	// ErrRabbitMQChannelNotFound is a package-level error value whose message
	// is "rabbitmq channel not found".
	ErrRabbitMQChannelNotFound = errors.New("rabbitmq channel not found")
)
View Source
var (
	// Version is the version string.
	// NOTE(review): where this value is consumed is not visible here.
	Version = "v1"
)

Functions

func FromGroupIDContext

func FromGroupIDContext(ctx context.Context) (groupID string, ok bool)

FromGroupIDContext ...

func NewGroupIDContext

func NewGroupIDContext(parent context.Context, groupID string) context.Context

NewGroupIDContext ...

func NewSetMessageHeaderContext

func NewSetMessageHeaderContext(ctx context.Context, f SetMessageHeader) context.Context

NewSetMessageHeaderContext ...

Types

type EventBus

// EventBus is the event bus interface: it embeds both Publisher and
// Subscriber, so an implementation must provide all of their methods.
type EventBus interface {
	Publisher
	Subscriber
}

EventBus 事件总线

func NewRabbitMQ

func NewRabbitMQ(conn *amqp.Connection, options ...*RabbitMQOptions) (bus EventBus, err error)

NewRabbitMQ 创建RabbitMQ事件总线

func NewRedis

func NewRedis(conn *redis.Client, options ...*RedisOptions) (bus EventBus, err error)

NewRedis 创建Redis事件总线

type JSONSerialize

// JSONSerialize is a struct type with no fields. Its ContentType, Marshal and
// Unmarshal methods are declared separately with value receivers.
type JSONSerialize struct {
}

JSONSerialize is the JSON serializer (JSON 序列化); its methods match the Serializer interface.

func (JSONSerialize) ContentType

func (JSONSerialize) ContentType() string

ContentType ...

func (JSONSerialize) Marshal

func (JSONSerialize) Marshal(msg interface{}) (data []byte, err error)

Marshal ...

func (JSONSerialize) Unmarshal

func (JSONSerialize) Unmarshal(data []byte, msg interface{}) (err error)

Unmarshal ...

type Logger

// Logger is the logging interface. It declares debug, info, warn/warning and
// error methods, each in a formatted (-f) and a line (-ln) variant, and every
// method takes a context.Context as its first argument.
type Logger interface {
	// Debugf: debug level, with a format string and arguments.
	Debugf(ctx context.Context, format string, args ...interface{})
	// Debugln: debug level, with arguments only.
	Debugln(ctx context.Context, args ...interface{})
	// Infof: info level, with a format string and arguments.
	Infof(ctx context.Context, format string, args ...interface{})
	// Infoln: info level, with arguments only.
	Infoln(ctx context.Context, args ...interface{})
	// Warnf: warning level, with a format string and arguments.
	Warnf(ctx context.Context, format string, args ...interface{})
	// Warnln: warning level, with arguments only.
	Warnln(ctx context.Context, args ...interface{})
	// Warningf: warning level, with a format string and arguments.
	Warningf(ctx context.Context, format string, args ...interface{})
	// Warningln: warning level, with arguments only.
	Warningln(ctx context.Context, args ...interface{})
	// Errorf: error level, with a format string and arguments.
	Errorf(ctx context.Context, format string, args ...interface{})
	// Errorln: error level, with arguments only.
	Errorln(ctx context.Context, args ...interface{})
}

Logger is the logging interface accepted by the Logger field of RabbitMQOptions and RedisOptions.

func NewLogrusLogger

func NewLogrusLogger(log *logrus.Logger) Logger

NewLogrusLogger builds a Logger from a *logrus.Logger.

type LogrusLogger

// LogrusLogger is a struct type that exposes no exported fields. Its logging
// methods (Debugf, Infof, Warnf, Errorf, ...) are declared on *LogrusLogger.
type LogrusLogger struct {
	// contains filtered or unexported fields
}

LogrusLogger ...

func (*LogrusLogger) Debugf

func (l *LogrusLogger) Debugf(ctx context.Context, format string, args ...interface{})

Debugf 调试 (debug level, formatted)

func (*LogrusLogger) Debugln

func (l *LogrusLogger) Debugln(ctx context.Context, args ...interface{})

Debugln 调试 (debug level, line)

func (*LogrusLogger) Errorf

func (l *LogrusLogger) Errorf(ctx context.Context, format string, args ...interface{})

Errorf 错误

func (*LogrusLogger) Errorln

func (l *LogrusLogger) Errorln(ctx context.Context, args ...interface{})

Errorln 错误

func (*LogrusLogger) Infof

func (l *LogrusLogger) Infof(ctx context.Context, format string, args ...interface{})

Infof 信息

func (*LogrusLogger) Infoln

func (l *LogrusLogger) Infoln(ctx context.Context, args ...interface{})

Infoln 消息

func (*LogrusLogger) Warnf

func (l *LogrusLogger) Warnf(ctx context.Context, format string, args ...interface{})

Warnf 警告

func (*LogrusLogger) Warningf

func (l *LogrusLogger) Warningf(ctx context.Context, format string, args ...interface{})

Warningf 警告

func (*LogrusLogger) Warningln

func (l *LogrusLogger) Warningln(ctx context.Context, args ...interface{})

Warningln 警告

func (*LogrusLogger) Warnln

func (l *LogrusLogger) Warnln(ctx context.Context, args ...interface{})

Warnln 警告

type Message

// Message is a message: a MessageHeader plus an arbitrary Value. The struct
// tags name the fields "header" and "value" when encoded as JSON.
type Message struct {
	Header MessageHeader `json:"header"`
	Value  interface{}   `json:"value"`
}

Message 消息

type MessageHeader

// MessageHeader is a message header: a map from string keys to string values.
type MessageHeader map[string]string

MessageHeader message header

type Publisher

// Publisher is the publishing interface. Both methods take a context, a topic
// name and a value v, and return an error.
type Publisher interface {
	// Publish publishes v to topic.
	Publish(ctx context.Context, topic string, v interface{}) (err error)
	// PublishAsync is the asynchronous counterpart of Publish.
	// NOTE(review): what exactly runs asynchronously is not visible here — confirm.
	PublishAsync(ctx context.Context, topic string, v interface{}) (err error)
}

Publisher 发布接口

type RabbitMQOptions

// RabbitMQOptions holds the optional settings for the RabbitMQ event bus:
// the exchange name and type, a queue message expiry, the Serializer and
// Logger to use, and the minimum/maximum number of open pool entries.
// NOTE(review): the unit of QueueMessageExpires and what the pool contains
// (channels or connections) are not visible here — confirm.
type RabbitMQOptions struct {
	ExchangeName             string
	ExchangeType             string
	QueueMessageExpires      int64
	Serialize                Serializer
	Logger                   Logger
	PoolMinOpen, PoolMaxOpen int
}

RabbitMQOptions RabbitMQ可选项

type RedisOptions

// RedisOptions holds the optional settings for the Redis event bus: a read
// count, a read block duration, and the Serializer and Logger to use.
// NOTE(review): ReadCount and ReadBlock presumably map to the count and block
// arguments of the Redis stream read — confirm against the implementation.
type RedisOptions struct {
	ReadCount int64
	ReadBlock time.Duration
	Serialize Serializer
	Logger    Logger
}

RedisOptions Redis可选项

type Serializer

// Serializer is the serializer interface used for message encoding.
type Serializer interface {
	// Unmarshal decodes data into msg.
	Unmarshal(data []byte, msg interface{}) (err error)
	// Marshal encodes msg into bytes.
	Marshal(msg interface{}) (data []byte, err error)
	// ContentType returns the content type string for this serializer.
	ContentType() string
}

Serializer 序列化器

type SetMessageHeader

// SetMessageHeader is a function type that produces a MessageHeader from a
// context.
type SetMessageHeader func(ctx context.Context) MessageHeader

SetMessageHeader set message header func

func FromSetMessageHeaderContext

func FromSetMessageHeaderContext(ctx context.Context) (f SetMessageHeader, ok bool)

FromSetMessageHeaderContext ...

type StdLogger

// StdLogger is a struct type with no fields. Its logging methods (Debugf,
// Infof, Warnf, Errorf, ...) are declared separately with value receivers.
type StdLogger struct {
}

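StdLogger is a Logger implementation with no fields; &StdLogger{} is the default Logger in DefaultRabbitMQOptions and DefaultRedisOptions.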

func (StdLogger) Debugf

func (StdLogger) Debugf(ctx context.Context, format string, args ...interface{})

Debugf 调试 (debug level, formatted)

func (StdLogger) Debugln

func (StdLogger) Debugln(ctx context.Context, args ...interface{})

Debugln 调试 (debug level, line)

func (StdLogger) Errorf

func (StdLogger) Errorf(ctx context.Context, format string, args ...interface{})

Errorf 错误

func (StdLogger) Errorln

func (StdLogger) Errorln(ctx context.Context, args ...interface{})

Errorln 错误

func (StdLogger) Infof

func (StdLogger) Infof(ctx context.Context, format string, args ...interface{})

Infof 信息

func (StdLogger) Infoln

func (StdLogger) Infoln(ctx context.Context, args ...interface{})

Infoln 消息

func (StdLogger) Warnf

func (StdLogger) Warnf(ctx context.Context, format string, args ...interface{})

Warnf 警告

func (StdLogger) Warningf

func (StdLogger) Warningf(ctx context.Context, format string, args ...interface{})

Warningf 警告

func (StdLogger) Warningln

func (StdLogger) Warningln(ctx context.Context, args ...interface{})

Warningln 警告

func (StdLogger) Warnln

func (StdLogger) Warnln(ctx context.Context, args ...interface{})

Warnln 警告

type SubscribeHandler

// SubscribeHandler is the handler function type for subscriptions: it receives
// a context and a *Message and returns an error.
type SubscribeHandler func(ctx context.Context, msg *Message) error

SubscribeHandler 订阅处理

type Subscriber

// Subscriber is the subscription interface. Both methods take a context, a
// topic name and a SubscribeHandler, and return an error.
type Subscriber interface {
	// Subscribe registers h as the handler for topic.
	Subscribe(ctx context.Context, topic string, h SubscribeHandler) (err error)
	// SubscribeAsync is the asynchronous counterpart of Subscribe.
	// NOTE(review): what exactly runs asynchronously is not visible here — confirm.
	SubscribeAsync(ctx context.Context, topic string, h SubscribeHandler) (err error)
}

Subscriber 订阅接口

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL