common_journal

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 16, 2021 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var File_journal_proto protoreflect.FileDescriptor

Functions

func NewZKEventCancelContext

func NewZKEventCancelContext(ctx context.Context, notify <-chan zk.Event) context.Context

Returns a Context which will be asynchronously cancelled by a channel emitting zk.Event. Such a context can be used to cancel blocking FetchEntry operations when a secondary node becomes a primary one.

Types

type Checkpoint

type Checkpoint struct {
	LastEntryOffset int64 `protobuf:"varint,1,opt,name=lastEntryOffset,proto3" json:"lastEntryOffset,omitempty"`
	NextEntryOffset int64 `protobuf:"varint,2,opt,name=nextEntryOffset,proto3" json:"nextEntryOffset,omitempty"`
	// contains filtered or unexported fields
}

func (*Checkpoint) Descriptor deprecated

func (*Checkpoint) Descriptor() ([]byte, []int)

Deprecated: Use Checkpoint.ProtoReflect.Descriptor instead.

func (*Checkpoint) GetLastEntryOffset

func (x *Checkpoint) GetLastEntryOffset() int64

func (*Checkpoint) GetNextEntryOffset

func (x *Checkpoint) GetNextEntryOffset() int64

func (*Checkpoint) ProtoMessage

func (*Checkpoint) ProtoMessage()

func (*Checkpoint) ProtoReflect

func (x *Checkpoint) ProtoReflect() protoreflect.Message

func (*Checkpoint) Reset

func (x *Checkpoint) Reset()

func (*Checkpoint) String

func (x *Checkpoint) String() string

type InvalidVersionError

type InvalidVersionError struct {
	// contains filtered or unexported fields
}

func NewInvalidVersionError

func NewInvalidVersionError(version int64) *InvalidVersionError

func (*InvalidVersionError) Error

func (i *InvalidVersionError) Error() string

type NoMoreMessageError

type NoMoreMessageError struct {
}

func (*NoMoreMessageError) Error

func (n *NoMoreMessageError) Error() string

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

Receiver wraps around a kafka.Reader, offering methods for a secondary node to receive and replicate journal entries.

func NewReceiver

func NewReceiver(server, topic string) (*Receiver, error)

Initialize a Receiver.

This function tries to create a single partition topic with given name, if it exists, do nothing, and if other errors happened during creation, return it.

@param

server: address to a Kafka server.
topic: Kafka topic name used to store messages.

@return

error: not nil if failed to ensure the topic is created.

func (*Receiver) FetchEntry

func (r *Receiver) FetchEntry(ctx context.Context) ([]byte, *Checkpoint, error)

Blocking until fetch an entry from Kafka successfully or an error raised. This method should be used when a secondary node pulls entries and applies them continuously as a secondary one.

When a message got fetched, this method checks whether the message is a Checkpoint or not. If so, it will Unmarshall this message and return an unmarshalled *Checkpoint. If not, it just returns raw byte content of the message. Applications should parse those bytes by themselves.

@param

ctx: Context used to cancel operation asynchronously.

@return

[]byte: raw bytes content of journal entry if success
*Checkpoint: points to an unmarshalled Checkpoint object if success and the entry fetched
is a Checkpoint entry.
error: not nil if failed to fetch message or unmarshall Checkpoint,

func (*Receiver) SetOffset

func (r *Receiver) SetOffset(offset int64) error

Set offset of wrapped kafka.Reader. A secondary node can call this method to set current offset to a checkpoint, skipping replicated and persisted jouranl entries.

func (*Receiver) TryFetchEntry

func (r *Receiver) TryFetchEntry(ctx context.Context) ([]byte, *Checkpoint, error)

Fetch a journal entry from kafka. If there are more messages to be consumed in kafka, it will block until fetch a message successfully or fail to do that. And it will return immediately with a *NoMoreMessageError if there is no more message in kafka currently by computing lag of internal kafka.Reader. This error can be regarded as an indication that a secondary node has performed all necessary preparations to turn into a primary node.

@param

ctx: Context used to cancel operation asynchronously.

@return

[]byte: raw bytes content of journal entry if success
*Checkpoint: points to an unmarshalled Checkpoint object if success and the entry fetched
is a Checkpoint entry.
error: not nil if failed to fetch message or unmarshall Checkpoint,

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer abstracts Kafka operations. The primary node can utilize this type to send journal entries it produced to Kafka, making corresponding secondary nodes to be able to replicate them and achieves master-backup fault tolerance.

Writer also provides a version-based Writer.Checkpoint API. It will send a special checkpoint journal entry to Kafka, which can be used to represent a complete checkpoint operation conducted by the primary node. After receiving such an entry, a secondary node should also perform checkpointing too.

Internally, Writer uses two different event keys to represent general journal entries which is opaque to Writer, and those special Checkpoint entries. Because Kafka only guarantees that write operations to the same partition of the same topic are ordered, Writer applies a FixedBalancer to its internal kafka.Writer, which makes the two keys are always routed to the same partition.

func NewWriter

func NewWriter(server, topic string) (*Writer, error)

Initialize a Writer.

This function tries to create a single partition topic with given name, if it exists, do nothing, and if other errors happened during creation, return it.

@param

server: address to a Kafka server.
topic: Kafka topic name used to store messages.

@return

error: not nil if failed to ensure the topic is created.

func (*Writer) Checkpoint

func (w *Writer) Checkpoint(ctx context.Context) (int64, error)

Commit a checkpoint entry to Kafka. This method should be invoked by a primary node who has already performs a checkpoint operation. And an offset is returned by this method which indicates the offset of the first entry after the new checkpoint entry, this offset can be used by the primary node to recover from journals after crash.

func (*Writer) CommitEntry

func (w *Writer) CommitEntry(ctx context.Context, entry []byte) error

Commit a general journal entry to Kafka. Writer employs synchronous mode to write messages, so this method will block until the new journal entry is written successfully or exceeds max attempt times.

@param

ctx: Context used to cancel operation asynchronously.
entry: data of journal entry.

@return

error: not nil if failed to write message.

func (*Writer) ExitCheckpoint

func (w *Writer) ExitCheckpoint()

Unlock the RWMutex, to allow further CommitEntry to be executed.

func (*Writer) PrepareCheckpoint

func (w *Writer) PrepareCheckpoint() int64

There may be many goroutines are using a Writer in a primary node, but when the latter wants to perform Checkpoint operation, it needs to ensure all pending CommitEntry have finished and there is no more incoming CommitEntry. This can be achieved by calling PrepareCheckpoint, this method guarantees that when it returns, all pending CommitEntry are done and no more incoming ones.

Internally, we use a writer-preferred RWMutex here, to make Checkpoint operation blocks all incoming CommitEntry, and all CommitEntry don't interfere with each other. PrepareCheckpoint will Lock this RWMutex, and CommitEntry will RLock it.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL