Documentation ¶
Overview ¶
Package rmux provides a connection-pooling, multiplexing redis server. Commands are parsed and multiplexed out based on their arguments. Package rmux/main includes a working server implementation, for cases where no customization is needed.
Index ¶
- Variables
- type Client
- type RedisMultiplexer
- func (myRedisMultiplexer *RedisMultiplexer) AddConnection(remoteProtocol, remoteEndpoint string)
- func (myRedisMultiplexer *RedisMultiplexer) GetSubscription(channelName string) (mySubscription *Subscription)
- func (myRedisMultiplexer *RedisMultiplexer) HandleClientRequests(myClient *Client)
- func (myRedisMultiplexer *RedisMultiplexer) Start() (err error)
- type Subscription
- func (mySubscription *Subscription) AddClient(newClient *Client)
- func (mySubscription *Subscription) BroadcastMessage(broadcastMessage []byte)
- func (mySubscription *Subscription) BroadcastMessages() (err error)
- func (mySubscription *Subscription) RemoveClient(oldClient *Client)
- func (mySubscription *Subscription) UpdateConnection(myConnectionKey string, myConnectionPool *connection.ConnectionPool)
Constants ¶
This section is empty.
Variables ¶
var (
    //Error for unsupported (deemed unsafe for multiplexing) commands
    ERR_COMMAND_UNSUPPORTED = []byte("-ERR This command is not supported")
    //Error for when we receive bad arguments (for multiplexing) accompanying a command
    ERR_BAD_ARGUMENTS = []byte("-ERR Wrong number of arguments supplied for this command")
)
var (
    //Response code for when a command (that operates on multiple keys) is used on a server that is multiplexing
    MULTIPLEX_OPERATION_UNSUPPORTED_RESPONSE = []byte("-ERR This command is not supported for multiplexing servers")
    //Response code for when a client can't connect to any target servers
    CONNECTION_DOWN_RESPONSE = []byte("-ERR Connection down")
)
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
    //The underlying ReadWriter for this connection
    *bufio.ReadWriter
    //The connection wrapper for our net connection
    ConnectionReadWriter *protocol.TimedNetReadWriter
    //The Database that our client thinks we're connected to
    DatabaseId int
    //Whether or not this client connection is active or not
    //Upon QUIT command, this gets toggled off
    Active bool
    //The current active connection
    ActiveConnection *connection.Connection
    //The current active subscription
    Subscriptions map[string]bool
    // contains filtered or unexported fields
}
Represents a redis client that is connected to our rmux server
func NewClient ¶
Initializes a new client, for the given established net connection, with the specified read/write timeouts
func (*Client) HandleRequest ¶
func (myClient *Client) HandleRequest(connectionPool *connection.ConnectionPool, firstLine []byte) (err error)
Handles sending a single client request out across connectionPool, and copying the response back into our local buffer
func (*Client) ParseCommand ¶
func (myClient *Client) ParseCommand(firstLine []byte, isMultiplexing bool) (responded bool, err error)
Parses the current command, starting with firstLine. isMultiplexing is supplied to let the client know if single-server-only commands should be supported or not
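As a rough illustration of what "starting with firstLine" can mean, the sketch below reads the argument count from the first line of a redis multi-bulk request (for example `*3\r\n`). It is a minimal sketch, not rmux's parser: `parseArgumentCount` is a hypothetical helper, and inline (non multi-bulk) commands are simply rejected here.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// parseArgumentCount reads the argument count from the first line of a
// multi-bulk request such as "*3\r\n".
// Hypothetical helper for illustration; not part of the rmux API.
func parseArgumentCount(firstLine []byte) (int, error) {
	line := strings.TrimRight(string(firstLine), "\r\n")
	if len(line) < 2 || line[0] != '*' {
		return 0, errors.New("not a multi-bulk header")
	}
	count, err := strconv.Atoi(line[1:])
	if err != nil || count < 1 {
		return 0, errors.New("invalid argument count")
	}
	return count, nil
}

func main() {
	count, err := parseArgumentCount([]byte("*3\r\n"))
	fmt.Println(count, err) // prints: 3 <nil>
}
```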
func (*Client) SendSubscriptionResponse ¶
Sends a subscription success response, to the current client
type RedisMultiplexer ¶
type RedisMultiplexer struct {
    HashRing *connection.HashRing
    //hashmap of [connection endpoint] -> connectionPools
    ConnectionCluster []*connection.ConnectionPool
    //hashmap of [subscription names] -> subscriptions
    SubscriptionCluster map[string]*Subscription
    //The net.listener for our server
    Listener net.Listener
    //The amount of connections to store, in each of our connectionpools
    PoolSize int
    //The primary connection key to use. If we're not operating on a key-based operation, it will go here
    PrimaryConnectionPool *connection.ConnectionPool
    //An overridable connect timeout. Defaults to EXTERN_CONNECT_TIMEOUT
    EndpointConnectTimeout time.Duration
    //An overridable read timeout. Defaults to EXTERN_READ_TIMEOUT
    EndpointReadTimeout time.Duration
    //An overridable write timeout. Defaults to EXTERN_WRITE_TIMEOUT
    EndpointWriteTimeout time.Duration
    //An overridable read timeout. Defaults to EXTERN_READ_TIMEOUT
    ClientReadTimeout time.Duration
    //An overridable write timeout. Defaults to EXTERN_WRITE_TIMEOUT
    ClientWriteTimeout time.Duration
    // contains filtered or unexported fields
}
The main RedisMultiplexer. Listens on a specified socket or port, and assigns queries out to any number of connection pools. If more than one connection pool is given, multi-key operations are blocked.
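The blocking rule for multi-key operations could be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: `checkCommand` and the `multiKeyCommands` list are hypothetical, and the list is deliberately incomplete. Only the response text is taken from the Variables section.

```go
package main

import (
	"fmt"
	"strings"
)

// Same text as MULTIPLEX_OPERATION_UNSUPPORTED_RESPONSE in the Variables section.
var multiplexUnsupported = []byte("-ERR This command is not supported for multiplexing servers")

// multiKeyCommands is a hypothetical, incomplete list chosen for illustration.
var multiKeyCommands = map[string]bool{"MGET": true, "MSET": true, "SUNION": true}

// checkCommand returns an error response when a multi-key command arrives
// while more than one pool is configured, and nil when the command may proceed.
func checkCommand(command string, poolCount int) []byte {
	if poolCount > 1 && multiKeyCommands[strings.ToUpper(command)] {
		return multiplexUnsupported
	}
	return nil
}

func main() {
	fmt.Printf("%q\n", checkCommand("mget", 2)) // blocked: two pools
	fmt.Printf("%q\n", checkCommand("mget", 1)) // allowed: single pool
	fmt.Printf("%q\n", checkCommand("get", 2))  // allowed: single-key command
}
```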
func NewRedisMultiplexer ¶
func NewRedisMultiplexer(listenProtocol, listenEndpoint string, poolSize int) (newRedisMultiplexer *RedisMultiplexer, err error)
Initializes a new redis multiplexer, listening on the given protocol/endpoint, with a set connectionPool size. Example arguments: "unix", "/tmp/myAwesomeSocket", 50
func (*RedisMultiplexer) AddConnection ¶
func (myRedisMultiplexer *RedisMultiplexer) AddConnection(remoteProtocol, remoteEndpoint string)
Adds a connection to the redis multiplexer, for the given protocol and endpoint
func (*RedisMultiplexer) GetSubscription ¶
func (myRedisMultiplexer *RedisMultiplexer) GetSubscription(channelName string) (mySubscription *Subscription)
Gets an active subscription for our client to connect to
func (*RedisMultiplexer) HandleClientRequests ¶
func (myRedisMultiplexer *RedisMultiplexer) HandleClientRequests(myClient *Client)
Handles requests for a client. Inspects all incoming commands, to find if they are key-driven or not. If they are, finds the appropriate connection pool, and passes the request off to it.
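One way to picture the routing decision is the sketch below. It assumes a plain FNV-1a hash taken modulo the pool count, and it treats index 0 as the primary pool for commands that carry no key. Both choices are assumptions made for illustration; the package's HashRing may select pools quite differently, and `routeToPool` is a hypothetical name.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// routeToPool returns the index of the pool that should serve a command.
// Commands without a key go to index 0, standing in for the primary pool.
// Hypothetical helper; hash-modulo is a stand-in for the real hash ring.
func routeToPool(key string, keyed bool, poolCount int) int {
	if !keyed || poolCount <= 1 {
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash writes never return an error
	return int(h.Sum32() % uint32(poolCount))
}

func main() {
	for _, key := range []string{"user:1", "user:2", "session:abc"} {
		fmt.Printf("%s -> pool %d\n", key, routeToPool(key, true, 3))
	}
	fmt.Println("PING -> pool", routeToPool("", false, 3))
}
```

The same key always maps to the same pool as long as the pool count is unchanged, which is the property key-driven multiplexing relies on.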
func (*RedisMultiplexer) Start ¶
func (myRedisMultiplexer *RedisMultiplexer) Start() (err error)
Called when an rmux server is ready to begin accepting connections
type Subscription ¶
type Subscription struct {
    //The current active connection that we are listening on
    ActiveConnection *connection.Connection
    //The connectionKey that our activeConnection's connectionPool is using.
    ActiveConnectionKey string
    //When a subscription runs out of connected clients, it is requested to stop
    //This tells the subscription to shut down and stop listening for new messages
    RequestStop bool
    // contains filtered or unexported fields
}
An individual subscription, that our server is managing
func NewSubscription ¶
func NewSubscription(capacity int, channelName string) (newSubscription *Subscription)
Initializes a new subscription, for the given default capacity, and channel name
func (*Subscription) AddClient ¶
func (mySubscription *Subscription) AddClient(newClient *Client)
Adds a client to our clients slice. If there is no room in the slice, its capacity is grown to double its current size plus one.
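The growth rule can be sketched with a plain slice. This is a minimal illustration, assuming that "size" refers to the slice capacity; the `client` type and `addClient` function are hypothetical stand-ins rather than the package's own code.

```go
package main

import "fmt"

// client is a hypothetical stand-in for the package's Client type.
type client struct{ name string }

// addClient appends c. When the slice is full, the contents are first copied
// into a new backing array whose capacity is 2*cap+1.
func addClient(clients []*client, c *client) []*client {
	if len(clients) == cap(clients) {
		grown := make([]*client, len(clients), 2*cap(clients)+1)
		copy(grown, clients)
		clients = grown
	}
	return append(clients, c)
}

func main() {
	clients := make([]*client, 0, 1)
	for i := 0; i < 4; i++ {
		clients = addClient(clients, &client{name: fmt.Sprintf("c%d", i)})
		fmt.Println(len(clients), cap(clients))
	}
	// prints:
	// 1 1
	// 2 3
	// 3 3
	// 4 7
}
```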
func (*Subscription) BroadcastMessage ¶
func (mySubscription *Subscription) BroadcastMessage(broadcastMessage []byte)
Broadcasts an individual message to all connected clients
func (*Subscription) BroadcastMessages ¶
func (mySubscription *Subscription) BroadcastMessages() (err error)
Listens for messages from the ActiveConnection that should be broadcast out. When a message is received, it is passed off to BroadcastMessage, where it gets sent out.
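The listen-then-fan-out relationship between the two methods might look like the sketch below. It is an illustration only: a channel stands in for the ActiveConnection, the `sink` interface and `recorder` type stand in for connected clients, and, unlike the documented BroadcastMessage, the helper here returns an error so that write failures are visible.

```go
package main

import "fmt"

// sink is a hypothetical stand-in for anything a message can be written to.
type sink interface {
	Write(p []byte) (int, error)
}

// recorder is a stand-in client that remembers everything written to it.
type recorder struct{ got []string }

func (r *recorder) Write(p []byte) (int, error) {
	r.got = append(r.got, string(p))
	return len(p), nil
}

// broadcastMessage writes one message to every client, stopping at the first error.
func broadcastMessage(clients []sink, message []byte) error {
	for _, c := range clients {
		if _, err := c.Write(message); err != nil {
			return err
		}
	}
	return nil
}

// broadcastMessages drains source until it is closed, fanning each message out.
func broadcastMessages(source <-chan []byte, clients []sink) error {
	for message := range source {
		if err := broadcastMessage(clients, message); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	a, b := &recorder{}, &recorder{}
	source := make(chan []byte, 2)
	source <- []byte("hello")
	source <- []byte("world")
	close(source)
	if err := broadcastMessages(source, []sink{a, b}); err != nil {
		fmt.Println("broadcast failed:", err)
	}
	fmt.Println(a.got, b.got)
}
```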
func (*Subscription) RemoveClient ¶
func (mySubscription *Subscription) RemoveClient(oldClient *Client)
Removes a client from our clients slice, and compacts the remaining clients.
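Removal with compaction is commonly written as an in-place filter. The sketch below shows that idiom under the assumption that clients are compared by pointer identity; `client` and `removeClient` are hypothetical names, and the package may compact its slice differently.

```go
package main

import "fmt"

// client is a hypothetical stand-in for the package's Client type.
type client struct{ name string }

// removeClient drops old from clients and shifts the remaining entries
// toward the front, reusing the same backing array.
func removeClient(clients []*client, old *client) []*client {
	kept := clients[:0]
	for _, c := range clients {
		if c != old {
			kept = append(kept, c)
		}
	}
	// Clear the unused tail so the removed pointer can be garbage collected.
	for i := len(kept); i < len(clients); i++ {
		clients[i] = nil
	}
	return kept
}

func main() {
	a, b, c := &client{"a"}, &client{"b"}, &client{"c"}
	clients := removeClient([]*client{a, b, c}, b)
	for _, remaining := range clients {
		fmt.Println(remaining.name)
	}
}
```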
func (*Subscription) UpdateConnection ¶
func (mySubscription *Subscription) UpdateConnection(myConnectionKey string, myConnectionPool *connection.ConnectionPool)
Updates the subscription's connection to use a new connectionKey and connectionPool. If the connectionKey has not changed, nothing should happen.
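The "nothing should happen" guard can be sketched as an early return on an unchanged key. This is a minimal sketch: the `subscription` type, its fields, and the returned bool are hypothetical, and the step of actually obtaining a connection from the pool is reduced to a counter.

```go
package main

import "fmt"

// subscription is a hypothetical, trimmed-down stand-in for Subscription.
type subscription struct {
	activeConnectionKey string
	reconnects          int // counts how often a new connection was taken
}

// updateConnection switches to the pool identified by key and reports whether
// anything changed. An unchanged key is a no-op.
func (s *subscription) updateConnection(key string) bool {
	if key == s.activeConnectionKey {
		return false
	}
	s.activeConnectionKey = key
	s.reconnects++ // placeholder for fetching a connection from the new pool
	return true
}

func main() {
	s := &subscription{}
	fmt.Println(s.updateConnection("tcp://10.0.0.1:6379")) // true
	fmt.Println(s.updateConnection("tcp://10.0.0.1:6379")) // false
	fmt.Println(s.reconnects)                              // 1
}
```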
Directories ¶
Path | Synopsis |
---|---|
connection | Package rmux/connection provides a way to open outbound connections to redis servers. |
protocol | Package rmux/protocol provides a standard way to listen in on the redis protocol, look ahead to what commands are about to be executed, and ignore them or pass them on to another buffer, as desired. |