Documentation ¶
Overview ¶
Package bike_share implements a Gazette consumer application which processes and serves streaming Citi Bike system data. It indexes a window of recent rides for each bike, serves a simple history API, and detects long graph cycles as they're completed by individual bikes.
Index ¶
- Constants
- type Application
- func (app *Application) ConsumeMessage(shard consumer.Shard, store consumer.Store, env message.Envelope, ...) error
- func (Application) FinalizeTxn(consumer.Shard, consumer.Store, *message.Publisher) error
- func (app *Application) InitApplication(args runconsumer.InitArgs) (err error)
- func (app *Application) NewConfig() runconsumer.Config
- func (Application) NewMessage(*pb.JournalSpec) (message.Message, error)
- func (app *Application) NewStore(_ consumer.Shard, rec *recoverylog.Recorder) (consumer.Store, error)
- func (app *Application) ServeBikeHistory(w http.ResponseWriter, r *http.Request)
- type Cycle
- type CycleStep
Constants ¶
const CreateTableStmt = `` /* 875-byte string literal not displayed */
CreateTableStmt bootstraps an embedded SQLite database with the "rides" table.
const InsertStmt = `` /* 352-byte string literal not displayed */
InsertStmt inserts a CSV bike-share input record into the "rides" table.
const QueryCycleStmt = `` /* 1620-byte string literal not displayed */
QueryCycleStmt returns a path taken by a bike ID ($1) of at least length $2, such that the path starts and ends at the bike's final station but does not visit it in between, and where the bike is not relocated between rides.
const QueryHistoryStmt = `
SELECT uuid, start_time, end_time, start_station_name, end_station_name
FROM rides WHERE bike_id = $1
`
QueryHistoryStmt retrieves the current window of rides of the given bike ($1). It powers the ServeBikeHistory API.
const WindowStmt = `` /* 173-byte string literal not displayed */
WindowStmt windows a bike ID ($1) to the $2 most-recent rides.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Application ¶
type Application struct { runconsumer.BaseConfig DBAddr string `long:"postgres" description:"Database connection string" default:"host=/var/run/postgresql" required:"true"` // contains filtered or unexported fields }
Application is a consumer framework application which finds cycles in bike-share data. It implements the consumer.Application interface as well as runconsumer.Application, which extends consumer.Application with configuration parsing and initialization.
func (*Application) ConsumeMessage ¶
func (app *Application) ConsumeMessage(shard consumer.Shard, store consumer.Store, env message.Envelope, pub *message.Publisher) error
ConsumeMessage inserts a CSVRecord of bike-share ride data into the store and queries for a now-completed cycle of the record's bike ID. If matched, it publishes a Cycle event.
func (Application) FinalizeTxn ¶
FinalizeTxn is a no-op for this Application. If the application kept in-memory-only updates while consuming messages, FinalizeTxn would be the time for it to flush them out.
func (*Application) InitApplication ¶
func (app *Application) InitApplication(args runconsumer.InitArgs) (err error)
InitApplication initializes dynamic mappings of bike-share data types to responsible partitions, opens the configured database, prepares DB statements for future use, and registers a simple bike-share history HTTP API.
func (*Application) NewConfig ¶
func (app *Application) NewConfig() runconsumer.Config
NewConfig returns the Config struct of our Application, parse-able with `github.com/jessevdk/go-flags`. In this case our Application's type is also its config.
func (Application) NewMessage ¶
func (Application) NewMessage(*pb.JournalSpec) (message.Message, error)
NewMessage returns an instance of the appropriate message type for decoding from the given journal. For this use-case, we use the provided message.CSVRecord type.
func (*Application) NewStore ¶
func (app *Application) NewStore(_ consumer.Shard, rec *recoverylog.Recorder) (consumer.Store, error)
NewStore instantiates either a SQLStore using the remote DB or, if the Shard has a recovery log, an embedded store_sqlite.Store.
func (*Application) ServeBikeHistory ¶
func (app *Application) ServeBikeHistory(w http.ResponseWriter, r *http.Request)
ServeBikeHistory is an http.HandlerFunc which returns the most recent rides of a bike ID provided via URL parameter. Invoke as:
/api/bikes?id=12345
If the bike ID is served by a non-local shard, ServeBikeHistory will proxy to the appropriate peer.
type Cycle ¶
Cycle describes a graph cycle completed by a bike.
func (*Cycle) NewAcknowledgement ¶
NewAcknowledgement returns a new & zero-valued Cycle. message.Publisher uses it to build a message which acknowledge other Cycle messages previously published.