accumulator

package
v1.5.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrMetadataUnavailable is returned when a lambda data is added to
	// the batch without metadata being set.
	ErrMetadataUnavailable = errors.New("metadata is not yet available")
	// ErrBatchFull signfies that the batch has reached full capacity
	// and cannot accept more entries.
	ErrBatchFull = errors.New("batch is full")
	// ErrInvalidEncoding is returned for any APMData that is encoded
	// with any encoding format
	ErrInvalidEncoding = errors.New("encoded data not supported")
)

Functions

func GetUncompressedBytes

func GetUncompressedBytes(rawBytes []byte, encodingType string) ([]byte, error)

func ProcessMetadata

func ProcessMetadata(data APMData) ([]byte, error)

ProcessMetadata return a byte array containing the Metadata marshaled in JSON In case we want to update the Metadata values, usage of https://github.com/tidwall/sjson is advised

Types

type APMData

type APMData struct {
	Data            []byte
	ContentEncoding string
	AgentInfo       string
}

APMData represents data to be sent to APMServer. `Agent` type data will have `metadata` as ndjson whereas `lambda` type data will be without metadata.

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch manages the data that needs to be shipped to APM Server. It holds all the invocations that have not yet been shipped to the APM Server and is responsible for correlating the invocation with the APM data collected from all sources (logs API & APM Agents). As the batch gets the required data it marks the data ready for shipping to APM Server.

func NewBatch

func NewBatch(maxSize int, maxAge time.Duration) *Batch

NewBatch creates a new BatchData which can accept a maximum number of entries as specified by the arguments.

func (*Batch) AddAgentData

func (b *Batch) AddAgentData(apmData APMData) error

AddAgentData adds a data received from agent. For a specific invocation agent data is always received in the same invocation. All the events extracted from the payload are added to the batch even though the batch might exceed the max size limit, however, if the batch is already full before adding any events then ErrBatchFull is returned.

func (*Batch) AddLambdaData

func (b *Batch) AddLambdaData(d []byte) error

AddLambdaData adds a new entry to the batch. Returns ErrBatchFull if batch has reached its maximum size.

func (*Batch) Count

func (b *Batch) Count() int

Count return the number of APMData entries in batch.

func (*Batch) OnAgentInit

func (b *Batch) OnAgentInit(reqID, contentEncoding string, raw []byte) error

OnAgentInit caches the transaction ID and the payload for the currently executing invocation as reported by the agent. The payload can contain metadata along with partial transaction. Metadata, if available, will be cached for all future invocation. The agent payload will be used to create a new transaction in an event the actual transaction is not reported by the agent due to unexpected termination.

func (*Batch) OnLambdaLogRuntimeDone

func (b *Batch) OnLambdaLogRuntimeDone(reqID, status string, time time.Time) error

OnLambdaLogRuntimeDone prepares the data for the invocation to be shipped to APM Server. It accepts requestID and status of the invocation both of which can be retrieved after parsing `platform.runtimeDone` event.

func (*Batch) OnPlatformReport

func (b *Batch) OnPlatformReport(reqID string) (string, int64, time.Time, error)

OnPlatformReport should be the last event for a request ID. On receiving the platform.report event the batch will cleanup any datastructure for the request ID. It will return some of the function metadata to allow the caller to enrich the report metrics.

func (*Batch) OnShutdown

func (b *Batch) OnShutdown(status string) error

OnShutdown flushes the data for shipping to APM Server by finalizing all the invocation in the batch. If we haven't received a platform.runtimeDone event for an invocation so far we won't be able to recieve it in time thus the status needs to be guessed based on the available information.

func (*Batch) RegisterInvocation

func (b *Batch) RegisterInvocation(
	reqID, functionARN string,
	deadlineMs int64,
	timestamp time.Time,
)

RegisterInvocation registers a new function invocation against its request ID. It also updates the caches for currently executing request ID.

func (*Batch) Reset

func (b *Batch) Reset()

Reset resets the batch to prepare for new set of data

func (*Batch) ShouldShip

func (b *Batch) ShouldShip() bool

ShouldShip indicates when a batch is ready for sending. A batch is marked as ready for flush when one of the below conditions is reached: 1. max size is greater than threshold (90% of maxSize) 2. batch is older than maturity age

func (*Batch) Size

func (b *Batch) Size() int

Size returns the number of invocations cached in the batch.

func (*Batch) ToAPMData

func (b *Batch) ToAPMData() APMData

ToAPMData returns APMData with metadata and the accumulated batch

type Invocation

type Invocation struct {
	// RequestID is the id to identify the invocation.
	RequestID string
	// Timestamp is the time of the invocation.
	Timestamp time.Time
	// DeadlineMs is the function execution deadline.
	DeadlineMs int64
	// FunctionARN requested. Can be different in each invoke that
	// executes the same version.
	FunctionARN string
	// TransactionID is the ID generated for a transaction for the
	// current invocation. It is populated by the request from agent.
	TransactionID string
	// AgentPayload is the partial transaction registered at agent init.
	// It will be used to create a proxy transaction by enriching the
	// payload with data from `platform.runtimeDone` event if agent fails
	// to report the actual transaction.
	AgentPayload []byte
	// TransactionObserved is true if the root transaction ID for the
	// invocation is observed by the extension.
	TransactionObserved bool
	// Finalized tracks if the invocation has been finalized or not.
	Finalized bool
}

Invocation holds data for each function invocation and finalizes the data when `platform.report` type log is received for the specific invocation identified by request ID.

func (*Invocation) MaybeCreateProxyTxn added in v1.4.0

func (inc *Invocation) MaybeCreateProxyTxn(status string, time time.Time) ([]byte, error)

MaybeCreateProxyTxn creates a proxy transaction for an invocation if required. A proxy transaction will be required to be created if the agent has registered a transaction for the invocation but has not sent the corresponding transaction to the extension. The proxy transaction will not be created if the invocation has already been finalized or the agent has reported the transaction.

func (*Invocation) NeedProxyTransaction

func (inc *Invocation) NeedProxyTransaction() bool

NeedProxyTransaction returns true if a proxy transaction needs to be created based on the information available.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL