tel

package module
v1.0.6
Published: Oct 30, 2021 License: MIT Imports: 35 Imported by: 2

README

= Telemetry

https://wiki.egt-ua.loc/display/EL/Logging+Policy[Wiki Logging policy]

Framework which aims to ease the logging effort: `Logs`, `Traces` and `Metrics`.

Tel uses `zap.Logger` as the heart of the system.
That is why it passes all zap functions through.

== Motto

One context, all logs.

Decrease external dependencies as much as possible.

== Features

* Jaeger log fan-in (out of the box)
* Kafka log fan-in.
See example: `ExampleTelemetry_KafkaLog`
* Kafka consumer/producer tracing spans.
See examples: `ExampleKHeader`, `ExampleTelemetry_StartSpanFromKafka`
* Kafka health checker inside the `kaf` package
* Kafka consumer middleware (recovery, debug log, trace, metric + duration)
* Span wrapper which writes both to the trace log and to the zap log (see the sketch after this list)
* Configuration standardized via prebuilt env parsing with `GetConfigFromEnv`
* Grpc All-in-One middleware client (recovery, logger, trace and metric injection) - `GrpcUnaryClientInterceptorAll`, `GrpcUnaryServerInterceptor`
* Http All-in-One middleware server (recovery, logger, trace and metric injection) - `HttpServerMiddlewareAll`
* graylog package `github.com/snovichkov/gelf` optimized:
** fixed non-compressed mode
** async option which saves resources during compression
** the async option has a resiliency pattern: retrier
** zap.Sync is used in async mode to flush all buffered messages
** fixed bugs where dispatched messages were ignored because of wrong key validation
** added boolean type support (booleans require quoting)
* middleware: dynamically selects the log level according to recovery, an error in the handler, or a simple notification
* caller field tuned for both `tel` and `span`
* monitor handles the health, metrics and pprof endpoints.
pprof is enabled when the DEBUG option is switched on.
* fx.Printer interface
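
Below is a minimal sketch of the span wrapper in use; it assumes the span returned by `tel.StartSpanFromContext` exposes the embedded zap-style methods, as `WithSpan` suggests.

[source,go]
----
// start or continue a trace; the returned span wrapper writes both to the
// trace log and to the zap log
span, ctx := tel.StartSpanFromContext(ctx, "load-profile")
defer span.Finish()

// assumption: the span wrapper exposes zap-style methods, so one call lands
// in the zap log and as a trace log record simultaneously
span.Info("profile loaded", zap.String("user-id", userID))
----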

== Env

.PROJECT
project name

`type`: string

.NAMESPACE
project namespace

`type`: string

.LOG_LEVEL
log level (default: `info`)

`type`: string
NOTE: one of debug, info, warn, error, dpanic, panic, fatal

.DEBUG
for IsDebug() function

`type`: bool

.GRAYLOG_ADDR
graylog address

NOTE: address semantics follow the net.Listen description

`type`: string

.SENTRY_DSN
sentry DSN

`type`: string

.MONITOR_ADDR
address where the `health` and `prometheus` endpoints listen

NOTE: address semantics follow the net.Listen description

`type`: string
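
Below is a minimal wiring sketch for the env-driven configuration; the values are illustrative only.

[source,go]
----
// illustrative values; in production these come from the deployment env
_ = os.Setenv("PROJECT", "billing")
_ = os.Setenv("NAMESPACE", "prod")
_ = os.Setenv("LOG_LEVEL", "debug")
_ = os.Setenv("MONITOR_ADDR", "0.0.0.0:8011")

// GetConfigFromEnv starts from DefaultConfig and overwrites only the
// variables actually present in the env
t := tel.New(tel.GetConfigFromEnv())
defer t.Close()
----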

=== jaeger
We use jaeger's config.FromEnv to retrieve its settings. Furthermore, k8s devops use these vars too, as they are ubiquitous, so there is no need to reinvent the wheel.

Below is a snippet from the jaeger config section:

[source,go]
----
const (
	// environment variable names
	envServiceName                         = "JAEGER_SERVICE_NAME"
	envDisabled                            = "JAEGER_DISABLED"
	envRPCMetrics                          = "JAEGER_RPC_METRICS"
	envTags                                = "JAEGER_TAGS"
	envSamplerType                         = "JAEGER_SAMPLER_TYPE"
	envSamplerParam                        = "JAEGER_SAMPLER_PARAM"
	envSamplerManagerHostPort              = "JAEGER_SAMPLER_MANAGER_HOST_PORT" // Deprecated by envSamplingEndpoint
	envSamplingEndpoint                    = "JAEGER_SAMPLING_ENDPOINT"
	envSamplerMaxOperations                = "JAEGER_SAMPLER_MAX_OPERATIONS"
	envSamplerRefreshInterval              = "JAEGER_SAMPLER_REFRESH_INTERVAL"
	envReporterMaxQueueSize                = "JAEGER_REPORTER_MAX_QUEUE_SIZE"
	envReporterFlushInterval               = "JAEGER_REPORTER_FLUSH_INTERVAL"
	envReporterLogSpans                    = "JAEGER_REPORTER_LOG_SPANS"
	envReporterAttemptReconnectingDisabled = "JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED"
	envReporterAttemptReconnectInterval    = "JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL"
	envEndpoint                            = "JAEGER_ENDPOINT"
	envUser                                = "JAEGER_USER"
	envPassword                            = "JAEGER_PASSWORD"
	envAgentHost                           = "JAEGER_AGENT_HOST"
	envAgentPort                           = "JAEGER_AGENT_PORT"
)
----

== ToDo
* [ ] Expose health check as a dedicated metric

== Usage

=== main init

[source,go]
----
	// create tel instance
	t := tel.New(tel.GetConfigFromEnv())
	defer t.Close()

	// init ctx containing telemetry (Copy gives a fresh instance, Ctx wraps it)
	ctx := t.Copy().Ctx()

	// hello msg
	tel.FromCtx(ctx).Info("HELLO WORLD",
		zap.Bool("is-debug-mode", tel.FromCtx(ctx).IsDebug()),
		zap.Bool("is-log-level-debug", tel.FromCtx(ctx).Logger.Core().Enabled(zap.DebugLevel)),
		zap.Bool("is-stream-worker", runStreamWorker),
	)

	// .... //

	// -------
	// Metrics
	// -------
	httpMetrics := metrics.NewHttpMetric(metrics.DefaultHTTPPathRetriever())

	go tel.FromCtx(ctx).M().
		// add grpc, http + custom local gauge metric `gauge`
		AddMetricTracker(ctx, metrics.NewGrpcClientTracker(), httpMetrics, gauge).
		// health check
		AddHealthChecker(ctx, tel.HealthChecker{
			Name:    "grpc service",
			Handler: checkers.NewGrpcClientChecker(sbConn),
		}).
		Start(ctx)

	// .... //

	// link grpc/http/kafka-consumer client mw
	// init http/grpc/kafka-producer with mw

	// .... //

	// pass ctx to your controllers, where you can log via ctx
	gr, _ := errgroup.WithContext(ctx)
	gr.Go(func() error {
		cc.Run(ctx)
		return nil
	})

	gr.Go(func() error {
		if runStreamWorker {
			worker.RunStream(ctx, gauge)
		} else {
			worker.Run(ctx)
		}

		return nil
	})
	_ = gr.Wait()
----

=== grpc client init with mw (metrics are embedded in interceptor)

[source,go]
----
func Connect(ctx context.Context, addr string) *grpc.ClientConn {
	dialOptions := clientDialOptionsInsecure(ctx)
	conn, err := grpc.Dial(addr, dialOptions...)
	if err != nil {
		tel.FromCtx(ctx).Fatal("grpc dial", zap.Error(err))
	}

	return conn
}

func clientDialOptionsInsecure(ctx context.Context) []grpc.DialOption {
	return []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                keepAliveTime,
			PermitWithoutStream: true,
		}),
		grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
			tel.FromCtx(ctx).GrpcUnaryClientInterceptorAll(),
			timeoutClientInterceptor(defaultTimeout),
			waitForReadyInterceptor,
			grpc_retry.UnaryClientInterceptor(retryOpts()...),
		)),
	}
}

----
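
The snippet above references `timeoutClientInterceptor` and `waitForReadyInterceptor`, which are user-side helpers not shown here. A minimal sketch of the former (an assumption, not part of tel):

[source,go]
----
// timeoutClientInterceptor is a hypothetical helper: it bounds every unary
// call with the given timeout
func timeoutClientInterceptor(timeout time.Duration) grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply interface{},
		cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		ctx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		return invoker(ctx, method, req, reply, cc, opts...)
	}
}
----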

=== kafka-consumer example with metrics

[source,go]
----
type consumer struct {
	c *kafka.Consumer
	// most metrics are filled inside kaf.NewConsumerMw, but some you should handle yourself
	metrics kaf.MetricsReader
}

// handleMessage mw-like wrapper for kaf.ConsumerCallBack to put own handle logic here
func (s *consumer) handleMessage(cb kaf.ConsumerCallBack) kaf.ConsumerCallBack {
	return func(ctx context.Context, message *kafka.Message) error {
		if err := cb(ctx, message); err != nil {
			return err
		}

		partitions, err := s.c.CommitMessage(message)
		if err != nil {
			return fmt.Errorf("commit: %w", err)
		}

		s.commitNotify(partitions)
		return nil
	}
}

func (s *consumer) process(ctx context.Context, topic string, handler kaf.ConsumerCallBack) {
	s.metrics.AddReaderTopicsInUse()
	defer s.metrics.RmReaderTopicsInUse()

	cb := kaf.NewConsumerMw(s.metrics).HandleMessage(s.handleMessage(handler))

	// ... ///
		case m, ok := <-s.c.Events():
			if !ok {
				tel.FromCtx(ctx).Fatal("channel is closed.")
			}
			switch e := m.(type) {
			case kafka.AssignedPartitions:
				// ... ///
			case kafka.RevokedPartitions:
				// ... ///
			case kafka.Error:
				lvl := zapcore.ErrorLevel
				if checkFatalKafka(e.Code()) {
					lvl = zapcore.FatalLevel
				}

				// Errors should generally be considered as informational, the client will try to automatically recover
				tel.FromCtx(ctx).Check(lvl, "kafka:factory error event").Write(
					zap.Error(fmt.Errorf("%s", e.String())),
					zap.String("code", e.Code().String()),
				)
			case *kafka.Message:
				if err := cb(ctx, e); err != nil && !errors.Is(err, kaf.ErrManualCommit) {
					tel.FromCtx(ctx).Error("factory message process", zap.Error(err))
				}
			case kafka.PartitionEOF:
				// ... ///
			default:
				tel.FromCtx(ctx).Debug("event", zap.Any("event", m))
	// ... ///
	s.metrics.AddReaderTopicCommitEvents(*p.Topic, 1)
}
----
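
The `commitNotify` helper used above is not shown. A plausible sketch, given that `CommitMessage` returns the committed partitions and the metrics reader exposes `AddReaderTopicCommitEvents`:

[source,go]
----
// commitNotify records one commit event per committed partition;
// a sketch, not the package's own implementation
func (s *consumer) commitNotify(partitions []kafka.TopicPartition) {
	for _, p := range partitions {
		s.metrics.AddReaderTopicCommitEvents(*p.Topic, 1)
	}
}
----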

=== pure http client example with metrics

Right now the developer is responsible for creating this flow, but we expect to find a better approach for it later.

[source,go]
----
type service struct {
	url    url.URL
	key    string
	client *http.Client

	metric metrics.HttpTracker
}

func (s *service) send(_ctx context.Context, p r, m interface{}) (err error) {
	// ... ///

	// right now you should handle the trace yourself
	span, _ := opentracing.StartSpanFromContext(_ctx, fmt.Sprintf("%s-%s", p.method, uri.String()))
	defer span.Finish()

	req, err := http.NewRequest(p.method, uri.String(), b)
	if err != nil {
		return fmt.Errorf("new request %w", err)
	}

	req.Header.Set("Content-Type", ct)
	req.Header.Set(secKey, s.key)

	// metrics wrapper just give us more information
	res, err := s.metric.Do(s.client, req)
	if err != nil {
		return fmt.Errorf("post %q error %w", uri.String(), err)
	}

	defer res.Body.Close()
	// ... ///
}
----
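
Because the trace is handled manually here, the span also has to be propagated over the wire by hand. A hedged sketch using standard opentracing header injection, placed right after `http.NewRequest` above:

[source,go]
----
// inject the span context into the outgoing request headers so the remote
// side can continue the trace; assumes the global tracer is configured
if err := opentracing.GlobalTracer().Inject(
	span.Context(),
	opentracing.HTTPHeaders,
	opentracing.HTTPHeadersCarrier(req.Header),
); err != nil {
	tel.FromCtx(_ctx).Warn("trace inject", zap.Error(err))
}
----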

=== kafka-producer

The producer is fully covered by tel.

[source,go]
----
type producer struct {
	srv *kafka.Producer

	group string
}

func (t *producer) Produce(_ctx context.Context, topic string, key []byte, value []byte, headers ...kafka.Header) error {
	return kaf.NewProducerMiddleware(t.srv).Produce(_ctx, &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Key:            key,
		Value:          value,
		Headers:        headers,
	})
}

----
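
A minimal usage sketch; it assumes ctx already carries a tel instance and `kafkaProducer` is your configured `*kafka.Producer`:

[source,go]
----
p := &producer{srv: kafkaProducer, group: "demo"}
if err := p.Produce(ctx, "demo-topic", []byte("key"), []byte("value")); err != nil {
	tel.FromCtx(ctx).Error("produce", zap.Error(err))
}
----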

=== pure http-server example using chi (native http handler router)
Just wire up the middleware.

[source,go]
----
type service struct {
	mx     *chi.Mux
	server http.Server
}

func (s *service) mwSetup(ctx context.Context, m metrics.HttpTracker) {
	// ... ///
	s.mx.Use(tel.FromCtx(ctx).HttpServerMiddlewareAll(m))
}

----
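
A hedged wiring sketch around the snippet above: build the mux, attach the all-in-one middleware, register a handler, then serve.

[source,go]
----
s := &service{mx: chi.NewRouter()}
s.mwSetup(ctx, metrics.NewHttpMetric(metrics.DefaultHTTPPathRetriever()))

// handlers may log through the request context, which the middleware enriches
s.mx.Get("/ping", func(w http.ResponseWriter, r *http.Request) {
	tel.FromCtx(r.Context()).Info("ping")
	_, _ = w.Write([]byte("pong"))
})

s.server = http.Server{Addr: ":8080", Handler: s.mx}
_ = s.server.ListenAndServe()
----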

=== grpc-server
[source,go]
----
s := grpc.NewServer(
	grpc.ChainUnaryInterceptor(tel.FromCtx(ctx).GrpcUnaryServerInterceptor()),
)
----

Documentation

Overview

Package tel represents the Telemetry service. We support context as the source of Telemetry: to gracefully support middleware, we do not pass a reference to Telemetry around, which allows better handling of different log instances.
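
A hedged sketch of the intended flow (only documented calls are used):

	t := tel.New(tel.GetConfigFromEnv())
	defer t.Close()

	ctx := t.Ctx()                   // put telemetry into a fresh context
	tel.FromCtx(ctx).Info("started") // retrieve it anywhere downstream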

Index

Constants

const (
	MonitorEndpoint     = "/metrics"
	HealthEndpoint      = "/health"
	PprofIndexEndpoint  = "/debug/pprof"
	EchoShutdownTimeout = 5 * time.Second
)
const (
	Component = "component"
)

Variables

var (
	ErrEmptyHandler = fmt.Errorf("unable to add nil health to monitoring")
)
var ErrGrpcInternal = status.New(codes.Internal, "internal server error").Err()

Functions

func FromCtxWithSpan

func FromCtxWithSpan(ctx context.Context) *span

FromCtxWithSpan retrieves the tele span object from ctx. The span object is just a composition of the tele object with an open-tracing instance, which writes to the log and fills the trace log simultaneously.
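
A hedged usage sketch (assuming ctx was produced by StartSpanFromContext, and that the span wrapper exposes the embedded zap-style methods):

	sp := tel.FromCtxWithSpan(ctx)
	sp.Info("step done") // assumption: written to both the zap log and the trace log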

func GrpcUnaryClientInterceptor

func GrpcUnaryClientInterceptor(ignore ...string) grpc.UnaryClientInterceptor

GrpcUnaryClientInterceptor assumes that the input ctx contains a telemetry instance and that the invoker is already under our telemetry. It implements:

  • recovery
  • detail log during errors (+ in recovery also)
  • measure execution time

func NewTelemetryContext

func NewTelemetryContext(cfg Config, ctx context.Context) context.Context

NewTelemetryContext creates a new instance and puts it into @ctx

func SetLogOutput

func SetLogOutput(ctx context.Context) *bytes.Buffer

SetLogOutput is a debug helper which duplicates the log output into a bytes.Buffer

func StartSpanFromContext

func StartSpanFromContext(ctx context.Context, name string, opts ...opentracing.StartSpanOption) (span, context.Context)

StartSpanFromContext starts a telemetry span which creates or continues an existing trace. To gracefully continue a trace, ctx should contain both span and tele.
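
For example (assuming the span wrapper embeds opentracing.Span, hence Finish):

	span, ctx := tel.StartSpanFromContext(ctx, "fetch-user")
	defer span.Finish()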

func UpdateTraceFields

func UpdateTraceFields(ctx context.Context)

UpdateTraceFields updates tracing fields; the start of a session is a good moment to do it. @prefix - for splitting different inter-service calls: kafka, grpc, db, etc.

func WithContext added in v1.0.6

func WithContext(ctx context.Context, l Telemetry) context.Context

Types

type Config

type Config struct {
	Project   string `env:"PROJECT"`
	Namespace string `env:"NAMESPACE"`
	LogLevel  string `env:"LOG_LEVEL" envDefault:"info"`
	Debug     bool   `env:"DEBUG" envDefault:"false"`

	MonitorAddr string `env:"MONITOR_ADDR" envDefault:"0.0.0.0:8011"`
}

func DefaultConfig

func DefaultConfig() Config

func DefaultDebugConfig

func DefaultDebugConfig() Config

func GetConfigFromEnv

func GetConfigFromEnv() Config

GetConfigFromEnv uses DefaultConfig and overwrites only the variables present in env

type HealthChecker

type HealthChecker struct {
	Name    string
	Handler health.Checker
}

type HealthHandler

type HealthHandler struct {
	health.CompositeChecker
}

func NewHealthHandler

func NewHealthHandler() *HealthHandler

NewHealthHandler returns a new HealthHandler

func (*HealthHandler) ServeHTTP

func (h *HealthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP returns a JSON-encoded health report and sets the status to http.StatusServiceUnavailable if a check is down

type Monitor

type Monitor interface {
	AddMetricTracker(ctx context.Context, metrics ...metrics.MetricTracker) Monitor
	AddHealthChecker(ctx context.Context, handlers ...HealthChecker) Monitor

	Start(ctx context.Context)
	GracefulStop(ctx context.Context)
}

type Telemetry

type Telemetry struct {
	*zap.Logger
	// contains filtered or unexported fields
}

func FromCtx

func FromCtx(ctx context.Context) *Telemetry

FromCtx retrieves the tel object from ctx

func New

func New(cfg Config) (t Telemetry)

func NewNull

func NewNull() Telemetry

func (*Telemetry) Close

func (t *Telemetry) Close()

Close properly closes the Telemetry instance

func (Telemetry) Copy

func (t Telemetry) Copy() Telemetry

Copy copies the receiver instance, giving us a more convenient way to use pipelines

func (Telemetry) Ctx

func (t Telemetry) Ctx() context.Context

Ctx initiates a new ctx with Telemetry

func (Telemetry) GrpcUnaryClientInterceptorAll

func (t Telemetry) GrpcUnaryClientInterceptorAll(ignore ...string) grpc.UnaryClientInterceptor

GrpcUnaryClientInterceptorAll sets up recovery, metrics, tracing and the debug option according to the goal of our framework. Execution order:

  • opentracing injection via otgrpc.OpenTracingClientInterceptor
  • recovery, measure execution time + debug log via own GrpcUnaryClientInterceptor
  • metrics via metrics.UnaryClientInterceptor

func (Telemetry) GrpcUnaryServerInterceptor

func (t Telemetry) GrpcUnaryServerInterceptor(ignore ...string) grpc.UnaryServerInterceptor

GrpcUnaryServerInterceptor, the most important one, creates a new telemetry instance and fills trace ids.

It implements:

  • new telemetry instance
  • fill trace ids
  • recovery
  • detail log during errors (+ in recovery also)
  • measure execution time

func (Telemetry) GrpcUnaryServerInterceptorAll

func (t Telemetry) GrpcUnaryServerInterceptorAll(ignore ...string) grpc.UnaryServerInterceptor

GrpcUnaryServerInterceptorAll sets up recovery, metrics, tracing and the debug option according to the goal of our framework. Execution order:

  • opentracing injection via otgrpc.OpenTracingServerInterceptor
  • ctx new instance, recovery, measure execution time + debug log via own GrpcUnaryServerInterceptor
  • metrics via metrics.UnaryServerInterceptor

func (Telemetry) HttpServerMiddleware

func (t Telemetry) HttpServerMiddleware() func(next http.Handler) http.Handler

HttpServerMiddleware performs:

  • telemetry log injection
  • measure execution time
  • recovery

func (Telemetry) HttpServerMiddlewareAll

func (t Telemetry) HttpServerMiddlewareAll(m metrics.HttpTracker) func(next http.Handler) http.Handler

HttpServerMiddlewareAll wires up all essential middleware, including metrics. Execution order:

  • opentracing injection via nethttp.Middleware
  • recovery + measure execution time + debug log via own HttpServerMiddleware
  • metrics via metrics.NewHttpMiddlewareWithOption

func (Telemetry) IsDebug

func (t Telemetry) IsDebug() bool

IsDebug reports whether the DEBUG env variable was set to true

func (Telemetry) M

func (t Telemetry) M() Monitor

M returns the monitoring instance

func (*Telemetry) Printf

func (t *Telemetry) Printf(msg string, items ...interface{})

Printf exposes the fx.Printer interface as debug output

func (*Telemetry) PutFields

func (t *Telemetry) PutFields(fields ...zap.Field) *Telemetry

PutFields updates the current logger instance with new fields, which affect only subsequent log writes of the current tele instance. Because it is passed by reference, it also affects the context; this approach is covered in Test_telemetry_With.
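
A short sketch (reqID is a hypothetical variable):

	tel.FromCtx(ctx).PutFields(zap.String("request-id", reqID))
	tel.FromCtx(ctx).Info("handled") // subsequent writes carry request-id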

func (*Telemetry) StartMonitor

func (t *Telemetry) StartMonitor()

StartMonitor is a blocking operation

func (*Telemetry) StartSpan

func (t *Telemetry) StartSpan(name string, opts ...opentracing.StartSpanOption) (span, context.Context)

StartSpan starts an absolutely new telemetry trace span. Keep in mind that this function does not continue any existing trace, it only creates a new one. To continue a span, use StartSpanFromContext.
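
A short sketch; this starts a brand-new root trace:

	span, ctx := tel.FromCtx(ctx).StartSpan("nightly-job")
	defer span.Finish() // Finish assumed via the embedded opentracing.Span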

func (Telemetry) T

func (t Telemetry) T() opentracing.Tracer

T returns the opentracing instance

func (Telemetry) WithContext

func (t Telemetry) WithContext(ctx context.Context) context.Context

WithContext puts a new copy of telemetry into the context

func (*Telemetry) WithSpan

func (t *Telemetry) WithSpan(s opentracing.Span) span

WithSpan creates a span logger where we can duplicate messages to both the tracer and the logger. Furthermore, we create a new log instance with trace fields.

Directories

Path Synopsis
example
demo/client Module
demo/gin Module
demo/nats Module
Package httpclient implements a tel http.client wrapper which helps to handle errors. The most important approach: it performs logs by itself.
middleware
chi Module
echo Module
gin Module
gin/example Module
grpc Module
nats Module
natsmw Module
monitoring
plugins
otelsql Module
pgx Module
sqlwrapper Module
propagators
natsprop Module
