db

package
v0.0.0-...-1dbe30f Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 9, 2020 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
// Identifier constants, all of type byte, numbered consecutively from 0 via
// iota (MessagesByTopicPartitionOffset = 0 ... TimestampTopicPartitionIndex = 4).
// NOTE(review): presumably these are the values passed as the `table` argument
// of Key to namespace stored keys — confirm against the implementation.
// Reordering or inserting entries changes the numeric values.
const (
	MessagesByTopicPartitionOffset byte = iota
	Offsets
	TimestampIndex
	TimestampTopicIndex
	TimestampTopicPartitionIndex
)

Variables

This section is empty.

Functions

func BytesToInt64

func BytesToInt64(blob []byte) int64

func Contains

func Contains(slice []int, value int) bool

func CopyBytes

func CopyBytes(src []byte) []byte

func Int32ToBytes

func Int32ToBytes(value int32) []byte

func Int64ToBytes

func Int64ToBytes(value int64) []byte

func Key

func Key(table byte, parts ...[]byte) []byte

func Millis

func Millis(nanos int64) int64

func StringToBytes

func StringToBytes(s string) []byte

func UInt32ToBytes

func UInt32ToBytes(value uint32) []byte

func UInt64ToBytes

func UInt64ToBytes(value uint64) []byte

Types

type BinaryData

// BinaryData bundles a raw byte payload with a size and a type label.
// Message uses it for both its Key and Value fields.
type BinaryData struct {
	// Value is the raw payload, serialized under the JSON name "value".
	Value []byte `json:"value"`
	// Size is serialized as "size".
	// NOTE(review): presumably the byte length of Value — confirm.
	Size  int    `json:"size"`
	// Type is a string label serialized as "type".
	// NOTE(review): the set of valid labels is not visible here.
	Type  string `json:"type"`
}

type DB

// DB is the database handle returned by Open. Its pointer method set (Close,
// SaveMessage, FindMessage, SaveOffset, FindOffset, SearchMessagesByTime)
// matches the Datastore interface; it additionally exposes Update.
type DB struct {
	// contains filtered or unexported fields
}

func Open

func Open(dir string) (*DB, error)

func (*DB) Close

func (db *DB) Close()

func (*DB) FindMessage

func (db *DB) FindMessage(topic string, partition int32, offset int64) (*Message, error)

func (*DB) FindOffset

func (db *DB) FindOffset(topic string, partition int32) (int64, error)

func (*DB) SaveMessage

func (db *DB) SaveMessage(msg *Message) error

func (*DB) SaveOffset

func (db *DB) SaveOffset(topic string, partition int32, offset int64) error

func (*DB) SearchMessagesByTime

func (db *DB) SearchMessagesByTime(search *SearchRequest) (*SearchResponse, error)

func (*DB) Update

func (db *DB) Update(ops ...Operation) error

type Datastore

// Datastore is the storage abstraction for messages and offsets.
// *DB provides every method listed here.
type Datastore interface {
	// Close takes no arguments and returns nothing, so it cannot report an error.
	// NOTE(review): presumably releases the underlying store — confirm.
	Close()

	// SaveMessage and FindMessage store and look up a Message; lookup is
	// addressed by topic, partition and offset.
	SaveMessage(msg *Message) error
	FindMessage(topic string, partition int32, offset int64) (*Message, error)

	// SaveOffset and FindOffset store and look up a single int64 offset per
	// (topic, partition) pair.
	SaveOffset(topic string, partition int32, offset int64) error
	FindOffset(topic string, partition int32) (int64, error)

	// SearchMessagesByTime runs the query described by search and returns a
	// SearchResponse.
	SearchMessagesByTime(search *SearchRequest) (*SearchResponse, error)
}

type Lookup

// Lookup is a callback type that receives three byte slices (start, end,
// current) plus a count, and returns a byte slice.
// NOTE(review): presumably used while scanning a key range to pick the next
// key to seek to — confirm against the callers.
type Lookup func(start, end, current []byte, count int) []byte

type Message

// Message is the stored representation of a single record, identified by
// Topic, Partition and Offset. It can be built from a *sarama.ConsumerMessage
// via NewMessage and converted to and from bytes with ToBytes and
// MessageFromBytes.
type Message struct {
	// Key and Value carry the record's key and payload as BinaryData.
	Key            BinaryData     `json:"key"`
	Value          BinaryData     `json:"value"`
	// Topic, Partition and Offset are the same triple FindMessage takes.
	Topic          string         `json:"topic"`
	Partition      int32          `json:"partition"`
	Offset         int64          `json:"offset"`
	// Timestamp and BlockTimestamp are int64 time values.
	// NOTE(review): units are not visible here; the Millis(nanos) helper
	// suggests milliseconds — confirm.
	Timestamp      int64          `json:"timestamp"`
	BlockTimestamp int64          `json:"blockTimestamp"`
	// Tags maps a string key to a Tag.
	// NOTE(review): presumably keyed by the `tag` argument of AddTag — confirm.
	Tags           map[string]Tag `json:"tags"`
}

func MessageFromBytes

func MessageFromBytes(blob []byte) (*Message, error)

func NewMessage

func NewMessage(msg *sarama.ConsumerMessage) *Message

func (*Message) AddTag

func (m *Message) AddTag(tag, value, details string)

func (*Message) ToBytes

func (m *Message) ToBytes() ([]byte, error)

type Operation

// Operation is a unit of work executed against a badger transaction; it
// returns an error to signal failure. DB.Update accepts a variadic list of
// Operations.
type Operation func(txn *badger.Txn) error

type Paging

// Paging carries pagination settings for a SearchRequest.
type Paging struct {
	// Limit is serialized as "limit".
	// NOTE(review): presumably the maximum number of rows per page — confirm.
	Limit int   `json:"limit"`
	// Pages is a list of page numbers serialized as "pages".
	// NOTE(review): exact meaning is not visible here — confirm.
	Pages []int `json:"pages"`
}

type SearchRequest

// SearchRequest describes a query passed to SearchMessagesByTime.
type SearchRequest struct {
	// Search is the query string.
	// NOTE(review): its syntax/matching rules are not visible here.
	Search   string  `json:"search"`
	// Earliest and Latest bound the time range of the search.
	// NOTE(review): units and inclusivity are not visible here — confirm.
	Earliest int64   `json:"earliest"`
	Latest   int64   `json:"latest"`
	// Page is the requested page number.
	// NOTE(review): whether numbering starts at 0 or 1 is not visible here.
	Page     int     `json:"page"`
	// Offset is an opaque byte slice.
	// NOTE(review): presumably a resume position previously returned in
	// SearchResponse.Offsets — confirm.
	Offset   []byte  `json:"offset"`
	// Paging is a pointer and may therefore be nil.
	Paging   *Paging `json:"paging"`
}

func (*SearchRequest) SkipRows

func (r *SearchRequest) SkipRows() int

type SearchResponse

// SearchResponse is the result returned by SearchMessagesByTime.
type SearchResponse struct {
	// Total is a row count serialized as "total".
	// NOTE(review): presumably the total number of matches, not len(Rows) — confirm.
	Total      int               `json:"total"`
	// Earliest and Latest mirror the field names of SearchRequest.
	// NOTE(review): whether they echo the request or describe the rows found
	// is not visible here.
	Earliest   int64             `json:"earliest"`
	Latest     int64             `json:"latest"`
	// Rows holds the messages returned for this page.
	Rows       []*Message        `json:"rows"`
	// Page is the page number this response corresponds to.
	Page       int               `json:"page"`
	// OffsetUsed is a flag serialized as "offsetUsed".
	// NOTE(review): presumably reports whether the request's Offset was
	// applied — confirm.
	OffsetUsed bool              `json:"offsetUsed"`
	// Offsets maps string keys to opaque byte-slice offsets.
	// NOTE(review): presumably keyed by page number as written by AddOffset
	// — confirm.
	Offsets    map[string][]byte `json:"offsets"`
	// Took is a duration value serialized as "took".
	// NOTE(review): units are not visible here.
	Took       int64             `json:"took"`
	// TimedOut is a flag serialized as "timedOut".
	// NOTE(review): presumably set when the search stopped early on a time
	// limit — confirm.
	TimedOut   bool              `json:"timedOut"`
}

func (*SearchResponse) AddOffset

func (r *SearchResponse) AddOffset(page int, offset []byte)

func (*SearchResponse) CleanOffsets

func (r *SearchResponse) CleanOffsets()

func (*SearchResponse) TotalPages

func (r *SearchResponse) TotalPages() int

type Tag

// Tag is the value type stored in Message.Tags: a value plus optional details.
type Tag struct {
	// Value is serialized as "value".
	Value   string `json:"value"`
	// Details is serialized as "details"; the omitempty option drops the field
	// from JSON output when the string is empty.
	Details string `json:"details,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL