Documentation

Overview

    Package bike_share implements a Gazette consumer application which processes and serves streaming Citi Bike system data. It indexes a window of recent rides for each bike, serves a simple history API, and detects long graph cycles as they're completed by individual bikes.

    Index

    Constants

    View Source
    const CreateTableStmt = `` /* 875-byte string literal not displayed */
    

      CreateTableStmt bootstraps an embedded SQLite database with the "rides" table.

      View Source
      const InsertStmt = `` /* 352-byte string literal not displayed */
      

        InsertStmt inserts a CSV bike-share input record into the "rides" table.

        View Source
        const QueryCycleStmt = `` /* 1620-byte string literal not displayed */
        

          QueryCycleStmt returns a path taken by a bike ID ($1) of at least length $2, such that the path starts and ends at the bike's final station but does not visit it in between, and where the bike is not relocated between rides.

          View Source
          const QueryHistoryStmt = `
          SELECT uuid, start_time, end_time, start_station_name, end_station_name
          FROM rides WHERE bike_id = $1
          `

            QueryHistoryStmt retrieves the current window of rides of the given bike ($1). It powers the ServeBikeHistory API.

            View Source
            const WindowStmt = `` /* 173-byte string literal not displayed */
            

              WindowStmt windows a bike ID ($1) to the $2 most-recent rides.

              Variables

              This section is empty.

              Functions

              This section is empty.

              Types

              type Application

              type Application struct {
              	runconsumer.BaseConfig
              	DBAddr string `long:"postgres" description:"Database connection string" default:"host=/var/run/postgresql" required:"true"`
              	// contains filtered or unexported fields
              }

                Application is a consumer framework application which finds cycles in bike-share data. It implements the consumer.Application interface as well as runconsumer.Application, which extends consumer.Application with configuration parsing and initialization.

                func (*Application) ConsumeMessage

                func (app *Application) ConsumeMessage(shard consumer.Shard, store consumer.Store, env message.Envelope, pub *message.Publisher) error

                  ConsumeMessage inserts a CSVRecord of bike-share ride data into the store and queries for a now-completed cycle of the record's bike ID. If matched, it publishes a Cycle event.

                  func (Application) FinalizeTxn

                    FinalizeTxn is a no-op for this Application. If the application kept in-memory-only updates while consuming messages, FinalizeTxn would be the time for it to flush them out.

                    func (*Application) InitApplication

                    func (app *Application) InitApplication(args runconsumer.InitArgs) (err error)

                      InitApplication initializes dynamic mappings of bike-share data types to responsible partitions, opens the configured database, prepares DB statements for future use, and registers a simple bike-share history HTTP API.

                      func (*Application) NewConfig

                      func (app *Application) NewConfig() runconsumer.Config

                        NewConfig returns the Config struct of our Application, parse-able with `github.com/jessevdk/go-flags`. In this case our Application's type is also its config.

                        func (Application) NewMessage

                        func (Application) NewMessage(*pb.JournalSpec) (message.Message, error)

                          NewMessage returns an instance of the appropriate message type for decoding from the given journal. For this use-case, we use the provided message.CSVRecord type.

                          func (*Application) NewStore

                          func (app *Application) NewStore(_ consumer.Shard, rec *recoverylog.Recorder) (consumer.Store, error)

                            NewStore instantiates either a SQLStore using the remote DB or, if the Shard has a recovery log, an embedded store_sqlite.Store.

                            func (*Application) ServeBikeHistory

                            func (app *Application) ServeBikeHistory(w http.ResponseWriter, r *http.Request)

                              ServeBikeHistory is an http.HandlerFunc which returns the most recent rides of a bike ID provided via URL parameter. Invoke as:

                              /api/bikes?id=12345
                              

                              If the bike ID is served by a non-local shard, ServeBikeHistory will proxy to the appropriate peer.

                              type Cycle

                              type Cycle struct {
                              	UUID   message.UUID
                              	BikeID int
                              	Steps  []CycleStep
                              }

                                Cycle describes a graph cycle completed by a bike.

                                func (*Cycle) GetUUID

                                func (c *Cycle) GetUUID() message.UUID

                                  GetUUID returns the Cycle's UUID.

                                  func (*Cycle) NewAcknowledgement

                                  func (c *Cycle) NewAcknowledgement(pb.Journal) message.Message

                                    NewAcknowledgement returns a new & zero-valued Cycle. message.Publisher uses it to build a message which acknowledge other Cycle messages previously published.

                                    func (*Cycle) SetUUID

                                    func (c *Cycle) SetUUID(uuid message.UUID)

                                      SetUUID sets the Cycle's UUID. It's called by message.Publisher.

                                      type CycleStep

                                      type CycleStep struct {
                                      	Time    time.Time
                                      	Station string
                                      }

                                        CycleStep is a path step of a bike's graph cycle.

                                        Directories

                                        Path Synopsis