Hydra is a Go library for building parallel data pipelines.
Installation
go get github.com/agrebenn1kov/hydra
Usage
//main.go
package main
import (
"fmt"
"log"
"github.com/agrebenn1kov/hydra/scheduler"
"github.com/agrebenn1kov/hydra/task"
"github.com/agrebenn1kov/hydra/transfer"
"github.com/agrebenn1kov/hydra/utils"
"github.com/agrebenn1kov/hydra/worker"
)
// Example connection strings shared by the tasks registered in main.
// They are placeholders: replace the credentials and database names with
// real values before running the example.
var (
	// mysqlConn is the DSN passed to every MySQL task and transfer below.
	mysqlConn = "root:password@/database"

	// pgConn is the DSN passed to every PostgreSQL task and transfer below.
	pgConn = "user=postgres password=awesomepassword dbname=test sslmode=disable"
)
// hello is a minimal example task body: it writes a greeting line to
// standard output and always returns a nil error.
func hello() error {
	const greeting = "Hello world"
	fmt.Println(greeting)
	return nil
}
// main builds two example workers, registers a task list on each, and hands
// both workers to a scheduler. Any error from constructing a worker, creating
// the scheduler, or starting it terminates the program via log.Fatal.
func main() {
	// NOTE(review): the NewWorker arguments look like name, first run time,
	// repeat interval and time zone; confirm against the worker package.
	wrk1, err := worker.NewWorker("Test_Worker1", "2021-05-04 12:26:00", "1h32m", "Europe/Moscow")
	if err != nil {
		log.Fatal(err)
	}
	// NOTE(review): AsParallel and WaitAll presumably control how a task is
	// ordered relative to its neighbours; confirm against the task package.
	wrk1.AddTasks(
		task.FuncTask(hello).AsParallel(),
		task.SqlTask("mysql", mysqlConn, "./path/to/query0.sql"),
		task.SqlTask("mysql", mysqlConn, "./path/to/query1.sql"),
		task.SqlTask("mysql", mysqlConn, "./path/to/query0.sql"),
		transfer.MysqlQueryToPgsql("./path/to/query.sql", mysqlConn, "targetTableName", pgConn).AsParallel(),
		task.SqlTask("postgres", pgConn, "./path/to/query2.sql"),
		task.MailTask(
			utils.NewMailConfig("username@gmail.com", "smtp.gmail.com", "25", "myawesomepassword"),
			"from@gmail.com",
			"Test_Worker1 successfully completed",
			[]string{"foo@mail.com", "bar@gmail.com", "baz@mail.com"}).WaitAll(),
	)

	wrk2, err := worker.NewWorker("Test_Worker2", "2021-05-04 12:26:00", "4h", "America/New_York")
	// Check the error before using wrk2: the worker value is not meaningful
	// when construction fails, and the error would otherwise be overwritten
	// by the scheduler call below.
	if err != nil {
		log.Fatal(err)
	}
	wrk2.AddTasks(
		task.ShellTask("./path/to/script.sh").AsParallel(),
		task.SqlTask("mysql", mysqlConn, "./path/to/query3.sql"),
		task.SqlTask("mysql", mysqlConn, "./path/to/query4.sql"),
		task.SqlTask("mysql", mysqlConn, "./path/to/query5.sql"),
		transfer.PgsqlTableToMysql("sourceTableName", pgConn, "targetTableName", mysqlConn).AsParallel(),
		task.SqlTask("postgres", pgConn, "./path/to/query6.sql"),
		task.MailTask(
			utils.NewMailConfig("username@gmail.com", "smtp.gmail.com", "25", "myawesomepassword"),
			"from@gmail.com",
			"Test_Worker2 successfully completed",
			[]string{"foo@mail.com", "bar@gmail.com", "baz@mail.com"}).WaitAll(),
	)

	// The local is named sch rather than scheduler so it does not shadow the
	// imported scheduler package for the rest of the function.
	sch, err := scheduler.NewScheduler(
		[]*worker.Worker{wrk1, wrk2},
		"user=postgres password=password dbname=logs sslmode=disable",
	)
	if err != nil {
		log.Fatal(err)
	}
	if err := sch.Start(); err != nil {
		log.Fatal(err)
	}
}