server

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2021 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ConnStart = iota
	ConnStartOK
	ConnSecure
	ConnSecureOK
	ConnTune
	ConnTuneOK
	ConnOpen
	ConnOpenOK
	ConnCloseOK
	ConnClosed
)

connection status list

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel is an implementation of the AMQP-channel entity Within a single socket connection, there can be multiple independent threads of control, called "channels"

func NewChannel

func NewChannel(id uint16, conn *Connection) *Channel

NewChannel returns new instance of Channel

func (*Channel) AddUnackedMessage

func (channel *Channel) AddUnackedMessage(dTag uint64, cTag string, queue string, message *amqp.Message)

AddUnackedMessage add message to unacked queue

func (*Channel) GetConsumersCount

func (channel *Channel) GetConsumersCount() int

GetConsumersCount returns consumers count on channel

func (*Channel) GetQos

func (channel *Channel) GetQos() *qos.AmqpQos

func (*Channel) NextDeliveryTag

func (channel *Channel) NextDeliveryTag() uint64

NextDeliveryTag returns next delivery tag for current channel

func (*Channel) SendContent

func (channel *Channel) SendContent(method amqp.Method, message *amqp.Message)

SendContent send message to consumers or returns to publishers

func (*Channel) SendMethod

func (channel *Channel) SendMethod(method amqp.Method)

SendMethod send method to client Method will be packed into frame and send to outgoing channel

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection represents AMQP-connection

func NewConnection

func NewConnection(server *Server, netConn *net.TCPConn) (connection *Connection)

NewConnection returns new instance of amqp Connection

func (*Connection) GetChannels

func (conn *Connection) GetChannels() map[uint16]*Channel

func (*Connection) GetID

func (conn *Connection) GetID() uint64

func (*Connection) GetRemoteAddr

func (conn *Connection) GetRemoteAddr() net.Addr

func (*Connection) GetUsername

func (conn *Connection) GetUsername() string

func (*Connection) GetVirtualHost

func (conn *Connection) GetVirtualHost() *VirtualHost

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server implements AMQP server

func NewServer

func NewServer(host string, port string, protoVersion string, config *config.Config) (server *Server)

NewServer returns new instance of AMQP Server

func (*Server) GetConnections

func (srv *Server) GetConnections() map[uint64]*Connection

func (*Server) GetProtoVersion

func (srv *Server) GetProtoVersion() string

func (*Server) GetStatus

func (srv *Server) GetStatus() ServerState

func (*Server) GetVhost

func (srv *Server) GetVhost(name string) *VirtualHost

func (*Server) GetVhosts

func (srv *Server) GetVhosts() map[string]*VirtualHost

func (*Server) Start

func (srv *Server) Start()

Start start main server loop

func (*Server) Stop

func (srv *Server) Stop()

Stop stop server and all vhosts

type ServerState

type ServerState int
const (
	Stopped ServerState = iota
	Running
	Stopping
)

server state statuses

type UnackedMessage

type UnackedMessage struct {
	// contains filtered or unexported fields
}

UnackedMessage represents the unacknowledged message

type VirtualHost

type VirtualHost struct {
	// contains filtered or unexported fields
}

VirtualHost represents AMQP virtual host Each virtual host is "parent" for its queues and exchanges

func NewVhost

func NewVhost(name string, system bool, srv *Server) *VirtualHost

NewVhost returns instance of VirtualHost When instantiating virtual host we 1) init system exchanges 2) load durable exchanges, queues and bindings from server storage 3) load persisted messages from message store into all initiated queues 4) run confirm loop Only after that vhost is in state running msgStoragePersistent, msgStorageTransient

func (*VirtualHost) AppendExchange

func (vhost *VirtualHost) AppendExchange(ex *exchange.Exchange)

AppendExchange append new exchange and persist if it is durable

func (*VirtualHost) AppendQueue

func (vhost *VirtualHost) AppendQueue(qu *queue.Queue) error

AppendQueue append new queue and persist if it is durable and bindings into default exchange

func (*VirtualHost) DeleteQueue

func (vhost *VirtualHost) DeleteQueue(queueName string, ifUnused bool, ifEmpty bool) (uint64, error)

DeleteQueue delete queue from virtual host and all bindings to that queue Also queue will be removed from server storage

func (*VirtualHost) GetDefaultExchange

func (vhost *VirtualHost) GetDefaultExchange() *exchange.Exchange

GetDefaultExchange returns default exchange

func (*VirtualHost) GetExchange

func (vhost *VirtualHost) GetExchange(name string) *exchange.Exchange

GetExchange returns exchange by name or nil if not exists

func (*VirtualHost) GetExchanges

func (vhost *VirtualHost) GetExchanges() map[string]*exchange.Exchange

func (*VirtualHost) GetName

func (vhost *VirtualHost) GetName() string

func (*VirtualHost) GetQueue

func (vhost *VirtualHost) GetQueue(name string) *queue.Queue

GetQueue returns queue by name or nil if not exists

func (*VirtualHost) GetQueues

func (vhost *VirtualHost) GetQueues() map[string]*queue.Queue

GetQueues return all vhost's queues

func (*VirtualHost) NewQueue

func (vhost *VirtualHost) NewQueue(name string, connID uint64, exclusive bool, autoDelete bool, durable bool, shardSize int) *queue.Queue

NewQueue returns new instance of queue by params we can't use just queue.NewQueue, cause we need to set msgStorage to queue

func (*VirtualHost) PersistBinding

func (vhost *VirtualHost) PersistBinding(binding *binding.Binding)

PersistBinding store binding into server storage

func (*VirtualHost) RemoveBindings

func (vhost *VirtualHost) RemoveBindings(bindings []*binding.Binding)

RemoveBindings remove given bindings from server storage

func (*VirtualHost) Stop

func (vhost *VirtualHost) Stop() error

Stop properly stop virtual host TODO: properly stop confirm loop

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL