mq

package
v2.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2022 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package mq common package for any MQ system Based on Kafka Stream but everything what you need is -

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrManualCommit = errors.New("manual commit")
	ErrBadMessage   = errors.New("bad message")
)

Functions

func StartSpanFromConsumer

func StartSpanFromConsumer(_ctx context.Context, span string, e *Message) (trace.Span, context.Context)

StartSpanFromConsumer extract span from kafka's header and continue chain receiver not reference because we put new fields into logger and we expect root ctx for that returns ctx

func StartSpanProducer

func StartSpanProducer(_ctx context.Context, name string, e *Message) (trace.Span, context.Context)

StartSpanProducer inject current span or start new for Kafka

Types

type CallBack

type CallBack func(context.Context, *Message) error

CallBack will call during HandleMessage it supposed that here will be message handled for calculate execution time at least

type KHeader

type KHeader interface {
	propagation.TextMapCarrier
	GetTraceValue() []byte
}

type Message

type Message struct {
	Topic     string
	Key       []byte
	Value     []byte
	Partition int32
	Offset    int64
	Timestamp time.Time
	Header    KHeader
}

func (*Message) String

func (m *Message) String() string

String returns a human readable representation of a Message. Key and payload are not represented.

type MiddleWare

type MiddleWare interface {
	HandleMessage(next CallBack) CallBack
}

MiddleWare ...

func NewConsumerMw

func NewConsumerMw(m metrics.Reader) MiddleWare

NewConsumerMw which provide MW helper for: recovery, debug logging, tracing solution, common metrics and ruration

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL