README

Build Status godoc reference

NetLog

A lightweight, HTTP-centric, log-based (Kafka-style) message queue.

Alpha software

This is still early software and potentially buggy. To peek at the internals start with BigLog.

Roadmap
  • low-level log management
  • HTTP transport
  • scanner based pub/sub
  • custom data retention policy
  • persistent scanners
  • batching
  • compression
  • good test coverage
  • proper documentation
  • streaming based pub/sub
  • async replication
  • kinesis-compatible transport
  • gRPC transport
Non-goals
  • Match Kafka's performance.
  • Distributed system.
Getting started

While posting and fetching single messages is very inefficient, it's the simplest way to get started using nothing but curl commands.

# compile server
go get github.com/ninibe/netlog/cmd/netlog

# run server
bin/netlog

# create new topic
curl -XPOST localhost:7200/demo

# post messages
curl -XPOST localhost:7200/demo/payload --data-binary "message number one"
curl -XPOST localhost:7200/demo/payload --data-binary "message number two"
curl -XPOST localhost:7200/demo/payload --data-binary "message number three"

# check topic info
curl localhost:7200/demo

# create scanner
curl -XPOST "localhost:7200/demo/scanner?from=0"

export SC="...UUID RETURNED..."

# start scanning...
curl -XGET "localhost:7200/demo/scan?id=$SC"
x times ...

# wait 5 seconds for new messages
curl -XGET "localhost:7200/demo/scan?id=$SC&wait=5s"

# wait 5 minutes
curl -XGET "localhost:7200/demo/scan?id=$SC&wait=5m"

# post more messages in another window
curl -XPOST localhost:7200/demo/payload --data-binary "message number four"
curl -XPOST localhost:7200/demo/payload --data-binary "message number five"

# new scanner since 1 minute ago
curl -XPOST "localhost:7200/demo/scanner?from=1m"

One-line-ish pub/sub
# create new topic
curl -XPOST localhost:7200/pubsubdemo

# get scanner ID with jq
export SCANNER=$(curl -s -XPOST "localhost:7200/pubsubdemo/scanner?from=0&persist=true" | jq -r .id)

# subscribe to the topic
while true; do; curl "localhost:7200/pubsubdemo/scan?id=$SCANNER&wait=1h" && echo; done

# IN ANOTHER WINDOW

# publish on the topic
while true; do; read data; curl localhost:7200/pubsubdemo/payload --data-binary $data; done
# write something and hit enter

Contributing

Contributions are more than welcome, check the contributing guidelines. To ask any questions you can write to the netlog-dev mailing list.

Expand ▾ Collapse ▴

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrUnknown is returned when an underlying stardard Go error reaches the user.
	ErrUnknown = newErr(http.StatusInternalServerError, "netlog: unkwown error")
	// ErrInvalidDir is returned when the data folder provided does not exists or is not writable.
	ErrInvalidDir = newErr(http.StatusInternalServerError, "netlog: invalid data directory")

	// ErrBadRequest is returned when invalid parameters are received.
	ErrBadRequest = newErr(http.StatusBadRequest, "netlog: bad request")
	// ErrInvalidOffset is returned when the requested offset can not be parsed into an number.
	ErrInvalidOffset = newErr(http.StatusBadRequest, "netlog: invalid offset")
	// ErrInvalidDuration is returned when a given big duration can not be parsed
	ErrInvalidDuration = newErr(http.StatusBadRequest, "netlog: invalid duration")
	// ErrInvalidCompression is returning when the compression type defined is unknown
	ErrInvalidCompression = newErr(http.StatusBadRequest, "netlog: invalid compression type")
	// ErrTopicExists is returning when trying to create an already existing topic.
	ErrTopicExists = newErr(http.StatusBadRequest, "netlog: topic exists")
	// ErrEndOfTopic is returned when the reader has read all the way until the end of the topic.
	ErrEndOfTopic = newErr(http.StatusNotFound, "netlog: end of topic")
	// ErrTopicNotFound is returned when addressing an non-existing topic.
	ErrTopicNotFound = newErr(http.StatusNotFound, "netlog: topic not found")

	// ErrScannerNotFound is returning when using a non-existing scanner ID.
	ErrScannerNotFound = newErr(http.StatusNotFound, "netlog: scanner not found")
	// ErrOffsetNotFound is returning when the offset is no longer or not yet present in the topic.
	ErrOffsetNotFound = newErr(http.StatusNotFound, "netlog: offset not found")

	// ErrCRC is returned when a message's payload does not match's the CRC header.
	ErrCRC = newErr(http.StatusInternalServerError, "netlog: checksum error")
	// ErrBusy is retuning when trying to close or delete a topic with readers attached to it.
	ErrBusy = newErr(http.StatusConflict, "netlog: resource busy")
)

Functions

func NopWCloser

func NopWCloser(w io.Writer) io.WriteCloser

    NopWCloser returns a WriteCloser with a no-op Close method wrapping the provided Writer w.

    Types

    type BLTopicScanner

    type BLTopicScanner struct {
    	// contains filtered or unexported fields
    }

      BLTopicScanner implements TopicScanner reading from BigLog.

      func (*BLTopicScanner) Close

      func (ts *BLTopicScanner) Close() error

        Close implements io.Closer and releases the TopicScanner resources.

        func (*BLTopicScanner) ID

        func (ts *BLTopicScanner) ID() string

          ID returns the ID of the scanner

          func (*BLTopicScanner) Info

          func (ts *BLTopicScanner) Info() TScannerInfo

            Info returns a TScannerInfo struct with the scanner's next offset and the initial offset.

            func (*BLTopicScanner) Scan

            func (ts *BLTopicScanner) Scan(ctx context.Context) (m Message, offset int64, err error)

              Scan advances the Scanner to the next message, returning the message and the offset. Scan will block when it reaches EOF until there is more data available, the user must provide a context to cancel the request when it needs to stop waiting.

              type CompressionType

              type CompressionType uint8

                CompressionType indicates a type of compression for message sets

                const (
                	// CompressionDefault is used when falling back to the default compression of the system.
                	CompressionDefault CompressionType = 0
                	// CompressionNone is used by messages sets with uncompressed payloads
                	CompressionNone CompressionType = 1
                	// CompressionGzip is used by message sets with gzipped payloads
                	CompressionGzip CompressionType = 2
                	// CompressionSnappy is used by message sets with snappy payloads
                	CompressionSnappy CompressionType = 3
                )

                type IntegrityChecker

                type IntegrityChecker struct {
                	// contains filtered or unexported fields
                }

                  IntegrityChecker is used to check the integrity of an entire topic.

                  func NewIntegrityChecker

                  func NewIntegrityChecker(t *Topic, from int64) (*IntegrityChecker, error)

                    NewIntegrityChecker creates a new integrity checker for a given topic.

                    func (*IntegrityChecker) Check

                    func (ic *IntegrityChecker) Check(ctx context.Context) (errors []*IntegrityError)

                      Check reads all data collecting errors which then returns. Is recommended to pass a cancellable context since this operation can be slow.

                      func (*IntegrityChecker) Close

                      func (ic *IntegrityChecker) Close() error

                        Close releases the underlying resources.

                        type IntegrityError

                        type IntegrityError struct {
                        	Offset   int64              `json:"offset"`
                        	ODelta   int                `json:"odelta"`
                        	Type     IntegrityErrorType `json:"type"`
                        	Expected string             `json:"expected"`
                        	Actual   string             `json:"actual"`
                        }

                          IntegrityError is the struct with metadata about an any integrity error found.

                          func CheckMessageIntegrity

                          func CheckMessageIntegrity(m Message, delta int) *IntegrityError

                            CheckMessageIntegrity checks the integrity of a single message

                            type IntegrityErrorType

                            type IntegrityErrorType string

                              IntegrityErrorType is the category of possible errors in the data.

                              const (
                              
                              	// IntegrityChecksumErr is returned when the checksum in the message
                              	// header doesn't match the checksum recalculated from the payload.
                              	IntegrityChecksumErr IntegrityErrorType = "checksum"
                              
                              	// IntegrityLengthErr is returned when the length in the message
                              	// header doesn't match the length of the payload.
                              	IntegrityLengthErr IntegrityErrorType = "length"
                              
                              	// IntegrityUnknownErr is returned when data can not be read because
                              	// of an underlying error reading the data.
                              	IntegrityUnknownErr IntegrityErrorType = "unknown"
                              )

                              type Message

                              type Message []byte

                                Message the unit of data storage.

                                func MessageFromPayload

                                func MessageFromPayload(p []byte) Message

                                  MessageFromPayload returns a message with the appropriate calculated headers from a give data payload.

                                  func MessageSet

                                  func MessageSet(msgs []Message, comp CompressionType) Message

                                    MessageSet returns a new message with a batch of compressed messages as payload Compression will compress the payload and set the compression header, please be ware that compression at this level is only meant for batching several messages into a single message-set in increase throughput. MessageSet will panic if a compression type is not provided, since nothing would indicate to streaming clients that further messages are embedded in the payload.

                                    func ReadMessage

                                    func ReadMessage(r io.Reader) (entry Message, err error)

                                      ReadMessage reads a message from r and returns it if the message is compressed it does not attempt to unpack the contents.

                                      func Unpack

                                      func Unpack(set Message) ([]Message, error)

                                        Unpack takes a message-set and returns a slice with the component messages.

                                        func (*Message) Bytes

                                        func (m *Message) Bytes() []byte

                                          Bytes returns the entire message casted back to bytes.

                                          func (*Message) CRC32

                                          func (m *Message) CRC32() uint32

                                            CRC32 returns the checksum of the payload.

                                            func (*Message) ChecksumOK

                                            func (m *Message) ChecksumOK() bool

                                              ChecksumOK recalculates the CRC from the payload and compares it with the one stored in the header

                                              func (*Message) CompVer

                                              func (m *Message) CompVer() uint8

                                                CompVer returns the first byte which reflects both compression and format version.

                                                func (*Message) Compression

                                                func (m *Message) Compression() CompressionType

                                                  Compression returns the compression encoded in bits 4 to 8 of the header.

                                                  func (*Message) PLength

                                                  func (m *Message) PLength() uint32

                                                    PLength returns the length (bytes) of the payload.

                                                    func (*Message) Payload

                                                    func (m *Message) Payload() []byte

                                                      Payload returns the data bytes.

                                                      func (*Message) Size

                                                      func (m *Message) Size() int

                                                        Size returns the total size in bytes of the message.

                                                        func (*Message) Version

                                                        func (m *Message) Version() uint8

                                                          Version returns the format version encoded in bits 0 to 3 of the header.

                                                          type NLError

                                                          type NLError interface {
                                                          	Error() string
                                                          	String() string
                                                          	StatusCode() int
                                                          }

                                                            NLError is a known NetLog error with an associates status code.

                                                            func ExtErr

                                                            func ExtErr(err error) NLError

                                                              ExtErr maps external errors, mostly BigLog errors to NetLog errors.

                                                              type NetLog

                                                              type NetLog struct {
                                                              	// contains filtered or unexported fields
                                                              }

                                                                NetLog is the main struct that serves a set of topics, usually it must be wrapped with an HTTP transport.

                                                                func NewNetLog

                                                                func NewNetLog(dataDir string, opts ...Option) (nl *NetLog, err error)

                                                                  NewNetLog creates a new NetLog in a given data folder that must exist and be writable.

                                                                  func (*NetLog) CreateTopic

                                                                  func (nl *NetLog) CreateTopic(name string, settings TopicSettings) (t *Topic, err error)

                                                                    CreateTopic creates a new topic with a given name and default settings.

                                                                    func (*NetLog) DeleteTopic

                                                                    func (nl *NetLog) DeleteTopic(name string, force bool) (err error)

                                                                      DeleteTopic deletes an existing topic by name.

                                                                      func (*NetLog) Topic

                                                                      func (nl *NetLog) Topic(name string) (*Topic, error)

                                                                        Topic returns an existing topic by name.

                                                                        func (*NetLog) TopicList

                                                                        func (nl *NetLog) TopicList() []string

                                                                          TopicList returns the list of existing topic names.

                                                                          type Option

                                                                          type Option func(*NetLog)

                                                                            Option is the type of function used to set internal parameters.

                                                                            func DefaultTopicSettings

                                                                            func DefaultTopicSettings(settings TopicSettings) Option

                                                                              DefaultTopicSettings sets the default topic settings used if no other is defined at creation time.

                                                                              func MonitorInterval

                                                                              func MonitorInterval(interval bigduration.BigDuration) Option

                                                                                MonitorInterval defines de interval at which the segment monitor in charge of spiting and discarding segments runs.

                                                                                type PersistentTopicScanner

                                                                                type PersistentTopicScanner struct {
                                                                                	// contains filtered or unexported fields
                                                                                }

                                                                                  PersistentTopicScanner synchronizes the underlying scanner state to a given writer

                                                                                  func (*PersistentTopicScanner) Close

                                                                                  func (p *PersistentTopicScanner) Close() error

                                                                                    Close deletes the offset tracking file, closes the offset channel and closes the underlying scanner

                                                                                    func (*PersistentTopicScanner) ID

                                                                                      ID the ID of the scanner

                                                                                      func (*PersistentTopicScanner) Info

                                                                                        Info returns a TScannerInfo struct with the scanner's next offset and the last scanned one

                                                                                        func (*PersistentTopicScanner) Scan

                                                                                        func (p *PersistentTopicScanner) Scan(ctx context.Context) (m Message, offset int64, err error)

                                                                                          Scan offloads the actual scan to the underlying scanner while updates the last read offset

                                                                                          type SegmentMonitor

                                                                                          type SegmentMonitor struct {
                                                                                          	// contains filtered or unexported fields
                                                                                          }

                                                                                            SegmentMonitor periodically checks for segments to split or discard at a given interval.

                                                                                            type StreamerAtomicMap

                                                                                            type StreamerAtomicMap struct {
                                                                                            	// contains filtered or unexported fields
                                                                                            }

                                                                                              StreamerAtomicMap is a copy-on-write thread-safe map of pointers to Streamer

                                                                                              func NewStreamerAtomicMap

                                                                                              func NewStreamerAtomicMap() *StreamerAtomicMap

                                                                                                NewStreamerAtomicMap returns a new initialized StreamerAtomicMap

                                                                                                func (*StreamerAtomicMap) Delete

                                                                                                func (am *StreamerAtomicMap) Delete(key string)

                                                                                                  Delete removes the pointer to Streamer under key from the map

                                                                                                  func (*StreamerAtomicMap) Get

                                                                                                  func (am *StreamerAtomicMap) Get(key string) (value *biglog.Streamer, ok bool)

                                                                                                    Get returns a pointer to Streamer for a given key

                                                                                                    func (*StreamerAtomicMap) GetAll

                                                                                                    func (am *StreamerAtomicMap) GetAll() map[string]*biglog.Streamer

                                                                                                      GetAll returns the underlying map of pointers to Streamer this map must NOT be modified, to change the map safely use the Set and Delete functions and Get the value again

                                                                                                      func (*StreamerAtomicMap) Len

                                                                                                      func (am *StreamerAtomicMap) Len() int

                                                                                                        Len returns the number of elements in the map

                                                                                                        func (*StreamerAtomicMap) Set

                                                                                                        func (am *StreamerAtomicMap) Set(key string, value *biglog.Streamer)

                                                                                                          Set inserts in the map a pointer to Streamer under a given key

                                                                                                          type TScannerInfo

                                                                                                          type TScannerInfo struct {
                                                                                                          	ID      string `json:"id"`
                                                                                                          	Next    int64  `json:"next"`
                                                                                                          	From    int64  `json:"from"`
                                                                                                          	Persist bool   `json:"persistent"`
                                                                                                          }

                                                                                                            TScannerInfo holds the scanner's offset information

                                                                                                            type Topic

                                                                                                            type Topic struct {
                                                                                                            	// contains filtered or unexported fields
                                                                                                            }

                                                                                                              Topic is a log of linear messages.

                                                                                                              func (*Topic) CheckIntegrity

                                                                                                              func (t *Topic) CheckIntegrity(ctx context.Context, from int64) ([]*IntegrityError, error)

                                                                                                                CheckIntegrity scans the topic and checks for inconsistencies in the data

                                                                                                                func (*Topic) CheckSegments

                                                                                                                func (t *Topic) CheckSegments() error

                                                                                                                  CheckSegments is called by the runner and discards or splits segments when conditions are met.

                                                                                                                  func (*Topic) DeleteScanner

                                                                                                                  func (t *Topic) DeleteScanner(ID string) (err error)

                                                                                                                    DeleteScanner removes the scanner from the topic

                                                                                                                    func (*Topic) DirPath

                                                                                                                    func (t *Topic) DirPath() string

                                                                                                                      DirPath returns the absolute path to the folder with the topic's files

                                                                                                                      func (*Topic) FlushBuffered

                                                                                                                      func (t *Topic) FlushBuffered() error

                                                                                                                        FlushBuffered flushes all buffered messages into the BigLog. Notice that the BigLog might have a buffer on its own that this function does not flush, so calling this does not mean the data has been stored on disk.

                                                                                                                        func (*Topic) Info

                                                                                                                        func (t *Topic) Info() (i *TopicInfo, err error)

                                                                                                                          Info provides all public topic information.

                                                                                                                          func (*Topic) Name

                                                                                                                          func (t *Topic) Name() string

                                                                                                                            Name returns the Topic's name, which maps to the folder name

                                                                                                                            func (*Topic) NewScanner

                                                                                                                            func (t *Topic) NewScanner(from int64, persist bool) (ts TopicScanner, err error)

                                                                                                                              NewScanner creates a new scanner starting at offset `from`. If `persist` is true, the scanner and it's state will survive server restarts

                                                                                                                              func (*Topic) ParseOffset

                                                                                                                              func (t *Topic) ParseOffset(str string) (int64, error)

                                                                                                                                ParseOffset converts an offset string into a numeric precise offset 'beginning', 'first' or 'oldest' return the lowest available offset in the topic 'last' or 'latest' return the highest available offset in the topic 'end' or 'now' return the next offset to be written in the topic numeric string values are directly converted to integer duration notation like "1day" returns the first offset available since 1 day ago.

                                                                                                                                func (*Topic) Payload

                                                                                                                                func (t *Topic) Payload(offset int64) ([]byte, error)

                                                                                                                                  Payload is a utility method to fetch the payload of a single offset.

                                                                                                                                  func (*Topic) ReadFrom

                                                                                                                                  func (t *Topic) ReadFrom(r io.Reader) (n int64, err error)

                                                                                                                                    ReadFrom reads an entry or stream of entries from r until EOF is reached writes the entry/stream into the topic is the entry is valid. The return value n is the number of bytes read. It implements the io.ReaderFrom interface.

                                                                                                                                    func (*Topic) Scanner

                                                                                                                                    func (t *Topic) Scanner(ID string) (ts TopicScanner, err error)

                                                                                                                                      Scanner returns an existing scanner for the topic given and ID or ErrScannerNotFound if it doesn't exists.

                                                                                                                                      func (*Topic) Sync

                                                                                                                                      func (t *Topic) Sync() error

                                                                                                                                        Sync flushes all data to disk.

                                                                                                                                        func (*Topic) Write

                                                                                                                                        func (t *Topic) Write(p []byte) (n int, err error)

                                                                                                                                          Write implements the io.Writer interface for a Topic.

                                                                                                                                          func (*Topic) WriteN

                                                                                                                                          func (t *Topic) WriteN(p []byte, n int) (written int, err error)

                                                                                                                                            WriteN writes a set of N messages to the Topic

                                                                                                                                            type TopicAtomicMap

                                                                                                                                            type TopicAtomicMap struct {
                                                                                                                                            	// contains filtered or unexported fields
                                                                                                                                            }

                                                                                                                                              TopicAtomicMap is a copy-on-write thread-safe map of pointers to Topic

                                                                                                                                              func NewTopicAtomicMap

                                                                                                                                              func NewTopicAtomicMap() *TopicAtomicMap

                                                                                                                                                NewTopicAtomicMap returns a new initialized TopicAtomicMap

                                                                                                                                                func (*TopicAtomicMap) Delete

                                                                                                                                                func (am *TopicAtomicMap) Delete(key string)

                                                                                                                                                  Delete removes the pointer to Topic under key from the map

                                                                                                                                                  func (*TopicAtomicMap) Get

                                                                                                                                                  func (am *TopicAtomicMap) Get(key string) (value *Topic, ok bool)

                                                                                                                                                    Get returns a pointer to Topic for a given key

                                                                                                                                                    func (*TopicAtomicMap) GetAll

                                                                                                                                                    func (am *TopicAtomicMap) GetAll() map[string]*Topic

                                                                                                                                                      GetAll returns the underlying map of pointers to Topic this map must NOT be modified, to change the map safely use the Set and Delete functions and Get the value again

                                                                                                                                                      func (*TopicAtomicMap) Len

                                                                                                                                                      func (am *TopicAtomicMap) Len() int

                                                                                                                                                        Len returns the number of elements in the map

                                                                                                                                                        func (*TopicAtomicMap) Set

                                                                                                                                                        func (am *TopicAtomicMap) Set(key string, value *Topic)

                                                                                                                                                          Set inserts in the map a pointer to Topic under a given key

                                                                                                                                                          type TopicInfo

                                                                                                                                                          type TopicInfo struct {
                                                                                                                                                          	*biglog.Info
                                                                                                                                                          	Scanners map[string]TScannerInfo `json:"scanners"`
                                                                                                                                                          }

                                                                                                                                                            TopicInfo returns the topic information including information about size, segments, scanners and streamers

                                                                                                                                                            type TopicScanner

                                                                                                                                                            type TopicScanner interface {
                                                                                                                                                            	ID() string
                                                                                                                                                            	Scan(ctx context.Context) (m Message, offset int64, err error)
                                                                                                                                                            	Info() TScannerInfo
                                                                                                                                                            	Close() error
                                                                                                                                                            }

                                                                                                                                                              TopicScanner reads one by one over the messages in a topic blocking until new data is available for a period of time. TopicScanners are thread-safe.

                                                                                                                                                              func NewTopicScanner

                                                                                                                                                              func NewTopicScanner(t *Topic, ID string, from int64, persist bool) (TopicScanner, error)

                                                                                                                                                                NewTopicScanner returns a new topic scanner ready to scan starting at offset `from`, if persist is true, the scanner and its last position will survive across server restarts

                                                                                                                                                                type TopicScannerAtomicMap

                                                                                                                                                                type TopicScannerAtomicMap struct {
                                                                                                                                                                	// contains filtered or unexported fields
                                                                                                                                                                }

                                                                                                                                                                  TopicScannerAtomicMap is a copy-on-write thread-safe map of TopicScanner

                                                                                                                                                                  func NewTopicScannerAtomicMap

                                                                                                                                                                  func NewTopicScannerAtomicMap() *TopicScannerAtomicMap

                                                                                                                                                                    NewTopicScannerAtomicMap returns a new initialized TopicScannerAtomicMap

                                                                                                                                                                    func (*TopicScannerAtomicMap) Delete

                                                                                                                                                                    func (am *TopicScannerAtomicMap) Delete(key string)

                                                                                                                                                                      Delete removes the TopicScanner under key from the map

                                                                                                                                                                      func (*TopicScannerAtomicMap) Get

                                                                                                                                                                      func (am *TopicScannerAtomicMap) Get(key string) (value TopicScanner, ok bool)

                                                                                                                                                                        Get returns a TopicScanner for a given key

                                                                                                                                                                        func (*TopicScannerAtomicMap) GetAll

                                                                                                                                                                        func (am *TopicScannerAtomicMap) GetAll() map[string]TopicScanner

                                                                                                                                                                          GetAll returns the underlying map of TopicScanner this map must NOT be modified, to change the map safely use the Set and Delete functions and Get the value again

                                                                                                                                                                          func (*TopicScannerAtomicMap) Len

                                                                                                                                                                          func (am *TopicScannerAtomicMap) Len() int

                                                                                                                                                                            Len returns the number of elements in the map

                                                                                                                                                                            func (*TopicScannerAtomicMap) Set

                                                                                                                                                                            func (am *TopicScannerAtomicMap) Set(key string, value TopicScanner)

                                                                                                                                                                              Set inserts in the map a TopicScanner under a given key

                                                                                                                                                                              type TopicSettings

                                                                                                                                                                              type TopicSettings struct {
                                                                                                                                                                              	// SegAge is the age at after which old segments are discarded.
                                                                                                                                                                              	SegAge bigduration.BigDuration `json:"segment_age,ommitempty"`
                                                                                                                                                                              	// SegSize is the size at which a new segment should be created.
                                                                                                                                                                              	SegSize int64 `json:"segment_size,ommitempty"`
                                                                                                                                                                              	// BatchNumMessages is the maximum number of messages to be batched.
                                                                                                                                                                              	BatchNumMessages int `json:"batch_num_messages,ommitempty"`
                                                                                                                                                                              	// BatchInterval is the interval at which batched messages are flushed to disk.
                                                                                                                                                                              	BatchInterval bigduration.BigDuration `json:"batch_interval,ommitempty"`
                                                                                                                                                                              	// CompressionType allows to specify how batches are compressed.
                                                                                                                                                                              	CompressionType CompressionType `json:"compression_type,ommitempty"`
                                                                                                                                                                              }

                                                                                                                                                                                TopicSettings holds the tunable settings of a topic.

                                                                                                                                                                                Directories

                                                                                                                                                                                Path Synopsis
                                                                                                                                                                                cmd