cluster

package
v0.6.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2014 License: MIT Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PER_SERVER_BUFFER_SIZE  = 10
	LOCAL_WRITE_BUFFER_SIZE = 10
)
View Source
const (
	FIRST_LOWER_CASE_CHARACTER = uint8('a')
)
View Source
const (
	HEARTBEAT_TIMEOUT = 100 * time.Millisecond
)

Variables

View Source
var HEARTBEAT_TYPE = protocol.Request_HEARTBEAT

Functions

func HashPassword

func HashPassword(password string) ([]byte, error)

func SortShardsByTimeAscending

func SortShardsByTimeAscending(shards []*ShardData)

func SortShardsByTimeDescending

func SortShardsByTimeDescending(shards []*ShardData)

Types

type ByShardTimeAsc

type ByShardTimeAsc struct{ ShardCollection }

func (ByShardTimeAsc) Less

func (s ByShardTimeAsc) Less(i, j int) bool

type ByShardTimeDesc

type ByShardTimeDesc struct{ ShardCollection }

func (ByShardTimeDesc) Less

func (s ByShardTimeDesc) Less(i, j int) bool

type ClusterAdmin

type ClusterAdmin struct {
	CommonUser `json:"common"`
}

func (*ClusterAdmin) HasReadAccess

func (self *ClusterAdmin) HasReadAccess(_ string) bool

func (*ClusterAdmin) HasWriteAccess

func (self *ClusterAdmin) HasWriteAccess(_ string) bool

func (*ClusterAdmin) IsClusterAdmin

func (self *ClusterAdmin) IsClusterAdmin() bool

type ClusterConfiguration

type ClusterConfiguration struct {
	DatabaseReplicationFactors map[string]uint8

	ParsedContinuousQueries map[string]map[uint32]*parser.SelectQuery

	LocalServer *ClusterServer

	LocalRaftName string
	// contains filtered or unexported fields
}

This struct stores all the metadata confiugration information about a running cluster. This includes the servers in the cluster and their state, databases, users, and which continuous queries are running.

func NewClusterConfiguration

func NewClusterConfiguration(
	config *configuration.Configuration,
	wal WAL,
	shardStore LocalShardStore,
	connectionCreator func(string) ServerConnection) *ClusterConfiguration

func (*ClusterConfiguration) AddPotentialServer

func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer)

func (*ClusterConfiguration) AddShards

func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardData, error)

Add shards expects all shards to be of the same type (long term or short term) and have the same start and end times. This is called to add the shard set for a given duration. If existing shards have the same times, those are returned.

func (*ClusterConfiguration) AuthenticateClusterAdmin

func (self *ClusterConfiguration) AuthenticateClusterAdmin(username, password string) (common.User, error)

func (*ClusterConfiguration) AuthenticateDbUser

func (self *ClusterConfiguration) AuthenticateDbUser(db, username, password string) (common.User, error)

func (*ClusterConfiguration) ChangeDbUserPassword

func (self *ClusterConfiguration) ChangeDbUserPassword(db, username, hash string) error

func (*ClusterConfiguration) ChangeDbUserPermissions added in v0.5.11

func (self *ClusterConfiguration) ChangeDbUserPermissions(db, username, readPermissions, writePermissions string) error

func (*ClusterConfiguration) ChangeProtobufConnectionString added in v0.5.12

func (self *ClusterConfiguration) ChangeProtobufConnectionString(server *ClusterServer)

func (*ClusterConfiguration) CreateCheckpoint added in v0.5.2

func (self *ClusterConfiguration) CreateCheckpoint() error

func (*ClusterConfiguration) CreateContinuousQuery

func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) error

func (*ClusterConfiguration) CreateDatabase

func (self *ClusterConfiguration) CreateDatabase(name string, replicationFactor uint8) error

func (*ClusterConfiguration) CreateFutureShardsAutomaticallyBeforeTimeComes

func (self *ClusterConfiguration) CreateFutureShardsAutomaticallyBeforeTimeComes()

called by the server, this will wake up every 10 mintues to see if it should create a shard for the next window of time. This way shards get created before a bunch of writes stream in and try to create it all at the same time.

func (*ClusterConfiguration) DatabaseExists added in v0.6.2

func (self *ClusterConfiguration) DatabaseExists(name string) bool

func (*ClusterConfiguration) DeleteContinuousQuery

func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) error

func (*ClusterConfiguration) DropDatabase

func (self *ClusterConfiguration) DropDatabase(name string) error

func (*ClusterConfiguration) DropShard

func (self *ClusterConfiguration) DropShard(shardId uint32, serverIds []uint32) error

func (*ClusterConfiguration) GetAllShards

func (self *ClusterConfiguration) GetAllShards() []*ShardData

func (*ClusterConfiguration) GetClusterAdmin

func (self *ClusterConfiguration) GetClusterAdmin(username string) *ClusterAdmin

func (*ClusterConfiguration) GetClusterAdmins

func (self *ClusterConfiguration) GetClusterAdmins() (names []string)

func (*ClusterConfiguration) GetContinuousQueries

func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQuery

func (*ClusterConfiguration) GetDatabases

func (self *ClusterConfiguration) GetDatabases() []*Database

func (*ClusterConfiguration) GetDbUser

func (self *ClusterConfiguration) GetDbUser(db, username string) *DbUser

func (*ClusterConfiguration) GetDbUsers

func (self *ClusterConfiguration) GetDbUsers(db string) []common.User

func (*ClusterConfiguration) GetLocalConfiguration added in v0.5.12

func (self *ClusterConfiguration) GetLocalConfiguration() *configuration.Configuration

func (*ClusterConfiguration) GetLocalShardById

func (self *ClusterConfiguration) GetLocalShardById(id uint32) *ShardData

This function is for the request handler to get the shard to write a request to locally.

func (*ClusterConfiguration) GetLongTermShards

func (self *ClusterConfiguration) GetLongTermShards() []*ShardData

func (*ClusterConfiguration) GetMapForJsonSerialization

func (self *ClusterConfiguration) GetMapForJsonSerialization() map[string]interface{}

func (*ClusterConfiguration) GetServerById

func (self *ClusterConfiguration) GetServerById(id *uint32) *ClusterServer

func (*ClusterConfiguration) GetServerByProtobufConnectionString

func (self *ClusterConfiguration) GetServerByProtobufConnectionString(connectionString string) *ClusterServer

func (*ClusterConfiguration) GetServerByRaftName

func (self *ClusterConfiguration) GetServerByRaftName(name string) *ClusterServer

func (*ClusterConfiguration) GetShardToWriteToBySeriesAndTime

func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series string, microsecondsEpoch int64) (*ShardData, error)

func (*ClusterConfiguration) GetShards

func (self *ClusterConfiguration) GetShards(querySpec *parser.QuerySpec) []*ShardData

func (*ClusterConfiguration) GetShortTermShards

func (self *ClusterConfiguration) GetShortTermShards() []*ShardData

func (*ClusterConfiguration) HasContinuousQueries

func (self *ClusterConfiguration) HasContinuousQueries() bool

func (*ClusterConfiguration) HasUncommitedWrites added in v0.5.7

func (self *ClusterConfiguration) HasUncommitedWrites() bool

Return per shard request numbers for the local server and all remote servers

func (*ClusterConfiguration) HashDbAndSeriesToInt

func (self *ClusterConfiguration) HashDbAndSeriesToInt(database, series string) int

func (*ClusterConfiguration) IsSingleServer

func (self *ClusterConfiguration) IsSingleServer() bool

func (*ClusterConfiguration) LastContinuousQueryRunTime

func (self *ClusterConfiguration) LastContinuousQueryRunTime() time.Time

func (*ClusterConfiguration) MarshalNewShardArrayToShards

func (self *ClusterConfiguration) MarshalNewShardArrayToShards(newShards []*NewShardData) ([]*ShardData, error)

func (*ClusterConfiguration) RecoverFromWAL

func (self *ClusterConfiguration) RecoverFromWAL() error

func (*ClusterConfiguration) Recovery

func (self *ClusterConfiguration) Recovery(b []byte) error

func (*ClusterConfiguration) Save

func (self *ClusterConfiguration) Save() ([]byte, error)

func (*ClusterConfiguration) SaveClusterAdmin

func (self *ClusterConfiguration) SaveClusterAdmin(u *ClusterAdmin)

func (*ClusterConfiguration) SaveDbUser

func (self *ClusterConfiguration) SaveDbUser(u *DbUser)

func (*ClusterConfiguration) ServerId

func (self *ClusterConfiguration) ServerId() uint32

func (*ClusterConfiguration) Servers

func (self *ClusterConfiguration) Servers() []*ClusterServer

func (*ClusterConfiguration) SetContinuousQueryTimestamp

func (self *ClusterConfiguration) SetContinuousQueryTimestamp(timestamp time.Time) error

func (*ClusterConfiguration) SetLastContinuousQueryRunTime

func (self *ClusterConfiguration) SetLastContinuousQueryRunTime(t time.Time)

func (*ClusterConfiguration) SetShardCreator

func (self *ClusterConfiguration) SetShardCreator(shardCreator ShardCreator)

func (*ClusterConfiguration) WaitForLocalServerLoaded

func (self *ClusterConfiguration) WaitForLocalServerLoaded()

This function will wait until the configuration has received an addPotentialServer command for this local server.

type ClusterServer

type ClusterServer struct {
	Id                       uint32
	RaftName                 string
	State                    ServerState
	RaftConnectionString     string
	ProtobufConnectionString string

	HeartbeatInterval time.Duration
	Backoff           time.Duration
	MinBackoff        time.Duration
	MaxBackoff        time.Duration
	// contains filtered or unexported fields
}

func NewClusterServer

func NewClusterServer(raftName, raftConnectionString, protobufConnectionString string, connection ServerConnection, config *c.Configuration) *ClusterServer

func (*ClusterServer) BufferWrite

func (self *ClusterServer) BufferWrite(request *protocol.Request)

func (*ClusterServer) Connect

func (self *ClusterServer) Connect()

func (*ClusterServer) GetId

func (self *ClusterServer) GetId() uint32

func (*ClusterServer) IsUp

func (self *ClusterServer) IsUp() bool

func (*ClusterServer) MakeRequest

func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response)

func (*ClusterServer) SetWriteBuffer

func (self *ClusterServer) SetWriteBuffer(writeBuffer *WriteBuffer)

func (*ClusterServer) StartHeartbeat

func (self *ClusterServer) StartHeartbeat()

func (*ClusterServer) Write

func (self *ClusterServer) Write(request *protocol.Request) error

type CommonUser

type CommonUser struct {
	Name          string `json:"name"`
	Hash          string `json:"hash"`
	IsUserDeleted bool   `json:"is_deleted"`
	CacheKey      string `json:"cache_key"`
}

func (*CommonUser) ChangePassword

func (self *CommonUser) ChangePassword(hash string) error

func (*CommonUser) GetDb

func (self *CommonUser) GetDb() string

func (*CommonUser) GetName

func (self *CommonUser) GetName() string

func (*CommonUser) HasReadAccess

func (self *CommonUser) HasReadAccess(name string) bool

func (*CommonUser) HasWriteAccess

func (self *CommonUser) HasWriteAccess(name string) bool

func (*CommonUser) IsClusterAdmin

func (self *CommonUser) IsClusterAdmin() bool

func (*CommonUser) IsDbAdmin

func (self *CommonUser) IsDbAdmin(db string) bool

func (*CommonUser) IsDeleted

func (self *CommonUser) IsDeleted() bool

type ContinuousQuery

type ContinuousQuery struct {
	Id    uint32
	Query string
}

type Database

type Database struct {
	Name              string `json:"name"`
	ReplicationFactor uint8  `json:"replicationFactor"`
}

type DbUser

type DbUser struct {
	CommonUser `json:"common"`
	Db         string     `json:"db"`
	ReadFrom   []*Matcher `json:"read_matchers"`
	WriteTo    []*Matcher `json:"write_matchers"`
	IsAdmin    bool       `json:"is_admin"`
}

func (*DbUser) ChangePermissions added in v0.5.11

func (self *DbUser) ChangePermissions(readPermissions, writePermissions string)

func (*DbUser) GetDb

func (self *DbUser) GetDb() string

func (*DbUser) HasReadAccess

func (self *DbUser) HasReadAccess(name string) bool

func (*DbUser) HasWriteAccess

func (self *DbUser) HasWriteAccess(name string) bool

func (*DbUser) IsDbAdmin

func (self *DbUser) IsDbAdmin(db string) bool

type LocalShardDb

type LocalShardDb interface {
	Write(database string, series *p.Series) error
	Query(*parser.QuerySpec, QueryProcessor) error
	DropDatabase(database string) error
	IsClosed() bool
}

type LocalShardStore

type LocalShardStore interface {
	Write(request *p.Request) error
	SetWriteBuffer(writeBuffer *WriteBuffer)
	BufferWrite(request *p.Request)
	GetOrCreateShard(id uint32) (LocalShardDb, error)
	ReturnShard(id uint32)
	DeleteShard(shardId uint32) error
}

type Matcher

type Matcher struct {
	IsRegex bool
	Name    string
}

func (*Matcher) Matches

func (self *Matcher) Matches(name string) bool

type NewShardData

type NewShardData struct {
	Id            uint32 `json:",omitempty"`
	StartTime     time.Time
	EndTime       time.Time
	ServerIds     []uint32
	Type          ShardType
	DurationSplit bool `json:",omitempty"`
}

type QueryProcessor

type QueryProcessor interface {
	// This method returns true if the query should continue. If the query should be stopped,
	// like maybe the limit was hit, it should return false
	YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool
	YieldSeries(seriesIncoming *p.Series) bool
	Close()

	// Set by the shard, so EXPLAIN query can know query against which shard is being measured
	SetShardInfo(shardId int, shardLocal bool)

	// Let QueryProcessor identify itself. What if it is a spy and we can't check that?
	GetName() string
}

Passed to a shard (local datastore or whatever) that gets yielded points from series.

type QuerySpec

type QuerySpec interface {
	GetStartTime() time.Time
	GetEndTime() time.Time
	Database() string
	TableNames() []string
	GetGroupByInterval() *time.Duration
	AllShardsQuery() bool
	IsRegex() bool
	ShouldQueryShortTermAndLongTerm() (shouldQueryShortTerm bool, shouldQueryLongTerm bool)
}

defined by cluster config (in cluster package)

type SavedConfiguration

type SavedConfiguration struct {
	Databases         map[string]uint8
	Admins            map[string]*ClusterAdmin
	DbUsers           map[string]map[string]*DbUser
	Servers           []*ClusterServer
	ShortTermShards   []*NewShardData
	LongTermShards    []*NewShardData
	ContinuousQueries map[string][]*ContinuousQuery
}

type ServerConnection

type ServerConnection interface {
	Connect()
	Close()
	ClearRequests()
	MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error
}

type ServerState

type ServerState int
const (
	LoadingRingData ServerState = iota
	SendingRingData
	DeletingOldData
	Running
	Potential
)

type Shard

type Shard interface {
	Id() uint32
	StartTime() time.Time
	EndTime() time.Time
	Write(*p.Request) error
	SyncWrite(*p.Request) error
	Query(querySpec *parser.QuerySpec, response chan *p.Response)
	IsMicrosecondInRange(t int64) bool
}

A shard imements an interface for writing and querying data. It can be copied to multiple servers or the local datastore. Shard contains data from [startTime, endTime) Ids are unique across the cluster

type ShardCollection

type ShardCollection []*ShardData

func (ShardCollection) Len

func (s ShardCollection) Len() int

func (ShardCollection) Swap

func (s ShardCollection) Swap(i, j int)

type ShardCreator

type ShardCreator interface {
	// the shard creator expects all shards to be of the same type (long term or short term) and have the same
	// start and end times. This is called to create the shard set for a given duration.
	CreateShards(shards []*NewShardData) ([]*ShardData, error)
}

type ShardData

type ShardData struct {
	IsLocal bool
	// contains filtered or unexported fields
}

func NewShard

func NewShard(id uint32, startTime, endTime time.Time, shardType ShardType, durationIsSplit bool, wal WAL) *ShardData

func (*ShardData) DropDatabase

func (self *ShardData) DropDatabase(database string, sendToServers bool)

func (*ShardData) EndMicro

func (self *ShardData) EndMicro() int64

func (*ShardData) EndTime

func (self *ShardData) EndTime() time.Time

func (*ShardData) HandleDestructiveQuery

func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan *p.Response, runLocalOnly bool)

func (*ShardData) Id

func (self *ShardData) Id() uint32

func (*ShardData) IsMicrosecondInRange

func (self *ShardData) IsMicrosecondInRange(t int64) bool

func (*ShardData) LogAndHandleDestructiveQuery

func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan *p.Response, runLocalOnly bool)

func (*ShardData) Query

func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Response)

func (*ShardData) QueryResponseBufferSize added in v0.5.4

func (self *ShardData) QueryResponseBufferSize(querySpec *parser.QuerySpec, batchPointSize int) int

func (*ShardData) ServerIds

func (self *ShardData) ServerIds() []uint32

func (*ShardData) SetLocalStore

func (self *ShardData) SetLocalStore(store LocalShardStore, localServerId uint32) error

func (*ShardData) SetServers

func (self *ShardData) SetServers(servers []*ClusterServer)

func (*ShardData) ShouldAggregateLocally

func (self *ShardData) ShouldAggregateLocally(querySpec *parser.QuerySpec) bool

func (*ShardData) StartMicro

func (self *ShardData) StartMicro() int64

func (*ShardData) StartTime

func (self *ShardData) StartTime() time.Time

func (*ShardData) String

func (self *ShardData) String() string

func (*ShardData) SyncWrite added in v0.5.11

func (self *ShardData) SyncWrite(request *p.Request) error

func (*ShardData) ToNewShardData

func (self *ShardData) ToNewShardData() *NewShardData

used to serialize shards when sending around in raft or when snapshotting in the log

func (*ShardData) Write

func (self *ShardData) Write(request *p.Request) error

func (*ShardData) WriteLocalOnly

func (self *ShardData) WriteLocalOnly(request *p.Request) error

type ShardType

type ShardType int
const (
	LONG_TERM ShardType = iota
	SHORT_TERM
)

type WAL

type WAL interface {
	AssignSequenceNumbersAndLog(request *protocol.Request, shard wal.Shard) (uint32, error)
	Commit(requestNumber uint32, serverId uint32) error
	CreateCheckpoint() error
	RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error
	RecoverServerFromLastCommit(serverId uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error
}

type WriteBuffer

type WriteBuffer struct {
	// contains filtered or unexported fields
}

Acts as a buffer for writes

func NewWriteBuffer

func NewWriteBuffer(writerInfo string, writer Writer, wal WAL, serverId uint32, bufferSize int) *WriteBuffer

func (*WriteBuffer) HasUncommitedWrites added in v0.5.7

func (self *WriteBuffer) HasUncommitedWrites() bool

func (*WriteBuffer) ShardsRequestNumber added in v0.5.7

func (self *WriteBuffer) ShardsRequestNumber() map[uint32]uint32

func (*WriteBuffer) Write

func (self *WriteBuffer) Write(request *protocol.Request)

This method never blocks. It'll buffer writes until they fill the buffer then drop the on the floor and let the background goroutine replay from the WAL

type Writer

type Writer interface {
	Write(request *protocol.Request) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL