workflow

package
v0.0.0-...-7d215dd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2021 License: MIT Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetWorkerName

func GetWorkerName(deviceType DeviceType, workerNum int) string

func OnExit

func OnExit(ctx context.Context, cerr chan<- errs.Error, prefixMsg string, fn func())

func ProtoUserAppsBuilder

func ProtoUserAppsBuilder() interface{}

func Run

func Run(ctx context.Context, runners ...Runner) <-chan struct{}

Types

type DBufferStats

type DBufferStats struct {
	ItemsCounter reg.Counter
	EndTime      time.Time
}

type DTBufferStats

type DTBufferStats map[string]*DBufferStats

func (DTBufferStats) SortByDeviceType

func (m DTBufferStats) SortByDeviceType() []string

type DTParserStats

type DTParserStats map[DeviceType]ParserStats

func (DTParserStats) SortByDeviceType

func (m DTParserStats) SortByDeviceType() []DeviceType

type DTSaverStats

type DTSaverStats map[DeviceType]*SaverStats

func (DTSaverStats) SortByDeviceType

func (m DTSaverStats) SortByDeviceType() []DeviceType

type DeviceRecord

type DeviceRecord struct {
	DeviceType          string
	ProtoDeviceUserApps ProtoUserApps
}

type DeviceType

type DeviceType string

type DeviceTypeDBuffer

type DeviceTypeDBuffer interface {
	Run(context.Context) <-chan struct{}
	ResChs() map[DeviceType]chan *ProtoUserApps
	ErrCh() <-chan errs.Error
	Stats() interface{}
}

func NewDTDBuffer

func NewDTDBuffer(ctx context.Context, dtInputChs map[DeviceType]chan *ProtoUserApps, workersCount int, dirPath string, itemsPerSegment int, resume bool, turbo bool, statsOn bool) (DeviceTypeDBuffer, error)

type DeviceTypeDBufferStats

type DeviceTypeDBufferStats struct {
	StartTime     time.Time
	DTInputStats  map[string]*DBufferStats
	DTOutputStats map[string]*DBufferStats
}

type DeviceTypeParserStats

type DeviceTypeParserStats struct {
	DTStats            map[DeviceType]ParserStats
	StartTime, EndTime time.Time
}

type DeviceTypeSaverStats

type DeviceTypeSaverStats struct {
	DTStats               map[DeviceType]*SaverStats
	StartTime, FinishTime time.Time
}

type Loader

type Loader interface {
	Run(ctx context.Context) <-chan struct{}
	ResCh() <-chan string
	ErrCh() <-chan errs.Error
	Stats() interface{}
}

func NewLoader

func NewLoader(ctx context.Context, filePaths []string, maxWorkers int, usr *user.User, dry bool, statsOn bool) Loader

type LoaderStats

type LoaderStats struct {
	StartTime, FinishTime      time.Time
	FilesCounter, ItemsCounter reg.Counter
}

type Parser

type Parser interface {
	Run(ctx context.Context) <-chan struct{}
	ResChs() map[DeviceType]chan *ProtoUserApps // <-chan *DeviceRecord
	ErrCh() <-chan errs.Error
	Stats() interface{}
}

func NewParser

func NewParser(ctx context.Context, inputCh <-chan string, maxWorkers int, deviceTypes map[DeviceType]bool, ignoreUnknownDeviceType bool, statsOn bool) Parser

type ParserStats

type ParserStats interface {
	ItemsCounter() reg.Counter
	InputBytesCounter() reg.Counter
	OutputBytesCounter() reg.Counter
}

type Pipeline

type Pipeline []Pipeliner

func (Pipeline) Runners

func (pl Pipeline) Runners() []Runner

func (Pipeline) StatProducers

func (pl Pipeline) StatProducers() []StatProducer

type Pipeliner

type Pipeliner interface {
	Runner
	StatProducer
}

type ProtoUserApps

type ProtoUserApps struct {
	DeviceKey string
	Data      []byte
}

func (ProtoUserApps) Size

func (d ProtoUserApps) Size() int

type Runner

type Runner interface {
	Run(context.Context) <-chan struct{}
}

type Saver

type Saver interface {
	Run(ctx context.Context) <-chan struct{}
	ErrCh() <-chan errs.Error
	Stats() interface{}
}

func NewMemcSaver

func NewMemcSaver(ctx context.Context, dtInputs map[DeviceType]chan *ProtoUserApps, addrs map[string]string, dry bool, timeout time.Duration, maxRetries int, retryTimeout time.Duration, statsOn bool) (Saver, errs.Error)

todo: as option - to improve performance 1) add workers per deviceType or/and 2) use memcache connections pool (if client has one ...)

type SaverStats

type SaverStats struct {
	ItemsCounter reg.Counter
}

type StatProducer

type StatProducer interface {
	Stats() interface{}
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL