Documentation
¶
Index ¶
- Variables
- type ClusterServer
- func (c *ClusterServer) IsNodeLeader(ctx context.Context, req *v1.Empty) (*v1.IsNodeLeaderResponse, error)
- func (c *ClusterServer) Join(ctx context.Context, req *v1.JoinRequest) (*v1.Empty, error)
- func (c *ClusterServer) Leave(ctx context.Context, req *v1.LeaveRequest) (*v1.Empty, error)
- func (c *ClusterServer) Snapshot(ctx context.Context, req *v1.Empty) (*v1.Empty, error)
- type HealthCheckServer
- type JSMServer
- func (j *JSMServer) Bury(ctx context.Context, req *v1.BuryRequest) (*v1.Empty, error)
- func (j *JSMServer) CheckClientState(ctx context.Context, req *v1.CheckClientStateRequest) (*v1.CheckClientStateResponse, error)
- func (j *JSMServer) Delete(ctx context.Context, req *v1.DeleteRequest) (*v1.Empty, error)
- func (j *JSMServer) GetJob(ctx context.Context, req *v1.GetJobRequest) (*v1.GetJobResponse, error)
- func (j *JSMServer) GetStatsJobYaml(ctx context.Context, req *v1.GetStatsJobYamlRequest) (*v1.GetStatsJobYamlResponse, error)
- func (j *JSMServer) GetStatsTubeYaml(ctx context.Context, req *v1.GetStatsTubeYamlRequest) (*v1.GetStatsTubeYamlResponse, error)
- func (j *JSMServer) GetStatsYaml(ctx context.Context, req *v1.Empty) (*v1.GetStatsYamlResponse, error)
- func (j *JSMServer) Kick(ctx context.Context, req *v1.KickRequest) (*v1.Empty, error)
- func (j *JSMServer) KickN(ctx context.Context, req *v1.KickNRequest) (*v1.KickNResponse, error)
- func (j *JSMServer) ListTubes(ctx context.Context, req *v1.Empty) (*v1.ListTubesResponse, error)
- func (j *JSMServer) PeekBuried(ctx context.Context, req *v1.PeekRequest) (*v1.PeekResponse, error)
- func (j *JSMServer) PeekDelayed(ctx context.Context, req *v1.PeekRequest) (*v1.PeekResponse, error)
- func (j *JSMServer) PeekReady(ctx context.Context, req *v1.PeekRequest) (*v1.PeekResponse, error)
- func (j *JSMServer) Put(ctx context.Context, req *v1.PutRequest) (*v1.PutResponse, error)
- func (j *JSMServer) Release(ctx context.Context, req *v1.ReleaseRequest) (*v1.Empty, error)
- func (j *JSMServer) Reserve(ctx context.Context, req *v1.ReserveRequest) (*v1.ReserveResponse, error)
- func (j *JSMServer) RunController()
- func (j *JSMServer) StreamReserveUpdates(req *v1.ReserveUpdateRequest, ...) error
- func (j *JSMServer) Tick() (*v1.TickResponse, error)
- func (j *JSMServer) Touch(ctx context.Context, req *v1.TouchRequest) (*v1.Empty, error)
- type JsmTick
- type ProxyJoinReq
- type ProxyLeaveReq
- type ProxyResp
- type ProxyRespType
- type RaftCluster
- type ReplicatedJsm
- type ReservationsController
- type ServiceReadiness
Constants ¶
This section is empty.
Variables ¶
var (
	// Returned if the same proxy client attempts to connect with the controller
	ErrProxyExists = errors.New("proxy with id exists")
	ErrNotLeader   = errors.New("current node is not a leader")
)
Functions ¶
This section is empty.
Types ¶
type ClusterServer ¶
type ClusterServer struct {
v1.UnimplementedClusterServer
// contains filtered or unexported fields
}
func NewClusterServer ¶
func NewClusterServer(rc RaftCluster) *ClusterServer
func (*ClusterServer) IsNodeLeader ¶
func (c *ClusterServer) IsNodeLeader(ctx context.Context, req *v1.Empty) (*v1.IsNodeLeaderResponse, error)
func (*ClusterServer) Join ¶
func (c *ClusterServer) Join(ctx context.Context, req *v1.JoinRequest) (*v1.Empty, error)
Join joins a node, identified by nodeID and located at addr, to this cluster. The node must be ready to respond to Raft communications at that address.
The node that receives this call must be the leader node.
func (*ClusterServer) Leave ¶
func (c *ClusterServer) Leave(ctx context.Context, req *v1.LeaveRequest) (*v1.Empty, error)
Leave removes a node, identified by nodeID, from this cluster.
The node that receives this call must be the leader node.
type HealthCheckServer ¶
type HealthCheckServer struct {
healthV1.UnimplementedHealthServer
// contains filtered or unexported fields
}
func NewHealthCheckServer ¶
func NewHealthCheckServer(s ServiceReadiness) *HealthCheckServer
func (*HealthCheckServer) Check ¶
func (h *HealthCheckServer) Check(ctx context.Context, req *healthV1.HealthCheckRequest) (*healthV1.HealthCheckResponse, error)
func (*HealthCheckServer) Watch ¶
func (h *HealthCheckServer) Watch(req *healthV1.HealthCheckRequest, stream healthV1.Health_WatchServer) error
type JSMServer ¶
type JSMServer struct {
v1.UnimplementedJobStateMachineServer
// contains filtered or unexported fields
}
JSMServer exports the ReplicatedJSM as a callable server.
func NewJSMServer ¶
func NewJSMServer(r ReplicatedJsm) *JSMServer
NewJSMServer returns a pointer to a new JSMServer struct.
func (*JSMServer) CheckClientState ¶
func (j *JSMServer) CheckClientState(ctx context.Context, req *v1.CheckClientStateRequest) (*v1.CheckClientStateResponse, error)
CheckClientState returns the current state of a specific proxy client.
func (*JSMServer) GetJob ¶
func (j *JSMServer) GetJob(ctx context.Context, req *v1.GetJobRequest) (*v1.GetJobResponse, error)
GetJob returns a job
func (*JSMServer) GetStatsJobYaml ¶
func (j *JSMServer) GetStatsJobYaml(ctx context.Context, req *v1.GetStatsJobYamlRequest) (*v1.GetStatsJobYamlResponse, error)
GetStatsJobYaml returns a specific job's statistics as YAML formatted response.
func (*JSMServer) GetStatsTubeYaml ¶
func (j *JSMServer) GetStatsTubeYaml(ctx context.Context, req *v1.GetStatsTubeYamlRequest) (*v1.GetStatsTubeYamlResponse, error)
GetStatsTubeYaml returns a specific tube's statistics as a YAML formatted response.
func (*JSMServer) GetStatsYaml ¶
func (j *JSMServer) GetStatsYaml(ctx context.Context, req *v1.Empty) (*v1.GetStatsYamlResponse, error)
GetStatsYaml returns the server statistics as a YAML formatted response.
func (*JSMServer) KickN ¶
func (j *JSMServer) KickN(ctx context.Context, req *v1.KickNRequest) (*v1.KickNResponse, error)
KickN moves N jobs from the top of the buried queue to the ready queue.
func (*JSMServer) PeekBuried ¶
func (j *JSMServer) PeekBuried(ctx context.Context, req *v1.PeekRequest) (*v1.PeekResponse, error)
PeekBuried peeks and returns the first buried job in the specified tube
func (*JSMServer) PeekDelayed ¶
func (j *JSMServer) PeekDelayed(ctx context.Context, req *v1.PeekRequest) (*v1.PeekResponse, error)
PeekDelayed peeks and returns the first delayed job in the specified tube
func (*JSMServer) PeekReady ¶
func (j *JSMServer) PeekReady(ctx context.Context, req *v1.PeekRequest) (*v1.PeekResponse, error)
PeekReady peeks and returns the first ready job in the specified tube
func (*JSMServer) Put ¶
func (j *JSMServer) Put(ctx context.Context, req *v1.PutRequest) (*v1.PutResponse, error)
Put creates a new job.
func (*JSMServer) Reserve ¶
func (j *JSMServer) Reserve(ctx context.Context, req *v1.ReserveRequest) (*v1.ReserveResponse, error)
Reserve allows a client to reserve a job for processing.
func (*JSMServer) RunController ¶
func (j *JSMServer) RunController()
RunController runs the ReservationsController in a separate go-routine.
func (*JSMServer) StreamReserveUpdates ¶
func (j *JSMServer) StreamReserveUpdates(req *v1.ReserveUpdateRequest, stream v1.JobStateMachine_StreamReserveUpdatesServer) error
StreamReserveUpdates returns a continuous stream of reservation updates from the cluster node to a specific proxy.
type JsmTick ¶
type JsmTick interface {
Tick() (*v1.TickResponse, error)
}
type ProxyJoinReq ¶
type ProxyJoinReq struct {
// contains filtered or unexported fields
}
type ProxyLeaveReq ¶
type ProxyLeaveReq struct {
// contains filtered or unexported fields
}
type ProxyResp ¶
type ProxyResp struct {
RespType ProxyRespType
Reservations []*v1.Reservation
Err error
}
type ProxyRespType ¶
type ProxyRespType int
const (
	Unknown ProxyRespType = iota
	Join
	Leave
	Reservation
)
func (ProxyRespType) String ¶
func (p ProxyRespType) String() string
type RaftCluster ¶
type RaftCluster interface {
// Join joins this node, identified by nodeID and reachable at addr,
// to an existing Raft cluster.
Join(nodeID, addr string) error
// Leave removes the specified node, identified by nodeID, from
// an existing Raft cluster.
Leave(nodeID string) error
// IsLeader returns true if this node is the leader
IsLeader() bool
// Ask the node to take a snapshot
Snapshot() error
}
type ReplicatedJsm ¶
type ReplicatedJsm interface {
// Apply the provided request
ApplyOp(req *v1.ApplyOpRequest) *v1.ApplyOpResponse
// Ask the server for the current clock (now in secs)
NowSeconds() int64
// Returns true if this node is a leader
IsLeader() bool
}
ReplicatedJsm represents a JobStateMachine that is replicated via RAFT
type ReservationsController ¶
type ReservationsController struct {
// contains filtered or unexported fields
}
ReservationsController provides the ability to stream reservation updates from the job state machine (JSM) back to the connected clients (a.k.a. "proxy" clients).
A high level overview:
┌----------------┐                 ┌----------------┐
│  State Proxy   │                 │  State Proxy   │
│     Client     │     ......      │     Client     │
└----------------┘                 └----------------┘
        ^                                   ^
        |                                   |
        |                                   |   (stream Reservations)
        |                                   |
┌---------------------------------------------------┐
│              reservationsController               │
└---------------------------------------------------┘
        |                                   ^
        | (every 1s)                        | Reservations
        |                                   |
        V                                   |
┌---------------------------------------------------┐
│                    JSM.Tick()                     │
└---------------------------------------------------┘
func NewReservationsController ¶
func NewReservationsController(jsmTick JsmTick) *ReservationsController
func (*ReservationsController) Register ¶
func (rctrl *ReservationsController) Register(proxyID string) (<-chan *ProxyResp, error)
Register makes a request to add this proxy client (identified by proxyID)
Register returns a read-only channel on which to receive updates.
func (*ReservationsController) Run ¶
func (rctrl *ReservationsController) Run() error
Run runs this controller. The control loop does not return immediately (unless there is an error).
Run performs the following functions
- Periodically (every second), queries the underlying JSM (job state machine) for any reservation updates. (These could include newly assigned jobs, timeouts, or deadline-soon notices for any reservation request.) These updates are dispatched to the appropriate client proxy (if it is connected).
- Processes any register (join) or un-register (leave) requests from proxies
func (*ReservationsController) Stop ¶
func (rctrl *ReservationsController) Stop()
func (*ReservationsController) UnRegister ¶
func (rctrl *ReservationsController) UnRegister(proxyID string, respCh <-chan *ProxyResp)
UnRegister makes a request to remove this proxy client (identified by the proxyID)
Additionally, once the unRegister is complete, it drains the response channel
type ServiceReadiness ¶
type ServiceReadiness interface {
// Ready indicates if this service is ready to accept traffic
Ready() bool
}