kiterunner

package
v1.0.2
Published: Apr 11, 2021 License: AGPL-3.0 Imports: 16 Imported by: 0

Documentation

Overview

Package kiterunner provides the core request loop for performing highly concurrent low allocation requests.

This package should be used where you want to make a large number of requests across a large number of targets quickly, concurrently and with 0 allocations.

The 0 allocation benchmark comes from BenchmarkKiterunnerEngineRunGoland1Async, located in benchmark_integration_test.go. The benchmark assumes the following:

  • all your targets have been allocated
  • all your routes have been allocated
  • the caller releases Results back into the pool after they have completed reading the result

Hence, the 0 allocations figure corresponds to 0 additional allocations while performing requests. On a 2020 MacBook 13" this yields the following results:

❯ ulimit -n 20000 && go test '-bench=BenchmarkKiterunnerEngineRunGoland1Async$' -v ./...   -run='^$'  -count=10 -benchtime=10000x -benchmem
goos: darwin
goarch: amd64
pkg: github.com/assetnote/kiterunner/pkg/kiterunner
BenchmarkKiterunnerEngineRunGoland1Async
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             75670 ns/op               5 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             79782 ns/op               5 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             88210 ns/op               6 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             88506 ns/op               6 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             93852 ns/op               6 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             78248 ns/op               6 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             74890 ns/op               6 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             74421 ns/op               7 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             74885 ns/op               6 B/op          0 allocs/op
BenchmarkKiterunnerEngineRunGoland1Async-8         10000             74722 ns/op               6 B/op          0 allocs/op
PASS
ok      github.com/assetnote/kiterunner/pkg/kiterunner  8.170s

The allocation savings primarily derive from aggressively using sync.Pools for objects that are re-used across targets, and from mis-using channels as low-cost, cross-goroutine communication buffers. These optimisations have made the code in the handleTarget function somewhat difficult to understand. Future development on this codebase should ensure that the baseline benchmarks are not exceeded by any commit or, where they are, that a justifiable reason for the performance degradation is provided.
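The pooling pattern described above can be sketched as follows. This is a minimal illustration only: the result type, resultPool, acquireResult, releaseResult and process are hypothetical names and do not reproduce the package's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// result is a hypothetical stand-in for a pooled response object.
type result struct {
	status int
	body   []byte
}

// resultPool hands out reusable *result values.
var resultPool = sync.Pool{
	New: func() interface{} { return &result{body: make([]byte, 0, 1024)} },
}

// acquireResult fetches a result from the pool, allocating only if the pool is empty.
func acquireResult() *result {
	return resultPool.Get().(*result)
}

// releaseResult resets the result and returns it to the pool.
// The caller must not touch r after this call.
func releaseResult(r *result) {
	r.status = 0
	r.body = r.body[:0] // keep the backing array, drop the contents
	resultPool.Put(r)
}

// process simulates filling in and consuming one result.
// It returns the body length that was observed before release.
func process(status int, payload string) int {
	r := acquireResult()
	r.status = status
	r.body = append(r.body, payload...)
	n := len(r.body)
	releaseResult(r)
	return n
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(process(200, "hello"))
	}
}
```

Because released objects keep their backing arrays, steady-state iterations can reuse memory instead of allocating, which is the effect the benchmark above measures.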

The concurrency model chosen for the Async loop is defined in RunAsync. It uses three kinds of goroutine workers:

  • One goroutine per target to supervise the scheduling of requests
  • One goroutine for scheduling preflight requests, spawned by the target goroutine
  • N goroutines (Max Conn Per Host) per target for performing requests
      • request_wkr goroutines process both preflight requests and normal requests
      • preflight requests sent to request_wkr include the response channel where results are sent
      • results of normal requests are sent to the rx channel created at RunAsync time

This is illustrated in the following flow diagram

            (A)               (B)
            ┌─► preflight_wkr ──┬─► request_wkr ──┐
      ┌─► target_wkr─────(D)────┼─► request_wkr ──┼───┐
      │         ▲               └─► request_wkr ──┤  (E)
      │         └─────────────────────────────(C)─┘   │
      │     ┌─► preflight_wkr ──┬─► request_wkr ──┐   │
  tx ─┼─ target_wkr─────────────┼─► request_wkr ──┼───┼──►rx
      │        ▲                └─► request_wkr ──┤   │
      │        └──────────────────────────────────┘   │
      │     ┌─► preflight_wkr ──┬─► request_wkr ──┐   │
      └─► target_wkr────────────┼─► request_wkr ──┼───┘
                ▲               └─► request_wkr ──┤
                └─────────────────────────────────┘
(A) - target_wkr    - spawns preflight worker
(B) - preflight_wkr - schedules preflight requests for given baseline
(C) - request_wkr   - performs request and sends response to target_wkr for aggregation
(D) - target_wkr    - schedules a batch of requests for given baseline
(E) - request_wkr   - results are sent to rx channel for aggregation

This concurrency model was selected based on the benchmarks run in github.com/assetnote/kiterunner/benchmark/concurrency_test.go, where it was determined to provide a compromise between:

  • Allowing preflight requests to occur concurrently with normal requests
      • The target_wkr does not need to wait for a directory to complete before performing the next preflight requests
  • Restricting the number of requests sent to a given host at any one time
      • Only the pool of request_wkr for a given target is used to make requests to that target
  • Minimising inter-thread waits on channels
      • Requests on a directory are batched so the request_wkr doesn't need to read from the channel for each subsequent request in a directory it already has data for
  • Preventing a single slow target from blocking the processing of other targets
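A heavily simplified sketch of this model is shown below. It assumes string targets and routes, omits the preflight worker, batching and the parallel host limit, and uses hypothetical names (runAsync, scan); it is not the package's RunAsync implementation. It only illustrates one supervising goroutine per target, a per-target pool of request workers, and an rx channel that closes once tx is closed and all work has drained.

```go
package main

import (
	"fmt"
	"sync"
)

// runAsync starts one supervising goroutine per target received on tx.
// Each supervisor owns connsPerHost request workers for that target only.
func runAsync(routes []string, connsPerHost int) (chan<- string, <-chan string) {
	tx := make(chan string)
	rx := make(chan string)

	go func() {
		var targets sync.WaitGroup
		for target := range tx {
			targets.Add(1)
			go func(target string) { // plays the role of target_wkr
				defer targets.Done()
				jobs := make(chan string)
				var workers sync.WaitGroup
				for i := 0; i < connsPerHost; i++ {
					workers.Add(1)
					go func() { // plays the role of request_wkr
						defer workers.Done()
						for route := range jobs {
							// A real worker would perform the HTTP request here.
							rx <- target + route
						}
					}()
				}
				for _, r := range routes {
					jobs <- r
				}
				close(jobs)
				workers.Wait()
			}(target)
		}
		targets.Wait()
		close(rx) // all targets finished after tx was closed
	}()
	return tx, rx
}

// scan feeds the targets in and counts the results that come back.
func scan(targets, routes []string, connsPerHost int) int {
	tx, rx := runAsync(routes, connsPerHost)
	go func() {
		for _, t := range targets {
			tx <- t
		}
		close(tx)
	}()
	n := 0
	for range rx {
		n++
	}
	return n
}

func main() {
	fmt.Println(scan([]string{"a.example", "b.example"}, []string{"/x", "/y", "/z"}, 2))
}
```

Since each target has its own jobs channel and workers, a slow target only stalls its own pool while other targets continue to make progress.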

Kiterunner Wrappers

kiterunner_wrappers.go contains a set of synchronous wrapper functions for RunAsync. These are provided as utilities for callers who don't want the complexity of managing a channel-based function call. The functions that return Results do not release each Result back into the pool, which forgoes pool reuse but allows you to aggregate the results.

Example (KiterunnerRunAsync)

KiterunnerRunAsync demonstrates how to perform a RunAsync call and how to process results. We recommend releasing the Result after processing is complete to avoid unnecessary allocations for future results. The penalty incurred for not releasing is just the cost of allocating the memory for a new Result, which is relatively nominal compared to the number of requests sent.

routes := MakeRedirectRoutes(1, 1)
e := NewEngine(routes,
	ReadBody(false),
	// adjusting this significantly affects allocations
	MaxParallelHosts(1),
	MaxConnPerHost(2),
)
ctx := context.Background()
targets := MakeTargets(1)

for _, v := range targets {
	// set the context so they can be cancelled if the task is cancelled
	v.SetContext(ctx)
	// parse the host header so we don't send garbage to the client
	v.ParseHostHeader()
}

// start the loop and get your communication channels
tx, rx, err := e.RunAsync(ctx)
if err != nil { // handle err
}

var res []*Result
for _, v := range targets {
	tx <- v
}
close(tx)

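for r := range rx {
	// EITHER keep the result for later use (do not release it)
	res = append(res, r)
	// OR, if you don't need to keep results, release it instead of appending.
	// Do not do both: a released Result is not safe to use.
	// r.Release()
}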

// rx will close when tx is closed
Output:

Example (KiterunnerRunSync)

KiterunnerRunSync demonstrates how to perform a synchronous call against the kiterunner Engine. The results can be used as normal. Modifying the target or route that is returned may result in unexpected behaviour if the routes and targets are re-used in a later iteration of the run.

ctx := context.Background()
e := NewEngine(MakeRoutes(5), MaxParallelHosts(5), TargetQuarantineThreshold(0))
targets := MakeTargets(5)
for _, v := range targets {
	v.ParseHostHeader()
	v.SetContext(ctx)
}
res, err := e.Run(ctx, targets)
if err != nil {
	// handle err
}

for _, v := range res {
	fmt.Println(v.String())
}
Output:

Constants

const CheckInterval = 10

CheckInterval indicates how many requests a worker thread will process before checking for context cancellation or target quarantine.
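One way such an interval check can be structured is sketched below. The loop, the work and cancelAt functions, and the lowercase checkInterval constant are assumptions made for illustration; the package's actual worker loop may differ. The point is that the context is polled only once every checkInterval items, so a cancellation is noticed at the next interval boundary rather than immediately.

```go
package main

import (
	"context"
	"fmt"
)

const checkInterval = 10 // mirrors the value of CheckInterval

// work processes up to total items and polls ctx once every checkInterval items.
// It returns how many items were processed before stopping.
func work(ctx context.Context, total int, onItem func(i int)) int {
	done := 0
	for i := 0; i < total; i++ {
		if i%checkInterval == 0 {
			select {
			case <-ctx.Done():
				return done
			default:
			}
		}
		onItem(i) // a real worker would perform a request here
		done++
	}
	return done
}

// cancelAt runs work over total items and cancels the context while item `at` is processed.
func cancelAt(total, at int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return work(ctx, total, func(i int) {
		if i == at {
			cancel()
		}
	})
}

func main() {
	// Cancellation happens at item 3 but is only observed at the next check.
	fmt.Println(cancelAt(100, 3))
}
```

The trade-off is fewer channel polls on the hot path in exchange for up to checkInterval-1 extra requests after a cancellation.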

Variables

var (
	// ErrTargetQuarantined indicates that the target has crossed the threshold amount for consecutive non-baseline requests
	// and has consequently been quarantined. This indicates the host should not be scanned anymore
	ErrTargetQuarantined = fmt.Errorf("target quarantined")
	// DefaultRootRoute is the default base route that is used as a canary baseline against all hosts.
	DefaultRootRoute = []*http.Route{{Path: []byte("/"), Method: http.GET}}
)

var (
	ErrLengthMatch             = fmt.Errorf("failed on content length check")
	ErrScaledLengthMatch       = fmt.Errorf("failed on adjusted content length check")
	ErrWordCountMatch          = fmt.Errorf("failed on word and line count match")
	ErrContentLengthRangeMatch = fmt.Errorf("failed on content length range match")
	ErrBlacklistedStatusCode   = fmt.Errorf("failed with blacklisted status code")
	ErrWhitelistedStatusCode   = fmt.Errorf("failed with not whitelisted status code")
)

These errors correspond to errors resulting from failing various response validation checks.
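Callers would typically compare against sentinel errors like these with errors.Is, which also sees through wrapping. The sketch below uses local stand-in variables (errLengthMatch, errBlacklistedStatusCode) and hypothetical helpers (classify, wrappedLengthError) rather than the package's exported values, so it can run on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the package's sentinel validation errors.
var (
	errLengthMatch           = fmt.Errorf("failed on content length check")
	errBlacklistedStatusCode = fmt.Errorf("failed with blacklisted status code")
)

// classify maps a validation error to a short label, looking through any wrapping.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errLengthMatch):
		return "length"
	case errors.Is(err, errBlacklistedStatusCode):
		return "status"
	default:
		return "other"
	}
}

// wrappedLengthError returns a sentinel wrapped with extra context via %w.
func wrappedLengthError() error {
	return fmt.Errorf("GET /admin: %w", errLengthMatch)
}

func main() {
	fmt.Println(classify(wrappedLengthError()))
	fmt.Println(classify(nil))
}
```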

var (
	ErrGoogleBadRequest        = fmt.Errorf("google bad request found")
	ErrAmazonGatewayBadRequest = fmt.Errorf("amazon gateway bad request found")
)

var (
	PreflightCheckRoutes = []*http.Route{
		{
			Method: http.GET,
			Path:   []byte("/" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16] + "/" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),
		},
		{
			Method: http.GET,
			Path:   []byte("/"),
		},
		{
			Method: http.GET,
			Path:   []byte("/" + strings.Repeat("A", 1500)),
		},
		{
			Method: http.POST,
			Path:   []byte("/"),
		},
		{
			Method: http.PUT,
			Path:   []byte("/auth" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),

			Headers: []http.Header{{"Authorization", "Basic MTox"}},
		},
		{
			Method: http.GET,
			Path:   []byte("/auth" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),

			Headers: []http.Header{{"Authorization", "Basic MTox"}},
		},

		{
			Method: http.GET,
			Path:   []byte("/" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),
		},
		{
			Method: http.PUT,
			Path:   []byte("/" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),
		},
		{
			Method: http.POST,
			Path:   []byte("/" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),
		},
		{
			Method: http.DELETE,
			Path:   []byte("/" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),
		},
		{
			Method: http.PATCH,
			Path:   []byte("/" + strings.Replace(uuid.New().String(), "-", "", -1)[0:16]),
		},
	}
)

var (
	ReqMsgPool sync.Pool
)

Functions

func LogResult

func LogResult(r *Result, config *Config)

func LogResults

func LogResults(res []*Result, config *Config)

LogResults will output the results using the configured logger

func LogResultsChan

func LogResultsChan(ctx context.Context, res chan *Result, config *Config)

LogResultsChan will output the results using the configured logger

func ReleaseResult

func ReleaseResult(h *Result)

ReleaseResult releases a Result back into the shared pool.

Types

type Attribute

type Attribute int
const (
	FgBlack Attribute = iota + 30
	FgRed
	FgGreen
	FgYellow
	FgBlue
	FgMagenta
	FgCyan
	FgWhite

	FgBrightBlack Attribute = 90
)

Foreground text colors
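The values 30 to 37 and 90 match the standard ANSI SGR foreground colour codes. A minimal sketch of how such an attribute could be turned into an escape sequence follows; the lowercase names and the colorize helper are hypothetical, and how the package itself emits colours is not shown in this documentation.

```go
package main

import "fmt"

// attribute mirrors the Attribute type for illustration.
type attribute int

const (
	fgBlack attribute = iota + 30
	fgRed
	fgGreen
	fgYellow
	fgBlue
	fgMagenta
	fgCyan
	fgWhite

	fgBrightBlack attribute = 90
)

// colorize wraps s in an ANSI SGR sequence for a, followed by a reset.
func colorize(a attribute, s string) string {
	return fmt.Sprintf("\x1b[%dm%s\x1b[0m", int(a), s)
}

func main() {
	fmt.Println(colorize(fgGreen, "200 OK"))
}
```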

type Config

type Config struct {
	MaxParallelHosts     int           `toml:"max_parallel_hosts" json:"max_parallel_hosts" mapstructure:"max_parallel_hosts"`
	MaxConnPerHost       int           `toml:"max_conn_per_host" json:"max_conn_per_host" mapstructure:"max_conn_per_host"`
	WildcardDetection    bool          `json:"wildcard_detection"`
	Delay                time.Duration `toml:"delay_ms" json:"delay_ms" mapstructure:"delay_ms"`
	HTTP                 http.Config   `toml:"http" json:"http" mapstructure:"http"`
	QuarantineThreshold  int64
	PreflightCheckRoutes []*http.Route // these are the routes used to calculate the baseline. If the slice is empty, no baselines will be created, so requests will match on the status codes
	ProgressBar          ProgressBar
	RequestValidators    []RequestValidator
}

func NewDefaultConfig

func NewDefaultConfig() *Config

func (*Config) Validate

func (c *Config) Validate() error

type ConfigOption

type ConfigOption func(*Config)
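ConfigOption follows the functional options pattern: each option is a function that mutates a Config, and the constructor applies them in order over a set of defaults. The sketch below illustrates the pattern with a hypothetical lowercase config type and made-up default values; it does not reflect the package's real defaults.

```go
package main

import "fmt"

// config is a hypothetical, trimmed-down stand-in for Config.
type config struct {
	maxParallelHosts int
	maxConnPerHost   int
}

// configOption mutates a config, in the same shape as ConfigOption.
type configOption func(*config)

// maxParallelHosts returns an option that sets the parallel host limit.
func maxParallelHosts(v int) configOption {
	return func(c *config) { c.maxParallelHosts = v }
}

// maxConnPerHost returns an option that sets the per-host connection limit.
func maxConnPerHost(v int) configOption {
	return func(c *config) { c.maxConnPerHost = v }
}

// newConfig starts from defaults (illustrative values only) and applies opts in order.
func newConfig(opts ...configOption) *config {
	c := &config{maxParallelHosts: 50, maxConnPerHost: 3}
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	c := newConfig(maxConnPerHost(2))
	fmt.Println(c.maxParallelHosts, c.maxConnPerHost)
}
```

Options not supplied keep their defaults, and later options override earlier ones that touch the same field.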

func AddProgressBar

func AddProgressBar(p ProgressBar) ConfigOption

func AddRequestFilter

func AddRequestFilter(f RequestValidator) ConfigOption

func BlacklistDomains

func BlacklistDomains(in []string) ConfigOption

func Delay

func Delay(n time.Duration) ConfigOption

func HTTPExtraHeaders

func HTTPExtraHeaders(h []http.Header) ConfigOption

func MaxConnPerHost

func MaxConnPerHost(v int) ConfigOption

func MaxParallelHosts

func MaxParallelHosts(v int) ConfigOption

func MaxRedirects

func MaxRedirects(n int) ConfigOption

func MaxTimeout

func MaxTimeout(n time.Duration) ConfigOption

func ReadBody

func ReadBody(v bool) ConfigOption

func ReadHeaders

func ReadHeaders(v bool) ConfigOption

func SetPreflightCheckRoutes

func SetPreflightCheckRoutes(r []*http.Route) ConfigOption

func SkipPreflight

func SkipPreflight(enabled bool) ConfigOption

SkipPreflight will zero out the preflight check routes.

func TargetQuarantineThreshold

func TargetQuarantineThreshold(n int64) ConfigOption

func WildcardDetection

func WildcardDetection(enabled bool) ConfigOption

type ContentLengthValidator

type ContentLengthValidator struct {
	IgnoreRanges []http.Range
}

func NewContentLengthValidator

func NewContentLengthValidator(ranges []http.Range) *ContentLengthValidator

func (ContentLengthValidator) String

func (v ContentLengthValidator) String() string

func (*ContentLengthValidator) Validate

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine provides a scan configuration with a set of routes and a specified configuration. Calling Run or RunAsync can be done concurrently. Each call will create its own threadpool. If you wish to access one threadpool from multiple workers, use RunAsync and communicate using the provided channels. The options are non-configurable after instantiation, as modifying the routes or config during Run or RunAsync may lead to non-deterministic behaviour.

func NewEngine

func NewEngine(routes http.RouteMap, opts ...ConfigOption) *Engine

NewEngine will create an engine with the defined routes and options. If you require a different set of routes, you should instantiate a new engine.

func (*Engine) Config

func (e *Engine) Config() *Config

Config returns the config for the engine. Modifying this config will modify the config for any currently running scans.

func (*Engine) Run

func (e *Engine) Run(ctx context.Context, input []*http.Target) ([]*Result, error)

Run will perform the same operation as RunAsync. This wraps the channels with allocated structs and returns the results. This is safe to call from concurrent threads and will use separate worker pools for each call. Your callback will be invoked on each result received so you can asynchronously process the results if you wish. All the results will still be returned by the []*Result slice. Modifying the result in the callback is considered undefined behaviour.

func (*Engine) RunAsync

func (e *Engine) RunAsync(ctx context.Context) (tx chan *http.Target, rx chan *Result, err error)

RunAsync will begin all the concurrent threads for scanning. Inputs should be fed to tx (transmit) and results are read off rx (receive). This function can fail if the config provided to the engine is invalid. Each call instantiates its own set of workers, tx, and rx; hence this is safe to call concurrently. The Engine will terminate when the context is cancelled or when the tx channel is closed. When the rx channel is closed, all results have been returned. The caller closing rx may panic and is considered unexpected behaviour.

func (*Engine) RunCallback

func (e *Engine) RunCallback(ctx context.Context, input []*http.Target, cb ...func(r *Result, c *Config)) ([]*Result, error)

RunCallback will run the scan against the provided input, calling the provided callbacks on each result. The callbacks can be used to log the error in realtime or perform other processing. You should not modify or use the Target or Route from the Result, as this may have unintended side effects.

func (*Engine) RunCallbackNoResult

func (e *Engine) RunCallbackNoResult(ctx context.Context, input []*http.Target, cb ...func(r *Result, c *Config)) error

RunCallbackNoResult will run the scan against the provided input, calling the provided callbacks on each result. The callbacks can be used to log the error in realtime or perform other processing. You should not modify or use the Target or Route from the Result, as this may have unintended side effects. This function does not return the results, as they are released immediately after all callbacks are called. It is unsafe to use the result after your callbacks return. Use this when you don't need the result after the callback, e.g. when writing to disk or printing to output.

type ErrBadConfig

type ErrBadConfig struct {
	// contains filtered or unexported fields
}

func (*ErrBadConfig) Error

func (e *ErrBadConfig) Error() string

type ErrFailedPreflight

type ErrFailedPreflight struct {
	// contains filtered or unexported fields
}

func (ErrFailedPreflight) Error

func (e ErrFailedPreflight) Error() string

func (ErrFailedPreflight) Unwrap

func (e ErrFailedPreflight) Unwrap() error

type KnownBadSitesValidator

type KnownBadSitesValidator struct{}

func (*KnownBadSitesValidator) Validate

func (v *KnownBadSitesValidator) Validate(r http.Response, wildcardResponses []WildcardResponse, c *Config) error

type NullProgressBar

type NullProgressBar struct {
	// contains filtered or unexported fields
}

func (*NullProgressBar) AddTotal

func (n *NullProgressBar) AddTotal(v int64)

func (*NullProgressBar) Incr

func (n *NullProgressBar) Incr(v int64)

type ProgressBar

type ProgressBar interface {
	Incr(n int64)
	AddTotal(n int64)
}

type ReqMsg

type ReqMsg struct {
	Preflight *subpathBaseline
	Job       *job
	// contains filtered or unexported fields
}

type ReqMsgType

type ReqMsgType int
const (
	PreflightMsg ReqMsgType = iota
	RequestMsg
)

type RequestValidator

type RequestValidator interface {
	Validate(r http.Response, wildcardResponses []WildcardResponse, c *Config) error
}

RequestValidator is an interface that lets you add custom validators that decide which responses are good and which are bad.
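A custom validator might look like the sketch below. Because the package's http.Response type is not documented on this page, the example uses hypothetical local stand-ins (response, wildcardResponse, config) with an assumed StatusCode field; only the shape of the Validate method is taken from the interface above. Returning a non-nil error marks the response as rejected.

```go
package main

import (
	"errors"
	"fmt"
)

// response, wildcardResponse and config are hypothetical stand-ins for the package's types.
type response struct {
	StatusCode int
}

type wildcardResponse struct {
	DefaultStatusCode    int
	DefaultContentLength int
}

type config struct{}

// requestValidator has the same method shape as RequestValidator.
type requestValidator interface {
	Validate(r response, wildcardResponses []wildcardResponse, c *config) error
}

// errServerError is an illustrative sentinel returned for rejected responses.
var errServerError = errors.New("failed with server error status code")

// noServerErrors rejects any 5xx response.
type noServerErrors struct{}

func (noServerErrors) Validate(r response, _ []wildcardResponse, _ *config) error {
	if r.StatusCode >= 500 && r.StatusCode <= 599 {
		return errServerError
	}
	return nil
}

// firstFailure runs the validators in order and returns the first error, if any.
func firstFailure(r response, vs ...requestValidator) error {
	for _, v := range vs {
		if err := v.Validate(r, nil, nil); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println(firstFailure(response{StatusCode: 503}, noServerErrors{}))
	fmt.Println(firstFailure(response{StatusCode: 200}, noServerErrors{}))
}
```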

type Result

type Result struct {
	Target   *http.Target
	Route    *http.Route
	Response http.Response
}

func AcquireResult

func AcquireResult() *Result

AcquireResult retrieves a Result from the shared pool.

func (*Result) AppendBytes

func (r *Result) AppendBytes(b []byte) []byte

<METHOD> <STATUSCODE> <URL> [redirects ...]

func (*Result) AppendPrettyBytes

func (r *Result) AppendPrettyBytes(b []byte) []byte

<METHOD> <STATUSCODE> [lines, words, lines] <URL> [redirects ...]

func (*Result) Release

func (r *Result) Release()

Release will place the called result back into the pool. After release, the result is not safe for use.

func (*Result) String

func (r *Result) String() string

type StatusCodeBlacklist

type StatusCodeBlacklist struct {
	Codes map[int]interface{}
}

func NewStatusCodeBlacklist

func NewStatusCodeBlacklist(valid []int) *StatusCodeBlacklist

func (StatusCodeBlacklist) String

func (v StatusCodeBlacklist) String() string

func (*StatusCodeBlacklist) Validate

type StatusCodeWhitelist

type StatusCodeWhitelist struct {
	Codes map[int]interface{}
}

func NewStatusCodeWhitelist

func NewStatusCodeWhitelist(valid []int) *StatusCodeWhitelist

func (StatusCodeWhitelist) String

func (v StatusCodeWhitelist) String() string

func (*StatusCodeWhitelist) Validate

type WildcardResponse

type WildcardResponse struct {
	DefaultStatusCode     int
	DefaultContentLength  int
	AdjustedContentLength int // adjustedContentLength is the content length adjusted for the length of the requested path
	AdjustmentScale       int // number of times the requested path appears in the request
	DefaultWordCount      int // number of spaces + 1
	DefaultLineCount      int // number of newlines + 1

}
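The field comments suggest how these values relate to a response body. The sketch below is one plausible reading, not the package's implementation: it assumes the adjustment subtracts every occurrence of the requested path that is reflected in the body, and it uses hypothetical helper names (adjust, wordCount, lineCount).

```go
package main

import (
	"fmt"
	"strings"
)

// adjust returns how many times path occurs in body (the scale) and the body
// length with those occurrences subtracted (the adjusted length).
func adjust(body, path string) (scale, adjusted int) {
	if path == "" {
		// strings.Count treats an empty substring specially, so skip it.
		return 0, len(body)
	}
	scale = strings.Count(body, path)
	return scale, len(body) - scale*len(path)
}

// wordCount follows the "number of spaces + 1" rule from the field comment.
func wordCount(body string) int { return strings.Count(body, " ") + 1 }

// lineCount follows the "number of newlines + 1" rule from the field comment.
func lineCount(body string) int { return strings.Count(body, "\n") + 1 }

func main() {
	body := "Not found: /abc\nPlease check /abc and retry"
	scale, adjusted := adjust(body, "/abc")
	fmt.Println(scale, adjusted, wordCount(body), lineCount(body))
}
```

Subtracting the reflected path makes responses to different random paths comparable, since a wildcard page that echoes the path would otherwise vary in length with every request.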

type WildcardResponseValidator

type WildcardResponseValidator struct{}

func (*WildcardResponseValidator) Validate

func (v *WildcardResponseValidator) Validate(r http.Response, wildcardResponses []WildcardResponse, c *Config) error

type WildcardResponses

type WildcardResponses []WildcardResponse

func (WildcardResponses) UniqueAdd
