Documentation
¶
Index ¶
- Constants
- Variables
- func ErrWake(err error) error
- func ParseBool(arg []byte) (bool, error)
- func Register(cmd Command)
- func ReplyEquals(reply CommandReply, reply2 CommandReply) bool
- func ReplyType(reply CommandReply) string
- type ApplyCommand
- type Array
- func (arr Array) Handle(ctx *Context) CommandReply
- func (arr Array) Help() string
- func (arr Array) IsError() bool
- func (arr Array) IsWorker() bool
- func (arr Array) Marshal(b []byte) []byte
- func (arr Array) MarshalReply(b []byte) []byte
- func (arr Array) Name() string
- func (arr Array) Parse(args [][]byte) Command
- func (arr Array) UnmarshalReply(packet []byte, args [][]byte) error
- type Bulk
- func (by Bulk) Handle(ctx *Context) CommandReply
- func (by Bulk) Help() string
- func (by Bulk) IsError() bool
- func (by Bulk) IsWorker() bool
- func (by Bulk) Marshal(b []byte) []byte
- func (by Bulk) MarshalReply(b []byte) []byte
- func (by Bulk) Name() string
- func (by Bulk) Parse(args [][]byte) Command
- func (by Bulk) UnmarshalReply(packet []byte, args [][]byte) error
- type BulkString
- func (s BulkString) Handle(ctx *Context) CommandReply
- func (s BulkString) Help() string
- func (s BulkString) IsError() bool
- func (s BulkString) IsWorker() bool
- func (s BulkString) Marshal(b []byte) []byte
- func (s BulkString) MarshalReply(b []byte) []byte
- func (s BulkString) Name() string
- func (s BulkString) Parse(args [][]byte) Command
- func (s BulkString) UnmarshalReply(packet []byte, args [][]byte) error
- type Command
- type CommandConn
- type CommandReply
- type CommandStats
- type ConnKind
- type Context
- type ContextInput
- type ContextPurpose
- type ContextReader
- type ContextStats
- type Database
- type Databases
- type Durability
- type Err
- func (e Err) Error() string
- func (e Err) Handle(ctx *Context) CommandReply
- func (e Err) Help() string
- func (e Err) IsError() bool
- func (e Err) IsWorker() bool
- func (e Err) Marshal(b []byte) []byte
- func (e Err) MarshalReply(b []byte) []byte
- func (e Err) Name() string
- func (e Err) Parse(args [][]byte) Command
- func (e Err) UnmarshalReply(packet []byte, args [][]byte) error
- type EvCloser
- type EvConn
- type EvData
- type EvDetacher
- type Float
- func (c Float) Handle(ctx *Context) CommandReply
- func (c Float) Help() string
- func (c Float) IsError() bool
- func (c Float) IsWorker() bool
- func (c Float) Marshal(b []byte) []byte
- func (c Float) MarshalReply(b []byte) []byte
- func (c Float) Name() string
- func (c Float) Parse(args [][]byte) Command
- func (c Float) UnmarshalReply(packet []byte, args [][]byte) error
- type ICluster
- type IDrives
- type Int
- func (c Int) Handle(ctx *Context) CommandReply
- func (c Int) Help() string
- func (c Int) IsError() bool
- func (c Int) IsWorker() bool
- func (c Int) Marshal(b []byte) []byte
- func (c Int) MarshalReply(b []byte) []byte
- func (c Int) Name() string
- func (c Int) Parse(args [][]byte) Command
- func (c Int) UnmarshalReply(packet []byte, args [][]byte) error
- type Multi
- type Nil
- func (n Nil) Handle(ctx *Context) CommandReply
- func (n Nil) Help() string
- func (n Nil) IsError() bool
- func (n Nil) IsWorker() bool
- func (n Nil) Marshal(b []byte) []byte
- func (n Nil) MarshalReply(b []byte) []byte
- func (n Nil) Name() string
- func (n Nil) Parse(args [][]byte) Command
- func (n Nil) UnmarshalReply(packet []byte, args [][]byte) error
- type Node
- type Ok
- func (e Ok) Handle(ctx *Context) CommandReply
- func (c Ok) Help() string
- func (c Ok) IsError() bool
- func (o Ok) IsMatch(command CommandReply) bool
- func (c Ok) IsWorker() bool
- func (e Ok) Marshal(b []byte) []byte
- func (e Ok) MarshalReply(b []byte) []byte
- func (c Ok) Name() string
- func (e Ok) Parse(args [][]byte) Command
- func (e Ok) UnmarshalReply(packet []byte, args [][]byte) error
- type Pong
- func (p Pong) Handle(ctx *Context) CommandReply
- func (p Pong) Help() string
- func (p Pong) IsError() bool
- func (p Pong) IsWorker() bool
- func (p Pong) Marshal(b []byte) []byte
- func (p Pong) MarshalReply(b []byte) []byte
- func (p Pong) Name() string
- func (p Pong) Parse(args [][]byte) Command
- func (p Pong) UnmarshalReply(packet []byte, args [][]byte) error
- type ProtocolError
- type Queued
- func (e Queued) Handle(ctx *Context) CommandReply
- func (c Queued) Help() string
- func (c Queued) IsError() bool
- func (c Queued) IsWorker() bool
- func (e Queued) Marshal(b []byte) []byte
- func (e Queued) MarshalReply(b []byte) []byte
- func (c Queued) Name() string
- func (e Queued) Parse(args [][]byte) Command
- func (e Queued) UnmarshalReply(packet []byte, args [][]byte) error
- type RaftFSM
- type RaftID
- type RaftService
- type ReplyReader
- type SimpleString
- func (s SimpleString) Handle(ctx *Context) CommandReply
- func (s SimpleString) Help() string
- func (s SimpleString) IsError() bool
- func (s SimpleString) IsWorker() bool
- func (s SimpleString) Marshal(b []byte) []byte
- func (s SimpleString) MarshalReply(b []byte) []byte
- func (s SimpleString) Name() string
- func (s SimpleString) Parse(args [][]byte) Command
- func (s SimpleString) UnmarshalReply(packet []byte, args [][]byte) error
- type Slice
- type Store
- type Topic
- type Transaction
- type WithConnection
- type Worker
- type WorkerJob
- type WorkerPool
Constants ¶
const ( GetName = "GET" SetName = "SET" DelName = "DEL" TxModeName = "TX" CreateDatabase = "CREATEDB" DeleteDatabase = "DELETEDB" RaftInstallSnapshotName = "I" RaftAppendName = "A" RaftVoteName = "V" RaftChunkName = "C" RaftDoneName = "D" RaftSnapshotName = "RSNAPNAME" RaftSnapshotsName = "RSNAPS" RaftSlice = "RSLICE" RaftBootstrap = "BOOTSTRAP" RaftJoinName = "JOIN" RaftDemote = "DEMOTE" RaftJoinSlaveName = "RJOINSLAVE" RaftRemoveName = "REMOVE" RaftStatsName = "RAFTSTATS" RaftStateName = "RAFTSTATE" RaftConfigName = "RCONFIG" RaftLeaderName = "LEADER" RaftShrinkName = "RSHRINK" )
const ( Idle int32 = iota Active = 1 Closing = 2 Closed = 3 )
Variables ¶
var ( RaftInstall = []byte(RaftInstallSnapshotName) RaftAppend = []byte(RaftAppendName) RaftVote = []byte(RaftVoteName) RaftChunk = []byte(RaftChunkName) RaftDone = []byte(RaftDoneName) RaftJoin = []byte(RaftJoinName) RaftStats = []byte(RaftStatsName) RaftState = []byte(RaftStateName) RaftLeader = []byte(RaftLeaderName) RaftShrink = []byte(RaftShrinkName) )
var ( OK = Ok{} PONG = Pong{} PING = BulkString("PING") QUEUED = Queued{} NIL = Nil{} )
var ApplyCommands map[string]ApplyCommand
var Commands map[string]Command
Global registry of commands
var ErrInvalidParam = errors.New("invalid param")
var GlobalRaftID = RaftID{
DatabaseID: -1,
SliceID: -1,
}
var (
Workers = NewWorkerPool(context.Background(), 0, 0)
)
Functions ¶
func ReplyEquals ¶
func ReplyEquals(reply CommandReply, reply2 CommandReply) bool
func ReplyType ¶
func ReplyType(reply CommandReply) string
Types ¶
type ApplyCommand ¶
type ApplyCommand interface {
Marshal(b []byte) []byte
Unmarshal(b []byte) error
Handle() CommandReply
}
type Array ¶
type Array []CommandReply
func (Array) Handle ¶
func (arr Array) Handle(ctx *Context) CommandReply
func (Array) MarshalReply ¶
type Bulk ¶
type Bulk []byte
func (Bulk) Handle ¶
func (by Bulk) Handle(ctx *Context) CommandReply
func (Bulk) MarshalReply ¶
type BulkString ¶
type BulkString string
func (BulkString) Handle ¶
func (s BulkString) Handle(ctx *Context) CommandReply
func (BulkString) Help ¶
func (s BulkString) Help() string
func (BulkString) IsError ¶
func (s BulkString) IsError() bool
func (BulkString) IsWorker ¶
func (s BulkString) IsWorker() bool
func (BulkString) Marshal ¶
func (s BulkString) Marshal(b []byte) []byte
func (BulkString) MarshalReply ¶
func (s BulkString) MarshalReply(b []byte) []byte
func (BulkString) Name ¶
func (s BulkString) Name() string
func (BulkString) Parse ¶
func (s BulkString) Parse(args [][]byte) Command
func (BulkString) UnmarshalReply ¶
func (s BulkString) UnmarshalReply(packet []byte, args [][]byte) error
type Command ¶
type Command interface {
Name() string
Help() string
IsError() bool
// Flag to determine whether the command can be handled inline or
// if it requires a worker.
IsWorker() bool
//
Marshal(b []byte) []byte
// Parses from a Redcon connection
Parse(args [][]byte) Command
// Invoke happens on the EvLoop
Handle(ctx *Context) CommandReply
}
func ParseCommand ¶
type CommandConn ¶
type CommandConn interface {
EvConn
EvCloser
EvData
EvDetacher
GetKind() ConnKind
SetKind(kind ConnKind)
//
GetDurability() Durability
//
GetRaft() RaftService
//
SetRaft(raft RaftService)
}
type CommandReply ¶
type CommandStats ¶
type CommandStats struct {
Name string
}
type Context ¶
type Context struct {
Reason error
Action evio.Action
Kind ConnKind
// Durability setting
Durability Durability
Multi Multi
// Assigned raft context
// This is used for the RaftTransport to support multiple Raft clusters
// over the same port
Raft RaftService // Raft service for Raft connections
Parse func(packet []byte, args [][]byte) Command
}
Represents a series of pipelined Commands in the context of a single call for a single connection or for Raft log applying.
func (*Context) GetDurability ¶
func (c *Context) GetDurability() Durability
func (*Context) GetRaft ¶
func (c *Context) GetRaft() RaftService
func (*Context) SetRaft ¶
func (c *Context) SetRaft(raft RaftService)
type ContextInput ¶
type ContextInput struct {
Context
// Buffers
In []byte // in/ingress or "read" buffer
Out []byte // out/egress or "write" buffer
// Backlog
Multi bool
MultiList []Command
Backlog []Command // commands that must wait until current worker finishes
// contains filtered or unexported fields
}
func (*ContextInput) Input ¶
func (c *ContextInput) Input(in []byte) (o []byte)
Parses a raw stream into a series of commands
func (*ContextInput) InputReply ¶
func (c *ContextInput) InputReply(in []byte) (o []CommandReply)
type ContextPurpose ¶
type ContextPurpose byte
const ( Parse ContextPurpose = 0 Apply ContextPurpose = 1 Log ContextPurpose = 2 )
type ContextReader ¶
type ContextReader struct {
}
type ContextStats ¶
type ContextStats struct {
}
type Database ¶
type Database interface {
Name()
Slices()
CreateTopic()
DeleteTopic()
CreateQueue()
DeleteQueue()
CreateTable()
DeleteTable()
}
type Durability ¶
type Durability int32
const ( Low Durability = -1 Medium Durability = 0 High Durability = 1 )
type Err ¶
type Err string
func (Err) Handle ¶
func (e Err) Handle(ctx *Context) CommandReply
func (Err) MarshalReply ¶
type EvDetacher ¶
type EvDetacher interface {
Detach() error
OnDetach(rwc io.ReadWriteCloser)
}
type Float ¶
type Float float64
func (Float) Handle ¶
func (c Float) Handle(ctx *Context) CommandReply
func (Float) MarshalReply ¶
type Int ¶
type Int int64
func (Int) Handle ¶
func (c Int) Handle(ctx *Context) CommandReply
func (Int) MarshalReply ¶
type Nil ¶
type Nil struct{}
func (Nil) Handle ¶
func (n Nil) Handle(ctx *Context) CommandReply
func (Nil) MarshalReply ¶
type Ok ¶
type Ok struct{}
func (Ok) Handle ¶
func (e Ok) Handle(ctx *Context) CommandReply
func (Ok) IsMatch ¶
func (o Ok) IsMatch(command CommandReply) bool
func (Ok) MarshalReply ¶
type Pong ¶
type Pong struct{}
func (Pong) Handle ¶
func (p Pong) Handle(ctx *Context) CommandReply
func (Pong) MarshalReply ¶
type ProtocolError ¶
type ProtocolError string
func (ProtocolError) Error ¶
func (pe ProtocolError) Error() string
type Queued ¶
type Queued struct{}
func (Queued) Handle ¶
func (e Queued) Handle(ctx *Context) CommandReply
func (Queued) MarshalReply ¶
type RaftService ¶
type RaftService interface {
IsLeader() bool
Leader() raft.ServerAddress
Stats() map[string]string
State() raft.RaftState
Append(payload []byte) CommandReply
Vote(payload []byte) CommandReply
Install(ctx *Context, arg []byte) Command
Bootstrap() error
Join(nodeID string, voter bool) error
Demote(nodeID string) error
Leave(nodeID string) error
Configuration() (raft.ConfigurationFuture, error)
}
func GetRaftService ¶
func GetRaftService(id RaftID) RaftService
type ReplyReader ¶
type ReplyReader struct {
// contains filtered or unexported fields
}
func NewReplyReader ¶
func NewReplyReader(b []byte) *ReplyReader
func (*ReplyReader) Next ¶
func (rr *ReplyReader) Next() (CommandReply, error)
func (*ReplyReader) Reset ¶
func (rr *ReplyReader) Reset(b []byte)
type SimpleString ¶
type SimpleString string
func (SimpleString) Handle ¶
func (s SimpleString) Handle(ctx *Context) CommandReply
func (SimpleString) Help ¶
func (s SimpleString) Help() string
func (SimpleString) IsError ¶
func (s SimpleString) IsError() bool
func (SimpleString) IsWorker ¶
func (s SimpleString) IsWorker() bool
func (SimpleString) Marshal ¶
func (s SimpleString) Marshal(b []byte) []byte
func (SimpleString) MarshalReply ¶
func (s SimpleString) MarshalReply(b []byte) []byte
func (SimpleString) Name ¶
func (s SimpleString) Name() string
func (SimpleString) Parse ¶
func (s SimpleString) Parse(args [][]byte) Command
func (SimpleString) UnmarshalReply ¶
func (s SimpleString) UnmarshalReply(packet []byte, args [][]byte) error
type Slice ¶
type Slice interface {
// Each slice has its own Raft cluster
Raft() RaftService
Schema() Database
Handle(ctx *Context, command Command)
TopicAppend()
QueueAppend()
Set()
Get()
Iterate()
Incr()
Decr()
}
A slice is an instance of a Database that does not share state with any other slice within the schema or otherwise. It can be thought of as an independent database. State is consistently maintained through a Raft log, making each slice a CA (of CAP) system.
Object definitions are inherited from its Database. A Slice represents some range(s) of the total slots (16384) of the Database. Each record in a slice is assigned a Slot number, usually based on the hash of its key. A Slice's log is serialized, but its reads may happen in parallel.
To scale a Database add another Slice and let the Database re-balance.
type Store ¶
type Store interface {
Raft() RaftService
//
CreateDatabase()
DeleteDatabase()
Database() []Database
GetSchema(id int32) Database
}
Cluster wide registry of schemas and Raft services.
type Transaction ¶
type Transaction struct {
Requests []ApplyCommand
// contains filtered or unexported fields
}
Transaction is a single log entry in the Raft log.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Maintains a cached goroutine that can process one job at a time and is returned to a sync.Pool.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
func NewWorkerPool ¶
func NewWorkerPool(ctx context.Context, min, max int) *WorkerPool
func (*WorkerPool) Dispatch ¶
func (w *WorkerPool) Dispatch(job WorkerJob) bool
func (*WorkerPool) Get ¶
func (w *WorkerPool) Get() *Worker
func (*WorkerPool) Name ¶
func (w *WorkerPool) Name() string
func (*WorkerPool) PrintStats ¶
func (w *WorkerPool) PrintStats()
func (*WorkerPool) Put ¶
func (w *WorkerPool) Put(worker *Worker)
func (*WorkerPool) Register ¶
func (w *WorkerPool) Register(registry metrics.Registry)
func (*WorkerPool) Stop ¶
func (w *WorkerPool) Stop()