Documentation
¶
Index ¶
- Variables
- func ApplyAttributionToLogRecord(er *LogRecord, ctx context.Context)
- func ContextWithAttribution(ctx context.Context, attr *Attribution) context.Context
- func IsValidOrderByField(field RequestOrderByField) bool
- func IsValidResponseSource(s ResponseSource) bool
- func SetLogRecordFieldsFromRequestInfo(er *LogRecord, ri httpf.RequestInfo)
- type Attribution
- type Encryptor
- type FullLog
- type FullLogRequest
- type FullLogResponse
- type FullStore
- type ListFilters
- func (l *ListFilters) AddError(e error)
- func (l *ListFilters) SetConnectionId(u apid.ID)
- func (l *ListFilters) SetConnectorId(u apid.ID)
- func (l *ListFilters) SetConnectorType(t string)
- func (l *ListFilters) SetConnectorVersion(v uint64)
- func (l *ListFilters) SetCorrelationId(correlationId string)
- func (l *ListFilters) SetLabelSelector(selector string)
- func (l *ListFilters) SetLimit(limit int32)
- func (l *ListFilters) SetMethod(method string)
- func (l *ListFilters) SetNamespaceMatcher(matcher string) error
- func (l *ListFilters) SetNamespaceMatchers(matchers []string) error
- func (l *ListFilters) SetOrderBy(field RequestOrderByField, by pagination.OrderBy)
- func (l *ListFilters) SetPath(path string)
- func (l *ListFilters) SetPathRegex(r string) error
- func (l *ListFilters) SetRateLimitId(id apid.ID)
- func (l *ListFilters) SetRequestType(requestType httpf.RequestType)
- func (l *ListFilters) SetResponseSource(s ResponseSource)
- func (l *ListFilters) SetStatusCode(s int)
- func (l *ListFilters) SetStatusCodeRangeInclusive(start, end int)
- func (l *ListFilters) SetTimestampRange(start, end time.Time)
- func (l *ListFilters) Validate() error
- type ListRequestBuilder
- type ListRequestExecutor
- type LogRecord
- type LogRetriever
- type MillisecondDuration
- type RateLimitMatch
- type RecordRetriever
- func NewClickhouseRecordRetriever(cfg *config.Database, cursorEncryptor pagination.CursorEncryptor, ...) RecordRetriever
- func NewRecordRetriever(cfg *config.Database, cursorEncryptor pagination.CursorEncryptor, ...) RecordRetriever
- func NewSqlRecordRetriever(cfg *config.Database, cursorEncryptor pagination.CursorEncryptor, ...) RecordRetriever
- type RecordStore
- func NewBufferedStore(inner RecordStore, flushBatchSize int, flushInterval time.Duration) RecordStore
- func NewClickhouseRecordStore(cfg *config.Database, logger *slog.Logger, dbOpts ...database.Option) RecordStore
- func NewRecordStore(cfg *config.Database, logger *slog.Logger, dbOpts ...database.Option) RecordStore
- func NewSqlRecordStore(cfg *config.Database, logger *slog.Logger, opts ...database.Option) RecordStore
- type RequestOrderByField
- type ResponseSource
- type RoundTripper
- type StorageService
- func (ss *StorageService) GetFullLog(ctx context.Context, id apid.ID) (*FullLog, error)
- func (ss *StorageService) GetRecord(ctx context.Context, id apid.ID) (*LogRecord, error)
- func (ss *StorageService) ListRequestsFromCursor(ctx context.Context, cursor string) (ListRequestExecutor, error)
- func (ss *StorageService) Migrate(ctx context.Context) error
- func (ss *StorageService) NewListRequestsBuilder() ListRequestBuilder
- func (ss *StorageService) NewRoundTripper(ri httpf.RequestInfo, transport http.RoundTripper) http.RoundTripper
- func (ss *StorageService) Ping(ctx context.Context) bool
Constants ¶
This section is empty.
Variables ¶
var ErrNotFound = errors.New("record not found")
ErrNotFound is returned when a log record is not found that was requested. This isn't necessarily a bad request as the record may have expired due to TTL.
Functions ¶
func ApplyAttributionToLogRecord ¶
ApplyAttributionToLogRecord stamps the LogRecord with whatever the proxy stack recorded on the request context — the response source plus, when a rate-limit resource matched, the rule id / mode / bucket / full match set. When no attribution was installed (older code paths or non-proxy traffic), defaults the source to ResponseSourceUpstream so the column is always populated.
Designed as a separate call from SetLogRecordFieldsFromRequestInfo so the existing signature isn't disturbed; the request-log round-tripper invokes both in sequence.
func ContextWithAttribution ¶
func ContextWithAttribution(ctx context.Context, attr *Attribution) context.Context
ContextWithAttribution returns a context that carries attr. The proxy bootstrap installs an empty Attribution on the request context before the round-tripper chain runs; middlewares mutate the struct in place.
func IsValidOrderByField ¶
func IsValidOrderByField(field RequestOrderByField) bool
func IsValidResponseSource ¶
func IsValidResponseSource(s ResponseSource) bool
IsValidResponseSource reports whether s is a recognised ResponseSource. Unknown values returned from old DB rows or from forward-compatible readers are not coerced — callers can choose to treat them as upstream or to log a warning.
func SetLogRecordFieldsFromRequestInfo ¶
func SetLogRecordFieldsFromRequestInfo(er *LogRecord, ri httpf.RequestInfo)
Types ¶
type Attribution ¶
type Attribution struct {
Source ResponseSource
RateLimitId apid.ID
RateLimitMode string
RateLimitBucket map[string]string
RateLimitMatched []RateLimitMatch
}
Attribution carries per-request log attribution that the proxy stack populates as the request flows through it. The rate-limit round-tripper writes Source when it short-circuits with a synthetic 429; the (future) rate-limit resource enforcement writes the RateLimit* fields when a rule fires. The request-log round-tripper reads the value on its way back to populate LogRecord.
Use a pointer in the context so middlewares writing to the same Attribution see each other's edits — the pointer doesn't change, the struct fields do.
func AttributionFromContext ¶
func AttributionFromContext(ctx context.Context) *Attribution
AttributionFromContext returns the Attribution carried by ctx, or nil when no proxy middleware has installed one. Returning nil rather than a zero-value struct lets callers distinguish "nothing was stamped" from "stamped but blank" — the LogRecord-population path treats nil as ResponseSourceUpstream.
type Encryptor ¶
type Encryptor interface {
EncryptForNamespace(ctx context.Context, namespacePath string, data []byte) (encfield.EncryptedField, error)
Decrypt(ctx context.Context, ef encfield.EncryptedField) ([]byte, error)
}
Encryptor provides namespace-scoped encryption/decryption for blob storage. Satisfied by encrypt.E via duck typing.
type FullLog ¶
type FullLog struct {
Id apid.ID `json:"id"`
Namespace string `json:"ns"`
CorrelationID string `json:"cid"`
Timestamp time.Time `json:"ts"`
MillisecondDuration MillisecondDuration `json:"dur"`
Full bool `json:"full,omitempty"`
InternalTimeout bool `json:"to,omitempty"`
RequestCancelled bool `json:"rc,omitempty"`
Request FullLogRequest `json:"req"`
Response FullLogResponse `json:"res"`
}
func NewFullLogFromRecord ¶
func (*FullLog) GetNamespace ¶
type FullLogRequest ¶
type FullLogResponse ¶
type FullStore ¶
type ListFilters ¶
type ListFilters struct {
LimitVal int32 `json:"limit"`
Offset int32 `json:"offset"`
OrderByFieldVal *RequestOrderByField `json:"order_by_field"`
OrderByVal *pagination.OrderBy `json:"order_by"`
RequestType *string `json:"request_type,omitempty"`
CorrelationId *string `json:"correlation_id,omitempty"`
ConnectionId *apid.ID `json:"connection_id,omitempty"`
ConnectorType *string `json:"connector_type,omitempty"`
ConnectorId *apid.ID `json:"connector_id,omitempty"`
ConnectorVersion *uint64 `json:"connector_version,omitempty"`
Method *string `json:"method,omitempty"`
StatusCodeRangeInclusive []int `json:"status_code_range,omitempty"`
TimestampRange []time.Time `json:"timestamp_range,omitempty"`
Path *string `json:"path,omitempty"`
PathRegex *string `json:"path_regex,omitempty"`
NamespaceMatchers []string `json:"namespace_matchers,omitempty"`
LabelSelector *string `json:"label_selector,omitempty"`
ResponseSource *string `json:"response_source,omitempty"`
RateLimitId *apid.ID `json:"rate_limit_id,omitempty"`
Errors *multierror.Error `json:"-"`
}
ListFilters holds the filter, pagination, and ordering data for list requests. This struct is provider-agnostic and is embedded by provider-specific list builders.
func (*ListFilters) AddError ¶
func (l *ListFilters) AddError(e error)
func (*ListFilters) SetConnectionId ¶
func (l *ListFilters) SetConnectionId(u apid.ID)
func (*ListFilters) SetConnectorId ¶
func (l *ListFilters) SetConnectorId(u apid.ID)
func (*ListFilters) SetConnectorType ¶
func (l *ListFilters) SetConnectorType(t string)
func (*ListFilters) SetConnectorVersion ¶
func (l *ListFilters) SetConnectorVersion(v uint64)
func (*ListFilters) SetCorrelationId ¶
func (l *ListFilters) SetCorrelationId(correlationId string)
func (*ListFilters) SetLabelSelector ¶
func (l *ListFilters) SetLabelSelector(selector string)
func (*ListFilters) SetLimit ¶
func (l *ListFilters) SetLimit(limit int32)
func (*ListFilters) SetMethod ¶
func (l *ListFilters) SetMethod(method string)
func (*ListFilters) SetNamespaceMatcher ¶
func (l *ListFilters) SetNamespaceMatcher(matcher string) error
func (*ListFilters) SetNamespaceMatchers ¶
func (l *ListFilters) SetNamespaceMatchers(matchers []string) error
func (*ListFilters) SetOrderBy ¶
func (l *ListFilters) SetOrderBy(field RequestOrderByField, by pagination.OrderBy)
func (*ListFilters) SetPath ¶
func (l *ListFilters) SetPath(path string)
func (*ListFilters) SetPathRegex ¶
func (l *ListFilters) SetPathRegex(r string) error
func (*ListFilters) SetRateLimitId ¶
func (l *ListFilters) SetRateLimitId(id apid.ID)
func (*ListFilters) SetRequestType ¶
func (l *ListFilters) SetRequestType(requestType httpf.RequestType)
func (*ListFilters) SetResponseSource ¶
func (l *ListFilters) SetResponseSource(s ResponseSource)
func (*ListFilters) SetStatusCode ¶
func (l *ListFilters) SetStatusCode(s int)
func (*ListFilters) SetStatusCodeRangeInclusive ¶
func (l *ListFilters) SetStatusCodeRangeInclusive(start, end int)
func (*ListFilters) SetTimestampRange ¶
func (l *ListFilters) SetTimestampRange(start, end time.Time)
func (*ListFilters) Validate ¶
func (l *ListFilters) Validate() error
type ListRequestBuilder ¶
type ListRequestBuilder interface {
ListRequestExecutor
Limit(int32) ListRequestBuilder
OrderBy(RequestOrderByField, pagination.OrderBy) ListRequestBuilder
WithNamespaceMatcher(matcher string) ListRequestBuilder
WithNamespaceMatchers(matchers []string) ListRequestBuilder
WithRequestType(requestType httpf.RequestType) ListRequestBuilder
WithCorrelationId(correlationId string) ListRequestBuilder
WithConnectionId(u apid.ID) ListRequestBuilder
WithConnectorType(t string) ListRequestBuilder
WithConnectorId(u apid.ID) ListRequestBuilder
WithConnectorVersion(v uint64) ListRequestBuilder
WithMethod(method string) ListRequestBuilder
WithStatusCode(s int) ListRequestBuilder
WithStatusCodeRangeInclusive(start, end int) ListRequestBuilder
WithParsedStatusCodeRange(r string) (ListRequestBuilder, error)
WithPath(path string) ListRequestBuilder
WithPathRegex(r string) (ListRequestBuilder, error)
WithTimestampRange(start, end time.Time) ListRequestBuilder
WithParsedTimestampRange(r string) (ListRequestBuilder, error)
WithLabelSelector(selector string) (ListRequestBuilder, error)
WithResponseSource(s ResponseSource) ListRequestBuilder
WithRateLimitId(id apid.ID) ListRequestBuilder
}
type ListRequestExecutor ¶
type ListRequestExecutor interface {
FetchPage(ctx context.Context) pagination.PageResult[*LogRecord]
Enumerate(ctx context.Context, callback pagination.EnumerateCallback[*LogRecord]) error
}
type LogRecord ¶
type LogRecord struct {
Namespace string `json:"namespace"`
Type httpf.RequestType `json:"type"`
RequestId apid.ID `json:"request_id"`
CorrelationId string `json:"correlation_id,omitempty"`
Timestamp time.Time `json:"timestamp"`
MillisecondDuration MillisecondDuration `json:"duration"`
ConnectionId apid.ID `json:"connection_id,omitempty"`
ConnectorId apid.ID `json:"connector_id,omitempty"`
ConnectorVersion uint64 `json:"connector_version,omitempty"`
Method string `json:"method"`
Host string `json:"host"`
Scheme string `json:"scheme"`
Path string `json:"path"`
RequestHttpVersion string `json:"request_http_version,omitempty"`
RequestSizeBytes int64 `json:"request_size_bytes,omitempty"`
RequestMimeType string `json:"request_mime_type,omitempty"`
ResponseStatusCode int `json:"response_status_code,omitempty"`
ResponseError string `json:"response_error,omitempty"`
ResponseHttpVersion string `json:"response_http_version,omitempty"`
ResponseSizeBytes int64 `json:"response_size_bytes,omitempty"`
ResponseMimeType string `json:"response_mime_type,omitempty"`
InternalTimeout bool `json:"internal_timeout,omitempty"`
RequestCancelled bool `json:"request_cancelled,omitempty"`
FullRequestRecorded bool `json:"full_request_recorded,omitempty"`
Labels database.Labels `json:"labels,omitempty"`
// ResponseSource identifies who produced the response. Defaults to
// ResponseSourceUpstream so historical entries — and any non-429
// response — keep the obvious meaning. See attribution.go.
ResponseSource ResponseSource `json:"response_source,omitempty"`
// RateLimitId, RateLimitMode, RateLimitBucket are populated when a
// proxy-side RateLimit resource matched the request, regardless of
// whether it was the firing rule or just a logged observation. The
// connector-level reactive limiter does not populate these (it has
// no rule id).
RateLimitId apid.ID `json:"rate_limit_id,omitempty"`
RateLimitMode string `json:"rate_limit_mode,omitempty"`
RateLimitBucket map[string]string `json:"rate_limit_bucket,omitempty"`
// RateLimitMatched is the full set of rate-limit rules that matched
// this request — the firing rule plus any observe-mode matches. Lets
// operators see *every* rule that contributed to the decision, not
// just the one that ultimately rejected the request.
RateLimitMatched []RateLimitMatch `json:"rate_limit_matched,omitempty"`
}
LogRecord represents a record of an HTTP request as is stored in the request log. This data is redacted to avoid containing sensitive information like information in headers. For a given record, the full request may be stored as well, which would correspond to the data in the Entry struct.
JSON tagging on this struct is used so the same data structure can be passed directly to endpoint responses. It is not use for internal storage.
func (*LogRecord) GetNamespace ¶
type LogRetriever ¶
type LogRetriever interface {
GetFullLog(ctx context.Context, id apid.ID) (*FullLog, error)
NewListRequestsBuilder() ListRequestBuilder
ListRequestsFromCursor(ctx context.Context, cursor string) (ListRequestExecutor, error)
}
LogRetriever is an interface for retrieving logs. Used by the API to retrieve logs.
type MillisecondDuration ¶
MillisecondDuration is a wrapper around time.Duration for JSON marshaling as milliseconds
func (MillisecondDuration) Duration ¶
func (d MillisecondDuration) Duration() time.Duration
func (MillisecondDuration) MarshalJSON ¶
func (d MillisecondDuration) MarshalJSON() ([]byte, error)
MarshalJSON implements the json.Marshaler interface
func (MillisecondDuration) String ¶
func (d MillisecondDuration) String() string
func (*MillisecondDuration) UnmarshalJSON ¶
func (d *MillisecondDuration) UnmarshalJSON(b []byte) error
UnmarshalJSON implements the json.Unmarshaler interface
type RateLimitMatch ¶
type RateLimitMatch struct {
Id apid.ID `json:"id"`
Mode string `json:"mode"`
Bucket map[string]string `json:"bucket,omitempty"`
}
RateLimitMatch describes a single rate-limit rule that matched the request. The full set of matches is captured in LogRecord.RateLimitMatched so observers can see every rule that fired (in either enforce or observe mode), not just the most-restrictive one that ultimately decided the request's fate.
type RecordRetriever ¶
type RecordRetriever interface {
// GetRecord retrieves a single LogRecord by its request ID.
GetRecord(ctx context.Context, id apid.ID) (*LogRecord, error)
// NewListRequestsBuilder creates a new builder for listing entry records with filters.
NewListRequestsBuilder() ListRequestBuilder
// ListRequestsFromCursor resumes a paginated listing from a cursor string.
ListRequestsFromCursor(ctx context.Context, cursor string) (ListRequestExecutor, error)
}
RecordRetriever handles querying LogRecord metadata from a storage backend.
func NewClickhouseRecordRetriever ¶
func NewClickhouseRecordRetriever(cfg *config.Database, cursorEncryptor pagination.CursorEncryptor, logger *slog.Logger, dbOpts ...database.Option) RecordRetriever
func NewRecordRetriever ¶
func NewRecordRetriever(cfg *config.Database, cursorEncryptor pagination.CursorEncryptor, logger *slog.Logger, dbOpts ...database.Option) RecordRetriever
NewRecordRetriever creates an RecordRetriever based on the HttpLogging configuration.
func NewSqlRecordRetriever ¶
func NewSqlRecordRetriever(cfg *config.Database, cursorEncryptor pagination.CursorEncryptor, logger *slog.Logger, opts ...database.Option) RecordRetriever
type RecordStore ¶
type RecordStore interface {
// StoreRecord persists a LogRecord to the storage backend.
StoreRecord(ctx context.Context, record *LogRecord) error
// StoreRecords persists multiple LogRecord to the storage backend.
StoreRecords(ctx context.Context, records []*LogRecord) error
}
RecordStore handles persisting LogRecord metadata to a storage backend.
func NewBufferedStore ¶
func NewBufferedStore(inner RecordStore, flushBatchSize int, flushInterval time.Duration) RecordStore
func NewRecordStore ¶
func NewRecordStore(cfg *config.Database, logger *slog.Logger, dbOpts ...database.Option) RecordStore
NewRecordStore creates an RecordStore based on the HttpLogging configuration. dbOpts are forwarded to the underlying instrumented DB constructor — callers pass database.WithTelemetry(...) to enable spans + metrics on the request-log database tier.
func NewSqlRecordStore ¶
type RequestOrderByField ¶
type RequestOrderByField string
const ( RequestOrderByTimestamp RequestOrderByField = "timestamp" RequestOrderByType RequestOrderByField = "type" RequestOrderByCorrelationId RequestOrderByField = "correlation_id" RequestOrderByConnectionId RequestOrderByField = "connection_id" RequestOrderByConnectorType RequestOrderByField = "connector_type" RequestOrderByConnectorId RequestOrderByField = "connector_id" RequestOrderByMethod RequestOrderByField = "method" RequestOrderByPath RequestOrderByField = "path" RequestOrderByResponseStatusCode RequestOrderByField = "response_status_code" RequestOrderByConnectorVersion RequestOrderByField = "connector_version" RequestOrderByNamespace RequestOrderByField = "namespace" )
type ResponseSource ¶
type ResponseSource string
ResponseSource identifies *who* produced the response that a LogRecord captures. Until this PR every 429 in the log looked the same — a 3rd party rate-limiting us was indistinguishable from the connector-level reactive backoff returning a synthetic 429. Stamping this field at the point of synthesis closes that gap and reserves a value for the upcoming proxy-side rate-limit resource enforcement (#223).
const ( // ResponseSourceUpstream means the response (including any 429) came // from the 3rd-party service — the default and historically the only // possibility. ResponseSourceUpstream ResponseSource = "upstream" // ResponseSourceConnectorRateLimiter means the connector-level reactive // limiter short-circuited the request because the connection is in // cool-down from a prior real upstream 429. No upstream call was made // for this attempt. ResponseSourceConnectorRateLimiter ResponseSource = "connector_rate_limiter" // ResponseSourceRateLimit is reserved for the proxy-side rate-limit // resource enforcement (#223). Defining it here keeps the schema // stable across the two PRs; population happens in #223. ResponseSourceRateLimit ResponseSource = "rate_limit" )
type RoundTripper ¶
type RoundTripper struct {
// contains filtered or unexported fields
}
type StorageService ¶
type StorageService struct {
// contains filtered or unexported fields
}
func NewStorageService ¶
func NewStorageService( ctx context.Context, cfg *config.HttpLogging, cursorEncryptor pagination.CursorEncryptor, encryptor Encryptor, logger *slog.Logger, dbOpts ...database.Option, ) (*StorageService, error)
NewStorageService that will store the log records and the full request/response. dbOpts are forwarded to the underlying DB constructors — pass database.WithTelemetry(...) to instrument the request-log database tier.
func (*StorageService) GetFullLog ¶
func (*StorageService) ListRequestsFromCursor ¶
func (ss *StorageService) ListRequestsFromCursor(ctx context.Context, cursor string) (ListRequestExecutor, error)
ListRequestsFromCursor resumes a paginated listing from a cursor string.
func (*StorageService) Migrate ¶
func (ss *StorageService) Migrate(ctx context.Context) error
Migrate runs any necessary schema migrations for the storage backend.
func (*StorageService) NewListRequestsBuilder ¶
func (ss *StorageService) NewListRequestsBuilder() ListRequestBuilder
NewListRequestsBuilder creates a new builder for listing entry records with filters.
func (*StorageService) NewRoundTripper ¶
func (ss *StorageService) NewRoundTripper(ri httpf.RequestInfo, transport http.RoundTripper) http.RoundTripper