forwarder

package
v0.0.0-...-914b764
Warning

This package is not in the latest version of its module.
Published: Apr 29, 2020 License: Apache-2.0 Imports: 24 Imported by: 0

README

package forwarder

This package is responsible for sending payloads to the backend. Payloads can come from different sources and in different formats; the forwarder does not inspect them.

The forwarder can receive multiple domains, each with a list of API keys. Every payload is sent to every domain/API key pair; each such send is a Transaction. Transactions are retried on error, newest first. Transactions are consumed by Workers asynchronously.

Usage


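keysPerDomains := map[string][]string{
	"http://api.datadog.com": {"my_secret_key_1", "my_secret_key_2"},
	"http://debug.api.com":   {"secret_api"},
}

// Build the forwarder from default options for the given domains and keys.
f := forwarder.NewDefaultForwarder(forwarder.NewOptions(keysPerDomains))
f.NumberOfWorkers = 1 // default: config.Datadog.GetInt("forwarder_num_workers")
if err := f.Start(); err != nil {
	// handle the error
}

// ...

payload1 := []byte("some payload")
payload2 := []byte("another payload")
// The second argument carries extra HTTP headers; none are needed here.
if err := f.SubmitSeries(forwarder.Payloads{&payload1, &payload2}, http.Header{}); err != nil {
	// handle the error
}

// ...

f.Stop()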

Configuration

There are several settings that influence the behavior of the forwarder.

Exponential backoff and circuit breaker settings
  • forwarder_backoff_factor - This controls the overlap between consecutive retry interval ranges. When set to 2, consecutive ranges are guaranteed not to overlap. As the value increases, the overlap asymptotically approaches 50%. Values less than 2 are not allowed, as they would leave gaps between ranges. Default: 2
  • forwarder_backoff_base - This controls the rate of exponential growth. The start of the very first retry interval range is given by the expression forwarder_backoff_base / forwarder_backoff_factor * 2. Default: 2
  • forwarder_backoff_max - This is the maximum number of seconds to wait for a retry. Default: 64
  • forwarder_recovery_interval - This controls how many retry interval ranges to step down for an endpoint upon success. Default: 2
  • forwarder_recovery_reset - Whether or not a successful request should completely clear an endpoint's error count. Default: false
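
The interplay of the three backoff settings can be sketched as follows. This is a minimal illustration, not the package's code: retryRange is a hypothetical helper, and it assumes that the upper bound of each range doubles with every consecutive error and that the lower bound is the upper bound divided by forwarder_backoff_factor, which agrees with the expression given above for the start of the first range.

```go
package main

import (
	"fmt"
	"math"
)

// retryRange returns the bounds, in seconds, of the interval from which a
// retry delay would be drawn after numErrors consecutive failures.
// Hypothetical helper: it only illustrates the documented settings.
func retryRange(numErrors int, base, factor, maxBackoff float64) (lo, hi float64) {
	if numErrors <= 0 {
		return 0, 0 // no errors recorded: no delay
	}
	// Assumption: the upper bound doubles with each consecutive error.
	hi = base * math.Pow(2, float64(numErrors))
	if hi > maxBackoff {
		// Capped by forwarder_backoff_max.
		return maxBackoff, maxBackoff
	}
	// Assumption: the range starts at the upper bound divided by the factor.
	return hi / factor, hi
}

func main() {
	// Default settings: base 2, factor 2, max 64.
	for n := 1; n <= 6; n++ {
		lo, hi := retryRange(n, 2, 2, 64)
		fmt.Printf("errors=%d range=[%g, %g]s\n", n, lo, hi)
	}
}
```

With the default factor of 2, each range starts where the previous one ends, which is the "no overlap" guarantee described above.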

Internal

The forwarder is composed of multiple parts:

DefaultForwarder

DefaultForwarder is the default implementation of the Forwarder interface (and the only one for now). This type is in charge of receiving payloads, creating the HTTP transactions and distributing them among every domainForwarder.

domainForwarder

The agent can be configured to send the same payload to multiple destinations. Each destination (or domain) can be configured with 1 or more API keys. Every payload will be sent to each domain/API key pair.
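
As a rough illustration of this fan-out, the sketch below builds one item per payload and per domain/API key pair. The txn type and the fanOut function are hypothetical stand-ins introduced here; they are not the package's HTTPTransaction or its internal code.

```go
package main

import "fmt"

// txn is a hypothetical, reduced stand-in for the package's HTTPTransaction.
type txn struct {
	Domain  string
	APIKey  string
	Payload *[]byte
}

// fanOut creates one txn for every payload and every domain/API key pair.
func fanOut(keysPerDomain map[string][]string, payloads []*[]byte) []txn {
	var out []txn
	for domain, keys := range keysPerDomain {
		for _, key := range keys {
			for _, p := range payloads {
				out = append(out, txn{Domain: domain, APIKey: key, Payload: p})
			}
		}
	}
	return out
}

func main() {
	keys := map[string][]string{
		"http://api.datadog.com": {"my_secret_key_1", "my_secret_key_2"},
		"http://debug.api.com":   {"secret_api"},
	}
	p1, p2 := []byte("some payload"), []byte("another payload")
	// 3 domain/API key pairs x 2 payloads.
	fmt.Println(len(fanOut(keys, []*[]byte{&p1, &p2})))
}
```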

A domainForwarder is in charge of sending payloads to one domain. This avoids slowing down every domain when one is down or slow. Each domainForwarder has a number of dedicated Workers to process Transactions. New transactions are processed first; then, when the workers have time, the failed ones are retried (newest first).

We start dropping transactions (oldest first) when the number of transactions in the retry queue is bigger than forwarder_retry_queue_max_size (see the agent configuration).
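
The ordering and dropping policy can be sketched as below. This is an illustration under assumptions, not the package's implementation: retryItem and prepareRetries are hypothetical names, and creation times are plain Unix seconds to keep the example small.

```go
package main

import (
	"fmt"
	"sort"
)

// retryItem is a hypothetical stand-in for a transaction waiting to be retried.
type retryItem struct {
	Target    string
	CreatedAt int64 // creation time in Unix seconds
}

// prepareRetries sorts the queue in place so the newest items come first,
// then drops the oldest ones once the queue exceeds maxSize.
func prepareRetries(queue []retryItem, maxSize int) (kept []retryItem, dropped int) {
	sort.SliceStable(queue, func(i, j int) bool {
		return queue[i].CreatedAt > queue[j].CreatedAt
	})
	if maxSize >= 0 && len(queue) > maxSize {
		dropped = len(queue) - maxSize
		queue = queue[:maxSize] // the tail holds the oldest items
	}
	return queue, dropped
}

func main() {
	queue := []retryItem{
		{Target: "a", CreatedAt: 10},
		{Target: "b", CreatedAt: 30},
		{Target: "c", CreatedAt: 20},
	}
	kept, dropped := prepareRetries(queue, 2)
	fmt.Println(kept, dropped)
}
```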

Disclaimer: using multiple API keys with the Datadog backend will multiply your billing! Most customers will only use one API key.

Worker

A Worker processes transactions coming from 2 queues: HighPrio and LowPrio. New transactions are sent to the HighPrio queue and the ones to retry are sent to LowPrio. A Worker is dedicated to one domain (ie: one domainForwarder).
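
One common way to favor one channel over another in Go is to poll the high-priority channel first and fall back to the low-priority one only when it is empty. The sketch below shows that pattern with plain strings; nextTransaction is a hypothetical helper, and the package's Worker may well be structured differently.

```go
package main

import "fmt"

// nextTransaction returns a pending item without blocking, preferring the
// high-priority channel. The boolean is false when both channels are empty.
func nextTransaction(high, low <-chan string) (string, bool) {
	// First look only at the high-priority queue.
	select {
	case t := <-high:
		return t, true
	default:
	}
	// Nothing new: fall back to the retry queue.
	select {
	case t := <-low:
		return t, true
	default:
	}
	return "", false
}

func main() {
	high := make(chan string, 2)
	low := make(chan string, 2)
	low <- "retry-1"
	high <- "new-1"
	high <- "new-2"

	for {
		t, ok := nextTransaction(high, low)
		if !ok {
			break
		}
		fmt.Println(t)
	}
}
```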

blockedEndpoints (or exponential backoff)

When a transaction fails to be sent to a backend, we blacklist that particular endpoint for some time to avoid flooding an unavailable endpoint (the transactions will be retried later). A blacklist is specific to one endpoint on one domain (ie: "http(s):///"). The blacklist time grows, up to a maximum, as more and more errors are encountered for that endpoint, and is gradually cleared when transactions succeed. The blacklist is shared by all workers.
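
A per-endpoint block list shared between goroutines might look like the sketch below. Everything in it is an assumption made for illustration: the endpointBlocks type, its methods, the use of Unix seconds for time, the deterministic delay (the upper bound of the retry range, without random jitter), and the constants that mirror the default forwarder_backoff_max and forwarder_recovery_interval values.

```go
package main

import (
	"fmt"
	"sync"
)

const (
	maxBackoff       = 64 // seconds, mirrors the forwarder_backoff_max default
	recoveryInterval = 2  // mirrors the forwarder_recovery_interval default
)

// backoff returns the delay in seconds for n consecutive errors: 4, 8, 16, ...
// capped at maxBackoff.
func backoff(n int) int64 {
	if n <= 0 {
		return 0
	}
	if n >= 5 { // 2<<5 already equals 64
		return maxBackoff
	}
	return int64(2) << uint(n)
}

// endpointBlocks is a hypothetical block list, safe for concurrent use.
type endpointBlocks struct {
	mu      sync.Mutex
	nbError map[string]int
	until   map[string]int64 // Unix seconds until which the endpoint is blocked
}

func newEndpointBlocks() *endpointBlocks {
	return &endpointBlocks{nbError: map[string]int{}, until: map[string]int64{}}
}

// fail records an error and extends the block for that endpoint.
func (b *endpointBlocks) fail(endpoint string, now int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.nbError[endpoint]++
	b.until[endpoint] = now + backoff(b.nbError[endpoint])
}

// succeed steps the error count down by recoveryInterval, never below zero.
func (b *endpointBlocks) succeed(endpoint string, now int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	n := b.nbError[endpoint] - recoveryInterval
	if n < 0 {
		n = 0
	}
	b.nbError[endpoint] = n
	b.until[endpoint] = now + backoff(n)
}

// isBlocked reports whether the endpoint should be skipped at time now.
func (b *endpointBlocks) isBlocked(endpoint string, now int64) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	return now < b.until[endpoint]
}

func main() {
	b := newEndpointBlocks()
	b.fail("domain-a/series", 100)
	fmt.Println(b.isBlocked("domain-a/series", 103), b.isBlocked("domain-b/series", 103))
}
```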

Transaction

An HTTPTransaction contains all the information about a payload and how/where to send it. On failure, a transaction will be retried later (see blockedEndpoints).

Documentation

Index

Constants
const (
	// Stopped represents the internal state of an unstarted Forwarder.
	Stopped uint32 = iota
	// Started represents the internal state of a started Forwarder.
	Started
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultForwarder

type DefaultForwarder struct {
	// NumberOfWorkers is the number of concurrent HTTP requests made by the DefaultForwarder (default 4).
	NumberOfWorkers int
	// contains filtered or unexported fields
}

DefaultForwarder is the default implementation of the Forwarder.

func NewDefaultForwarder

func NewDefaultForwarder(options *Options) *DefaultForwarder

NewDefaultForwarder returns a new DefaultForwarder.

func (*DefaultForwarder) Start

func (f *DefaultForwarder) Start() error

Start initializes and runs the forwarder.

func (*DefaultForwarder) State

func (f *DefaultForwarder) State() uint32

State returns the internal state of the forwarder (Started or Stopped)
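
Because the state is a uint32, it lends itself to the sync/atomic functions. The sketch below shows one way a Started/Stopped flag could be guarded so that a second Start is rejected. Whether DefaultForwarder does this is an assumption; the lifecycle type is hypothetical, and the constants are redeclared locally only so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// Local copies of the package constants, redeclared for a standalone example.
const (
	Stopped uint32 = iota
	Started
)

// lifecycle is a hypothetical holder of the Started/Stopped flag.
type lifecycle struct {
	state uint32
}

// Start flips the state from Stopped to Started, and fails if already started.
func (l *lifecycle) Start() error {
	if !atomic.CompareAndSwapUint32(&l.state, Stopped, Started) {
		return errors.New("already started")
	}
	return nil
}

// Stop marks the lifecycle as stopped.
func (l *lifecycle) Stop() {
	atomic.StoreUint32(&l.state, Stopped)
}

// State returns the current state (Started or Stopped).
func (l *lifecycle) State() uint32 {
	return atomic.LoadUint32(&l.state)
}

func main() {
	var l lifecycle
	fmt.Println(l.State() == Stopped)
	fmt.Println(l.Start(), l.State() == Started)
	fmt.Println(l.Start()) // the second call is rejected
	l.Stop()
	fmt.Println(l.State() == Stopped)
}
```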

func (*DefaultForwarder) Stop

func (f *DefaultForwarder) Stop()

Stop stops all the components of a forwarder and frees resources.

func (*DefaultForwarder) SubmitConnectionChecks

func (f *DefaultForwarder) SubmitConnectionChecks(payload Payloads, extra http.Header) (chan Response, error)

SubmitConnectionChecks sends connection checks

func (*DefaultForwarder) SubmitContainerChecks

func (f *DefaultForwarder) SubmitContainerChecks(payload Payloads, extra http.Header) (chan Response, error)

SubmitContainerChecks sends container checks

func (*DefaultForwarder) SubmitEvents

func (f *DefaultForwarder) SubmitEvents(payload Payloads, extra http.Header) error

SubmitEvents will send an event type payload to Datadog backend.

func (*DefaultForwarder) SubmitHostMetadata

func (f *DefaultForwarder) SubmitHostMetadata(payload Payloads, extra http.Header) error

SubmitHostMetadata will send a host_metadata tag type payload to Datadog backend.

func (*DefaultForwarder) SubmitMetadata

func (f *DefaultForwarder) SubmitMetadata(payload Payloads, extra http.Header) error

SubmitMetadata will send a metadata type payload to Datadog backend.

func (*DefaultForwarder) SubmitPodChecks

func (f *DefaultForwarder) SubmitPodChecks(payload Payloads, extra http.Header) (chan Response, error)

SubmitPodChecks sends pod checks

func (*DefaultForwarder) SubmitProcessChecks

func (f *DefaultForwarder) SubmitProcessChecks(payload Payloads, extra http.Header) (chan Response, error)

SubmitProcessChecks sends process checks

func (*DefaultForwarder) SubmitRTContainerChecks

func (f *DefaultForwarder) SubmitRTContainerChecks(payload Payloads, extra http.Header) (chan Response, error)

SubmitRTContainerChecks sends real time container checks

func (*DefaultForwarder) SubmitRTProcessChecks

func (f *DefaultForwarder) SubmitRTProcessChecks(payload Payloads, extra http.Header) (chan Response, error)

SubmitRTProcessChecks sends real time process checks

func (*DefaultForwarder) SubmitSeries

func (f *DefaultForwarder) SubmitSeries(payload Payloads, extra http.Header) error

SubmitSeries will send a series type payload to Datadog backend.

func (*DefaultForwarder) SubmitServiceChecks

func (f *DefaultForwarder) SubmitServiceChecks(payload Payloads, extra http.Header) error

SubmitServiceChecks will send a service check type payload to Datadog backend.

func (*DefaultForwarder) SubmitSketchSeries

func (f *DefaultForwarder) SubmitSketchSeries(payload Payloads, extra http.Header) error

SubmitSketchSeries will send payloads to Datadog backend - PROTOTYPE FOR PERCENTILE

func (*DefaultForwarder) SubmitV1CheckRuns

func (f *DefaultForwarder) SubmitV1CheckRuns(payload Payloads, extra http.Header) error

SubmitV1CheckRuns will send service checks to v1 endpoint (this will be removed once the backend handles v2 endpoints).

func (*DefaultForwarder) SubmitV1Intake

func (f *DefaultForwarder) SubmitV1Intake(payload Payloads, extra http.Header) error

SubmitV1Intake will send payloads to the universal `/intake/` endpoint used by Agent v.5

func (*DefaultForwarder) SubmitV1Series

func (f *DefaultForwarder) SubmitV1Series(payload Payloads, extra http.Header) error

SubmitV1Series will send time series to the v1 endpoint (this will be removed once the backend handles v2 endpoints).

type Forwarder

type Forwarder interface {
	Start() error
	Stop()
	SubmitV1Series(payload Payloads, extra http.Header) error
	SubmitV1Intake(payload Payloads, extra http.Header) error
	SubmitV1CheckRuns(payload Payloads, extra http.Header) error
	SubmitSeries(payload Payloads, extra http.Header) error
	SubmitEvents(payload Payloads, extra http.Header) error
	SubmitServiceChecks(payload Payloads, extra http.Header) error
	SubmitSketchSeries(payload Payloads, extra http.Header) error
	SubmitHostMetadata(payload Payloads, extra http.Header) error
	SubmitMetadata(payload Payloads, extra http.Header) error
	SubmitProcessChecks(payload Payloads, extra http.Header) (chan Response, error)
	SubmitRTProcessChecks(payload Payloads, extra http.Header) (chan Response, error)
	SubmitContainerChecks(payload Payloads, extra http.Header) (chan Response, error)
	SubmitRTContainerChecks(payload Payloads, extra http.Header) (chan Response, error)
	SubmitConnectionChecks(payload Payloads, extra http.Header) (chan Response, error)
	SubmitPodChecks(payload Payloads, extra http.Header) (chan Response, error)
}

Forwarder interface allows packages to send payloads to the backend.

type HTTPAttemptHandler

type HTTPAttemptHandler func(transaction *HTTPTransaction)

HTTPAttemptHandler is an event handler that will get called each time this transaction is attempted

type HTTPCompletionHandler

type HTTPCompletionHandler func(transaction *HTTPTransaction, statusCode int, body []byte, err error)

HTTPCompletionHandler is an event handler that will get called after this transaction has completed

type HTTPTransaction

type HTTPTransaction struct {
	// Domain represents the domain target by the HTTPTransaction.
	Domain string
	// Endpoint is the API Endpoint used by the HTTPTransaction.
	Endpoint string
	// Headers are the HTTP headers used by the HTTPTransaction.
	Headers http.Header
	// Payload is the content delivered to the backend.
	Payload *[]byte
	// ErrorCount is the number of times this HTTPTransaction failed to be processed.
	ErrorCount int
	// contains filtered or unexported fields
}

HTTPTransaction represents one Payload for one Endpoint on one Domain.

func NewHTTPTransaction

func NewHTTPTransaction() *HTTPTransaction

NewHTTPTransaction returns a new HTTPTransaction.

func (*HTTPTransaction) GetCreatedAt

func (t *HTTPTransaction) GetCreatedAt() time.Time

GetCreatedAt returns the creation time of the HTTPTransaction.

func (*HTTPTransaction) GetTarget

func (t *HTTPTransaction) GetTarget() string

GetTarget returns the URL used by the transaction.

func (*HTTPTransaction) Process

func (t *HTTPTransaction) Process(ctx context.Context, client *http.Client) error

Process sends the Payload of the transaction to the right Endpoint and Domain.

type MockedForwarder

type MockedForwarder struct {
	mock.Mock
}

MockedForwarder is a mocked forwarder to be used in other modules to test their dependencies on the forwarder.

func (*MockedForwarder) Start

func (tf *MockedForwarder) Start() error

Start updates the internal mock struct

func (*MockedForwarder) Stop

func (tf *MockedForwarder) Stop()

Stop updates the internal mock struct

func (*MockedForwarder) SubmitConnectionChecks

func (tf *MockedForwarder) SubmitConnectionChecks(payload Payloads, extra http.Header) (chan Response, error)

SubmitConnectionChecks mock

func (*MockedForwarder) SubmitContainerChecks

func (tf *MockedForwarder) SubmitContainerChecks(payload Payloads, extra http.Header) (chan Response, error)

SubmitContainerChecks mock

func (*MockedForwarder) SubmitEvents

func (tf *MockedForwarder) SubmitEvents(payload Payloads, extra http.Header) error

SubmitEvents updates the internal mock struct

func (*MockedForwarder) SubmitHostMetadata

func (tf *MockedForwarder) SubmitHostMetadata(payload Payloads, extra http.Header) error

SubmitHostMetadata updates the internal mock struct

func (*MockedForwarder) SubmitMetadata

func (tf *MockedForwarder) SubmitMetadata(payload Payloads, extra http.Header) error

SubmitMetadata updates the internal mock struct

func (*MockedForwarder) SubmitPodChecks

func (tf *MockedForwarder) SubmitPodChecks(payload Payloads, extra http.Header) (chan Response, error)

SubmitPodChecks mock

func (*MockedForwarder) SubmitProcessChecks

func (tf *MockedForwarder) SubmitProcessChecks(payload Payloads, extra http.Header) (chan Response, error)

SubmitProcessChecks mock

func (*MockedForwarder) SubmitRTContainerChecks

func (tf *MockedForwarder) SubmitRTContainerChecks(payload Payloads, extra http.Header) (chan Response, error)

SubmitRTContainerChecks mock

func (*MockedForwarder) SubmitRTProcessChecks

func (tf *MockedForwarder) SubmitRTProcessChecks(payload Payloads, extra http.Header) (chan Response, error)

SubmitRTProcessChecks mock

func (*MockedForwarder) SubmitSeries

func (tf *MockedForwarder) SubmitSeries(payload Payloads, extra http.Header) error

SubmitSeries updates the internal mock struct

func (*MockedForwarder) SubmitServiceChecks

func (tf *MockedForwarder) SubmitServiceChecks(payload Payloads, extra http.Header) error

SubmitServiceChecks updates the internal mock struct

func (*MockedForwarder) SubmitSketchSeries

func (tf *MockedForwarder) SubmitSketchSeries(payload Payloads, extra http.Header) error

SubmitSketchSeries updates the internal mock struct

func (*MockedForwarder) SubmitV1CheckRuns

func (tf *MockedForwarder) SubmitV1CheckRuns(payload Payloads, extra http.Header) error

SubmitV1CheckRuns updates the internal mock struct

func (*MockedForwarder) SubmitV1Intake

func (tf *MockedForwarder) SubmitV1Intake(payload Payloads, extra http.Header) error

SubmitV1Intake updates the internal mock struct

func (*MockedForwarder) SubmitV1Series

func (tf *MockedForwarder) SubmitV1Series(payload Payloads, extra http.Header) error

SubmitV1Series updates the internal mock struct

type Options

type Options struct {
	NumberOfWorkers      int
	RetryQueueSize       int
	EnableHealthChecking bool
	KeysPerDomain        map[string][]string
}

Options contains the configuration options for the DefaultForwarder.

func NewOptions

func NewOptions(keysPerDomain map[string][]string) *Options

NewOptions creates new Options with default values

type Payloads

type Payloads []*[]byte

Payloads is a slice of pointers to byte slices; it is the type used for the payloads passed into the forwarder.
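
Since each element is a pointer, some care is needed when building a Payloads value from existing byte slices. The sketch below takes the address of each slice element by index rather than the address of a range variable, which before Go 1.22 would have made every entry point at the same variable. The Payloads type is redeclared locally so the example is self-contained, and toPayloads is a hypothetical helper.

```go
package main

import "fmt"

// Payloads mirrors the type declared by the package so that this sketch
// compiles on its own.
type Payloads []*[]byte

// toPayloads wraps each chunk in a pointer without copying the bytes.
func toPayloads(chunks [][]byte) Payloads {
	out := make(Payloads, 0, len(chunks))
	for i := range chunks {
		// Address of the slice element, not of a loop variable.
		out = append(out, &chunks[i])
	}
	return out
}

func main() {
	p := toPayloads([][]byte{[]byte("some payload"), []byte("another payload")})
	for _, b := range p {
		fmt.Println(string(*b))
	}
}
```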

type Response

type Response struct {
	Domain     string
	Body       []byte
	StatusCode int
	Err        error
}

Response contains the response details of a successfully posted transaction

type Transaction

type Transaction interface {
	Process(ctx context.Context, client *http.Client) error
	GetCreatedAt() time.Time
	GetTarget() string
}

Transaction represents the task to process for a Worker.

type Worker

type Worker struct {
	// Client is the HTTP client used to process transactions.
	Client *http.Client
	// HighPrio is the channel used to receive high priority transactions from the Forwarder.
	HighPrio <-chan Transaction
	// LowPrio is the channel used to receive low priority transactions from the Forwarder.
	LowPrio <-chan Transaction
	// RequeueChan is the channel used to send failed transactions back to the Forwarder.
	RequeueChan chan<- Transaction
	// contains filtered or unexported fields
}

Worker consumes Transactions from the Forwarder and processes them. If a transaction fails to be processed, the Worker sends it back to the Forwarder to be retried later.
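
The consume-and-requeue behavior can be illustrated with a small loop. The task interface below is a hypothetical, reduced version of Transaction (no context or HTTP client), and drain is not the package's Worker; the sketch only shows failed items being pushed back on a requeue channel.

```go
package main

import (
	"errors"
	"fmt"
)

// task is a hypothetical, reduced stand-in for the Transaction interface.
type task interface {
	Process() error
	GetTarget() string
}

// fakeTask succeeds or fails on demand.
type fakeTask struct {
	target string
	fail   bool
}

func (f fakeTask) Process() error {
	if f.fail {
		return errors.New("send failed")
	}
	return nil
}

func (f fakeTask) GetTarget() string { return f.target }

// drain processes tasks until in is closed and returns how many succeeded.
// Failed tasks are sent to requeue, which must have room or a reader.
func drain(in <-chan task, requeue chan<- task) (sent int) {
	for t := range in {
		if err := t.Process(); err != nil {
			requeue <- t // hand the task back to be retried later
			continue
		}
		sent++
	}
	return sent
}

func main() {
	in := make(chan task, 3)
	requeue := make(chan task, 3)
	in <- fakeTask{target: "a"}
	in <- fakeTask{target: "b", fail: true}
	in <- fakeTask{target: "c"}
	close(in)

	sent := drain(in, requeue)
	fmt.Println(sent, len(requeue))
}
```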

func NewWorker

func NewWorker(highPrioChan <-chan Transaction, lowPrioChan <-chan Transaction, requeueChan chan<- Transaction, blocked *blockedEndpoints) *Worker

NewWorker returns a new worker that consumes Transactions from highPrioChan and lowPrioChan and pushes failed ones back into requeueChan.

func (*Worker) Start

func (w *Worker) Start()

Start starts a Worker.

func (*Worker) Stop

func (w *Worker) Stop(purgeHighPrio bool)

Stop stops the worker.
