collection

package
v0.0.0-...-8886d92 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2022 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewHandlers

func NewHandlers(r *Repository)

NewHandlers sets the repository for the handlers

Types

type Logs

// Logs is a slice of Message values.
type Logs []Message

type LogsBatch

// LogsBatch groups a slice of messages with a slice of service/severity
// entries so both can travel together as one value on a channel.
// NOTE(review): the two field types match the parameters of BulkDbInsert;
// presumably one LogsBatch is one bulk insert — confirm against the worker code.
type LogsBatch struct {
	// LogMessage holds the messages in this batch.
	LogMessage      []Message
	// ServiceSeverity holds the ServiceSeverity entries that accompany the messages.
	ServiceSeverity []ServiceSeverity
}

type Message

// Message is a single message record. The struct tags define the JSON key
// used for each field when the value is encoded or decoded.
type Message struct {
	// ServiceName maps to the JSON key "service_name".
	ServiceName string    `json:"service_name"`
	// Payload maps to the JSON key "payload".
	Payload     string    `json:"payload"`
	// Severity maps to the JSON key "severity"; it is a free-form string here.
	Severity    string    `json:"severity"`
	// Timestamp maps to the JSON key "timestamp".
	Timestamp   time.Time `json:"timestamp"`
}

Message holds the message structure

type ReceiverJob

// ReceiverJob is an empty struct: it carries no data, so a value of this type
// conveys nothing beyond its own presence.
type ReceiverJob struct{}

type ReceiverResult

// ReceiverResult wraps a raw byte slice.
// NOTE(review): presumably the bytes of one received message, to be decoded
// by a process worker — confirm against ReceiverWorker.
type ReceiverResult struct {
	// Data is the raw byte content of the result.
	Data []byte
}

type Repository

// Repository is the receiver type for this package's worker and database
// methods; it holds a pointer to the application configuration.
type Repository struct {
	// App points to the application configuration.
	App *config.AppConfig
}

Repository holds App config

// Repo is a package-level *Repository.
// NOTE(review): presumably the shared instance used by the handlers — confirm
// where it is assigned.
var Repo *Repository

func NewRepo

func NewRepo(a *config.AppConfig) *Repository

NewRepo initialises and returns a Repository, which holds the AppConfig.

func (*Repository) BulkDbInsert

func (repo *Repository) BulkDbInsert(messageRows []Message, logSeverityRows []ServiceSeverity) error

BulkDbInsert inserts batch data

func (*Repository) CreateDbProcessWorkerPools

func (repo *Repository) CreateDbProcessWorkerPools(poolSize int, logsBatch <-chan LogsBatch, logsBatchReceive chan<- LogsBatch, sLogs chan<- lmslogging.Log, wg *sync.WaitGroup)

CreateDbProcessWorkerPools creates a pool of DB process workers.

func (*Repository) CreateJobsPool

func (repo *Repository) CreateJobsPool(jobs chan<- ReceiverJob)

CreateJobsPool sends an unlimited number of jobs to the ReceiverJob channel.

func (*Repository) CreateProcessWorkerPools

func (repo *Repository) CreateProcessWorkerPools(poolSize int, results <-chan ReceiverResult, logsBatch chan<- LogsBatch, wg *sync.WaitGroup)

CreateProcessWorkerPools creates a pool of message process workers.

func (*Repository) CreateReceiverWorkerPools

func (repo *Repository) CreateReceiverWorkerPools(poolSize int, jobs <-chan ReceiverJob, results chan<- ReceiverResult, logs chan<- lmslogging.Log, wg *sync.WaitGroup)

CreateReceiverWorkerPools creates a pool of Receiver Workers

func (*Repository) MessageDbProcessWorker

func (repo *Repository) MessageDbProcessWorker(logsBatchReceive <-chan LogsBatch, logsBatchSend chan<- LogsBatch, lmsLogChan chan<- lmslogging.Log)

MessageDbProcessWorker reads batches of messages from the LogsBatch channel and inserts them into the DB, retrying up to 5 times if an error occurs. If the insert still fails, it sends the LogsBatch back to the channel.

func (*Repository) MessageProcessWorker

func (repo *Repository) MessageProcessWorker(msgSize int, results <-chan ReceiverResult, logsBatch chan<- LogsBatch)

MessageProcessWorker reads messages from the results channel, groups them into batches, and sends each batch to the logsBatch channel.

func (*Repository) ReceiverWorker

func (repo *Repository) ReceiverWorker(jobs <-chan ReceiverJob, results chan<- ReceiverResult, logs chan<- lmslogging.Log)

ReceiverWorker receives messages from pub/sub and sends them to the ReceiverResult channel.

type ServiceSeverity

// ServiceSeverity associates a service name and a severity with a count.
// The struct tags define the JSON key used for each field.
type ServiceSeverity struct {
	// ServiceName maps to the JSON key "service_name".
	ServiceName string `json:"service_name"`
	// Severity maps to the JSON key "severity".
	Severity    string `json:"severity"`
	// Count maps to the JSON key "count".
	// NOTE(review): presumably the number of messages seen for this
	// service/severity pair — confirm against the code that populates it.
	Count       int    `json:"count"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL