server

package
v0.0.0-...-fd73c8f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2025 License: MPL-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// Returned if the same proxy client attempts to connect with the controller
	ErrProxyExists = errors.New("proxy with id exists")
	ErrNotLeader   = errors.New("current node is not a leader")
)

Functions

This section is empty.

Types

type ClusterServer

type ClusterServer struct {
	v1.UnimplementedClusterServer
	// contains filtered or unexported fields
}

func NewClusterServer

func NewClusterServer(rc RaftCluster) *ClusterServer

func (*ClusterServer) IsNodeLeader

func (c *ClusterServer) IsNodeLeader(ctx context.Context, req *v1.Empty) (*v1.IsNodeLeaderResponse, error)

func (*ClusterServer) Join

func (c *ClusterServer) Join(ctx context.Context, req *v1.JoinRequest) (*v1.Empty, error)

Join joins a node, identified by nodeID and located at addr, to this cluster. The node must be ready to respond to Raft communications at that address.

It is required that the node that this is called into is a leader node.

func (*ClusterServer) Leave

func (c *ClusterServer) Leave(ctx context.Context, req *v1.LeaveRequest) (*v1.Empty, error)

Leave leaves a node, identified by nodeID and located at addr, to this store.

It is required that the node that this is called into is a leader node.

func (*ClusterServer) Snapshot

func (c *ClusterServer) Snapshot(ctx context.Context, req *v1.Empty) (*v1.Empty, error)

type HealthCheckServer

type HealthCheckServer struct {
	healthV1.UnimplementedHealthServer
	// contains filtered or unexported fields
}

func NewHealthCheckServer

func NewHealthCheckServer(s ServiceReadiness) *HealthCheckServer

func (*HealthCheckServer) Check

func (*HealthCheckServer) Watch

type JSMServer

type JSMServer struct {
	v1.UnimplementedJobStateMachineServer
	// contains filtered or unexported fields
}

JSMServer exports the ReplicatedJSM as a callable server.

func NewJSMServer

func NewJSMServer(r ReplicatedJsm) *JSMServer

NewJSMServer returns a pointer to a new JSMServer struct.

func (*JSMServer) Bury

func (j *JSMServer) Bury(ctx context.Context, req *v1.BuryRequest) (*v1.Empty, error)

Bury allows a specific job to be buried

func (*JSMServer) CheckClientState

CheckClientState returns the current state of a specific proxy client.

func (*JSMServer) Delete

func (j *JSMServer) Delete(ctx context.Context, req *v1.DeleteRequest) (*v1.Empty, error)

Delete removes a job

func (*JSMServer) GetJob

func (j *JSMServer) GetJob(ctx context.Context, req *v1.GetJobRequest) (*v1.GetJobResponse, error)

GetJob returns a job

func (*JSMServer) GetStatsJobYaml

GetStatsJobYaml returns a specific job's statistics as YAML formatted response.

func (*JSMServer) GetStatsTubeYaml

GetStatsTubeYaml returns a specific tube's statistics as a YAML formatted response.

func (*JSMServer) GetStatsYaml

func (j *JSMServer) GetStatsYaml(ctx context.Context, req *v1.Empty) (*v1.GetStatsYamlResponse, error)

GetStatsYaml returns the server statistics as a YAML formatted response.

func (*JSMServer) Kick

func (j *JSMServer) Kick(ctx context.Context, req *v1.KickRequest) (*v1.Empty, error)

Kick moves a specific buried job to back to the ready queue

func (*JSMServer) KickN

func (j *JSMServer) KickN(ctx context.Context, req *v1.KickNRequest) (*v1.KickNResponse, error)

KickN moves N jobs from the top of the buried queue to the ready queue.

func (*JSMServer) ListTubes

func (j *JSMServer) ListTubes(ctx context.Context, req *v1.Empty) (*v1.ListTubesResponse, error)

ListTubes returns a list of tubes currently available.

func (*JSMServer) PeekBuried

func (j *JSMServer) PeekBuried(ctx context.Context, req *v1.PeekRequest) (*v1.PeekResponse, error)

PeekBuried peeks and returns the first buried job in the specified tube

func (*JSMServer) PeekDelayed

func (j *JSMServer) PeekDelayed(ctx context.Context, req *v1.PeekRequest) (*v1.PeekResponse, error)

PeekDelayed peeks and returns the first delayed job in the specified tube

func (*JSMServer) PeekReady

func (j *JSMServer) PeekReady(ctx context.Context, req *v1.PeekRequest) (*v1.PeekResponse, error)

PeekReady peeks and returns the first ready job in the specified tube

func (*JSMServer) Put

func (j *JSMServer) Put(ctx context.Context, req *v1.PutRequest) (*v1.PutResponse, error)

Put creates a new job.

func (*JSMServer) Release

func (j *JSMServer) Release(ctx context.Context, req *v1.ReleaseRequest) (*v1.Empty, error)

Release allows a client to return a job from being reserved.

func (*JSMServer) Reserve

func (j *JSMServer) Reserve(ctx context.Context, req *v1.ReserveRequest) (*v1.ReserveResponse, error)

Reserve allows a client to reserve a job for processing.

func (*JSMServer) RunController

func (j *JSMServer) RunController()

RunController runs the ReservationsController in a separate go-routine.

func (*JSMServer) StreamReserveUpdates

StreamReserveUpdates returns back a continuous stream of reservation updates from the cluster node back to a specific proxy.

func (*JSMServer) Tick

func (j *JSMServer) Tick() (*v1.TickResponse, error)

Tick progresses the underlying job state machine

func (*JSMServer) Touch

func (j *JSMServer) Touch(ctx context.Context, req *v1.TouchRequest) (*v1.Empty, error)

Touch allows a client to continue to its reservation by the job's TTR

type JsmTick

type JsmTick interface {
	Tick() (*v1.TickResponse, error)
}

type ProxyJoinReq

type ProxyJoinReq struct {
	// contains filtered or unexported fields
}

type ProxyLeaveReq

type ProxyLeaveReq struct {
	// contains filtered or unexported fields
}

type ProxyResp

type ProxyResp struct {
	RespType     ProxyRespType
	Reservations []*v1.Reservation
	Err          error
}

type ProxyRespType

type ProxyRespType int
const (
	Unknown ProxyRespType = iota
	Join
	Leave
	Reservation
)

func (ProxyRespType) String

func (p ProxyRespType) String() string

type RaftCluster

type RaftCluster interface {
	// Join, joins this node, identified by nodeID and reachable at addr,
	// to an existing Raft cluster.
	Join(nodeID, addr string) error

	// Leave, leave this specified node, identified by nodeID from
	// an existing Raft cluster.
	Leave(nodeID string) error

	// Returns true if this specified node is a Leader
	IsLeader() bool

	// Ask the node to take a snapshot
	Snapshot() error
}

type ReplicatedJsm

type ReplicatedJsm interface {
	// Apply the provided request
	ApplyOp(req *v1.ApplyOpRequest) *v1.ApplyOpResponse

	// Ask the server for the current clock (now in secs)
	NowSeconds() int64

	// Returns true if this node is a leader
	IsLeader() bool
}

ReplicatedJsm represents a JobStateMachine that is replicated via RAFT

type ReservationsController

type ReservationsController struct {
	// contains filtered or unexported fields
}

reservationsController provides the ability to stream reservation updates from the job state machine (jsm) back to the connected clients (aka. "proxy" clients)

A high level overview:

┌----------------┐ ┌----------------┐ │ State Proxy │ │ State Proxy │ │ Client │ ...... │ Client │ └----------------┘ └----------------┘

^                           ^
|                           |
|                           | (stream Reservations)
|                           |

┌---------------------------------------------------┐ │ reservationsController │ └---------------------------------------------------┘

|                      ^
| (every 1s)           | Reservations
|                      |
V                      |

┌---------------------------------------------------┐ │ JSM.Tick() │ └---------------------------------------------------┘

func NewReservationsController

func NewReservationsController(jsmTick JsmTick) *ReservationsController

func (*ReservationsController) Register

func (rctrl *ReservationsController) Register(proxyID string) (<-chan *ProxyResp, error)

Register makes a request to add this proxy client (identified by proxyID)

Register returns back a read only channel to receive updates

func (*ReservationsController) Run

func (rctrl *ReservationsController) Run() error

Run, runs this controller. the control loop does not return immediately (unless there is an error)

Run performs the following functions

  1. Periodically (for every second), queries the underlying JSM (job state machine) for any reservation updates. (These could include newly assigned jobs, timeouts or deadline-soon to any Reservations request). These updates are dispatched to the appropriate client proxy (if they are connected.
  2. Processes any register (join) or un-register (leave) requests from proxies

func (*ReservationsController) Stop

func (rctrl *ReservationsController) Stop()

func (*ReservationsController) UnRegister

func (rctrl *ReservationsController) UnRegister(proxyID string, respCh <-chan *ProxyResp)

UnRegister makes a request to remove this proxy client (identified by the proxyID)

Additionally, once the unRegister is complete, it drains the response channel

type ServiceReadiness

type ServiceReadiness interface {
	// Ready indicates if this service is ready to accept traffic
	Ready() bool
}

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL