package sources
v0.0.0-...-7560532
Published: Jan 24, 2026 License: AGPL-3.0 Imports: 20 Imported by: 0


Constants

const (
	// DefaultChunkSize used by the chunker.
	DefaultChunkSize = 10 * 1024
	// DefaultPeekSize is the size of the peek into the previous chunk.
	DefaultPeekSize = 3 * 1024
	// TotalChunkSize is the total size of a chunk with peek data.
	TotalChunkSize = DefaultChunkSize + DefaultPeekSize
)

Variables

var MatchError = errors.New("chunk doesn't match")

Functions

func DecodeResumeInfo

func DecodeResumeInfo(resumeInfo string) []string

func EncodeResumeInfo

func EncodeResumeInfo(resumeInfoSlice []string) string

func FilterReposToResume

func FilterReposToResume(repos []string, resumeInfo string) (reposToScan []string, progressOffsetCount int)

FilterReposToResume filters the existing repos down to those that are included in the encoded resume info. It returns the new slice of repos to be scanned. It also returns the difference between the original length of the repos and the new length to use for progress reporting. It is required that both the resumeInfo repos and the existing repos are sorted.
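The documented contract can be sketched with a simplified, hypothetical reimplementation (not the package's actual code), relying on the requirement that both slices are sorted:

```go
package main

import (
	"fmt"
	"sort"
)

// filterReposToResume is an illustrative sketch of the documented contract:
// keep only repos that appear in the (sorted) resume list, and report how
// many were skipped so progress offsets stay accurate.
func filterReposToResume(repos, resumeRepos []string) (toScan []string, offset int) {
	for _, r := range repos {
		// Binary search is valid because both slices are required to be sorted.
		i := sort.SearchStrings(resumeRepos, r)
		if i < len(resumeRepos) && resumeRepos[i] == r {
			toScan = append(toScan, r)
		}
	}
	return toScan, len(repos) - len(toScan)
}

func main() {
	repos := []string{"a/one", "b/two", "c/three"}
	resume := []string{"b/two", "c/three"}
	toScan, offset := filterReposToResume(repos, resume)
	fmt.Println(toScan, offset) // [b/two c/three] 1
}
```

The returned offset is added to progress counts so that resumed scans report percentages relative to the original repo list.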

func HandleTestChannel

func HandleTestChannel(chunksCh chan *Chunk, cf ChunkFunc) error

func RemoveRepoFromResumeInfo

func RemoveRepoFromResumeInfo(resumeRepos []string, repoURL string) []string

RemoveRepoFromResumeInfo removes the repoURL from the resume info.

func WithAPI

func WithAPI(api apiClient) func(*SourceManager)

WithAPI adds an API client to the manager for tracking jobs and progress. If the API is also a JobProgressHook, it will be added to the list of event hooks.

func WithBufferedOutput

func WithBufferedOutput(size int) func(*SourceManager)

WithBufferedOutput sets the size of the buffer used for the Chunks() channel.

func WithCancel

func WithCancel(cancel context.CancelCauseFunc) func(*JobProgress)

WithCancel allows cancelling the job via its JobProgressRef.

func WithConcurrentSources

func WithConcurrentSources(concurrency int) func(*SourceManager)

WithConcurrentSources limits the number of sources a manager can run concurrently.

func WithConcurrentTargets

func WithConcurrentTargets(concurrency int) func(*SourceManager)

WithConcurrentTargets limits the number of targets a manager can run concurrently.

func WithConcurrentUnits

func WithConcurrentUnits(n int) func(*SourceManager)

WithConcurrentUnits limits the number of units to be scanned concurrently. The default is unlimited.

func WithHooks

func WithHooks(hooks ...JobProgressHook) func(*JobProgress)

WithHooks adds hooks to be called when an event triggers.

func WithReportHook

func WithReportHook(hook JobProgressHook) func(*SourceManager)

WithReportHook adds a hook to the SourceManager's reporting feature for customizing data aggregation.

func WithSourceUnits

func WithSourceUnits() func(*SourceManager)

WithSourceUnits enables using source unit enumeration and chunking if the source supports it.

func WithSourceUnitsFunc

func WithSourceUnitsFunc(f func() bool) func(*SourceManager)

WithSourceUnitsFunc dynamically configures whether to use source unit enumeration and chunking if the source supports it. If the function returns true and the source supports it, then units will be used. Otherwise, the legacy scanning method will be used.
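All of the With* functions above follow Go's functional-options pattern. A minimal self-contained sketch of that pattern, using toy types rather than the package's real implementation:

```go
package main

import "fmt"

// manager is a toy stand-in for SourceManager, used only to illustrate
// the option pattern shared by NewManager and the With* functions.
type manager struct {
	bufferSize  int
	concurrency int
}

// withBufferedOutput mirrors the shape of WithBufferedOutput.
func withBufferedOutput(size int) func(*manager) {
	return func(m *manager) { m.bufferSize = size }
}

// withConcurrentSources mirrors the shape of WithConcurrentSources.
func withConcurrentSources(n int) func(*manager) {
	return func(m *manager) { m.concurrency = n }
}

// newManager applies each option in order over the defaults.
func newManager(opts ...func(*manager)) *manager {
	m := &manager{bufferSize: 1, concurrency: 1} // illustrative defaults
	for _, opt := range opts {
		opt(m)
	}
	return m
}

func main() {
	m := newManager(withBufferedOutput(64), withConcurrentSources(8))
	fmt.Println(m.bufferSize, m.concurrency) // 64 8
}
```

Because each option is just a function, callers can build option slices conditionally before constructing the manager.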

Types

type ChanReporter

type ChanReporter struct {
	Ch chan<- *Chunk
}

ChanReporter is a ChunkReporter that writes to a channel.

func (ChanReporter) ChunkErr

func (ChanReporter) ChunkErr(ctx context.Context, err error) error

func (ChanReporter) ChunkOk

func (c ChanReporter) ChunkOk(ctx context.Context, chunk Chunk) error

type Chunk

type Chunk struct {
	// Data is the data to decode and scan.
	Data []byte

	// SourceName is the name of the Source that produced the chunk.
	SourceName string
	// SourceID is the ID of the source that the Chunk originated from.
	SourceID SourceID
	// JobID is the ID of the job that the Chunk originated from.
	JobID JobID
	// SecretID is the ID of the secret, if it exists.
	// Only secrets that are being reverified will have a SecretID.
	SecretID int64

	// SourceMetadata holds the context of where the Chunk was found.
	SourceMetadata *source_metadatapb.MetaData
	// SourceType is the type of Source that produced the chunk.
	SourceType sourcespb.SourceType

	// Verify specifies whether any secrets in the Chunk should be verified.
	Verify bool
}

Chunk contains data to be decoded and scanned along with context on where it came from.

**Important:** The order of the fields in this struct is specifically designed to optimize struct alignment and minimize memory usage. Do not change the field order without carefully considering the potential impact on memory consumption. Ex: https://go.dev/play/p/Azf4a7O-DhC

type ChunkError

type ChunkError struct {
	Unit SourceUnit
	Err  error
}

ChunkError is a custom error type for errors encountered during chunking of a specific unit.

func (ChunkError) Error

func (f ChunkError) Error() string

func (ChunkError) Unwrap

func (f ChunkError) Unwrap() error

type ChunkFunc

type ChunkFunc func(chunk *Chunk) error

type ChunkReader

type ChunkReader func(ctx context.Context, reader io.Reader) <-chan ChunkResult

ChunkReader reads chunks from a reader and returns a channel of ChunkResults, each carrying either chunk data or an error. The channel is closed when the reader is closed. This should be used whenever a large amount of data is read from a reader. Ex: reading attachments, archives, etc.

func NewChunkReader

func NewChunkReader(opts ...ConfigOption) ChunkReader

NewChunkReader returns a ChunkReader with the given options.

type ChunkReporter

type ChunkReporter interface {
	ChunkOk(ctx context.Context, chunk Chunk) error
	ChunkErr(ctx context.Context, err error) error
}

ChunkReporter defines the interface a source will use to report whether a chunk was found during unit chunking. Either method may be called any number of times. Implementors of this interface should allow for concurrent calls.

type ChunkResult

type ChunkResult struct {
	// contains filtered or unexported fields
}

ChunkResult is the output unit of a ChunkReader; it contains the data and error of a chunk.

func NewChunkResult

func NewChunkResult(data []byte, contentSize int) ChunkResult

NewChunkResult creates a ChunkResult with the given data and content size.

func NewChunkResultError

func NewChunkResultError(err error) ChunkResult

NewChunkResultError creates a ChunkResult containing an error.

func (ChunkResult) Bytes

func (cr ChunkResult) Bytes() []byte

Bytes for a ChunkResult.

func (ChunkResult) ContentSize

func (cr ChunkResult) ContentSize() int

ContentSize returns the size of actual content, excluding peek data. Use this when you need to process only the unique content of each chunk without the overlapping peek portion.
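A self-contained sketch of the overlap handling, assuming (for illustration) that each chunk's data carries a peek of the following chunk appended after its unique content:

```go
package main

import "fmt"

// chunk mimics ChunkResult: data carries content plus an overlapping peek,
// contentSize is the length of the unique content only.
type chunk struct {
	data        []byte
	contentSize int
}

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// split breaks input into chunks of size chunkSize, each carrying up to
// peekSize bytes of the following chunk as overlap. This is an assumption
// about the chunker's behavior, made for illustration.
func split(input []byte, chunkSize, peekSize int) []chunk {
	var out []chunk
	for i := 0; i < len(input); i += chunkSize {
		end := minInt(i+chunkSize, len(input))
		peekEnd := minInt(end+peekSize, len(input))
		out = append(out, chunk{data: input[i:peekEnd], contentSize: end - i})
	}
	return out
}

func main() {
	input := []byte("abcdefghij")
	chunks := split(input, 4, 2)

	// Reassembling with contentSize avoids double-counting the peek overlap.
	var rebuilt []byte
	for _, c := range chunks {
		rebuilt = append(rebuilt, c.data[:c.contentSize]...)
	}
	fmt.Println(string(rebuilt) == string(input)) // true
}
```

The peek overlap exists so secrets spanning a chunk boundary are still detectable, while ContentSize lets consumers avoid processing the same bytes twice.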

func (ChunkResult) Error

func (cr ChunkResult) Error() error

Error for a ChunkResult.

type ChunkingTarget

type ChunkingTarget struct {
	// QueryCriteria represents specific parameters or conditions to target the chunking process.
	QueryCriteria *source_metadatapb.MetaData
	// SecretID is the ID of the secret.
	SecretID int64
}

ChunkingTarget specifies criteria for a targeted chunking process. Instead of collecting data indiscriminately, this struct allows the caller to specify particular subsets of data they're interested in. This becomes especially useful when one needs to verify or recheck specific data points without processing the entire dataset.

type CommonSourceUnit

type CommonSourceUnit struct {
	Kind SourceUnitKind `json:"kind,omitempty"`
	ID   string         `json:"id"`
}

CommonSourceUnit is a common implementation of SourceUnit that Sources can use instead of implementing their own types.

func (CommonSourceUnit) Display

func (c CommonSourceUnit) Display() string

func (CommonSourceUnit) SourceUnitID

func (c CommonSourceUnit) SourceUnitID() (string, SourceUnitKind)

SourceUnitID implements the SourceUnit interface.

type CommonSourceUnitUnmarshaller

type CommonSourceUnitUnmarshaller struct{}

CommonSourceUnitUnmarshaller is an implementation of SourceUnitUnmarshaller for the CommonSourceUnit. A source can embed this struct to gain the functionality of converting []byte to a CommonSourceUnit.

func (CommonSourceUnitUnmarshaller) UnmarshalSourceUnit

func (c CommonSourceUnitUnmarshaller) UnmarshalSourceUnit(data []byte) (SourceUnit, error)

UnmarshalSourceUnit implements the SourceUnitUnmarshaller interface.

type ConfigOption

type ConfigOption func(*chunkReaderConfig)

ConfigOption is a function that configures a chunker.

func WithChunkSize

func WithChunkSize(size int) ConfigOption

WithChunkSize sets the chunk size.

func WithFileSize

func WithFileSize(size int) ConfigOption

WithFileSize sets the file size. Note: If WithChunkSize is also provided, WithChunkSize takes precedence.

func WithPeekSize

func WithPeekSize(size int) ConfigOption

WithPeekSize sets the peek size.

type ConfiguredSource

type ConfiguredSource struct {
	Name string
	// contains filtered or unexported fields
}

ConfiguredSource is a Source with most of its initialization values pre-configured from a sourcespb.LocalSource configuration struct. It exposes a simplified Init() method and can only be initialized once. This struct is not necessary for running sources, but it helps simplify gathering all of the necessary information to call the [Source.Init] method.

func NewConfiguredSource

func NewConfiguredSource(s Source, config *sourcespb.LocalSource) ConfiguredSource

NewConfiguredSource pre-configures an instantiated Source object with the provided protobuf configuration.

func (*ConfiguredSource) Init

func (c *ConfiguredSource) Init(ctx context.Context, sourceID SourceID, jobID JobID) (Source, error)

Init returns the initialized Source. The ConfiguredSource is unusable after calling this method because initializing a Source more than once is undefined.

func (*ConfiguredSource) SourceType

func (c *ConfiguredSource) SourceType() sourcespb.SourceType

SourceType exposes the underlying source type.

type DockerConfig

type DockerConfig struct {
	// Images is the list of images to scan.
	Images []string
	// BearerToken is the token to use to authenticate with the source.
	BearerToken string
	// UseDockerKeychain determines whether to use the Docker keychain.
	UseDockerKeychain bool
	// ExcludePaths is a list of paths to exclude from scanning.
	ExcludePaths []string
	// Namespace represents a Docker organization or user account.
	Namespace string
	// RegistryToken is an optional authentication token used to access private images within the namespace.
	RegistryToken string
}

DockerConfig defines the optional configuration for a Docker source.

type ElasticsearchConfig

type ElasticsearchConfig struct {
	Nodes          []string
	Username       string
	Password       string
	CloudID        string
	APIKey         string
	ServiceToken   string
	IndexPattern   string
	QueryJSON      string
	SinceTimestamp string
	BestEffortScan bool
}

type Fatal

type Fatal struct {
	// contains filtered or unexported fields
}

Fatal is a wrapper around error to differentiate non-fatal errors from fatal ones. A fatal error is typically from a finished context or any error returned from a source's Init, Chunks, Enumerate, or ChunkUnit methods.

func (Fatal) Error

func (f Fatal) Error() string

func (Fatal) Unwrap

func (f Fatal) Unwrap() error

type FilesystemConfig

type FilesystemConfig struct {
	// Paths is the list of files and directories to scan.
	Paths []string
	// IncludePathsFile is the path to a file containing a list of regexps to include in the scan.
	IncludePathsFile string
	// ExcludePathsFile is the path to a file containing a list of regexps to exclude from the scan.
	ExcludePathsFile string
}

FilesystemConfig defines the optional configuration for a filesystem source.

type GCSConfig

type GCSConfig struct {
	// CloudCred determines whether to use cloud credentials.
	// This can NOT be used with a secret.
	CloudCred,
	WithoutAuth bool
	// ApiKey is the API key to use to authenticate with the source.
	ApiKey,
	ProjectID,
	ServiceAccount string
	// MaxObjectSize is the maximum object size to scan.
	MaxObjectSize int64
	// Concurrency is the number of concurrent workers to use to scan the source.
	Concurrency int
	// IncludeBuckets is a list of buckets to include in the scan.
	IncludeBuckets,
	ExcludeBuckets,
	IncludeObjects,
	ExcludeObjects []string
}

GCSConfig defines the optional configuration for a GCS source.

type GitConfig

type GitConfig struct {
	// HeadRef is the head reference to use to scan from.
	HeadRef string
	// BaseRef is the base reference to use to scan from.
	BaseRef string
	// MaxDepth is the maximum depth to scan the source.
	MaxDepth int
	// Bare is an indicator to handle bare repositories properly.
	Bare bool
	// URI is the URI of the repository to scan. file://, http://, https:// and ssh:// are supported.
	URI string
	// IncludePathsFile is the path to a file containing a list of regexps to include in the scan.
	IncludePathsFile string
	// ExcludePathsFile is the path to a file containing a list of regexps to exclude from the scan.
	ExcludePathsFile string
	// ExcludeGlobs is a list of comma separated globs to exclude from the scan.
	// This differs from the Filter exclusions as ExcludeGlobs is applied at the `git log -p` level
	ExcludeGlobs string
	// SkipBinaries allows skipping binary files from the scan.
	SkipBinaries bool
	// ClonePath is the local path used to clone repositories before scanning.
	ClonePath string
	// NoCleanup keeps cloned repositories in ClonePath after scanning instead of removing them.
	NoCleanup bool
	// PrintLegacyJSON indicates whether to print legacy JSON output format for this source.
	PrintLegacyJSON bool
	// TrustLocalGitConfig allows trusting the local git config.
	TrustLocalGitConfig bool
}

GitConfig defines the optional configuration for a git source.

type GitHubExperimentalConfig

type GitHubExperimentalConfig struct {
	// Repository is the repository to scan.
	Repository string
	// Token is the token to use to authenticate with the source.
	Token string
	// ObjectDiscovery indicates whether to discover all commit objects (CFOR) in the repository.
	ObjectDiscovery bool
	// CollisionThreshold is the number of short-sha collisions tolerated during hidden data enumeration. Default is 1.
	CollisionThreshold int
	// DeleteCachedData indicates whether to delete cached data.
	DeleteCachedData bool
}

GitHubExperimentalConfig defines the optional configuration for an experimental GitHub source.

type GithubConfig

type GithubConfig struct {
	// Endpoint is the endpoint of the source.
	Endpoint string
	// Token is the token to use to authenticate with the source.
	Token string
	// IncludeForks indicates whether to include forks in the scan.
	IncludeForks bool
	// IncludeMembers indicates whether to include members in the scan.
	IncludeMembers bool
	// Concurrency is the number of concurrent workers to use to scan the source.
	Concurrency int
	// Repos is the list of repositories to scan.
	Repos []string
	// Orgs is the list of organizations to scan.
	Orgs []string
	// ExcludeRepos is a list of repositories to exclude from the scan.
	ExcludeRepos []string
	// IncludeRepos is a list of repositories to include in the scan.
	IncludeRepos []string
	// Filter is the filter to use to scan the source.
	Filter *common.Filter
	// IncludeIssueComments indicates whether to include GitHub issue comments in the scan.
	IncludeIssueComments bool
	// IncludePullRequestComments indicates whether to include GitHub pull request comments in the scan.
	IncludePullRequestComments bool
	// IncludeGistComments indicates whether to include GitHub gist comments in the scan.
	IncludeGistComments bool
	// SkipBinaries allows skipping binary files from the scan.
	SkipBinaries bool
	// IncludeWikis indicates whether to include repository wikis in the scan.
	IncludeWikis bool
	// CommentsTimeframeDays indicates how many days of comments to include in the scan.
	CommentsTimeframeDays uint32
	// AuthInUrl determines whether to put the authentication token in the repository URL or in a header.
	AuthInUrl bool
	// ClonePath is the local path used to clone repositories before scanning.
	ClonePath string
	// NoCleanup keeps cloned repositories in ClonePath after scanning instead of removing them.
	NoCleanup   bool
	IgnoreGists bool
	// PrintLegacyJSON indicates whether to print legacy JSON output format for this source.
	PrintLegacyJSON bool
}

GithubConfig defines the optional configuration for a github source.

type GitlabConfig

type GitlabConfig struct {
	// Endpoint is the endpoint of the source.
	Endpoint string
	// Token is the token to use to authenticate with the source.
	Token string
	// Repos is the list of repositories to scan.
	Repos []string
	// GroupIds is the list of groups to scan.
	GroupIds []string
	// Filter is the filter to use to scan the source.
	Filter *common.Filter
	// SkipBinaries allows skipping binary files from the scan.
	SkipBinaries bool
	// IncludeRepos is a list of repositories to include in the scan.
	IncludeRepos []string
	// ExcludeRepos is a list of repositories to exclude from the scan.
	ExcludeRepos []string
	// AuthInUrl determines whether to put the authentication token in the repository URL or in a header.
	AuthInUrl bool
	// ClonePath is the local path used to clone repositories before scanning.
	ClonePath string
	// NoCleanup keeps cloned repositories in ClonePath after scanning instead of removing them.
	NoCleanup bool
	// PrintLegacyJSON indicates whether to print legacy JSON output format for this source.
	PrintLegacyJSON bool
}

GitlabConfig defines the optional configuration for a gitlab source.

type JobID

type JobID int64

type JobProgress

type JobProgress struct {
	// Unique identifiers for this job.
	JobID      JobID
	SourceID   SourceID
	SourceName string
	// contains filtered or unexported fields
}

JobProgress aggregates information about a run of a Source.

func NewJobProgress

func NewJobProgress(jobID JobID, sourceID SourceID, sourceName string, opts ...func(*JobProgress)) *JobProgress

NewJobProgress creates a new job report for the given source and job ID.

func (*JobProgress) Done

func (jp *JobProgress) Done() <-chan struct{}

func (*JobProgress) End

func (jp *JobProgress) End(end time.Time)

func (*JobProgress) EndEnumerating

func (jp *JobProgress) EndEnumerating(end time.Time)

func (*JobProgress) EndUnitChunking

func (jp *JobProgress) EndUnitChunking(unit SourceUnit, end time.Time)

func (*JobProgress) Finish

func (jp *JobProgress) Finish()

func (*JobProgress) Ref

func (jp *JobProgress) Ref() JobProgressRef

Ref provides a read-only reference to the JobProgress.

func (*JobProgress) ReportChunk

func (jp *JobProgress) ReportChunk(unit SourceUnit, chunk *Chunk)

func (*JobProgress) ReportError

func (jp *JobProgress) ReportError(err error)

ReportError adds a non-nil error to the aggregate of errors encountered during scanning.

func (*JobProgress) ReportUnit

func (jp *JobProgress) ReportUnit(unit SourceUnit)

func (*JobProgress) Snapshot

func (jp *JobProgress) Snapshot() JobProgressMetrics

Snapshot safely gets the job's current metrics.

func (*JobProgress) Start

func (jp *JobProgress) Start(start time.Time)

TODO: Comment all this mess. They are mostly implementing JobProgressHook but without the JobProgressRef parameter.

func (*JobProgress) StartEnumerating

func (jp *JobProgress) StartEnumerating(start time.Time)

func (*JobProgress) StartUnitChunking

func (jp *JobProgress) StartUnitChunking(unit SourceUnit, start time.Time)

func (*JobProgress) TrackProgress

func (jp *JobProgress) TrackProgress(progress *Progress)

TrackProgress informs the JobProgress of a Progress object and safely exposes its information in the Snapshots.

type JobProgressHook

type JobProgressHook interface {
	// Start and End mark the overall start and end time for this job.
	Start(JobProgressRef, time.Time)
	End(JobProgressRef, time.Time)
	// StartEnumerating and EndEnumerating mark the start and end time for
	// calling the source's Enumerate method. If the source does not
	// support enumeration, these methods will never be called.
	StartEnumerating(JobProgressRef, time.Time)
	EndEnumerating(JobProgressRef, time.Time)
	// StartUnitChunking and EndUnitChunking mark the start and end time
	// for calling the source's ChunkUnit method for a given unit. If the
	// source does not support enumeration, these methods will never be
	// called.
	StartUnitChunking(JobProgressRef, SourceUnit, time.Time)
	EndUnitChunking(JobProgressRef, SourceUnit, time.Time)
	// ReportError is called when any general error is encountered, usually
	// from enumeration.
	ReportError(JobProgressRef, error)
	// ReportUnit is called when a unit has been enumerated. If the source
	// does not support enumeration this method will never be called.
	ReportUnit(JobProgressRef, SourceUnit)
	// ReportChunk is called when a chunk has been produced for the given
	// unit. The unit will be nil if the source does not support
	// enumeration.
	ReportChunk(JobProgressRef, SourceUnit, *Chunk)
	// Finish marks the job as done.
	Finish(JobProgressRef)
}

type JobProgressMetrics

type JobProgressMetrics struct {
	StartTime *time.Time `json:"start_time,omitempty"`
	EndTime   *time.Time `json:"end_time,omitempty"`
	// Total number of units found by the Source.
	TotalUnits uint64 `json:"total_units,omitempty"`
	// Total number of units that have finished chunking.
	FinishedUnits uint64 `json:"finished_units,omitempty"`
	// Total number of chunks produced. This metric updates before the
	// chunk is sent on the output channel.
	TotalChunks uint64 `json:"total_chunks"`
	// All errors encountered.
	Errors []error `json:"errors"`
	// Set to true if the source supports enumeration and has finished
	// enumerating. If the source does not support enumeration, this field
	// is always false.
	DoneEnumerating bool `json:"done_enumerating,omitempty"`

	// Progress information reported by the source.
	SourcePercent           int64  `json:"source_percent,omitempty"`
	SourceMessage           string `json:"source_message,omitempty"`
	SourceEncodedResumeInfo string `json:"source_encoded_resume_info,omitempty"`
	SourceSectionsCompleted int32  `json:"source_sections_completed,omitempty"`
	SourceSectionsRemaining int32  `json:"source_sections_remaining,omitempty"`
}

JobProgressMetrics tracks the metrics of a job.

func (JobProgressMetrics) ChunkError

func (m JobProgressMetrics) ChunkError() error

ChunkError joins all errors encountered during chunking.

func (JobProgressMetrics) ElapsedTime

func (m JobProgressMetrics) ElapsedTime() time.Duration

ElapsedTime is a convenience method that provides the elapsed time the job has been running. If it hasn't started yet, 0 is returned. If it has finished, the total time is returned.

func (JobProgressMetrics) EnumerationError

func (m JobProgressMetrics) EnumerationError() error

EnumerationError joins all errors encountered during initialization or enumeration.

func (JobProgressMetrics) ErrorsFor

func (m JobProgressMetrics) ErrorsFor(unit SourceUnit) []error

ErrorsFor returns all the errors for the given SourceUnit. If there are no errors, including the case that the unit has not been encountered, nil will be returned.

func (JobProgressMetrics) FatalError

func (m JobProgressMetrics) FatalError() error

FatalError returns the first Fatal error, if any, encountered in the scan.

func (JobProgressMetrics) FatalErrors

func (m JobProgressMetrics) FatalErrors() error

FatalErrors returns all of the encountered fatal errors joined together.

func (JobProgressMetrics) PercentComplete

func (m JobProgressMetrics) PercentComplete() int

type JobProgressRef

type JobProgressRef struct {
	JobID      JobID    `json:"job_id"`
	SourceID   SourceID `json:"source_id"`
	SourceName string   `json:"source_name"`
	// contains filtered or unexported fields
}

JobProgressRef is a wrapper of a JobProgress for read-only access to its state. If the job supports it, the reference can also be used to cancel the run via CancelRun.

func (*JobProgressRef) CancelRun

func (r *JobProgressRef) CancelRun(cause error)

CancelRun requests that the referenced job be cancelled and stop running. This method has no effect if the job does not allow cancellation.

func (*JobProgressRef) Done

func (r *JobProgressRef) Done() <-chan struct{}

Done returns a channel that will block until the job has completed.

func (*JobProgressRef) Snapshot

func (r *JobProgressRef) Snapshot() JobProgressMetrics

Snapshot returns a snapshot of the job's current metrics.

type NoopHook

type NoopHook struct{}

NoopHook implements JobProgressHook by doing nothing. This is useful for embedding in other structs to overwrite only the methods of the interface that you care about.
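The embedding pattern the NoopHook documentation suggests can be sketched self-contained, using trimmed stand-ins for the real types:

```go
package main

import "fmt"

// Trimmed-down stand-ins for the real types, for a self-contained sketch.
type JobProgressRef struct{ JobID int64 }
type Chunk struct{ Data []byte }
type SourceUnit interface{ Display() string }

// hook is a minimal slice of the JobProgressHook interface.
type hook interface {
	ReportChunk(JobProgressRef, SourceUnit, *Chunk)
	Finish(JobProgressRef)
}

// NoopHook satisfies the interface by doing nothing.
type NoopHook struct{}

func (NoopHook) ReportChunk(JobProgressRef, SourceUnit, *Chunk) {}
func (NoopHook) Finish(JobProgressRef)                          {}

// chunkCounter embeds NoopHook and overrides only the method it cares about.
type chunkCounter struct {
	NoopHook
	count int
}

func (c *chunkCounter) ReportChunk(JobProgressRef, SourceUnit, *Chunk) { c.count++ }

func main() {
	var h hook = &chunkCounter{}
	h.ReportChunk(JobProgressRef{}, nil, &Chunk{})
	h.ReportChunk(JobProgressRef{}, nil, &Chunk{})
	h.Finish(JobProgressRef{}) // inherited no-op
	fmt.Println(h.(*chunkCounter).count) // 2
}
```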

func (NoopHook) End

func (NoopHook) End(JobProgressRef, time.Time)

func (NoopHook) EndEnumerating

func (NoopHook) EndEnumerating(JobProgressRef, time.Time)

func (NoopHook) EndUnitChunking

func (NoopHook) EndUnitChunking(JobProgressRef, SourceUnit, time.Time)

func (NoopHook) Finish

func (NoopHook) Finish(JobProgressRef)

func (NoopHook) ReportChunk

func (NoopHook) ReportChunk(JobProgressRef, SourceUnit, *Chunk)

func (NoopHook) ReportError

func (NoopHook) ReportError(JobProgressRef, error)

func (NoopHook) ReportUnit

func (NoopHook) ReportUnit(JobProgressRef, SourceUnit)

func (NoopHook) Start

func (NoopHook) Start(JobProgressRef, time.Time)

func (NoopHook) StartEnumerating

func (NoopHook) StartEnumerating(JobProgressRef, time.Time)

func (NoopHook) StartUnitChunking

func (NoopHook) StartUnitChunking(JobProgressRef, SourceUnit, time.Time)

type PostmanConfig

type PostmanConfig struct {
	// Workspace UUID(s) or file path(s) to Postman workspace (.zip)
	Workspaces []string
	// Collection ID(s) or file path(s) to Postman collection (.json)
	Collections []string
	// Environment ID(s) or file path(s) to Postman environment (.json)
	Environments []string
	// Token is the token to use to authenticate with the API.
	Token string
	// IncludeCollections is a list of Collections to include in the scan.
	IncludeCollections []string
	// IncludeEnvironments is a list of Environments to include in the scan.
	IncludeEnvironments []string
	// ExcludeCollections is a list of Collections to exclude from the scan.
	ExcludeCollections []string
	// ExcludeEnvironments is a list of Environments to exclude from the scan.
	ExcludeEnvironments []string
	// Concurrency is the number of concurrent workers to use to scan the source.
	Concurrency int
	// CollectionPaths is the list of paths to Postman collections.
	CollectionPaths []string
	// WorkspacePaths is the list of paths to Postman workspaces.
	WorkspacePaths []string
	// EnvironmentPaths is the list of paths to Postman environments.
	EnvironmentPaths []string
	// Filter is the filter to use to scan the source.
	Filter *common.Filter
}

PostmanConfig defines the optional configuration for a Postman source.

type Progress

type Progress struct {
	PercentComplete   int64
	Message           string
	EncodedResumeInfo string
	SectionsCompleted int32
	SectionsRemaining int32
	// contains filtered or unexported fields
}

Progress is used to update job completion progress across sources.

func (*Progress) ClearEncodedResumeInfoFor

func (p *Progress) ClearEncodedResumeInfoFor(id string)

ClearEncodedResumeInfoFor removes the encoded resume information from being tracked.

func (*Progress) GetEncodedResumeInfoFor

func (p *Progress) GetEncodedResumeInfoFor(id string) string

GetEncodedResumeInfoFor gets the encoded resume information for the provided ID, usually a unit ID.

func (*Progress) GetProgress

func (p *Progress) GetProgress() *Progress

GetProgress gets job completion percentage for metrics reporting.

func (*Progress) SetEncodedResumeInfoFor

func (p *Progress) SetEncodedResumeInfoFor(id, value string)

SetEncodedResumeInfoFor sets the encoded resume information for the provided ID, usually a unit ID.

func (*Progress) SetProgressComplete

func (p *Progress) SetProgressComplete(i, scope int, message, encodedResumeInfo string)

SetProgressComplete sets job progress information for a running job based on the highest-level objects in the source. i is the current iteration in the loop over the target scope; scope should be len(scopedItems); message is the public-facing user information about the current progress; and encodedResumeInfo is an optional string representing any information necessary to resume the job if interrupted.

NOTE: SetProgressOngoing should be used when the source does not yet know how many items it is scanning (scope)
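The i/scope arithmetic plausibly behind PercentComplete can be sketched as follows (an illustrative assumption, not the package's verified internals):

```go
package main

import "fmt"

// percentComplete sketches the arithmetic SetProgressComplete plausibly
// performs: i of scope items done, as a whole-number percentage.
func percentComplete(i, scope int) int64 {
	if scope == 0 {
		return 0 // avoid division by zero when scope is unknown
	}
	return int64(i * 100 / scope)
}

func main() {
	// e.g. 3 of 8 repositories scanned
	fmt.Println(percentComplete(3, 8)) // 37
	fmt.Println(percentComplete(8, 8)) // 100
}
```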

func (*Progress) SetProgressOngoing

func (p *Progress) SetProgressOngoing(message string, encodedResumeInfo string)

SetProgressOngoing sets information about the current running job based on the highest-level objects in the source. message is the public-facing user information about the current progress; encodedResumeInfo is an optional string representing any information necessary to resume the job if interrupted.

NOTE: This method should be used over SetProgressComplete when the source does
not yet know how many items it is scanning and does not want to display a percentage complete.

type S3Config

type S3Config struct {
	// CloudCred determines whether to use cloud credentials.
	// This can NOT be used with a secret.
	CloudCred bool
	// Key is any key to use to authenticate with the source.
	Key,
	Secret,
	SessionToken string
	// Buckets is the list of buckets to scan.
	Buckets []string
	// IgnoreBuckets is the list of buckets to ignore.
	IgnoreBuckets []string
	// Roles is the list of Roles to use.
	Roles []string
	// MaxObjectSize is the maximum object size to scan.
	MaxObjectSize int64
}

S3Config defines the optional configuration for an S3 source.

type ScanErrors

type ScanErrors struct {
	// contains filtered or unexported fields
}

ScanErrors is used to collect errors encountered while scanning. It ensures that errors are collected in a thread-safe manner.

func NewScanErrors

func NewScanErrors() *ScanErrors

NewScanErrors creates a new thread-safe error collector.

func (*ScanErrors) Add

func (s *ScanErrors) Add(err error)

Add an error to the collection in a thread-safe manner.

func (*ScanErrors) Count

func (s *ScanErrors) Count() uint64

Count returns the number of errors collected.

func (*ScanErrors) Errors

func (s *ScanErrors) Errors() error

func (*ScanErrors) String

func (s *ScanErrors) String() string

type Source

type Source interface {
	// Type returns the source type, used for matching against configuration and jobs.
	Type() sourcespb.SourceType
	// SourceID returns the initialized source ID used for tracking relationships in the DB.
	SourceID() SourceID
	// JobID returns the initialized job ID used for tracking relationships in the DB.
	JobID() JobID
	// Init initializes the source. Calling this method more than once is undefined behavior.
	Init(aCtx context.Context, name string, jobId JobID, sourceId SourceID, verify bool, connection *anypb.Any, concurrency int) error
	// Chunks emits data over a channel which is then decoded and scanned for secrets.
	// By default, data is obtained indiscriminately. However, by providing one or more
	// ChunkingTarget parameters, the caller can direct the function to retrieve
	// specific chunks of data. This targeted approach allows for efficient and
	// intentional data processing, beneficial when verifying or rechecking specific data points.
	Chunks(ctx context.Context, chunksChan chan *Chunk, targets ...ChunkingTarget) error
	// GetProgress returns the completion progress (percentage) for the scanned Source.
	GetProgress() *Progress
}

Source defines the interface required to implement a source chunker.

type SourceID

type SourceID int64

type SourceManager

type SourceManager struct {
	// contains filtered or unexported fields
}

SourceManager provides an interface for starting and managing running sources.

func NewManager

func NewManager(opts ...func(*SourceManager)) *SourceManager

NewManager creates a new manager with the provided options.

func (*SourceManager) AvailableCapacity

func (s *SourceManager) AvailableCapacity() int

AvailableCapacity returns the number of concurrent jobs the manager can accommodate at this time.

func (*SourceManager) Chunks

func (s *SourceManager) Chunks() <-chan *Chunk

Chunks returns the read-only channel of all chunks produced by the sources managed by this manager.

func (*SourceManager) ConcurrentSources

func (s *SourceManager) ConcurrentSources() int

ConcurrentSources returns the current number of concurrently running sources.

func (*SourceManager) Enumerate

func (s *SourceManager) Enumerate(ctx context.Context, sourceName string, source Source, reporter UnitReporter) (JobProgressRef, error)

func (*SourceManager) EnumerateAndScan

func (s *SourceManager) EnumerateAndScan(ctx context.Context, sourceName string, source Source, targets ...ChunkingTarget) (JobProgressRef, error)

EnumerateAndScan blocks until a resource is available to run the source, then asynchronously runs it. Error information is stored and accessible via the JobProgressRef as it becomes available.

func (*SourceManager) GetIDs

func (s *SourceManager) GetIDs(ctx context.Context, sourceName string, kind sourcespb.SourceType) (SourceID, JobID, error)

func (*SourceManager) MaxConcurrentSources

func (s *SourceManager) MaxConcurrentSources() int

MaxConcurrentSources returns the maximum configured limit of concurrent sources the manager will run.

func (*SourceManager) Scan

func (s *SourceManager) Scan(ctx context.Context, sourceName string, source Source, unit SourceUnit) (JobProgressRef, error)

Scan blocks until a resource is available to run the source against a single SourceUnit, then asynchronously runs it. Error information is stored and accessible via the JobProgressRef as it becomes available.

func (*SourceManager) ScanChunk

func (s *SourceManager) ScanChunk(chunk *Chunk)

ScanChunk injects a chunk into the output stream of chunks to be scanned. This method should rarely be used. TODO(THOG-1577): Remove when dependencies no longer rely on this functionality.

func (*SourceManager) SetMaxConcurrentSources

func (s *SourceManager) SetMaxConcurrentSources(maxRunCount int)

SetMaxConcurrentSources sets the maximum number of concurrently running sources. If the count is lower than the already existing number of concurrently running sources, no sources will be scheduled to run until the existing sources complete.

func (*SourceManager) Wait

func (s *SourceManager) Wait() error

Wait blocks until all running sources are completed and closes the channel returned by Chunks(). The manager should not be reused after calling this method. The current implementation is not thread-safe and should be called by only one goroutine.

type SourceUnit

type SourceUnit interface {
	// SourceUnitID uniquely identifies a source unit. It does not need to
	// be human readable or two-way, however, it should be canonical and
	// stable across runs.
	SourceUnitID() (string, SourceUnitKind)

	// Display is the human readable representation of the SourceUnit.
	Display() string
}

SourceUnit is an object that represents a Source's unit of work. This is used as the output of enumeration, progress reporting, and job distribution.
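A minimal implementation might represent one repository per unit. The sketch below is hypothetical (the type name and "repo" kind are illustrative), with local stand-ins for SourceUnit and SourceUnitKind so it compiles on its own:

```go
package main

import "fmt"

// Local stand-ins for the package's SourceUnitKind and SourceUnit,
// declared here so the sketch is self-contained.
type SourceUnitKind string

type SourceUnit interface {
	SourceUnitID() (string, SourceUnitKind)
	Display() string
}

// repoUnit is a hypothetical unit representing a single repository.
type repoUnit struct {
	url string
}

// SourceUnitID returns a canonical, stable ID: the repo URL itself.
func (r repoUnit) SourceUnitID() (string, SourceUnitKind) {
	return r.url, SourceUnitKind("repo")
}

// Display returns the human-readable representation of the unit.
func (r repoUnit) Display() string { return r.url }

func main() {
	var u SourceUnit = repoUnit{url: "https://example.com/org/repo.git"}
	id, kind := u.SourceUnitID()
	fmt.Println(id, kind)
}
```

Because the ID doubles as the URL here, it is stable across runs, satisfying the interface's canonicality requirement.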

type SourceUnitChunker

type SourceUnitChunker interface {
	// ChunkUnit creates 0 or more chunks from a unit, reporting them or
	// any errors to the ChunkReporter. An error should only be returned
	// from this method in the case of context cancellation, fatal source
	// errors, or errors returned by the reporter. All other errors related
	// to unit chunking are tracked by the ChunkReporter.
	ChunkUnit(ctx context.Context, unit SourceUnit, reporter ChunkReporter) error
}

SourceUnitChunker defines an optional interface a Source can implement to support chunking a single SourceUnit.

type SourceUnitEnumChunker

type SourceUnitEnumChunker interface {
	SourceUnitEnumerator
	SourceUnitChunker
}

SourceUnitEnumChunker combines the two interfaces required to support enumerating and chunking of units.

type SourceUnitEnumerator

type SourceUnitEnumerator interface {
	// Enumerate creates 0 or more units from an initialized source,
	// reporting them or any errors to the UnitReporter. This method is
	// synchronous but can be called in a goroutine to support concurrent
	// enumeration and chunking. An error should only be returned from this
	// method in the case of context cancellation, fatal source errors, or
	// errors returned by the reporter. All other errors related to unit
	// enumeration are tracked by the UnitReporter.
	Enumerate(ctx context.Context, reporter UnitReporter) error
}

SourceUnitEnumerator defines an optional interface a Source can implement to support enumerating an initialized Source into SourceUnits.

type SourceUnitKind

type SourceUnitKind string

type SourceUnitUnmarshaller

type SourceUnitUnmarshaller interface {
	UnmarshalSourceUnit(data []byte) (SourceUnit, error)
}

SourceUnitUnmarshaller defines an optional interface a Source can implement to support units coming from an external source.
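One plausible use is decoding units produced by an external enumerator, e.g. as JSON. The sketch below is hypothetical (the gitSource and repoUnit types, and the JSON wire format, are assumptions), with a local SourceUnit stand-in:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SourceUnit is a local stand-in reduced to its Display method.
type SourceUnit interface {
	Display() string
}

// repoUnit is a hypothetical unit decoded from external JSON.
type repoUnit struct {
	URL string `json:"url"`
}

func (r repoUnit) Display() string { return r.URL }

// gitSource is a hypothetical source implementing the unmarshaller interface.
type gitSource struct{}

// UnmarshalSourceUnit decodes bytes produced by an external enumerator
// into a unit this source can chunk.
func (gitSource) UnmarshalSourceUnit(data []byte) (SourceUnit, error) {
	var u repoUnit
	if err := json.Unmarshal(data, &u); err != nil {
		return nil, err
	}
	return u, nil
}

func main() {
	u, err := gitSource{}.UnmarshalSourceUnit([]byte(`{"url":"https://example.com/r.git"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(u.Display())
}
```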

type StdinConfig

type StdinConfig struct{}

type SyslogConfig

type SyslogConfig struct {
	// Address used to connect to the source.
	Address string
	// Protocol used to connect to the source.
	Protocol string
	// CertPath is the path to the TLS certificate.
	CertPath string
	// Format of the incoming syslog messages.
	Format string
	// KeyPath is the path to the TLS key.
	KeyPath string
	// Concurrency is the number of concurrent workers to use to scan the source.
	Concurrency int
}

SyslogConfig defines the optional configuration for a syslog source.

type TargetedScanError

type TargetedScanError struct {
	Err      error
	SecretID int64
}

TargetedScanError is an error with a secret ID attached. Collections of them can be returned by targeted scans that scan multiple targets in order to associate individual errors with individual scan targets.

func (TargetedScanError) Error

func (t TargetedScanError) Error() string

func (TargetedScanError) Unwrap

func (t TargetedScanError) Unwrap() error

type UnitHook

type UnitHook struct {
	NoopHook
	// contains filtered or unexported fields
}

UnitHook implements JobProgressHook for tracking the progress of each individual unit.

func NewUnitHook

func NewUnitHook(ctx context.Context, opts ...UnitHookOpt) (*UnitHook, <-chan UnitMetrics)

func (*UnitHook) Close

func (u *UnitHook) Close() error

func (*UnitHook) EndUnitChunking

func (u *UnitHook) EndUnitChunking(ref JobProgressRef, unit SourceUnit, end time.Time)

func (*UnitHook) Finish

func (u *UnitHook) Finish(ref JobProgressRef)

func (*UnitHook) InProgressSnapshot

func (u *UnitHook) InProgressSnapshot() []UnitMetrics

InProgressSnapshot gets all the currently active metrics across all jobs.

func (*UnitHook) ReportChunk

func (u *UnitHook) ReportChunk(ref JobProgressRef, unit SourceUnit, chunk *Chunk)

func (*UnitHook) ReportError

func (u *UnitHook) ReportError(ref JobProgressRef, err error)

func (*UnitHook) StartUnitChunking

func (u *UnitHook) StartUnitChunking(ref JobProgressRef, unit SourceUnit, start time.Time)

type UnitHookOpt

type UnitHookOpt func(*UnitHook)

func WithUnitHookFinishBufferSize

func WithUnitHookFinishBufferSize(buf int) UnitHookOpt

WithUnitHookFinishBufferSize sets the buffer size for handling finished metrics (default is 1024). If the buffer fills, then scanning will stop until there is room.

type UnitMetrics

type UnitMetrics struct {
	Unit   SourceUnit     `json:"unit,omitempty"`
	Parent JobProgressRef `json:"parent,omitempty"`
	// Start and end time for chunking this unit.
	StartTime *time.Time `json:"start_time,omitempty"`
	EndTime   *time.Time `json:"end_time,omitempty"`
	// Total number of chunks produced from this unit.
	TotalChunks uint64 `json:"total_chunks"`
	// Total number of bytes produced from this unit.
	TotalBytes uint64 `json:"total_bytes"`
	// All errors encountered by this unit.
	Errors []error `json:"errors"`
}

func (UnitMetrics) ElapsedTime

func (u UnitMetrics) ElapsedTime() time.Duration

ElapsedTime is a convenience method that returns how long the unit has been chunking. If chunking hasn't started yet, 0 is returned. If it has finished, the total time is returned.

func (UnitMetrics) IsFinished

func (u UnitMetrics) IsFinished() bool

type UnitReporter

type UnitReporter interface {
	UnitOk(ctx context.Context, unit SourceUnit) error
	UnitErr(ctx context.Context, err error) error
}

UnitReporter defines the interface a source will use to report whether a unit was found during enumeration. Either method may be called any number of times. Implementors of this interface should allow for concurrent calls.

type Validator

type Validator interface {
	Validate(ctx context.Context) []error
}

Validator is an interface for validating a source. Sources can optionally implement this interface to validate their configuration.

type VisitorReporter

type VisitorReporter struct {
	VisitUnit func(context.Context, SourceUnit) error
	VisitErr  func(context.Context, error) error
}

VisitorReporter is a UnitReporter that will call the provided callbacks for finding units and reporting errors. VisitErr is optional; if unset it will log the error.

func (VisitorReporter) UnitErr

func (v VisitorReporter) UnitErr(ctx context.Context, err error) error

func (VisitorReporter) UnitOk

func (v VisitorReporter) UnitOk(ctx context.Context, unit SourceUnit) error
