timetogo

package module
v0.0.0-...-319f178
Published: Apr 28, 2019 License: MIT Imports: 18 Imported by: 1

README

Overview

Allows efficient persistence of multiple series of time-series data, efficient updates, and efficient retrieval.

Use Case

This project was primarily designed as an optimization for the case where you have many data files that are expensive to load into an in-memory time-series structure and you would like to persist that data between invocations. To persist, a time-series can be cut into several well-defined ranges of time (such as months or years), encoded (e.g. with flatbuffers or gob), and stored to disk with time-to-go. On future invocations, the series are loaded one at a time, on demand. It also accommodates updates to individual time-series when their source data changes, without disrupting the other series, while keeping all of this data in a single stream rather than a whole filesystem tree.

The time-series data itself is an arbitrary blob provided via a Reader given by the caller, along with head and tail timestamps, filename, record count, and data length.

Stream Structure

Each series is followed by a series footer, which is followed by a brief "shadow" footer describing a version and length. Each stream ends with a stream footer, followed by another shadow footer. The stream is read from back to front, and summary information about all series is stored in the stream footer, so it is very quick to determine which series contain a certain timestamp and where those series are in the stream. This back-to-front approach is meant to optimize updates.
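
To make the back-to-front layout concrete, here is a minimal standard-library sketch that writes two footers, each followed by a shadow footer, and then reads them starting from the end of the stream. The field widths follow the `ShadowFooterSize` comment (version + type + length + boundary marker); the little-endian byte order, the fixed version value, and all function names are assumptions made for illustration and are not the package's actual encoding.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// shadowFooterSize follows the documented field widths:
// version (2) + type (1) + length (2) + boundary marker (1).
const shadowFooterSize = 2 + 1 + 2 + 1

// appendFooter writes a footer followed by its shadow footer. Little-endian
// encoding and a fixed version of 1 are assumptions made for this sketch.
func appendFooter(buf *bytes.Buffer, footerType byte, footer []byte) {
	buf.Write(footer)

	shadow := make([]byte, shadowFooterSize)
	binary.LittleEndian.PutUint16(shadow[0:2], 1) // version
	shadow[2] = footerType
	binary.LittleEndian.PutUint16(shadow[3:5], uint16(len(footer)))
	shadow[5] = 0 // boundary marker (NUL)
	buf.Write(shadow)
}

// readFooterEndingAt reads the footer whose shadow footer ends at offset end.
// It returns the footer bytes and the offset of the footer's first byte, which
// is also where whatever precedes the footer ends.
func readFooterEndingAt(rs io.ReadSeeker, end int64) ([]byte, int64, error) {
	if end < shadowFooterSize {
		return nil, 0, errors.New("stream too short for a shadow footer")
	}
	shadow := make([]byte, shadowFooterSize)
	if _, err := rs.Seek(end-shadowFooterSize, io.SeekStart); err != nil {
		return nil, 0, err
	}
	if _, err := io.ReadFull(rs, shadow); err != nil {
		return nil, 0, err
	}
	if shadow[5] != 0 {
		return nil, 0, errors.New("boundary marker not found")
	}

	length := int64(binary.LittleEndian.Uint16(shadow[3:5]))
	start := end - shadowFooterSize - length
	if start < 0 {
		return nil, 0, errors.New("footer length exceeds stream")
	}
	footer := make([]byte, length)
	if _, err := rs.Seek(start, io.SeekStart); err != nil {
		return nil, 0, err
	}
	if _, err := io.ReadFull(rs, footer); err != nil {
		return nil, 0, err
	}
	return footer, start, nil
}

// buildSketchStream lays out: series data, series footer, stream footer.
func buildSketchStream() []byte {
	var buf bytes.Buffer
	buf.WriteString("series-data")
	appendFooter(&buf, 1, []byte("series-footer"))
	appendFooter(&buf, 2, []byte("stream-footer"))
	return buf.Bytes()
}

// readBackToFront reads the stream footer first, then the series footer that
// ends where the stream footer starts.
func readBackToFront(raw []byte) (streamFooter, seriesFooter string, dataEnd int64, err error) {
	rs := bytes.NewReader(raw)
	sf, start, err := readFooterEndingAt(rs, int64(len(raw)))
	if err != nil {
		return "", "", 0, err
	}
	se, start, err := readFooterEndingAt(rs, start)
	if err != nil {
		return "", "", 0, err
	}
	return string(sf), string(se), start, nil
}

func main() {
	stream, series, dataEnd, err := readBackToFront(buildSketchStream())
	if err != nil {
		panic(err)
	}
	fmt.Printf("stream footer: %q\nseries footer: %q\nseries data ends at offset %d\n", stream, series, dataEnd)
}
```

Because every footer records its own length just before the boundary marker, a reader never has to scan forward: each step backwards is two seeks and two fixed-size reads.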

Update Complexity

If an update is performed but none of the time-series stored at the front of the stream have been changed, no writes for those series are performed. If we're only updating existing series and they're in the same order as in the stream, only the stream footer is updated. If one or more series are dropped from the stream, then any following series that are to be kept will be copied directly from that later position in the stream to the earlier position. In all of these cases, the caller is not required to provide the series data, and the caller will know in advance whether or not they need to provide that data by which series it is passing for the update.
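
One way to picture the first case above is as a common-prefix comparison between what is already in the stream and what the caller passes for the update. The sketch below is only an illustration of that idea, not the package's update algorithm; the `seriesState` type and its fields are hypothetical.

```go
package main

import "fmt"

// seriesState is a hypothetical summary of one series: its UUID plus a digest
// of the source data it was built from.
type seriesState struct {
	UUID      string
	SourceSum string
}

// unchangedPrefix returns how many leading series are identical (same UUID and
// same source digest, at the same position) in both lists. Under the
// description above, those series would not need to be rewritten.
func unchangedPrefix(existing, desired []seriesState) int {
	n := 0
	for n < len(existing) && n < len(desired) && existing[n] == desired[n] {
		n++
	}
	return n
}

func main() {
	existing := []seriesState{{"a", "111"}, {"b", "222"}, {"c", "333"}}
	desired := []seriesState{{"a", "111"}, {"b", "999"}, {"c", "333"}}

	fmt.Printf("series untouched at the front: %d\n", unchangedPrefix(existing, desired))
}
```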

Notes

  • An update may produce a smaller stream, but because the updater works with byte streams it cannot, in general, truncate the stream to a shorter size itself. A Truncater interface is defined that matches the Truncate() method on File. If the ReadWriteSeeker that is passed to the Updater also satisfies the Truncater interface, the stream will be truncated automatically. Otherwise, it is the caller's responsibility to truncate the stream (whether it's a byte slice, physical file, etc.) to the right length using the length returned by the update.
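
The optional-truncation behavior described in the note can be sketched with a type assertion. The `truncater` interface below is a local stand-in with the same method as `(*os.File).Truncate`; `memFile`, `plain`, and `truncateIfPossible` are hypothetical names for this sketch and are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// truncater is a local stand-in for the package's Truncater interface; the
// method matches (*os.File).Truncate.
type truncater interface {
	Truncate(size int64) error
}

// memFile is a hypothetical in-memory io.ReadWriteSeeker that can truncate.
type memFile struct {
	data []byte
	pos  int64
}

func (m *memFile) Read(p []byte) (int, error) {
	if m.pos >= int64(len(m.data)) {
		return 0, io.EOF
	}
	n := copy(p, m.data[m.pos:])
	m.pos += int64(n)
	return n, nil
}

func (m *memFile) Write(p []byte) (int, error) {
	end := m.pos + int64(len(p))
	if end > int64(len(m.data)) {
		m.data = append(m.data, make([]byte, end-int64(len(m.data)))...)
	}
	copy(m.data[m.pos:], p)
	m.pos = end
	return len(p), nil
}

func (m *memFile) Seek(offset int64, whence int) (int64, error) {
	var base int64
	switch whence {
	case io.SeekStart:
		base = 0
	case io.SeekCurrent:
		base = m.pos
	case io.SeekEnd:
		base = int64(len(m.data))
	default:
		return 0, errors.New("invalid whence")
	}
	if base+offset < 0 {
		return 0, errors.New("negative position")
	}
	m.pos = base + offset
	return m.pos, nil
}

func (m *memFile) Truncate(size int64) error {
	if size < 0 || size > int64(len(m.data)) {
		return errors.New("size out of range")
	}
	m.data = m.data[:size]
	return nil
}

// plain exposes only Read, Write and Seek, hiding any Truncate method.
type plain struct{ io.ReadWriteSeeker }

// truncateIfPossible shrinks rws to newSize when it supports truncation and
// reports whether it did. When it returns false, the caller has to truncate
// the underlying storage itself.
func truncateIfPossible(rws io.ReadWriteSeeker, newSize int64) (bool, error) {
	t, ok := rws.(truncater)
	if !ok {
		return false, nil
	}
	return true, t.Truncate(newSize)
}

func main() {
	f := &memFile{data: []byte("0123456789")}

	did, err := truncateIfPossible(f, 4)
	fmt.Println(did, err, string(f.data))

	did, err = truncateIfPossible(plain{f}, 2)
	fmt.Println(did, err, string(f.data))
}
```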

Documentation

Constants

const (
	// SeriesDataCopyBufferSize is the size of the buffer to use for the copy of
	// the time-series data into the output stream.
	SeriesDataCopyBufferSize = 1024 * 1024
)
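
A fixed-size copy buffer like this is typically handed to `io.CopyBuffer`. The sketch below shows that pattern with the standard library only; whether the package uses the constant in exactly this way is an assumption, and `copySeriesData` and `copyString` are hypothetical helpers.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// copyBufferSize mirrors the documented value of SeriesDataCopyBufferSize.
const copyBufferSize = 1024 * 1024

// copySeriesData copies src to dst through a reusable 1 MiB buffer.
func copySeriesData(dst io.Writer, src io.Reader) (int64, error) {
	buf := make([]byte, copyBufferSize)
	return io.CopyBuffer(dst, src, buf)
}

// copyString runs copySeriesData over an in-memory string. The anonymous
// struct wrappers hide strings.Reader's WriteTo and bytes.Buffer's ReadFrom;
// io.CopyBuffer bypasses the supplied buffer when either is available.
func copyString(s string) (string, int64, error) {
	var out bytes.Buffer
	n, err := copySeriesData(struct{ io.Writer }{&out}, struct{ io.Reader }{strings.NewReader(s)})
	return out.String(), n, err
}

func main() {
	copied, n, err := copyString("some time series data")
	if err != nil {
		panic(err)
	}
	fmt.Printf("copied %d bytes: %s\n", n, copied)
}
```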
const (
	// ShadowFooterSize is the size of the shadow footer:
	//
	//   version + type + length + boundary marker
	//
	ShadowFooterSize = 2 + 1 + 2 + 1
)

Variables

var (
	// TestTimeSeriesData is test data.
	TestTimeSeriesData = []byte("some time series data")

	// TestTimeSeriesData2 is test data.
	TestTimeSeriesData2 = []byte("X some time series data 2 X")
)
var (
	// ScopeTypePhrases are simple labels for the various scope types.
	ScopeTypePhrases = map[ScopeType]string{
		StSeries: "series",
		StStream: "stream",
		StMisc:   "misc",
	}
)

Functions

func DumpBytes

func DumpBytes(description string, rs io.ReadSeeker, position int64, count int, requireAll bool)

DumpBytes prints raw bytes.

func WriteTestMultiseriesStream

func WriteTestMultiseriesStream() (raw []byte, footers []*SeriesFooter1, sb *StreamBuilder)

WriteTestMultiseriesStream creates a stream with multiple test-series and validates that it looks okay before returning.

Types

type FooterType

type FooterType byte

FooterType is an enum that represents all footer types.

const (
	// FtSeriesFooter describes a footer that contains series information.
	FtSeriesFooter FooterType = 1

	// FtStreamFooter describes a footer that contains stream information.
	FtStreamFooter FooterType = 2
)

type GobSingleObjectDecoderDatasource

type GobSingleObjectDecoderDatasource struct {
	// contains filtered or unexported fields
}

GobSingleObjectDecoderDatasource wraps a `gob.Decoder` as a `SeriesDataDatasourceReader`.

func NewGobSingleObjectDecoderDatasource

func NewGobSingleObjectDecoderDatasource(outputValue interface{}) *GobSingleObjectDecoderDatasource

NewGobSingleObjectDecoderDatasource returns a new `GobSingleObjectDecoderDatasource` struct.

func (*GobSingleObjectDecoderDatasource) ReadData

func (gdd *GobSingleObjectDecoderDatasource) ReadData(r io.Reader, sf SeriesFooter) (n int, err error)

ReadData is called when series data needs to be read and decodes the raw series-data into the struct that we were initialized with.

type GobSingleObjectEncoderDatasource

type GobSingleObjectEncoderDatasource struct {
	// contains filtered or unexported fields
}

GobSingleObjectEncoderDatasource wraps a `gob.Encoder` as a `SeriesDataDatasourceWriter`.

func NewGobSingleObjectEncoderDatasource

func NewGobSingleObjectEncoderDatasource(inputValue interface{}) *GobSingleObjectEncoderDatasource

NewGobSingleObjectEncoderDatasource returns a new GobSingleObjectEncoderDatasource struct.

func (GobSingleObjectEncoderDatasource) WriteData

func (ged GobSingleObjectEncoderDatasource) WriteData(w io.Writer, sf SeriesFooter) (n int, err error)

WriteData is called when series data needs to be written and encodes the struct that we were initialized with into the writer we are given.
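
For readers unfamiliar with gob, the underlying round trip that these two datasource types wrap looks roughly like the following. This sketch uses only `encoding/gob` from the standard library and does not call this package; the `record` type and the helper names are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"io"
	"time"
)

// record is a hypothetical time-series record.
type record struct {
	Time  time.Time
	Value float64
}

// encodeSeries writes the whole series as a single gob-encoded object.
func encodeSeries(w io.Writer, records []record) error {
	return gob.NewEncoder(w).Encode(records)
}

// decodeSeries reads a single gob-encoded object back into a slice.
func decodeSeries(r io.Reader) ([]record, error) {
	var records []record
	err := gob.NewDecoder(r).Decode(&records)
	return records, err
}

// sampleRecords returns two records ten seconds apart.
func sampleRecords() []record {
	start := time.Date(2016, 10, 1, 12, 34, 56, 0, time.UTC)
	return []record{{start, 1.5}, {start.Add(10 * time.Second), 2.5}}
}

// roundTrip encodes the records into a buffer and decodes them again.
func roundTrip(records []record) ([]record, error) {
	var blob bytes.Buffer
	if err := encodeSeries(&blob, records); err != nil {
		return nil, err
	}
	return decodeSeries(&blob)
}

func main() {
	out, err := roundTrip(sampleRecords())
	if err != nil {
		panic(err)
	}
	for _, r := range out {
		fmt.Println(r.Time.Format(time.RFC3339), r.Value)
	}
}
```

The encoded buffer plays the role of the opaque series blob: the stream only needs its bytes and length, while the meaning of the records stays with the caller.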

type Index

type Index struct {
	// contains filtered or unexported fields
}

Index allows you to efficiently identify a recorded series based on criteria that can be found in the info in the stream footer.

func NewIndex

func NewIndex(rs io.ReadSeeker) (index *Index, err error)

NewIndex returns a new `Index` struct.

func (*Index) GetWithTimestamp

func (index *Index) GetWithTimestamp(timestamp time.Time) (matched []StreamIndexedSequenceInfo, err error)

GetWithTimestamp returns all series that contain the given timestamp.

Example
b := rifs.NewSeekableBuffer()

// Stage stream.

sb := NewStreamBuilder(b)
series := AddTestSeries(sb)

_, err := sb.Finish()
log.PanicIf(err)

raw := b.Bytes()

for i, sf := range series {
	fmt.Printf("Test series (%d): %s [%v, %v]\n", i, sf.Uuid(), sf.HeadRecordTime(), sf.TailRecordTime())
}

fmt.Printf("\n")

// Parse stream.

r := bytes.NewReader(raw)

index, err := NewIndex(r)
log.PanicIf(err)

queryTimestamp := time.Date(2016, 10, 1, 12, 34, 57, 0, time.UTC)

fmt.Printf("Query: %v\n", queryTimestamp)
fmt.Printf("\n")

matched, err := index.GetWithTimestamp(queryTimestamp)
log.PanicIf(err)

for _, matchedSeries := range matched {
	fmt.Printf("MATCHED: %s\n", matchedSeries.Uuid())
}
Output:

Test series (0): d095abf5-126e-48a7-8974-885de92bd964 [2016-10-01 12:34:56 +0000 UTC, 2016-10-01 12:35:16 +0000 UTC]
Test series (1): 8a4ba0c4-0a0d-442f-8256-1d61adb16abc [2016-10-01 12:35:06 +0000 UTC, 2016-10-01 12:35:26 +0000 UTC]

Query: 2016-10-01 12:34:57 +0000 UTC

MATCHED: d095abf5-126e-48a7-8974-885de92bd964
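
The match in the example above comes down to an interval test against each series' head and tail timestamps. A standalone sketch of that test follows, using the two time ranges printed in the example output. Treating both bounds as inclusive is an assumption, and the `span` type and helper names are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// span is a hypothetical summary of one series: its UUID and time range.
type span struct {
	uuid       string
	head, tail time.Time
}

// at returns a UTC time on 2016-10-01, the date used in the example.
func at(hour, minute, second int) time.Time {
	return time.Date(2016, 10, 1, hour, minute, second, 0, time.UTC)
}

// exampleSpans mirrors the two test series shown in the example output.
func exampleSpans() []span {
	return []span{
		{"d095abf5-126e-48a7-8974-885de92bd964", at(12, 34, 56), at(12, 35, 16)},
		{"8a4ba0c4-0a0d-442f-8256-1d61adb16abc", at(12, 35, 6), at(12, 35, 26)},
	}
}

// containing returns the UUIDs of all spans whose [head, tail] range includes
// ts. Both bounds are treated as inclusive in this sketch.
func containing(spans []span, ts time.Time) []string {
	var matched []string
	for _, s := range spans {
		if !ts.Before(s.head) && !ts.After(s.tail) {
			matched = append(matched, s.uuid)
		}
	}
	return matched
}

func main() {
	for _, uuid := range containing(exampleSpans(), at(12, 34, 57)) {
		fmt.Println("MATCHED:", uuid)
	}
}
```

Since the two test series overlap between 12:35:06 and 12:35:16, a query inside that window matches both of them.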

type Iterator

type Iterator struct {
	// contains filtered or unexported fields
}

Iterator efficiently steps backwards through the series in a stream in order.

func NewIterator

func NewIterator(sr *StreamReader) (it *Iterator, err error)

NewIterator returns an `Iterator` struct.

func (*Iterator) Count

func (it *Iterator) Count() int

Count returns the number of series in the stream.

func (*Iterator) Current

func (it *Iterator) Current() int

Current returns the number of the series that we're currently on. This decrements after each call to `Iterate` and is less than zero on EOF.

func (*Iterator) Iterate

func (it *Iterator) Iterate(dataWriter io.Writer) (seriesFooter SeriesFooter, checksumOk bool, err error)

Iterate reads the next series in the stream, from the back of the stream to the front.

Example

ExampleIterator_Iterate shows how to parse and step through stream data. Remember that we'll start from the end and step backwards.

See ExampleStreamReader_ReadSeriesWithIndexedInfo for an example of how to perform random or ordered reads of series within a stream (instead of having to step backward through all of them, in order).

b := rifs.NewSeekableBuffer()

// Stage stream.

sb := NewStreamBuilder(b)

series := AddTestSeries(sb)

for i, seriesFooter := range series {
	fmt.Printf("Test series (%d): [%s]\n", i, seriesFooter.Uuid())
}

fmt.Printf("\n")

_, err := sb.Finish()
log.PanicIf(err)

raw := b.Bytes()

// Open the stream.

r := bytes.NewReader(raw)

sr := NewStreamReader(r)
sr.SetStructureLogging(true)

it, err := NewIterator(sr)
log.PanicIf(err)

// Very cheap calls. Keep in mind that we will actually iterate through
// these in reverse order, below.
fmt.Printf("Number of series recorded in stream footer: (%d)\n", it.Count())

sisi := it.SeriesInfo(0)
fmt.Printf("Indexed series 0: %s\n", sisi.Uuid())

sisi = it.SeriesInfo(1)
fmt.Printf("Indexed series 1: %s\n", sisi.Uuid())

fmt.Printf("\n")

// Read first encountered series.

seriesNumber := it.Current()

seriesData := new(bytes.Buffer)

seriesFooter, checksumOk, err := it.Iterate(seriesData)
log.PanicIf(err)

if !checksumOk {
	log.Panicf("first encountered checksum does not match")
}

fmt.Printf("Encountered series (%d): %s\n", seriesNumber, seriesFooter.Uuid())

// This is the original time-series' blob. It's the caller's responsibility
// to encode it and decode it.
fmt.Printf("Series (%d) data: %s\n", seriesNumber, seriesData.String())

// Read second encountered series.

seriesNumber = it.Current()

seriesData = new(bytes.Buffer)

seriesFooter, checksumOk, err = it.Iterate(seriesData)
log.PanicIf(err)

if !checksumOk {
	log.Panicf("second encountered checksum does not match")
}

fmt.Printf("Encountered series (%d): %s\n", seriesNumber, seriesFooter.Uuid())

// This is the original time-series' blob. It's the caller's responsibility
// to encode it and decode it.
fmt.Printf("Series (%d) data: %s\n", seriesNumber, seriesData.String())

// Check EOF.

_, _, err = it.Iterate(nil)
if err != io.EOF {
	log.Panicf("expected EOF")
}

fmt.Printf("\n")

// Show that the structure logging represents the offsets in reverse order
// (the order that they're visited). Note that certain milestones will
// include more than one entry. Some milestones can't be completely
// interpreted/applied until more information is read. So, we'll log those
// milestones as soon as they're encountered as well as when we have more
// information about them.
sr.Structure().Dump()
Output:

Test series (0): [d095abf5-126e-48a7-8974-885de92bd964]
Test series (1): [8a4ba0c4-0a0d-442f-8256-1d61adb16abc]

Number of series recorded in stream footer: (2)
Indexed series 0: d095abf5-126e-48a7-8974-885de92bd964
Indexed series 1: 8a4ba0c4-0a0d-442f-8256-1d61adb16abc

Encountered series (1): 8a4ba0c4-0a0d-442f-8256-1d61adb16abc
Series (1) data: X some time series data 2 X
Encountered series (0): d095abf5-126e-48a7-8974-885de92bd964
Series (0) data: some time series data

================
Stream Structure
================

OFF 553      MT boundary_marker                 SCOPE stream   UUID                                           COMM
             MT boundary_marker                 SCOPE misc     UUID                                           COMM
OFF 548      MT shadow_footer_head_byte         SCOPE misc     UUID                                           COMM
OFF 348      MT footer_head_byte                SCOPE misc     UUID                                           COMM
             MT stream_footer_head_byte         SCOPE stream   UUID                                           COMM
             MT stream_footer_decoded           SCOPE stream   UUID                                           COMM Stream: StreamFooter1<COUNT=(2)>
OFF 347      MT boundary_marker                 SCOPE series   UUID                                           COMM
             MT boundary_marker                 SCOPE misc     UUID                                           COMM
OFF 342      MT shadow_footer_head_byte         SCOPE misc     UUID                                           COMM
OFF 198      MT footer_head_byte                SCOPE misc     UUID                                           COMM
             MT series_footer_head_byte         SCOPE series   UUID                                           COMM
             MT series_footer_decoded           SCOPE series   UUID 8a4ba0c4-0a0d-442f-8256-1d61adb16abc      COMM
OFF 171      MT series_data_head_byte           SCOPE series   UUID 8a4ba0c4-0a0d-442f-8256-1d61adb16abc      COMM
OFF 170      MT boundary_marker                 SCOPE series   UUID                                           COMM
             MT boundary_marker                 SCOPE misc     UUID                                           COMM
OFF 165      MT shadow_footer_head_byte         SCOPE misc     UUID                                           COMM
OFF 21       MT footer_head_byte                SCOPE misc     UUID                                           COMM
             MT series_footer_head_byte         SCOPE series   UUID                                           COMM
             MT series_footer_decoded           SCOPE series   UUID d095abf5-126e-48a7-8974-885de92bd964      COMM
OFF 0        MT series_data_head_byte           SCOPE series   UUID d095abf5-126e-48a7-8974-885de92bd964      COMM

func (*Iterator) SeriesInfo

func (it *Iterator) SeriesInfo(i int) StreamIndexedSequenceInfo

SeriesInfo efficiently returns summary information for one of the series in the stream.

type MilestoneType

type MilestoneType string

MilestoneType is the name of the event-type.

const (
	// MtSeriesDataHeadByte marks the first byte of a series' time-series data.
	MtSeriesDataHeadByte MilestoneType = "series_data_head_byte"

	// MtFooterHeadByte marks the first byte of a footer of a yet-unidentified
	// type.
	MtFooterHeadByte MilestoneType = "footer_head_byte"

	// MtSeriesFooterHeadByte marks the first byte of a series footer.
	MtSeriesFooterHeadByte MilestoneType = "series_footer_head_byte"

	// MtStreamFooterHeadByte marks the first byte of a stream footer.
	MtStreamFooterHeadByte MilestoneType = "stream_footer_head_byte"

	// MtBoundaryMarker identifies a boundary marker.
	MtBoundaryMarker MilestoneType = "boundary_marker"

	// MtShadowFooterHeadByte marks the first byte of the static shadow footer
	// that immediately follows any other type of footer.
	MtShadowFooterHeadByte MilestoneType = "shadow_footer_head_byte"

	// MtSeriesFooterDecoded marks the first byte of a series footer that has
	// been successfully decoded.
	MtSeriesFooterDecoded MilestoneType = "series_footer_decoded"

	// MtStreamFooterDecoded marks the first byte of a stream footer that has
	// been successfully decoded.
	MtStreamFooterDecoded MilestoneType = "stream_footer_decoded"
)

type ScopeType

type ScopeType int

ScopeType is which type of data the event applies to.

const (
	// StSeries describes milestones that pertain to series.
	StSeries ScopeType = iota

	// StStream describes milestones that pertain to streams.
	StStream ScopeType = iota

	// StMisc describes milestones that are either agnostic (not likely) or
	// could be any other scope type but there's not yet enough information
	// to tell (likely).
	StMisc ScopeType = iota
)

type SeriesDataDatasourceReader

type SeriesDataDatasourceReader interface {
	ReadData(r io.Reader, sf SeriesFooter) (n int, err error)
}

SeriesDataDatasourceReader can be provided by the caller to read the data themselves rather than providing an `io.Writer`.

type SeriesDataDatasourceReaderWrapper

type SeriesDataDatasourceReaderWrapper struct {
	// contains filtered or unexported fields
}

SeriesDataDatasourceReaderWrapper wraps a simple `io.Writer` and satisfies the `SeriesDataDatasourceReader` interface. It essentially converts a writer to a reader. This may not have a practical use, but we use it for testing.

func NewSeriesDataDatasourceReaderWrapperFromWriter

func NewSeriesDataDatasourceReaderWrapperFromWriter(w io.Writer) SeriesDataDatasourceReaderWrapper

NewSeriesDataDatasourceReaderWrapperFromWriter creates a new `SeriesDataDatasourceReaderWrapper` struct.

func (SeriesDataDatasourceReaderWrapper) ReadData

func (sddww SeriesDataDatasourceReaderWrapper) ReadData(r io.Reader, sf SeriesFooter) (n int, err error)

ReadData copies the reader to the writer.

type SeriesDataDatasourceWriter

type SeriesDataDatasourceWriter interface {
	WriteData(w io.Writer, sf SeriesFooter) (n int, err error)
}

SeriesDataDatasourceWriter can be provided by the caller to write the series-data themselves if an `io.Reader` is too simple for them.

type SeriesDataDatasourceWriterWrapper

type SeriesDataDatasourceWriterWrapper struct {
	// contains filtered or unexported fields
}

SeriesDataDatasourceWriterWrapper wraps a simple `io.Reader` and satisfies the `SeriesDataDatasourceWriter` interface. It essentially converts a reader to a writer. This may not have a practical use, but we use it for testing.

func NewSeriesDataDatasourceWriterWrapperFromReader

func NewSeriesDataDatasourceWriterWrapperFromReader(r io.Reader) SeriesDataDatasourceWriterWrapper

NewSeriesDataDatasourceWriterWrapperFromReader creates a new `SeriesDataDatasourceWriterWrapper` struct.

func (SeriesDataDatasourceWriterWrapper) WriteData

func (sddww SeriesDataDatasourceWriterWrapper) WriteData(w io.Writer, sf SeriesFooter) (n int, err error)

WriteData copies the reader to the writer.

type SeriesFooter

type SeriesFooter interface {
	// Uuid is a string that uniquely identifies this series.
	Uuid() string

	// HeadRecordTime is the timestamp of the first record
	HeadRecordTime() time.Time

	// TailRecordTime is the timestamp of the last record
	TailRecordTime() time.Time

	// BytesLength is the number of bytes occupied on-disk
	BytesLength() uint64

	// RecordCount is the number of records in the list
	RecordCount() uint64

	// CreatedTime is the timestamp of the first write of this series
	CreatedTime() time.Time

	// UpdatedTime is the timestamp of the last update
	UpdatedTime() time.Time

	// SourceSha1 is the SHA1 of the raw source-data; can be used to determine
	// if the source-data has changed
	SourceSha1() []byte

	// DataFnv1aChecksum is the FNV-1a checksum of the time-series data on-disk
	DataFnv1aChecksum() uint32

	// Version returns the version of the footer.
	Version() SeriesFooterVersion

	// TouchUpdatedTime sets the 'updated time' field to now.
	TouchUpdatedTime()

	// SetBytesLength is used to set the bytes-length after the data is written
	// and the count is attained.
	SetBytesLength(bytesLength uint64)
}

SeriesFooter describes the data recorded in a series footer.

type SeriesFooter1

type SeriesFooter1 struct {
	// contains filtered or unexported fields
}

SeriesFooter1 describes the data in a single series. Version 1.

func AddTestSeries

func AddTestSeries(sb *StreamBuilder) (footers []*SeriesFooter1)

AddTestSeries will append two test series to the given builder.

func NewSeriesFooter1

func NewSeriesFooter1(headRecordTime time.Time, tailRecordTime time.Time, recordCount uint64, sourceSha1 []byte) *SeriesFooter1

NewSeriesFooter1 returns a series footer structure. Version 1. The checksum will be populated on write.

func NewSeriesFooter1FromEncoded

func NewSeriesFooter1FromEncoded(footerBytes []byte) (sf *SeriesFooter1, err error)

NewSeriesFooter1FromEncoded returns a series footer struct (version 1). The checksum that was recorded during the write will be populated.

func (*SeriesFooter1) BytesLength

func (sf *SeriesFooter1) BytesLength() uint64

BytesLength is the number of bytes of series data.

func (*SeriesFooter1) CreatedTime

func (sf *SeriesFooter1) CreatedTime() time.Time

CreatedTime is the timestamp of the first write of this series

func (*SeriesFooter1) DataFnv1aChecksum

func (sf *SeriesFooter1) DataFnv1aChecksum() uint32

DataFnv1aChecksum is the FNV-1a checksum of the original data. This is set and checked automatically, though the result of the check is returned to the caller rather than being enforced by us.
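
The checksum itself can be reproduced with the standard library's `hash/fnv` package. The sketch below computes it while the data is being copied and then reports the result of a later check instead of enforcing it, mirroring the description above. The helper names are hypothetical, and the package's own implementation may differ.

```go
package main

import (
	"bytes"
	"fmt"
	"hash/fnv"
	"io"
	"strings"
)

// writeWithChecksum copies src to dst and returns the 32-bit FNV-1a checksum
// of the bytes that were copied.
func writeWithChecksum(dst io.Writer, src io.Reader) (uint32, error) {
	h := fnv.New32a()
	if _, err := io.Copy(io.MultiWriter(dst, h), src); err != nil {
		return 0, err
	}
	return h.Sum32(), nil
}

// checksumOk recomputes the checksum of data and compares it to the recorded
// value. The result is reported to the caller rather than enforced.
func checksumOk(data []byte, recorded uint32) bool {
	h := fnv.New32a()
	h.Write(data) // writes to a hash.Hash never return an error
	return h.Sum32() == recorded
}

func main() {
	var stored bytes.Buffer
	sum, err := writeWithChecksum(&stored, strings.NewReader("some time series data"))
	if err != nil {
		panic(err)
	}
	fmt.Println("intact:", checksumOk(stored.Bytes(), sum))

	corrupted := append([]byte(nil), stored.Bytes()...)
	corrupted[0] ^= 0xff
	fmt.Println("corrupted:", checksumOk(corrupted, sum))
}
```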

func (*SeriesFooter1) HeadRecordTime

func (sf *SeriesFooter1) HeadRecordTime() time.Time

HeadRecordTime is the earliest timestamp represented in the series data.

func (*SeriesFooter1) RecordCount

func (sf *SeriesFooter1) RecordCount() uint64

RecordCount is the number of records in the series-data.

func (*SeriesFooter1) SetBytesLength

func (sf *SeriesFooter1) SetBytesLength(bytesLength uint64)

func (*SeriesFooter1) SourceSha1

func (sf *SeriesFooter1) SourceSha1() []byte

SourceSha1 is the SHA1 of the original data.
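
As the `SeriesFooter` interface comment notes, this digest can be used to decide whether the source data has changed since the series was written. A minimal sketch of that comparison with `crypto/sha1` follows; `sourceSum` and `sourceChanged` are hypothetical helpers, not package functions.

```go
package main

import (
	"bytes"
	"crypto/sha1"
	"fmt"
)

// sourceSum returns the SHA1 digest of the raw source data.
func sourceSum(source []byte) []byte {
	sum := sha1.Sum(source)
	return sum[:]
}

// sourceChanged reports whether the current source data no longer matches the
// digest recorded when the series was last written.
func sourceChanged(recorded, source []byte) bool {
	return !bytes.Equal(recorded, sourceSum(source))
}

func main() {
	recorded := sourceSum([]byte("original source file"))

	fmt.Println(sourceChanged(recorded, []byte("original source file")))
	fmt.Println(sourceChanged(recorded, []byte("edited source file")))
}
```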

func (*SeriesFooter1) String

func (sf *SeriesFooter1) String() string

func (*SeriesFooter1) TailRecordTime

func (sf *SeriesFooter1) TailRecordTime() time.Time

TailRecordTime is the latest timestamp represented in the series data.

func (*SeriesFooter1) TouchUpdatedTime

func (sf *SeriesFooter1) TouchUpdatedTime()

TouchUpdatedTime bumps the updated-time field.

func (*SeriesFooter1) UpdatedTime

func (sf *SeriesFooter1) UpdatedTime() time.Time

UpdatedTime is the timestamp of the last update

func (*SeriesFooter1) Uuid

func (sf *SeriesFooter1) Uuid() string

Uuid returns the UUID of the series.

func (*SeriesFooter1) Version

func (sf *SeriesFooter1) Version() SeriesFooterVersion

Version returns the series-protocol represented by this struct.

type SeriesFooterVersion

type SeriesFooterVersion uint16

SeriesFooterVersion enum

const (
	// SeriesFooterVersion1 represents version 1 of the footer that describes a
	// single series in the stream.
	SeriesFooterVersion1 SeriesFooterVersion = 1
)

type StreamBuilder

type StreamBuilder struct {
	// contains filtered or unexported fields
}

StreamBuilder is the high-level interface that owns the stream-building process and wraps `StreamWriter`.

func NewStreamBuilder

func NewStreamBuilder(ws io.WriteSeeker) *StreamBuilder

NewStreamBuilder returns a new `StreamBuilder`.

func (*StreamBuilder) AddSeries

func (sb *StreamBuilder) AddSeries(seriesDataWriter interface{}, sf SeriesFooter) (err error)

AddSeries adds a single series and associated metadata to the stream. The actual series data is provided to us by the caller in serialized (encoded) form from whatever their original format was.

func (*StreamBuilder) AddSeriesNoWrite

func (sb *StreamBuilder) AddSeriesNoWrite(footerDataPosition int64, totalSeriesSize int, sf SeriesFooter) (err error)

AddSeriesNoWrite logs a single series and associated metadata but doesn't actually write. It will be written (or potentially retained) through other means.

func (*StreamBuilder) Finish

func (sb *StreamBuilder) Finish() (totalSize int, err error)

Finish will finalize/complete the stream.

func (*StreamBuilder) NextOffset

func (sb *StreamBuilder) NextOffset() int64

NextOffset returns the position that the head bytes

func (*StreamBuilder) SetStructureLogging

func (sb *StreamBuilder) SetStructureLogging(flag bool)

SetStructureLogging enables/disables structure tracking.

func (*StreamBuilder) StreamWriter

func (sb *StreamBuilder) StreamWriter() *StreamWriter

StreamWriter returns the underlying `StreamWriter` struct.

func (*StreamBuilder) Structure

func (sb *StreamBuilder) Structure() *StreamStructure

Structure returns the `StreamStructure` struct (if enabled).

type StreamFooter

type StreamFooter interface {
	Series() []StreamIndexedSequenceInfo
}

StreamFooter describes a type that can return summary information about the series in a stream. This represents a basic encoded stream type.

func NewStreamFooter1FromEncoded

func NewStreamFooter1FromEncoded(footerBytes []byte) (sf StreamFooter, err error)

NewStreamFooter1FromEncoded decodes the given bytes and returns a `StreamFooter`-compatible struct.

func NewStreamFooter1FromStreamIndexedSequenceInfoSlice

func NewStreamFooter1FromStreamIndexedSequenceInfoSlice(series []StreamIndexedSequenceInfo) StreamFooter

NewStreamFooter1FromStreamIndexedSequenceInfoSlice returns a new `StreamFooter`-compatible struct.

type StreamFooter1

type StreamFooter1 struct {
	// contains filtered or unexported fields
}

StreamFooter1 represents the stream footer (version 1) that's encoded in the stream.

func (*StreamFooter1) Series

func (sf *StreamFooter1) Series() []StreamIndexedSequenceInfo

Series returns a list of all of the summary series information.

func (*StreamFooter1) String

func (sf *StreamFooter1) String() string

type StreamFooterVersion

type StreamFooterVersion uint16

StreamFooterVersion enum

const (
	// StreamFooterVersion1 represents version 1 of the footer that describes
	// the whole stream.
	StreamFooterVersion1 StreamFooterVersion = 1
)

type StreamIndexedSequenceInfo

type StreamIndexedSequenceInfo interface {
	// Uuid is a string that uniquely identifies this series.
	Uuid() string

	// HeadRecordTime is the timestamp of the first record
	HeadRecordTime() time.Time

	// TailRecordTime is the timestamp of the last record
	TailRecordTime() time.Time

	// AbsolutePosition is the absolute position of the boundary marker (NUL)
	AbsolutePosition() int64
}

StreamIndexedSequenceInfo describes summary information for a single series encoded into the stream footer.

type StreamIndexedSequenceInfo1

type StreamIndexedSequenceInfo1 struct {
	// contains filtered or unexported fields
}

StreamIndexedSequenceInfo1 briefly describes all series.

func NewStreamIndexedSequenceInfo1

func NewStreamIndexedSequenceInfo1(uuid string, headRecordTime, tailRecordTime time.Time, absolutePosition int64) *StreamIndexedSequenceInfo1

NewStreamIndexedSequenceInfo1 returns a sequence-info structure.

func NewStreamIndexedSequenceInfo1WithSeriesFooter

func NewStreamIndexedSequenceInfo1WithSeriesFooter(seriesFooter SeriesFooter, absolutePosition int64) *StreamIndexedSequenceInfo1

NewStreamIndexedSequenceInfo1WithSeriesFooter returns a summary `StreamIndexedSequenceInfo1` struct representing the given `SeriesFooter`-compatible struct.

func (StreamIndexedSequenceInfo1) AbsolutePosition

func (sisi StreamIndexedSequenceInfo1) AbsolutePosition() int64

AbsolutePosition is the absolute position of the boundary marker (NUL)

func (StreamIndexedSequenceInfo1) HeadRecordTime

func (sisi StreamIndexedSequenceInfo1) HeadRecordTime() time.Time

HeadRecordTime is the timestamp of the first record

func (StreamIndexedSequenceInfo1) String

func (sisi StreamIndexedSequenceInfo1) String() string

func (StreamIndexedSequenceInfo1) TailRecordTime

func (sisi StreamIndexedSequenceInfo1) TailRecordTime() time.Time

TailRecordTime is the timestamp of the last record

func (StreamIndexedSequenceInfo1) Uuid

func (sisi StreamIndexedSequenceInfo1) Uuid() string

Uuid is a string that uniquely identifies this series.

type StreamReader

type StreamReader struct {
	// contains filtered or unexported fields
}

StreamReader knows how to parse the raw stream.

func NewStreamReader

func NewStreamReader(rs io.ReadSeeker) *StreamReader

NewStreamReader returns a new `StreamReader`.

func (*StreamReader) ReadSeriesInfoWithBoundaryPosition

func (sr *StreamReader) ReadSeriesInfoWithBoundaryPosition(position int64) (seriesFooter SeriesFooter, dataOffset int64, seriesSize int, err error)

ReadSeriesInfoWithBoundaryPosition returns a `SeriesFooter` for the series whose boundary marker is at the given position.

func (*StreamReader) ReadSeriesInfoWithIndexedInfo

func (sr *StreamReader) ReadSeriesInfoWithIndexedInfo(sisi StreamIndexedSequenceInfo) (seriesFooter SeriesFooter, dataOffset int64, seriesSize int, err error)

ReadSeriesInfoWithIndexedInfo returns the `SeriesFooter` struct described by the given `StreamIndexedSequenceInfo` struct.

func (*StreamReader) ReadSeriesWithIndexedInfo

func (sr *StreamReader) ReadSeriesWithIndexedInfo(sisi StreamIndexedSequenceInfo, seriesDataReader interface{}) (seriesFooter SeriesFooter, seriesSize int, checksumOk bool, err error)

ReadSeriesWithIndexedInfo returns the `SeriesFooter` struct described by the given `StreamIndexedSequenceInfo` struct and passes the raw data associated with it to `seriesDataReader`.

func (*StreamReader) Reset

func (sr *StreamReader) Reset() (err error)

Reset will put us at the end of the file. This is required in order to iterate.

func (*StreamReader) SetStructureLogging

func (sr *StreamReader) SetStructureLogging(flag bool)

SetStructureLogging enables/disables structure tracking.

func (*StreamReader) Structure

func (sr *StreamReader) Structure() *StreamStructure

Structure returns the `StreamStructure` struct (if enabled).

type StreamStructure

type StreamStructure struct {
	// contains filtered or unexported fields
}

StreamStructure holds all of the milestones recorded for a given stream.

func NewStreamStructure

func NewStreamStructure() *StreamStructure

NewStreamStructure returns a new `StreamStructure`.

func (*StreamStructure) AllSeriesMilestones

func (ss *StreamStructure) AllSeriesMilestones() (milestoneIndex map[string][]StreamStructureOffsetInfo)

AllSeriesMilestones returns a map of all recorded series.

func (*StreamStructure) Dump

func (ss *StreamStructure) Dump()

Dump prints a table with all of the recorded milestones in the order that they were encountered.

Example

ExampleStreamStructure_Dump shows how to use structure-tracking to print the structure of the stream. The `SetStructureLogging` and `Structure()` methods (to enable tracking and to retrieve the `StreamStructure` struct, if enabled) are available on the `StreamBuilder`, `StreamReader`, `StreamWriter`, and `Updater` types.

This table is printed in forward order when writing a stream and reverse order when reading a stream.

Columns: 1) absolute offset in the stream, 2) milestone type (describes the type of data), 3) scope type (describes whether it's related to streams or series, or 'misc' if not enough is known yet during a parse), 4) UUID (only for series; usually present for at least all 'series_data_head_byte' milestone types), 5) milestone comment (not stored in original data).

b := rifs.NewSeekableBuffer()
sb := NewStreamBuilder(b)
sb.SetStructureLogging(true)

AddTestSeries(sb)

_, err := sb.Finish()
log.PanicIf(err)

sb.Structure().Dump()
Output:

================
Stream Structure
================

OFF 0        MT series_data_head_byte           SCOPE series   UUID d095abf5-126e-48a7-8974-885de92bd964      COMM
OFF 21       MT series_footer_head_byte         SCOPE series   UUID d095abf5-126e-48a7-8974-885de92bd964      COMM
OFF 165      MT shadow_footer_head_byte         SCOPE series   UUID                                           COMM
OFF 170      MT boundary_marker                 SCOPE series   UUID                                           COMM
OFF 171      MT series_data_head_byte           SCOPE series   UUID 8a4ba0c4-0a0d-442f-8256-1d61adb16abc      COMM
OFF 198      MT series_footer_head_byte         SCOPE series   UUID 8a4ba0c4-0a0d-442f-8256-1d61adb16abc      COMM
OFF 342      MT shadow_footer_head_byte         SCOPE series   UUID                                           COMM
OFF 347      MT boundary_marker                 SCOPE series   UUID                                           COMM
OFF 348      MT stream_footer_head_byte         SCOPE stream   UUID                                           COMM Stream: StreamFooter1<COUNT=(2)>
OFF 548      MT shadow_footer_head_byte         SCOPE stream   UUID                                           COMM
OFF 553      MT boundary_marker                 SCOPE stream   UUID                                           COMM

func (*StreamStructure) Milestones

func (ss *StreamStructure) Milestones() []StreamStructureOffsetInfo

Milestones returns all recorded milestones.

func (*StreamStructure) MilestonesWithFilter

func (ss *StreamStructure) MilestonesWithFilter(milestoneType string, scopeType int) []StreamStructureOffsetInfo

MilestonesWithFilter returns all milestones, optionally applying a filter.
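The filtering described above can be sketched with local stand-in types. This is not the package's implementation: the underlying types of `MilestoneType` and `ScopeType` are inferred from the `MilestonesWithFilter` signature, the scope constants are hypothetical, and the convention that an empty string or a negative value means "no filter" is an assumption made only for this illustration.

```go
package main

import "fmt"

// Local stand-ins for the package's types. The underlying types are
// assumptions inferred from the MilestonesWithFilter signature.
type MilestoneType string
type ScopeType int

// StreamStructureOffsetInfo mirrors the documented struct fields.
type StreamStructureOffsetInfo struct {
	Offset        int64
	MilestoneType MilestoneType
	ScopeType     ScopeType
	SeriesUuid    string
	Comment       string
}

// Hypothetical scope values; the package's real constants are not shown here.
const (
	scopeSeries ScopeType = iota
	scopeStream
)

// filterMilestones keeps the entries that match both criteria. An empty
// milestoneType or a negative scopeType disables that criterion (an assumed
// convention, not confirmed by the package documentation).
func filterMilestones(all []StreamStructureOffsetInfo, milestoneType string, scopeType int) []StreamStructureOffsetInfo {
	var kept []StreamStructureOffsetInfo
	for _, m := range all {
		if milestoneType != "" && string(m.MilestoneType) != milestoneType {
			continue
		}
		if scopeType >= 0 && int(m.ScopeType) != scopeType {
			continue
		}
		kept = append(kept, m)
	}
	return kept
}

func main() {
	all := []StreamStructureOffsetInfo{
		{Offset: 0, MilestoneType: "series_data_head_byte", ScopeType: scopeSeries},
		{Offset: 165, MilestoneType: "shadow_footer_head_byte", ScopeType: scopeSeries},
		{Offset: 548, MilestoneType: "shadow_footer_head_byte", ScopeType: scopeStream},
	}

	// Select every shadow footer regardless of scope; prints 165, then 548.
	for _, m := range filterMilestones(all, "shadow_footer_head_byte", -1) {
		fmt.Println(m.Offset)
	}
}
```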

func (*StreamStructure) Push

func (ss *StreamStructure) Push(offset int64, milestoneType MilestoneType, scopeType ScopeType, seriesUuid string, comment string)

Push records a single event.

func (*StreamStructure) SeriesMilestones

func (ss *StreamStructure) SeriesMilestones(uuid string) []StreamStructureOffsetInfo

SeriesMilestones returns all series-specific milestones, optionally filtering for a specific series. The result is returned as a flat list.

func (*StreamStructure) StreamMilestones

func (ss *StreamStructure) StreamMilestones() []StreamStructureOffsetInfo

StreamMilestones returns all stream-specific milestones.

func (*StreamStructure) String

func (ss *StreamStructure) String() string

type StreamStructureOffsetInfo

type StreamStructureOffsetInfo struct {
	Offset int64

	MilestoneType MilestoneType

	ScopeType ScopeType

	// SeriesUuid is a UUID of the series, if this offset refers to a series.
	SeriesUuid string

	Comment string
}

StreamStructureOffsetInfo describes a single recorded milestone.

func (StreamStructureOffsetInfo) String

func (ssoi StreamStructureOffsetInfo) String() string

type StreamWriter

type StreamWriter struct {
	// contains filtered or unexported fields
}

StreamWriter owns the semantics of encoding our storage structs to the raw bytes.

func NewStreamWriter

func NewStreamWriter(w io.Writer) *StreamWriter

NewStreamWriter returns a new `StreamWriter` struct.

func (*StreamWriter) SetStructureLogging

func (sw *StreamWriter) SetStructureLogging(flag bool)

SetStructureLogging enables/disables structure logging.

func (*StreamWriter) Structure

func (sw *StreamWriter) Structure() *StreamStructure

Structure returns the recorded structure (if enabled).

func (*StreamWriter) Write

func (sw *StreamWriter) Write(data []byte) (n int, err error)

type Truncater

type Truncater interface {
	Truncate(size int64) error
}

Truncater is a type that knows how to truncate its byte stream. This will frequently be paired with an `io.ReadWriteSeeker`.

type UpdateStats

type UpdateStats struct {
	Skips int
	Adds  int
	Drops int
}

UpdateStats keeps a tally of various operations.

func (UpdateStats) String

func (us UpdateStats) String() string

type Updater

type Updater struct {
	// contains filtered or unexported fields
}

Updater manages syncing what the caller has with what is stored.

  1. Copy all unchanged series, in their current sequence, from where they currently are to the front of the file.
  2. Use the data-writer interface to generate a serialized representation of the changed/new ones. Place them at the end in the order that they were stored before (those that are being updated) or in the order they were added (the new ones).
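The two steps above can be sketched as a planning function over local stand-in types. Everything here is hypothetical: `seriesInfo`, its `sourceHash` field, and `planUpdate` are names invented for illustration, change detection by comparing a source hash is an assumption, and so is the choice to place updated series before newly added ones. It is a rough model of the ordering rules, not the `Updater` implementation.

```go
package main

import "fmt"

// seriesInfo is a hypothetical stand-in for a series footer. Comparing a
// source hash to detect changes is an assumption made for this sketch.
type seriesInfo struct {
	uuid       string
	sourceHash string
}

// planUpdate splits the wanted series into those that can be copied as-is
// (kept in their stored order) and those that must be rewritten (updated
// series in stored order, followed by new series in the order added).
func planUpdate(stored, wanted []seriesInfo) (copied, rewritten []string) {
	wantedByUuid := make(map[string]seriesInfo, len(wanted))
	for _, w := range wanted {
		wantedByUuid[w.uuid] = w
	}

	storedUuids := make(map[string]bool, len(stored))
	for _, s := range stored {
		storedUuids[s.uuid] = true

		w, keep := wantedByUuid[s.uuid]
		if !keep {
			// Not requested by the caller, so it is dropped.
			continue
		}

		if w.sourceHash == s.sourceHash {
			copied = append(copied, s.uuid)
		} else {
			rewritten = append(rewritten, s.uuid)
		}
	}

	// Series that were never stored are appended in the order they were added.
	for _, w := range wanted {
		if !storedUuids[w.uuid] {
			rewritten = append(rewritten, w.uuid)
		}
	}

	return copied, rewritten
}

func main() {
	stored := []seriesInfo{
		{uuid: "a", sourceHash: "h1"},
		{uuid: "b", sourceHash: "h2"},
		{uuid: "c", sourceHash: "h3"},
	}
	wanted := []seriesInfo{
		{uuid: "d", sourceHash: "h4"},  // new
		{uuid: "c", sourceHash: "h3"},  // unchanged
		{uuid: "a", sourceHash: "h1x"}, // changed
	}

	copied, rewritten := planUpdate(stored, wanted)
	fmt.Println(copied, rewritten) // prints: [c] [a d]
}
```

In this model only the `rewritten` series would need their data supplied by the caller; series "b" is absent from `wanted` and is therefore dropped.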

func NewUpdater

func NewUpdater(rws io.ReadWriteSeeker, seriesDataWriter interface{}) *Updater

NewUpdater returns a new `Updater` struct.

func (*Updater) AddSeries

func (updater *Updater) AddSeries(seriesFooter SeriesFooter)

AddSeries queues a series to be added. It's not actually written until Write() is called.

Example
b := rifs.NewSeekableBuffer()

// Stage stream.

sb := NewStreamBuilder(b)
sb.SetStructureLogging(true)

series := AddTestSeries(sb)

_, err := sb.Finish()
log.PanicIf(err)

fmt.Printf("\n")
fmt.Printf("Original:\n")
fmt.Printf("\n")

sb.Structure().Dump()

raw := b.Bytes()

// Update the stream with a new series.

sourceSha13 := []byte{
	77,
	88,
	99,
}

now := time.Now()

series3 := NewSeriesFooter1(
	now.Add(time.Second*20),
	now.Add(time.Second*30),
	33,
	sourceSha13)

series3.SetBytesLength(uint64(len(TestTimeSeriesData2)))

// Force a specific UUID so we know the exact output in support of the
// testable examples.
series3.uuid = "9a0e2d13-d14f-4a57-b43c-24bd3de6581e"

dataReader3 := bytes.NewBuffer(TestTimeSeriesData2)

sdtg := &SeriesDataTestGenerator{
	data: map[string]io.Reader{
		series3.Uuid(): dataReader3,
	},
}

rws := rifs.NewSeekableBufferWithBytes(raw)
updater := NewUpdater(rws, sdtg)

updater.AddSeries(series[0])
updater.AddSeries(series[1])
updater.AddSeries(series3)

_, _, err = updater.Write()
log.PanicIf(err)

finalRaw := rws.Bytes()

// Read the new stream.

r := bytes.NewReader(finalRaw)

sr := NewStreamReader(r)
sr.SetStructureLogging(true)

it, err := NewIterator(sr)
log.PanicIf(err)

for {
	_, _, err := it.Iterate(nil)
	if err == io.EOF {
		break
	}

	// Any other error would otherwise be ignored and the loop might never end.
	log.PanicIf(err)
}

fmt.Printf("Updated:\n")
fmt.Printf("\n")

sr.Structure().Dump()
Output:

func (*Updater) SetStructureLogging

func (updater *Updater) SetStructureLogging(flag bool)

SetStructureLogging enables/disables structure tracking.

func (*Updater) Structure

func (updater *Updater) Structure() *StreamStructure

Structure returns the `StreamStructure` struct (if enabled).

func (*Updater) Write

func (updater *Updater) Write() (totalSize int, stats UpdateStats, err error)

Write executes the queued changes.

Directories

Path Synopsis
protocol
