taskqueueworker

package
v1.7.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2021 License: Apache-2.0 Imports: 32 Imported by: 0

README

Example

Create delivery handler

package workerhandler

import (
	"context"
	"time"

	taskqueueworker "pkg.agungdp.dev/candi/codebase/app/task_queue_worker"
	"pkg.agungdp.dev/candi/codebase/factory/types"
	"pkg.agungdp.dev/candi/logger"
)

// TaskQueueHandler struct
type TaskQueueHandler struct {
}

// NewTaskQueueHandler constructor
func NewTaskQueueHandler() *TaskQueueHandler {
	return &TaskQueueHandler{}
}

// MountHandlers return map topic to handler func
func (h *TaskQueueHandler) MountHandlers(group *types.WorkerHandlerGroup) {

	group.Add("task-one", h.taskOne)
	group.Add("task-two", h.taskTwo)
}

func (h *TaskQueueHandler) taskOne(ctx context.Context, message []byte) error {
	logger.LogRed("task-one: " + string(message))
	return &taskqueueworker.ErrorRetrier{
		Delay:   10 * time.Second,
		Message: "Error",
	}
}

func (h *TaskQueueHandler) taskTwo(ctx context.Context, message []byte) error {
	logger.LogYellow("task-two: " + string(message))
	return &taskqueueworker.ErrorRetrier{
		Delay:   3 * time.Second,
		Message: "Error",
	}
}

Register in module

package examplemodule

import (

	"example.service/internal/modules/examplemodule/delivery/workerhandler"

	"pkg.agungdp.dev/candi/codebase/factory/dependency"
	"pkg.agungdp.dev/candi/codebase/factory/types"
	"pkg.agungdp.dev/candi/codebase/interfaces"
)

type Module struct {
	// ...another delivery handler
	workerHandlers map[types.Worker]interfaces.WorkerHandler
}

func NewModules(deps dependency.Dependency) *Module {
	return &Module{
		workerHandlers: map[types.Worker]interfaces.WorkerHandler{
			// ...another worker handler
			// ...
			types.TaskQueue: workerhandler.NewTaskQueueHandler(),
		},
	}
}

// ...another method

Add task in each usecase module

  • From internal service (same runtime)
package usecase

import (
	"context"
	"log"

	taskqueueworker "pkg.agungdp.dev/candi/codebase/app/task_queue_worker"
)

func someUsecase() {
	// add task queue for `task-one` with 5 retry
	if err := taskqueueworker.AddJob("task-one", 5, `{"params": "test-one"}`); err != nil {
		log.Println(err)
	}

	// add task queue for `task-two` with 5 retry
	if err := taskqueueworker.AddJob("task-two", 5, `{"params": "test-two"}`); err != nil {
		log.Println(err)
	}
}
  • Or if running on a separate server

Via GraphQL API

POST {{task-queue-worker-host}}/graphql

mutation addJob {
  add_job(
    task_name: "task-one"
    max_retry: 5
    args: "{\"params\": \"test-one\"}"
  )
}

Direct call function

// add task queue for `task-one` via HTTP request
if err := taskqueueworker.AddJobViaHTTPRequest(ctx, "{{task-queue-worker-host}}", "task-one", 5, `{"params": "test-one"}`); err != nil {
	log.Println(err)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddJob

func AddJob(taskName string, maxRetry int, args []byte) (err error)

AddJob public function for add new job in same runtime

func AddJobViaHTTPRequest added in v1.7.0

func AddJobViaHTTPRequest(ctx context.Context, workerHost string, taskName string, maxRetry int, args []byte) error

AddJobViaHTTPRequest public function for add new job via http request

func NewTaskQueueWorker added in v1.6.8

func NewTaskQueueWorker(service factory.ServiceFactory, q QueueStorage, perst Persistent, opts ...OptionFunc) factory.AppServerFactory

NewTaskQueueWorker create new task queue worker

Types

type Filter

type Filter struct {
	Page, Limit  int
	TaskName     string
	TaskNameList []string
	Search       *string
	Status       []string
	ShowAll      bool
}

Filter type

type Job

type Job struct {
	ID          string    `bson:"_id" json:"_id"`
	TaskName    string    `bson:"task_name" json:"task_name"`
	Arguments   string    `bson:"arguments" json:"arguments"`
	Retries     int       `bson:"retries" json:"retries"`
	MaxRetry    int       `bson:"max_retry" json:"max_retry"`
	Interval    string    `bson:"interval" json:"interval"`
	CreatedAt   time.Time `bson:"created_at" json:"created_at"`
	FinishedAt  time.Time `bson:"finished_at" json:"finished_at"`
	Status      string    `bson:"status" json:"status"`
	Error       string    `bson:"error" json:"error"`
	TraceID     string    `bson:"traceId" json:"traceId"`
	NextRetryAt string    `bson:"-" json:"-"`
}

Job model

type JobListResolver

type JobListResolver struct {
	Meta MetaJobList
	Data []Job
}

JobListResolver resolver

type JobStatusEnum added in v1.7.0

type JobStatusEnum string

JobStatusEnum enum status

type MemstatsResolver added in v1.6.12

type MemstatsResolver struct {
	Alloc         string
	TotalAlloc    string
	NumGC         int
	NumGoroutines int
}

MemstatsResolver resolver

type MetaJobList added in v1.6.8

type MetaJobList struct {
	Page           int
	Limit          int
	TotalRecords   int
	TotalPages     int
	IsCloseSession bool
	Detail         struct {
		Failure, Retrying, Success, Queueing, Stopped int
	}
}

MetaJobList resolver

type MetaTaskResolver added in v1.6.8

type MetaTaskResolver struct {
	Page                  int
	Limit                 int
	TotalRecords          int
	TotalPages            int
	IsCloseSession        bool
	TotalClientSubscriber int
}

MetaTaskResolver meta resolver

type OptionFunc added in v1.6.8

type OptionFunc func(*option)

OptionFunc type

func SetAutoRemoveClientInterval added in v1.6.8

func SetAutoRemoveClientInterval(d time.Duration) OptionFunc

SetAutoRemoveClientInterval option func

func SetDashboardBanner added in v1.7.0

func SetDashboardBanner(banner string) OptionFunc

SetDashboardBanner option func

func SetDashboardHTTPPort added in v1.7.4

func SetDashboardHTTPPort(port uint16) OptionFunc

SetDashboardHTTPPort option func

func SetDebugMode added in v1.7.4

func SetDebugMode(debugMode bool) OptionFunc

SetDebugMode option func

func SetJaegerTracingDashboard added in v1.6.8

func SetJaegerTracingDashboard(host string) OptionFunc

SetJaegerTracingDashboard option func

func SetMaxClientSubscriber added in v1.6.8

func SetMaxClientSubscriber(max int) OptionFunc

SetMaxClientSubscriber option func

type Persistent added in v1.7.0

type Persistent interface {
	FindAllJob(ctx context.Context, filter Filter) (jobs []Job)
	FindJobByID(ctx context.Context, id string) (job *Job, err error)
	CountAllJob(ctx context.Context, filter Filter) int
	AggregateAllTaskJob(ctx context.Context, filter Filter) (result []TaskResolver)
	SaveJob(ctx context.Context, job *Job)
	UpdateAllStatus(ctx context.Context, taskName string, currentStatus []JobStatusEnum, updatedStatus JobStatusEnum)
	CleanJob(ctx context.Context, taskName string)
}

Persistent abstraction

func NewMongoPersistent added in v1.7.0

func NewMongoPersistent(db *mongo.Database) Persistent

NewMongoPersistent create mongodb persistent

type QueueStorage

type QueueStorage interface {
	PushJob(job *Job)
	PopJob(taskName string) (jobID string)
	NextJob(taskName string) (jobID string)
	Clear(taskName string)
}

QueueStorage abstraction for queue storage backend

func NewInMemQueue

func NewInMemQueue() QueueStorage

NewInMemQueue init inmem queue

func NewRedisQueue

func NewRedisQueue(redisPool *redis.Pool) QueueStorage

NewRedisQueue init inmem queue

type TaglineResolver

type TaglineResolver struct {
	Banner                    string
	Tagline                   string
	Version                   string
	TaskListClientSubscribers []string
	JobListClientSubscribers  []string
	MemoryStatistics          MemstatsResolver
}

TaglineResolver resolver

type TaskListResolver added in v1.6.8

type TaskListResolver struct {
	Meta MetaTaskResolver
	Data []TaskResolver
}

TaskListResolver resolver

type TaskResolver

type TaskResolver struct {
	Name      string
	TotalJobs int
	Detail    struct {
		Failure, Retrying, Success, Queueing, Stopped int
	}
}

TaskResolver resolver

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL