store

package
v0.0.0-...-83e654d
Warning

This package is not in the latest version of its module.

Published: Oct 31, 2017 License: MIT Imports: 4 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var ErrRequestDone = errors.New("Fetch request is done")

Functions

This section is empty.

Types

type FetchDirection

type FetchDirection int
const (
	DirectionOneMessage FetchDirection = 0
	DirectionForward    FetchDirection = 1
	DirectionBackwards  FetchDirection = -1

	// TODO Bogdan decide the channel size and if should be customizable
	FetchBufferSize = 10
)

type FetchRequest

type FetchRequest struct {
	sync.RWMutex

	// Partition is the Store name to search for messages
	Partition string

	// StartID is the message sequence id to start
	StartID uint64

	// EndID is the message sequence id to finish. If  will not be used.
	EndID uint64

	// Direction has 3 possible values:
	// Direction == 0: Fetch only the message with StartID
	// Direction == 1: Also fetch the next Count messages with a higher message ID
	// Direction == -1: Also fetch the next Count messages with a lower message ID
	Direction FetchDirection

	// Count is the maximum number of messages to return
	Count int

	// MessageC is the channel to send the message back to the receiver
	MessageC chan *FetchedMessage

	// ErrorC is a channel if an error occurs
	ErrorC chan error

	// StartC is the channel through which the total number of results
	// is returned before the first message is sent.
	// The Fetch() method blocks while putting this number on the channel.
	StartC chan int
	// contains filtered or unexported fields
}

FetchRequest is used for fetching messages in a MessageStore.

func NewFetchRequest

func NewFetchRequest(partition string, start, end uint64, direction FetchDirection, count int) *FetchRequest

NewFetchRequest creates a new FetchRequest initialized with the provided values. If `count` is negative, it is set to MaxInt32.

func (*FetchRequest) Done

func (fr *FetchRequest) Done()

func (*FetchRequest) Error

func (fr *FetchRequest) Error(err error)

func (*FetchRequest) Errors

func (fr *FetchRequest) Errors() <-chan error

func (*FetchRequest) Init

func (fr *FetchRequest) Init()

func (*FetchRequest) IsDone

func (fr *FetchRequest) IsDone() bool

func (*FetchRequest) Messages

func (fr *FetchRequest) Messages() <-chan *FetchedMessage

func (*FetchRequest) Push

func (fr *FetchRequest) Push(id uint64, message []byte)

func (*FetchRequest) PushError

func (fr *FetchRequest) PushError(err error)

func (*FetchRequest) PushFetchMessage

func (fr *FetchRequest) PushFetchMessage(fm *FetchedMessage)

func (*FetchRequest) Ready

func (fr *FetchRequest) Ready() int

Ready returns the number of messages that will be returned, which signals that the fetch is starting. It reads this number from the StartC channel.

type FetchedMessage

type FetchedMessage struct {
	ID      uint64
	Message []byte
}

FetchedMessage is a struct containing a pair: guble Message and its ID.

type MessagePartition

type MessagePartition interface {

	// Name returns the name of the partition
	Name() string

	// MaxMessageID returns the last message ID stored in this partition
	MaxMessageID() uint64

	Count() uint64

	Store(uint64, []byte) error

	Fetch(req *FetchRequest)

	DoInTx(func(uint64) error) error
}

type MessageStore

type MessageStore interface {

	// Store stores a message within a partition.
	// The message ID must be equal to MaxMessageID + 1,
	// so the caller has to maintain consistency between
	// fetching an ID and storing the message.
	Store(partition string, messageID uint64, data []byte) error

	// StoreMessage generates a new ID for the message if it is new and stores it.
	// It returns the size of the new message or an error.
	// It takes the message and the cluster node ID as parameters.
	StoreMessage(*protocol.Message, uint8) (int, error)

	// Fetch fetches a set of messages.
	// The results, as well as errors, are communicated asynchronously through
	// the channels supplied by the FetchRequest.
	Fetch(*FetchRequest)

	// MaxMessageID returns the highest message ID for a particular partition
	MaxMessageID(partition string) (uint64, error)

	// DoInTx executes the supplied function within the locking context of the message partition.
	// This ensures that, while the code is executed, no change to the supplied maxMessageId can occur.
	// The error result of fnToExecute, or an error that occurs while locking, is returned by DoInTx.
	DoInTx(partition string, fnToExecute func(uint64) error) error

	// GenerateNextMsgID generates a new message ID based on a timestamp, in strictly monotonic order
	GenerateNextMsgID(partition string, nodeID uint8) (uint64, int64, error)

	Partition(string) (MessagePartition, error)

	// Partitions returns a slice of `MessagePartition` available in the store
	Partitions() ([]MessagePartition, error)
}

MessageStore is an interface for a persistence backend storing topics.
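The sequencing rule for Store and the locking contract of DoInTx can be sketched with an in-memory partition. Everything here is illustrative: `memPartition`, `storeLocked`, and `errOutOfSequence` are hypothetical names that do not exist in this package, and the sketch assumes a single mutex guards the partition.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errOutOfSequence is a hypothetical error for IDs that break the rule.
var errOutOfSequence = errors.New("message id must equal MaxMessageID + 1")

// memPartition is a hypothetical in-memory stand-in for a partition.
type memPartition struct {
	mu    sync.RWMutex
	maxID uint64
	data  map[uint64][]byte
}

func newMemPartition() *memPartition {
	return &memPartition{data: make(map[uint64][]byte)}
}

// MaxMessageID returns the last stored message ID.
func (p *memPartition) MaxMessageID() uint64 {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.maxID
}

// storeLocked enforces id == maxID + 1. The caller must hold p.mu.
func (p *memPartition) storeLocked(id uint64, msg []byte) error {
	if id != p.maxID+1 {
		return errOutOfSequence
	}
	p.data[id] = msg
	p.maxID = id
	return nil
}

// Store takes the lock and stores the message under the given ID.
func (p *memPartition) Store(id uint64, msg []byte) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.storeLocked(id, msg)
}

// DoInTx runs fn with the current maximum ID while holding the lock, so
// the value passed to fn cannot change until fn returns.
func (p *memPartition) DoInTx(fn func(maxID uint64) error) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	return fn(p.maxID)
}

func main() {
	p := newMemPartition()
	err := p.DoInTx(func(maxID uint64) error {
		// Go mutexes are not reentrant, so this sketch calls
		// storeLocked here instead of Store.
		return p.storeLocked(maxID+1, []byte("hello"))
	})
	fmt.Println(err, p.MaxMessageID())     // <nil> 1
	fmt.Println(p.Store(5, []byte("gap"))) // message id must equal MaxMessageID + 1
}
```

Reading the maximum ID and storing under the next ID inside one DoInTx call is what keeps the two steps consistent when several writers share a partition.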

Directories

Path Synopsis
Package filestore is a filesystem-based implementation of the MessageStore interface.
