broker

package
v0.0.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BROKER = "broker"

	KEY_TRACE_MSG_CARRIER = "trace_msg_carrier"
	KEY_BROKER_MSG        = "msg"
)

Variables

This section is empty.

Functions

func LogError

func LogError(l logger.Logger, msg string, topic string, err error)

Types

type Broker

type Broker interface {
	component.Component
	// Return logger
	Logger() logger.Logger
	// Publish a message
	Publish(ctx context.Context, topic string, message interface{}, opts ...PublishOption) error
	// Subscribe to a subject
	Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) error
	// Unsubscribe from a subject
	Unsubscribe(topic string) error
	// String returns the string name of the broker
	String() string
}

Broker interface for adding new brokers

type Handler

type Handler interface {
	// Handles the subscribed message
	Handle(ctx context.Context, m *Message) error
}

Handler used by the subscriber

type Message

type Message struct {
	Body   []byte
	Extras map[string]interface{}
}

Message structure

type Option

type Option func(Broker)

Option to pass as arg while creating new broker instance

type PublishCallback

type PublishCallback func(*TraceMsgCarrier) error

type PublishOption

type PublishOption func(Publisher)

PublishOption to pass as arg while publishing a message

type Publisher

type Publisher interface{}

Interface for a message publisher

type RunOption

type RunOption func(Runner)

RunOption to pass as arg while calling the run method for the broker

type Runner

type Runner interface{}

Interface for a broker runner instance

type SubscribeCallback

type SubscribeCallback func(context.Context, *TraceMsgCarrier) error

type SubscribeOption

type SubscribeOption func(Subscriber)

SubscribeOption to pass as arg while creating subscription

type Subscriber

type Subscriber interface{}

Interface for a subscription subscriber

type Trace

type Trace struct {
	// contains filtered or unexported fields
}

func NewTrace

func NewTrace(b Broker) *Trace

func (*Trace) Publish

func (t *Trace) Publish(ctx context.Context, tm *TraceMsgCarrier, publish PublishCallback) error

Publish adds tracer details to the message

func (*Trace) Subscribe

func (t *Trace) Subscribe(ctx context.Context, tm *TraceMsgCarrier, subscribe SubscribeCallback) error

Subscribe adds the tracer details to the context

type TraceMsgCarrier

type TraceMsgCarrier struct {
	Topic   string
	Message []byte
	Headers map[string]string
}

TraceMsgCarrier implements TextMapPropagator

func NewTraceMsgCarrier

func NewTraceMsgCarrier(topic string, data []byte) *TraceMsgCarrier

NewTraceMsgCarrier creates a new instance of opentel TextMapPropagator

func NewTraceMsgCarrierFromBytes

func NewTraceMsgCarrierFromBytes(tmBytes []byte) *TraceMsgCarrier

NewTraceMsgCarrierFromBytes converts carrier bytes to TraceMsgCarrier

func (*TraceMsgCarrier) Bytes

func (tm *TraceMsgCarrier) Bytes() ([]byte, error)

Bytes converts the TraceMsgCarrier instance to bytes

func (*TraceMsgCarrier) Get

func (tm *TraceMsgCarrier) Get(key string) string

Get returns the key value from the headers property

func (*TraceMsgCarrier) Keys

func (tm *TraceMsgCarrier) Keys() []string

Keys returns the list of keys in the headers

func (*TraceMsgCarrier) Set

func (tm *TraceMsgCarrier) Set(key string, value string)

Set sets the key value of the headers property

type Wrapper

type Wrapper struct {
	// contains filtered or unexported fields
}

func NewWrapper

func NewWrapper(b Broker) *Wrapper

func (*Wrapper) Publish

func (w *Wrapper) Publish(ctx context.Context, topic string, payload []byte, publish PublishCallback) error

Publish - publishes a message with the traceparent if tracer is defined

func (*Wrapper) Subscribe

func (w *Wrapper) Subscribe(ctx context.Context, topic string, tmBytes []byte, subscribe SubscribeCallback) error

Subscribe - subscribes to a message and adds traceparent to the ctx if tracer is defined

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL