gwp

package module
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 17, 2020 License: Apache-2.0 Imports: 6 Imported by: 0

README

Build codecov Codacy Badge Go Report Card go.dev reference license Release Go version

gwp - Go Worker Pool

This package wants to offer the community and implement workers with the pure Go code for Golangers, without any other dependency just Uuid. It allows you to expose an http server to answer the response of health checks, stats, debug pprof and the main "workers". Workers for consumer queues, channel processes and other things that you think worker needs.

image

Prerequisites

Golang version >= 1.14

Features

  • Setup http server to monitoring yours;
    • /stats with workers, showing statuses her, number of goroutines, number of cpus and more;
    • /health-check that look for status of workers;
    • /debug/pprof expose all endpoints of investivate golang runtime http;
  • Allow multiple concurrencies of work, handle errors and restart always worker;

Documentation

For examples visit godoc#pkg-examples

For GoDoc reference, visit pkg.go.dev

Examples

Simple Worker
package main

import (
	"errors"
	"github.com/dalmarcogd/gwp"
	"github.com/dalmarcogd/gwp/pkg/worker"
	"log"
	"time"
)

func main() {
	if err := gwp.
		New().
		Stats().
		HealthCheck().
		DebugPprof().
		HandleError(func(w *worker.Worker, err error) {
			log.Printf("Worker [%s] error: %s", w.Name, err)
		}).
		Worker(
			"w1",
			func() error {
				<-time.After(10 * time.Second)
				return errors.New("test")
			},
			worker.WithRestartAlways()).
		Worker(
			"w2",
			func() error {
				<-time.After(30 * time.Second)
				return nil
			}).
		Worker(
			"w3",
			func() error {
				<-time.After(1 * time.Minute)
				return errors.New("test")
			}).
		Run(); err != nil {
		panic(err)
	}
}
Simple Worker Consume Channel
package main

import (
	"github.com/dalmarcogd/gwp"
	"github.com/dalmarcogd/gwp/pkg/worker"
	"log"
	"time"
)

func main() {

	ch := make(chan bool, 1)

	if err := gwp.
		New().
		Stats().
		HealthCheck().
		DebugPprof().
		HandleError(func(w *worker.Worker, err error) {
			log.Printf("Worker [%s] error: %s", w.Name, err)
		}).
		Worker(
			"w1",
			func() error {
				<-time.After(10 * time.Second)
				ch <- true
				log.Printf("Produced %t", true)
				return nil
			},
			1,
			true).
		Worker(
			"w2",
			func() error {
				for {
					select {
					case r := <-ch:
						log.Printf("Received %t", r)
					}
				}
			},
			1,
			false).
		Run(); err != nil {
		panic(err)
	}
}
Simple Worker Consume Buffered Channel
package main

import (
	"github.com/dalmarcogd/gwp"
	"github.com/dalmarcogd/gwp/pkg/worker"
	"log"
	"time"
)

func main() {

	numberOfConcurrency := 10
	ch := make(chan bool, numberOfConcurrency)

	if err := gwp.
		New().
		Stats().
		HealthCheck().
		DebugPprof().
		HandleError(func(w *worker.Worker, err error) {
			log.Printf("Worker [%s] error: %s", w.Name, err)
		}).
		Worker(
			"w1",
			func() error {
				<-time.After(10 * time.Second)
				ch <- true
				ch <- true
				ch <- true
				ch <- true
				ch <- true
				ch <- true
				ch <- true
				log.Printf("Produced %t", true)
				return nil
			},

			worker.WithRestartAlways()).
		Worker(
			"w2",
			func() error {
				for {
					select {
					case r := <-ch:
						log.Printf("Received %t", r)
					}
				}
			},
			worker.WithConcurrency(numberOfConcurrency)).
		Run(); err != nil {
		panic(err)
	}
}
Simple Worker Consume SQS
package main

import (
	"fmt"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/dalmarcogd/gwp"
	"github.com/dalmarcogd/gwp/pkg/worker"
	"log"
	"strconv"
)

func main() {

	params := &sqs.CreateQueueInput{
		QueueName: aws.String("test-consume-sqs"), // Required
	}
	ss, _ := session.NewSession(&aws.Config{
		Endpoint: aws.String("http://localhost:9324"),
		Region:   aws.String("us-east-1"),
	})
	svc := sqs.New(ss)

	var resp, err = svc.CreateQueue(params)

	if err != nil {
		fmt.Println(err.Error())
		panic(err)
	}
	fmt.Println(resp)

	queueURL := aws.String("http://localhost:9324/queue/test-consume-sqs")

	for i := range []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
		paramsSend := &sqs.SendMessageInput{
			MessageBody: aws.String("Testing " + strconv.Itoa(i)), // Required
			QueueUrl:    queueURL,                                 // Required
		}
		respSend, err := svc.SendMessage(paramsSend)
		if err != nil {
			fmt.Println(err.Error())
			panic(err)
		}
		fmt.Println(respSend)
	}

	if err := gwp.
		New().
		Stats().
		HealthCheck().
		DebugPprof().
		HandleError(func(w *worker.Worker, err error) {
			log.Printf("Worker [%s] error: %s", w.Name, err)
		}).
		Worker("w2", func() error {
			params := &sqs.ReceiveMessageInput{
				QueueUrl:            queueURL, // Required
				MaxNumberOfMessages: aws.Int64(10),
				VisibilityTimeout:   aws.Int64(20),
			}
			resp, err := svc.ReceiveMessage(params)

			if err != nil {
				fmt.Println(err.Error())
				return err
			}
			fmt.Println(resp.Messages)
			for _, msg := range resp.Messages {
				fmt.Println(aws.StringValue(msg.Body))
			}
			return nil
		}, worker.WithRestartAlways()).
		Run(); err != nil {
		panic(err)
	}
}
Simple Worker Consume Rabbit
package main

import (
	"fmt"
	"github.com/dalmarcogd/gwp"
	"github.com/dalmarcogd/gwp/pkg/worker"
	"github.com/streadway/amqp"
	"log"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {

	connection, err := amqp.Dial("amqp://rabbitmq:rabbitmq@localhost:5672//")

	failOnError(err, "Error when get connection")
	defer connection.Close()

	channel, err := connection.Channel()
	failOnError(err, "Error when get channel")
	defer channel.Close()

	queue, err := channel.QueueDeclare(
		"test-consume", // name
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		nil,            // arguments
	)
	failOnError(err, "Error when declare a queue")

	for i := range []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
		failOnError(channel.Publish("", queue.Name, false, false, amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			Body:         []byte(fmt.Sprint(i)),
		}), "fail on publishing")
	}

	if err := gwp.
		New().
		Stats().
		HealthCheck().
		DebugPprof().
		HandleError(func(w *worker.Worker, err error) {
			log.Printf("Worker [%s] error: %s", w.Name, err)
		}).
		Worker("w2", func() error {
			msgs, err := channel.Consume(queue.Name,
				"",
				true,
				false,
				false,
				false,
				nil)
			failOnError(err, "Error when create consumer")

			for msg := range msgs {
				fmt.Println(string(msg.Body))
			}
			return nil
		}, worker.WithRestartAlways()).
		Run(); err != nil {
		panic(err)
	}
}
Simple Worker Consume Kafka
package main

import (
	"context"
	"fmt"
	"github.com/dalmarcogd/gwp"
	"github.com/dalmarcogd/gwp/pkg/worker"
	"github.com/segmentio/kafka-go"
	"log"
	"time"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	topic := "teste"
	partition := 1

	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
	failOnError(err, "Fail when create connection")

	_ = conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
	_, _ = conn.WriteMessages(
		kafka.Message{Value: []byte("one!")},
		kafka.Message{Value: []byte("two!")},
		kafka.Message{Value: []byte("three!")},
	)

	defer conn.Close()

	_ = conn.SetReadDeadline(time.Now().Add(10 * time.Second))

	if err := gwp.
		New().
		Stats().
		HealthCheck().
		DebugPprof().
		HandleError(func(w *worker.Worker, err error) {
			log.Printf("Worker [%s] error: %s", w.Name, err)
		}).
		Worker("w2", func() error {
			batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
			b := make([]byte, 10e3)            // 10KB max per message
			for {
				_, err := batch.Read(b)
				if err != nil {
					break
				}
				fmt.Println(string(b))
			}

			_ = batch.Close()
			return nil
		},  worker.WithRestartAlways()).
		Run(); err != nil {
		panic(err)
	}
}

Documentation

Overview

This package wants to offer the community and implement workers with the pure Go code for Golangers, without any other dependency just Uuid. It allows you to expose an http server to answer the response of health checks, stats, debug pprof and the main "workers". Workers for consumer queues, channel processes and other things that you think worker needs.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type WorkerServer

type WorkerServer struct {
	// contains filtered or unexported fields
}

WorkerServer is a server that administrate the workers and the monitor

func New

func New() *WorkerServer

New build an #WorkerServer with #defaultConfig

func NewWithConfig

func NewWithConfig(configs map[string]interface{}) *WorkerServer

NewWithConfig build an #WorkerServer by the settings

func (*WorkerServer) CheckHealth added in v0.1.11

func (s *WorkerServer) CheckHealth(check func() bool) *WorkerServer

CheckHealth includes to server checker the health

func (*WorkerServer) Configs

func (s *WorkerServer) Configs() map[string]interface{}

Configs return the configs from #WorkerServer

func (*WorkerServer) DebugPprof

func (s *WorkerServer) DebugPprof() *WorkerServer

DebugPprof setup for the server to start with /debug/pprof*

func (*WorkerServer) GracefulStop added in v0.3.1

func (s *WorkerServer) GracefulStop() error

GracefulStop stop the server gracefully

func (*WorkerServer) HandleError

func (s *WorkerServer) HandleError(handle func(w *worker.Worker, err error)) *WorkerServer

HandleError setup the a function that will called when to occur and error

func (*WorkerServer) HealthCheck

func (s *WorkerServer) HealthCheck() *WorkerServer

HealthCheck setup for the server to start with /health-check

func (*WorkerServer) HealthCheckFunc

func (s *WorkerServer) HealthCheckFunc(f func(writer http.ResponseWriter, request *http.Request)) *WorkerServer

HealthCheckFunc setup the handler for /health-check

func (*WorkerServer) Healthy added in v0.1.11

func (s *WorkerServer) Healthy() bool

Healthy return true or false if the WorkerServer its ok or no, respectively

func (*WorkerServer) Infos added in v0.2.0

func (s *WorkerServer) Infos() map[string]interface{}

Infos return the infos about of WorkerServer

func (*WorkerServer) Run

func (s *WorkerServer) Run() error

Run user to start the #WorkerServer

func (*WorkerServer) Stats

func (s *WorkerServer) Stats() *WorkerServer

Stats setup for the server to start with /stats

func (*WorkerServer) StatsFunc

func (s *WorkerServer) StatsFunc(f func(writer http.ResponseWriter, request *http.Request)) *WorkerServer

StatsFunc setup the handler for /stats

func (*WorkerServer) Worker

func (s *WorkerServer) Worker(name string, handle func(ctx context.Context) error, configs ...worker.Config) *WorkerServer

Worker build an #Worker and add to execution with #WorkerServer

func (*WorkerServer) Workers

func (s *WorkerServer) Workers() []*worker.Worker

Workers return the slice of #Worker configured

Directories

Path Synopsis
pkg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL