amplitude

package module
v0.0.0-...-67ab311 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2026 License: AGPL-3.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
// Amplitude export endpoint and the timestamp layout declared alongside it.
const (
	// AMPLITUDE_API_URL is the URL of Amplitude's export endpoint.
	AMPLITUDE_API_URL     = "https://amplitude.com/api/2/export"
	// AMPLITUDE_TIME_FORMAT is a Go time layout: four-digit year, month, day,
	// a literal "T", then the two-digit hour (no minutes or seconds).
	// NOTE(review): presumably used to format the export request's time bounds — confirm at the call site.
	AMPLITUDE_TIME_FORMAT = "20060102T15"
)
View Source
// Environment variable names and the default start date.
// NOTE(review): presumably read by LoadConfig into the matching Config fields — confirm.
const (
	// ENV_DESTINATION_SCHEMA_NAME is the environment variable name "DESTINATION_SCHEMA_NAME".
	ENV_DESTINATION_SCHEMA_NAME = "DESTINATION_SCHEMA_NAME"

	// Environment variable names for the Amplitude API key, secret key, and start date.
	ENV_API_KEY    = "SOURCE_AMPLITUDE_API_KEY"
	ENV_SECRET_KEY = "SOURCE_AMPLITUDE_SECRET_KEY"
	ENV_START_DATE = "SOURCE_AMPLITUDE_START_DATE"

	// DEFAULT_START_DATE is a date string in YYYY-MM-DD form.
	// NOTE(review): presumably the fallback when SOURCE_AMPLITUDE_START_DATE is not set — confirm.
	DEFAULT_START_DATE = "2025-01-01"
)
View Source
// Tuning constants: time window, export delay, cursor column, and compression factor.
const (
	// PAGINATION_TIME_INTERVAL is one hour.
	// NOTE(review): presumably the size of each export time window — confirm in Sync/Export.
	PAGINATION_TIME_INTERVAL = time.Hour
	AMPLITUDE_DATA_DELAY     = time.Hour // Amplitude data is available for export after an up to 2-hour delay (1 full hour + truncated). https://amplitude.com/docs/apis/analytics/export

	// CURSOR_COLUMN_NAME has the same value as the JSON key of RecordEvent.ServerUploadTime.
	CURSOR_COLUMN_NAME = "server_upload_time"

	// NOTE(review): the arithmetic in the trailing comment below does not read
	// consistently (1 GB multiplied by 2 versus ~90MB) — confirm the intended meaning.
	COMPRESSION_FACTOR = 2 // 1 GB uncompressed data x 2 = ~90MB compressed data
)
View Source
// Table names.
const (
	// EVENTS_TABLE_NAME is the table name "events".
	// NOTE(review): presumably the destination table for exported events — confirm in Syncer.
	EVENTS_TABLE_NAME = "events"
)

Variables

This section is empty.

Functions

func EventsIcebergSchemaColumns

func EventsIcebergSchemaColumns(config *common.CommonConfig) []*common.IcebergSchemaColumn

func RegisterFlags

func RegisterFlags()

Types

type Amplitude

// Amplitude pairs the connector configuration with a standard-library HTTP client.
// NOTE(review): presumably the client Export uses to call the Amplitude API — confirm.
type Amplitude struct {
	// Config holds the connector settings (see Config).
	Config     *Config
	// HttpClient is a net/http client.
	HttpClient *http.Client
}

func NewAmplitude

func NewAmplitude(config *Config) *Amplitude

func (*Amplitude) Export

func (amplitude *Amplitude) Export(jsonQueueWriter *common.JsonQueueWriter, startTime, endTime time.Time) error

type Config

// Config holds the settings for the Amplitude source.
// NOTE(review): presumably populated by LoadConfig from the ENV_* variables — confirm.
type Config struct {
	// CommonConfig is the shared configuration from the common package.
	CommonConfig          *common.CommonConfig
	// DestinationSchemaName is the destination schema name.
	DestinationSchemaName string
	// ApiKey and SecretKey are the Amplitude credentials.
	ApiKey                string
	SecretKey             string
	// StartDate is the configured start date as a time.Time.
	StartDate             time.Time
}

func LoadConfig

func LoadConfig() *Config

type RecordEvent

// RecordEvent is a single event record; its JSON tags give the key each field
// is decoded from or encoded to. Field reference:
// https://amplitude.com/docs/apis/analytics/export
//
// All *Time fields are declared as strings, and the nested property objects
// are kept as untyped maps.
type RecordEvent struct {
	Adid                    string                 `json:"adid"`
	AmplitudeAttributionIDs string                 `json:"amplitude_attribution_ids"`
	AmplitudeEventType      string                 `json:"amplitude_event_type"`
	AmplitudeID             int64                  `json:"amplitude_id"`
	App                     int                    `json:"app"`
	City                    string                 `json:"city"`
	ClientEventTime         string                 `json:"client_event_time"`
	ClientUploadTime        string                 `json:"client_upload_time"`
	Country                 string                 `json:"country"`
	Data                    map[string]interface{} `json:"data"`
	DataType                string                 `json:"data_type"`
	DeviceBrand             string                 `json:"device_brand"`
	DeviceCarrier           string                 `json:"device_carrier"`
	DeviceFamily            string                 `json:"device_family"`
	DeviceID                string                 `json:"device_id"`
	DeviceManufacturer      string                 `json:"device_manufacturer"`
	DeviceModel             string                 `json:"device_model"`
	DeviceType              string                 `json:"device_type"`
	DMA                     string                 `json:"dma"`
	EventID                 int                    `json:"event_id"`
	EventProperties         map[string]interface{} `json:"event_properties"`
	EventTime               string                 `json:"event_time"`
	EventType               string                 `json:"event_type"`
	GlobalUserProperties    map[string]interface{} `json:"global_user_properties"`
	GroupProperties         map[string]interface{} `json:"group_properties"`
	Groups                  map[string]interface{} `json:"groups"`
	IDFA                    string                 `json:"idfa"`
	// The JSON keys for InsertID and InsertKey begin with "$".
	InsertID                string                 `json:"$insert_id"`
	InsertKey               string                 `json:"$insert_key"`
	IPAddress               string                 `json:"ip_address"`
	IsAttributionEvent      bool                   `json:"is_attribution_event"`
	Language                string                 `json:"language"`
	Library                 string                 `json:"library"`
	LocationLat             float64                `json:"location_lat"`
	LocationLng             float64                `json:"location_lng"`
	OSName                  string                 `json:"os_name"`
	OSVersion               string                 `json:"os_version"`
	PartnerID               string                 `json:"partner_id"`
	Paying                  string                 `json:"paying"`
	Plan                    map[string]interface{} `json:"plan"`
	Platform                string                 `json:"platform"`
	ProcessedTime           string                 `json:"processed_time"`
	Region                  string                 `json:"region"`
	SampleRate              float64                `json:"sample_rate"`
	// The JSON key for Schema begins with "$".
	Schema                  string                 `json:"$schema"`
	ServerReceivedTime      string                 `json:"server_received_time"`
	// ServerUploadTime's JSON key equals the value of CURSOR_COLUMN_NAME.
	ServerUploadTime        string                 `json:"server_upload_time"`
	SessionID               int64                  `json:"session_id"`
	SourceID                string                 `json:"source_id"`
	StartVersion            string                 `json:"start_version"`
	UserCreationTime        string                 `json:"user_creation_time"`
	UserID                  string                 `json:"user_id"`
	UserProperties          map[string]interface{} `json:"user_properties"`
	UUID                    string                 `json:"uuid"`
	VersionName             string                 `json:"version_name"`
}

RecordEvent field reference: https://amplitude.com/docs/apis/analytics/export

func (*RecordEvent) ToMap

func (event *RecordEvent) ToMap() map[string]interface{}

ToMap returns the event as a map, normalizing the keys $insert_id -> insert_id, $insert_key -> insert_key, and $schema -> schema.

type Syncer

// Syncer holds the configuration, the Amplitude client, and the S3 storage and
// DuckDB handles from the common package.
// NOTE(review): presumably these are the dependencies used by Sync and WriteToIceberg — confirm.
type Syncer struct {
	// Config holds the connector settings (see Config).
	Config       *Config
	// Amplitude is the Amplitude client (see Amplitude).
	Amplitude    *Amplitude
	// StorageS3 is the S3 storage handle from the common package.
	StorageS3    *common.StorageS3
	// DuckdbClient is the DuckDB client from the common package.
	DuckdbClient *common.DuckdbClient
}

func NewSyncer

func NewSyncer(config *Config, storageS3 *common.StorageS3, duckdbClient *common.DuckdbClient) *Syncer

func (*Syncer) Sync

func (syncer *Syncer) Sync()

func (*Syncer) WriteToIceberg

func (syncer *Syncer) WriteToIceberg(icebergTable *common.IcebergTable, cursorValue common.CursorValue, cappedBuffer *common.CappedBuffer)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL