Documentation ¶
Index ¶
- Variables
- func Offset(storageKey []byte) uint64
- func OffsetBytes(offset uint64) []byte
- func StorageKey(msg api.Message) []byte
- func StorageValue(msg api.Message) []byte
- type AcquireMode
- type Client
- func (c *Client) AcquireDB(topic string, mode AcquireMode) (*pebble.DB, context.CancelFunc, error)
- func (c *Client) Close() error
- func (c *Client) CreateDB(topic string) error
- func (c *Client) CreateTopics(ctx context.Context, topics ...string) (api.TopicErrors, error)
- func (c *Client) DeleteDB(topic string) error
- func (c *Client) DeleteTopics(ctx context.Context, topics ...string) (api.TopicErrors, error)
- func (c *Client) FindFirst(ctx context.Context, topic string) ([]byte, error)
- func (c *Client) FindLast(ctx context.Context, topic string) ([]byte, error)
- func (c *Client) Get(topic string, storageKey []byte) (api.Message, error)
- func (c *Client) GetLogger() api.LoggerFunc
- func (c *Client) IsExistsError(err error) bool
- func (c *Client) NewReader(topic string) api.Reader
- func (c *Client) NewWriter() api.Writer
- func (c *Client) Read(ctx context.Context, topic string, partition int, offset *uint64) (api.Message, error)
- func (c *Client) ReadNext(ctx context.Context, topic string, currentKey []byte) (api.Message, error)
- func (c *Client) SetLogger(fn api.LoggerFunc)
- func (c *Client) Subscribe(ctx context.Context, topic string, fn HandleFunc) error
- func (c *Client) Write(ctx context.Context, topic string, msg ...api.Message) error
- type HandleFunc
- type Message
- type Mtx
- type OffsetStatus
- type Reader
- type StartOffset
- type Writer
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrorTableNotInitalized = errors.New("pebble.DB not initalized") ErrorPipeClosed = errors.New("pebble.Reader pipe closed") ErrorInvalidStartOffset = errors.New("pebble.Reader invalid start offset") ErrorOffsetNotFound = errors.New("pebble.Reader could not find start offset") ErrorReicevedOldMessage = errors.New("pebble.Reader received old message from pebble.Client") )
View Source
var Metrics = &Mtx{ Reads: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "kstore", Subsystem: "pebble", Name: "reads_total", Help: "Messages read by topic and status", }, []string{"topic", "status"}, ), Writes: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "kstore", Subsystem: "pebble", Name: "writes_total", Help: "Messages written by topic", }, []string{"topic"}, ), }
View Source
var OffsetStatuses = []OffsetStatus{OffsetStatusOlder, OffsetStatusCurrent, OffsetStatusNewer}
Functions ¶
func OffsetBytes ¶
OffsetBytes converts an offset to an 8-byte slice.
func StorageKey ¶
StorageKey returns a []byte key used for storing messages ordered by offset.
func StorageValue ¶
Types ¶
type AcquireMode ¶
type AcquireMode int
const ( AcquireModeRead AcquireMode = iota AcquireModeWrite )
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) AcquireDB ¶
func (c *Client) AcquireDB(topic string, mode AcquireMode) (*pebble.DB, context.CancelFunc, error)
func (*Client) CreateTopics ¶
func (*Client) DeleteTopics ¶
func (*Client) GetLogger ¶
func (c *Client) GetLogger() api.LoggerFunc
func (*Client) IsExistsError ¶
func (*Client) SetLogger ¶
func (c *Client) SetLogger(fn api.LoggerFunc)
type Mtx ¶
type Mtx struct { Reads *prometheus.CounterVec Writes *prometheus.CounterVec }
func (*Mtx) GetReads ¶
func (m *Mtx) GetReads(topic string, status ...OffsetStatus) map[OffsetStatus]int
func (*Mtx) ObserveRead ¶
func (m *Mtx) ObserveRead(msg api.Message, topic string, status OffsetStatus)
func (*Mtx) ObserveWrite ¶
type OffsetStatus ¶
type OffsetStatus int
const ( OffsetStatusOlder OffsetStatus = iota OffsetStatusCurrent OffsetStatusNewer )
func CompareOffsetByKey ¶
func CompareOffsetByKey(currentKey, otherKey []byte) OffsetStatus
CompareOffsetByKey compares the offset of the message with the offset of the last message seen. NOTE: Must be protected by r.mu!
func (OffsetStatus) String ¶
func (s OffsetStatus) String() string
type StartOffset ¶
type StartOffset int
const ( StartOffsetFirst StartOffset = iota StartOffsetLast )
Click to show internal directories.
Click to hide internal directories.