publisher

package
v2.0.6+incompatible
Warning

This package is not in the latest version of its module.

Published: May 9, 2020 | License: Apache-2.0 | Imports: 14 | Imported by: 8

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type NullEventSpool

type NullEventSpool struct {
}

NullEventSpool is a dummy registrar used by publisher when there is no registrar to send acknowledgements to for persistence. It simply discards all acknowledgement requests it is given. Currently used during stdin pipe processing where persistence becomes irrelevant.

func (*NullEventSpool) Add

func (s *NullEventSpool) Add(event registrar.EventProcessor)

Add does nothing - it's a dummy registrar

func (*NullEventSpool) Close

func (s *NullEventSpool) Close()

Close does nothing - it's a dummy registrar

func (*NullEventSpool) Send

func (s *NullEventSpool) Send()

Send does nothing - it's a dummy registrar
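The null-object idea behind NullEventSpool can be sketched in isolation. The example below is only an illustration: `EventProcessor`, `EventSpooler`, `nullSpool`, `countingSpool` and `acknowledge` are hypothetical stand-ins defined locally, not the real types from the registrar package. It shows why a do-nothing implementation lets calling code stay free of "is persistence enabled?" checks.

```go
package main

import "fmt"

// EventProcessor and EventSpooler are hypothetical stand-ins for the
// registrar package's types; they are NOT the real log-courier definitions.
type EventProcessor interface {
	Process()
}

type EventSpooler interface {
	Add(event EventProcessor)
	Send()
	Close()
}

// nullSpool discards everything it is given, mirroring NullEventSpool.
type nullSpool struct{}

func (s *nullSpool) Add(event EventProcessor) {}
func (s *nullSpool) Send()                    {}
func (s *nullSpool) Close()                   {}

// countingSpool is a hypothetical "real" spool, used only for comparison.
type countingSpool struct {
	pending int
	sent    int
}

func (s *countingSpool) Add(event EventProcessor) { s.pending++ }
func (s *countingSpool) Send()                    { s.sent += s.pending; s.pending = 0 }
func (s *countingSpool) Close()                   {}

// ackEvent is a placeholder acknowledgement event.
type ackEvent struct{}

func (ackEvent) Process() {}

// acknowledge is written once against the interface; it never needs to
// know whether acknowledgements are actually persisted.
func acknowledge(spool EventSpooler, n int) {
	for i := 0; i < n; i++ {
		spool.Add(ackEvent{})
	}
	spool.Send()
}

func main() {
	counting := &countingSpool{}
	acknowledge(counting, 3)
	acknowledge(&nullSpool{}, 3) // silently discarded
	fmt.Println("persisted acknowledgements:", counting.sent)
}
```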

type Publisher

type Publisher struct {
	core.PipelineSegment
	core.PipelineConfigReceiver
	// contains filtered or unexported fields
}

Publisher handles payloads and is responsible for passing ordered acknowledgements to the Registrar. It makes all the load balancing and distribution decisions, leaving transport state management to the EndpointSink.

Load balancing has always used a push mechanism: the Publisher pushes events out to transports and may pull them back if it deems there is a problem, rather than letting transports pull events from the Publisher and decide for themselves whether there is a problem. This pattern continues that tradition, except that there may now be multiple transports rather than just one.

TODO: Extrapolate the load balance / failover logic to other interfaces? I'm thinking not, as the difference is very little.
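The push model described for Publisher can be illustrated with a small, self-contained sketch. Everything here is an assumption made for illustration: the `endpoint` and `balancer` types, and the "fewest pending payloads" selection rule, are hypothetical and are not taken from the package source. The point is only to show the distributor pushing work out and pulling it back when an endpoint fails.

```go
package main

import "fmt"

// endpoint is a hypothetical stand-in for a transport endpoint.
type endpoint struct {
	name    string
	pending []string // payloads pushed to it but not yet acknowledged
	failed  bool
}

// balancer pushes payloads to endpoints instead of letting them pull.
type balancer struct {
	endpoints []*endpoint
	queue     []string // payloads waiting to be pushed
}

// push hands each queued payload to the healthy endpoint that currently
// has the fewest pending payloads (an assumed selection rule).
func (b *balancer) push() {
	for len(b.queue) > 0 {
		var target *endpoint
		for _, e := range b.endpoints {
			if e.failed {
				continue
			}
			if target == nil || len(e.pending) < len(target.pending) {
				target = e
			}
		}
		if target == nil {
			return // no healthy endpoint; payloads stay queued
		}
		target.pending = append(target.pending, b.queue[0])
		b.queue = b.queue[1:]
	}
}

// fail marks an endpoint as failed, pulls its pending payloads back to
// the front of the queue, and redistributes them.
func (b *balancer) fail(e *endpoint) {
	e.failed = true
	b.queue = append(e.pending, b.queue...)
	e.pending = nil
	b.push()
}

func main() {
	a := &endpoint{name: "a"}
	c := &endpoint{name: "c"}
	bal := &balancer{
		endpoints: []*endpoint{a, c},
		queue:     []string{"p1", "p2", "p3", "p4"},
	}
	bal.push()
	fmt.Println("before failure:", a.pending, c.pending)
	bal.fail(a)
	fmt.Println("after failure: ", a.pending, c.pending)
}
```

Because the balancer owns the decision, a failing endpoint never has to report which payloads it gave up on; the balancer already knows what it pushed there.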

func NewPublisher

func NewPublisher(pipeline *core.Pipeline, config *config.Config, registrar registrar.Registrator) *Publisher

NewPublisher creates a new publisher instance on the given pipeline

func (*Publisher) Connect

func (p *Publisher) Connect() chan<- []*core.EventDescriptor

Connect is used by the Spooler. It returns the send-only channel on which batches of events are passed to the Publisher.

TODO: Spooler doesn't need to know of publisher, only of events.

func (*Publisher) OnAck

func (p *Publisher) OnAck(endpoint *endpoint.Endpoint, pendingPayload *payload.Payload, firstAck bool, lineCount int)

OnAck handles acknowledgements from endpoints. It keeps track of how many out-of-sync acknowledgements have been made, so that shutdown can be postponed if Acks have been received for newer events before older events. It also serialises the Ack offsets for correct registrar storage, ensuring the registrar offsets are always sequential.

func (*Publisher) OnFail

func (p *Publisher) OnFail(endpoint *endpoint.Endpoint)

OnFail handles a failed endpoint

func (*Publisher) OnFinish

func (p *Publisher) OnFinish(endpoint *endpoint.Endpoint) bool

OnFinish handles endpoints that have finished. It should return false if the endpoint is not to be reinitialised, such as when shutting down.

func (*Publisher) OnPong

func (p *Publisher) OnPong(endpoint *endpoint.Endpoint)

OnPong handles when endpoints receive a pong message

func (*Publisher) OnStarted

func (p *Publisher) OnStarted(endpoint *endpoint.Endpoint)

OnStarted handles an endpoint that has moved from idle to now active

func (*Publisher) Run

func (p *Publisher) Run()

Run starts the publisher. It handles endpoint status changes sent from the EndpointSink so that it can make payload distribution decisions.
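A rough sketch of how a Connect-style input channel and a Run-style loop might fit together is shown below. All names (`publisher`, `status`, `connect`, `run`, `onStatus`, `flush`) are hypothetical and the logic is heavily simplified; it is not the package's code. It only illustrates a single loop that reacts to both incoming event batches and endpoint status changes, holding batches back until an endpoint is active.

```go
package main

import "fmt"

// status is a hypothetical endpoint status change.
type status struct {
	endpoint string
	active   bool
}

// publisher is a simplified stand-in, not the real Publisher type.
type publisher struct {
	events   chan []string  // batches arriving from the spooler
	statuses chan status    // status changes from the endpoint sink
	ready    []string       // active endpoints, oldest first
	backlog  [][]string     // batches waiting for an active endpoint
	sent     map[string]int // events pushed per endpoint
}

func newPublisher() *publisher {
	return &publisher{
		events:   make(chan []string),
		statuses: make(chan status),
		sent:     make(map[string]int),
	}
}

// connect exposes only the sending side of the input channel.
func (p *publisher) connect() chan<- []string { return p.events }

// onStatus updates the list of active endpoints, then retries the backlog.
func (p *publisher) onStatus(s status) {
	kept := p.ready[:0]
	for _, name := range p.ready {
		if name != s.endpoint {
			kept = append(kept, name)
		}
	}
	p.ready = kept
	if s.active {
		p.ready = append(p.ready, s.endpoint)
	}
	p.flush()
}

// flush pushes backlog batches to the first active endpoint, if any.
func (p *publisher) flush() {
	for len(p.backlog) > 0 && len(p.ready) > 0 {
		p.sent[p.ready[0]] += len(p.backlog[0])
		p.backlog = p.backlog[1:]
	}
}

// run processes batches and status changes until both channels close.
func (p *publisher) run() {
	events, statuses := p.events, p.statuses
	for events != nil || statuses != nil {
		select {
		case batch, ok := <-events:
			if !ok {
				events = nil
				continue
			}
			p.backlog = append(p.backlog, batch)
			p.flush()
		case s, ok := <-statuses:
			if !ok {
				statuses = nil
				continue
			}
			p.onStatus(s)
		}
	}
}

func main() {
	p := newPublisher()
	done := make(chan struct{})
	go func() { p.run(); close(done) }()

	in := p.connect()
	in <- []string{"a", "b"} // held back: no endpoint is active yet
	p.statuses <- status{endpoint: "ep1", active: true}
	in <- []string{"c"}
	close(in)
	close(p.statuses)
	<-done
	fmt.Println("events pushed to ep1:", p.sent["ep1"])
}
```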
