watchsvc

package
v0.0.0-...-c0686e8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2022 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitWatchProcessor

func InitWatchProcessor(
	cfg Config,
	parent tally.Scope,
)

InitWatchProcessor initializes WatchProcessor singleton.

func NewWatchID

func NewWatchID(clientType ClientType) string

NewWatchID creates a new watch id UUID string for the specific watch client type

Types

type ClientType

type ClientType string

ClientType is a enum string to be embedded in the watch id returned to the client, used to indicate the watch client type.

const (
	// ClientTypeTask indicates the watch id belongs to a task watch client
	ClientTypeTask ClientType = "task"
	// ClientTypeJob indicates the watch id belongs to a job watch client
	ClientTypeJob ClientType = "job"
)

func (ClientType) String

func (t ClientType) String() string

type Config

type Config struct {
	// Size of per-client internal buffer
	BufferSize int `yaml:"buffer_size"`

	// Maximum number of concurrent watch clients
	MaxClient int `yaml:"max_client"`
}

Config for Watch API

type JobClient

type JobClient struct {
	Input  chan *stateless.JobSummary
	Signal chan StopSignal
	// contains filtered or unexported fields
}

JobClient represents a client which interested in job event changes.

type Metrics

type Metrics struct {
	WatchPodCancel   tally.Counter
	WatchPodOverflow tally.Counter

	WatchJobCancel   tally.Counter
	WatchJobOverflow tally.Counter

	CancelNotFound tally.Counter

	// Time takes to acquire lock in watch processor
	ProcessorLockDuration tally.Timer
}

Metrics is a placeholder for all metrics in watch api.

func NewMetrics

func NewMetrics(scope tally.Scope) *Metrics

NewMetrics returns a new instance of watchsvc.Metrics.

type ServiceHandler

type ServiceHandler struct {
	// contains filtered or unexported fields
}

ServiceHandler implements peloton.api.v1alpha.watch.svc.WatchService

func NewServiceHandler

func NewServiceHandler(
	metrics *Metrics,
	processor WatchProcessor,
) *ServiceHandler

NewServiceHandler initializes a new instance of ServiceHandler

func NewTestServiceHandler

func NewTestServiceHandler() *ServiceHandler

NewTestServiceHandler returns an empty new ServiceHandler ptr for testing.

func (*ServiceHandler) Cancel

func (h *ServiceHandler) Cancel(
	ctx context.Context,
	req *svc.CancelRequest,
) (*svc.CancelResponse, error)

Cancel cancels a watch. The watch stream will get an error indicating watch was cancelled and the stream will be closed.

func (*ServiceHandler) Watch

Watch creates a watch to get notified about changes to Peloton objects. Changed objects are streamed back to the caller till the watch is cancelled.

type StopSignal

type StopSignal int

StopSignal is an event sent through task / job client Signal channel indicating a stnop event for the specific watcher.

const (
	// StopSignalUnknown indicates a unspecified StopSignal.
	StopSignalUnknown StopSignal = iota
	// StopSignalCancel indicates the watch is cancelled by the user.
	StopSignalCancel
	// StopSignalOverflow indicates the watch is aborted due to event
	// overflow.
	StopSignalOverflow
)

func (StopSignal) String

func (s StopSignal) String() string

String returns a user-friendly name for the specific StopSignal

type TaskClient

type TaskClient struct {
	Input  chan *pod.PodSummary
	Signal chan StopSignal
	// contains filtered or unexported fields
}

TaskClient represents a client which interested in task event changes.

type WatchListener

type WatchListener struct {
	// contains filtered or unexported fields
}

WatchListener is a job / task runtime event listener which implements cached.JobTaskListener interface, used by watch api.

func NewWatchListener

func NewWatchListener(processor WatchProcessor) WatchListener

NewWatchListener returns a new instance of watchsvc.WatchListener

func (WatchListener) BatchJobSummaryChanged

func (l WatchListener) BatchJobSummaryChanged(
	jobID *v0peloton.JobID,
	jobSummary *job.JobSummary,
)

func (WatchListener) Name

func (l WatchListener) Name() string

Name returns a user-friendly name for the listener

func (WatchListener) PodSummaryChanged

func (l WatchListener) PodSummaryChanged(
	jobType job.JobType,
	summary *pod.PodSummary,
	labels []*v1peloton.Label,
)

PodSummaryChanged is invoked when the summary for a pod is updated in cache and persistent store.

func (WatchListener) StatelessJobSummaryChanged

func (l WatchListener) StatelessJobSummaryChanged(
	jobSummary *stateless.JobSummary,
)

JobSummaryChanged is invoked when the runtime for a job is updated in cache and persistent store.

type WatchProcessor

type WatchProcessor interface {
	// NewTaskClient creates a new watch client for task event changes.
	// Returns the watch id and a new instance of TaskClient.
	NewTaskClient(filter *watch.PodFilter) (string, *TaskClient, error)

	// StopTaskClient stops a task watch client. Returns "not-found" error
	// if the corresponding watch client is not found.
	StopTaskClient(watchID string) error

	// StopTaskClients stops all the task clients on leadership change.
	StopTaskClients()

	// NotifyPodChange receives pod event, and notifies all the clients
	// which are interested in the pod.
	NotifyPodChange(pod *pod.PodSummary, podLabels []*peloton.Label)

	// NewJobClient creates a new watch client for job event changes.
	// Returns the watch id and an new instance of JobClient.
	NewJobClient(filter *watch.StatelessJobFilter) (string, *JobClient, error)

	// StopJobClient stops a job watch client. Returns "not-found" error
	// if the corresponding watch client is not found.
	StopJobClient(watchID string) error

	// StopJobClients stops all the job clients on leadership change.
	StopJobClients()

	// NotifyJobChange receives job event, and notifies all the clients
	// which are interested in the job.
	NotifyJobChange(job *stateless.JobSummary)
}

WatchProcessor interface is a central controller which handles watch client lifecycle, and task / job event fan-out.

func GetWatchProcessor

func GetWatchProcessor() WatchProcessor

GetWatchProcessor returns WatchProcessor singleton.

func InitV1AlphaWatchServiceHandler

func InitV1AlphaWatchServiceHandler(
	d *yarpc.Dispatcher,
	parent tally.Scope,
	config Config,
) WatchProcessor

InitV1AlphaWatchServiceHandler initializes the Watch Service Handler, and registers with yarpc dispatcher.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL