storage

package
v1.0.6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2023 License: GPL-3.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetNowTimestamp

func GetNowTimestamp() uint64

func SaveAgentMeta

func SaveAgentMeta(path string, meta AgentMeta) error

Types

type AgentMeta

type AgentMeta struct {
	PublisherID       string
	SubscriberID      string
	PublishedOffsets  TopicFragmentOffsets
	SubscribedOffsets TopicFragmentOffsets
	LastFetchedOffset TopicFragmentOffsets
}

func LoadAgentMeta

func LoadAgentMeta(path string) (AgentMeta, error)

type CFIndex

type CFIndex int
const (
	DefaultCF CFIndex = iota
	RecordCF
	RecordExpCF // column family for record-expiration
)

func (CFIndex) String

func (c CFIndex) String() string

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB is a helper for grocksdb.

func NewDB

func NewDB(name, dir string) (*DB, error)

func (*DB) Close

func (d *DB) Close()

func (*DB) ColumnFamilyHandles

func (d *DB) ColumnFamilyHandles() grocksdb.ColumnFamilyHandles

func (*DB) DeleteExpiredRecords

func (d *DB) DeleteExpiredRecords() (numDeleted int, deletionErr error)

DeleteExpiredRecords: a record can only be deleted once it has expired.

func (*DB) Destroy

func (d *DB) Destroy() error

func (*DB) Flush

func (d *DB) Flush() error

func (*DB) GetRecord

func (d *DB) GetRecord(topic string, fragmentId uint32, offset uint64) (*grocksdb.Slice, error)

func (*DB) PutRecord

func (d *DB) PutRecord(topic string, fragmentId uint32, offset uint64, seqNum uint64, data []byte, expirationDate uint64) error

PutRecord: expirationDate is a timestamp in seconds.

func (*DB) Scan

func (d *DB) Scan(cfIndex CFIndex) *grocksdb.Iterator

type FragmentKey

type FragmentKey string

func NewFragmentKey

func NewFragmentKey(topicName string, fragmentId uint) FragmentKey

type RecordKey

type RecordKey struct {
	*grocksdb.Slice
	// contains filtered or unexported fields
}

func NewRecordKey

func NewRecordKey(slice *grocksdb.Slice) *RecordKey

func NewRecordKeyFromData

func NewRecordKeyFromData(topic string, fragmentId uint32, offset uint64) *RecordKey

func (RecordKey) Data

func (k RecordKey) Data() []byte

func (RecordKey) FragmentId

func (k RecordKey) FragmentId() uint32

func (RecordKey) Offset

func (k RecordKey) Offset() uint64

func (*RecordKey) SetData

func (k *RecordKey) SetData(data []byte)

func (*RecordKey) SetOffset

func (k *RecordKey) SetOffset(offset uint64)

func (RecordKey) Size

func (k RecordKey) Size() int

func (RecordKey) Topic

func (k RecordKey) Topic() string

type RecordValue

type RecordValue struct {
	*grocksdb.Slice
	// contains filtered or unexported fields
}

func NewRecordValue

func NewRecordValue(slice *grocksdb.Slice) *RecordValue

func NewRecordValueFromData

func NewRecordValueFromData(seqNum uint64, publishedData []byte) *RecordValue

func (RecordValue) Data

func (v RecordValue) Data() []byte

func (RecordValue) PublishedData

func (v RecordValue) PublishedData() []byte

func (RecordValue) SeqNum

func (v RecordValue) SeqNum() uint64

func (*RecordValue) SetData

func (v *RecordValue) SetData(data []byte)

func (RecordValue) Size

func (v RecordValue) Size() int

type RetentionPeriodKey

type RetentionPeriodKey struct {
	*grocksdb.Slice
	// contains filtered or unexported fields
}

func NewRetentionPeriodKey

func NewRetentionPeriodKey(slice *grocksdb.Slice) *RetentionPeriodKey

func NewRetentionPeriodKeyFromData

func NewRetentionPeriodKeyFromData(recordKey *RecordKey, expirationDate uint64) *RetentionPeriodKey

func (RetentionPeriodKey) Data

func (k RetentionPeriodKey) Data() []byte

func (RetentionPeriodKey) ExpirationDate

func (k RetentionPeriodKey) ExpirationDate() uint64

func (RetentionPeriodKey) RecordKey

func (k RetentionPeriodKey) RecordKey() RecordKey

func (*RetentionPeriodKey) SetData

func (k *RetentionPeriodKey) SetData(data []byte)

func (RetentionPeriodKey) Size

func (k RetentionPeriodKey) Size() int

type RetentionScheduler

type RetentionScheduler struct {
	// contains filtered or unexported fields
}

func NewRetentionScheduler

func NewRetentionScheduler(db *DB, interval uint) *RetentionScheduler

NewRetentionScheduler: interval is in milliseconds.

func (*RetentionScheduler) Run

func (r *RetentionScheduler) Run(ctx context.Context)

type TopicFragmentOffsets

type TopicFragmentOffsets struct {
	*sync.Map
}

func NewTopicFragmentOffsets

func NewTopicFragmentOffsets(m map[FragmentKey]uint64) TopicFragmentOffsets

func (*TopicFragmentOffsets) ToMap

func (o *TopicFragmentOffsets) ToMap() map[FragmentKey]uint64

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL