Documentation ¶
Index ¶
- Variables
- func PrintSampleConfig()
- type Config
- type Hub
- func (m *Hub) DB() *sql.DB
- func (m *Hub) ForceGC(streamName string) error
- func (m *Hub) GetStreamNames() ([]string, error)
- func (m *Hub) MessagesSinceOffset(streamName string, offset Offset) ([]Message, error)
- func (m *Hub) MinMaxID(streamName string) (int64, int64, error)
- func (m *Hub) PollStat(streamName string) map[string]interface{}
- func (m *Hub) Publish(streamName string, msg *Message) error
- func (m *Hub) Subscribe(streamName string, subscriberID string) (<-chan Message, error)
- func (m *Hub) Unsubscribe(streamName string, subscriberID string)
- type Message
- type Offset
- type PollWorker
- type Stat
- type Store
- type Stream
- type TiDBStore
- func (s *TiDBStore) CreateStream(streamName string) error
- func (s *TiDBStore) DB() *sql.DB
- func (s *TiDBStore) FetchMessages(streamName string, idOffset Offset, limit int) ([]Message, Offset, error)
- func (s *TiDBStore) GetStreamNames() ([]string, error)
- func (s *TiDBStore) Init() error
- func (s *TiDBStore) MinMaxID(streamName string) (int64, int64, error)
- func (s *TiDBStore) PutMessages(streamName string, messages []*Message) error
Constants ¶
This section is empty.
Variables ¶
View Source
var (
ErrStreamNotFound error = errors.New("stream not found")
)
Functions ¶
func PrintSampleConfig ¶
func PrintSampleConfig()
Types ¶
type Config ¶
type Config struct {
	// DSN is the data source name.
	DSN string `toml:"dsn" env:"DSN" env-default:"root:@tcp(localhost:4000)/test"`
	// MaxBatchSize is the maximum number of messages to batch in a transaction.
	MaxBatchSize int `toml:"max_batch_size" env:"MAX_BATCH_SIZE" env-default:"1000"`
	// PollIntervalInMs is the interval to poll the database.
	PollIntervalInMs int `toml:"poll_interval_in_ms" env:"POLL_INTERVAL_IN_MS" env-default:"100"`
	// GCIntervalInSec is the interval to run garbage collection.
	GCIntervalInSec int `toml:"gc_interval_in_sec" env:"GC_INTERVAL_IN_SEC" env-default:"600"`
	// GCKeepItems is the number of items to keep in the cache.
	GCKeepItems int `toml:"gc_keep_items" env:"GC_KEEP_ITEMS" env-default:"10000"`
}
func DefaultConfig ¶
func DefaultConfig() *Config
func MustLoadConfig ¶
MustLoadConfig loads config from environment variables.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
func (*Hub) GetStreamNames ¶
func (*Hub) MessagesSinceOffset ¶
func (*Hub) Unsubscribe ¶
type Message ¶
type PollWorker ¶
type PollWorker struct {
// contains filtered or unexported fields
}
PollWorker is a worker that polls messages from a stream
func (*PollWorker) Stat ¶
func (pw *PollWorker) Stat() map[string]interface{}
func (*PollWorker) Stop ¶
func (pw *PollWorker) Stop()
type Stat ¶
type Stat struct {
// contains filtered or unexported fields
}
Stat is not yet implemented; it is just a placeholder.
type Store ¶
type Store interface {
	// Init initializes the store, call it after creating the store
	Init() error
	// CreateStream creates a stream
	CreateStream(streamName string) error
	// PutMessages puts messages into a stream
	PutMessages(streamName string, messages []*Message) error
	// FetchMessages fetches messages from a stream
	FetchMessages(streamName string, offset Offset, limit int) ([]Message, Offset, error)
	// MinMaxID returns the min, max offset of a stream
	MinMaxID(streamName string) (int64, int64, error)
	// GetStreamNames returns the names of all streams
	GetStreamNames() ([]string, error)
	// DB returns the underlying database
	DB() *sql.DB
}
Store is the interface for the storage of the messages
type TiDBStore ¶
type TiDBStore struct {
// contains filtered or unexported fields
}
func NewTiDBStore ¶
func (*TiDBStore) CreateStream ¶
CreateStream creates a stream; every stream is a table in the database.
func (*TiDBStore) FetchMessages ¶
func (*TiDBStore) GetStreamNames ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.