journal

package
v0.0.0-...-e85c719 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2018 License: Apache-2.0 Imports: 13 Imported by: 2

Documentation

Overview

Package journal is an implementation and interface specification for an append-only journal with rotations. It contains a few simple implementations, as well.

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrNotOpen is the error value reporting that a journal is not open.
	ErrNotOpen = errors.New("journal is not open")

	// Logf is a function used to log warnings, etc. It can be overridden for
	// e.g., testing log output. The default implementation forwards its
	// arguments unchanged to log.Printf.
	Logf = func(fstr string, vals ...interface{}) {
		log.Printf(fstr, vals...)
	}
)
View Source
var (
	// ErrCantClose is the error value reporting that a journal type cannot
	// be closed.
	ErrCantClose = errors.New("cannot close this type of journal")
)

Functions

func TSFromName

func TSFromName(name string) (int64, error)

TSFromName gets a timestamp from the file name (it's a prefix).

Types

type Bytes

type Bytes struct {
	// contains filtered or unexported fields
}

func NewBytes

func NewBytes() *Bytes

func (Bytes) Append

func (j Bytes) Append(rec interface{}) error

func (Bytes) Bytes

func (j Bytes) Bytes() []byte

func (*Bytes) Close

func (j *Bytes) Close() error

func (*Bytes) IsOpen

func (j *Bytes) IsOpen() bool

func (*Bytes) JournalDecoder

func (j *Bytes) JournalDecoder() (Decoder, error)

func (*Bytes) SnapshotDecoder

func (j *Bytes) SnapshotDecoder() (Decoder, error)

func (*Bytes) StartSnapshot

func (j *Bytes) StartSnapshot(records <-chan interface{}, snapresp chan<- error) error

func (Bytes) String

func (j Bytes) String() string

type Count

type Count int64

func NewCount

func NewCount() *Count

func (*Count) Append

func (j *Count) Append(_ interface{}) error

func (Count) Close

func (j Count) Close() error

func (Count) IsOpen

func (j Count) IsOpen() bool

func (Count) JournalDecoder

func (j Count) JournalDecoder() (Decoder, error)

func (Count) ShardFinished

func (j Count) ShardFinished() bool

func (Count) SnapshotDecoder

func (j Count) SnapshotDecoder() (Decoder, error)

func (Count) StartSnapshot

func (j Count) StartSnapshot(records <-chan interface{}, snapresp chan<- error) error

func (Count) String

func (j Count) String() string

type Decoder

// Decoder reads successive items, one per call to Decode. Decoders are
// returned by the JournalDecoder and SnapshotDecoder methods in this package.
type Decoder interface {
	// Decode attempts to fill the elements of the underlying value of its
	// argument with the next item.
	// NOTE(review): the package example treats io.EOF as the end of the
	// stream; confirm that every implementation follows this convention.
	Decode(interface{}) error
}

type DiskLog

type DiskLog struct {
	// contains filtered or unexported fields
}
Example
// Open the log in directory "/tmp/disklog", using an in-memory file system
// (MemFS) injected in place of the real disk. Opening reports an error if the
// directory does not exist.
fs := NewMemFS("/tmp/disklog")
journal, err := OpenDiskLogInjectFS("/tmp/disklog", fs)
if err != nil {
	fmt.Println(err)
	return
}

// Data type can be anything. Here we're adding integers one at a time. We
// could also add the entire list at once, since it just gets gob-encoded.
data := []int{2, 3, 5, 7, 11, 13}
for _, d := range data {
	// A failed append is reported but does not stop the loop.
	if err := journal.Append(d); err != nil {
		fmt.Printf("Failed to append %v: %v\n", d, err)
	}
}
// We didn't write enough to trigger a rotation, so everything should be in
// the current journal. Let's see if we get it back.
decoder, err := journal.JournalDecoder()
if err != nil {
	fmt.Printf("error getting decoder: %v\n", err)
	return
}
// Collect every decoded value; val is the reusable decode target.
vals := make([]int, 0)
val := -1
for {
	err := decoder.Decode(&val)
	// io.EOF marks the end of the journal, not a failure.
	if err == io.EOF {
		break
	}
	if err != nil {
		// Print the values decoded so far, then the decode error itself.
		fmt.Println("Error:", vals)
		fmt.Printf("failed to decode next item in journal: %v\n", err)
		return
	}
	vals = append(vals, val)
}
fmt.Println("Success", vals)
Output:


Success [2 3 5 7 11 13]

func OpenDiskLog

func OpenDiskLog(dir string) (*DiskLog, error)

func OpenDiskLogInjectFS

func OpenDiskLogInjectFS(dir string, fs FS) (*DiskLog, error)

func (*DiskLog) Append

func (d *DiskLog) Append(rec interface{}) error

Append adds a record to the end of the journal.

func (*DiskLog) Close

func (d *DiskLog) Close() error

Close gracefully shuts the journal down, finalizing the current journal log.

func (*DiskLog) Dir

func (d *DiskLog) Dir() string

Dir returns the file system directory for this journal.

func (*DiskLog) IsOpen

func (d *DiskLog) IsOpen() bool

func (*DiskLog) JournalDecoder

func (d *DiskLog) JournalDecoder() (Decoder, error)

JournalDecoder returns a Decoder whose Decode function can be called to get the next item from the journals that are newer than the most recent snapshot.

func (*DiskLog) JournalName

func (d *DiskLog) JournalName() string

JournalName returns the current journal name.

func (*DiskLog) Rotate

func (d *DiskLog) Rotate() error

Rotate closes the current log file and opens a new one.

func (*DiskLog) SnapshotDecoder

func (d *DiskLog) SnapshotDecoder() (Decoder, error)

SnapshotDecoder returns a decoder whose Decode function can be called to get the next item from the most recent frozen snapshot.

func (*DiskLog) StartSnapshot

func (d *DiskLog) StartSnapshot(elems <-chan interface{}, snapresp chan<- error) error

StartSnapshot triggers an immediate rotation, then consumes all of the elements on the channel, serializing them to a snapshot file with the same ID as the recently-closed log.

type EmptyDecoder

type EmptyDecoder struct{}

EmptyDecoder can be returned when there is nothing to decode, but it is safe to proceed.

func (EmptyDecoder) Decode

func (ed EmptyDecoder) Decode(interface{}) error

Decode with no elements - default behavior.

type FS

// FS is a set of file system operations behind an interface. An FS can be
// passed to OpenDiskLogInjectFS to choose the backing store; the package
// example passes a MemFS.
// NOTE(review): the method names and signatures mirror functions in the os
// package (os.Create, os.Rename, os.Stat, ...); presumably the semantics
// match as well — confirm against the implementations.
type FS interface {
	Create(name string) (File, error)
	Open(name string) (File, error)
	Rename(oldname, newname string) error
	Remove(name string) error
	RemoveAll(dirname string) error
	Mkdir(name string, perm os.FileMode) error
	MkdirAll(name string, perm os.FileMode) error
	Stat(name string) (os.FileInfo, error)
	// FindMatching presumably returns the names that match the glob pattern
	// — confirm against the implementations.
	FindMatching(glob string) ([]string, error)
}

type File

// File is the handle type returned by FS.Create and FS.Open. It combines
// reading, writing, and closing (io.ReadWriteCloser) with access to the
// file's name and a Sync method.
type File interface {
	io.ReadWriteCloser
	// Name returns the name associated with the file.
	Name() string
	// Sync presumably flushes written data to the underlying storage, as
	// (*os.File).Sync does — confirm against the implementations.
	Sync() error
}

type Interface

// Interface is the set of operations a journal provides: appending records,
// writing a snapshot, decoding the most recent snapshot and the journal
// stream, and shutting down.
type Interface interface {
	// Append appends a serialized version of the interface to the
	// current journal shard.
	Append(interface{}) error

	// StartSnapshot is given a data channel from which it is expected to
	// consume all values until closed. If it terminates early, it sends a
	// non-nil error back. When complete with no errors, the snapshot has been
	// successfully processed. Whether the current shard is full or not, this
	// function immediately triggers a shard rotation so that subsequent calls
	// to Append go to a new shard.
	StartSnapshot(records <-chan interface{}, resp chan<- error) error

	// SnapshotDecoder returns a decode function that can be called to decode
	// the next element in the most recent snapshot.
	SnapshotDecoder() (Decoder, error)

	// JournalDecoder returns a decode function that can be called to decode
	// the next element in the journal stream.
	JournalDecoder() (Decoder, error)

	// Close allows the journal to shut down (e.g., flush data) gracefully.
	Close() error

	// IsOpen indicates whether the journal is operational and has not been
	// closed.
	IsOpen() bool
}

type MemFS

type MemFS struct {
	// contains filtered or unexported fields
}

func NewMemFS

func NewMemFS(dirs ...string) *MemFS

func (*MemFS) Create

func (m *MemFS) Create(name string) (File, error)

func (*MemFS) FindMatching

func (m *MemFS) FindMatching(glob string) ([]string, error)

func (*MemFS) Mkdir

func (m *MemFS) Mkdir(name string, perm os.FileMode) error

func (*MemFS) MkdirAll

func (m *MemFS) MkdirAll(name string, perm os.FileMode) error

func (*MemFS) Open

func (m *MemFS) Open(name string) (File, error)

func (*MemFS) Remove

func (m *MemFS) Remove(name string) error

func (*MemFS) RemoveAll

func (m *MemFS) RemoveAll(dirname string) error

func (*MemFS) Rename

func (m *MemFS) Rename(oldname, newname string) error

func (*MemFS) Stat

func (m *MemFS) Stat(name string) (os.FileInfo, error)

func (*MemFS) String

func (m *MemFS) String() string

type OSFS

type OSFS struct{}

func (OSFS) Create

func (OSFS) Create(name string) (File, error)

func (OSFS) FindMatching

func (OSFS) FindMatching(glob string) ([]string, error)

func (OSFS) Getpid

func (OSFS) Getpid() int

func (OSFS) Mkdir

func (OSFS) Mkdir(name string, perm os.FileMode) error

func (OSFS) MkdirAll

func (OSFS) MkdirAll(name string, perm os.FileMode) error

func (OSFS) Open

func (OSFS) Open(name string) (File, error)

func (OSFS) Remove

func (OSFS) Remove(name string) error

func (OSFS) RemoveAll

func (OSFS) RemoveAll(dirname string) error

func (OSFS) Rename

func (OSFS) Rename(oldname, newname string) error

func (OSFS) Stat

func (OSFS) Stat(name string) (os.FileInfo, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL