lib

package
v0.0.0-...-1d1c70e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 23, 2019 License: GPL-3.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNoBadgerConfig = fmt.Errorf("badger is not configured")

ErrNoBadgerConfig is returned when attempting to connect to a badgerDB without one being configured

View Source
var (
	// ErrNotFound is a standard error for a missing thing
	ErrNotFound = fmt.Errorf("not found")
)
View Source
var VersionNumber = "0.1.0-dev"

VersionNumber is the current semver of the walk package

Functions

func JSONCoordinatorConfigFromFilepath

func JSONCoordinatorConfigFromFilepath(path string) func(*CoordinatorConfig)

JSONCoordinatorConfigFromFilepath returns a func that reads a JSON-encoded config if the file specified by path exists, failing silently if no file is present.

func NewRecordRedirectClient

func NewRecordRedirectClient(coord Coordinator) *http.Client

NewRecordRedirectClient creates an HTTP client with a custom CheckRedirect function that creates records of redirects & sends them to the coordinator.

func NormalizeURL

func NormalizeURL(u *url.URL) string

NormalizeURL canonicalizes a URL

func NormalizeURLString

func NormalizeURLString(urlstr string) (string, error)

NormalizeURLString canonicalizes a URL

func SetLogLevel

func SetLogLevel(level string)

SetLogLevel sets the amount of logging output the library produces

Types

type BadgerConfig

type BadgerConfig struct {
	// TODO - make badger configurable :/
	badger.Options
}

BadgerConfig configures the badger instance for walk

func NewBadgerConfig

func NewBadgerConfig() *BadgerConfig

NewBadgerConfig creates a badger configuration with default options

func (*BadgerConfig) DB

func (c *BadgerConfig) DB() (conn *badger.DB, err error)

DB returns a connection to Badger

type BadgerRequestStore

type BadgerRequestStore struct {
	// contains filtered or unexported fields
}

BadgerRequestStore implements the request store interface in badger

func NewBadgerRequestStore

func NewBadgerRequestStore(db *badger.DB) BadgerRequestStore

NewBadgerRequestStore creates a RequestStore from a badger.DB connection

func (BadgerRequestStore) GetRequest

func (rs BadgerRequestStore) GetRequest(urlstr string) (req *Request, err error)

GetRequest from the store by URL string

func (BadgerRequestStore) ListRequests

func (rs BadgerRequestStore) ListRequests(limit, offset int) (frc []*Request, err error)

ListRequests shows requests in the store. TODO: this only works when listing everything (MUST FIX); listing should happen in lexicographical URL order.

func (BadgerRequestStore) PutRequest

func (rs BadgerRequestStore) PutRequest(r *Request) (err error)

PutRequest in the store

type CBORResourceFileReader

type CBORResourceFileReader struct {
	// contains filtered or unexported fields
}

CBORResourceFileReader is an implementation of the Walk interface that reads from CBOR archives.

func NewCBORResourceFileReader

func NewCBORResourceFileReader(path string) (*CBORResourceFileReader, error)

NewCBORResourceFileReader creates a reader from a path to a walk, loading the cdxj index

func (*CBORResourceFileReader) FindIndex

func (cr *CBORResourceFileReader) FindIndex(url string) int

FindIndex returns the index position of a given url, -1 if not found

func (*CBORResourceFileReader) Get

func (cr *CBORResourceFileReader) Get(url string, t time.Time) (*Resource, error)

Get grabs an individual resource from the Walk

func (*CBORResourceFileReader) ID

func (cr *CBORResourceFileReader) ID() string

ID gives an identifier for this Walk. It is not guaranteed to be unique.

func (*CBORResourceFileReader) Index

func (cr *CBORResourceFileReader) Index(limit, offset int) (rsc []*Resource, err error)

Index returns a list of resources in the Walk, paginated by limit & offset.

func (*CBORResourceFileReader) Len

func (cr *CBORResourceFileReader) Len() int

Len returns the number of resources listed in the Walk

func (*CBORResourceFileReader) SortedIndex

func (cr *CBORResourceFileReader) SortedIndex(limit, offset int) (rsc []*Resource, err error)

SortedIndex returns an abbreviated list of records, assumes the values are sorted by SURT url

