forwarder

package
v0.0.0-...-1d9613f Latest
Warning

This package is not in the latest version of its module.
Published: Mar 5, 2018 | License: Apache-2.0 | Imports: 18 | Imported by: 0

README

package forwarder

This package is responsible for sending payloads to the backend. Payloads can come from different sources and in different formats; the forwarder does not inspect them.

The forwarder can be configured with multiple domains, each with its own list of API keys. Every payload is sent to every domain/API key pair; each such delivery is a Transaction. Transactions are retried on error, and the newest transactions are retried first. Transactions are consumed by Workers asynchronously.
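The fan-out described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the `transaction` struct and the `fanOut` helper are hypothetical names introduced here, and the real Transaction type carries more state.

```go
package main

import (
	"fmt"
	"sort"
)

// transaction is a hypothetical stand-in for the package's Transaction:
// one payload bound to one domain and one API key.
type transaction struct {
	Domain  string
	APIKey  string
	Payload *[]byte
}

// fanOut creates one transaction per domain/API key pair for a payload.
func fanOut(keysPerDomains map[string][]string, payload *[]byte) []transaction {
	// Sort the domains so the result is deterministic; map iteration order is not.
	domains := make([]string, 0, len(keysPerDomains))
	for d := range keysPerDomains {
		domains = append(domains, d)
	}
	sort.Strings(domains)

	var txs []transaction
	for _, d := range domains {
		for _, key := range keysPerDomains[d] {
			txs = append(txs, transaction{Domain: d, APIKey: key, Payload: payload})
		}
	}
	return txs
}

func main() {
	keys := map[string][]string{
		"http://api.datadog.com": {"my_secret_key_1", "my_secret_key_2"},
		"http://debug.api.com":   {"secret_api"},
	}
	payload := []byte("some payload")
	// Two keys on the first domain plus one on the second: three transactions.
	for _, tx := range fanOut(keys, &payload) {
		fmt.Println(tx.Domain, tx.APIKey)
	}
}
```

All transactions share the same payload pointer, so the bytes are not copied per destination.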

Usage example:


keysPerDomains := map[string][]string{
	"http://api.datadog.com": {"my_secret_key_1", "my_secret_key_2"},
	"http://debug.api.com":   {"secret_api"},
}

// NewDefaultForwarder and SubmitSeries are the names documented below.
f := forwarder.NewDefaultForwarder(keysPerDomains)
f.NumberOfWorkers = 1 // default 4
if err := f.Start(); err != nil {
	// handle the error
}

// ...

payload := []byte("some payload")
if err := f.SubmitSeries(forwarder.Payloads{&payload}, http.Header{}); err != nil {
	// handle the error
}

// ...

f.Stop()

Documentation

Index

Constants

const (
	// Stopped represents the internal state of an unstarted Forwarder.
	Stopped uint32 = iota
	// Started represents the internal state of a started Forwarder.
	Started
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultForwarder

type DefaultForwarder struct {

	// NumberOfWorkers is the number of concurrent HTTP requests made by the DefaultForwarder (default 4).
	NumberOfWorkers int
	// KeysPerDomains are the different keys to use per domain when sending transactions.
	KeysPerDomains map[string][]string
	// contains filtered or unexported fields
}

DefaultForwarder is in charge of receiving transaction payloads and sending them to the Datadog backend over HTTP.

func NewDefaultForwarder

func NewDefaultForwarder(KeysPerDomains map[string][]string) *DefaultForwarder

NewDefaultForwarder returns a new DefaultForwarder.

func (*DefaultForwarder) Start

func (f *DefaultForwarder) Start() error

Start starts a DefaultForwarder.

func (*DefaultForwarder) State

func (f *DefaultForwarder) State() uint32

State returns the internal state of the DefaultForwarder (either Started or Stopped).
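One way to keep such a `uint32` state word safe under concurrent calls is with `sync/atomic`. The sketch below is an assumption about how a Started/Stopped guard could look, not a description of the package's internals; the `guard` type is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

const (
	// Stopped and Started mirror the constants documented above.
	Stopped uint32 = iota
	Started
)

// guard is a hypothetical type that holds only the state word.
type guard struct {
	state uint32
}

// Start moves the guard from Stopped to Started and fails if it is already started.
func (g *guard) Start() error {
	if !atomic.CompareAndSwapUint32(&g.state, Stopped, Started) {
		return errors.New("already started")
	}
	return nil
}

// Stop resets the state to Stopped.
func (g *guard) Stop() { atomic.StoreUint32(&g.state, Stopped) }

// State returns the current state (either Started or Stopped).
func (g *guard) State() uint32 { return atomic.LoadUint32(&g.state) }

func main() {
	g := &guard{}
	fmt.Println(g.State() == Stopped) // true: the zero value is Stopped
	fmt.Println(g.Start())            // <nil>
	fmt.Println(g.State() == Started) // true
	fmt.Println(g.Start() != nil)     // true: a second Start is rejected
}
```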

func (*DefaultForwarder) Stop

func (f *DefaultForwarder) Stop()

Stop stops a DefaultForwarder; all transactions not yet flushed will be lost.

func (*DefaultForwarder) SubmitEvents

func (f *DefaultForwarder) SubmitEvents(payload Payloads, extra http.Header) error

SubmitEvents will send an event type payload to the Datadog backend.

func (*DefaultForwarder) SubmitHostMetadata

func (f *DefaultForwarder) SubmitHostMetadata(payload Payloads, extra http.Header) error

SubmitHostMetadata will send a host_metadata tag type payload to the Datadog backend.

func (*DefaultForwarder) SubmitMetadata

func (f *DefaultForwarder) SubmitMetadata(payload Payloads, extra http.Header) error

SubmitMetadata will send a metadata type payload to the Datadog backend.

func (*DefaultForwarder) SubmitSeries

func (f *DefaultForwarder) SubmitSeries(payload Payloads, extra http.Header) error

SubmitSeries will send a series type payload to the Datadog backend.

func (*DefaultForwarder) SubmitServiceChecks

func (f *DefaultForwarder) SubmitServiceChecks(payload Payloads, extra http.Header) error

SubmitServiceChecks will send a service check type payload to the Datadog backend.

func (*DefaultForwarder) SubmitSketchSeries

func (f *DefaultForwarder) SubmitSketchSeries(payload Payloads, extra http.Header) error

SubmitSketchSeries will send sketch series payloads to the Datadog backend (prototype for percentiles).

func (*DefaultForwarder) SubmitV1CheckRuns

func (f *DefaultForwarder) SubmitV1CheckRuns(payload Payloads, extra http.Header) error

SubmitV1CheckRuns will send service checks to the v1 endpoint (this will be removed once the backend handles v2 endpoints).

func (*DefaultForwarder) SubmitV1Intake

func (f *DefaultForwarder) SubmitV1Intake(payload Payloads, extra http.Header) error

SubmitV1Intake will send payloads to the universal `/intake/` endpoint used by Agent v5.

func (*DefaultForwarder) SubmitV1Series

func (f *DefaultForwarder) SubmitV1Series(payload Payloads, extra http.Header) error

SubmitV1Series will send time series to the v1 endpoint (this will be removed once the backend handles v2 endpoints).

type Forwarder

type Forwarder interface {
	Start() error
	Stop()
	SubmitV1Series(payload Payloads, extra http.Header) error
	SubmitV1Intake(payload Payloads, extra http.Header) error
	SubmitV1CheckRuns(payload Payloads, extra http.Header) error
	SubmitSeries(payload Payloads, extra http.Header) error
	SubmitEvents(payload Payloads, extra http.Header) error
	SubmitServiceChecks(payload Payloads, extra http.Header) error
	SubmitSketchSeries(payload Payloads, extra http.Header) error
	SubmitHostMetadata(payload Payloads, extra http.Header) error
	SubmitMetadata(payload Payloads, extra http.Header) error
}

Forwarder is the basic interface implemented by forwarders; it is useful for testing.
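Because callers can depend on an interface rather than on DefaultForwarder, a test can substitute a hand-written double. The sketch below is self-contained, so it redeclares `Payloads` locally and uses a trimmed-down, hypothetical interface (`seriesForwarder`) with a single method; `recordingForwarder` and `flush` are likewise illustrative names.

```go
package main

import (
	"fmt"
	"net/http"
)

// Payloads mirrors the package's type of the same name.
type Payloads []*[]byte

// seriesForwarder is a hypothetical, trimmed-down version of the Forwarder
// interface, limited to the one method the code under test needs.
type seriesForwarder interface {
	SubmitSeries(payload Payloads, extra http.Header) error
}

// recordingForwarder is a hypothetical test double that stores what it receives.
type recordingForwarder struct {
	submitted []string
}

// SubmitSeries records every non-nil payload instead of sending it anywhere.
func (r *recordingForwarder) SubmitSeries(payload Payloads, extra http.Header) error {
	for _, p := range payload {
		if p != nil {
			r.submitted = append(r.submitted, string(*p))
		}
	}
	return nil
}

// flush is example code under test: it depends on the interface only.
func flush(f seriesForwarder, data []byte) error {
	return f.SubmitSeries(Payloads{&data}, http.Header{})
}

func main() {
	rec := &recordingForwarder{}
	if err := flush(rec, []byte("cpu:1")); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println(rec.submitted) // [cpu:1]
}
```

A double for the full Forwarder interface would follow the same pattern with one method per Submit call.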

type HTTPTransaction

type HTTPTransaction struct {
	// Domain represents the domain targeted by the HTTPTransaction.
	Domain string
	// Endpoint is the API Endpoint used by the HTTPTransaction.
	Endpoint string
	// Headers are the HTTP headers used by the HTTPTransaction.
	Headers http.Header
	// Payload is the content delivered to the backend.
	Payload *[]byte
	// ErrorCount is the number of times this HTTPTransaction failed to be processed.
	ErrorCount int
	// contains filtered or unexported fields
}

HTTPTransaction represents one Payload for one Endpoint on one Domain.

func NewHTTPTransaction

func NewHTTPTransaction() *HTTPTransaction

NewHTTPTransaction returns a new HTTPTransaction.

func (*HTTPTransaction) GetCreatedAt

func (t *HTTPTransaction) GetCreatedAt() time.Time

GetCreatedAt returns the creation time of the HTTPTransaction.

func (*HTTPTransaction) GetNextFlush

func (t *HTTPTransaction) GetNextFlush() time.Time

GetNextFlush returns the next time at which this HTTPTransaction expects to be processed.

func (*HTTPTransaction) GetTarget

func (t *HTTPTransaction) GetTarget() string

GetTarget returns the URL used by the transaction.

func (*HTTPTransaction) Process

func (t *HTTPTransaction) Process(ctx context.Context, client *http.Client) error

Process sends the Payload of the transaction to the right Endpoint and Domain.

func (*HTTPTransaction) Reschedule

func (t *HTTPTransaction) Reschedule()

Reschedule updates the nextFlush time according to ErrorCount. The gap between retries grows as ErrorCount increases.
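The documentation does not state the exact formula. As an illustration only, the sketch below assumes an exponential backoff with a cap; `baseInterval`, `maxInterval`, `retryDelay` and `nextFlush` are hypothetical names and values, not taken from the package.

```go
package main

import (
	"fmt"
	"time"
)

const (
	baseInterval = 1 * time.Second  // assumed delay after the first error
	maxInterval  = 64 * time.Second // assumed upper bound on the delay
)

// retryDelay returns a delay that doubles with each error, up to maxInterval.
func retryDelay(errorCount int) time.Duration {
	if errorCount <= 0 {
		return 0
	}
	d := baseInterval
	for i := 1; i < errorCount; i++ {
		d *= 2
		if d >= maxInterval {
			return maxInterval
		}
	}
	return d
}

// nextFlush computes the next processing time from the current time and the
// number of failures so far.
func nextFlush(now time.Time, errorCount int) time.Time {
	return now.Add(retryDelay(errorCount))
}

func main() {
	for _, n := range []int{1, 2, 3, 10} {
		fmt.Println(n, retryDelay(n))
	}
	fmt.Println(nextFlush(time.Unix(0, 0), 3).Unix()) // 4
}
```

Capping the delay keeps a long-failing transaction from being postponed indefinitely.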

type MockedForwarder

type MockedForwarder struct {
	mock.Mock
}

MockedForwarder is a mocked forwarder to be used in other modules to test their dependencies on the forwarder.

func (*MockedForwarder) Start

func (tf *MockedForwarder) Start() error

Start updates the internal mock struct

func (*MockedForwarder) Stop

func (tf *MockedForwarder) Stop()

Stop updates the internal mock struct

func (*MockedForwarder) SubmitEvents

func (tf *MockedForwarder) SubmitEvents(payload Payloads, extra http.Header) error

SubmitEvents updates the internal mock struct

func (*MockedForwarder) SubmitHostMetadata

func (tf *MockedForwarder) SubmitHostMetadata(payload Payloads, extra http.Header) error

SubmitHostMetadata updates the internal mock struct

func (*MockedForwarder) SubmitMetadata

func (tf *MockedForwarder) SubmitMetadata(payload Payloads, extra http.Header) error

SubmitMetadata updates the internal mock struct

func (*MockedForwarder) SubmitSeries

func (tf *MockedForwarder) SubmitSeries(payload Payloads, extra http.Header) error

SubmitSeries updates the internal mock struct

func (*MockedForwarder) SubmitServiceChecks

func (tf *MockedForwarder) SubmitServiceChecks(payload Payloads, extra http.Header) error

SubmitServiceChecks updates the internal mock struct

func (*MockedForwarder) SubmitSketchSeries

func (tf *MockedForwarder) SubmitSketchSeries(payload Payloads, extra http.Header) error

SubmitSketchSeries updates the internal mock struct

func (*MockedForwarder) SubmitV1CheckRuns

func (tf *MockedForwarder) SubmitV1CheckRuns(payload Payloads, extra http.Header) error

SubmitV1CheckRuns updates the internal mock struct

func (*MockedForwarder) SubmitV1Intake

func (tf *MockedForwarder) SubmitV1Intake(payload Payloads, extra http.Header) error

SubmitV1Intake updates the internal mock struct

func (*MockedForwarder) SubmitV1Series

func (tf *MockedForwarder) SubmitV1Series(payload Payloads, extra http.Header) error

SubmitV1Series updates the internal mock struct

type Payloads

type Payloads []*[]byte

Payloads is a slice of pointers to byte slices; it names the collections of payloads passed into the forwarder.
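Building a `Payloads` value from serialized chunks is mostly a matter of taking addresses. The sketch below redeclares the type locally so that it compiles on its own; `toPayloads` and `totalSize` are hypothetical helpers, not part of the package.

```go
package main

import "fmt"

// Payloads mirrors the package's type of the same name.
type Payloads []*[]byte

// toPayloads wraps each chunk in a pointer without copying the bytes.
func toPayloads(chunks ...[]byte) Payloads {
	out := make(Payloads, 0, len(chunks))
	for i := range chunks {
		out = append(out, &chunks[i])
	}
	return out
}

// totalSize sums the length of every non-nil payload.
func totalSize(p Payloads) int {
	n := 0
	for _, b := range p {
		if b != nil {
			n += len(*b)
		}
	}
	return n
}

func main() {
	p := toPayloads([]byte("abc"), []byte("de"))
	fmt.Println(len(p), totalSize(p)) // 2 5
}
```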

type Transaction

type Transaction interface {
	Process(ctx context.Context, client *http.Client) error
	Reschedule()
	GetNextFlush() time.Time
	GetCreatedAt() time.Time
	GetTarget() string
}

Transaction represents the task to process for a Worker.
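The package overview says the newest transactions are retried first, and `GetCreatedAt` provides what is needed to order them. The sketch below shows one way such an ordering could be done; `createdAter`, `fakeTx`, `newFakeTx` and `sortNewestFirst` are hypothetical names, and the package's actual retry queue may be organized differently.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// createdAter is the subset of the Transaction interface needed for ordering.
type createdAter interface {
	GetCreatedAt() time.Time
}

// fakeTx is a hypothetical transaction that only knows its creation time.
type fakeTx struct {
	name      string
	createdAt time.Time
}

// GetCreatedAt returns the creation time of the fake transaction.
func (t fakeTx) GetCreatedAt() time.Time { return t.createdAt }

// newFakeTx builds a fakeTx created at the given Unix time in seconds.
func newFakeTx(name string, unixSec int64) fakeTx {
	return fakeTx{name: name, createdAt: time.Unix(unixSec, 0)}
}

// sortNewestFirst orders transactions so the most recently created comes first.
func sortNewestFirst(txs []createdAter) {
	sort.SliceStable(txs, func(i, j int) bool {
		return txs[i].GetCreatedAt().After(txs[j].GetCreatedAt())
	})
}

func main() {
	txs := []createdAter{
		newFakeTx("old", 100),
		newFakeTx("new", 300),
		newFakeTx("mid", 200),
	}
	sortNewestFirst(txs)
	for _, t := range txs {
		fmt.Println(t.(fakeTx).name) // new, mid, old
	}
}
```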

type Worker

type Worker struct {
	// Client is the HTTP client used to process transactions.
	Client *http.Client
	// HighPrio is the channel used to receive high priority transactions from the Forwarder.
	HighPrio <-chan Transaction
	// LowPrio is the channel used to receive low priority transactions from the Forwarder.
	LowPrio <-chan Transaction
	// RequeueChan is the channel used to send failed transactions back to the Forwarder.
	RequeueChan chan<- Transaction
	// contains filtered or unexported fields
}

Worker consumes Transactions from the Forwarder and processes them. If a transaction fails to be processed, the Worker sends it back to the Forwarder to be retried later.
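A common Go idiom for draining a high priority channel before a low priority one is a nested `select`. The sketch below illustrates that idiom together with requeueing on failure. It assumes nothing about the Worker's real loop: `task`, `next` and `drain` are hypothetical, and the demo is non-blocking (it returns when both channels are empty), whereas a real worker would wait for new work.

```go
package main

import (
	"errors"
	"fmt"
)

// task is a hypothetical stand-in for calling Process on a Transaction.
type task func() error

// errFailed is returned by tasks that simulate a failed delivery.
var errFailed = errors.New("delivery failed")

// next returns the next task, preferring high over low. It reports false
// when both channels are empty.
func next(high, low <-chan task) (task, bool) {
	// First look only at the high priority channel.
	select {
	case t := <-high:
		return t, true
	default:
	}
	// Nothing urgent was ready: accept work from either channel.
	select {
	case t := <-high:
		return t, true
	case t := <-low:
		return t, true
	default:
		return nil, false
	}
}

// drain runs tasks until both channels are empty and requeues the failures.
func drain(high, low <-chan task, requeue chan<- task) (processed, failed int) {
	for {
		t, ok := next(high, low)
		if !ok {
			return
		}
		if err := t(); err != nil {
			requeue <- t // hand the failed task back to be retried later
			failed++
			continue
		}
		processed++
	}
}

func main() {
	high := make(chan task, 1)
	low := make(chan task, 1)
	requeue := make(chan task, 2)

	low <- func() error { fmt.Println("low priority"); return errFailed }
	high <- func() error { fmt.Println("high priority"); return nil }

	processed, failed := drain(high, low, requeue)
	fmt.Println(processed, failed, len(requeue))
}
```

The requeue channel is buffered here so that the single goroutine in the demo never blocks while handing a failure back.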

func NewWorker

func NewWorker(highPrioChan <-chan Transaction, lowPrioChan <-chan Transaction, requeueChan chan<- Transaction, blocked *blockedEndpoints) *Worker

NewWorker returns a new Worker that consumes Transactions from highPrioChan and lowPrioChan and pushes failed ones back into requeueChan.

func (*Worker) Start

func (w *Worker) Start()

Start starts a Worker.

func (*Worker) Stop

func (w *Worker) Stop()

Stop stops the worker.
