frame

package
v0.0.0-...-941bdb3
Published: Nov 13, 2020 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package frame provides the ability to encode and decode to and from Pulsar's custom binary protocol. The protocol is a light wrapper around protobuf messages.

Index

Constants

const MaxFrameSize = 5 * 1024 * 1024 // 5 MB

MaxFrameSize is defined by the Pulsar spec with a single sentence: "The maximum allowable size of a single frame is 5 MB."

https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Framing-5l6bym
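
The following is a minimal sketch of how a size limit like this might be enforced while reading a frame. It does not use this package: `maxFrameSize`, `errFrameTooLarge`, and `checkTotalSize` are hypothetical names, and applying the limit to the totalSize field (rather than to the whole frame including its 4-byte prefix) is an assumption.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// maxFrameSize mirrors the documented MaxFrameSize constant (5 MB).
const maxFrameSize = 5 * 1024 * 1024

// errFrameTooLarge is a hypothetical error value used only in this sketch.
var errFrameTooLarge = errors.New("frame exceeds maximum size")

// checkTotalSize decodes the 4-byte big-endian totalSize prefix and rejects
// values above maxFrameSize, so a reader can refuse oversized frames before
// allocating a buffer for them.
func checkTotalSize(header [4]byte) (uint32, error) {
	size := binary.BigEndian.Uint32(header[:])
	if size > maxFrameSize {
		return 0, errFrameTooLarge
	}
	return size, nil
}

func main() {
	// 0x00500001 is 5 MB plus one byte.
	_, err := checkTotalSize([4]byte{0x00, 0x50, 0x00, 0x01})
	fmt.Println(err)
}
```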

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncResp

type AsyncResp struct {
	Resp chan<- Frame
	Done <-chan struct{}
}

AsyncResp manages the state between a request and its response. Requestors wait on the `Resp` channel for the response frame corresponding to their request. If they are no longer interested in the response (for example, after a timeout), the `Done` channel is closed, signaling to the responding side that the response is no longer expected or needed.

type CmdSender

type CmdSender interface {
	SendSimpleCmd(cmd api.BaseCommand) error
	SendPayloadCmd(cmd api.BaseCommand, metadata api.MessageMetadata, payload []byte) error
	Closed() <-chan struct{} // Closed unblocks when the connection has been closed
}

CmdSender is an interface that is capable of sending commands to Pulsar. It lets callers depend on this interface rather than on a concrete connection type.

type Dispatcher

type Dispatcher struct {
	// Connected and Pong responses have no requestID,
	// therefore a single channel is used as their
	// respective FrameDispatcher. If the channel is
	// nil, there's no outstanding request.
	GlobalMu sync.Mutex // protects following
	Global   *AsyncResp

	// All responses that are correlated by their
	// requestID
	ReqIDMu sync.Mutex // protects following
	ReqIDs  map[uint64]AsyncResp

	// All responses that are correlated by their
	// (producerID, sequenceID) tuple
	ProdSeqIDsMu sync.Mutex // protects following
	ProdSeqIDs   map[ProdSeqKey]AsyncResp
}

Dispatcher is responsible for handling the request/response state of outstanding requests. It allows users of this type to present a synchronous interface to an asynchronous process.
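
The register/notify idea can be sketched for the requestID case as below. This is not the package's implementation: `dispatcher`, `registerReqID`, `notifyReqID`, and the `frame` type are hypothetical lower-case stand-ins, the error messages are invented, and the sketch assumes request IDs are not reused while a request is outstanding.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type frame struct{ name string } // stand-in for Frame

type asyncResp struct {
	resp chan<- frame
	done <-chan struct{}
}

type dispatcher struct {
	mu     sync.Mutex // protects reqIDs
	reqIDs map[uint64]asyncResp
}

// registerReqID records an outstanding request and returns the channel on
// which its response will arrive, plus a cancel func that is safe to call
// more than once.
func (d *dispatcher) registerReqID(id uint64) (<-chan frame, func(), error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.reqIDs[id]; ok {
		return nil, nil, errors.New("request already in progress")
	}
	resp := make(chan frame)
	done := make(chan struct{})
	d.reqIDs[id] = asyncResp{resp: resp, done: done}
	var once sync.Once
	cancel := func() {
		once.Do(func() {
			d.mu.Lock()
			delete(d.reqIDs, id)
			d.mu.Unlock()
			close(done)
		})
	}
	return resp, cancel, nil
}

// notifyReqID hands f to whoever registered id. It returns an error if no
// one registered, or if the requester cancelled before taking delivery.
func (d *dispatcher) notifyReqID(id uint64, f frame) error {
	d.mu.Lock()
	a, ok := d.reqIDs[id]
	delete(d.reqIDs, id)
	d.mu.Unlock()
	if !ok {
		return errors.New("unexpected response")
	}
	select {
	case a.resp <- f:
		return nil
	case <-a.done:
		return errors.New("requester no longer waiting")
	}
}

func main() {
	d := &dispatcher{reqIDs: make(map[uint64]asyncResp)}
	resp, cancel, err := d.registerReqID(42)
	if err != nil {
		panic(err)
	}
	defer cancel()
	go d.notifyReqID(42, frame{name: "SUCCESS"})
	fmt.Println((<-resp).name)
}
```

The caller blocks on the returned channel, which is what lets an asynchronous exchange look synchronous from the outside.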

func NewFrameDispatcher

func NewFrameDispatcher() *Dispatcher

NewFrameDispatcher returns an instantiated Dispatcher.

func (*Dispatcher) NotifyGlobal

func (f *Dispatcher) NotifyGlobal(frame Frame) error

NotifyGlobal should be called with response frames that have no identifying id (Pong, Connected).

func (*Dispatcher) NotifyProdSeqIDs

func (f *Dispatcher) NotifyProdSeqIDs(producerID, sequenceID uint64, frame Frame) error

NotifyProdSeqIDs should be called with response frames that have (producerID, sequenceID) id tuples to correlate them to their requests.

func (*Dispatcher) NotifyReqID

func (f *Dispatcher) NotifyReqID(requestID uint64, frame Frame) error

NotifyReqID should be called with response frames that have a requestID to correlate them to their requests.

func (*Dispatcher) RegisterGlobal

func (f *Dispatcher) RegisterGlobal() (Response <-chan Frame, cancel func(), err error)

RegisterGlobal is used to wait for responses that have no identifying id (Pong, Connected responses). Only one outstanding global request is allowed at a time. Callers should always call cancel, especially when they are no longer interested in the response.

func (*Dispatcher) RegisterProdSeqIDs

func (f *Dispatcher) RegisterProdSeqIDs(producerID, sequenceID uint64) (Response <-chan Frame, cancel func(), err error)

RegisterProdSeqIDs is used to wait for responses that have (producerID, sequenceID) id tuples to correlate them to their request. Callers should always call cancel, especially when they are no longer interested in the response. It is an error to have multiple outstanding requests with the same id tuple.

func (*Dispatcher) RegisterReqID

func (f *Dispatcher) RegisterReqID(requestID uint64) (Response <-chan Frame, cancel func(), err error)

RegisterReqID is used to wait for responses that have a requestID to correlate them to their request. Callers should always call cancel, especially when they are no longer interested in the response. It is an error to have multiple outstanding requests with the same id.

type Frame

type Frame struct {
	// BaseCmd is a required field
	BaseCmd *api.BaseCommand

	// The following fields are optional.
	// If present, the frame is a "Payload"
	// command, as opposed to a "Simple" command
	// if there's only the BaseCmd.
	Metadata *api.MessageMetadata
	Payload  []byte
}

Frame represents a Pulsar message frame. It can be used to encode and decode messages to and from the Pulsar binary wire format.

The binary protocol is outlined here: https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/ However, the Java source should be considered the canonical definition of the format.

All sizes are passed as 4-byte unsigned big endian integers.

"Simple" command frame format:

+------------------------------------------------------------------------+
| totalSize (uint32) | commandSize (uint32) | message (protobuf encoded) |
|       4 bytes      |       4 bytes        |         var length         |
|====================|======================|============================|
| size of everything | size of the message  |                            |
| following these 4  |                      |                            |
| bytes              |                      |                            |
+------------------------------------------------------------------------+
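
The layout above can be sketched in a few lines. This example does not call the package's Encode method; `encodeSimple` is a hypothetical helper that treats the protobuf-encoded message as opaque bytes.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeSimple lays out a "Simple" command frame around msg, which is
// assumed to be an already protobuf-encoded command.
func encodeSimple(msg []byte) []byte {
	buf := make([]byte, 8+len(msg))
	// totalSize counts everything after its own 4 bytes:
	// the 4-byte commandSize field plus the message.
	binary.BigEndian.PutUint32(buf[0:4], uint32(4+len(msg)))
	// commandSize is the size of the message alone.
	binary.BigEndian.PutUint32(buf[4:8], uint32(len(msg)))
	copy(buf[8:], msg)
	return buf
}

func main() {
	fmt.Printf("% x\n", encodeSimple([]byte{0xaa, 0xbb, 0xcc}))
	// prints: 00 00 00 07 00 00 00 03 aa bb cc
}
```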

"Payload" command frame format (It has the same 3 fields as a "simple" command, plus the following):

+-------------------------------------------------------------------------------------------------------------------------------------------------+
| "Simple" fields | magicNumber (0x0e01) | checksum (CRC32-C) | metadataSize (uint32) | metadata (protobuf encoded) |       payload (bytes)       |
|   var length    |        2 bytes       |       4 bytes      |       4 bytes         |          var length         |   totalSize - (SUM others)  |
|=================|======================|====================|=======================|=============================|=============================|
|                 | OPTIONAL If present, | OPTIONAL Checksum  | size of the metadata  |                             | Any sequence of bytes,      |
|                 | indicates following  | of the following   |                       |                             | possibly compressed and/or  |
|                 | 4 bytes are checksum | bytes              |                       |                             | encrypted (see metadata)    |
+-------------------------------------------------------------------------------------------------------------------------------------------------+

func (*Frame) Decode

func (f *Frame) Decode(r io.Reader) error

Decode reads a frame in the Pulsar binary protocol from r into the receiver frame. It returns any error encountered.

func (*Frame) Encode

func (f *Frame) Encode(w io.Writer) error

Encode writes the frame to w, encoded in the Pulsar binary protocol.

func (*Frame) Equal

func (f *Frame) Equal(other Frame) bool

Equal returns true if the other Frame is equal to the receiver frame, false otherwise.

type MockSender

type MockSender struct {
	Mu      sync.Mutex // protects following
	Frames  []Frame
	Closedc chan struct{}
}

MockSender implements the CmdSender interface.

func (*MockSender) Closed

func (m *MockSender) Closed() <-chan struct{}

func (*MockSender) GetFrames

func (m *MockSender) GetFrames() []Frame

func (*MockSender) SendPayloadCmd

func (m *MockSender) SendPayloadCmd(cmd api.BaseCommand, metadata api.MessageMetadata, payload []byte) error

func (*MockSender) SendSimpleCmd

func (m *MockSender) SendSimpleCmd(cmd api.BaseCommand) error
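
The methods above are undocumented, so the following is only a guess at the general shape of a recording mock, written against hypothetical stand-in types (`baseCommand`, `messageMetadata`, `frame`, `mockSender`) rather than the real `api` types. In particular, returning a copy from `GetFrames` is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-ins for api.BaseCommand, api.MessageMetadata and Frame.
type baseCommand struct{ Type string }
type messageMetadata struct{ ProducerName string }
type frame struct {
	BaseCmd  *baseCommand
	Metadata *messageMetadata
	Payload  []byte
}

type mockSender struct {
	mu      sync.Mutex // protects frames
	frames  []frame
	closedc chan struct{}
}

// SendSimpleCmd records a frame that carries only the command.
func (m *mockSender) SendSimpleCmd(cmd baseCommand) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.frames = append(m.frames, frame{BaseCmd: &cmd})
	return nil
}

// SendPayloadCmd records a frame that carries command, metadata and payload.
func (m *mockSender) SendPayloadCmd(cmd baseCommand, md messageMetadata, payload []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.frames = append(m.frames, frame{BaseCmd: &cmd, Metadata: &md, Payload: payload})
	return nil
}

// GetFrames returns a copy of the recorded frames so that callers can
// inspect them without holding the lock.
func (m *mockSender) GetFrames() []frame {
	m.mu.Lock()
	defer m.mu.Unlock()
	return append([]frame(nil), m.frames...)
}

// Closed returns a channel that unblocks once closedc is closed.
func (m *mockSender) Closed() <-chan struct{} { return m.closedc }

func main() {
	m := &mockSender{closedc: make(chan struct{})}
	_ = m.SendSimpleCmd(baseCommand{Type: "PING"})
	_ = m.SendPayloadCmd(baseCommand{Type: "SEND"}, messageMetadata{ProducerName: "p"}, []byte("hello"))
	for _, f := range m.GetFrames() {
		fmt.Println(f.BaseCmd.Type, len(f.Payload))
	}
}
```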

type ProdSeqKey

type ProdSeqKey struct {
	ProducerID uint64
	SequenceID uint64
}

ProdSeqKey is a composite lookup key for the dispatchers that use producerID and sequenceID to correlate responses, which are the SendReceipt and SendError responses.
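
Because both fields are comparable, a struct like this can be used directly as a Go map key. The sketch below shows the idea with hypothetical names (`prodSeqKey`, `resolve`) and string values in place of AsyncResp.

```go
package main

import "fmt"

// prodSeqKey is a stand-in for ProdSeqKey.
type prodSeqKey struct {
	producerID uint64
	sequenceID uint64
}

// resolve removes and returns the pending entry for the given
// (producerID, sequenceID) tuple, if there is one.
func resolve(pending map[prodSeqKey]string, producerID, sequenceID uint64) (string, bool) {
	k := prodSeqKey{producerID: producerID, sequenceID: sequenceID}
	v, ok := pending[k]
	delete(pending, k)
	return v, ok
}

func main() {
	pending := map[prodSeqKey]string{
		{producerID: 1, sequenceID: 7}: "message A",
		{producerID: 2, sequenceID: 7}: "message B", // same sequenceID, different producer
	}
	v, ok := resolve(pending, 1, 7)
	fmt.Println(v, ok)
}
```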
