tipubsub

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2022 License: Apache-2.0 Imports: 13 Imported by: 0

README

tipubsub

A small library using TiDB providing Sub/Pub API at sacle.

Subscriber:

type MySubscriber struct {}

var _ pubsub.Subscriber = (*MySubscriber)(nil)

func (s *MySubscriber) OnMessages(streamName string, msgs []pubsub.Message) {
	for _, msg := range msgs {
		log.I("Got Message:", msg, msg.ID)
	}
}

func (s *MySubscriber) Id() string {
	return "my_subscriber"
}

func main() {
    ...
	sub := &MySubscriber{}
	hub, err := pubsub.NewHub(cfg)
	if err != nil {
		log.Fatal(err)
	}
	err = hub.Subscribe("test_stream", sub, offset)
	if err != nil {
		log.Fatal(err)
	}
    ...
}

Publisher:

func main() {
    ...
	hub, err := pubsub.NewHub(cfg)
	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 10000; i++ {
		hub.Publish("test_stream", &pubsub.Message{
			Data: []byte(fmt.Sprintf("Message: %d", i)),
		})
	}
    ...
}

See example for more details

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrStreamNotFound error = errors.New("stream not found")
)

Functions

func PrintSampleConfig

func PrintSampleConfig()

Types

type Config

type Config struct {
	// DSN is the data source name.
	DSN string `toml:"dsn" env:"DSN" env-default:"root:@tcp(localhost:4000)/test"`
	// MaxBatchSize is the maximum number of messages to batch a transaction.
	MaxBatchSize int `toml:"max_batch_size" env:"MAX_BATCH_SIZE" env-default:"1000"`
	// PollIntervalInMs is the interval to poll the database.
	PollIntervalInMs int `toml:"poll_interval_in_ms" env:"POLL_INTERVAL_IN_MS" env-default:"100"`
	// GCIntervalInSec is the interval to run garbage collection.
	GCIntervalInSec int `toml:"gc_interval_in_sec" env:"GC_INTERVAL_IN_SEC" env-default:"600"`
	// GCKeepItems is the number of items to keep in the cache.
	GCKeepItems int `toml:"gc_keep_items" env:"GC_KEEP_ITEMS" env-default:"10000"`
}

func DefaultConfig

func DefaultConfig() *Config

func LoadConfig

func LoadConfig(path string) (*Config, error)

LoadConfig loads config from file.

func MustLoadConfig

func MustLoadConfig(path string) *Config

LoadConfigFromEnv loads config from environment variables.

func (*Config) String

func (c *Config) String() string

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

func NewHub

func NewHub(c *Config) (*Hub, error)

func (*Hub) DB

func (m *Hub) DB() *sql.DB

func (*Hub) ForceGC

func (m *Hub) ForceGC(streamName string) error

func (*Hub) GetStreamNames

func (m *Hub) GetStreamNames() ([]string, error)

func (*Hub) MessagesSinceOffset

func (m *Hub) MessagesSinceOffset(streamName string, offset Offset) ([]Message, error)

func (*Hub) MinMaxID

func (m *Hub) MinMaxID(streamName string) (int64, int64, error)

func (*Hub) PollStat

func (m *Hub) PollStat(streamName string) map[string]interface{}

func (*Hub) Publish

func (m *Hub) Publish(streamName string, msg *Message) error

func (*Hub) Subscribe

func (m *Hub) Subscribe(streamName string, subscriberID string) (<-chan Message, error)

func (*Hub) Unsubscribe

func (m *Hub) Unsubscribe(streamName string, subscriberID string)

type Message

type Message struct {
	ID   int64  `json:"id,string"`
	Ts   int64  `json:"ts,string"`
	Data string `json:"data"`
}

func (Message) String

func (m Message) String() string

type Offset

type Offset int64
var (
	LatestId Offset = -1
)

func (Offset) String

func (o Offset) String() string

type PollWorker

type PollWorker struct {
	// contains filtered or unexported fields
}

PollWorker is a worker that polls messages from a stream

func (*PollWorker) Stat

func (pw *PollWorker) Stat() map[string]interface{}

func (*PollWorker) Stop

func (pw *PollWorker) Stop()

type Stat

type Stat struct {
	// contains filtered or unexported fields
}

NOT yet implemented, just a placeholder

func GetStat

func GetStat() *Stat

func (*Stat) Inc

func (s *Stat) Inc(key string, val int)

func (*Stat) JSON

func (s *Stat) JSON() (string, error)

func (*Stat) Set

func (s *Stat) Set(key string, val interface{})

type Store

type Store interface {
	// Init initializes the store, call it after creating the store
	Init() error
	// CreateStream creates a stream
	CreateStream(streamName string) error
	// PutMessages puts messages into a stream
	PutMessages(streamName string, messages []*Message) error
	// FetchMessages fetches messages from a stream
	FetchMessages(streamName string, offset Offset, limit int) ([]Message, Offset, error)
	// MinMaxID returns the min, max offset of a stream
	MinMaxID(streamName string) (int64, int64, error)
	// GetStreamNames returns the names of all streams
	GetStreamNames() ([]string, error)
	// DB returns the underlying database
	DB() *sql.DB
}

Store is the interface for the storage of the messages

func OpenStore

func OpenStore(dsn string) (Store, error)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream(cfg *Config, s Store, name string) (*Stream, error)

func (*Stream) MinMaxID

func (s *Stream) MinMaxID() (int64, int64, error)

func (*Stream) Name

func (s *Stream) Name() string

func (*Stream) Open

func (s *Stream) Open() error

func (*Stream) Publish

func (s *Stream) Publish(m *Message)

type TiDBStore

type TiDBStore struct {
	// contains filtered or unexported fields
}

func NewTiDBStore

func NewTiDBStore(dsn string) *TiDBStore

func (*TiDBStore) CreateStream

func (s *TiDBStore) CreateStream(streamName string) error

CreateStream creates a stream, every stream is a table in the database

func (*TiDBStore) DB

func (s *TiDBStore) DB() *sql.DB

func (*TiDBStore) FetchMessages

func (s *TiDBStore) FetchMessages(streamName string, idOffset Offset, limit int) ([]Message, Offset, error)

func (*TiDBStore) GetStreamNames

func (s *TiDBStore) GetStreamNames() ([]string, error)

func (*TiDBStore) Init

func (s *TiDBStore) Init() error

func (*TiDBStore) MinMaxID

func (s *TiDBStore) MinMaxID(streamName string) (int64, int64, error)

func (*TiDBStore) PutMessages

func (s *TiDBStore) PutMessages(streamName string, messages []*Message) error

Directories

Path Synopsis
cmd
cli
example
pub
sub

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL