blobproc

package module
v1.0.1
Published: May 11, 2026 License: MIT Imports: 27 Imported by: 0

README

BLOBPROC

Queues like it's 1995!

Build

Build binaries:

$ make

Create a debian package:

$ make deb

Release a new version:

$ make update-version V=1.2.3                    # updates VERSION and version.go
$ git commit -am "v1.2.3" && git tag v1.2.3      # tag needs this format for CI!
$ git push origin main && git push origin --tags # CI builds deb, uploads it to nexus

Background

BLOBPROC is a less kafkaesque version of the PDF postprocessing found in sandcrawler, which is part of IA Scholar infra. Specifically, it is designed to process and persist documents with a minimum number of external components and little to no state.

The goal is to have artifacts (fulltext, thumbnails, metadata, ...) derived from millions of PDF files available in a storage system (e.g. S3). In the best case, the artifacts can be kept up to date in an unattended way.

BLOBPROC currently ships with two cli programs:

  • blobprocd exposes an HTTP server that can receive binary data and stores it in a spool folder (maybe a better name would be blob-spoold)
  • blobproc is a one-off command (not a server) that scans the spool folder, executes post-processing tasks on each PDF, and removes the file from the spool once best-effort processing of that file is done; it is called periodically by a systemd timer

In our case pdf data may come from:

  • Heritrix crawl, via a ScriptedProcessor
  • (wip) a WARC file, a crawl collection or similar
  • in general, by any process that can deposit a file in the spool folder or send an HTTP request to blobprocd

In our case blobproc will execute the following tasks:

  • send PDF to GROBID and store the result in S3, using grobidclient Go library
  • generate text from PDF via pdftotext and store the result in S3 (seaweedfs)
  • generate a thumbnail from PDF via pdftoppm and store the result in S3 (seaweedfs)
  • find all weblinks in the PDF text and send them to a crawl API (wip)

More tasks can be added by extending blobproc itself. A focus remains on simple deployment via an OS distribution package. By pushing various parts into library functions (or external packages like grobidclient), the main processing routine shrinks to about 100 lines of code (as of 08/2024). Currently both blobproc and blobprocd run on a dual-core 2nd gen Xeon with 24GB of RAM; blobprocd received up to 100 requests per second (rps) and wrote PDFs to a rotational disk.

Bulk, back-of-the-envelope, reprocessing

Currently, about 5 pdfs/s. GROBID may be able to handle up to 10 pdfs/s. To reprocess, say 200M pdfs in less than a month, we would need about 10 GROBID instances.
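
The estimate above can be checked with a few lines of arithmetic. This is only a sketch: it assumes a 30-day month and perfectly sustained throughput, and the function names are made up for illustration. It gives about 77 pdfs/s, so 8 GROBID instances at the optimistic 10 pdfs/s, which is in line with "about 10" once some headroom is added.

```go
package main

import (
	"fmt"
	"math"
)

// requiredRate returns the sustained throughput (documents per second)
// needed to process total documents within the given number of days.
func requiredRate(total, days float64) float64 {
	return total / (days * 24 * 60 * 60)
}

// instancesNeeded returns how many instances with the given per-instance
// rate are needed to sustain targetRate, rounded up.
func instancesNeeded(targetRate, perInstanceRate float64) int {
	return int(math.Ceil(targetRate / perInstanceRate))
}

func main() {
	rate := requiredRate(200e6, 30) // 200M pdfs in 30 days
	fmt.Printf("required: %.1f pdfs/s\n", rate)
	fmt.Printf("instances at 10 pdfs/s: %d\n", instancesNeeded(rate, 10))
	fmt.Printf("instances at 5 pdfs/s: %d\n", instancesNeeded(rate, 5))
}
```

At the currently observed 5 pdfs/s per instance, the same calculation asks for 16 instances.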

Mode of operation

  • receive blob over HTTP, may be heritrix, curl, some backfill process
  • regularly scan spool dir and process found files

Usage

Server component.

$ blobproc serve --help
Start an HTTP server that receives binary PDF data via POST or PUT
requests and stores them in the spool folder for later processing.

The server provides the following endpoints:
  POST/PUT /spool    - Upload a PDF blob
  GET /spool         - List spool contents
  GET /spool/{id}    - Get status of a specific spool item

Usage:
  blobproc serve [flags]

Flags:
      --access-log string         access log file (empty = discard)
      --addr string               server listen address (default "0.0.0.0:8000")
  -h, --help                      help for serve
      --server-timeout duration   server read/write timeout (default 15s)
      --urlmap-file string        URL map database file (empty = disabled)
      --urlmap-header string      HTTP header for URL mapping (default "X-Original-URL")

Global Flags:
      --config string              config file (searches: ./blobproc.yaml, /home/tir/.config/blobproc/blobproc.yaml, /etc/blobproc/blobproc.yaml)
      --debug                      enable debug logging
      --grobid-host string         GROBID host URL (default "http://localhost:8070")
      --grobid-max-filesize int    max file size for GROBID in bytes (default 268435456)
      --grobid-timeout duration    GROBID request timeout (default 30s)
      --log-file string            log file path (empty = stderr)
      --s3-access-key string       S3 access key (default "minioadmin")
      --s3-default-bucket string   S3 default bucket (default "sandcrawler")
      --s3-endpoint string         S3 endpoint (default "localhost:9000")
      --s3-secret-key string       S3 secret key (default "minioadmin")
      --s3-use-ssl                 use SSL for S3 connections
      --spool-dir string           spool directory path (default "/home/tir/.local/share/blobproc/spool")
      --timeout duration           subprocess timeout (default 5m0s)
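
As a rough illustration of the upload endpoint listed above, the following sketch builds a POST request for /spool with the Go standard library. The endpoint path, the default port and the X-Original-URL header name are taken from the help output; the Content-Type value, the helper name newSpoolRequest and the example URL are assumptions, and the response format is not covered here.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// newSpoolRequest builds a POST request carrying raw PDF bytes for the
// /spool endpoint. originalURL is optional; if set, it is sent in the
// X-Original-URL header (the --urlmap-header default).
func newSpoolRequest(baseURL string, pdf []byte, originalURL string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, baseURL+"/spool", bytes.NewReader(pdf))
	if err != nil {
		return nil, err
	}
	// Assumption: the server may or may not inspect the content type.
	req.Header.Set("Content-Type", "application/pdf")
	if originalURL != "" {
		req.Header.Set("X-Original-URL", originalURL)
	}
	return req, nil
}

func main() {
	// Placeholder payload; a real caller would read a PDF file here.
	pdf := []byte("%PDF-1.4")
	req, err := newSpoolRequest("http://localhost:8000", pdf, "https://example.org/paper.pdf")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(req.Method, req.URL.String(), req.ContentLength)
	// To actually send it: resp, err := http.DefaultClient.Do(req)
}
```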

Processing command line tool.

$ blobproc run --help
Process all PDF files in the spool directory, generating
derivatives and storing them in S3. This is the main processing mode.

Usage:
  blobproc run [flags]

Flags:
  -h, --help          help for run
  -k, --keep          keep files in spool after processing
  -w, --workers int   number of parallel workers (1=sequential, >1=parallel) (default 4)

Global Flags:
      --config string              config file (searches: ./blobproc.yaml, /home/tir/.config/blobproc/blobproc.yaml, /etc/blobproc/blobproc.yaml)
      --debug                      enable debug logging
      --grobid-host string         GROBID host URL (default "http://localhost:8070")
      --grobid-max-filesize int    max file size for GROBID in bytes (default 268435456)
      --grobid-timeout duration    GROBID request timeout (default 30s)
      --log-file string            log file path (empty = stderr)
      --s3-access-key string       S3 access key (default "minioadmin")
      --s3-default-bucket string   S3 default bucket (default "sandcrawler")
      --s3-endpoint string         S3 endpoint (default "localhost:9000")
      --s3-secret-key string       S3 secret key (default "minioadmin")
      --s3-use-ssl                 use SSL for S3 connections
      --spool-dir string           spool directory path (default "/home/tir/.local/share/blobproc/spool")
      --timeout duration           subprocess timeout (default 5m0s)

Performance data points

The initial, unoptimized version would process about 25 pdfs/minute or 36K pdfs/day. We were able to crawl much faster than that, e.g. we reached 63G captured data (not all pdf) after about 4 hours. GROBID should be able to handle up to 10 pdfs/s.

A parallel walker could process about 300 pdfs/minute, and would match the inflow generated by one heritrix crawl node.

Scaling

  • tasks run in parallel, e.g. text extraction, thumbnail generation and grobid all run at the same time, but we process one file at a time for now
  • we should be able to configure a pool of grobid hosts to send requests to
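
The per-file parallelism described in the first item can be sketched with a sync.WaitGroup. This is not the code blobproc uses; the task type, runAll and the stubbed task bodies are hypothetical, and a failing task deliberately does not cancel the others, which matches the best-effort style described earlier.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errUnavailable = errors.New("service unavailable")

// task is one derivative step for a single file (hypothetical type).
type task struct {
	name string
	run  func(path string) error
}

// runAll starts every task in its own goroutine, waits for all of them
// and returns the errors of the tasks that failed.
func runAll(path string, tasks []task) []error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for _, t := range tasks {
		wg.Add(1)
		go func(t task) {
			defer wg.Done()
			if err := t.run(path); err != nil {
				mu.Lock()
				errs = append(errs, fmt.Errorf("%s: %w", t.name, err))
				mu.Unlock()
			}
		}(t)
	}
	wg.Wait()
	return errs
}

func main() {
	// Stubs standing in for pdftotext, pdftoppm and GROBID calls.
	tasks := []task{
		{"text", func(string) error { return nil }},
		{"thumbnail", func(string) error { return nil }},
		{"grobid", func(string) error { return errUnavailable }},
	}
	for _, err := range runAll("example.pdf", tasks) {
		fmt.Println("failed:", err)
	}
}
```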

Backfill

  • point to CDX file, crawl collection or similar and have all PDF files sent to BLOBPROC, even if this may take days or weeks

TODO

  • for each file placed into spool, try to record the URL-SHA1 pair somewhere
  • pluggable write backend for testing, e.g. just log what would happen
  • log performance measures
  • grafana

ASCII

                      PDF SOURCES
                          │
          ┌───────────────┼───────────────┐
          │               │               │
      Heritrix      WARC Files        Manual/
      Crawler         │               curl/etc
          │         blobfetch              │
          │           │                    │
          │           ├─────┐              │
          │           │     │              │
          │           v     v              v
          │      ┌─────────────────────────┐
          └─────>│   blobproc serve        │
                 │  (HTTP endpoint)        │
                 │  :8000/spool            │
                 └──────────┬──────────────┘
                            │
                            v
                 ┌──────────────────────┐
                 │   SPOOL DIRECTORY    │
                 │  ~/.local/share/...  │
                 │   (file queue)       │
                 └──────────┬───────────┘
                            │
                            v
                 ┌──────────────────────┐
                 │   blobproc run       │<─── systemd timer
                 │  (batch processor)   │     (periodic)
                 └──────────┬───────────┘
                            │
              ┌─────────────┼─────────────┐
              │             │             │
              v             v             v
        ┌─────────┐   ┌─────────┐   ┌─────────┐
        │ GROBID  │   │pdftotext│   │pdftoppm │
        │ (XML)   │   │ (text)  │   │ (thumb) │
        └────┬────┘   └────┬────┘   └────┬────┘
             │             │             │
             └─────────────┼─────────────┘
                           │ (parallel)
                           v
                     ┌───────────┐
                     │ S3 Store  │
                     │(seaweedfs)│
                     └───────────┘
                           │
                           v
                      [Artifacts]
                    (fulltext.txt)
                    (metadata.xml)
                    (thumbnail.png)

Notes

This tool should cover most of the following areas from sandcrawler:

  • run_grobid_extract
  • run_pdf_extract
  • run_persist_grobid
  • run_persist_pdftext
  • run_persist_thumbnail

Including references workers.

Performance: Processing 1605 pdfs, 1515 successful, 2.23 docs/s, when processed in parallel, via fd ... -x - or about 200K docs per day.

real    11m0.767s
user    73m57.763s
sys     5m55.393s

Documentation

Index

Constants

const (
	DefaultURLMapHttpHeader = "X-BLOBPROC-URL"
	ExpectedSHA1Length      = 40
)
const Version = "0.4.0"

Version of library and cli tools.

Variables

var (
	ErrFileTooLarge = errors.New("file too large")
	ErrInvalidHash  = errors.New("invalid hash")
	DefaultBucket   = "sandcrawler" // DefaultBucket for S3
)

Functions

This section is empty.

Types

type BlobRequestOptions

type BlobRequestOptions struct {
	Folder  string
	Blob    []byte
	SHA1Hex string
	Ext     string
	Prefix  string
	Bucket  string
}

BlobRequestOptions wraps the blob request options, both for setting and retrieving a blob.

Currently used folder names:

- "pdf" for thumbnails
- "xml_doc" for TEI-XML
- "html_body" for HTML TEI-XML
- "unknown" for generic

Default bucket is "sandcrawler-dev", other buckets via infra:

- "sandcrawler" for sandcrawler_grobid_bucket
- "sandcrawler" for sandcrawler_text_bucket
- "thumbnail" for sandcrawler_thumbnail_bucket

type BlobStore

type BlobStore struct {
	Client *minio.Client
}

BlobStore slightly wraps I/O around our S3 store with convenience methods.

func NewBlobStore

func NewBlobStore(endpoint string, opts *BlobStoreOptions) (*BlobStore, error)

NewBlobStore creates a new, slim wrapper around S3.

func (*BlobStore) GetBlob

func (bs *BlobStore) GetBlob(ctx context.Context, req *BlobRequestOptions) ([]byte, error)

GetBlob returns the object bytes given a blob request.

func (*BlobStore) PutBlob

func (bs *BlobStore) PutBlob(ctx context.Context, req *BlobRequestOptions) (*PutBlobResponse, error)

PutBlob puts data into S3 with a key derived from the given options. If the options do not contain the SHA1 of the content, it gets computed here. If no bucket name is given, a default bucket name is used. If the bucket does not exist, it gets created.

type BlobStoreOptions

type BlobStoreOptions struct {
	AccessKey     string
	SecretKey     string
	DefaultBucket string
	UseSSL        bool
}

BlobStoreOptions mostly contains pass through options for minio client. Keys from environment, e.g. ...BLOB_ACCESS_KEY

type LimitedReader

type LimitedReader struct {
	R        io.Reader
	N        int64
	MaxBytes int64
}

LimitedReader wraps an io.Reader and limits the number of bytes that can be read.

func (*LimitedReader) Read

func (l *LimitedReader) Read(p []byte) (n int, err error)
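
The documentation does not spell out what happens when the limit is hit. The following sketch shows one plausible behaviour, assuming the reader counts bytes and fails once the count exceeds the maximum. The type cappedReader, the error value and the helper readAllCapped are illustrative stand-ins, not the package's LimitedReader.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

var errTooLarge = errors.New("file too large")

// cappedReader is a hypothetical reader that fails once more than
// maxBytes have been read from the underlying reader.
type cappedReader struct {
	r        io.Reader
	n        int64 // bytes read so far
	maxBytes int64 // limit; reading past it returns errTooLarge
}

func (c *cappedReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	c.n += int64(n)
	if c.n > c.maxBytes {
		return n, errTooLarge
	}
	return n, err
}

// readAllCapped reads s through a cappedReader and reports how many
// bytes were read and the resulting error, if any.
func readAllCapped(s string, maxBytes int64) (int, error) {
	b, err := io.ReadAll(&cappedReader{r: strings.NewReader(s), maxBytes: maxBytes})
	return len(b), err
}

func main() {
	_, err := readAllCapped("hello world", 5)
	fmt.Println(err) // file too large
	n, err := readAllCapped("hello", 5)
	fmt.Println(n, err) // 5 <nil>
}
```

Failing loudly, rather than silently truncating as io.LimitReader does, lets an upload handler reject an oversized body instead of spooling a partial file.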

type Payload

type Payload struct {
	Path     string
	FileInfo fs.FileInfo
}

Payload is what we pass to workers. Since the worker needs file size information, we pass it along, as the expensive stat has already been performed.

type ProcessPDFParams

type ProcessPDFParams struct {
	Path              string
	Size              int64
	Grobid            *grobidclient.Grobid
	S3                *BlobStore
	GrobidMaxFileSize int64
	Logger            *slog.Logger
}

ProcessPDFParams configures a single PDF processing run.

Grobid and S3 are both optional; a nil client causes the corresponding derivative step to be logged and skipped. Logger defaults to slog.Default().

type ProcessPDFResult

type ProcessPDFResult struct {
	SHA1Hex   string // sha1 of the input file
	Thumbnail []byte // page-0 JPEG thumbnail
	Text      string // extracted plain text
	TEI       []byte // GROBID TEI XML body
}

ProcessPDFResult collects the derivatives extracted from a PDF. Library callers can use this directly (e.g. with S3 = nil) instead of (or in addition to) the S3 uploads ProcessPDF performs when an S3 client is configured. Any field may be empty if the corresponding step was skipped or failed; consult the returned errors for details.

func ProcessPDF

func ProcessPDF(ctx context.Context, p ProcessPDFParams) (*ProcessPDFResult, []error)

ProcessPDF runs the full per-file pipeline against a PDF on disk: pdfextract for text + page-0 thumbnail, then GROBID for structured TEI. When an S3 client is configured each derivative is uploaded to its conventional bucket/folder; the same data is also returned in the result so callers can use ProcessPDF as a library function. The returned result is always non-nil, with fields populated as they become available. The errors slice collects every error encountered; an empty (or nil) slice means the run was fully successful. The caller is responsible for stats accounting and removing the file from the spool.

type PutBlobResponse

type PutBlobResponse struct {
	Bucket     string
	ObjectPath string
}

PutBlobResponse wraps a blob put request response.

type URLMap

type URLMap struct {
	Path string
	// contains filtered or unexported fields
}

URLMap wraps an sqlite3 database for URL and SHA1 lookups.

func (*URLMap) Close

func (u *URLMap) Close() error

Close closes the database connection.

func (*URLMap) EnsureDB

func (u *URLMap) EnsureDB() error

EnsureDB creates a new database with schema, if it is not already set up.

func (*URLMap) Insert

func (u *URLMap) Insert(url, sha1 string) error

Insert inserts a new pair into the database. We lock at the application level to avoid 'database is locked (5) (SQLITE_BUSY)'. This will return an error if the database has not been initialized before.

type WalkFast

type WalkFast struct {
	Dir               string
	NumWorkers        int
	KeepSpool         bool
	GrobidMaxFileSize int64
	Timeout           time.Duration
	Grobid            *grobidclient.Grobid
	S3                *BlobStore
	// contains filtered or unexported fields
}

WalkFast is a walker that runs postprocessing in parallel.

func (*WalkFast) Run

func (w *WalkFast) Run(ctx context.Context) error

Run starts processing files. It does some basic sanity checks before setting up workers, as we do not have a constructor function.
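
A parallel walker of this kind is commonly built as a worker pool fed by a channel. The sketch below is an assumption about the general shape, not the WalkFast implementation: walkParallel, payload and makeDemoSpool are hypothetical names, and the processing function is a stub.

```go
package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
)

// payload carries the path and the already fetched file info to a worker.
type payload struct {
	path string
	info fs.FileInfo
}

// walkParallel walks dir and hands every regular file to one of numWorkers
// goroutines. It returns how many files were processed without error and
// how many files were seen in total.
func walkParallel(dir string, numWorkers int, process func(payload) error) (ok, total int64, err error) {
	if numWorkers < 1 { // sanity check, as there is no constructor
		return 0, 0, fmt.Errorf("need at least one worker, got %d", numWorkers)
	}
	queue := make(chan payload)
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for p := range queue {
				atomic.AddInt64(&total, 1)
				if process(p) == nil {
					atomic.AddInt64(&ok, 1)
				}
			}
		}()
	}
	err = filepath.WalkDir(dir, func(path string, d fs.DirEntry, werr error) error {
		if werr != nil {
			return werr
		}
		if !d.Type().IsRegular() {
			return nil
		}
		info, ierr := d.Info()
		if ierr != nil {
			return ierr
		}
		queue <- payload{path: path, info: info}
		return nil
	})
	close(queue) // no more work; lets the workers drain and exit
	wg.Wait()
	return ok, total, err
}

// makeDemoSpool creates a temporary directory with n small files.
func makeDemoSpool(n int) (string, error) {
	dir, err := os.MkdirTemp("", "spool-")
	if err != nil {
		return "", err
	}
	for i := 0; i < n; i++ {
		name := filepath.Join(dir, fmt.Sprintf("%02d.pdf", i))
		if err := os.WriteFile(name, []byte("%PDF"), 0o644); err != nil {
			return "", err
		}
	}
	return dir, nil
}

func main() {
	dir, err := makeDemoSpool(3)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer os.RemoveAll(dir)
	ok, total, err := walkParallel(dir, 2, func(p payload) error {
		if p.info.Size() == 0 {
			return fmt.Errorf("empty file: %s", p.path)
		}
		return nil
	})
	fmt.Println(ok, total, err) // 3 3 <nil>
}
```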

type WalkStats

type WalkStats struct {
	Processed int64
	OK        int64
}

WalkStats are a poor man's metrics.

func (*WalkStats) SuccessRatio

func (ws *WalkStats) SuccessRatio() float64

SuccessRatio calculates the ratio of successful to total processed files.
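
A minimal sketch of such a ratio, using a local stand-in type. The zero guard for an empty run is an assumption; the package may handle that case differently. The sample numbers are the 1605 processed and 1515 successful PDFs from the notes in the README.

```go
package main

import "fmt"

// walkStats is a local stand-in with the same two counters as WalkStats.
type walkStats struct {
	Processed int64
	OK        int64
}

// successRatio returns OK/Processed, or 0 if nothing was processed
// (the guard is an assumption; it avoids a NaN result).
func (ws *walkStats) successRatio() float64 {
	if ws.Processed == 0 {
		return 0
	}
	return float64(ws.OK) / float64(ws.Processed)
}

func main() {
	ws := walkStats{Processed: 1605, OK: 1515}
	fmt.Printf("%.3f\n", ws.successRatio()) // 0.944
}
```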

type WebSpoolService

type WebSpoolService struct {
	Dir        string
	ListenAddr string
	// TODO: add a (optional) reference to a store for url content hashes; it
	// would be good to keep it optional (so one may just copy files into the
	// spool folder), and maybe to provide a simple interface that can be
	// easily fulfilled by different backend.
	URLMap *URLMap
	// The HTTP header to look for a URL associated with a pdf blob payload.
	URLMapHttpHeader string
	// Minimum required free disk space percentage (default 10%)
	MinFreeDiskPercent int
	// Maximum allowed file size (default 0 = no limit)
	MaxFileSize int64
}

WebSpoolService saves web payload to a configured directory. TODO: add limit in size (e.g. 80% of disk or absolute value)

func (*WebSpoolService) BlobHandler

func (svc *WebSpoolService) BlobHandler(w http.ResponseWriter, r *http.Request)

BlobHandler receives binary blobs and saves them on disk. This handler returns as soon as the file has been written into the spool directory of the service, using a sharded SHA1 as path.
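
To illustrate what a sharded SHA1 path can look like, here is a sketch that hashes the payload and uses a prefix of the hex digest as a subdirectory. The two-character shard width, the use of the remaining characters as the file name, and the directory in the example are assumptions; the actual layout used by BlobHandler may differ.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"path/filepath"
)

// shardedPath returns the hex SHA1 of data and a path below dir that uses
// the first two hex characters as a subdirectory (assumed shard width).
func shardedPath(dir string, data []byte) (sha1hex, path string) {
	sum := sha1.Sum(data)
	sha1hex = hex.EncodeToString(sum[:])
	return sha1hex, filepath.Join(dir, sha1hex[:2], sha1hex[2:])
}

func main() {
	digest, path := shardedPath("/var/spool/blobproc", []byte("hello"))
	fmt.Println(digest) // aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d
	fmt.Println(path)
}
```

Sharding keeps any single directory from accumulating millions of entries, and content addressing means that uploading the same blob twice lands on the same path.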

func (*WebSpoolService) SpoolListHandler

func (svc *WebSpoolService) SpoolListHandler(w http.ResponseWriter, r *http.Request)

SpoolListHandler returns a single, long jsonlines response with information about all files in the spool directory.

func (*WebSpoolService) SpoolStatusHandler

func (svc *WebSpoolService) SpoolStatusHandler(w http.ResponseWriter, r *http.Request)

SpoolStatusHandler returns HTTP 200, if a given file is in the spool directory and HTTP 404, if the file is not in the spool directory.
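
The 200/404 behaviour can be sketched with a plain file existence check. Everything below is illustrative: the on-disk layout (first two hex characters as subdirectory), the helper names and the way the id is taken from the URL path are assumptions, not the package's handler.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"strings"
)

// spoolStatus returns 200 if a file for the given SHA1 exists below dir
// and 404 otherwise. Ids that are not 40 hex characters are rejected,
// which also rules out path traversal.
func spoolStatus(dir, id string) int {
	if _, err := hex.DecodeString(id); err != nil || len(id) != 40 {
		return http.StatusNotFound
	}
	// Assumed layout: dir/<first two hex chars>/<remaining chars>.
	if _, err := os.Stat(filepath.Join(dir, id[:2], id[2:])); err != nil {
		return http.StatusNotFound
	}
	return http.StatusOK
}

// statusHandler serves GET /spool/{id} for the given spool directory.
func statusHandler(dir string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(spoolStatus(dir, strings.TrimPrefix(r.URL.Path, "/spool/")))
	}
}

// demoSpool creates a temporary spool containing one empty file.
func demoSpool() (dir, id string, err error) {
	id = "da39a3ee5e6b4b0d3255bfef95601890afd80709" // SHA1 of empty input
	if dir, err = os.MkdirTemp("", "spool-"); err != nil {
		return "", "", err
	}
	if err = os.MkdirAll(filepath.Join(dir, id[:2]), 0o755); err != nil {
		return "", "", err
	}
	err = os.WriteFile(filepath.Join(dir, id[:2], id[2:]), nil, 0o644)
	return dir, id, err
}

func main() {
	dir, id, err := demoSpool()
	if err != nil {
		fmt.Println(err)
		return
	}
	defer os.RemoveAll(dir)
	h := statusHandler(dir)
	for _, path := range []string{"/spool/" + id, "/spool/" + strings.Repeat("0", 40)} {
		rec := httptest.NewRecorder()
		h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
		fmt.Println(path, rec.Code) // 200 for the first, 404 for the second
	}
}
```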

Directories

Path Synopsis
Package cdx wraps CDX records.
cmd
blobfetch command
blobfetch finds and fetches files from archive collections to be put into a spool folder for postprocessing.
blobproc command
Package dedent: https://github.com/lithammer/dedent
