Back to godoc.org
rescribe.xyz/bookpipeline

Package bookpipeline

v0.2.4
Latest Go to latest
Published: May 29, 2020 | License: GPL3 | Module: rescribe.xyz/bookpipeline

Overview

The bookpipeline package contains various tools and functions for the OCR of books, with a focus on distributed OCR using short-lived virtual servers. It also contains several tools that are useful standalone; read the accompanying README for more details.

Introduction

The book pipeline is a way to split the different processes that for book OCR into small jobs, which can be processed when a computer is ready for them. It is currently implemented with Amazon's AWS cloud systems, and can scale from zero to many computers, with jobs being processed faster when more servers are available.

Central to the bookpipeline in terms of software is the bookpipeline command, which is part of the rescribe.xyz/bookpipeline package. Presuming you have the go tools installed, you can install it, and useful tools to control the system, with this command:

go get -u rescribe.xyz/bookpipeline/...

All of the tools provided in the bookpipeline package will give information on what they do and how they work with the '-h' flag, so for example to get usage information on the booktopipeline tool simply run the following:

booktopipeline -h

To get the pipeline tools to work for you, you'll need to change the settings in cloudsettings.go, and set up your ~/.aws/credentials appropriately.

Managing servers

Most of the time the bookpipeline is expected to be run from potentially short-lived servers on Amazon's EC2 system. EC2 provides servers which have no guaranteed of stability (though in practice they seem to be), called "Spot Instances", which we use for bookpipeline. bookpipeline can handle a process or server being suddenly destroyed without warning (more on this later), so Spot Instances are perfect for us. We have set up a machine image with bookpipeline preinstalled which will launch at bootup, which is all that's needed to launch an bookpipeline instance. Presuming the bookpipeline package has been installed on your computer (see above), the spot instance can be started with the command:

spotme

You can keep an eye on the servers (spot or otherwise) that are running, and the jobs left to do and in progress, with the "lspipeline" tool (which is also part of the bookpipeline package). It's recommended to use this with the ssh private key for the servers, so that it can also report on what each server is currently doing, but it can run successfully without it. It takes a little while to run, so be patient. It can be run with the command:

lspipeline -i key.pem

Spot instances can be terminated with ssh, using their ip address which can be found with lspipeline, like so:

ssh -i key.pem admin@<ip-address> sudo poweroff

The bookpipeline program is run as a service managed by systemd on the servers. The system is fully resiliant in the face of unexpected failures. See the section "How the pipeline works" for details on this. bookpipeline can be managed like any other systemd service. A few examples:

# show all logs for bookpipeline:
ssh -i key.pem admin@<ip-address> journalctl -n all -u bookpipeline
# restart bookpipeline
ssh -i key.pem admin@<ip-address> systemctl restart bookpipeline

Using the pipeline

Books can be added to the pipeline using the "booktopipeline" tool. This takes a directory of page images as input, and uploads them all to S3, adding a job to the pipeline queue to start processing them. So it can be used like this:

booktopipeline -v ExcellentBook/

Getting a finished book

Once a book has been finished, it can be downloaded using the "getpipelinebook" tool. This has several options to download specific parts of a book, but the default case will download the best hOCR for each page, PDFs, and the best, conf and graph.png files. Use it like this:

getpipelinebook ExcellentBook

To get the plain text from the book, use the hocrtotxt tool, which is part of the rescribe.xyz/utils package. You can get the package, and run the tool, like this:

go get -u rescribe.xyz/utils/...
hocrtotext ExcellentBook/0010_bin0.2.hocr > ExcellentBook/0010_bin0.2.txt

How the pipeline works

The central part of the book pipeline is several SQS queues, which contain jobs which need to be done by a server running bookpipeline. The exact content of the SQS messages vary according to each queue, as some jobs need more information than others. Each queue is checked at least once every couple of minutes on any server that isn't currently processing a job.

When a job is taken from the queue by a process, it is hidden from the queue for 2 minutes so that no other process can take it. Once per minute when processing a job the process sends a message updating the queue, to tell it to keep the job hidden for two minutes. This is called the "heartbeat", as if the process fails for any reason the heartbeat will stop, and in 2 minutes the job will reappear on the queue for another process to have a go at. Once a job is completed successfully it is deleted from the queue.

Queues

Queue names are defined in cloudsettings.go.

queuePreProc

Each message in the queuePreProc queue is a bookname, optionally followed by a space and the name of the training to use. Each page of the bookname will be binarised with several different parameters, and then wiped, with each version uploaded to S3, with the path of the preprocessed page, plus the training name if it was provided, will be added to the queueOcrPage queue. The pages are binarised with different parameters as it can be difficult to determine which binarisation level will be best prior to OCR, so several different options are used, and in the queueAnalyse step the best one is chosen, based on the confidence of the OCR output.

example message: APolishGentleman_MemoirByAdamKruczkiewicz
example message: APolishGentleman_MemoirByAdamKruczkiewicz rescribelatv7

queueWipeOnly

This queue works the same as queuePreProc, except that it doesn't binarise the pages, only runs the wiper. Hence it is designed for books which have been prebinarised.

example message: APolishGentleman_MemoirByAdamKruczkiewicz
example message: APolishGentleman_MemoirByAdamKruczkiewicz rescribefrav2

queueOcrPage

This queue contains the path of individual pages, optionally followed by a space and the name of the training to use. Each page is OCRed, and the results are uploaded to S3. After each page is OCRed, a check is made to see whether all pages that look like they were preprocessed have corresponding .hocr files. If so, the bookname is added to the queueAnalyse queue.

example message: APolishGentleman_MemoirByAdamKruczkiewicz/00162_bin0.0.png
example message: APolishGentleman_MemoirByAdamKruczkiewicz/00162_bin0.0.png rescribelatv7

queueAnalyse

A message on the queueAnalyse queue contains only a book name. The confidences for each page are calculated and saved in the 'conf' file, and the best version of each page is decided upon and saved in the 'best' file. PDFs are then generated, and the confidence graph is generated.

example message: APolishGentleman_MemoirByAdamKruczkiewicz

Queue manipulation

The queues should generally only be messed with by the bookpipeline and booktopipeline tools, but if you're feeling ambitious you can take a look at the `addtoqueue` tool.

Remember that messages in a queue are hidden for a few minutes when they are read, so for example you couldn't straightforwardly delete a message which was currently being processed by a server, as you wouldn't be able to see it.

Page naming

At present the bookpipeline has some silly limitations of file names for book pages to be recognised. This is something which will be fixed in due course.

Pages that are to be fully processed: *[0-9]{4}.jpg$
Pages that are to be wiped only: *[0-9]{6}(.bin)?.png$

Index

Package Files

  • aws.go
  • cloudsettings.go
  • doc.go
  • fonts.go
  • graph.go
  • local.go
  • pdf.go

func Graph

func Graph(confs map[string]*Conf, bookname string, w io.Writer) error

Graph creates a graph of the confidence of each page in a book

func GraphOpts

func GraphOpts(confs map[string]*Conf, bookname string, xaxis string, guidelines bool, w io.Writer) error

Graph creates a graph of confidences

type AwsConn

type AwsConn struct {
	// these should be set before running Init(), or left to defaults
	Region string
	Logger *log.Logger
	// contains filtered or unexported fields
}

AwsConn contains the necessary things to interact with various AWS services in ways useful for the bookpipeline package. It is designed to be generic enough to swap in other backends easily.

func (*AwsConn) AddToQueue

func (a *AwsConn) AddToQueue(url string, msg string) error

func (*AwsConn) AnalyseQueueId

func (a *AwsConn) AnalyseQueueId() string

func (*AwsConn) CheckQueue

func (a *AwsConn) CheckQueue(url string, timeout int64) (Qmsg, error)

func (*AwsConn) CreateBucket

func (a *AwsConn) CreateBucket(name string) error

CreateBucket creates a new S3 bucket

func (*AwsConn) CreateQueue

func (a *AwsConn) CreateQueue(name string) error

CreateQueue creates a new SQS queue Note the queue attributes are currently hardcoded; it may make sense to specify them as arguments in the future.

func (*AwsConn) DelFromQueue

func (a *AwsConn) DelFromQueue(url string, handle string) error

func (*AwsConn) Download

func (a *AwsConn) Download(bucket string, key string, path string) error

func (*AwsConn) GetInstanceDetails

func (a *AwsConn) GetInstanceDetails() ([]InstanceDetails, error)

func (*AwsConn) GetLogger

func (a *AwsConn) GetLogger() *log.Logger

func (*AwsConn) GetQueueDetails

func (a *AwsConn) GetQueueDetails(url string) (string, string, error)

GetQueueDetails gets the number of in progress and available messages for a queue. These are returned as strings.

func (*AwsConn) Init

func (a *AwsConn) Init() error

Init initialises aws services, also finding the urls needed to address SQS queues directly.

func (*AwsConn) ListObjectPrefixes

func (a *AwsConn) ListObjectPrefixes(bucket string) ([]string, error)

func (*AwsConn) ListObjects

func (a *AwsConn) ListObjects(bucket string, prefix string) ([]string, error)

func (*AwsConn) ListObjectsWithMeta

func (a *AwsConn) ListObjectsWithMeta(bucket string, prefix string) ([]ObjMeta, error)

func (*AwsConn) Log

func (a *AwsConn) Log(v ...interface{})

Log records an item in the with the Logger. Arguments are handled as with fmt.Println.

func (*AwsConn) LogAndPurgeQueue

func (a *AwsConn) LogAndPurgeQueue(url string) error

func (*AwsConn) MinimalInit

func (a *AwsConn) MinimalInit() error

MinimalInit does the bare minimum to initialise aws services

func (*AwsConn) MkPipeline

func (a *AwsConn) MkPipeline() error

mkpipeline sets up necessary buckets and queues for the pipeline TODO: also set up the necessary security group and iam stuff

func (*AwsConn) OCRPageQueueId

func (a *AwsConn) OCRPageQueueId() string

func (*AwsConn) PreQueueId

func (a *AwsConn) PreQueueId() string

func (*AwsConn) QueueHeartbeat

func (a *AwsConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, error)

QueueHeartbeat updates the visibility timeout of a message. This ensures that the message remains "in flight", meaning that it cannot be seen by other processes, but if this process fails the timeout will expire and it will go back to being available for any other process to retrieve and process.

SQS only allows messages to be "in flight" for up to 12 hours, so this will detect if the request for an update to visibility timeout fails, and if so will attempt to find the message on the queue, and return it, as the handle will have changed.

func (*AwsConn) StartInstances

func (a *AwsConn) StartInstances(n int) error

func (*AwsConn) Upload

func (a *AwsConn) Upload(bucket string, key string, path string) error

func (*AwsConn) WIPStorageId

func (a *AwsConn) WIPStorageId() string

func (*AwsConn) WipeQueueId

func (a *AwsConn) WipeQueueId() string

type Conf

type Conf struct {
	Path, Code string
	Conf       float64
}

type Fpdf

type Fpdf struct {
	// contains filtered or unexported fields
}

Fpdf abstracts the gofpdf.Fpdf adding some useful methods

func (*Fpdf) AddPage

func (p *Fpdf) AddPage(imgpath, hocrpath string, smaller bool) error

AddPage adds a page to the pdf with an image and (invisible) text from an hocr file

func (*Fpdf) Save

func (p *Fpdf) Save(path string) error

Save saves the PDF to the file at path

func (*Fpdf) Setup

func (p *Fpdf) Setup() error

Setup creates a new PDF with appropriate settings and fonts

type GraphConf

type GraphConf struct {
	Pgnum, Conf float64
}

type InstanceDetails

type InstanceDetails struct {
	Id, Name, Ip, Spot, Type, State, LaunchTime string
}

type LocalConn

type LocalConn struct {
	// these should be set before running Init(), or left to defaults
	TempDir string
	Logger  *log.Logger
}

LocalConn is a simple implementation of the pipeliner interface that doesn't rely on any "cloud" services, instead doing everything on the local machine. This is particularly useful for testing.

func (*LocalConn) AddToQueue

func (a *LocalConn) AddToQueue(url string, msg string) error

AddToQueue adds a message to a queue

func (*LocalConn) AnalyseQueueId

func (a *LocalConn) AnalyseQueueId() string

func (*LocalConn) CheckQueue

func (a *LocalConn) CheckQueue(url string, timeout int64) (Qmsg, error)

CheckQueue checks for any messages in a queue

func (*LocalConn) DelFromQueue

func (a *LocalConn) DelFromQueue(url string, handle string) error

DelFromQueue deletes a message from a queue

func (*LocalConn) Download

func (a *LocalConn) Download(bucket string, key string, path string) error

Download just copies the file from TempDir/bucket/key to path

func (*LocalConn) GetLogger

func (a *LocalConn) GetLogger() *log.Logger

func (*LocalConn) GetQueueDetails

func (a *LocalConn) GetQueueDetails(url string) (string, string, error)

GetQueueDetails gets the number of in progress and available messages for a queue. These are returned as strings.

func (*LocalConn) Init

func (a *LocalConn) Init() error

Init just does the same as MinimalInit

func (*LocalConn) ListObjects

func (a *LocalConn) ListObjects(bucket string, prefix string) ([]string, error)

func (*LocalConn) ListObjectsWithMeta

func (a *LocalConn) ListObjectsWithMeta(bucket string, prefix string) ([]ObjMeta, error)

func (*LocalConn) Log

func (a *LocalConn) Log(v ...interface{})

Log records an item in the with the Logger. Arguments are handled as with fmt.Println.

func (*LocalConn) MinimalInit

func (a *LocalConn) MinimalInit() error

MinimalInit does the bare minimum initialisation

func (*LocalConn) OCRPageQueueId

func (a *LocalConn) OCRPageQueueId() string

func (*LocalConn) PreQueueId

func (a *LocalConn) PreQueueId() string

func (*LocalConn) QueueHeartbeat

func (a *LocalConn) QueueHeartbeat(msg Qmsg, qurl string, duration int64) (Qmsg, error)

QueueHeartbeat is a no-op with LocalConn

func (*LocalConn) Upload

func (a *LocalConn) Upload(bucket string, key string, path string) error

Upload just copies the file from path to TempDir/bucket/key

func (*LocalConn) WIPStorageId

func (a *LocalConn) WIPStorageId() string

func (*LocalConn) WipeQueueId

func (a *LocalConn) WipeQueueId() string

type ObjMeta

type ObjMeta struct {
	Name string
	Date time.Time
}

type Qmsg

type Qmsg struct {
	Id, Handle, Body string
}
Documentation was rendered with GOOS=linux and GOARCH=amd64.

Jump to identifier

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to identifier