queue

package
v1.9.0
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 18, 2024 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	Exchange           = "logproxy"
	RoutingKey         = "new.rfc5424"
	ErrInvalidProducer = errors.New("RabbitMQ producer is nil or invalid")
)
View Source
var (
	Base64Pattern = regexp.MustCompile(`^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$`)
)

Functions

func BodyToResource

func BodyToResource(body []byte, m Metrics) (*logging.Resource, error)

BodyToResource takes the raw body and transforms it to a logging.Resource instance

func EncodeString

func EncodeString(s string, charactersToEncode string) string

EncodeString encodes all characters from the charactersToEncode set

func ProcessMessage

func ProcessMessage(rfcLogMessage syslog.Message, m Metrics) (*logging.Resource, error)

func RFC5424QueueName

func RFC5424QueueName() string

RFC5424QueueName returns the queue name to use

func RabbitMQRFC5424Worker

func RabbitMQRFC5424Worker(resourceChannel chan<- logging.Resource, done <-chan bool, m Metrics) rabbitmq.ConsumerHandlerFunc

Types

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel implements a Queue based on a go channel

func NewChannelQueue

func NewChannelQueue(opts ...OptionFunc) (*Channel, error)

func (*Channel) DeadLetter

func (c *Channel) DeadLetter(_ logging.Resource) error

func (*Channel) Output

func (c *Channel) Output() <-chan logging.Resource

func (*Channel) Push

func (c *Channel) Push(raw []byte) error

func (*Channel) SetMetrics added in v1.7.3

func (c *Channel) SetMetrics(m Metrics)

func (*Channel) Start

func (c *Channel) Start() (chan bool, error)

type DHPLogData

type DHPLogData struct {
	Message string `json:"message"`
}

DHPLogData represents the payload of DHPLogMessage

type DHPLogMessage

type DHPLogMessage struct {
	Category            string          `json:"cat"`
	EventID             string          `json:"evt"`
	ApplicationVersion  string          `json:"ver"`
	Component           string          `json:"cmp"`
	ApplicationName     string          `json:"app"`
	ApplicationInstance string          `json:"inst"`
	ServerName          string          `json:"srv"`
	TransactionID       string          `json:"trns"`
	ServiceName         string          `json:"service"`
	LogTime             string          `json:"time"`
	OriginatingUser     string          `json:"usr"`
	Severity            string          `json:"sev"`
	LogData             DHPLogData      `json:"val"`
	TraceID             string          `json:"trace"`
	SpanID              string          `json:"span"`
	Custom              json.RawMessage `json:"custom,omitempty"`
}

DHPLogMessage describes a structured log message from applications

type Deliverer

type Deliverer struct {
	Debug bool
	// contains filtered or unexported fields
}

Deliverer implements all processing logic for parsing and forwarding logs

func NewDeliverer

func NewDeliverer(storer logging.Storer, log Logger, manager *shared.PluginManager, buildVersion string, metrics Metrics) (*Deliverer, error)

NewDeliverer returns a new configured Deliverer instance

func (*Deliverer) ResourceWorker

func (pl *Deliverer) ResourceWorker(queue Queue, done <-chan bool, _ *zipkin.Tracer)

ResourceWorker implements the worker process for parsing the queues

type Logger

type Logger interface {
	Debugf(format string, args ...interface{})
}

Logger defines the logging interface

type Metrics added in v1.7.3

type Metrics interface {
	IncProcessed()
	IncEnhancedTransactionID()
	IncEnhancedEncodedMessage()
	IncPluginDropped()
	IncPluginModified()
}

type OptionFunc added in v1.7.3

type OptionFunc func(q Queue) error

func WithMetrics added in v1.7.3

func WithMetrics(m Metrics) OptionFunc

type Queue

type Queue interface {
	// Start initializes the queue and returns a stop channel
	Start() (chan bool, error)
	// Output should return a channel fed by the queue's raw data
	Output() <-chan logging.Resource
	// Push should queue the raw payload
	Push([]byte) error
	// DeadLetter should store a rejected logging.Resource for later processing
	DeadLetter(msg logging.Resource) error
	// Set metrics
	SetMetrics(m Metrics)
}

Queue defines a queue mechanism. The queue can be backed by e.g. RabbitMQ or a simple Go channel. Both of these are provided as part of logproxy. Internally the queue is driven by the Deliverer, which transforms the raw payload to a logging.Resource and then pushes it to the HSDP logging infrastructure

type RabbitMQ

type RabbitMQ struct {
	// contains filtered or unexported fields
}

RabbitMQ implements Queue backed by RabbitMQ

func NewRabbitMQQueue

func NewRabbitMQQueue(p rabbitmq.Producer, opts ...OptionFunc) (*RabbitMQ, error)

func (*RabbitMQ) DeadLetter

func (r *RabbitMQ) DeadLetter(_ logging.Resource) error

func (*RabbitMQ) Output

func (r *RabbitMQ) Output() <-chan logging.Resource

func (*RabbitMQ) Push

func (r *RabbitMQ) Push(raw []byte) error

func (*RabbitMQ) SetMetrics added in v1.7.3

func (r *RabbitMQ) SetMetrics(m Metrics)

func (*RabbitMQ) Start

func (r *RabbitMQ) Start() (chan bool, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL