sej

v0.0.0-...-97b8310
Warning

This package is not in the latest version of its module.

Published: May 5, 2018 License: BSD-2-Clause Imports: 23 Imported by: 4

README

sej: Segmented Journals

h12.io/sej provides composable components of a distributed, persisted message queue, allowing trade-offs between reliability, latency and throughput with minimal devops overhead.

Package Organization

  • h12.io/sej: writer, scanner and offset
    • shard: sharding
    • hub: copying across machines
    • cmd/sej: command line tool

SEJ Directory

[root-dir]/
    [sej-dir]/
        jnl.lck
        jnl/
            0000000000000000.jnl
            000000001f9e521e.jnl
            ......
        ofs/
            reader1.ofs
            reader1.lck
            reader2.ofs
            reader2.lck
            ......
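The journal file names in the tree appear to be the first offset of each segment, written as 16 zero-padded lowercase hex digits with a .jnl extension. The sketch below formats and parses names under that assumption; segmentFileName and parseSegmentFileName are hypothetical helpers, not part of the sej API (the package exposes ParseJournalFileName for this purpose).

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// segmentFileName is a hypothetical helper. It assumes a segment is named
// after its first offset as 16 zero-padded lowercase hex digits plus ".jnl".
func segmentFileName(firstOffset uint64) string {
	return fmt.Sprintf("%016x.jnl", firstOffset)
}

// parseSegmentFileName reverses segmentFileName under the same assumption.
func parseSegmentFileName(name string) (uint64, error) {
	const ext = ".jnl"
	if !strings.HasSuffix(name, ext) || len(name) != 16+len(ext) {
		return 0, fmt.Errorf("not a journal file name: %q", name)
	}
	return strconv.ParseUint(strings.TrimSuffix(name, ext), 16, 64)
}

func main() {
	fmt.Println(segmentFileName(0))          // 0000000000000000.jnl
	fmt.Println(segmentFileName(0x1f9e521e)) // 000000001f9e521e.jnl
	off, err := parseSegmentFileName("000000001f9e521e.jnl")
	fmt.Println(off, err)
}
```

Encoding the first offset in the name lets a reader pick the right segment from a directory listing alone, without opening any file.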

Journal File format

segment_file = { message }                          .
message      = offset timestamp type key value size .
offset       = uint64                               .
timestamp    = int64                                .
type         = uint8                                .
key          = key_size { uint8 }                   .
key_size     = int8                                 .
value        = value_size { uint8 }                 .
value_size   = int32                                .
size         = int32                                .

All integers are written in the big endian format.

name       description
offset     the position of the message in the queue
timestamp  the timestamp, in nanoseconds since the Unix epoch
type       a uint8 value that can be used to indicate the type of the message
key        the encoded key, preceded by its int8 key_size
value      the encoded value, preceded by its int32 value_size
size       the size of the whole message, including the size field itself, which allows reading backward

Writer

  • Append from the last offset in segmented journal files
  • File lock to prevent other writers from opening the journal files
  • Startup corruption detection & truncation
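Startup corruption detection can be pictured as walking a segment forward and keeping only the prefix of complete, self-consistent messages; whatever follows (for example a half-written message left by a crash) is what a writer would truncate. validPrefix and rawMessage are hypothetical helpers based only on the journal grammar; sej's actual checks may differ.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// validPrefix walks a segment forward using the message grammar and returns
// how many leading bytes form complete messages whose trailing size field
// matches their actual length.
func validPrefix(seg []byte) int {
	p := 0
	for {
		// offset(8) + timestamp(8) + type(1) + key_size(1)
		if len(seg)-p < 18 {
			return p
		}
		keySize := int(int8(seg[p+17]))
		if keySize < 0 {
			return p
		}
		q := p + 18 + keySize
		if len(seg)-q < 4 {
			return p
		}
		valueSize := int(int32(binary.BigEndian.Uint32(seg[q:])))
		if valueSize < 0 {
			return p
		}
		q += 4 + valueSize
		if len(seg)-q < 4 {
			return p
		}
		size := int(int32(binary.BigEndian.Uint32(seg[q:])))
		q += 4
		if size != q-p {
			return p
		}
		p = q
	}
}

// rawMessage builds a message with an empty key and a zero timestamp.
func rawMessage(offset uint64, value string) []byte {
	size := 26 + len(value)
	b := make([]byte, size)
	binary.BigEndian.PutUint64(b[0:], offset)
	binary.BigEndian.PutUint32(b[18:], uint32(len(value)))
	copy(b[22:], value)
	binary.BigEndian.PutUint32(b[size-4:], uint32(size))
	return b
}

func main() {
	seg := append(rawMessage(0, "a"), rawMessage(1, "bc")...)
	complete := len(seg)
	seg = append(seg, rawMessage(2, "torn")[:10]...) // simulate a crash mid-write
	fmt.Println(complete, validPrefix(seg))          // 55 55
}
```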

Scanner

  • Read from an offset in segmented journal files
  • Change monitoring
    • directory
    • file append
  • Handle incomplete last message
  • Truncation detection & fail fast
  • Timeout
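To start reading from an arbitrary offset, a scanner has to find the segment with the greatest first offset that does not exceed the requested one. JournalFiles implements sort.Interface, which suggests the files are kept ordered by FirstOffset; the sketch below assumes such a sorted slice of first offsets and uses a binary search. segmentFor is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// segmentFor returns the index of the segment that should contain offset,
// given the first offsets of all segments sorted in ascending order.
// It returns -1 when offset precedes the oldest segment.
func segmentFor(firstOffsets []uint64, offset uint64) int {
	// Find the first segment that starts after offset; the one before it
	// is the last segment starting at or before offset.
	i := sort.Search(len(firstOffsets), func(i int) bool {
		return firstOffsets[i] > offset
	})
	return i - 1
}

func main() {
	firsts := []uint64{0, 100, 250}
	for _, off := range []uint64{0, 99, 100, 249, 250, 1000} {
		fmt.Println(off, "->", segmentFor(firsts, off))
	}
}
```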

Offset

  • First/last offset
  • Offset persistence

Sharding

[root-dir]/
    [shard0]/
    [shard1]/
    ......

Each shard directory is a SEJ directory with a name in the form of [prefix].[shard-bit].[shard-index].

  • prefix must satisfy [a-zA-Z0-9_-]*
  • when prefix is empty, [prefix]. including the dot is omitted
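  • shard-bit: the number of bits used to select a shard, as one hex digit: 1, 2, ..., 9, a (1 to 10)
  • shard-index: the shard number as three hex digits: 000, 001, ..., 3ff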
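The naming rule above can be sketched as follows. shardDirName follows the [prefix].[shard-bit].[shard-index] format directly; shardIndex, which maps a key to a shard by masking the low shard-bit bits of an FNV-1a hash, is a hypothetical choice for illustration and not necessarily how the shard package assigns messages.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardDirName formats [prefix].[shard-bit].[shard-index], omitting
// "[prefix]." when prefix is empty. shardBit is expected to be 1..10.
func shardDirName(prefix string, shardBit, index uint) string {
	name := fmt.Sprintf("%x.%03x", shardBit, index)
	if prefix == "" {
		return name
	}
	return prefix + "." + name
}

// shardIndex picks a shard for key by keeping the low shardBit bits of a
// hash. FNV-1a is an arbitrary choice for this sketch.
func shardIndex(key []byte, shardBit uint) uint {
	h := fnv.New32a()
	h.Write(key)
	return uint(h.Sum32()) & (1<<shardBit - 1)
}

func main() {
	fmt.Println(shardDirName("", 1, 0))           // 1.000
	fmt.Println(shardDirName("orders", 10, 1023)) // orders.a.3ff
	i := shardIndex([]byte("user-42"), 3)
	fmt.Println(shardDirName("orders", 3, i))
}
```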

Hub

[root-dir]/
    [client-id0].[shard0]/
    [client-id1].[shard0]/
    ......

Each [client-id].[shard] directory is the SEJ directory belonging to one client's shard.

Documentation

Overview

Package sej implements a file-based segmented journal (queue).


Constants

This section is empty.

Variables

var (
	// ErrCRC is returned when the CRC of a message value does not match the stored CRC
	ErrCRC = errors.New("CRC mismatch")
	// ErrTimeout is returned when no message can be obtained within Scanner.Timeout
	ErrTimeout = errors.New("read timeout")
)
var (
	// ErrLocked is returned when another writer has already gotten the lock
	ErrLocked = errors.New("file is already locked")
)
var (
	// NotifyTimeout is the timeout used in the rare cases where OS notifications fail
	// to capture file/directory change events
	NotifyTimeout = time.Hour
)

Functions

func JournalDirPath

func JournalDirPath(dir string) string

func OffsetDirPath

func OffsetDirPath(dir string) string

func ReadOffset

func ReadOffset(r io.ReadSeeker) (offset uint64, err error)

ReadOffset reads the offset stored in an ofs file (r)

func WatchRootDir

func WatchRootDir(rootDir string, watchInterval time.Duration, open OpenFunc) error

WatchRootDir watches the directory and calls open only once for each shard

func WriteMessage

func WriteMessage(w io.Writer, buf []byte, m *Message) (int64, error)

WriteMessage writes the message m to w. buf is a scratch buffer used to avoid allocation and must be at least 8 bytes long.

Types

type CorruptionError

type CorruptionError struct {
	File      string
	Offset    uint64
	Timestamp time.Time
	Message   []byte
	Err       error
	FixErr    error
}

CorruptionError is returned when the last message of a segmented journal file is corrupted

func (*CorruptionError) Error

func (e *CorruptionError) Error() string

type DefaultOffset

type DefaultOffset int
const (
	FirstOffset DefaultOffset = iota
	LastOffset
)

type JournalDir

type JournalDir struct {
	Files JournalFiles
	// contains filtered or unexported fields
}

func OpenJournalDir

func OpenJournalDir(dir string) (*JournalDir, error)

func (*JournalDir) First

func (d *JournalDir) First() *JournalFile

func (*JournalDir) Last

func (d *JournalDir) Last() *JournalFile

type JournalFile

type JournalFile struct {
	FirstOffset uint64
	FileName    string
}

JournalFile represents an unopened journal file

func ParseJournalFileName

func ParseJournalFileName(dir, name string) (*JournalFile, error)

ParseJournalFileName parses a journal file name and returns a JournalFile object

func (*JournalFile) FirstMessage

func (journalFile *JournalFile) FirstMessage() (*Message, error)

func (*JournalFile) LastMessage

func (journalFile *JournalFile) LastMessage() (*Message, error)

func (*JournalFile) LastOffset

func (journalFile *JournalFile) LastOffset() (uint64, error)

LastOffset returns the offset after the last message in a journal file

func (*JournalFile) LastReadableOffset

func (journalFile *JournalFile) LastReadableOffset() (uint64, error)

type JournalFiles

type JournalFiles []JournalFile

func (JournalFiles) Len

func (a JournalFiles) Len() int

func (JournalFiles) Less

func (a JournalFiles) Less(i, j int) bool

func (JournalFiles) Swap

func (a JournalFiles) Swap(i, j int)

type Message

type Message struct {
	Offset    uint64
	Timestamp time.Time
	Type      byte
	Key       []byte
	Value     []byte
}

Message represents a message in a segmented journal file

func (*Message) Copy

func (m *Message) Copy() Message

func (*Message) IsNull

func (m *Message) IsNull() bool

func (*Message) ReadFrom

func (m *Message) ReadFrom(r io.Reader) (n int64, err error)

ReadFrom reads a message from an io.ReadSeeker. When an error occurs, it rolls back the seeker and then returns the original error.

type Offset

type Offset struct {
	Syncing bool
	// contains filtered or unexported fields
}

Offset is used to manage a disk-persisted offset

func NewOffset

func NewOffset(dir, name string, defaultOffset DefaultOffset) (*Offset, error)

NewOffset creates a new Offset object persisted to dir/ofs/name.ofs

func OpenReadonlyOffset

func OpenReadonlyOffset(dir, name string) (*Offset, error)

func (*Offset) Close

func (o *Offset) Close() error

Close closes opened resources

func (*Offset) Commit

func (o *Offset) Commit(offset uint64) error

Commit saves and syncs the offset to disk

func (*Offset) Value

func (o *Offset) Value() uint64

Value gets the current offset value

type OpenFunc

type OpenFunc func(journalDir string)

OpenFunc is the callback that WatchRootDir invokes for each shard's journal directory

type ScanOffsetError

type ScanOffsetError struct {
	File           string
	Offset         uint64
	Timestamp      time.Time
	ExpectedOffset uint64
}

func (*ScanOffsetError) Error

func (e *ScanOffsetError) Error() string

type ScanTruncatedError

type ScanTruncatedError struct {
	File       string
	Size       int64
	FileOffset int64
}

func (*ScanTruncatedError) Error

func (e *ScanTruncatedError) Error() string

type Scanner

type Scanner struct {
	Timeout time.Duration // read timeout when no data arrived, default 0
	// contains filtered or unexported fields
}

Scanner implements reading of messages from segmented journal files

func NewScanner

func NewScanner(dir string, offset uint64) (*Scanner, error)

NewScanner creates a scanner for reading dir/jnl starting from offset. Default Timeout is 1 second.

func (*Scanner) Close

func (r *Scanner) Close() error

Close closes the reader

func (*Scanner) Err

func (r *Scanner) Err() error

func (*Scanner) Message

func (r *Scanner) Message() *Message

func (*Scanner) Offset

func (r *Scanner) Offset() uint64

Offset returns the current offset of the reader, i.e. last_message.offset + 1

func (*Scanner) Scan

func (r *Scanner) Scan() bool

Scan scans the next message and increments the offset

type Test

type Test struct {
	testing.TB
}

Test is a collection of test utility methods

func (Test) Main

func (Test) Main(m *testing.M)

Main should be called to clear test directories

func (Test) NewDir

func (t Test) NewDir() string

NewDir creates a new test directory; the path is deleted automatically after the tests

func (Test) VerifyMessageValues

func (t Test) VerifyMessageValues(path string, messages ...string)

func (Test) VerifyMessages

func (t Test) VerifyMessages(path string, messages []Message)

type Writer

type Writer struct {
	SegmentSize int
	// contains filtered or unexported fields
}

Writer writes to segmented journal files

func NewWriter

func NewWriter(dir string) (*Writer, error)

NewWriter creates a new writer for writing to dir/jnl, with segment files of at least SegmentSize bytes each

func (*Writer) Append

func (w *Writer) Append(msg *Message) error

Append appends a message to the journal

func (*Writer) Close

func (w *Writer) Close() error

Close closes the writer, flushes the buffer and syncs the file to the hard drive

func (*Writer) Flush

func (w *Writer) Flush() error

Flush writes any buffered data from memory to the underlying file

func (*Writer) Offset

func (w *Writer) Offset() uint64

Offset returns the latest offset of the journal

func (*Writer) Sync

func (w *Writer) Sync() error

Sync calls File.Sync of the current file

Directories

Path Synopsis
Package hub is a generated protocol buffer package.
internal
reader
Package bufio implements buffered I/O. It wraps an io.Reader or io.Writer object, creating another object (Reader or Writer) that also implements the interface but provides buffering and some help for textual I/O.
