s3spanstore

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2023 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PARQUET_CONCURRENCY = 1
	PARTION_FORMAT      = "2006/01/02/15"
)
View Source
const (
	ATHENA_TIMEFORMAT = "2006-01-02 15:04:05.999"
)

Variables

This section is empty.

Functions

func DecodeSpanPayload added in v0.1.0

func DecodeSpanPayload(payload string) (*model.Span, error)

func EmptyBucket added in v0.1.0

func EmptyBucket(ctx context.Context, svc S3API, bucketName string) error

func EncodeSpanPayload added in v0.0.6

func EncodeSpanPayload(span *model.Span) (string, error)

func RandStringBytes added in v0.0.9

func RandStringBytes(n int) string

func S3ParquetKey added in v0.0.13

func S3ParquetKey(prefix, suffix string, datehour string) string

func S3PartitionKey added in v0.1.4

func S3PartitionKey(t time.Time) string

Types

type AthenaAPI added in v0.3.0

type AthenaAPI interface {
	BatchGetQueryExecution(ctx context.Context, params *athena.BatchGetQueryExecutionInput, optFns ...func(*athena.Options)) (*athena.BatchGetQueryExecutionOutput, error)
	GetQueryExecution(ctx context.Context, params *athena.GetQueryExecutionInput, optFns ...func(*athena.Options)) (*athena.GetQueryExecutionOutput, error)
	GetQueryResults(ctx context.Context, params *athena.GetQueryResultsInput, optFns ...func(*athena.Options)) (*athena.GetQueryResultsOutput, error)
	ListQueryExecutions(ctx context.Context, params *athena.ListQueryExecutionsInput, optFns ...func(*athena.Options)) (*athena.ListQueryExecutionsOutput, error)
	StartQueryExecution(ctx context.Context, params *athena.StartQueryExecutionInput, optFns ...func(*athena.Options)) (*athena.StartQueryExecutionOutput, error)
	StopQueryExecution(ctx context.Context, params *athena.StopQueryExecutionInput, optFns ...func(*athena.Options)) (*athena.StopQueryExecutionOutput, error)
}

type AthenaQueryCache added in v0.3.0

type AthenaQueryCache struct {
	// contains filtered or unexported fields
}

func NewAthenaQueryCache added in v0.3.0

func NewAthenaQueryCache(logger hclog.Logger, svc AthenaAPI, workGroup string) *AthenaQueryCache

func (*AthenaQueryCache) Lookup added in v0.3.0

type DedupeParquetWriter added in v0.4.0

type DedupeParquetWriter struct {
	// contains filtered or unexported fields
}

func NewDedupeParquetWriter added in v0.4.0

func NewDedupeParquetWriter(logger hclog.Logger, dedupeDuration time.Duration, dedupeRewriteBufferDuration time.Duration, dedupeCacheSize int, parquetWriter IParquetWriter) (*DedupeParquetWriter, error)

func (*DedupeParquetWriter) Close added in v0.4.0

func (w *DedupeParquetWriter) Close() error

func (*DedupeParquetWriter) Write added in v0.4.0

func (w *DedupeParquetWriter) Write(ctx context.Context, rowTime time.Time, maxBufferUntil time.Time, row DeduplicatableRow) error

type DeduplicatableRow added in v0.4.2

type DeduplicatableRow interface {
	DedupeKey() string
}

type DependenciesPrefetch added in v0.4.3

type DependenciesPrefetch struct {
	// contains filtered or unexported fields
}

func NewDependenciesPrefetch added in v0.4.3

func NewDependenciesPrefetch(ctx context.Context, logger hclog.Logger, reader ReaderWithDependencies, interval time.Duration, enabled bool) *DependenciesPrefetch

func (*DependenciesPrefetch) Start added in v0.4.3

func (d *DependenciesPrefetch) Start()

func (*DependenciesPrefetch) Stop added in v0.4.3

func (d *DependenciesPrefetch) Stop()

type IParquetWriter added in v0.4.0

type IParquetWriter interface {
	Write(ctx context.Context, time time.Time, maxBufferUntil time.Time, row interface{}) error
	Close() error
}

type OperationRecord added in v0.4.0

type OperationRecord struct {
	OperationName string `parquet:"name=operation_name, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
	SpanKind      string `parquet:"name=span_kind, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
	ServiceName   string `parquet:"name=service_name, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
}

OperationRecord contains queryable properties

func NewOperationRecordFromSpan added in v0.4.0

func NewOperationRecordFromSpan(span *model.Span) (*OperationRecord, error)

func (*OperationRecord) DedupeKey added in v0.4.2

func (w *OperationRecord) DedupeKey() string

type ParquetRef added in v0.1.4

type ParquetRef struct {
	// contains filtered or unexported fields
}

type ParquetWriter added in v0.4.0

type ParquetWriter struct {
	// contains filtered or unexported fields
}

func NewParquetWriter added in v0.4.0

func NewParquetWriter(ctx context.Context, logger hclog.Logger, svc S3API, bufferDuration time.Duration, bucketName string, prefix string, rowType interface{}) (*ParquetWriter, error)

func (*ParquetWriter) Close added in v0.4.0

func (w *ParquetWriter) Close() error

func (*ParquetWriter) Write added in v0.4.0

func (w *ParquetWriter) Write(ctx context.Context, time time.Time, maxBufferUntil time.Time, row interface{}) error

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func NewReader

func NewReader(ctx context.Context, logger hclog.Logger, svc AthenaAPI, cfg config.Athena) (*Reader, error)

func (*Reader) Close added in v1.1.1

func (r *Reader) Close() error

func (*Reader) DefaultMaxTime added in v0.1.4

func (r *Reader) DefaultMaxTime() time.Time

func (*Reader) DefaultMinTime added in v0.1.4

func (r *Reader) DefaultMinTime() time.Time

func (*Reader) FindTraceIDs

func (r *Reader) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error)

func (*Reader) FindTraces

func (r *Reader) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error)

func (*Reader) GetDependencies added in v0.1.4

func (r *Reader) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error)

func (*Reader) GetOperations

func (*Reader) GetServices

func (s *Reader) GetServices(ctx context.Context) ([]string, error)

func (*Reader) GetTrace

func (s *Reader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error)

type ReaderWithDependencies added in v0.4.3

type ReaderWithDependencies interface {
	GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error)
}

type S3API added in v0.0.9

type S3API interface {
	PutObject(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error)
	UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error)
	CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)
	CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error)
	AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
	GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error)
	HeadObject(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error)
	DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error)
	ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error)
}

type SpanRecord

type SpanRecord struct {
	TraceID       string            `parquet:"name=trace_id, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
	SpanID        string            `parquet:"name=span_id, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
	OperationName string            `parquet:"name=operation_name, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
	SpanKind      string            `parquet:"name=span_kind, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
	StartTime     int64             `parquet:"name=start_time, type=INT64"`
	Duration      int64             `parquet:"name=duration, type=INT64"`
	Tags          map[string]string `` /* 138-byte string literal not displayed */
	ServiceName   string            `parquet:"name=service_name, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`

	// TODO: Write binary
	SpanPayload string                 `parquet:"name=span_payload, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
	References  []SpanRecordReferences `parquet:"name=references"`
}

SpanRecord contains queryable properties from the span and the span as json payload

func NewSpanRecordFromSpan

func NewSpanRecordFromSpan(span *model.Span) (*SpanRecord, error)

type SpanRecordReferences added in v0.0.2

type SpanRecordReferences struct {
	TraceID string `parquet:"name=trace_id, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
	SpanID  string `parquet:"name=span_id, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
	RefType int32  `parquet:"name=ref_type, type=INT32, convertedtype=INT_8"`
}

func NewSpanRecordReferencesFromSpanReferences added in v0.0.2

func NewSpanRecordReferencesFromSpanReferences(span *model.Span) []SpanRecordReferences

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(ctx context.Context, logger hclog.Logger, svc S3API, s3Config config.S3) (*Writer, error)

func (*Writer) Close added in v0.0.3

func (w *Writer) Close() error

func (*Writer) WriteSpan

func (w *Writer) WriteSpan(ctx context.Context, span *model.Span) error

Directories

Path Synopsis
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL