scheduler

package
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2023 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
// Milestone key names.
//
// MilestoneKeyPrefix is the shared "milestone-" prefix; the other three
// constants append a suffix to it, giving the literal values
// "milestone-latest", "milestone-dispatched" and "milestone-started".
// NOTE(review): where these keys are stored or looked up is not visible in
// this listing — confirm against the EventManager implementation.
const (
	MilestoneKeyPrefix  = "milestone-"
	MilestoneLatest     = MilestoneKeyPrefix + "latest"
	MilestoneDispatched = MilestoneKeyPrefix + "dispatched"
	MilestoneStarted    = MilestoneKeyPrefix + "started"
)
View Source
// TaskDispatched is the string constant "TaskDispatched".
// NOTE(review): presumably used as a TaskEvent.EventType value — confirm
// against the code that builds events.
const (
	TaskDispatched = "TaskDispatched"
)

Variables

View Source
// Package-level defaults.
//
// These are declared as variables rather than constants, so importing code
// can reassign them. The two numeric defaults are plain ints whose unit,
// per their inline comments, is seconds (they are not time.Duration values).
// NOTE(review): DefaultDBPath is a relative path and DefaultDBSnapshotPrefix
// looks like a name prefix for snapshot files; how each default is consumed
// is not visible in this listing — confirm.
var (
	DefaultDBPath               = "./events.db"
	DefaultDBSnapshotPrefix     = "events-db-snapshot"
	DefaultEventCompactDuration = 60 // 60 seconds
	DefaultTaskCounterTTL       = 30 // 30 seconds
)
View Source
// BoltDBOption is a shared, mutable *bbolt.Options value.
//
// It sets a one-second Timeout (in bbolt, the time to wait for the database
// file lock), leaves NoGrowSync disabled, and selects the array-backed
// freelist (bbolt.FreelistArrayType).
// NOTE(review): presumably passed to bbolt.Open for the events database —
// the call site is not visible in this listing, confirm.
var BoltDBOption = &bbolt.Options{
	Timeout:      time.Second,
	NoGrowSync:   false,
	FreelistType: bbolt.FreelistArrayType,
}

Functions

This section is empty.

Types

type EventManager

// EventManager is created with NewEventManager and exposes operations on
// TaskEvent records addressed by tenant ID and task ID: Insert, Delete,
// Iterate, Latest and Tasks, plus CountRunningTasks, CreateTenantBucket and
// Backup.
// NOTE(review): its fields are elided in this listing, so the storage backend
// and any concurrency guarantees cannot be stated from here.
type EventManager struct {
	// contains filtered or unexported fields
}

func NewEventManager

func NewEventManager(sc config.SnapshotConfig, schedulerId string, lg types.Logger) (*EventManager, error)

func (*EventManager) Backup

func (m *EventManager) Backup() (string, error)

func (*EventManager) CountRunningTasks

func (m *EventManager) CountRunningTasks(tenantId string) (n int, err error)

func (*EventManager) CreateTenantBucket

func (m *EventManager) CreateTenantBucket(tenantId string)

func (*EventManager) Delete

func (m *EventManager) Delete(tenantId, taskId string) error

func (*EventManager) Insert

func (m *EventManager) Insert(e *TaskEvent) error

func (*EventManager) Iterate

func (m *EventManager) Iterate(tenantId, taskId string, fn func(e *TaskEvent) bool) error

func (*EventManager) Latest

func (m *EventManager) Latest(tenantId, taskId string) (ev *TaskEvent, err error)

func (*EventManager) Tasks

func (m *EventManager) Tasks(tenantId string) (ids []string, err error)

type Options

// Options holds the settings passed to New when constructing a Scheduler.
//
// The three int64 timing fields are expressed in seconds, as stated in their
// field comments; they are not time.Duration values.
type Options struct {
	Name                    string                 // Scheduler name, also used as partition name
	Zone                    string                 // Zone name
	ScheduleInterval        int64                  // Interval in seconds for checking active tenants & new tasks
	StaleCheckDelay         int64                  // Time in seconds for checking stale tasks
	TaskEventUpdateDeadline int64                  // Deadline in seconds for the scheduler to receive task update events
	Snapshot                config.SnapshotConfig  // Scheduler state snapshot configurations
	Transport               config.TransportConfig // Transport config
	ServerConfig            config.ServerConfig    // http and grpc config
}

type Scheduler

// Scheduler is constructed with New from an *Options, a types.DB, a
// types.Logger and a types.Listener. Its exported methods are Start,
// SchedulerId and IsTaskTimeout.
// NOTE(review): its fields are elided in this listing; scheduling behavior
// cannot be described from here.
type Scheduler struct {
	// contains filtered or unexported fields
}

func New

func New(opt *Options, db types.DB, lg types.Logger, ls types.Listener) (s *Scheduler, err error)

func (*Scheduler) IsTaskTimeout

func (s *Scheduler) IsTaskTimeout(ev *TaskEvent) bool

func (*Scheduler) SchedulerId

func (s *Scheduler) SchedulerId() string

func (*Scheduler) Start

func (s *Scheduler) Start()

type Server

// Server is constructed with NewServer and exposes the request handlers
// CreateTask, CreateTenant, PauseTask, RestartTask, StopTask,
// QueryTaskStatus and QueryTenantTaskConcurrency, plus Start.
//
// It embeds pb.UnimplementedScheduleServiceServer.
// NOTE(review): that embed looks like the generated gRPC stub that supplies
// default implementations for unimplemented RPCs — confirm against the
// generated pb package.
type Server struct {
	pb.UnimplementedScheduleServiceServer
	// contains filtered or unexported fields
}

func NewServer

func NewServer(db types.DB, sched *Scheduler, config config.ServerConfig, lg types.Logger, ls types.Listener) *Server

func (*Server) CreateTask

func (s *Server) CreateTask(ctx context.Context, req *pb.CreateTaskRequest) (*pb.Response, error)

func (*Server) CreateTenant

func (s *Server) CreateTenant(ctx context.Context, req *pb.CreateTenantRequest) (*pb.Response, error)

func (*Server) PauseTask

func (s *Server) PauseTask(ctx context.Context, req *pb.PauseTaskRequest) (*pb.Response, error)

func (*Server) QueryTaskStatus

func (s *Server) QueryTaskStatus(ctx context.Context, req *pb.QueryTaskStatusRequest) (*pb.QueryTaskStatusResponse, error)

func (*Server) QueryTenantTaskConcurrency

func (s *Server) QueryTenantTaskConcurrency(ctx context.Context, req *pb.QueryTenantTaskConcurrencyRequest) (resp *pb.QueryTenantTaskConcurrencyResponse, err error)

func (*Server) RestartTask

func (s *Server) RestartTask(ctx context.Context, req *pb.RestartTaskRequest) (*pb.Response, error)

func (*Server) Start

func (s *Server) Start()

func (*Server) StopTask

func (s *Server) StopTask(ctx context.Context, req *pb.StopTaskRequest) (*pb.Response, error)

type TaskEvent

// TaskEvent is a single task event record. Every field carries a camelCase
// JSON tag, so the struct can be marshalled with encoding/json.
//
// Value is a json.RawMessage, i.e. it holds an already-encoded JSON payload
// whose decoding is deferred to the consumer.
// Values are built by NewEventFromMessage and NewEventFromUserTask, and Key
// returns a []byte key for the event.
// NOTE(review): the set of valid EventType strings and the layout of Key are
// not visible in this listing — confirm before relying on them.
type TaskEvent struct {
	EventType string          `json:"eventType"`
	WorkerId  string          `json:"workerId"`
	TenantId  string          `json:"tenantId"`
	TaskId    string          `json:"taskId"`
	TaskType  enum.TaskType   `json:"taskType"`
	Timestamp time.Time       `json:"timestamp"`
	Value     json.RawMessage `json:"value"`
}

func NewEventFromMessage

func NewEventFromMessage(m *types.TaskMessage) *TaskEvent

func NewEventFromUserTask

func NewEventFromUserTask(typ string, t *entity.UserTask) *TaskEvent

func (*TaskEvent) Key

func (ev *TaskEvent) Key() []byte

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL