athena

package
v0.0.0-...-5c79d48 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 15, 2024 License: AGPL-3.0 Imports: 58 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewPublisher

func NewPublisher(cfg PublisherConfig) *publisher

NewPublisher returns a new instance of publisher.

Types

type AthenaContext

type AthenaContext struct {
	Database string

	TableName string

	S3ResultsLocation string
	// contains filtered or unexported fields
}

func SetupAthenaContext

func SetupAthenaContext(t *testing.T, ctx context.Context, cfg AthenaContextConfig) *AthenaContext

func (*AthenaContext) Close

func (ac *AthenaContext) Close(t *testing.T)

func (*AthenaContext) GetLog

func (a *AthenaContext) GetLog() *Log

type AthenaContextConfig

type AthenaContextConfig struct {
	MaxBatchSize int
}

AthenaContextConfig is an optional config that overrides defaults in the Athena context.

type Config

type Config struct {
	// Region is where Athena, SQS and SNS live (required).
	Region string

	// TopicARN where to emit events in SNS (required).
	TopicARN string
	// LargeEventsS3 is the location on S3 where large events (>256KB) are
	// stored temporarily before they are converted to Parquet and moved to
	// long-term storage (required).
	LargeEventsS3 string

	// Database is name of Glue Database that Athena will query against (required).
	Database string
	// TableName is name of Glue Table that Athena will query against (required).
	TableName string
	// LocationS3 is location on S3 where Parquet files partitioned by date are
	// stored (required).
	LocationS3 string

	// QueryResultsS3 is the location on S3 where Athena stores query results (optional).
	// The default results path can be defined in the workgroup settings.
	QueryResultsS3 string
	// Workgroup is the Athena workgroup where queries are executed (optional).
	Workgroup string
	// GetQueryResultsInterval defines how long a query waits before checking
	// the results status again if the previous status was not ready (optional).
	GetQueryResultsInterval time.Duration
	// DisableSearchCostOptimization is used to opt-out from search cost optimization
	// used for paginated queries (optional). Default is enabled.
	DisableSearchCostOptimization bool

	// LimiterRefillTime determines the duration of time between the addition of tokens to the bucket (optional).
	LimiterRefillTime time.Duration
	// LimiterRefillAmount is the number of tokens that are added to the bucket during interval
	// specified by LimiterRefillTime (optional).
	LimiterRefillAmount int
	// LimiterBurst defines the number of available tokens. The bucket is initially
	// full and is refilled based on LimiterRefillAmount and LimiterRefillTime (optional).
	LimiterBurst int

	// QueueURL is URL of SQS, which is set as subscriber to SNS topic (required).
	QueueURL string
	// BatchMaxItems defines how many items can be stored in a single Parquet
	// batch (optional).
	// It is a soft limit.
	BatchMaxItems int
	// BatchMaxInterval defines the interval at which Parquet files are created (optional).
	BatchMaxInterval time.Duration

	// Clock is a clock interface, used in tests.
	Clock clockwork.Clock
	// UIDGenerator is unique ID generator.
	UIDGenerator utils.UID
	// LogEntry is a log entry.
	LogEntry *log.Entry

	// PublisherConsumerAWSConfig is an AWS config which can be used to
	// construct AWS Clients using aws-sdk-go-v2, used by the publisher and
	// consumer components which publish/consume events from SQS (and S3 for
	// large events). These are always hosted on Teleport cloud infra even when
	// External Audit Storage is enabled, any events written here are only held
	// temporarily while they are queued to write to s3 parquet files in
	// batches.
	PublisherConsumerAWSConfig *aws.Config

	// StorerQuerierAWSConfig is an AWS config which can be used to construct AWS Clients
	// using aws-sdk-go-v2, used by the consumer (store phase) and the querier.
	// Often it is the same as PublisherConsumerAWSConfig unless External Audit
	// Storage is enabled, then this will authenticate and connect to
	// external/customer AWS account.
	StorerQuerierAWSConfig *aws.Config

	Backend backend.Backend

	// Tracer is used to create spans
	Tracer oteltrace.Tracer

	// ObserveWriteEventsError will be called with every error encountered
	// writing events to S3.
	ObserveWriteEventsError func(error)
	// contains filtered or unexported fields
}

Config represents the Athena configuration. Currently, the only way to set the configuration is via URL parameters.
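The LimiterRefillTime, LimiterRefillAmount and LimiterBurst fields describe a token bucket. The following is a minimal sketch of those semantics, not the limiter this package uses: the tokenBucket type, its methods and the simulate helper are hypothetical names introduced only for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// tokenBucket is a hypothetical illustration of the three limiter fields.
// It is not the limiter used by the athena package.
type tokenBucket struct {
	refillTime   time.Duration // plays the role of LimiterRefillTime
	refillAmount int           // plays the role of LimiterRefillAmount
	burst        int           // plays the role of LimiterBurst
	tokens       int
	lastRefill   time.Time
}

// newTokenBucket returns a bucket that starts full, as the LimiterBurst
// comment describes.
func newTokenBucket(refillTime time.Duration, refillAmount, burst int, now time.Time) *tokenBucket {
	return &tokenBucket{
		refillTime:   refillTime,
		refillAmount: refillAmount,
		burst:        burst,
		tokens:       burst,
		lastRefill:   now,
	}
}

// allow adds refillAmount tokens for every full refillTime interval that has
// elapsed, caps the total at burst, and then tries to take one token.
func (b *tokenBucket) allow(now time.Time) bool {
	if b.refillTime > 0 {
		intervals := int(now.Sub(b.lastRefill) / b.refillTime)
		if intervals > 0 {
			b.tokens += intervals * b.refillAmount
			if b.tokens > b.burst {
				b.tokens = b.burst
			}
			b.lastRefill = b.lastRefill.Add(time.Duration(intervals) * b.refillTime)
		}
	}
	if b.tokens == 0 {
		return false
	}
	b.tokens--
	return true
}

// simulate runs a fixed scenario: refill 1 token per second, burst of 2.
func simulate() []bool {
	start := time.Unix(0, 0)
	b := newTokenBucket(time.Second, 1, 2, start)
	return []bool{
		b.allow(start),                       // first burst token
		b.allow(start),                       // second burst token
		b.allow(start),                       // bucket is empty
		b.allow(start.Add(time.Second)),      // one token refilled
		b.allow(start.Add(time.Second)),      // empty again
		b.allow(start.Add(10 * time.Second)), // refilled, capped at burst
		b.allow(start.Add(10 * time.Second)), // second token of the burst
		b.allow(start.Add(10 * time.Second)), // empty again
	}
}

func main() {
	fmt.Println(simulate())
}
```

In this sketch a long idle period never accumulates more than the burst size, which is the property the LimiterBurst comment implies.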

func (*Config) CheckAndSetDefaults

func (cfg *Config) CheckAndSetDefaults(ctx context.Context) error

CheckAndSetDefaults is a helper that returns an error if the supplied configuration is not sufficient to set up an Athena-based audit log.

func (*Config) SetFromURL

func (cfg *Config) SetFromURL(url *url.URL) error

SetFromURL sets values on the Config from the supplied URL.
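Populating a configuration struct from URL query parameters can be sketched as follows. This is an illustration of the general pattern only: the exampleConfig type, the setFromURL and parseExample functions, and the region, batchMaxItems and batchMaxInterval parameter names are assumptions and are not taken from this package.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"time"
)

// exampleConfig is a hypothetical stand-in for a small subset of Config.
type exampleConfig struct {
	Region           string
	BatchMaxItems    int
	BatchMaxInterval time.Duration
}

// setFromURL copies recognized query parameters into the config.
// The parameter names here are assumptions for illustration.
func (c *exampleConfig) setFromURL(u *url.URL) error {
	q := u.Query()
	if v := q.Get("region"); v != "" {
		c.Region = v
	}
	if v := q.Get("batchMaxItems"); v != "" {
		n, err := strconv.Atoi(v)
		if err != nil {
			return fmt.Errorf("invalid batchMaxItems %q: %w", v, err)
		}
		c.BatchMaxItems = n
	}
	if v := q.Get("batchMaxInterval"); v != "" {
		d, err := time.ParseDuration(v)
		if err != nil {
			return fmt.Errorf("invalid batchMaxInterval %q: %w", v, err)
		}
		c.BatchMaxInterval = d
	}
	return nil
}

// parseExample parses a raw URL string and applies its query parameters.
func parseExample(raw string) (exampleConfig, error) {
	var c exampleConfig
	u, err := url.Parse(raw)
	if err != nil {
		return c, err
	}
	err = c.setFromURL(u)
	return c, err
}

func main() {
	c, err := parseExample("athena://example?region=eu-west-1&batchMaxItems=500&batchMaxInterval=30s")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", c)
}
```

In a setup like this, a validation step such as CheckAndSetDefaults would typically run afterwards to reject missing required values.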

func (*Config) UpdateForExternalAuditStorage

func (cfg *Config) UpdateForExternalAuditStorage(ctx context.Context, externalAuditStorage *externalauditstorage.Configurator) error

type EventuallyConsistentAuditLogger

type EventuallyConsistentAuditLogger struct {
	Inner events.AuditLogger

	// QueryDelay specifies how long a query should wait after the last emitted event.
	QueryDelay time.Duration
	// contains filtered or unexported fields
}

EventuallyConsistentAuditLogger adds a delay before searching for events, for use with eventually consistent audit loggers.
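One way to realize a delay like QueryDelay is to remember when the last event was emitted and, before a search, wait for whatever part of the delay has not yet elapsed. The sketch below assumes that approach; the delayTracker type and the remainingMillis helper are hypothetical, and the actual implementation may work differently.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// delayTracker is a hypothetical helper that records the time of the last
// emitted event and reports how much of the query delay is still pending.
type delayTracker struct {
	mu         sync.Mutex
	queryDelay time.Duration
	lastEmit   time.Time
}

// recordEmit would be called after every emitted audit event.
func (d *delayTracker) recordEmit(now time.Time) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.lastEmit = now
}

// remaining returns how long a search should still wait at time now.
// It returns zero if nothing was emitted or the delay has already passed.
func (d *delayTracker) remaining(now time.Time) time.Duration {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.lastEmit.IsZero() {
		return 0
	}
	wait := d.queryDelay - now.Sub(d.lastEmit)
	if wait < 0 {
		return 0
	}
	return wait
}

// remainingMillis evaluates remaining for a fixed emit time, in milliseconds.
func remainingMillis(delayMs, elapsedMs int64) int64 {
	start := time.Unix(0, 0)
	d := &delayTracker{queryDelay: time.Duration(delayMs) * time.Millisecond}
	d.recordEmit(start)
	now := start.Add(time.Duration(elapsedMs) * time.Millisecond)
	return d.remaining(now).Milliseconds()
}

func main() {
	d := &delayTracker{queryDelay: 50 * time.Millisecond}
	d.recordEmit(time.Now())
	// Before searching, sleep for the part of the delay that is still pending.
	time.Sleep(d.remaining(time.Now()))
	fmt.Println("safe to search; remaining:", d.remaining(time.Now()))
}
```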

func (*EventuallyConsistentAuditLogger) Close

func (*EventuallyConsistentAuditLogger) EmitAuditEvent

func (*EventuallyConsistentAuditLogger) SearchEvents

func (*EventuallyConsistentAuditLogger) SearchSessionEvents

type InfraOutputs

type InfraOutputs struct {
	TopicARN string
	QueueURL string
	Region   string
}

type Log

type Log struct {
	// contains filtered or unexported fields
}

Log is an events storage backend.

It uses SNS for emitting events. SQS is subscribed to the SNS topic. The consumer reads multiple events from SQS, creates a batch, converts it to Parquet and sends it to S3 for long-term storage. Athena is used for querying the Parquet files on S3.
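The batching step of this pipeline, bounded by BatchMaxItems and BatchMaxInterval, can be sketched in memory as follows. This is a simplified illustration under stated assumptions: the event type, splitBatches and demoBatchSizes are hypothetical, the interval is measured from the first event of each batch, and no SQS, Parquet or S3 calls are made.

```go
package main

import (
	"fmt"
	"time"
)

// event is a hypothetical stand-in for a message read from the queue.
// arrivedAt is an offset from the start of the simulation.
type event struct {
	arrivedAt time.Duration
	payload   string
}

// splitBatches groups events into batches. A batch is closed when it reaches
// maxItems, or when the next event arrives maxInterval or more after the
// first event of the current batch.
func splitBatches(events []event, maxItems int, maxInterval time.Duration) [][]event {
	var batches [][]event
	var current []event
	var windowStart time.Duration
	for _, ev := range events {
		if len(current) > 0 && ev.arrivedAt-windowStart >= maxInterval {
			batches = append(batches, current)
			current = nil
		}
		if len(current) == 0 {
			windowStart = ev.arrivedAt
		}
		current = append(current, ev)
		if len(current) >= maxItems {
			batches = append(batches, current)
			current = nil
		}
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches
}

// demoBatchSizes runs a fixed scenario and returns the size of each batch.
func demoBatchSizes() []int {
	events := []event{
		{0 * time.Second, "a"},
		{1 * time.Second, "b"},
		{2 * time.Second, "c"}, // third item closes the first batch
		{3 * time.Second, "d"},
		{70 * time.Second, "e"}, // interval elapsed, closes the batch holding "d"
		{71 * time.Second, "f"},
	}
	var sizes []int
	for _, b := range splitBatches(events, 3, 60*time.Second) {
		sizes = append(sizes, len(b))
	}
	return sizes
}

func main() {
	fmt.Println(demoBatchSizes())
}
```

In the real consumer each closed batch would be converted to a Parquet file and written to S3; here a batch is just a slice.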

func New

func New(ctx context.Context, cfg Config) (*Log, error)

New creates an instance of an Athena-based audit log.

func (*Log) Close

func (l *Log) Close() error

func (*Log) EmitAuditEvent

func (l *Log) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent) error

func (*Log) SearchEvents

func (l *Log) SearchEvents(ctx context.Context, req events.SearchEventsRequest) ([]apievents.AuditEvent, string, error)

func (*Log) SearchSessionEvents

func (l *Log) SearchSessionEvents(ctx context.Context, req events.SearchSessionEventsRequest) ([]apievents.AuditEvent, string, error)

type PublisherConfig

type PublisherConfig struct {
	TopicARN      string
	SNSPublisher  snsPublisher
	Uploader      s3uploader
	PayloadBucket string
	PayloadPrefix string
}
