Package bike_share implements a Gazette consumer application which processes and serves streaming Citi Bike system data. It indexes a window of recent rides for each bike, serves a simple history API, and detects long graph cycles as they're completed by individual bikes.



    View Source
    const CreateTableStmt = `` /* 875-byte string literal not displayed */

      CreateTableStmt bootstraps an embedded SQLite database with the "rides" table.

      View Source
      const InsertStmt = `` /* 352-byte string literal not displayed */

        InsertStmt inserts a CSV bike-share input record into the "rides" table.

        View Source
        const QueryCycleStmt = `` /* 1620-byte string literal not displayed */

          QueryCycleStmt returns a path taken by a bike ID ($1) of at least length $2, such that the path starts and ends at the bike's final station but does not visit it in between, and where the bike is not relocated between rides.

          View Source
          const QueryHistoryStmt = `
          SELECT uuid, start_time, end_time, start_station_name, end_station_name
          FROM rides WHERE bike_id = $1

            QueryHistoryStmt retrieves the current window of rides of the given bike ($1). It powers the ServeBikeHistory API.

            View Source
            const WindowStmt = `` /* 173-byte string literal not displayed */

              WindowStmt windows a bike ID ($1) to the $2 most-recent rides.


              This section is empty.


              This section is empty.


              type Application

              type Application struct {
              	DBAddr string `long:"postgres" description:"Database connection string" default:"host=/var/run/postgresql" required:"true"`
              	// contains filtered or unexported fields

                Application is a consumer framework application which finds cycles in bike-share data. It implements the consumer.Application interface as well as runconsumer.Application, which extends consumer.Application with configuration parsing and initialization.

                func (*Application) ConsumeMessage

                func (app *Application) ConsumeMessage(shard consumer.Shard, store consumer.Store, env message.Envelope, pub *message.Publisher) error

                  ConsumeMessage inserts a CSVRecord of bike-share ride data into the store and queries for a now-completed cycle of the record's bike ID. If matched, it publishes a Cycle event.

                  func (Application) FinalizeTxn

                    FinalizeTxn is a no-op for this Application. If the application kept in-memory-only updates while consuming messages, FinalizeTxn would be the time for it to flush them out.

                    func (*Application) InitApplication

                    func (app *Application) InitApplication(args runconsumer.InitArgs) (err error)

                      InitApplication initializes dynamic mappings of bike-share data types to responsible partitions, opens the configured database, prepares DB statements for future use, and registers a simple bike-share history HTTP API.

                      func (*Application) NewConfig

                      func (app *Application) NewConfig() runconsumer.Config

                        NewConfig returns the Config struct of our Application, parse-able with ``. In this case our Application's type is also its config.

                        func (Application) NewMessage

                        func (Application) NewMessage(*pb.JournalSpec) (message.Message, error)

                          NewMessage returns an instance of the appropriate message type for decoding from the given journal. For this use-case, we use the provided message.CSVRecord type.

                          func (*Application) NewStore

                          func (app *Application) NewStore(_ consumer.Shard, rec *recoverylog.Recorder) (consumer.Store, error)

                            NewStore instantiates either a SQLStore using the remote DB or, if the Shard has a recovery log, an embedded store_sqlite.Store.

                            func (*Application) ServeBikeHistory

                            func (app *Application) ServeBikeHistory(w http.ResponseWriter, r *http.Request)

                              ServeBikeHistory is an http.HandlerFunc which returns the most recent rides of a bike ID provided via URL parameter. Invoke as:


                              If the bike ID is served by a non-local shard, ServeBikeHistory will proxy to the appropriate peer.

                              type Cycle

                              type Cycle struct {
                              	UUID   message.UUID
                              	BikeID int
                              	Steps  []CycleStep

                                Cycle describes a graph cycle completed by a bike.

                                func (*Cycle) GetUUID

                                func (c *Cycle) GetUUID() message.UUID

                                  GetUUID returns the Cycle's UUID.

                                  func (*Cycle) NewAcknowledgement

                                  func (c *Cycle) NewAcknowledgement(pb.Journal) message.Message

                                    NewAcknowledgement returns a new & zero-valued Cycle. message.Publisher uses it to build a message which acknowledge other Cycle messages previously published.

                                    func (*Cycle) SetUUID

                                    func (c *Cycle) SetUUID(uuid message.UUID)

                                      SetUUID sets the Cycle's UUID. It's called by message.Publisher.

                                      type CycleStep

                                      type CycleStep struct {
                                      	Time    time.Time
                                      	Station string

                                        CycleStep is a path step of a bike's graph cycle.


                                        Path Synopsis