ingest

v0.1.0
Published: Jul 2, 2022 License: Apache-2.0 Imports: 21 Imported by: 0

README

ingest

The ingest CLI computes summary statistics from a collection of CSV files accessed via URLs.

The program reads records consisting of "fname", "lname" and "age" in CSV format from a set of given URLs.

Disclaimer

I wanted to have fun with this exercise, so I made some interesting choices that are somewhat atypical of these types of assignments. I chose to use sqlite instead of writing yet another one-off, thread-safe user store. I have been wanting to experiment with sqlite for some time, and I used this exercise to have some fun with it. I have also been wanting to experiment with using golang-migrate to initialize a db schema during program initialization. Since the database is running in memory, this was the perfect opportunity. Using sqlite also made the computation of the average and median simple.

I enjoyed playing with the encoding/csv package from the stdlib. This was my first exposure to that package. This was also my first time using the semaphore package. I used it instead of sync.WaitGroup, since the semaphore could be canceled via context, whereas sync.WaitGroup could not.

I also included a logging package that I had written prior to this assignment, which I found helpful when I wanted to debug.

Finally, I made some pieces of the code configurable via environment variables. To do this, I added some configuration code using the kelseyhightower/envconfig package. It's a useful package with no additional dependencies.

Todo

There are a number of unit tests, and some db code that I have marked as TODO. I am running out of time, and plan to get to them later. I think what I am submitting is a good example of the quality of my work.

Development Tools

GoLang
  • Install Go 1.18
  • make (preferably GNU Make 4.2 or newer)
  • sqlite3 command line tool (Optional)

Building

By default, the make command will run the build, lint, and test commands.

make build

This command will build the binary, placing it in dist/ingest.

make lint

This command will run rudimentary linting on the source code.

make unit_test

This command will perform the unit tests.

make test

This command will run the unit tests and the "integrated tests", where the integrated tests run the ingest command against known data files and print the output to the console.

Expected output of make
> make

Building ingest...
CGO_ENABLED=1 go build -ldflags "-s -w" -o ./dist/ingest ./cmd/ingest

Running go fmt...
go fmt ./...

Running go mod tidy...
go mod tidy

Checking for changed files...
git status --porcelain


Running go test...
go test -cover -race ./...
ok      github.com/icahoon/ingest       0.054s  coverage: 72.8% of statements
ok      github.com/icahoon/ingest/cmd/ingest    (cached)        coverage: 83.9% of statements

Testing with sample data...

Users Summary
  urls                      https://raw.githubusercontent.com/icahoon/ingest/main/data/file1.csv
  total users               1000                     
  average age               28.916000
  median age                29.000000
  person(s) with median age Brittany GALLEGOS / Jesus MEYERS
  go routines               1
  run time                  197.5546ms

Users Summary
  urls                      https://raw.githubusercontent.com/icahoon/ingest/main/data/file2.csv
  total users               1000                     
  average age               30.115000
  median age                31.000000
  person(s) with median age Taylor STRICKLAND / Edgar POWERS
  go routines               1
  run time                  350.0616ms

Users Summary
  urls                      https://raw.githubusercontent.com/icahoon/ingest/main/data/file3.csv
  total users               10000                    
  average age               29.570900
  median age                30.000000
  person(s) with median age Brittany CURTIS / Cindy MCGUIRE
  go routines               1
  run time                  436.7323ms

Users Summary
  urls                      https://raw.githubusercontent.com/icahoon/ingest/main/data/file4.csv
  total users               1000                     
  average age               28.575000
  median age                28.000000
  person(s) with median age Taylor PALACIOS / Deanna VELEZ
  go routines               1
  run time                  364.0327ms

Users Summary
  urls                      https://raw.githubusercontent.com/icahoon/ingest/main/data/file5.csv
  total users               1000                     
  average age               89.975000
  median age                90.000000
  person(s) with median age Brandy FAULKNER / Joanna CARPENTER
  go routines               1
  run time                  351.7848ms

Users Summary
  urls                      https://raw.githubusercontent.com/icahoon/ingest/main/data/file6.csv
  total users               1999                     
  average age               59.439720
  median age                49.000000
  person(s) with median age Peter CAMPBELL
  go routines               1
  run time                  364.5945ms

Users Summary
  urls                      https://raw.githubusercontent.com/icahoon/ingest/main/data/file7.csv
  total users               4                        
  average age               26.500000
  median age                26.500000
  person(s) with median age Mikayla SERRANO / Caitlin PARKER
  go routines               1
  run time                  176.4688ms

Users Summary
  urls                      https://raw.githubusercontent.com/icahoon/ingest/main/data/file8.csv
  total users               5                        
  average age               12.000000
  median age                12.000000
  person(s) with median age Caitlin PARKER
  go routines               1
  run time                  170.0224ms

Users Summary
  urls                      https://raw.githubusercontent.com/icahoon/ingest/main/data/file1.csv, https://raw.githubusercontent.com/icahoon/ingest/main/data/file2.csv, https://raw.githubusercontent.com/icahoon/ingest/main/data/file3.csv, https://raw.githubusercontent.com/icahoon/ingest/main/data/file4.csv, https://raw.githubusercontent.com/icahoon/ingest/main/data/file5.csv
  total users               14000                    
  average age               33.806429
  median age                31.000000
  person(s) with median age Henry KRUEGER / David DAVID
  go routines               5
  run time                  207.3979ms

Failed to load data from url 'https://raw.githubusercontent.com/icahoon/ingest/main/data/file6_bad.csv'
Reason: csv header missing, expected 'fname, lname, age'

Failed to load data from url 'https://raw.githubusercontent.com/icahoon/ingest/main/data/file9_bad.csv'
Reason: record on line 5: wrong number of fields

Failed to load data from url 'http://foo.bar'
Reason: Get "http://foo.bar": dial tcp: lookup foo.bar on 172.17.32.1:53: no such host

Failed to load data from url 'https://arcadium.dev'
Reason: csv header missing, expected 'fname, lname, age'

Configuration

The tool can additionally be configured via environment variables.

These variables are made available in the env file.

make test calls the run bash script, which loads the environment variables in env.

INGEST_DSN

This is the DSN used with sqlite.

The default setting is to use an in-memory database, file::memory:?cache=shared. See https://www.sqlite.org/inmemorydb.html for more information.

This can also be set to a location on disk to aid in debugging.

INGEST_LOG_LEVEL

The default is set to error.

The available options are debug, info, warn and error. Debug provides the most verbose output, progressing to error, which only provides output on internal errors.

INGEST_MAX_WORKERS

This is the number of concurrent goroutines that are allowed to run while processing input data. This defaults to the value of runtime.GOMAXPROCS.

From https://pkg.go.dev/runtime#GOMAXPROCS

The GOMAXPROCS variable limits the number of operating system threads that can execute user-level Go code simultaneously.

INGEST_TIMEOUT_IN_SECS

This is the number of seconds the program will wait before exiting when the goroutines processing user data become unresponsive.

The default timeout is 300 seconds.

Additional Considerations

What assumptions did you make in your design? Why?

I placed more importance on designing this so it could be horizontally scaled, rather than on pure performance.

I have worked for a number of years with cloud-based applications that need to scale, so I default to thinking about how something could scale horizontally. Implementing the datastore using database/sql is a straightforward way to handle scaling from the small to the very large. For a small-scale problem, sqlite with an in-memory store can solve the problem and still be performant. For large-scale problems, using sqlite with an on-disk store, or perhaps moving to an external database like PostgreSQL or CockroachDB, is an option. Using database/sql solves the problem and provides flexibility for future enhancements.

How would you change your program if it had to process many files where each file was over 10M records?

Assuming that this requirement means storing the data in memory would be burdensome, I could write the database to a unique location on disk rather than in memory, and remove the db file after completion of the program. Optionally I could use a single location on disk, and provide an id for a set of user data. I could provide full CRUD for user data sets.

How would you change your program if it had to process data from more than 20K URLs?

I wouldn't input the URLs from the CLI. I would read them from a file or from stdin and send each URL as it was read to the ingestion layer.

I could change the ingestion layer to use a channel for input rather than a slice of URLs. In that scenario, I would most likely want to use a semaphore to limit the size of the worker pool, where the pool would simply be the number of goroutines executing at the same time to ingest the data.

I think the direction of the question is asking how the design would differ when you have a relatively small set of static inputs, versus a dynamic set of large inputs. I think in this situation you start thinking about having the worker pool be fed by a queue rather than a preallocated list of work. A simple queue could be implemented with a channel between goroutines or, at a larger scale, software like Redis, a database, RabbitMQ, SQS, Kafka, etc. could be used, depending on the durability requirements of the data.

Documentation

Constants

const (
	// LogLevelDebug provides the most verbose logging, allowing logs for debug, info,
	// warn, and error.
	LogLevelDebug LogLevel = iota

	// LogLevelInfo is the default log level. It allows logs for info, warn, and
	// error.
	LogLevelInfo

	// LogLevelWarn is reserved for logging warnings and errors.
	LogLevelWarn

	// LogLevelError provides the least verbose logging, allowing logs only for
	// errors.
	LogLevelError

	// LogLevelInvalid indicates an invalid log level.
	LogLevelInvalid

	// LogFormatJSON encodes each log entry as a single JSON object.
	LogFormatJSON LogFormat = iota

	// LogFormatLogfmt encodes each log entry in logfmt format.
	LogFormatLogfmt

	// LogFormatNop will suppress logging output entirely.
	LogFormatNop

	// LogFormatInvalid indicates an invalid log format.
	LogFormatInvalid
)
const (
	DefaultChunkSize = 100
)

Variables

var (
	// ErrInvalidLogLevel will be returned when the level given to the WithLogLevel
	// option is invalid.
	ErrInvalidLogLevel = errors.New("invalid level")

	// ErrInvalidLogFormat will be returned when the format given to the WithLogFormat
	// option is invalid.
	ErrInvalidLogFormat = errors.New("invalid format")

	// ErrInvalidOutput will be returned when the output writer given to WithLogOutput
	// is nil.
	ErrInvalidOutput = errors.New("invalid output")
)

Functions

func Debug

func Debug(kv ...interface{})

Debug logs a debug level message to the default logger.

func Error

func Error(kv ...interface{})

Error logs an error level message to the default logger.

func Info

func Info(kv ...interface{})

Info logs an info level message to the default logger.

func NewContextWithLogger

func NewContextWithLogger(ctx context.Context, logger Logger) context.Context

NewContextWithLogger returns a new context with the given logger.

func Warn

func Warn(kv ...interface{})

Warn logs a warn level message to the default logger.

Types

type DB

type DB struct {
	// contains filtered or unexported fields
}

func OpenDB

func OpenDB(ctx context.Context, dsn string) (*DB, error)

OpenDB creates the DB, using the given DSN.

func (*DB) Close

func (db *DB) Close() error

Close closes the database.

func (*DB) GetUsersSummary added in v0.0.6

func (db *DB) GetUsersSummary(ctx context.Context) (UsersSummary, error)

func (*DB) InsertUsers added in v0.0.5

func (db *DB) InsertUsers(ctx context.Context, users []User) error

InsertUsers stores the given users in the database.

func (*DB) Migrate

func (db *DB) Migrate() error

Migrate runs the migrations stored in the migrations directory.

type Ingest

type Ingest struct {
	MaxWorkers int64
	ChunkSize  int
	DB         UserStore
}

func (Ingest) Ingest added in v0.0.5

func (i Ingest) Ingest(ctx context.Context, urls []string) []*Result

Ingest ingests the data available from the given urls. The result of the operation is returned.

func (Ingest) Worker

func (i Ingest) Worker(ctx context.Context, result *Result)

Worker pulls the data from the url (available in the given result), parses each line of the file as CSV, and stores the parsed data in the db (in chunks of 100).

type LogFormat

type LogFormat uint

LogFormat defines the output formats of the logger. Supported formats are LogFormatLogfmt (the default), LogFormatJSON, and LogFormatNop (no logging).

func ToLogFormat

func ToLogFormat(f string) LogFormat

ToLogFormat translates the given format as a string to a LogFormat.

type LogLevel

type LogLevel uint

LogLevel defines the logging levels available to the Logger, with a level of debug logging all messages and a level of error logging only error messages.

func ToLogLevel

func ToLogLevel(l string) LogLevel

ToLogLevel translates the given level as a string to a LogLevel.
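A translation of this kind might look like the sketch below. The constants follow the ordering documented above, but the function body, the case-insensitive matching, and the enabled helper are assumptions, since the package's source is not shown on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// LogLevel and its constants mirror the documented ordering, from most
// verbose (debug) to least verbose (error).
type LogLevel uint

const (
	LogLevelDebug LogLevel = iota
	LogLevelInfo
	LogLevelWarn
	LogLevelError
	LogLevelInvalid
)

// toLogLevel is a hypothetical equivalent of ToLogLevel. Matching is
// case-insensitive here, which is an assumption.
func toLogLevel(l string) LogLevel {
	switch strings.ToLower(strings.TrimSpace(l)) {
	case "debug":
		return LogLevelDebug
	case "info":
		return LogLevelInfo
	case "warn":
		return LogLevelWarn
	case "error":
		return LogLevelError
	default:
		return LogLevelInvalid
	}
}

// enabled reports whether a message at level msg should be written by a
// logger configured with level min.
func enabled(min, msg LogLevel) bool {
	return msg >= min && msg < LogLevelInvalid
}

func main() {
	min := toLogLevel("warn")
	fmt.Println(enabled(min, LogLevelInfo), enabled(min, LogLevelError))
}
```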

type Logger

type Logger struct {
	// contains filtered or unexported fields
}

Logger is the interface for all logging operations.

var (
	DefaultLogger Logger
)

func LoggerFromContext

func LoggerFromContext(ctx context.Context) Logger

LoggerFromContext returns the logger for the current request.

func NewLogger

func NewLogger(opts ...Option) (Logger, error)

NewLogger returns a Logger.

func (Logger) Debug

func (l Logger) Debug(kv ...interface{})

Debug logs a debug level message.

func (Logger) Error

func (l Logger) Error(kv ...interface{})

Error logs an error level message.

func (Logger) Info

func (l Logger) Info(kv ...interface{})

Info logs an info level message.

func (Logger) Warn

func (l Logger) Warn(kv ...interface{})

Warn logs a warn level message.

func (Logger) With

func (l Logger) With(kv ...interface{}) Logger

With returns a new contextual logger with keyvals prepended to those passed to calls to log.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option provides for Logger configuration.

func AsDefaultLogger

func AsDefaultLogger() Option

AsDefaultLogger sets the DefaultLogger.

func WithLogFormat

func WithLogFormat(format LogFormat) Option

WithLogFormat allows the format to be configured. The default format is LogFormatLogfmt.

func WithLogLevel

func WithLogLevel(level LogLevel) Option

WithLogLevel allows the level to be configured. The default level is LogLevelInfo.

func WithLogOutput

func WithLogOutput(writer io.Writer) Option

WithLogOutput allows the output writer to be configured. The default writer is os.Stdout.

func WithoutLogTimestamp

func WithoutLogTimestamp() Option

WithoutLogTimestamp disables the use of a timestamp for logs. Useful for unit tests.

type Result

type Result struct {
	URL   string
	Error error
}

type StringBuffer

type StringBuffer struct {
	// contains filtered or unexported fields
}

StringBuffer implements a simple buffer that can be used in tests. It implements the io.Writer interface. Each write will append a string to the Buffer.

func NewStringBuffer

func NewStringBuffer() *StringBuffer

NewStringBuffer returns a StringBuffer.

func (*StringBuffer) Index

func (l *StringBuffer) Index(i int) string

Index ... TODO

func (*StringBuffer) Len

func (l *StringBuffer) Len() int

Len ... TODO

func (*StringBuffer) Write

func (l *StringBuffer) Write(p []byte) (int, error)

Write implements the io.Writer interface.

type User

type User struct {
	FirstName string
	LastName  string
	Age       int
}

type UserStore added in v0.0.5

type UserStore interface {
	InsertUsers(ctx context.Context, users []User) error
}

type UsersSummary added in v0.0.6

type UsersSummary struct {
	Total      int
	Average    float64
	MedianAge  float64
	MedianName string
}
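For reference, the same summary can be computed in memory. The package derives these values with sqlite queries, so the sketch below is a substitute technique, not the actual method; the summarize function is hypothetical, and reporting the one or two middle records as the median names is a guess based on the sample output above.

```go
package main

import (
	"fmt"
	"sort"
)

// User and UsersSummary match the types documented in this package.
type User struct {
	FirstName string
	LastName  string
	Age       int
}

type UsersSummary struct {
	Total      int
	Average    float64
	MedianAge  float64
	MedianName string
}

func fullName(u User) string { return u.FirstName + " " + u.LastName }

// summarize is a hypothetical in-memory equivalent of GetUsersSummary.
func summarize(users []User) UsersSummary {
	n := len(users)
	if n == 0 {
		return UsersSummary{}
	}
	// Sort a copy by age so the caller's slice is left untouched.
	sorted := make([]User, n)
	copy(sorted, users)
	sort.SliceStable(sorted, func(i, j int) bool { return sorted[i].Age < sorted[j].Age })

	sum := 0
	for _, u := range sorted {
		sum += u.Age
	}
	s := UsersSummary{Total: n, Average: float64(sum) / float64(n)}

	mid := n / 2
	if n%2 == 1 {
		// Odd count: the single middle record is the median.
		s.MedianAge = float64(sorted[mid].Age)
		s.MedianName = fullName(sorted[mid])
	} else {
		// Even count: average the two middle records and name both.
		lo, hi := sorted[mid-1], sorted[mid]
		s.MedianAge = float64(lo.Age+hi.Age) / 2
		s.MedianName = fullName(lo) + " / " + fullName(hi)
	}
	return s
}

func main() {
	users := []User{
		{"A", "ONE", 40}, {"B", "TWO", 10}, {"C", "THREE", 30}, {"D", "FOUR", 20},
	}
	fmt.Printf("%+v\n", summarize(users))
}
```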

Directories

Path Synopsis
cmd