func (*CBORResourceFileReader) Timespan

func (cr *CBORResourceFileReader) Timespan() (start, stop time.Time)

Timespan gives the earliest & latest times this Walk covers

type CBORResourceFileWriter

type CBORResourceFileWriter struct {
	// contains filtered or unexported fields
}

CBORResourceFileWriter creates [multihash].cbor files in a folder specified by basePath. The file writer also writes a .cdxj index of the URLs it recorded to basePath/index.cdxj.

func NewCBORResourceFileWriter

func NewCBORResourceFileWriter(dir string) (*CBORResourceFileWriter, error)

NewCBORResourceFileWriter creates a CBORResourceFileWriter that writes to the directory dir.

func (*CBORResourceFileWriter) FinalizeResources

func (rh *CBORResourceFileWriter) FinalizeResources() error

FinalizeResources writes the index to its destination writer.

func (*CBORResourceFileWriter) HandleResource

func (rh *CBORResourceFileWriter) HandleResource(rsc *Resource)

HandleResource implements the ResourceHandler interface

func (*CBORResourceFileWriter) Type

func (rh *CBORResourceFileWriter) Type() string

Type implements ResourceHandler, distinguishing this RH as "CBOR" type

type Collection

type Collection interface {
	// Collections must implement the Walk interface, aggregating across all
	// Walks
	Walk
	// Walks gives access to the list of individual Walks
	Walks() ([]Walk, error)
}

Collection defines operations on a group of Walks

func NewCollection

func NewCollection(walks ...Walk) Collection

NewCollection creates a new collection from any number of walks

func NewCollectionFromConfig

func NewCollectionFromConfig(cfg *CollectionConfig) (Collection, error)

NewCollectionFromConfig creates a collection from a collection configuration. Currently it only supports creating CBOR walk readers from exact directories. In the future, functionality should be expanded to write to places other than a local filesystem.

type CollectionConfig

type CollectionConfig struct {
	// LocalDirs is a slice of locations on disk to check for walks
	LocalDirs []string
}

CollectionConfig configures the on-disk collection. There can be at most one collection per walk process

type Coordinator

type Coordinator interface {
	// NewJob creates a new job on this coordinator
	NewJob(confg *JobConfig) (*Job, error)
	// Jobs provides a list of jobs this Coordinator owns
	Jobs() ([]*Job, error)
	// Job fetches a single job from the coordinator
	Job(id string) (*Job, error)
	// StartJob Begins job execution
	StartJob(id string) error
	// Queue returns a channel of Requests, which contain urls that need
	// to be fetched & turned into one or more resources
	Queue() Queue
	// Coordinators must store a set of requests they've made
	RequestStore() RequestStore
	// Completed work is submitted to the Job by submitting one or more
	// constructed resources
	CompletedResources(rsc ...*Resource) error
	// Shutdown stopts the coordinator, closing any jobs it owns. this can take
	// some time (possibly minutes) to drain existing job queues & gracefully
	// terminate
	Shutdown() error
}

Coordinator can coordinate workers. Workers pull Requests from the coordinator's Queue and post finished resources using the CompletedResources method. This is the minimum interface a worker should need to turn Requests into Resources.

func NewCoordinator

func NewCoordinator(configs ...func(*CoordinatorConfig)) (coord Coordinator, err error)

NewCoordinator creates a coordinator

type CoordinatorConfig

type CoordinatorConfig struct {
	Badger       *BadgerConfig
	RequestStore *RequestStoreConfig
	Queue        *QueueConfig
	Collection   *CollectionConfig
	// UnfetchedScanFreqMilliseconds sets how often the crawler should scan the list of fetched
	// urls, checking links for unfetched urls. this "rehydrates" the crawler with urls that
	// might be missed while avoiding duplicate fetching. default value of 0 disables the check
	UnfetchedScanFreqMilliseconds int
}

CoordinatorConfig is the global configuration for all components of a walk

func ApplyCoordinatorConfigs

func ApplyCoordinatorConfigs(configs ...func(c *CoordinatorConfig)) *CoordinatorConfig

ApplyCoordinatorConfigs takes zero or more configuration functions to produce a single configuration

func DefaultCoordinatorConfig

func DefaultCoordinatorConfig() *CoordinatorConfig

DefaultCoordinatorConfig returns the default configuration for a coordinator

type Entry

type Entry struct {
	URL       string    `json:"url"`
	Title     string    `json:"title"`
	Timestamp time.Time `json:"timestamp"`
	Status    int       `json:"status"`
	Redirects []string  `json:"redirects,omitempty"`
	Resources []string  `json:"resources,omitempty"`
	Links     []string  `json:"links,omitempty"`
}

Entry is a subset of a resource relevant to a sitemap

func NewEntryFromResource

func NewEntryFromResource(r *Resource) *Entry

NewEntryFromResource pulls relevant values from a resource to create an Entry

type Job

type Job struct {
	// id for this crawl
	ID string
	// contains filtered or unexported fields
}

Job is the central reporting hub for a crawl. It's in charge of populating the queue & keeping up-to-date records in the fetch request store. workers post their completed work back to the Job, which sends the created resources to any registered resource handlers

func (*Job) Complete

func (c *Job) Complete()

Complete marks the job as finished

func (*Job) Config

func (c *Job) Config() *JobConfig

Config exposes the Job configuration

func (*Job) Errored

func (c *Job) Errored(err error)

Errored sets the current job state to errored & retains the error

func (*Job) Seeds

func (c *Job) Seeds() (seeds chan string, err error)

Seeds produces a channel of seed URLs to enqueue

func (*Job) Start

func (c *Job) Start() (err error)

Start kicks off coordinated fetching, seeding the queue & store & awaiting responses. Start will block until a signal is received on the stop channel. Keep in mind that a number of conditions can stop the crawler, depending on configuration.

type JobConfig

type JobConfig struct {
	// Seeds is a list of urls to seed the crawler with
	Seeds []string
	// SeedsPath is a filepath or URL to a newline-delimited list of seed URL strings
	SeedsPath string
	// If true, links from completed resources returned to the job will
	// be added to the queue (aka, crawling). Only links within the domains list
	// that don't match ignore patterns will be crawled
	Crawl bool
	// Domains is the list of domains to crawl. Only domains listed
	// in this list will be crawled
	Domains []string
	// Ignore is a list of url patterns to ignore
	IgnorePatterns []string
	// How frequently to check to see if a job is done, in milliseconds
	DoneScanMilli int
	// DelayMilli determines how long to wait between fetches for a given worker
	DelayMilli int
	// StopAfterEntries kills the crawler after a specified number of urls have
	// been visited. a value of 0 (the default) doesn't limit the number of entries
	StopAfterEntries int
	// StopUrl will stop the crawler after crawling a given URL
	StopURL string
	// BackoffResponseCodes is a list of response codes that when encountered will add
	// half the value of of CrawlDelayMilliseconds per request, slowing the crawl in response
	// every minute
	BackoffResponseCodes []int
	// MaxAttempts is the maximum number of times to try a url before giving up
	MaxAttempts int

	// Workers specifies configuration details for workers this job would like to
	// be sent to. The coordinator that orchestrates this job will take care of
	// worker allocation
	Workers []*WorkerConfig
	// ResourceHandler specifies where the results of completed requests should
	// be routed to. The coordinator that orchestarates this job will take care
	// of ResourceHandler allocation & routing
	ResourceHandlers []*ResourceHandlerConfig
}

JobConfig holds all Job configuration details

func DefaultJobConfig

func DefaultJobConfig() *JobConfig

DefaultJobConfig creates a job configuration populated with default values

func JSONJobConfigFromFilepath

func JSONJobConfigFromFilepath(path string) (*JobConfig, error)

JSONJobConfigFromFilepath reads a job config from a json file

type JobStatus

type JobStatus uint8

JobStatus tracks the state of a job

const (
	// JobStatusNew indicates a newly-created job
	JobStatusNew JobStatus = iota
	// JobStatusRunning indicates a job is running
	JobStatusRunning
	// JobStatusPaused indicates a job is paused
	JobStatusPaused
	// JobStatusComplete indicates a job is finished
	JobStatusComplete
	// JobStatusErrored indicates a job is errored
	JobStatusErrored
)

func (JobStatus) String

func (js JobStatus) String() string

String implements the stringer interface for job Status

type LocalWorker

type LocalWorker struct {
	// contains filtered or unexported fields
}

LocalWorker is an in-process implementation of Worker. TODO: finish the parallelism implementation.

func NewLocalWorker

func NewLocalWorker(cfg *WorkerConfig) *LocalWorker

NewLocalWorker creates a LocalWorker with crawl configuration settings

func (*LocalWorker) SetDelay

func (w *LocalWorker) SetDelay(d time.Duration)

SetDelay configures the delay between requests

func (*LocalWorker) Start

func (w *LocalWorker) Start(coord Coordinator) error

Start starts the local worker, reporting results to the given coordinator

func (*LocalWorker) Stop

func (w *LocalWorker) Stop() error

Stop stops the worker

type MemQueue

type MemQueue struct {
	OnPush func(r *Request)
	OnPop  func(r *Request)
	// contains filtered or unexported fields
}

MemQueue is an in-memory implementation of the Queue interface, with optional funcs for listening in on push & pop calls

func NewMemQueue

func NewMemQueue() *MemQueue

NewMemQueue initializes a new MemQueue

func (*MemQueue) Chan

func (q *MemQueue) Chan() (chan *Request, error)

Chan returns the queue structured as a go channel

func (*MemQueue) Len

func (q *MemQueue) Len() (int, error)

Len returns the number of Requests in the queue

func (*MemQueue) Pop

func (q *MemQueue) Pop() *Request

Pop removes a request from the queue. TODO: consider implementing acknowledgement/confirmation for guaranteed delivery. When popping, move the item to a secondary queue, then delete it from that queue when acknowledgement happens, or move it back to the main queue if acknowledgement isn't received within a given timeframe (e.g. because the worker died).

func (*MemQueue) Push

func (q *MemQueue) Push(r *Request)

Push adds a fetch request to the end of the queue

type MemRequestStore

type MemRequestStore struct {
	// lock protects access to urls domains map
	sync.Mutex
	// contains filtered or unexported fields
}

MemRequestStore is an in-memory implementation of a request store

func NewMemRequestStore

func NewMemRequestStore() *MemRequestStore

NewMemRequestStore creates a new MemRequestStore

func (*MemRequestStore) GetRequest

func (m *MemRequestStore) GetRequest(urlstr string) (*Request, error)

GetRequest from the store by URL string

func (*MemRequestStore) ListRequests

func (m *MemRequestStore) ListRequests(limit, offset int) (frc []*Request, err error)

ListRequests shows requests in the store. TODO: this only works when listing everything (MUST FIX); listing should happen in lexicographical URL order.

func (*MemRequestStore) PutRequest

func (m *MemRequestStore) PutRequest(r *Request) error

PutRequest in the store

type MemResourceHandler

type MemResourceHandler struct {
	Resources []*Resource
}

MemResourceHandler is an in-memory resource handler, it keeps resources in a simple slice

func (*MemResourceHandler) FinalizeResources

func (m *MemResourceHandler) FinalizeResources() error

FinalizeResources just makes sure MemResourceHandler gets to write down any final URLs before the Coordinator quits

func (*MemResourceHandler) HandleResource

func (m *MemResourceHandler) HandleResource(r *Resource)

HandleResource stores the resource in memory

func (*MemResourceHandler) Type

func (m *MemResourceHandler) Type() string

Type returns the type of handler

type Queue

type Queue interface {
	Push(*Request)
	Pop() *Request
	Len() (int, error)
	Chan() (chan *Request, error)
}

Queue is an interface for queueing up Requests. The queue is expected to operate in FIFO order, pushing requests that need processing onto one end and popping requests off the other end for processing.

type QueueConfig

type QueueConfig struct {
	Type string
}

QueueConfig holds configuration details for a Queue

type Request

type Request struct {
	JobID  string
	URL    string
	Status RequestStatus
	// TODO - currently not in use
	FetchAfter    time.Time
	AttemptsMade  int
	PrevResStatus int
}

Request is a URL that needs to be turned into a resource through fetching (requesting the URL & recording the response). Requests are held in stores, placed in queues, and consumed by workers.

type RequestStatus

type RequestStatus int

RequestStatus enumerates all possible states a request can be in

const (
	// RequestStatusUnknown is the default state
	RequestStatusUnknown RequestStatus = iota
	// RequestStatusFetch indicates this Request still needs fetching
	RequestStatusFetch
	// RequestStatusQueued indicates this Request is queued for fetching
	RequestStatusQueued
	// RequestStatusRequesting indicates this Request is currently being fetched
	RequestStatusRequesting
	// RequestStatusDone indicates this Request has successfully completed
	RequestStatusDone
	// RequestStatusFailed indicates this request cannot be completed
	RequestStatusFailed
)

type RequestStore

type RequestStore interface {
	PutRequest(*Request) error
	GetRequest(URL string) (*Request, error)
	ListRequests(limit, offset int) ([]*Request, error)
}

RequestStore is the interface for storing requests by their URL string

type RequestStoreConfig

type RequestStoreConfig struct {
	Type string
}

RequestStoreConfig holds configuration details for a request store

type Resource

type Resource struct {
	// Unique identifier for the crawl job that created this resource
	JobID string `json:"jobID,omitempty"`
	// A Url is uniquely identified by URI string without
	// any normalization. Url strings must always be absolute.
	URL string `json:"url"`
	// Timestamp of completed request
	Timestamp time.Time `json:"timestamp,omitempty"`
	// RequestDuration is the time remote server took to transfer content
	RequestDuration time.Duration `json:"duration,omitempty"`
	// Returned HTTP status code
	Status int `json:"status,omitempty"`
	// Returned HTTP 'Content-Type' header
	ContentType string `json:"contentType,omitempty"`
	// Result of mime sniffing to GET response body, as detailed at https://mimesniff.spec.whatwg.org
	ContentSniff string `json:"contentSniff,omitempty"`
	// ContentLength in bytes, will be the header value if only a HEAD request has been issued
	// After a valid GET response, it will be set to the length of the returned response
	ContentLength int64 `json:"contentLength,omitempty"`
	// HTML Title tag attribute
	Title string `json:"title,omitempty"`
	// key-value slice of returned headers from most recent HEAD or GET request
	// stored in the form [key,value,key,value...]
	Headers []string `json:"headers,omitempty"`
	// Hash is a base58 encoded multihash of res.Body
	Hash string `json:"hash,omitempty"`
	// Links
	Links []string `json:"links,omitempty"`
	// RedirectTo speficies where this url redirects to, cannonicalized
	RedirectTo string `json:"redirectTo,omitempty"`
	// RedirectTo speficies where this url redirects from, cannonicalized
	RedirectFrom string `json:"redirectFrom,omitempty"`
	// Error contains any fetching error string
	Error string `json:"error,omitempty"`
	// contents of response body
	Body []byte `json:"body,omitempty"`
}

Resource is data associated with a given URL at a point in time

func (u *Resource) ExtractDocLinks(doc *goquery.Document) error

ExtractDocLinks extracts & stores a page's linked documents by selecting all a[href] links from a given goquery document, using the receiver Resource's URL as the base

func (*Resource) HandleResponse

func (u *Resource) HandleResponse(started time.Time, res *http.Response, recordHeaders bool) (err error)

HandleResponse populates a resource based on an HTTP response

func (*Resource) HeadersMap

func (u *Resource) HeadersMap() (headers map[string]string)

HeadersMap formats u.Headers (a string slice) as a map[header]value

func (*Resource) Meta

func (u *Resource) Meta() *Resource

Meta returns a shallow copy of the resource without body bytes

type ResourceFinalizer

type ResourceFinalizer interface {
	FinalizeResources() error
}

ResourceFinalizer is an opt-in interface for ResourceHandler. FinalizeResources is called when a crawl is concluded, giving handlers a chance to clean up, write files, etc.

type ResourceHandler

type ResourceHandler interface {
	Type() string
	HandleResource(*Resource)
}

ResourceHandler is the interface for doing stuff with a resource, usually just after it's been created

func NewResourceHandler

func NewResourceHandler(db *badger.DB, cfg *ResourceHandlerConfig) (ResourceHandler, error)

NewResourceHandler creates a ResourceHandler from a config

func NewResourceHandlers

func NewResourceHandlers(db *badger.DB, cfgs []*ResourceHandlerConfig) (rhs []ResourceHandler, err error)

NewResourceHandlers creates a slice of ResourceHandlers from a config

type ResourceHandlerConfig

type ResourceHandlerConfig struct {
	Type string
	// SrcPath is the path to an input site file from a previous crawl
	SrcPath string
	// DstPath is the path to the output site file
	DstPath string
	// Prefix implements any namespacing for this config
	// not used by all ResourceHandlers
	Prefix string
}

ResourceHandlerConfig holds configuration details for a resource handler

type Sitemap

type Sitemap map[string]*Entry

Sitemap is a map of entries, keyed by string

type SitemapGenerator

type SitemapGenerator struct {
	// contains filtered or unexported fields
}

SitemapGenerator records resource responses in a badgerDB key/value store and can generate JSON sitemap output from them

func NewSitemapGenerator

func NewSitemapGenerator(prefix, dstPath string, db *badger.DB) *SitemapGenerator

NewSitemapGenerator creates a SitemapGenerator from a given prefix, destination path & badger.DB connection

func (*SitemapGenerator) FinalizeResources

func (g *SitemapGenerator) FinalizeResources() error

FinalizeResources writes a JSON sitemap file to the configured destination path

func (*SitemapGenerator) Generate

func (g *SitemapGenerator) Generate(path string) error

Generate creates a json sitemap file at the specified path

func (*SitemapGenerator) HandleResource

func (g *SitemapGenerator) HandleResource(r *Resource)

HandleResource implements ResourceHandler, adding the resource to the sitemap

func (*SitemapGenerator) Type

func (g *SitemapGenerator) Type() string

Type implements ResourceHandler, distinguishing this RH as "SITEMAP" type

type TimedCmd

type TimedCmd struct {
	JobID   string
	U       *url.URL
	M       string
	Started time.Time
}

TimedCmd defines a Command implementation that sets an internal timestamp whenever its URL method is called

func NewTimedGet

func NewTimedGet(jobID, rawurl string) (*TimedCmd, error)

NewTimedGet creates a new GET command with an internal Timer

func (*TimedCmd) Method

func (c *TimedCmd) Method() string

Method returns the HTTP verb to use to process this command (i.e. "GET", "HEAD", etc.).

func (*TimedCmd) URL

func (c *TimedCmd) URL() *url.URL

URL returns the resource targeted by this command.

type Walk

type Walk interface {
	// Len returns the number of resources in the Walk
	Len() int
	// ID is an identifier for this Walk
	ID() string
	// SortedIndex returns an abbreviated list of records, assumes
	// the values are sorted by SURT url
	SortedIndex(limit, offset int) ([]*Resource, error)
	// FindIndex returns the porition within the index of a given url string, -1
	// if it doesn't exist
	FindIndex(url string) int
	// Get returns a resource for a given URL
	Get(url string, t time.Time) (*Resource, error)
	// Timespan returns the earliest & latest dates this Walk contains
	Timespan() (start, stop time.Time)
}

Walk defines read-only operations on the result of a walk

type Worker

type Worker interface {
	SetDelay(time.Duration)
	Start(coord Coordinator) error
	Stop() error
}

Worker is the interface for turning Requests into Resources by performing fetches

func NewWorker

func NewWorker(cfg *WorkerConfig) (w Worker, err error)

NewWorker creates a new worker for a given configuration

func NewWorkers

func NewWorkers(wc []*WorkerConfig) (ws []Worker, err error)

NewWorkers creates a slice of Workers from a slice of Worker configs

type WorkerConfig

type WorkerConfig struct {
	Parallelism int
	Type        string
	DelayMilli  int
	// Polite is weather or not to respect robots.txt
	Polite bool
	// RecordResponseHeaders sets weather or not to keep a map of response headers
	RecordResponseHeaders bool
	// RecordRedirects specifies weather redirects should be recorded as redirects
	RecordRedirects bool
	UserAgent       string
}

WorkerConfig holds configuration details for a worker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL