hydra

package module
v0.9.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 18, 2021 License: MIT Imports: 9 Imported by: 0

README

The Go library for parallel data pipelines.

Installation

go get github.com/agrebenn1kov/hydra

Usage

//main.go

package main

import (
	"fmt"
	"log"

	"github.com/agrebenn1kov/hydra/scheduler"
	"github.com/agrebenn1kov/hydra/task"
	"github.com/agrebenn1kov/hydra/transfer"
	"github.com/agrebenn1kov/hydra/utils"
	"github.com/agrebenn1kov/hydra/worker"
)
var (

	mysqlConn    = "root:password@/database"
	pgConn = "user=postgres password=awesomepassword dbname=test sslmode=disable"
)

func hello() error {
	fmt.Println("Hello world")
	return nil
}

func main() {
	
	wrk1, err := worker.NewWorker("Test_Worker1", "2021-05-04 12:26:00", "1h32m", "Europe/Moscow")
	if err != nil {
		log.Fatal(err)
    }
	wrk1.AddTasks(
		task.FuncTask(hello).AsParallel(),
		task.SqlTask("mysql", mysqlConn, "./path/to/query0.sql"),
		task.SqlTask("mysql", mysqlConn, "./path/to/query1.sql"),
		task.SqlTask("mysql", mysqlConn, "./path/to/query0.sql"),
		transfer.MysqlQueryToPgsql("./path/to/query.sql", mysqlConn, "targetTableName", pgConn).AsParallel(),
		task.SqlTask("postgres", pgConn, "./path/to/query2.sql"),
		task.MailTask(
			utils.NewMailConfig("username@gmail.com", "smtp.gmail.com", "25", "myawesomepassword"),
			"from@gmail.com",
			"Test_Worker1 successfully completed",
			[]string{"foo@mail.com","bar@gmail.com","baz@mail.com"}).WaitAll(),
	)

	wrk2, err := worker.NewWorker("Test_Worker2", "2021-05-04 12:26:00", "4h", "America/New_York")
	wrk2.AddTasks(
		task.ShellTask("./path/to/script.sh").AsParallel(),
		task.SqlTask("mysql", mysqlConn, "./path/to/query3.sql"),
		task.SqlTask("mysql", mysqlConn, "./path/to/query4.sql"),
		task.SqlTask("mysql", mysqlConn, "./path/to/query5.sql"),
		transfer.PgsqlTableToMysql("sourceTableName", pgConn, "targetTableName", mysqlConn).AsParallel(),
		task.SqlTask("postgres", pgConn, "./path/to/query6.sql"),
		task.MailTask(
			utils.NewMailConfig("username@gmail.com", "smtp.gmail.com", "25", "myawesomepassword"),
			"from@gmail.com",
			"Test_Worker2 successfully completed",
			[]string{"foo@mail.com","bar@gmail.com","baz@mail.com"}).WaitAll(),
	)

	scheduler, err := scheduler.NewScheduler([]*worker.Worker{wrk1, wrk2}, "user=postgres password=password dbname=logs sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	if err:= scheduler.Start(); err != nil {
		log.Fatal(err)
    }
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Base

type Base interface {
	Type() task.TaskType
	CheckError() bool
	ContainsTempFiles() bool
}

type ErrorChecker

type ErrorChecker interface {
	CheckTaskError() error
}

type Executor

type Executor interface {
	Execute() error
}

type Job

type Job struct {
	// contains filtered or unexported fields
}

func NewJob

func NewJob(name, runAt, interval, timeZone string) (*Job, error)

func (*Job) AddTasks

func (w *Job) AddTasks(t ...Task)

func (*Job) Name

func (w *Job) Name() string

type Logger

type Logger interface {
	Info(args ...interface{})
	Debug(args ...interface{})
	Warn(args ...interface{})
	Error(args ...interface{})
}

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(conn string, logger Logger) (*Scheduler, error)

func (*Scheduler) AddJobs

func (s *Scheduler) AddJobs(jobs ...*Job) error

func (*Scheduler) SetLogger

func (s *Scheduler) SetLogger(log Logger)

func (*Scheduler) Start

func (s *Scheduler) Start(workersCount int) error

type Task

type Task interface {
	Executor
	Base
}

type TempFilesRemover

type TempFilesRemover interface {
	TempFile() *os.File
}

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL