core

package
v0.0.0-...-a7f6470 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2019 License: MIT Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const MIN_FLOAT = -1000000

MIN_FLOAT defin min float value if error occures while getting predictions set default value to MinFloat

Variables

View Source
var DB *sql.DB

DB is global pointer to sql database object, it is initialized once when server starts

View Source
var DBTYPE string

DBTYPE holds database type, e.g. sqlite3

View Source
var DefaultProcessor = &Processor{}

DefaultProcessor is a default processor instance

View Source
var RouterModel bool

RouterModel tells if agent enable router

View Source
var StorageQueue chan Job

StorageQueue is a buffered channel that we can send work requests on.

View Source
var TransferDelayThreshold int

TransferDelayThreshold controls maximum threshold in seconds TransferRequest will wait before giving up

View Source
var TransferQueue chan Job

TransferQueue is an instance of dispatcher to handle the transfer process

View Source
var TransferType string

TransferType decides which pull or push based model is used

Functions

func CheckAgent

func CheckAgent(agentUrl string) error

CheckAgent get status of the agent

func InitQueue

func InitQueue(transferQueueSize int, storageQueueSize int, mfile string, minterval int64, monitorTime int64, router bool)

InitQueue initializes RequestQueue, transferQueue and StorageQueue

func NewRouter

func NewRouter(interval string, agent *map[string]string, csvFile string) *cron.Cron

NewRouter returns new instance of Router type

func RedirectRequest

func RedirectRequest(t *TransferRequest) error

RedirectRequest sends request to appropriate agent(s) either based on routing predictions or to src/dst agents for push/pull model

func SubmitRequest

func SubmitRequest(j []Job, src string, dst string) error

SubmitRequest submits request to destination

Types

type AgentStatus

type AgentStatus struct {
	Url       string            `json:"url"`      // agent url
	Name      string            `json:"name"`     // agent name or alias
	TimeStamp int64             `json:"ts"`       // time stamp
	Catalog   string            `json:"catalog"`  // underlying TFC catalog
	Protocol  string            `json:"protocol"` // underlying transfer protocol
	Backend   string            `json:"backend"`  // underlying transfer backend
	Tool      string            `json:"tool"`     // underlying transfer tool, e.g. xrdcp
	ToolOpts  string            `json:"toolopts"` // options for backend tool
	Agents    map[string]string `json:"agents"`   // list of known agents
	Addrs     []string          `json:"addrs"`    // list of all IP addresses
	Metrics   map[string]int64  `json:"metrics"`  // agent metrics
	CpuUsage  float64           `json:"cpuusage"` // percentage of cpu used
	MemUsage  float64           `json:"memusage"` // Avg RAM used in MB
}

AgentStatus data type

func (*AgentStatus) String

func (a *AgentStatus) String() string

String provides string representation of given agent status

type CallerFunc

type CallerFunc func(agent, src, dst string)

CallerFunc type func(string, string, string)

func AuthzDecorator

func AuthzDecorator(fn CallerFunc, policy string) CallerFunc

AuthzDecorator provides skeleton for performing authorization check with given function

type Catalog

type Catalog struct {
	Type     string `json:"type"`     // catalog type, e.g. sqlite3, etc.
	Uri      string `json:"uri"`      // catalog uri, e.g. file.db
	Login    string `json:"login"`    // database login
	Password string `json:"password"` // database password
	Owner    string `json:"owner"`    // used by ORACLE DB, defines owner of the database
}

Catalog represents Trivial File Catalog (TFC) of the model

var TFC Catalog

TFC stands for Trivial File Catalog

func (*Catalog) Add

func (c *Catalog) Add(entry CatalogEntry) error

Add method adds entry to a catalog

func (*Catalog) Dump

func (c *Catalog) Dump() []byte

Dump method returns TFC dump in CSV format

func (*Catalog) Exec

func (c *Catalog) Exec(stm, status, rid string) error

Exec method update db upto three times

func (*Catalog) Files

func (c *Catalog) Files(dataset, block, lfn string) []string

Files returns list of files for specified conditions

func (*Catalog) GetStatus

func (c *Catalog) GetStatus(rid string) (string, error)

GetStatus gets the status of request

func (*Catalog) GetTransfers

func (c *Catalog) GetTransfers(time0, time1 string) ([]TransferData, error)

GetTransfers provide details about transfers in given time interval

func (*Catalog) InsertRequest

func (c *Catalog) InsertRequest(r TransferRequest) error

InsertRequest inserts new request

func (*Catalog) InsertTransfers

func (c *Catalog) InsertTransfers(time int64, cpuUsage float64, memUsage float64, throughput float64)

InsertTransfers inserts new row to TRANSFERS table

func (*Catalog) ListRequest

func (c *Catalog) ListRequest(query string) ([]TransferRequest, error)

ListRequest gets specific type of transfer requests according to status

func (*Catalog) PfnFiles

func (c *Catalog) PfnFiles(dataset, block, lfn string) []string

PfnFiles returns list of files for specified conditions

func (*Catalog) Records

func (c *Catalog) Records(req TransferRequest) []CatalogEntry

Records returns catalog records for a given transfer request

func (*Catalog) RetrieveRequest

func (c *Catalog) RetrieveRequest(r *TransferRequest) error

RetrieveRequest gets the request details based on request id

func (*Catalog) Snapshot

func (c *Catalog) Snapshot() map[string][]string

Snapshot returns a snapshot of the TFC catalog and return it as a map which holds table names and list of rows where each row is represented as a comma separated values

func (*Catalog) Transfers

func (c *Catalog) Transfers(time0, time1 string) []CatalogEntry

Transfers method returns transfers of the agent in given time interval

func (*Catalog) UpdateRequest

func (c *Catalog) UpdateRequest(rid string, status string) error

UpdateRequest updates the status of request

type CatalogEntry

type CatalogEntry struct {
	Lfn          string `json:"lfn"`          // lfn stands for Logical File Name
	Pfn          string `json:"pfn"`          // pfn stands for Physical File Name
	Dataset      string `json:"dataset"`      // dataset represents collection of blocks
	Block        string `json:"block"`        // block idetify single block within a dataset
	Bytes        int64  `json:"bytes"`        // size of the files in bytes
	Hash         string `json:"hash"`         // hash represents checksum of the pfn
	TransferTime int64  `json:"transferTime"` // transfer time
	Timestamp    int64  `json:"timestamp"`    // time stamp
}

CatalogEntry represents an entry in TFC

func GetRecords

func GetRecords(tr TransferRequest, agent string) ([]CatalogEntry, error)

GetRecords get catalog entries from given agent

func (*CatalogEntry) String

func (c *CatalogEntry) String() string

String provides string representation of CatalogEntry

type CentralCatalog

type CentralCatalog struct {
	Path string `json:"path"` // path to central catalog
}

CentralCatalog represent structure of Central Catalog it is represneted by list of tables where each table contains list of records

CC reprents isntance of CentralCatalog

func (*CentralCatalog) Get

func (c *CentralCatalog) Get(table string) ([]byte, error)

Get method gets records from Central Catalog for a given table

func (*CentralCatalog) Put

func (c *CentralCatalog) Put(table string, records []string) error

Put method puts given table-records into Central Catalog

type Decorator

type Decorator func(Request) Request

Decorator wraps a request with extra behavior

func Delete

func Delete() Decorator

Delete returns a Decorator that deletes request from heap

func Logging

func Logging(l *logs.Logger) Decorator

Logging returns a Decorator that logs client requests

func Pause

func Pause(interval time.Duration) Decorator

Pause returns a Decorator that pauses request for a given time interval

func PullTransfer

func PullTransfer() Decorator

PullTransfer returns a Decorator that performs request transfers by pull model

func PushTransfer

func PushTransfer() Decorator

PushTransfer returns a Decorator that performs request transfers

func Store

func Store() Decorator

Store returns a Decorator that stores request

func Tracer

func Tracer() Decorator

Tracer returns a Decorator that traces given request

type Dispatcher

type Dispatcher struct {
	// A pool of workers channels that are registered with the dispatcher
	JobPool    chan chan Job
	MaxWorkers int
	BufferSize int
}

Dispatcher implementation

func NewDispatcher

func NewDispatcher(maxWorkers int, bufferSize int) *Dispatcher

NewDispatcher returns new instance of Dispatcher type

func (*Dispatcher) StorageRunner

func (d *Dispatcher) StorageRunner()

StorageRunner function starts the worker and dispatch it as go-routine

func (*Dispatcher) TransferRunner

func (d *Dispatcher) TransferRunner()

TransferRunner function starts the worker and dispatch it as go-routine

type FileSystemStager

type FileSystemStager struct {
	Pool    string  // pool area on file system
	Catalog Catalog // TFC catalog of the agent
}

FileSystemStager defines simple file-based stager

var AgentStager *FileSystemStager

AgentStager represent instance of agent's stager

func NewStager

func NewStager(pool string, catalog Catalog) *FileSystemStager

NewStager returns new instance of Dispatcher type

func (*FileSystemStager) Access

func (s *FileSystemStager) Access(lfn string) string

Access provides access path to the file

func (*FileSystemStager) Exist

func (s *FileSystemStager) Exist(lfn string) bool

Exist implements exists functionality of the Stager interface

func (*FileSystemStager) Read

func (s *FileSystemStager) Read(lfn string, chunk int64) ([]byte, error)

Read implements read functionality of the Stager interface this function get file associated with lfn from the pool area and return its content

func (*FileSystemStager) Stage

func (s *FileSystemStager) Stage(lfn string) error

Stage implements stage functionality of the Stager interface this function takes given lfn and place into internal pool area

func (*FileSystemStager) Write

func (s *FileSystemStager) Write(data []byte, lfn string) (string, int64, string, error)

Put implements put functionality of the Stager interface

type Item

type Item struct {
	Value TransferRequest
	// contains filtered or unexported fields
}

An Item is something we manage in a priority queue.

type Job

type Job struct {
	TransferRequest TransferRequest `json:"request"` // TransferRequest
	Action          string          `json:"action"`  // Action to apply to TransferRequest, e.g. delete or transfer
}

Job represents the job to be run

func (*Job) RequestFails

func (j *Job) RequestFails()

RequestFails function to handle failed jobs

func (*Job) RequestSuccess

func (j *Job) RequestSuccess()

RequestSuccess function to handle success jobs

func (*Job) String

func (j *Job) String() string

String method return string representation of transfer request

func (*Job) UpdateRequest

func (j *Job) UpdateRequest(status string)

UpdateRequest sends request to main agent to update request status in its persistent store (REQUESTS table)

type Metrics

type Metrics struct {
	In         metrics.Counter      // number of live transfer requests
	Failed     metrics.Counter      // number of failed transfer requests
	Total      metrics.Counter      // total number of transfer requests
	TotalBytes metrics.Counter      // total number of bytes by this agent
	Bytes      metrics.Counter      // number of bytes in progress
	CpuUsage   metrics.GaugeFloat64 // CPU usage in percentage
	MemUsage   metrics.GaugeFloat64 // Memory usage in MB
	Tick       metrics.Counter      // Store cpu ticks
	MaxTick    int64                // Max tick after which reset metrics
}

Metrics of the agent

var AgentMetrics Metrics

AgentMetrics defines various metrics about the agent work

func (*Metrics) GetCurrentStats

func (m *Metrics) GetCurrentStats()

GetCurrentStats function to get current system usage

func (*Metrics) GetUsage

func (m *Metrics) GetUsage() (float64, float64, error)

GetUsage method to get cpu and Memory usage

func (*Metrics) String

func (m *Metrics) String() string

String representation of Metrics

func (*Metrics) ToDict

func (m *Metrics) ToDict() map[string]int64

ToDict converts Metrics structure to a map

type PriorityQueue

type PriorityQueue []*Item

A PriorityQueue implements heap.Interface and holds Items.

var RequestQueue PriorityQueue

RequestQueue is a queue to sort the requests according to priority.

func (*PriorityQueue) Delete

func (pq *PriorityQueue) Delete(rid string) bool

Delete request from PriorityQueue. The complexity is O(n) where n = heap.Len()

func (*PriorityQueue) GetAllRequest

func (pq *PriorityQueue) GetAllRequest() []TransferRequest

GetAllRequest gets the entire list of requests

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

Len provides length of PriorityQueue

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

Less provides less function for PriorityQueue

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

Pop provides pop function for PriorityQueue

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

Push provides push function for PriorityQueue

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

Swap provides swap function for PriorityQueue

type Processor

type Processor struct {
}

Processor is an object who process' given task The logic of the Processor should be implemented.

func (*Processor) Process

func (e *Processor) Process(t *TransferRequest) error

Process defines execution process for a given task

type Record

type Record map[string]interface{}

Record represent main DB record we work with

var DBSQL Record

DBSQL represent common record we get from DB SQL statement

func LoadSQL

func LoadSQL(dbtype, owner string) Record

LoadSQL is a helper function to load DBS SQL statements

type Request

type Request interface {
	Process(*TransferRequest) error
}

Request interface defines a task process

func Decorate

func Decorate(r Request, ds ...Decorator) Request

Decorate decorates a Request r with all given Decorators

type RequestFunc

type RequestFunc func(*TransferRequest) error

RequestFunc is a function type that implements the Request interface

func (RequestFunc) Process

func (f RequestFunc) Process(t *TransferRequest) error

Process is a method of TransferRequest

type Router

type Router struct {
	CronInterval     string                 // Helps to set hourly based cron job
	LinearRegression *regression.Regression // machine learning model
	CSVfile          string                 // historical data file
	Agents           *map[string]string     // list of connected agents
}

Router structure defines attributes of the router

var AgentRouter Router

AgentRouter helps to call router's methods

func (*Router) FindSource

func (r *Router) FindSource(tr *TransferRequest) ([]SourceStats, int, error)

FindSource finds appropriate source agent(s) for given transfer request

func (*Router) InitialTrain

func (r *Router) InitialTrain()

InitialTrain function to train router by previous data(After restarting it)

type SourceStats

type SourceStats struct {
	SrcUrl   string
	SrcAlias string

	Jobs []Job
	// contains filtered or unexported fields
}

SourceStats structure to store source informations

func GetUnionCatalog

func GetUnionCatalog(tRequest *TransferRequest) (*set.SetNonTS, []SourceStats, map[string][]string)

GetUnionCatalog function to get the union of files

type Stager

type Stager interface {
	Stage(lfn string) error
	Read(lfn string, chunk int64) ([]byte, error)
	Write([]byte) (string, int64, string)
	Exist(lfn string) bool
	Access(lfn string) string
}

Stager interface defines abstract functionality of the file stage system

type TransferData

type TransferData struct {
	Timestamp  int64   `json:"timestamp"`  // Helps to get data historically
	CpuUsage   float64 `json:"cpu"`        // percentage of cpu used
	MemUsage   float64 `json:"ram"`        // ram used in MB
	Throughput float64 `json:"throughput"` // network throughput during transfer in MB
}

TransferData helps to structure the rows of transfers table

type TransferRequest

type TransferRequest struct {
	TimeStamp int64  `json:"ts"`       // timestamp of the request
	Lfn       string `json:"file"`     // LFN name to be transferred
	Block     string `json:"block"`    // block name to be transferred
	Dataset   string `json:"dataset"`  // dataset name to be transferred
	SrcUrl    string `json:"srcUrl"`   // source agent URL which initiate the transfer
	SrcAlias  string `json:"srcAlias"` // source agent name
	DstUrl    string `json:"dstUrl"`   // destination agent URL which will consume the transfer
	DstAlias  string `json:"dstAlias"` // destination agent name
	RegUrl    string `json:"regUrl"`   // registration agent url (main agent)
	RegAlias  string `json:"regAlias"` // registration agent name
	Delay     int    `json:"delay"`    // transfer delay time, i.e. post-pone transfer
	Id        string `json:"id"`       // unique id of each request
	Priority  int    `json:"priority"` // priority of request
	Status    string `json:"status"`   // Identify the category of request
}

TransferRequest data type

func ResolveRequest

func ResolveRequest(t TransferRequest) []TransferRequest

ResolveRequest will resolve input transfer request into series of requests with known lfn/block/dataset triplets

func (*TransferRequest) Clone

func (t *TransferRequest) Clone() TransferRequest

Clone provides copy of transfer request

func (*TransferRequest) Delete

func (t *TransferRequest) Delete() error

Delete performs deletion of transfer request

func (*TransferRequest) RunPull

func (t *TransferRequest) RunPull() error

RunPull method perform a job on transfer request. It will use pull model

func (*TransferRequest) RunPush

func (t *TransferRequest) RunPush() error

RunPush method perform a job on transfer request. It will use push model

func (*TransferRequest) Store

func (t *TransferRequest) Store() error

Store method stores a job in heap and db

func (*TransferRequest) String

func (t *TransferRequest) String() string

String method return string representation of transfer request

func (*TransferRequest) UUID

func (t *TransferRequest) UUID() string

UUID generates unique id for transfer request

type Worker

type Worker struct {
	Id         int
	JobPool    chan chan Job
	JobChannel chan Job
	// contains filtered or unexported fields
}

Worker represents the worker that executes the job

func NewWorker

func NewWorker(wid int, bufferSize int, jobPool chan chan Job) Worker

NewWorker return a new instance of the Worker type

func (Worker) Start

func (w Worker) Start()

Start method starts the run loop for the worker, listening for a quit channel in case we need to stop it

func (Worker) Stop

func (w Worker) Stop()

Stop signals the worker to stop listening for work requests.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL