job

package
v0.0.2
Published: Sep 5, 2019 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

Module configures Drivers and Controller.

Functions

func StartController

func StartController(c *Controller)

StartController starts a periodic sync-up with the aresDB controller

Types

type Controller

type Controller struct {
	sync.RWMutex

	// Drivers are all running jobs
	Drivers Drivers
	// contains filtered or unexported fields
}

Controller is responsible for syncing up with the aresDB controller

func NewController

func NewController(params Params) *Controller

NewController creates a new Controller

func (*Controller) RegisterOnZK

func (c *Controller) RegisterOnZK() error

RegisterOnZK registers the aresDB subscriber instance in ZooKeeper as an ephemeral node

func (*Controller) RestartEtcdHBService

func (c *Controller) RestartEtcdHBService(params Params)

RestartEtcdHBService re-registers the heartbeat if etcd cluster changes are detected

func (*Controller) SyncUpJobConfigs

func (c *Controller) SyncUpJobConfigs()

SyncUpJobConfigs syncs up jobConfigs with the aresDB controller

type Driver

type Driver struct {
	sync.RWMutex

	Topic                string
	StartTime            time.Time
	Shutdown             bool
	JobName              string
	AresCluster          string
	TotalProcessors      int
	RunningProcessors    int
	StoppedProcessors    int
	FailedProcessors     int
	RestartingProcessors int
	ProcessorContext     map[string]*ProcessorContext
	// contains filtered or unexported fields
}

Driver will initialize and start Processors based on the JobConfig provided

func NewDriver

func NewDriver(
	jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig, aresControllerClient controllerCli.ControllerClient, processorInitFunc NewProcessor, sinkInitFunc NewSink,
	consumerInitFunc NewConsumer, decoderInitFunc NewDecoder) (*Driver, error)

NewDriver will return a new Driver instance to start an ingest job
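
As a hedged sketch (not part of the package docs), constructing a Driver by hand might look like the fragment below; jobConfig, serviceConfig, aresControllerClient, and the sink/consumer/decoder init funcs are assumed to be built elsewhere, and NewStreamingProcessor (documented below) is passed as the processor init func.

// Hypothetical fragment: all arguments other than NewStreamingProcessor are
// assumed to be constructed elsewhere; error handling is illustrative only.
d, err := job.NewDriver(
	jobConfig, serviceConfig, aresControllerClient,
	job.NewStreamingProcessor, sinkInitFunc, consumerInitFunc, decoderInitFunc)
if err != nil {
	log.Fatal(err)
}
d.Start() // start the driver's processors (see Start below)
// ... later, on shutdown:
d.Stop()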

func (*Driver) AddProcessor

func (d *Driver) AddProcessor() (err error)

AddProcessor will add a new processor to the driver

func (*Driver) GetErrors

func (d *Driver) GetErrors() chan ProcessorError

GetErrors returns the channel of processor errors
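
A brief sketch of draining this channel; d is assumed to be a running *Driver and the logging is illustrative only.

go func() {
	for perr := range d.GetErrors() {
		// ProcessorError is documented below; ID identifies the failing processor.
		log.Printf("processor %d failed: %v", perr.ID, perr.Error)
	}
}()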

func (*Driver) MarshalJSON

func (d *Driver) MarshalJSON() ([]byte, error)

MarshalJSON marshals the driver into JSON

func (*Driver) RemoveProcessor

func (d *Driver) RemoveProcessor(ID int) bool

RemoveProcessor will remove a processor from the driver.

func (*Driver) Start

func (d *Driver) Start()

Start will start the Driver, which starts Processors that read from the Kafka consumer, process messages, and save them to the database

func (*Driver) Stop

func (d *Driver) Stop()

Stop will shut down the driver and its processors

func (*Driver) WriteContext

func (d *Driver) WriteContext(w http.ResponseWriter)

WriteContext writes the driver context to the given http.ResponseWriter
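
Since WriteContext takes an http.ResponseWriter, it can back a simple debug endpoint; the route below is hypothetical and d is assumed to be a *Driver held by the handler.

http.HandleFunc("/debug/driver", func(w http.ResponseWriter, r *http.Request) {
	d.WriteContext(w) // write the driver context as the response body
})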

type Drivers

type Drivers map[string]map[string]*Driver

Drivers contains information about jobs, ares clusters, and their drivers
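
A minimal sketch of walking the map; based on the description above, the outer key is taken to be the job name and the inner key the ares cluster (an assumption).

for jobName, clusters := range drivers { // drivers is a job.Drivers value
	for aresCluster, d := range clusters {
		log.Printf("job=%s cluster=%s running=%d failed=%d",
			jobName, aresCluster, d.RunningProcessors, d.FailedProcessors)
	}
}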

func NewDrivers

func NewDrivers(params Params, aresControllerClient controllerCli.ControllerClient) (Drivers, error)

NewDrivers returns Drivers

type FailureHandler

type FailureHandler interface {

	// HandleFailure will provide a contingency plan to
	// keep track of a failed save
	HandleFailure(destination sink.Destination, rows []client.Row) error
}

FailureHandler is the interface implemented by failure handlers that are used when saving to the storage layer fails
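
For illustration, a hypothetical handler that only logs the failed rows instead of retrying them; the sink and client package qualifiers are taken from the signature above, and the type itself is not part of this package.

// LoggingFailureHandler is a hypothetical FailureHandler that drops
// failed rows after logging them.
type LoggingFailureHandler struct{}

func (h LoggingFailureHandler) HandleFailure(destination sink.Destination, rows []client.Row) error {
	log.Printf("dropping %d rows for destination %v after failed save", len(rows), destination)
	return nil
}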

type NewConsumer

type NewConsumer func(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) (consumer.Consumer, error)

NewConsumer is the type of function each consumer that implements Consumer should provide for initialization.

type NewDecoder

type NewDecoder func(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) (decoder message.Decoder, err error)

NewDecoder is the type of function each decoder that implements message.Decoder should provide for initialization.

type NewProcessor

type NewProcessor func(id int, jobConfig *rules.JobConfig, aresControllerClient controllerCli.ControllerClient, sinkInitFunc NewSink, consumerInitFunc NewConsumer, decoderInitFunc NewDecoder,
	errors chan ProcessorError, msgSizes chan int64, serviceConfig config.ServiceConfig) (Processor, error)

NewProcessor is the type of function each processor that implements Processor should provide for initialization. This function should always return a new instance of the processor

type NewSink

type NewSink func(
	serviceConfig config.ServiceConfig, jobConfig *rules.JobConfig, cluster string,
	sinkCfg config.SinkConfig, aresControllerClient controllerCli.ControllerClient) (sink.Sink, error)

NewSink is the type of function each sink that implements sink.Sink should provide for initialization.

type Params

type Params struct {
	fx.In

	LifeCycle        fx.Lifecycle
	ServiceConfig    config.ServiceConfig
	JobConfigs       rules.JobConfigs
	SinkInitFunc     NewSink
	ConsumerInitFunc NewConsumer
	DecoderInitFunc  NewDecoder
}

Params defines the base objects for jobConfigs.

type Processor

type Processor interface {

	// GetID will return the ID of this processor
	GetID() int

	// GetContext will return the processor context
	GetContext() *ProcessorContext

	// Run will start the processor and run until shutdown
	// is triggered or it is closed for some other reason
	Run()

	// Stop will stop the processor and close all connections
	// to kafka consumer group and storage layer
	Stop()

	// Restart will stop and start current processor
	Restart()
}

Processor is an interface that all processors need to implement to work with Driver
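
A short sketch of driving any Processor through this interface; p is assumed to come from an init func such as NewStreamingProcessor (documented below), and the restart channel is hypothetical.

func runProcessor(p job.Processor, restart <-chan struct{}) {
	go p.Run() // consume, process and save until stopped
	for range restart {
		p.Restart() // stop and start the processor again
	}
	p.Stop()
}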

func NewStreamingProcessor

func NewStreamingProcessor(id int, jobConfig *rules.JobConfig, aresControllerClient controllerCli.ControllerClient, sinkInitFunc NewSink, consumerInitFunc NewConsumer, decoderInitFunc NewDecoder,
	errors chan ProcessorError, msgSizes chan int64, serviceConfig config.ServiceConfig) (Processor, error)

NewStreamingProcessor returns a Processor that consumes, processes, and saves data to the db.

type ProcessorContext

type ProcessorContext struct {
	sync.RWMutex

	StartTime      time.Time       `json:"startTime"`
	TotalMessages  int64           `json:"totalMessages"`
	FailedMessages int64           `json:"failedMessages"`
	Stopped        bool            `json:"stopped"`
	Shutdown       bool            `json:"shutdown"`
	Errors         processorErrors `json:"errors"`
	LastUpdated    time.Time       `json:"lastUpdated"`
	RestartCount   int64           `json:"restartCount"`
	Restarting     bool            `json:"restarting"`
	RestartTime    int64           `json:"restartTime"`
}

ProcessorContext holds information about the total messages processed, the number of failed messages, the number of messages waiting in the batcher, and the last-updated timestamp for this information
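
Because ProcessorContext embeds sync.RWMutex, readers are expected to take the read lock before inspecting it; a minimal sketch, where p is assumed to be a Processor:

ctx := p.GetContext()
ctx.RLock()
log.Printf("processed=%d failed=%d stopped=%v restarts=%d",
	ctx.TotalMessages, ctx.FailedMessages, ctx.Stopped, ctx.RestartCount)
ctx.RUnlock()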

type ProcessorError

type ProcessorError struct {
	// ID of the processor
	ID int
	// Timestamp defines time when this error
	// happened
	Timestamp int64
	// Error generated
	Error error
}

ProcessorError defines the error and the ID of the processor that generated it

func (ProcessorError) ErrorToJSON

func (p ProcessorError) ErrorToJSON() string

ErrorToJSON converts the error to JSON format
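
A small usage sketch; the error value and ID are illustrative only.

perr := job.ProcessorError{
	ID:        1,
	Timestamp: time.Now().Unix(),
	Error:     errors.New("failed to save batch"),
}
log.Println(perr.ErrorToJSON())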

type Result

type Result struct {
	fx.Out

	Controller *Controller
}

Result defines the objects that the job module provides.
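
Putting Params, Result, and the Module variable (see Variables above) together, an fx application might be wired roughly as below. This assumes Module is an fx option, as the Variables section suggests; the provider constructors are hypothetical stand-ins for whatever supplies the Params fields.

app := fx.New(
	job.Module, // configures Drivers and Controller (see Variables)
	fx.Provide(
		newServiceConfig,    // hypothetical: provides config.ServiceConfig
		newJobConfigs,       // hypothetical: provides rules.JobConfigs
		newSinkInitFunc,     // hypothetical: provides a job.NewSink
		newConsumerInitFunc, // hypothetical: provides a job.NewConsumer
		newDecoderInitFunc,  // hypothetical: provides a job.NewDecoder
	),
	fx.Invoke(job.StartController), // begin periodic sync with the aresDB controller
)
app.Run()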

type RetryFailureHandler

type RetryFailureHandler struct {
	// contains filtered or unexported fields
}

RetryFailureHandler implements exponential backoff retry

func NewRetryFailureHandler

func NewRetryFailureHandler(
	config models.FailureHandlerConfig,
	serviceConfig config.ServiceConfig,
	db sink.Sink,
	jobName string) *RetryFailureHandler

NewRetryFailureHandler creates a new RetryFailureHandler

func (*RetryFailureHandler) HandleFailure

func (handler *RetryFailureHandler) HandleFailure(destination sink.Destination, rows []client.Row) (err error)

HandleFailure handles failure with retry

type StreamingProcessor

type StreamingProcessor struct {
	ID int
	// contains filtered or unexported fields
}

StreamingProcessor defines an individual processor that connects to a Kafka high-level consumer, processes the messages based on the type of job, and saves them to the database

func (*StreamingProcessor) GetContext

func (s *StreamingProcessor) GetContext() *ProcessorContext

GetContext will return the context of this processor

func (*StreamingProcessor) GetID

func (s *StreamingProcessor) GetID() int

GetID will return the ID of this processor

func (*StreamingProcessor) Restart

func (s *StreamingProcessor) Restart()

Restart will stop the processor and start it again in case a failure is detected

func (*StreamingProcessor) Run

func (s *StreamingProcessor) Run()

Run will start the Processor, which reads from the high-level Kafka consumer, decodes each message, and adds the row to the batcher for saving to ares.

func (*StreamingProcessor) Stop

func (s *StreamingProcessor) Stop()

Stop will stop the processor

type ZKNodeSubscriber

type ZKNodeSubscriber struct {
	// Name is the subscriber instanceId
	Name string `json:"name"`
	// Host is the host name of the subscriber
	Host string `json:"host"`
}

ZKNodeSubscriber defines the information stored in the subscriber ZK node
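
Given the json tags above, the data stored at the ephemeral node is presumably the JSON encoding of this struct; a minimal sketch with illustrative values:

sub := job.ZKNodeSubscriber{
	Name: "subscriber-0",       // hypothetical instanceId
	Host: "host-1.example.com", // hypothetical host name
}
data, err := json.Marshal(sub)
if err != nil {
	log.Fatal(err)
}
log.Printf("%s", data) // {"name":"subscriber-0","host":"host-1.example.com"}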
