postgresworker

package
v1.7.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2021 License: Apache-2.0 Imports: 15 Imported by: 0

README

Example

This is an example of creating a Postgres event listener, inspired by Hasura Event Triggers.

Create delivery handler

package workerhandler

import (
	"context"
	"encoding/json"
	"fmt"

	"example.service/internal/modules/examplemodule/delivery/workerhandler"

	"pkg.agungdp.dev/candi/codebase/factory/types"
	"pkg.agungdp.dev/candi/tracer"
)

// PostgresListenerHandler is the delivery handler for Postgres listener
// events. It stores the usecase and validator passed to
// NewPostgresListenerHandler.
type PostgresListenerHandler struct {
	uc        usecase.Usecase
	validator interfaces.Validator
}

// NewPostgresListenerHandler returns a PostgresListenerHandler that keeps the
// given usecase and validator for later use by its handler methods.
func NewPostgresListenerHandler(uc usecase.Usecase, validator interfaces.Validator) *PostgresListenerHandler {
	handler := new(PostgresListenerHandler)
	handler.uc = uc
	handler.validator = validator
	return handler
}

// MountHandlers registers this handler's callbacks on the given worker
// handler group.
func (h *PostgresListenerHandler) MountHandlers(group *types.WorkerHandlerGroup) {
	// Route name: listen for data changes on table "table-names".
	const table = "table-names"

	group.Add(table, h.handleDataChange)
}

// handleDataChange is the callback mounted for table "table-names". It opens
// a trace for the call, prints the raw message, and always returns nil.
func (h *PostgresListenerHandler) handleDataChange(ctx context.Context, message []byte) error {
	span := tracer.StartTrace(ctx, "DeliveryPostgresListener:HandleDataChange")
	defer func() {
		span.Finish()
	}()

	// Switch to the trace's context so the usecase call (to be added below)
	// runs under this trace.
	ctx = span.Context()

	fmt.Printf("data change on table 'table-names' detected: %s\n", message)

	// call usecase
	return nil
}


Register in module

package examplemodule

import (
	"example.service/internal/modules/examplemodule/delivery/workerhandler"

	"pkg.agungdp.dev/candi/codebase/factory/dependency"
	"pkg.agungdp.dev/candi/codebase/factory/types"
	"pkg.agungdp.dev/candi/codebase/interfaces"
)

// Module groups the delivery handlers of the example module.
type Module struct {
	// ...another delivery handler

	// workerHandlers maps each worker type to the handler registered for it.
	workerHandlers map[types.Worker]interfaces.WorkerHandler
}

// NewModules builds the Module and registers the Postgres listener handler
// under types.PostgresListener.
func NewModules(deps dependency.Dependency) *Module {
	// NOTE(review): usecaseUOW is not defined in this snippet; presumably it is
	// set up elsewhere in the module — confirm before copying this example.
	postgresHandler := workerhandler.NewPostgresListenerHandler(usecaseUOW.User(), deps.GetValidator())

	handlers := make(map[types.Worker]interfaces.WorkerHandler)
	// ...another worker handler
	// ...
	handlers[types.PostgresListener] = postgresHandler

	return &Module{workerHandlers: handlers}
}

// ...another method

JSON Payload

The handler receives this payload in its message parameter (of type []byte).

{
  "table": "<table-name>",
  "action": "<operation-name>", // INSERT, UPDATE, or DELETE
  "data": {
    "old": <old-column-values-object>,
    "new": <new-column-values-object>
  }
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewWorker

func NewWorker(service factory.ServiceFactory, opts ...OptionFunc) factory.AppServerFactory

NewWorker creates a new Postgres event listener.

Types

type EventPayload

// EventPayload is the event model. Its fields are bound to the "table" and
// "action" keys of the JSON payload.
type EventPayload struct {
	// Table is encoded under the "table" JSON key.
	Table  string `json:"table"`
	// Action is encoded under the "action" JSON key.
	Action string `json:"action"`
}

EventPayload event model

type OptionFunc added in v1.7.4

// OptionFunc is a function that receives a pointer to the package's option
// value. NewWorker accepts any number of them as trailing arguments.
type OptionFunc func(*option)

OptionFunc type

func SetConsul added in v1.7.4

func SetConsul(consul *candiutils.Consul) OptionFunc

SetConsul option func

func SetDebugMode added in v1.7.4

func SetDebugMode(debugMode bool) OptionFunc

SetDebugMode option func

func SetMaxGoroutines added in v1.7.4

func SetMaxGoroutines(maxGoroutines int) OptionFunc

SetMaxGoroutines option func

func SetPostgresDSN added in v1.7.4

func SetPostgresDSN(dsn string) OptionFunc

SetPostgresDSN option func

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL