Version: v0.89.0 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Sep 2, 2021 License: MIT Imports: 20 Imported by: 0



Package bike_share implements a Gazette consumer application which processes and serves streaming Citi Bike system data. It indexes a window of recent rides for each bike, serves a simple history API, and detects long graph cycles as they're completed by individual bikes.



View Source
const CreateTableStmt = `` /* 875-byte string literal not displayed */

CreateTableStmt bootstraps an embedded SQLite database with the "rides" table.

View Source
const InsertStmt = `` /* 352-byte string literal not displayed */

InsertStmt inserts a CSV bike-share input record into the "rides" table.

View Source
const QueryCycleStmt = `` /* 1620-byte string literal not displayed */

QueryCycleStmt returns a path taken by a bike ID ($1) of at least length $2, such that the path starts and ends at the bike's final station but does not visit it in between, and where the bike is not relocated between rides.

View Source
const QueryHistoryStmt = `
SELECT uuid, start_time, end_time, start_station_name, end_station_name
FROM rides WHERE bike_id = $1

QueryHistoryStmt retrieves the current window of rides of the given bike ($1). It powers the ServeBikeHistory API.

View Source
const WindowStmt = `` /* 173-byte string literal not displayed */

WindowStmt windows a bike ID ($1) to the $2 most-recent rides.


This section is empty.


This section is empty.


type Application

type Application struct {
	DBAddr string `long:"postgres" description:"Database connection string" default:"host=/var/run/postgresql" required:"true"`
	// contains filtered or unexported fields

Application is a consumer framework application which finds cycles in bike-share data. It implements the consumer.Application interface as well as runconsumer.Application, which extends consumer.Application with configuration parsing and initialization.

func (*Application) ConsumeMessage

func (app *Application) ConsumeMessage(shard consumer.Shard, store consumer.Store, env message.Envelope, pub *message.Publisher) error

ConsumeMessage inserts a CSVRecord of bike-share ride data into the store and queries for a now-completed cycle of the record's bike ID. If matched, it publishes a Cycle event.

func (Application) FinalizeTxn

FinalizeTxn is a no-op for this Application. If the application kept in-memory-only updates while consuming messages, FinalizeTxn would be the time for it to flush them out.

func (*Application) InitApplication

func (app *Application) InitApplication(args runconsumer.InitArgs) (err error)

InitApplication initializes dynamic mappings of bike-share data types to responsible partitions, opens the configured database, prepares DB statements for future use, and registers a simple bike-share history HTTP API.

func (*Application) NewConfig

func (app *Application) NewConfig() runconsumer.Config

NewConfig returns the Config struct of our Application, parse-able with ``. In this case our Application's type is also its config.

func (Application) NewMessage

func (Application) NewMessage(*pb.JournalSpec) (message.Message, error)

NewMessage returns an instance of the appropriate message type for decoding from the given journal. For this use-case, we use the provided message.CSVRecord type.

func (*Application) NewStore

func (app *Application) NewStore(_ consumer.Shard, rec *recoverylog.Recorder) (consumer.Store, error)

NewStore instantiates either a SQLStore using the remote DB or, if the Shard has a recovery log, an embedded store_sqlite.Store.

func (*Application) ServeBikeHistory

func (app *Application) ServeBikeHistory(w http.ResponseWriter, r *http.Request)

ServeBikeHistory is an http.HandlerFunc which returns the most recent rides of a bike ID provided via URL parameter. Invoke as:


If the bike ID is served by a non-local shard, ServeBikeHistory will proxy to the appropriate peer.

type Cycle

type Cycle struct {
	UUID   message.UUID
	BikeID int
	Steps  []CycleStep

Cycle describes a graph cycle completed by a bike.

func (*Cycle) GetUUID

func (c *Cycle) GetUUID() message.UUID

GetUUID returns the Cycle's UUID.

func (*Cycle) NewAcknowledgement

func (c *Cycle) NewAcknowledgement(pb.Journal) message.Message

NewAcknowledgement returns a new & zero-valued Cycle. message.Publisher uses it to build a message which acknowledge other Cycle messages previously published.

func (*Cycle) SetUUID

func (c *Cycle) SetUUID(uuid message.UUID)

SetUUID sets the Cycle's UUID. It's called by message.Publisher.

type CycleStep

type CycleStep struct {
	Time    time.Time
	Station string

CycleStep is a path step of a bike's graph cycle.


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL