aeron

package
v0.0.0-...-f6902ab Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 3, 2023 License: Apache-2.0 Imports: 26 Imported by: 36

Documentation

Index

Constants

View Source
const (
	// NotConnected indicates that this Publication is not connected to the driver
	NotConnected int64 = -1
	// BackPressured indicates that sending ring buffer is full
	BackPressured int64 = -2
	// AdminAction indicates that terms needs to be rotated. User should retry the Offer
	AdminAction int64 = -3
	// PublicationClosed indicates that this Publication is closed and no further Offers shall succeed
	PublicationClosed int64 = -4
	// MaxPositionExceeded indicates that ...
	MaxPositionExceeded int64 = -5
)
View Source
const (
	ChannelStatusNoIdAllocated = -1 // Channel status counter not allocated for IPC channels
	ChannelStatusErrored       = -1 // Channel has errored. Check logs for information
	ChannelStatusInitializing  = 0  // Channel is being initialized
	ChannelStatusActive        = 1  // Channel has finished initialization and is active
	ChannelStatusClosing       = 2  // Channel is being closed
)
View Source
const (
	ChannelStatusIdOffset          = 0
	LocalSocketAddressLengthOffset = ChannelStatusIdOffset + 4
	LocalSocketAddressStringOffset = LocalSocketAddressLengthOffset + 4
)

From LocalSocketAddressStatus.Java

View Source
const AeronPrefix = "aeron:"
View Source
const AeronScheme = "aeron"
View Source
const AliasParamName = "alias"
View Source
const ChannelRcvTimestampOffsetParamName = "channel-rcv-ts-offset"
View Source
const ChannelSndTimestampOffsetParamName = "channel-snd-ts-offset"
View Source
const CongestionControlParamName = "cc"
View Source
const (
	DefaultFragmentAssemblyBufferLength = int32(4096)
)
View Source
const EndpointParamName = "endpoint"
View Source
const EosParamName = "eos"
View Source
const FlowControlParamName = "fc"
View Source
const GroupParamName = "group"
View Source
const GroupTagParamName = "gtag"
View Source
const InitialTermIdParamName = "init-term-id"
View Source
const InterfaceParamName = "interface"
View Source
const IpcChannel = "aeron:ipc"
View Source
const IpcMedia = "ipc"
View Source
const LingerParamName = "linger"
View Source
const LocalSocketAddressStatusCounterTypeId = 14
View Source
const MdcControlModeDynamic = "dynamic"
View Source
const MdcControlModeManual = "manual"
View Source
const MdcControlModeParamName = "control-mode"
View Source
const MdcControlParamName = "control"
View Source
const MediaRcvTimestampOffsetParamName = "media-rcv-ts-offset"
View Source
const MtuLengthParamName = "mtu"
View Source
const NullValue = -1

NullValue is used to represent a null value for when some value is not yet set.

View Source
const ReceiverWindowLengthParamName = "rcv-wnd"
View Source
const RejoinParamName = "rejoin"
View Source
const ReliableStreamParamName = "reliable"
View Source
const SessionIdParamName = "session-id"
View Source
const SocketRcvbufParamName = "so-rcvbuf"
View Source
const SocketSndbufParamName = "so-sndbuf"
View Source
const SparseParamName = "sparse"
View Source
const SpiesSimulateConnectionParamName = "ssc"
View Source
const SpyPrefix = "aeron-spy:"
View Source
const SpyQualifier = "aeron-spy"
View Source
const TagPrefix = "tag:"
View Source
const TagsParamName = "tags"
View Source
const TermIdParamName = "term-id"
View Source
const TermLengthParamName = "term-length"
View Source
const TermOffsetParamName = "term-offset"
View Source
const TetherParamName = "tether"
View Source
const TtlParamName = "ttl"
View Source
const UdpMedia = "udp"

Variables

View Source
var DefaultAeronDir string = "/dev/shm"

DefaultAeronDir is the location of media driver files

View Source
var RegistrationStatus = struct {
	AwaitingMediaDriver   int
	RegisteredMediaDriver int
	ErroredMediaDriver    int
}{
	0,
	1,
	2,
}
View Source
var UserName = os.Getenv("USER")

UserName is used to determine default directory for driver files

Functions

func ChannelStatusString

func ChannelStatusString(channelStatus int) string

ChannelStatusString provides a convenience method for logging and error handling

func IsConnectedTo

func IsConnectedTo(sub *Subscription, pub *Publication) bool

IsConnectedTo is a helper function used primarily by tests, which is used within the same process to verify that subscription is connected to a specific publication.

func NewImage

func NewImage(sessionID int32, correlationID int64, logBuffers *logbuffer.LogBuffers) *image

NewImage wraps around provided LogBuffers setting up the structures for polling

Types

type Aeron

type Aeron struct {
	// contains filtered or unexported fields
}

Aeron is the primary interface to the media driver for managing subscriptions and publications

func Connect

func Connect(ctx *Context) (*Aeron, error)

Connect is the factory method used to create a new instance of Aeron based on Context settings

func (*Aeron) AddAvailableCounterHandler

func (aeron *Aeron) AddAvailableCounterHandler(handler AvailableCounterHandler) int64

AddAvailableCounterHandler adds a handler to the list to be called when a counter becomes available. Return the registrationID to use to remove the handler.

func (*Aeron) AddCounter

func (aeron *Aeron) AddCounter(
	typeId int32,
	keyBuffer *atomic.Buffer,
	keyOffset int32,
	keyLength int32,
	labelBuffer *atomic.Buffer,
	labelOffset int32,
	labelLength int32) (int64, error)

AddCounter allocates a counter on the media driver and returns its registrationId. The Counter should be freed by calling Counter.Close().

func (*Aeron) AddCounterByLabel

func (aeron *Aeron) AddCounterByLabel(typeId int32, label string) (int64, error)

AddCounterByLabel allocates a counter on the media driver and returns its registrationId. The Counter should be freed by calling Counter.Close().

func (*Aeron) AddExclusivePublication

func (aeron *Aeron) AddExclusivePublication(channel string, streamID int32) (*Publication, error)

AddExclusivePublication will add a new exclusive publication to the driver. If such publication already exists within ClientConductor the same instance will be returned.

func (*Aeron) AddExclusivePublicationDeprecated

func (aeron *Aeron) AddExclusivePublicationDeprecated(channel string, streamID int32) chan *Publication

AddExclusivePublicationDeprecated will add a new exclusive publication to the driver. If such publication already exists within ClientConductor the same instance will be returned. Returns a channel, which can be used for either blocking or non-blocking want for media driver confirmation

func (*Aeron) AddPublication

func (aeron *Aeron) AddPublication(channel string, streamID int32) (*Publication, error)

AddPublication will add a new publication to the driver. If such publication already exists within ClientConductor the same instance will be returned.

func (*Aeron) AddPublicationDeprecated

func (aeron *Aeron) AddPublicationDeprecated(channel string, streamID int32) chan *Publication

AddPublicationDeprecated will add a new publication to the driver. If such publication already exists within ClientConductor the same instance will be returned. Returns a channel, which can be used for either blocking or non-blocking want for media driver confirmation

func (*Aeron) AddSubscription

func (aeron *Aeron) AddSubscription(channel string, streamID int32) (*Subscription, error)

AddSubscription will add a new subscription to the driver and wait until it is ready.

func (*Aeron) AddSubscriptionDeprecated

func (aeron *Aeron) AddSubscriptionDeprecated(channel string, streamID int32) chan *Subscription

AddSubscriptionDeprecated will add a new subscription to the driver. Returns a channel, which can be used for either blocking or non-blocking want for media driver confirmation

func (*Aeron) AddSubscriptionWithHandlers

func (aeron *Aeron) AddSubscriptionWithHandlers(channel string, streamID int32,
	onAvailableImage AvailableImageHandler, onUnavailableImage UnavailableImageHandler) (*Subscription, error)

AddSubscriptionWithHandlers will add a new subscription to the driver and wait until it is ready. It will use the specified Handlers for available/unavailable Images instead of the default handlers.

func (*Aeron) AddUnavailableCounterHandler

func (aeron *Aeron) AddUnavailableCounterHandler(handler UnavailableCounterHandler) int64

AddUnavailableCounterHandler adds a handler to the list to be called when Counters become unavailable. Return the registrationID to use to remove the handler.

func (*Aeron) AsyncAddExclusivePublication

func (aeron *Aeron) AsyncAddExclusivePublication(channel string, streamID int32) (int64, error)

AsyncAddExclusivePublication will add a new exclusive publication to the driver and return its registration ID. That ID can be used to get the added exclusive Publication with GetExclusivePublication().

func (*Aeron) AsyncAddPublication

func (aeron *Aeron) AsyncAddPublication(channel string, streamID int32) (int64, error)

AsyncAddPublication will add a new publication to the driver and return its registration ID. That ID can be used to get the added Publication with GetPublication().

func (*Aeron) AsyncAddSubscription

func (aeron *Aeron) AsyncAddSubscription(channel string, streamID int32) (int64, error)

AsyncAddSubscription will add a new subscription to the driver and return its registration ID. That ID can be used to get the Subscription with GetSubscription().

func (*Aeron) AsyncAddSubscriptionWithHandlers

func (aeron *Aeron) AsyncAddSubscriptionWithHandlers(channel string, streamID int32,
	onAvailableImage AvailableImageHandler, onUnavailableImage UnavailableImageHandler) (int64, error)

AsyncAddSubscriptionWithHandlers will add a new subscription to the driver and return its registration ID. That ID can be used to get the Subscription with GetSubscription(). This call will use the specified Handlers for available/unavailable Images instead of the default handlers.

func (*Aeron) ClientID

func (aeron *Aeron) ClientID() int64

ClientID returns the client identity that has been allocated for communicating with the media driver.

func (*Aeron) Close

func (aeron *Aeron) Close() error

Close will terminate client conductor and remove all publications and subscriptions from the media driver

func (*Aeron) CounterReader

func (aeron *Aeron) CounterReader() *counters.Reader

CounterReader returns Aeron's clientconductor's counterReader

func (*Aeron) FindCounter

func (aeron *Aeron) FindCounter(registrationID int64) (*Counter, error)

FindCounter retrieves the Counter associated with the given registrationID. This function is non-blocking. The value returned is dependent on what has occurred with respect to the media driver:

- If the registrationID is unknown, an error is returned. - If the media driver has not answered the command, (nil,nil) is returned. - If the media driver has successfully added the Counter then what is returned is the Counter. - If the media driver has returned an error, that error will be returned.

func (*Aeron) GetExclusivePublication

func (aeron *Aeron) GetExclusivePublication(registrationID int64) (*Publication, error)

GetExclusivePublication will attempt to get an exclusive Publication from a registrationID. See AsyncAddExclusivePublication. A pending Publication will return nil,nil signifying that there is neither a Publication nor an error. Also note that while aeron-go currently handles GetPublication and GetExclusivePublication the same way, they may diverge in the future. Other Aeron languages already handle these calls differently.

func (*Aeron) GetPublication

func (aeron *Aeron) GetPublication(registrationID int64) (*Publication, error)

GetPublication will attempt to get a Publication from a registrationID. See AsyncAddPublication. A pending Publication will return nil,nil signifying that there is neither a Publication nor an error.

func (*Aeron) GetSubscription

func (aeron *Aeron) GetSubscription(registrationID int64) (*Subscription, error)

GetSubscription will attempt to get a Subscription from a registrationID. See AsyncAddSubscription. A pending Subscription will return nil,nil signifying that there is neither a Subscription nor an error.

func (*Aeron) IsClosed

func (aeron *Aeron) IsClosed() bool

IsClosed returns true if this connection is closed.

func (*Aeron) NextCorrelationID

func (aeron *Aeron) NextCorrelationID() int64

NextCorrelationID generates the next correlation id that is unique for the connected Media Driver. This is useful generating correlation identifiers for pairing requests with responses in a clients own application protocol.

This method is thread safe and will work across processes that all use the same media driver.

func (*Aeron) RemoveAvailableCounterHandler

func (aeron *Aeron) RemoveAvailableCounterHandler(handler AvailableCounterHandler) bool

RemoveAvailableCounterHandler removes a previously added handler from the list to be called when Counters become available. Returns true iff the handler was found and removed.

func (*Aeron) RemoveAvailableCounterHandlerById

func (aeron *Aeron) RemoveAvailableCounterHandlerById(registrationId int64) bool

RemoveAvailableCounterHandlerById removes a previously added handler from the list to be called when Counters become available. Returns true iff the handler was found and removed.

func (*Aeron) RemoveUnavailableCounterHandler

func (aeron *Aeron) RemoveUnavailableCounterHandler(handler UnavailableCounterHandler) bool

RemoveUnavailableCounterHandler removes a previously added handler from the list to be called when Counters become unavailable. Returns true iff the handler was found and removed.

func (*Aeron) RemoveUnavailableCounterHandlerById

func (aeron *Aeron) RemoveUnavailableCounterHandlerById(registrationId int64) bool

RemoveUnavailableCounterHandlerById removes a previously added handler from the list to be called when Counters become unavailable. Returns true iff the handler was found and removed.

type AvailableCounterHandler

type AvailableCounterHandler interface {
	Handle(countersReader *counters.Reader, registrationId int64, counterId int32)
}

AvailableCounterHandler is the function called by Aeron to deliver notification of a Counter being available. Implementations should do the minimum work for passing off state to another thread for later processing and should not make a reentrant call back into the Aeron instance. Note that this is an interface instead of a function in order to support RemoveAvailableCounterHandler.

type AvailableImageHandler

type AvailableImageHandler func(Image)

AvailableImageHandler is the handler type for image available notification from the media driver

type ChannelUri

type ChannelUri struct {
	// contains filtered or unexported fields
}

ChannelUri is a parser for Aeron channel URIs. The format is: aeron-uri = "aeron:" media [ "?" param *( "|" param ) ] media = *( "[^?:]" ) param = key "=" value key = *( "[^=]" ) value = *( "[^|]" )

Multiple params with the same key are allowed, the last value specified takes precedence.

func ParseChannelUri

func ParseChannelUri(uriStr string) (uri ChannelUri, err error)

ParseChannelUri parses a string which contains an Aeron URI.

func (ChannelUri) Clone

func (uri ChannelUri) Clone() (res ChannelUri)

Clone returns a deep copy of a ChannelUri.

func (ChannelUri) Get

func (uri ChannelUri) Get(key string) string

func (ChannelUri) IsIpc

func (uri ChannelUri) IsIpc() bool

func (ChannelUri) IsUdp

func (uri ChannelUri) IsUdp() bool

func (ChannelUri) Media

func (uri ChannelUri) Media() string

func (ChannelUri) Prefix

func (uri ChannelUri) Prefix() string

func (ChannelUri) Remove

func (uri ChannelUri) Remove(key string)

func (ChannelUri) Scheme

func (uri ChannelUri) Scheme() string

func (ChannelUri) Set

func (uri ChannelUri) Set(key string, value string)

func (*ChannelUri) SetControlMode

func (uri *ChannelUri) SetControlMode(controlMode string)

func (*ChannelUri) SetMedia

func (uri *ChannelUri) SetMedia(media string)

func (*ChannelUri) SetPrefix

func (uri *ChannelUri) SetPrefix(prefix string)

func (*ChannelUri) SetSessionID

func (uri *ChannelUri) SetSessionID(sessionID int32)

func (ChannelUri) String

func (uri ChannelUri) String() (result string)

type ClientConductor

type ClientConductor struct {
	// contains filtered or unexported fields
}

func (*ClientConductor) AddAvailableCounterHandler

func (cc *ClientConductor) AddAvailableCounterHandler(handler AvailableCounterHandler) int64

func (*ClientConductor) AddCounter

func (cc *ClientConductor) AddCounter(typeId int32, keyBuffer *atomic.Buffer, keyOffset int32, keyLength int32,
	labelBuffer *atomic.Buffer, labelOffset int32, labelLength int32) (int64, error)

func (*ClientConductor) AddCounterByLabel

func (cc *ClientConductor) AddCounterByLabel(typeId int32, label string) (int64, error)

func (*ClientConductor) AddDestination

func (cc *ClientConductor) AddDestination(registrationID int64, endpointChannel string) error

AddDestination sends the add destination command through the driver proxy

func (*ClientConductor) AddExclusivePublication

func (cc *ClientConductor) AddExclusivePublication(channel string, streamID int32) (int64, error)

AddExclusivePublication sends the add publication command through the driver proxy

func (*ClientConductor) AddPublication

func (cc *ClientConductor) AddPublication(channel string, streamID int32) (int64, error)

AddPublication sends the add publication command through the driver proxy

func (*ClientConductor) AddRcvDestination

func (cc *ClientConductor) AddRcvDestination(registrationID int64, endpointChannel string) error

AddRcvDestination sends the add rcv destination command through the driver proxy

func (*ClientConductor) AddSubscription

func (cc *ClientConductor) AddSubscription(channel string, streamID int32) (int64, error)

AddSubscription sends the add subscription command through the driver proxy

func (*ClientConductor) AddSubscriptionWithHandlers

func (cc *ClientConductor) AddSubscriptionWithHandlers(channel string, streamID int32,
	onAvailableImage AvailableImageHandler, onUnavailableImage UnavailableImageHandler) (int64, error)

AddSubscriptionWithHandlers sends the add subscription command through the driver proxy. It will use the specified Handlers for available/unavailable Images instead of the default handlers.

func (*ClientConductor) AddUnavailableCounterHandler

func (cc *ClientConductor) AddUnavailableCounterHandler(handler UnavailableCounterHandler) int64

func (*ClientConductor) Close

func (cc *ClientConductor) Close() (err error)

Close will terminate the Run() goroutine body and close all active publications and subscription. Run() can be restarted in a another goroutine.

func (*ClientConductor) CounterReader

func (cc *ClientConductor) CounterReader() *ctr.Reader

func (*ClientConductor) FindCounter

func (cc *ClientConductor) FindCounter(registrationID int64) (*Counter, error)

func (*ClientConductor) FindPublication

func (cc *ClientConductor) FindPublication(registrationID int64) (*Publication, error)

func (*ClientConductor) FindSubscription

func (cc *ClientConductor) FindSubscription(registrationID int64) (*Subscription, error)

FindSubscription by Registration ID, which is returned by AddSubscription. Returns the Subscription or an error. A pending Subscription will return nil,nil signifying that there is neither a Subscription nor an error.

func (*ClientConductor) Init

func (cc *ClientConductor) Init(driverProxy DriverProxy, bcast *broadcast.CopyReceiver,
	interServiceTo, driverTo, pubConnectionTo, lingerTo time.Duration, counters *ctr.MetaDataFlyweight) *ClientConductor

Init is the primary initialization method for ClientConductor

func (*ClientConductor) OnAvailableCounter

func (cc *ClientConductor) OnAvailableCounter(registrationId int64, counterId int32)

func (*ClientConductor) OnAvailableImage

func (cc *ClientConductor) OnAvailableImage(streamID int32, sessionID int32, logFilename string, sourceIdentity string,
	subscriberPositionID int32, subsRegID int64, corrID int64)

func (*ClientConductor) OnChannelEndpointError

func (cc *ClientConductor) OnChannelEndpointError(corrID int64, errorMessage string)

func (*ClientConductor) OnClientTimeout

func (cc *ClientConductor) OnClientTimeout(clientID int64)

func (*ClientConductor) OnErrorResponse

func (cc *ClientConductor) OnErrorResponse(corrID int64, errorCode int32, errorMessage string)

func (*ClientConductor) OnNewExclusivePublication

func (cc *ClientConductor) OnNewExclusivePublication(streamID int32, sessionID int32, posLimitCounterID int32,
	channelStatusIndicatorID int32, logFileName string, regID int64, origRegID int64)

TODO Implement logic specific to exclusive publications

func (*ClientConductor) OnNewPublication

func (cc *ClientConductor) OnNewPublication(streamID int32, sessionID int32, posLimitCounterID int32,
	channelStatusIndicatorID int32, logFileName string, regID int64, origRegID int64)

func (*ClientConductor) OnOperationSuccess

func (cc *ClientConductor) OnOperationSuccess(corrID int64)

func (*ClientConductor) OnSubscriptionReady

func (cc *ClientConductor) OnSubscriptionReady(correlationID int64, channelStatusIndicatorID int32)

func (*ClientConductor) OnUnavailableCounter

func (cc *ClientConductor) OnUnavailableCounter(registrationId int64, counterId int32)

func (*ClientConductor) OnUnavailableImage

func (cc *ClientConductor) OnUnavailableImage(corrID int64, subscriptionRegistrationID int64)

func (*ClientConductor) RemoveAvailableCounterHandler

func (cc *ClientConductor) RemoveAvailableCounterHandler(handler AvailableCounterHandler) bool

func (*ClientConductor) RemoveAvailableCounterHandlerById

func (cc *ClientConductor) RemoveAvailableCounterHandlerById(registrationId int64) bool

func (*ClientConductor) RemoveDestination

func (cc *ClientConductor) RemoveDestination(registrationID int64, endpointChannel string) error

RemoveDestination sends the remove destination command through the driver proxy

func (*ClientConductor) RemoveRcvDestination

func (cc *ClientConductor) RemoveRcvDestination(registrationID int64, endpointChannel string) error

RemoveRcvDestination sends the remove rcv destination command through the driver proxy

func (*ClientConductor) RemoveUnavailableCounterHandler

func (cc *ClientConductor) RemoveUnavailableCounterHandler(handler UnavailableCounterHandler) bool

func (*ClientConductor) RemoveUnavailableCounterHandlerById

func (cc *ClientConductor) RemoveUnavailableCounterHandlerById(registrationId int64) bool

func (*ClientConductor) Start

func (cc *ClientConductor) Start(idleStrategy idlestrategy.Idler)

Start begins the main execution loop of ClientConductor on a goroutine.

type Context

type Context struct {
	// contains filtered or unexported fields
}

Context configuration options are located here https://github.com/real-logic/Aeron/wiki/Configuration-Options#aeron-client-options

func NewContext

func NewContext() *Context

NewContext creates and initializes new Context for Aeron

func (*Context) AeronDir

func (ctx *Context) AeronDir(dir string) *Context

AeronDir sets the root directory for media driver files

func (*Context) AvailableCounterHandler

func (ctx *Context) AvailableCounterHandler(handler AvailableCounterHandler)

AvailableCounterHandler sets up a callback for when a Counter is available. This will be added to the list before additional handlers are added with Aeron.AddAvailableCounterHandler.

func (*Context) AvailableImageHandler

func (ctx *Context) AvailableImageHandler(handler func(Image)) *Context

AvailableImageHandler sets an optional default callback for available image notifications

func (*Context) CncFileName

func (ctx *Context) CncFileName() string

CncFileName returns the name of the Counters file

func (*Context) ErrorHandler

func (ctx *Context) ErrorHandler(handler func(error)) *Context

ErrorHandler sets the error handler callback

func (*Context) GetAvailableCounterHandler

func (ctx *Context) GetAvailableCounterHandler() AvailableCounterHandler

GetAvailableCounterHandler gets the callback handler for when a counter is available.

func (*Context) GetUnavailableCounterHandler

func (ctx *Context) GetUnavailableCounterHandler() UnavailableCounterHandler

func (*Context) IdleStrategy

func (ctx *Context) IdleStrategy(idleStrategy idlestrategy.Idler) *Context

IdleStrategy provides an IdleStrategy for the thread responsible for communicating with the Aeron Media Driver.

func (*Context) InterServiceTimeout

func (ctx *Context) InterServiceTimeout(to time.Duration) *Context

InterServiceTimeout sets the timeout for client heartbeat

func (*Context) MediaDriverTimeout

func (ctx *Context) MediaDriverTimeout(to time.Duration) *Context

MediaDriverTimeout sets the timeout for keep alives to media driver

func (*Context) NewPublicationHandler

func (ctx *Context) NewPublicationHandler(handler func(string, int32, int32, int64)) *Context

newPublicationHandler sets an optional callback for new publications

func (*Context) NewSubscriptionHandler

func (ctx *Context) NewSubscriptionHandler(handler func(string, int32, int64)) *Context

newSubscriptionHandler sets an optional callback for new subscriptions

func (*Context) PublicationConnectionTimeout

func (ctx *Context) PublicationConnectionTimeout(to time.Duration) *Context

func (*Context) ResourceLingerTimeout

func (ctx *Context) ResourceLingerTimeout(to time.Duration) *Context

ResourceLingerTimeout sets the timeout for resource cleanup after they're released

func (*Context) UnavailableCounterHandler

func (ctx *Context) UnavailableCounterHandler(handler UnavailableCounterHandler)

UnavailableCounterHandler sets up a callback for when a Counter is unavailable. This will be added to the list first before additional handlers are added with Aeron.AddUnavailableCounterHandler.

func (*Context) UnavailableImageHandler

func (ctx *Context) UnavailableImageHandler(handler func(Image)) *Context

UnavailableImageHandler sets an optional default callback for unavailable image notification

type ControlledFragmentAssembler

type ControlledFragmentAssembler struct {
	// contains filtered or unexported fields
}

ControlledFragmentAssembler that sits in a chain-of-responsibility pattern that reassembles fragmented messages so that the next handler in the chain only sees whole messages.

Unfragmented messages are delegated without copy. Fragmented messages are copied to a temporary buffer for reassembly before delegation.

The Header passed to the delegate on assembling a message will be that of the last fragment.

Session based buffers will be allocated and grown as necessary based on the length of messages to be assembled.

func NewControlledFragmentAssembler

func NewControlledFragmentAssembler(delegate term.ControlledFragmentHandler, initialBufferLength int32) *ControlledFragmentAssembler

NewControlledFragmentAssembler constructs an adapter to reassemble message fragments and delegate on whole messages.

func (*ControlledFragmentAssembler) OnFragment

func (f *ControlledFragmentAssembler) OnFragment(
	buffer *atomic.Buffer,
	offset int32,
	length int32,
	header *logbuffer.Header) (action term.ControlledPollAction)

OnFragment reassembles and forwards whole messages to the delegate.

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

func NewCounter

func NewCounter(
	registrationId int64,
	clientConductor *ClientConductor,
	counterId int32) (*Counter, error)

func (*Counter) Close

func (c *Counter) Close() error

func (*Counter) Counter

func (c *Counter) Counter() *counters.AtomicCounter

func (*Counter) Id

func (c *Counter) Id() int32

func (*Counter) IsClosed

func (c *Counter) IsClosed() bool

func (*Counter) RegistrationId

func (c *Counter) RegistrationId() int64

type DriverProxy

type DriverProxy interface {
	ClientID() int64
	TimeOfLastDriverKeepalive() int64
	NextCorrelationID() int64
	AddSubscription(channel string, streamID int32) (int64, error)
	RemoveSubscription(registrationID int64) error
	AddPublication(channel string, streamID int32) (int64, error)
	AddExclusivePublication(channel string, streamID int32) (int64, error)
	RemovePublication(registrationID int64) error
	ClientClose() error
	AddDestination(registrationID int64, channel string) (int64, error)
	RemoveDestination(registrationID int64, channel string) (int64, error)
	AddRcvDestination(registrationID int64, channel string) (int64, error)
	RemoveRcvDestination(registrationID int64, channel string) (int64, error)
	AddCounter(typeId int32, keyBuffer *atomic.Buffer, keyOffset int32, keyLength int32,
		labelBuffer *atomic.Buffer, labelOffset int32, labelLength int32) (int64, error)
	AddCounterByLabel(typeId int32, label string) (int64, error)
	RemoveCounter(registrationId int64) (int64, error)
}

type FragmentAssembler

type FragmentAssembler struct {
	// contains filtered or unexported fields
}

FragmentAssembler that sits in a chain-of-responsibility pattern that reassembles fragmented messages so that the next handler in the chain only sees whole messages.

Unfragmented messages are delegated without copy. Fragmented messages are copied to a temporary buffer for reassembly before delegation.

The Header passed to the delegate on assembling a message will be that of the last fragment.

Session based buffers will be allocated and grown as necessary based on the length of messages to be assembled.

func NewFragmentAssembler

func NewFragmentAssembler(delegate term.FragmentHandler, initialBufferLength int32) *FragmentAssembler

NewFragmentAssembler constructs an adapter to reassemble message fragments and delegate on whole messages.

func (*FragmentAssembler) Clear

func (f *FragmentAssembler) Clear()

Clear removes all existing session buffers.

func (*FragmentAssembler) OnFragment

func (f *FragmentAssembler) OnFragment(
	buffer *atomic.Buffer,
	offset int32,
	length int32,
	header *logbuffer.Header)

OnFragment reassembles and forwards whole messages to the delegate.

type IdAndAvailableCounterHandler

type IdAndAvailableCounterHandler struct {
	// contains filtered or unexported fields
}

func NewIdAndAvailablePair

func NewIdAndAvailablePair(registrationId int64, handler AvailableCounterHandler) *IdAndAvailableCounterHandler

type IdAndUnavailableCounterHandler

type IdAndUnavailableCounterHandler struct {
	// contains filtered or unexported fields
}

func NewIdAndUnavailablePair

func NewIdAndUnavailablePair(registrationId int64, handler UnavailableCounterHandler) *IdAndUnavailableCounterHandler

type Image

type Image interface {
	IsClosed() bool
	Poll(handler term.FragmentHandler, fragmentLimit int) int
	BoundedPoll(handler term.FragmentHandler, limitPosition int64, fragmentLimit int) int
	ControlledPoll(handler term.ControlledFragmentHandler, fragmentLimit int) int
	Position() int64
	IsEndOfStream() bool
	SessionID() int32
	CorrelationID() int64
	SubscriptionRegistrationID() int64
	TermBufferLength() int32
	ActiveTransportCount() int32
	Close() error
}

Image is a Java-style interface for the image struct. This is to allow dependency injection and testing of the many structs that use image, without deviating from the existing function signatures and code structure.

func DefaultImageFactory

func DefaultImageFactory(sessionID int32, corrID int64, logFilename string, subRegId int64, sourceIdentity string,
	counterValuesBuffer *atomic.Buffer, subscriberPositionID int32) Image

type ImageFactory

type ImageFactory func(sessionID int32, corrID int64, logFilename string, subRegId int64, sourceIdentity string,
	counterValuesBuffer *atomic.Buffer, subscriberPositionID int32) Image

ImageFactory allows tests to use fake Images

type ImageList

type ImageList struct {
	// contains filtered or unexported fields
}

ImageList is a helper class to manage list of images atomically without locks

func NewImageList

func NewImageList() *ImageList

NewImageList is a factory method for ImageList

func (*ImageList) Empty

func (l *ImageList) Empty() (oldList []Image)

Empty is a convenience method to reset the contents of the list

func (*ImageList) Get

func (l *ImageList) Get() []Image

Get returns a pointer to the underlying image array loaded atomically

func (*ImageList) Set

func (l *ImageList) Set(imgs []Image)

Set atomically sets the reference to the underlying array

type MockImage

type MockImage struct {
	mock.Mock
}

MockImage is an autogenerated mock type for the Image type

func NewMockImage

func NewMockImage(t mockConstructorTestingTNewMockImage) *MockImage

NewMockImage creates a new instance of MockImage. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.

func (*MockImage) ActiveTransportCount

func (_m *MockImage) ActiveTransportCount() int32

ActiveTransportCount provides a mock function with given fields:

func (*MockImage) BoundedPoll

func (_m *MockImage) BoundedPoll(handler term.FragmentHandler, limitPosition int64, fragmentLimit int) int

BoundedPoll provides a mock function with given fields: handler, limitPosition, fragmentLimit

func (*MockImage) Close

func (_m *MockImage) Close() error

Close provides a mock function with given fields:

func (*MockImage) ControlledPoll

func (_m *MockImage) ControlledPoll(handler term.ControlledFragmentHandler, fragmentLimit int) int

ControlledPoll provides a mock function with given fields: handler, fragmentLimit

func (*MockImage) CorrelationID

func (_m *MockImage) CorrelationID() int64

CorrelationID provides a mock function with given fields:

func (*MockImage) IsClosed

func (_m *MockImage) IsClosed() bool

IsClosed provides a mock function with given fields:

func (*MockImage) IsEndOfStream

func (_m *MockImage) IsEndOfStream() bool

IsEndOfStream provides a mock function with given fields:

func (*MockImage) Poll

func (_m *MockImage) Poll(handler term.FragmentHandler, fragmentLimit int) int

Poll provides a mock function with given fields: handler, fragmentLimit

func (*MockImage) Position

func (_m *MockImage) Position() int64

Position provides a mock function with given fields:

func (*MockImage) SessionID

func (_m *MockImage) SessionID() int32

SessionID provides a mock function with given fields:

func (*MockImage) SubscriptionRegistrationID

func (_m *MockImage) SubscriptionRegistrationID() int64

SubscriptionRegistrationID provides a mock function with given fields:

func (*MockImage) TermBufferLength

func (_m *MockImage) TermBufferLength() int32

TermBufferLength provides a mock function with given fields:

type NewPublicationHandler

type NewPublicationHandler func(string, int32, int32, int64)

NewPublicationHandler is the handler type for new publication notification from the media driver

type NewSubscriptionHandler

type NewSubscriptionHandler func(string, int32, int64)

NewSubscriptionHandler is the handler type for new subscription notification from the media driver

type Position

type Position struct {
	// contains filtered or unexported fields
}

Position is a wrapper for a buffer location of a position counter

func NewPosition

func NewPosition(buffer *atomic.Buffer, id int32) Position

NewPosition is a factory method to create new Position wrappers

type Publication

type Publication struct {
	// contains filtered or unexported fields
}

Publication is a sender structure

func NewPublication

func NewPublication(logBuffers *logbuffer.LogBuffers) *Publication

NewPublication is a factory method create new publications

func (*Publication) Channel

func (pub *Publication) Channel() string

Channel returns the media address for delivery to the channel.

func (*Publication) ChannelStatusID

func (pub *Publication) ChannelStatusID() int32

ChannelStatusID returns the counter used to represent the channel status for this publication.

func (*Publication) Close

func (pub *Publication) Close() error

Close will close this publication with the driver. This is a blocking call.

func (*Publication) InitialTermID

func (pub *Publication) InitialTermID() int32

InitialTermID returns the initial term id assigned when this publication was created. This can be used to determine how many terms have passed since creation.

func (*Publication) IsClosed

func (pub *Publication) IsClosed() bool

IsClosed returns whether this Publication has been closed

func (*Publication) IsConnected

func (pub *Publication) IsConnected() bool

IsConnected returns whether this publication is connected to the driver (not whether it has any Subscriptions)

func (*Publication) IsOriginal

func (pub *Publication) IsOriginal() bool

IsOriginal return true if this instance is the first added otherwise false.

func (*Publication) Offer

func (pub *Publication) Offer(buffer *atomic.Buffer, offset int32, length int32, reservedValueSupplier term.ReservedValueSupplier) int64

Offer is the primary send mechanism on Publication

func (*Publication) Offer2

func (pub *Publication) Offer2(
	bufferOne *atomic.Buffer, offsetOne int32, lengthOne int32,
	bufferTwo *atomic.Buffer, offsetTwo int32, lengthTwo int32,
	reservedValueSupplier term.ReservedValueSupplier,
) int64

Offer2 attempts to publish a message composed of two parts, e.g. a header and encapsulated payload.

func (*Publication) OriginalRegistrationID

func (pub *Publication) OriginalRegistrationID() int64

OriginalRegistrationID returns the original registration id.

func (*Publication) Position

func (pub *Publication) Position() int64

Position returns the current position to which the publication has advanced for this stream or PublicationClosed if closed.

func (*Publication) RegistrationID

func (pub *Publication) RegistrationID() int64

RegistrationID returns the registration id.

func (*Publication) SessionID

func (pub *Publication) SessionID() int32

SessionID returns the session id for this publication.

func (*Publication) StreamID

func (pub *Publication) StreamID() int32

StreamID returns Stream identity for scoping within the channel media address.

func (*Publication) TryClaim

func (pub *Publication) TryClaim(length int32, bufferClaim *logbuffer.Claim) int64

type ReceivingConductor

type ReceivingConductor interface {
	CounterReader() *ctr.Reader

	AddRcvDestination(registrationID int64, endpointChannel string) error
	RemoveRcvDestination(registrationID int64, endpointChannel string) error
	// contains filtered or unexported methods
}

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription is the object responsible for receiving messages from media driver. It is specific to a channel and stream ID combination.

func NewSubscription

func NewSubscription(
	conductor ReceivingConductor,
	channel string,
	registrationID int64,
	streamID int32,
	channelStatusID int32,
	availableImagehandler AvailableImageHandler,
	unavailableImageHandler UnavailableImageHandler) *Subscription

NewSubscription is a factory method to create new subscription to be added to the media driver

func (*Subscription) AddDestination

func (sub *Subscription) AddDestination(endpointChannel string) error

AddDestination adds a destination manually to a multi-destination Subscription.

func (*Subscription) AvailableImageHandler

func (sub *Subscription) AvailableImageHandler() AvailableImageHandler

AvailableImageHandler returns a callback used to indicate when an Image becomes available under this Subscription. The handler may be nil.

func (*Subscription) Channel

func (sub *Subscription) Channel() string

Channel returns the media address for delivery to the channel.

func (*Subscription) ChannelStatus

func (sub *Subscription) ChannelStatus() int

ChannelStatus returns the status of the media channel for this Subscription. The status will be ChannelStatusErrored if a socket exception on setup or ChannelStatusActive if all is well.

func (*Subscription) ChannelStatusId

func (sub *Subscription) ChannelStatusId() int32

ChannelStatusId returns the counter ID used to represent the channel status of this Subscription.

func (*Subscription) Close

func (sub *Subscription) Close() error

Close will release all images in this subscription, send command to the driver and block waiting for response from the media driver. Images will be lingered by the ClientConductor.

func (*Subscription) ControlledPoll

func (sub *Subscription) ControlledPoll(handler term.ControlledFragmentHandler, fragmentLimit int) int

ControlledPoll polls in a controlled manner the image s under the subscription for available message fragments. Control is applied to fragments in the stream. If more fragments can be read on another stream they will even if BREAK or ABORT is returned from the fragment handler.

Each fragment read will be a whole message if it is under MTU length. If larger than MTU then it will come as a series of fragments ordered within a session. Returns the number of fragments received.

func (*Subscription) HasImages

func (sub *Subscription) HasImages() bool

HasImages is a helper method checking whether this subscription has any images associated with it.

func (*Subscription) ImageBySessionID

func (sub *Subscription) ImageBySessionID(sessionID int32) Image

ImageBySessionID returns the associated with the given sessionId.

func (*Subscription) ImageCount

func (sub *Subscription) ImageCount() int

ImageCount count of images associated with this subscription.

func (*Subscription) IsClosed

func (sub *Subscription) IsClosed() bool

IsClosed returns whether this subscription has been closed.

func (*Subscription) IsConnected

func (sub *Subscription) IsConnected() bool

IsConnected returns if this subscription is connected by having at least one open publication image.

func (*Subscription) LocalSocketAddresses

func (sub *Subscription) LocalSocketAddresses() []string

LocalSocketAddresses fetches the local socket addresses for this subscription.

func (*Subscription) Poll

func (sub *Subscription) Poll(handler term.FragmentHandler, fragmentLimit int) int

Poll is the primary receive mechanism on subscription.

func (*Subscription) RegistrationID

func (sub *Subscription) RegistrationID() int64

RegistrationID returns the registration id.

func (*Subscription) RemoveDestination

func (sub *Subscription) RemoveDestination(endpointChannel string) error

RemoveDestination removes a destination manually from a multi-destination Subscription.

func (*Subscription) ResolvedEndpoint

func (sub *Subscription) ResolvedEndpoint() string

ResolvedEndpoint finds the resolved endpoint for the channel. This may be nil if MDS is used and no destination is yet added. The result is simply the first in the list of addresses found if multiple addresses exist

func (*Subscription) StreamID

func (sub *Subscription) StreamID() int32

StreamID returns Stream identity for scoping within the channel media address.

func (*Subscription) TryResolveChannelEndpointPort

func (sub *Subscription) TryResolveChannelEndpointPort() string

TryResolveChannelEndpointPort resolves the channel endpoint and replaces it with the port from the ephemeral range when 0 was provided. If there are no addresses, or if there is more than one, returned from LocalSocketAddresses() then the original channel is returned. If the channel is not ACTIVE, then empty string will be returned.

func (*Subscription) UnavailableImageHandler

func (sub *Subscription) UnavailableImageHandler() UnavailableImageHandler

UnavailableImageHandler returns a callback used to indicate when an Image goes unavailable under this Subscription. The handler may be nil.

type UnavailableCounterHandler

type UnavailableCounterHandler interface {
	Handle(countersReader *counters.Reader, registrationId int64, counterId int32)
}

UnavailableCounterHandler is for notification of Counters being removed via an Aeron client. Note that this is an interface instead of a function in order to support RemoveAvailableCounterHandler.

type UnavailableImageHandler

type UnavailableImageHandler func(Image)

UnavailableImageHandler is the handler type for image unavailable notification from the media driver

Directories

Path Synopsis
Provides a transition layer from "github.com/op/go-logging" to "go.uber.org/zap" to simply resolve some reentrancy issues in go-logging.
Provides a transition layer from "github.com/op/go-logging" to "go.uber.org/zap" to simply resolve some reentrancy issues in go-logging.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL