bakapy

package
v0.8.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2015 License: GPL-3.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	STATE_WAIT_TASK_ID = iota
	STATE_WAIT_FILENAME
	STATE_WAIT_DATA
	STATE_RECEIVING
	STATE_END
)

Storage connection states

View Source
const JOB_FINISH = "_@!_JOB_FINISH_!@_"
View Source
const STORAGE_AUTH_TIMEOUT = 30 // seconds

Waiting for client authentication

View Source
const STORAGE_FILENAME_LEN_LEN = 4

Length of filename length header

View Source
const STORAGE_READ_BUFSIZE = 4096
View Source
const STORAGE_TASK_ID_LEN = 36

Variables

View Source
var JOB_TEMPLATE = template.Must(template.New("job").Parse(`
##
# Common header
##
set -e

TASK_NAME='{{.Job.Name}}'

_send_file(){
    local name="$1"

    exec 3<>/dev/tcp/{{.ToHost}}/{{.ToPort}}
    echo -n {{.Job.TaskId}}$(printf "%0{{.FILENAME_LEN_LEN}}d" ${#name})${name} >&3
    cat - >&3
    exec 3>&-
}

_finish(){
    echo > /dev/null
}

_fail(){
    test ! -z "$1" && echo "command failed at line $1" >&2
    exit 1
}

function error_exit {
    if [ "$?" != "0" ]; then
        echo "FAILED"
        _fail
        exit 1
    fi
}

trap '_fail ${LINENO}' ERR

##
# Command
##
`))
View Source
var MAIL_TEMPLATE_JOB_FAILED = template.Must(template.New("mail").Parse(`From: {{ .From }}
To: {{.To}}
Subject: {{.Subject}}
Content-Type: text/plain;charset=utf8

Job {{.JobName}} failed:
{{.Message}}

Output:
-----------------------------
{{.Output}}
-----------------------------

Errput:
-----------------------------
{{.Errput}}
-----------------------------
`))

Functions

func RunJob

func RunJob(jobName string, jConfig *JobConfig, gConfig *Config, storage *Storage) string

func SendFailedJobNotification

func SendFailedJobNotification(cfg SMTPConfig, meta *JobMetadata) error

func SetupLogging

func SetupLogging(logLevel string) error

Types

type BashExecutor added in v0.8.0

type BashExecutor struct {
	Args map[string]string
	Host string
	Port uint
	Sudo bool
	// contains filtered or unexported fields
}

func NewBashExecutor added in v0.8.0

func NewBashExecutor(args map[string]string, host string, port uint, sudo bool) *BashExecutor

func (*BashExecutor) Execute added in v0.8.0

func (e *BashExecutor) Execute(script []byte, output io.Writer, errput io.Writer) error

func (*BashExecutor) GetCmd added in v0.8.0

func (e *BashExecutor) GetCmd() (*exec.Cmd, error)

type Config

type Config struct {
	IncludeJobs []string `yaml:"include_jobs"`
	Listen      string
	StorageDir  string     `yaml:"storage_dir"`
	MetadataDir string     `yaml:"metadata_dir"`
	CommandDir  string     `yaml:"command_dir"`
	SMTP        SMTPConfig `yaml:"smtp"`
	Jobs        map[string]*JobConfig
}

func NewConfig

func NewConfig() *Config

func ParseConfig

func ParseConfig(configPath string) (*Config, error)

func (*Config) PrettyFmt

func (cfg *Config) PrettyFmt() []byte

type Executer added in v0.8.0

type Executer interface {
	Execute(script []byte, output io.Writer, errput io.Writer) error
}

type Job

type Job struct {
	Name        string
	TaskId      TaskId
	StorageAddr string
	CommandDir  string
	// contains filtered or unexported fields
}

func NewJob

func NewJob(name string, cfg *JobConfig, StorageAddr string, commandDir string, jober Jober, executor Executer) *Job

func (*Job) Run

func (job *Job) Run() *JobMetadata

type JobConfig

type JobConfig struct {
	Sudo       bool
	Disabled   bool
	Gzip       bool
	MaxAgeDays int           `yaml:"max_age_days"`
	MaxAge     time.Duration `yaml:"max_age"`
	Namespace  string
	Host       string
	Port       uint
	Command    string
	Args       map[string]string
	RunAt      RunAtSpec `yaml:"run_at"`
	// contains filtered or unexported fields
}

func (*JobConfig) Sanitize

func (jobConfig *JobConfig) Sanitize() error

type JobMetadata

type JobMetadata struct {
	JobName    string
	Gzip       bool
	Namespace  string
	TaskId     TaskId
	Command    string
	Success    bool
	Message    string
	TotalSize  int64
	StartTime  time.Time
	EndTime    time.Time
	ExpireTime time.Time
	Files      []JobMetadataFile
	Pid        int
	RetCode    uint
	Script     []byte
	Output     []byte
	Errput     []byte
	Config     JobConfig
}

func LoadJobMetadata

func LoadJobMetadata(path string) (*JobMetadata, error)

func (*JobMetadata) AvgSpeed

func (metadata *JobMetadata) AvgSpeed() int64

func (*JobMetadata) Duration

func (metadata *JobMetadata) Duration() time.Duration

func (*JobMetadata) Save

func (metadata *JobMetadata) Save(saveTo string) error

type JobMetadataFile

type JobMetadataFile struct {
	Name       string
	Size       int64
	SourceAddr string
	StartTime  time.Time
	EndTime    time.Time
}

func (*JobMetadataFile) String

func (m *JobMetadataFile) String() string

type JobTemplateContext

type JobTemplateContext struct {
	Job              *Job
	FILENAME_LEN_LEN uint
}

func (*JobTemplateContext) ToHost

func (jctx *JobTemplateContext) ToHost() string

func (*JobTemplateContext) ToPort

func (jctx *JobTemplateContext) ToPort() string

type Jober added in v0.8.0

type Jober interface {
	AddJob(currentJob *StorageCurrentJob)
	RemoveJob(id TaskId)
	WaitJob(taskId TaskId)
}

type NotificationTemplateContext

type NotificationTemplateContext struct {
	From    string
	To      string
	Subject string
	JobName string
	Message string
	Output  string
	Errput  string
}

type RemoteReader added in v0.8.0

type RemoteReader interface {
	Read(p []byte) (n int, err error)
	RemoteAddr() net.Addr
}

type RunAtSpec

type RunAtSpec struct {
	Second  string
	Minute  string
	Hour    string
	Day     string
	Month   string
	Weekday string
}

func (*RunAtSpec) SchedulerString

func (r *RunAtSpec) SchedulerString() string

type SMTPConfig

type SMTPConfig struct {
	Host string
	Port int
}

type Storage

type Storage struct {
	*StorageJobManager
	RootDir     string
	MetadataDir string
	// contains filtered or unexported fields
}

func NewStorage

func NewStorage(cfg *Config) *Storage

func (*Storage) CleanupExpired

func (stor *Storage) CleanupExpired() error

func (*Storage) HandleConnection added in v0.8.0

func (stor *Storage) HandleConnection(conn StorageProtocolHandler) error

func (*Storage) Listen

func (stor *Storage) Listen() net.Listener

func (*Storage) Serve

func (stor *Storage) Serve(ln net.Listener)

func (*Storage) Start

func (stor *Storage) Start()

type StorageConn

type StorageConn struct {
	RemoteReader

	State StorageConnState
	// contains filtered or unexported fields
}

func NewStorageConn

func NewStorageConn(rReader RemoteReader, logger *logging.Logger) *StorageConn

func (*StorageConn) ReadContent added in v0.8.0

func (sc *StorageConn) ReadContent(output io.Writer) (int64, error)

func (*StorageConn) ReadFilename

func (sc *StorageConn) ReadFilename() (string, error)

func (*StorageConn) ReadTaskId

func (sc *StorageConn) ReadTaskId() (TaskId, error)

type StorageConnState

type StorageConnState uint8

type StorageCurrentJob

type StorageCurrentJob struct {
	TaskId      TaskId
	FileAddChan chan JobMetadataFile
	Namespace   string
	Gzip        bool
}

type StorageJobManager

type StorageJobManager struct {
	// contains filtered or unexported fields
}

func NewStorageJobManager

func NewStorageJobManager() *StorageJobManager

func (*StorageJobManager) AddConnection

func (m *StorageJobManager) AddConnection(id TaskId)

func (*StorageJobManager) AddJob

func (m *StorageJobManager) AddJob(job *StorageCurrentJob)

func (*StorageJobManager) GetJob

func (*StorageJobManager) JobConnectionCount

func (m *StorageJobManager) JobConnectionCount(taskId TaskId) int

func (*StorageJobManager) RemoveConnection

func (m *StorageJobManager) RemoveConnection(id TaskId)

func (*StorageJobManager) RemoveJob

func (m *StorageJobManager) RemoveJob(id TaskId)

func (*StorageJobManager) WaitJob added in v0.8.0

func (m *StorageJobManager) WaitJob(taskId TaskId)

type StorageProtocolHandler added in v0.8.0

type StorageProtocolHandler interface {
	ReadTaskId() (TaskId, error)
	ReadFilename() (string, error)
	ReadContent(output io.Writer) (int64, error)
	RemoteAddr() net.Addr
}

type TaskId

type TaskId string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL