mq

package
v1.4.13
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2021 License: BSD-3-Clause Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LastOffset  int64 = -1 // The most recent offset available for a partition.
	FirstOffset       = -2 // The least recent offset available for a partition.
)

Variables

View Source
var (
	InvalidMqRoleTypeStringErr = errors.New("invalid mq role type string")
)

Functions

func Close

func Close()

func ReadDelayMsg

func ReadDelayMsg(ctx context.Context, topic string, value interface{}) (context.Context, error)

ReadDelayMsg 读完自动确认

func ReadMsgByGroup

func ReadMsgByGroup(ctx context.Context, topic, groupId string, value interface{}) (context.Context, error)

读完消息后会自动提交offset

func ReadMsgByPartition

func ReadMsgByPartition(ctx context.Context, topic string, partition int, value interface{}) (context.Context, error)

func SetConfiger

func SetConfiger(ctx context.Context, configerType ConfigerType) error

func WatchUpdate

func WatchUpdate(ctx context.Context)

func WriteDelayMsg

func WriteDelayMsg(ctx context.Context, topic string, value interface{}, delaySeconds uint32) (jobID string, err error)

func WriteMsg

func WriteMsg(ctx context.Context, topic string, key string, value interface{}) error

func WriteMsgs

func WriteMsgs(ctx context.Context, topic string, msgs ...Message) error

Types

type AckHandler

type AckHandler interface {
	Ack(ctx context.Context) error
}

func FetchDelayMsg

func FetchDelayMsg(ctx context.Context, topic string, value interface{}) (context.Context, AckHandler, error)

FetchDelayMsg 读完消息后不会自动确认

type ApolloConfig

type ApolloConfig struct {
	// contains filtered or unexported fields
}

func NewApolloConfiger

func NewApolloConfiger() *ApolloConfig

func (*ApolloConfig) GetConfig

func (m *ApolloConfig) GetConfig(ctx context.Context, topic string, mqType MQType) (*Config, error)

func (*ApolloConfig) Init

func (m *ApolloConfig) Init(ctx context.Context) (err error)

func (*ApolloConfig) ParseKey

func (m *ApolloConfig) ParseKey(ctx context.Context, key string) (*KeyParts, error)

func (*ApolloConfig) Watch

func (m *ApolloConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent

type Config

type Config struct {
	MQType         MQType
	MQAddr         []string
	Topic          string
	TimeOut        time.Duration
	CommitInterval time.Duration
	// BatchTimeout is the time interval at which messages are flushed to the broker; the default is 1 second.
	BatchTimeout time.Duration
	Offset       int64
	OffsetAt     string
	TTR          uint32 // time to run
	TTL          uint32 // time to live
	Tries        uint16 // delay tries
	BatchSize    int

	RequestInterval time.Duration
}

type Configer

type Configer interface {
	Init(ctx context.Context) error
	GetConfig(ctx context.Context, topic string, mqType MQType) (*Config, error)
	ParseKey(ctx context.Context, k string) (*KeyParts, error)
	Watch(ctx context.Context) <-chan *center.ChangeEvent
}
var DefaultConfiger Configer

func NewConfiger

func NewConfiger(configType ConfigerType) (Configer, error)

type ConfigerType

type ConfigerType int
const (
	ConfigerTypeSimple ConfigerType = iota
	ConfigerTypeEtcd
	ConfigerTypeApollo
)

func (ConfigerType) String

func (c ConfigerType) String() string

type DelayClient

type DelayClient struct {
	// contains filtered or unexported fields
}

延迟队列客户端

func NewDefaultDelayClient

func NewDefaultDelayClient(ctx context.Context, topic string) (*DelayClient, error)

NewDefaultDelayClient 通过topic创建默认客户端

func NewDelayClient

func NewDelayClient(endpoint, namespace, queue string, ttlSeconds, ttrSeconds uint32, tries uint16, requestInterval time.Duration) *DelayClient

func (*DelayClient) Ack

func (p *DelayClient) Ack(ctx context.Context, jobID string) error

Ack 确认消费

func (*DelayClient) Read

func (p *DelayClient) Read(ctx context.Context, ttrSeconds uint32) (job *Job, err error)

Read 消费任务

func (*DelayClient) Write

func (p *DelayClient) Write(ctx context.Context, value interface{}, ttlSeconds, delaySeconds uint32, tries uint16) (jobID string, err error)

Write 发布任务

type DelayHandler

type DelayHandler struct {
	// contains filtered or unexported fields
}

func NewDelayHandler

func NewDelayHandler(cli *DelayClient, jobID string) *DelayHandler

func (*DelayHandler) Ack

func (p *DelayHandler) Ack(ctx context.Context) error

type EtcdConfig

type EtcdConfig struct {
	// contains filtered or unexported fields
}

func NewEtcdConfiger

func NewEtcdConfiger() *EtcdConfig

func (*EtcdConfig) GetConfig

func (m *EtcdConfig) GetConfig(ctx context.Context, topic string, mqType MQType) (*Config, error)

func (*EtcdConfig) Init

func (m *EtcdConfig) Init(ctx context.Context) error

func (*EtcdConfig) ParseKey

func (m *EtcdConfig) ParseKey(ctx context.Context, k string) (*KeyParts, error)

func (*EtcdConfig) Watch

func (m *EtcdConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent

type Handler

type Handler interface {
	CommitMsg(ctx context.Context) error
}

func FetchMsgByGroup

func FetchMsgByGroup(ctx context.Context, topic, groupId string, value interface{}) (context.Context, Handler, error)

读完消息后不会自动提交offset,需要手动调用Handler.CommitMsg方法来提交offset

type InstanceManager

type InstanceManager struct {
	// contains filtered or unexported fields
}

func NewInstanceManager

func NewInstanceManager() *InstanceManager

func (*InstanceManager) Close

func (m *InstanceManager) Close()

type Job

type Job struct {
	Namespace string `json:"namespace"`
	Queue     string `json:"queue"`
	Body      []byte `json:"body"` // 任务具体实体
	ID        string `json:"id"`
	TTL       uint32 `json:"ttl"`        // 任务过期时间 单位:s
	Delay     uint32 `json:"delay"`      // 任务延迟时间 单位:s
	ElapsedMS int64  `json:"elapsed_ms"` // 任务从产生到消费时间 单位:ms
}

延迟队列任务

type KafkaHandler

type KafkaHandler struct {
	// contains filtered or unexported fields
}

func NewKafkaHandler

func NewKafkaHandler(reader *kafka.Reader, msg kafka.Message) *KafkaHandler

func (*KafkaHandler) CommitMsg

func (m *KafkaHandler) CommitMsg(ctx context.Context) error

type KafkaReader

type KafkaReader struct {
	*kafka.Reader
}

func NewKafkaReader

func NewKafkaReader(brokers []string, topic, groupId string, partition, minBytes, maxBytes int, commitInterval time.Duration) *KafkaReader

func (*KafkaReader) Close

func (m *KafkaReader) Close() error

func (*KafkaReader) FetchMsg

func (m *KafkaReader) FetchMsg(ctx context.Context, v interface{}, ov interface{}) (Handler, error)

func (*KafkaReader) ReadMsg

func (m *KafkaReader) ReadMsg(ctx context.Context, v interface{}, ov interface{}) error

func (*KafkaReader) SetOffset

func (m *KafkaReader) SetOffset(ctx context.Context, offset int64) error

func (*KafkaReader) SetOffsetAt

func (m *KafkaReader) SetOffsetAt(ctx context.Context, t time.Time) error

type KafkaWriter

type KafkaWriter struct {
	*kafka.Writer
	// contains filtered or unexported fields
}

func NewKafkaWriter

func NewKafkaWriter(brokers []string, topic string) *KafkaWriter

func (*KafkaWriter) Close

func (m *KafkaWriter) Close() error

func (*KafkaWriter) WriteMsg

func (m *KafkaWriter) WriteMsg(ctx context.Context, k string, v interface{}) error

func (*KafkaWriter) WriteMsgs

func (m *KafkaWriter) WriteMsgs(ctx context.Context, msgs ...Message) error

type KeyParts

type KeyParts struct {
	Topic string
	Group string
}

type MQRoleType

type MQRoleType int
const (
	RoleTypeReader MQRoleType = iota
	RoleTypeWriter
	RoleTypeDelayClient
)

func MQRoleTypeFromInt

func MQRoleTypeFromInt(it int) (t MQRoleType, err error)

func (MQRoleType) String

func (t MQRoleType) String() string

type MQType

type MQType int
const (
	MQTypeKafka MQType = iota
	MQTypeDelay
)

func (MQType) String

func (t MQType) String() string

type Message

type Message struct {
	Key   string
	Value interface{}
}

type Payload

type Payload struct {
	Carrier opentracing.TextMapCarrier `json:"c"`
	Value   string                     `json:"v"`
	Head    interface{}                `json:"h"`
	Control *mqPayloadControl          `json:"t"`
}

type Reader

type Reader interface {
	FetchMsg(ctx context.Context, value interface{}, ovalue interface{}) (Handler, error)
	ReadMsg(ctx context.Context, value interface{}, ovalue interface{}) error
	SetOffsetAt(ctx context.Context, t time.Time) error
	SetOffset(ctx context.Context, offset int64) error
	Close() error
}

func NewGroupReader

func NewGroupReader(ctx context.Context, topic, groupId string) (Reader, error)

CommitInterval (see Config.CommitInterval) indicates the interval at which offsets are committed to the broker. If it is 0, commits are handled synchronously.

func NewPartitionReader

func NewPartitionReader(ctx context.Context, topic string, partition int) (Reader, error)

type SimpleConfig

type SimpleConfig struct {
	// contains filtered or unexported fields
}

func NewSimpleConfiger

func NewSimpleConfiger() *SimpleConfig

func (*SimpleConfig) GetConfig

func (m *SimpleConfig) GetConfig(ctx context.Context, topic string, mqType MQType) (*Config, error)

func (*SimpleConfig) Init

func (m *SimpleConfig) Init(ctx context.Context) error

func (*SimpleConfig) ParseKey

func (m *SimpleConfig) ParseKey(ctx context.Context, k string) (*KeyParts, error)

func (*SimpleConfig) Watch

func (m *SimpleConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent

type Writer

type Writer interface {
	WriteMsg(ctx context.Context, key string, value interface{}) error
	WriteMsgs(ctx context.Context, msgs ...Message) error
	Close() error
}

func NewWriter

func NewWriter(ctx context.Context, topic string) (Writer, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL