sync

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2026 License: Apache-2.0 Imports: 159 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RelHasRole          = "HAS_ROLE"
	RelMemberOf         = "MEMBER_OF"
	RelAttachedTo       = "ATTACHED_TO"
	RelBelongsTo        = "BELONGS_TO"
	RelCanAccess        = "CAN_ACCESS"
	RelExposedTo        = "EXPOSED_TO"
	RelTrustedBy        = "TRUSTED_BY"
	RelContains         = "CONTAINS"
	RelProtects         = "PROTECTS"
	RelEncryptedBy      = "ENCRYPTED_BY"
	RelManagedBy        = "MANAGED_BY"
	RelLogsTo           = "LOGS_TO"
	RelReadsFrom        = "READS_FROM"
	RelWritesTo         = "WRITES_TO"
	RelInvokes          = "INVOKES"
	RelRoutes           = "ROUTES"
	RelInSubnet         = "IN_SUBNET"
	RelInVPC            = "IN_VPC"
	RelAssumableBy      = "ASSUMABLE_BY"
	RelHasPermission    = "HAS_PERMISSION"
	RelHasVulnerability = "HAS_VULNERABILITY"
)

Relationship type constants (untyped string values).

Variables

View Source
var DefaultAWSRegions = []string{
	"us-east-1",
	"us-east-2",
	"us-west-1",
	"us-west-2",
	"eu-west-1",
	"eu-west-2",
	"eu-central-1",
	"ap-southeast-1",
	"ap-southeast-2",
	"ap-northeast-1",
}

DefaultAWSRegions lists commonly used AWS regions for multi-region scanning.

View Source
var ExpectedTables = []string{
	"aws_iam_roles",
	"aws_ec2_instances",
	"aws_s3_buckets",
	"aws_kms_keys",
	"gcp_compute_instances",
	"gcp_storage_buckets",
	"gcp_iam_service_accounts",
	"gcp_container_vulnerabilities",
	"gcp_artifact_registry_images",
	"gcp_scc_findings",
	"azure_compute_virtual_machines",
	"azure_aks_clusters",
	"azure_rbac_role_assignments",
	"azure_policy_assignments",
	"azure_graph_service_principals",
	"azure_defender_assessments",
	"azure_storage_accounts",
	"k8s_cluster_inventory",
	"k8s_core_pods",
	"k8s_core_namespaces",
	"k8s_core_nodes",
	"k8s_core_services",
	"k8s_core_service_accounts",
	"k8s_apps_deployments",
	"k8s_networking_ingresses",
	"k8s_rbac_cluster_roles",
	"k8s_rbac_roles",
	"k8s_rbac_cluster_role_bindings",
	"k8s_rbac_role_bindings",
	"k8s_rbac_service_account_bindings",
	"k8s_rbac_risky_bindings",
	"k8s_audit_events",
	"okta_users",
	"okta_groups",
	"okta_applications",
	"sentinelone_agents",
	"github_repositories",
	"github_organizations",
	"crowdstrike_hosts",
	"entra_users",
	"entra_groups",
}

ExpectedTables are high-signal tables that must remain registered.

View Source
var GCPAssetTypes = map[string]string{
	"compute.googleapis.com/Instance":        "gcp_compute_instances",
	"compute.googleapis.com/Firewall":        "gcp_compute_firewalls",
	"compute.googleapis.com/Network":         "gcp_compute_networks",
	"compute.googleapis.com/Subnetwork":      "gcp_compute_subnetworks",
	"storage.googleapis.com/Bucket":          "gcp_storage_buckets",
	"iam.googleapis.com/ServiceAccount":      "gcp_iam_service_accounts",
	"iam.googleapis.com/Role":                "gcp_iam_roles",
	"sqladmin.googleapis.com/Instance":       "gcp_sql_instances",
	"cloudfunctions.googleapis.com/Function": "gcp_cloudfunctions_functions",
	"pubsub.googleapis.com/Topic":            "gcp_pubsub_topics",
	"container.googleapis.com/Cluster":       "gcp_container_clusters",
	"cloudkms.googleapis.com/CryptoKey":      "gcp_kms_keys",
	"orgpolicy.googleapis.com/Policy":        "gcp_org_policies",
	"secretmanager.googleapis.com/Secret":    "gcp_secretmanager_secrets",
	"bigquery.googleapis.com/Dataset":        "gcp_bigquery_datasets",
	"bigquery.googleapis.com/Table":          "gcp_bigquery_tables",
	"run.googleapis.com/Service":             "gcp_run_services",
	"logging.googleapis.com/LogSink":         "gcp_logging_sinks",
	"dns.googleapis.com/ManagedZone":         "gcp_dns_zones",
}

GCPAssetTypes maps GCP asset types (for example, "compute.googleapis.com/Instance") to table names.

Functions

func ListOrganizationProjects

func ListOrganizationProjects(ctx context.Context, orgID string) ([]string, error)

ListOrganizationProjects lists all projects in an organization

func RegisterAllTables

func RegisterAllTables()

RegisterAllTables auto-registers all native sync tables.

func SupportedTableNames

func SupportedTableNames() []string

SupportedTableNames returns all table names supported by native sync engines.

func WithGCPClientOptions

func WithGCPClientOptions(ctx context.Context, opts ...option.ClientOption) context.Context

Types

type AWSCoverageEntry

type AWSCoverageEntry struct {
	Service     string   `json:"service"`
	Table       string   `json:"table"`
	PrimaryKeys []string `json:"primary_keys"`
	Regions     []string `json:"regions"`
	Scope       string   `json:"scope"`
}

AWSCoverageEntry captures table-level AWS sync coverage metadata.

func BuildAWSCoverageMatrix

func BuildAWSCoverageMatrix(configuredRegions []string) []AWSCoverageEntry

BuildAWSCoverageMatrix returns a service-to-table coverage matrix for AWS sync. If configuredRegions is empty, DefaultAWSRegions is used.

type AWSCoverageGap

type AWSCoverageGap struct {
	Service       string   `json:"service"`
	MissingTables []string `json:"missing_tables"`
}

AWSCoverageGap describes required high-value tables that are not currently covered.

func AWSCoverageGaps

func AWSCoverageGaps(entries []AWSCoverageEntry) []AWSCoverageGap

AWSCoverageGaps reports missing high-value AWS tables by service.

type AzureCoverageEntry

type AzureCoverageEntry struct {
	Service     string   `json:"service"`
	Table       string   `json:"table"`
	PrimaryKeys []string `json:"primary_keys"`
	Source      string   `json:"source"`
}

AzureCoverageEntry captures Azure sync coverage metadata.

func BuildAzureCoverageMatrix

func BuildAzureCoverageMatrix() []AzureCoverageEntry

BuildAzureCoverageMatrix returns Azure table coverage metadata for ARM and Graph sync.

type AzureEngineOption

type AzureEngineOption func(*AzureSyncEngine)

AzureEngineOption configures the Azure sync engine

func WithAzureConcurrency

func WithAzureConcurrency(n int) AzureEngineOption

func WithAzureSubscription

func WithAzureSubscription(subscriptionID string) AzureEngineOption

func WithAzureTableFilter

func WithAzureTableFilter(tables []string) AzureEngineOption

type AzureSyncEngine

type AzureSyncEngine struct {
	// contains filtered or unexported fields
}

AzureSyncEngine orchestrates Azure resource syncing with change detection

func NewAzureSyncEngine

func NewAzureSyncEngine(sf *snowflake.Client, logger *slog.Logger, opts ...AzureEngineOption) (*AzureSyncEngine, error)

func (*AzureSyncEngine) SyncAll

func (e *AzureSyncEngine) SyncAll(ctx context.Context) ([]SyncResult, error)

SyncAll syncs all Azure resources with change detection

func (*AzureSyncEngine) ValidateTables

func (e *AzureSyncEngine) ValidateTables(ctx context.Context) ([]SyncResult, error)

ValidateTables ensures required Snowflake tables exist without fetching Azure resources.

type AzureTableSpec

type AzureTableSpec struct {
	Name    string
	Columns []string
	Fetch   func(ctx context.Context, cred *azidentity.DefaultAzureCredential, subscriptionID string) ([]map[string]interface{}, error)
}

AzureTableSpec defines an Azure table to sync

type ChangeSet

type ChangeSet struct {
	Added    []string
	Removed  []string
	Modified []string
}

ChangeSet tracks what changed during sync

func (*ChangeSet) HasChanges

func (c *ChangeSet) HasChanges() bool

func (*ChangeSet) Summary

func (c *ChangeSet) Summary() string

type EngineOption

type EngineOption func(*SyncEngine)

EngineOption configures the AWS sync engine (SyncEngine).

func WithConcurrency

func WithConcurrency(n int) EngineOption

func WithRateLimiter

func WithRateLimiter(limiter *rate.Limiter) EngineOption

func WithRegions

func WithRegions(regions []string) EngineOption

func WithTableFilter

func WithTableFilter(tables []string) EngineOption

type GCPAssetInventoryEngine

type GCPAssetInventoryEngine struct {
	// contains filtered or unexported fields
}

GCPAssetInventoryEngine uses the Cloud Asset Inventory API for efficient bulk resource fetching.

func NewGCPAssetInventoryEngine

func NewGCPAssetInventoryEngine(sf *snowflake.Client, logger *slog.Logger, opts ...GCPAssetOption) *GCPAssetInventoryEngine

NewGCPAssetInventoryEngine creates a new engine that uses the Cloud Asset Inventory API.

func (*GCPAssetInventoryEngine) SyncAll

SyncAll syncs all GCP resources using the Cloud Asset Inventory API.

func (*GCPAssetInventoryEngine) ValidateTables

func (e *GCPAssetInventoryEngine) ValidateTables(ctx context.Context) ([]SyncResult, error)

ValidateTables ensures required Snowflake tables exist without fetching assets.

type GCPAssetOption

type GCPAssetOption func(*GCPAssetInventoryEngine)

GCPAssetOption configures the GCP Asset Inventory engine

func WithAssetConcurrency

func WithAssetConcurrency(n int) GCPAssetOption

func WithAssetScope

func WithAssetScope(scope string) GCPAssetOption

func WithAssetTypeFilter

func WithAssetTypeFilter(types []string) GCPAssetOption

func WithProjects

func WithProjects(projects []string) GCPAssetOption

type GCPCoverageEntry

type GCPCoverageEntry struct {
	Service        string   `json:"service"`
	Table          string   `json:"table"`
	PrimaryKeys    []string `json:"primary_keys"`
	NativeAPI      bool     `json:"native_api"`
	AssetInventory bool     `json:"asset_inventory"`
}

GCPCoverageEntry captures GCP table coverage across native API and Asset Inventory paths.

func BuildGCPCoverageMatrix

func BuildGCPCoverageMatrix() []GCPCoverageEntry

BuildGCPCoverageMatrix returns GCP table coverage and indicates whether each table is sourced from native APIs, Asset Inventory, or both.

type GCPCoverageSummary

type GCPCoverageSummary struct {
	BothSources        []string `json:"both_sources"`
	NativeOnly         []string `json:"native_only"`
	AssetInventoryOnly []string `json:"asset_inventory_only"`
}

GCPCoverageSummary compares source overlap between native and Asset Inventory ingestion.

func SummarizeGCPCoverageSources

func SummarizeGCPCoverageSources(entries []GCPCoverageEntry) GCPCoverageSummary

SummarizeGCPCoverageSources reports table overlap between native and Asset Inventory paths.

type GCPEngineOption

type GCPEngineOption func(*GCPSyncEngine)

GCPEngineOption configures the GCP sync engine

func WithGCPConcurrency

func WithGCPConcurrency(n int) GCPEngineOption

func WithGCPProject

func WithGCPProject(projectID string) GCPEngineOption

func WithGCPTableFilter

func WithGCPTableFilter(tables []string) GCPEngineOption

type GCPSecurityOption

type GCPSecurityOption func(*GCPSecuritySync)

GCPSecurityOption configures the GCP security sync

func WithGCPSecurityTableFilter

func WithGCPSecurityTableFilter(tables []string) GCPSecurityOption

type GCPSecuritySync

type GCPSecuritySync struct {
	// contains filtered or unexported fields
}

GCPSecuritySync handles syncing security-related GCP data

func NewGCPSecuritySync

func NewGCPSecuritySync(sf *snowflake.Client, logger *slog.Logger, projectID, orgID string, opts ...GCPSecurityOption) *GCPSecuritySync

NewGCPSecuritySync creates a new GCP security sync instance

func (*GCPSecuritySync) SyncAll

func (s *GCPSecuritySync) SyncAll(ctx context.Context) error

SyncAll syncs all GCP security data

type GCPSyncEngine

type GCPSyncEngine struct {
	// contains filtered or unexported fields
}

GCPSyncEngine orchestrates GCP resource syncing with change detection

func NewGCPSyncEngine

func NewGCPSyncEngine(sf *snowflake.Client, logger *slog.Logger, opts ...GCPEngineOption) *GCPSyncEngine

func (*GCPSyncEngine) SyncAll

func (e *GCPSyncEngine) SyncAll(ctx context.Context) ([]SyncResult, error)

SyncAll syncs all GCP resources with change detection

func (*GCPSyncEngine) ValidateTables

func (e *GCPSyncEngine) ValidateTables(ctx context.Context) ([]SyncResult, error)

ValidateTables ensures required Snowflake tables exist without fetching GCP resources.

type GCPTableSpec

type GCPTableSpec struct {
	Name    string
	Columns []string
	Fetch   func(ctx context.Context, projectID string) ([]map[string]interface{}, error)
}

GCPTableSpec defines a GCP table to sync

type K8sEngineOption

type K8sEngineOption func(*K8sSyncEngine)

K8sEngineOption configures the Kubernetes sync engine.

func WithK8sConcurrency

func WithK8sConcurrency(concurrency int) K8sEngineOption

WithK8sConcurrency sets the concurrency for Kubernetes sync.

func WithK8sContext

func WithK8sContext(name string) K8sEngineOption

WithK8sContext sets the kubeconfig context to use.

func WithK8sKubeconfig

func WithK8sKubeconfig(path string) K8sEngineOption

WithK8sKubeconfig sets the kubeconfig path.

func WithK8sNamespace

func WithK8sNamespace(namespace string) K8sEngineOption

WithK8sNamespace sets the Kubernetes namespace to sync.

func WithK8sTableFilter

func WithK8sTableFilter(tables []string) K8sEngineOption

WithK8sTableFilter sets a table filter for Kubernetes sync.

type K8sSyncEngine

type K8sSyncEngine struct {
	// contains filtered or unexported fields
}

K8sSyncEngine syncs Kubernetes resources to Snowflake.

func NewK8sSyncEngine

func NewK8sSyncEngine(sf *snowflake.Client, logger *slog.Logger, opts ...K8sEngineOption) *K8sSyncEngine

NewK8sSyncEngine creates a Kubernetes sync engine.

func (*K8sSyncEngine) SyncAll

func (e *K8sSyncEngine) SyncAll(ctx context.Context) ([]SyncResult, error)

SyncAll syncs all Kubernetes tables with change detection.

func (*K8sSyncEngine) ValidateTables

func (e *K8sSyncEngine) ValidateTables(ctx context.Context) ([]SyncResult, error)

ValidateTables ensures the Kubernetes tables exist without fetching resources.

type K8sTableSpec

type K8sTableSpec struct {
	Name    string
	Columns []string
	Fetch   func(ctx context.Context, client kubernetes.Interface, namespace, clusterName string) ([]map[string]interface{}, error)
}

K8sTableSpec defines a Kubernetes table to sync.

type RegisteredTable

type RegisteredTable struct {
	Name     string
	Provider TableProvider
	Columns  []string
	Sources  []TableSource
}

RegisteredTable captures normalized metadata for a table.

type Relationship

type Relationship struct {
	SourceID   string    `json:"source_id"`
	SourceType string    `json:"source_type"`
	TargetID   string    `json:"target_id"`
	TargetType string    `json:"target_type"`
	RelType    string    `json:"rel_type"`
	Properties string    `json:"properties,omitempty"` // JSON string
	SyncTime   time.Time `json:"sync_time"`
}

Relationship represents a connection between two cloud resources

type RelationshipBackfillStats

type RelationshipBackfillStats struct {
	Scanned int `json:"scanned"`
	Updated int `json:"updated"`
	Deleted int `json:"deleted"`
	Skipped int `json:"skipped"`
}

RelationshipBackfillStats summarizes normalization updates.

type RelationshipExtractor

type RelationshipExtractor struct {
	// contains filtered or unexported fields
}

RelationshipExtractor extracts relationships from synced resources

func NewRelationshipExtractor

func NewRelationshipExtractor(sf *snowflake.Client, logger *slog.Logger) *RelationshipExtractor

NewRelationshipExtractor creates a new extractor

func (*RelationshipExtractor) BackfillNormalizedRelationshipIDs

func (r *RelationshipExtractor) BackfillNormalizedRelationshipIDs(ctx context.Context, batchSize int) (RelationshipBackfillStats, error)

BackfillNormalizedRelationshipIDs normalizes IDs for existing relationships.

func (*RelationshipExtractor) ExtractAndPersist

func (r *RelationshipExtractor) ExtractAndPersist(ctx context.Context) (int, error)

ExtractAndPersist queries synced tables and extracts relationships

type SyncEngine

type SyncEngine struct {
	// contains filtered or unexported fields
}

SyncEngine orchestrates AWS resource syncing.

func NewSyncEngine

func NewSyncEngine(sf *snowflake.Client, logger *slog.Logger, opts ...EngineOption) *SyncEngine

func (*SyncEngine) SyncAll

func (e *SyncEngine) SyncAll(ctx context.Context) ([]SyncResult, error)

SyncAll syncs all AWS resources with parallel execution

func (*SyncEngine) SyncAllWithConfig

func (e *SyncEngine) SyncAllWithConfig(ctx context.Context, cfg aws.Config) ([]SyncResult, error)

SyncAllWithConfig syncs all AWS resources with a preloaded AWS config

func (*SyncEngine) ValidateTablesWithConfig

func (e *SyncEngine) ValidateTablesWithConfig(ctx context.Context, cfg aws.Config) ([]SyncResult, error)

ValidateTablesWithConfig ensures required Snowflake tables exist without fetching cloud resources.

type SyncResult

type SyncResult struct {
	Table    string
	Region   string
	Synced   int
	Errors   int
	Error    string
	Duration time.Duration
	Changes  *ChangeSet
	SyncTime time.Time
}

SyncResult holds results for a single table sync

type TableProvider

type TableProvider string

TableProvider identifies the provider that owns a table.

const (
	TableProviderAWS        TableProvider = "aws"
	TableProviderGCP        TableProvider = "gcp"
	TableProviderAzure      TableProvider = "azure"
	TableProviderKubernetes TableProvider = "k8s"
	TableProviderExternal   TableProvider = "external"
)

type TableRegionScope

type TableRegionScope int
const (
	TableRegionScopeRegional TableRegionScope = iota
	TableRegionScopeGlobal
)

type TableRegistration

type TableRegistration struct {
	Name     string
	Provider TableProvider
	Columns  []string
	Source   TableSource
}

TableRegistration represents a table being added to the registry.

type TableRegistry

type TableRegistry struct {
	// contains filtered or unexported fields
}

TableRegistry stores all known tables and their metadata.

func GlobalTableRegistry

func GlobalTableRegistry() *TableRegistry

GlobalTableRegistry returns the singleton table registry.

func NewTableRegistry

func NewTableRegistry() *TableRegistry

NewTableRegistry creates an empty table registry.

func (*TableRegistry) Get

func (r *TableRegistry) Get(name string) (RegisteredTable, bool)

Get returns the registered table by name.

func (*TableRegistry) MustRegister

func (r *TableRegistry) MustRegister(registration TableRegistration)

MustRegister registers a table and panics on validation errors.

func (*TableRegistry) Names

func (r *TableRegistry) Names() []string

Names returns all registered table names sorted alphabetically.

func (*TableRegistry) NamesByProvider

func (r *TableRegistry) NamesByProvider(provider TableProvider) []string

NamesByProvider returns table names for a provider sorted alphabetically.

func (*TableRegistry) Register

func (r *TableRegistry) Register(registration TableRegistration) error

Register validates and adds a table registration.

func (*TableRegistry) Stats

func (r *TableRegistry) Stats() TableRegistryStats

Stats returns registration statistics.

func (*TableRegistry) Validate

func (r *TableRegistry) Validate() []error

Validate checks registry consistency and baseline provider coverage.

func (*TableRegistry) VerifyExpectedTables

func (r *TableRegistry) VerifyExpectedTables() []string

VerifyExpectedTables ensures expected baseline tables remain registered.

type TableRegistryStats

type TableRegistryStats struct {
	TotalTables        int
	ByProvider         map[TableProvider]int
	BySource           map[TableSource]int
	MultiSourceTables  int
	TablesWithNoColumn int
}

TableRegistryStats summarizes table registration coverage.

type TableSource

type TableSource string

TableSource identifies where the table data comes from.

const (
	TableSourceNative         TableSource = "native"
	TableSourceAssetInventory TableSource = "asset_inventory"
	TableSourceSecurity       TableSource = "security"
)

type TableSpec

type TableSpec struct {
	Name                string
	Columns             []string
	Fetch               func(ctx context.Context, cfg aws.Config, region string) ([]map[string]interface{}, error)
	Mode                TableSyncMode
	Scope               TableRegionScope
	IncrementalLookback time.Duration
}

TableSpec defines an AWS table to sync.

type TableSyncMode

type TableSyncMode int
const (
	TableSyncModeFull TableSyncMode = iota
	TableSyncModeIncremental
)

Source Files

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL