Version: v0.2.2 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2020 License: Apache-2.0 Imports: 19 Imported by: 0




View Source
const (
	AgentStatusStarting       = "STARTING"
	AgentStatusWaitingForConf = "WAITING_FOT_CONFIGURATION"
	AgentStatusRegistred      = "REGISTRED"
	AgentStatusUnRegistred    = "UNREGISTRED"
	AgentStatusOnline         = "ONLINE"
	AgentStatusOffline        = "OFFLINE"
	AgentStatusOnError        = "ON_ERROR"

Possible Statuses

View Source
const AuthPath = "/auth/token"

AuthPath is the path of the authentication endpoint.

View Source
const (
	DefaultTimeOut = 60
View Source
const HeaderDcc = "X-Dcc-Auth"


View Source
var WaitAuth = time.Second * 5

WaitAuth is the time to wait (5 seconds) if authentication failed.


func Run

func Run(config *viper.Viper, s chan error) (err error)

Run runs the agent. If the controller part is present in the config file, the remote will be initialized; otherwise the agent will run in standalone mode.


type Agent

type Agent struct {
	// contains filtered or unexported fields

Agent representation of agent

func (*Agent) GetSchemas

func (a *Agent) GetSchemas() map[string]map[string]map[string]*sources.Column

GetSchemas get schema from sources

func (*Agent) HealthCheck

func (a *Agent) HealthCheck() (alive bool)

HealthCheck returns true if the agent and all sources are up.

func (*Agent) InitAgent added in v0.2.0

func (a *Agent) InitAgent() (err error)

InitAgent prepares the agent from the configuration: it initializes the multiplexer, sources and sinks.

func (*Agent) InitRemoteMeta added in v0.2.0

func (a *Agent) InitRemoteMeta()

InitRemoteMeta gets meta from the remote and assigns it to the sources.

func (*Agent) LoadDeMultiplexer added in v0.2.0

func (a *Agent) LoadDeMultiplexer(demux *map[string][]string) error

LoadDeMultiplexer sets up the DeMultiplexers; each source has its own DeMultiplexer.

func (*Agent) LoadMultiplexer

func (a *Agent) LoadMultiplexer(multiplexer *map[string][]string) error

LoadMultiplexer sets up the Multiplexers; each source has its own Multiplexer.

func (*Agent) LoadSink

func (a *Agent) LoadSink(sinkName string, sinkType string) (err error)

LoadSink create sink from name

func (*Agent) LoadSinks

func (a *Agent) LoadSinks() (err error)

LoadSinks loads all sinks, initializing them from the configuration.

func (*Agent) LoadSource

func (a *Agent) LoadSource(sourceName string, sourceType string) (err error)

LoadSource create Source from name

func (*Agent) LoadSources

func (a *Agent) LoadSources(multiplexer *map[string][]string, demux *map[string][]string) (err error)

LoadSources loads all sources, initializing them from the configuration.

func (*Agent) Poller added in v0.2.0

func (a *Agent) Poller()

Poller sends meta and gets tasks at each tick; on every tick it calls sendMetaAndGetProcessTask.

func (*Agent) ProcessTask added in v0.2.0

func (a *Agent) ProcessTask(task utils.Task) (err error)

ProcessTask process given task from server and update status

func (*Agent) RemoteInit added in v0.2.0

func (a *Agent) RemoteInit() error

RemoteInit initializes the controller: it gets the configuration and meta from the remote server, and sends capabilities and schema to the remote server.

func (*Agent) SendCapabilities added in v0.2.0

func (a *Agent) SendCapabilities() (err error)

SendCapabilities send capability to server

func (*Agent) Start added in v0.2.0

func (a *Agent) Start() error

type Auth

type Auth struct {
	// contains filtered or unexported fields

Auth representation of auth

func NewAuth added in v0.2.0

func NewAuth(uuid string, password string, baseURL string) *Auth

NewAuth creates a new Auth using the given collector uuid, password and remote base url

func (*Auth) GetToken

func (a *Auth) GetToken() string

GetToken returns the current token if it exists, and gets a new one otherwise.

type Controller

type Controller struct {
	PendingTask int
	// contains filtered or unexported fields

Controller allows the collector to be controlled by the API.

func NewControllerClient

func NewControllerClient(conf *viper.Viper, auth *Auth) *Controller

NewControllerClient creates a new controller from the configuration and returns an initialized controller.

func (*Controller) GetConfiguration added in v0.2.0

func (c *Controller) GetConfiguration() (config []byte, err error)

GetConfiguration gets the configuration from the server and returns the sources and sinks.

func (*Controller) GetMeta added in v0.2.0

func (c *Controller) GetMeta(metaName string) (meta utils.Metas, err error)

GetMeta gets metadata by name from the API. If metaName is empty, it gets all metas. It returns the meta as a Metas object.

func (*Controller) GetTasks added in v0.2.0

func (c *Controller) GetTasks(limit int) (task []utils.Task, err error)

GetTasks gets the list of tasks the collector needs to execute from the API. If limit is set to -1, it returns all tasks; otherwise it returns up to the given limit.

func (*Controller) SendCapabilities added in v0.2.0

func (c *Controller) SendCapabilities(tasks map[string]*utils.TaskDescription) (err error)

SendCapabilities send the currently supported capabilities of this collector to the API

func (*Controller) SendMeta added in v0.2.0

func (c *Controller) SendMeta(meta utils.Metas) (err error)

SendMeta send meta to the API

func (*Controller) SendSchema added in v0.2.0

func (c *Controller) SendSchema(sourceName string, schema map[string]map[string]*sources.Column) (err error)

SendSchema send the schema to the API

func (*Controller) SendSourcesCapabilities added in v0.2.0

func (c *Controller) SendSourcesCapabilities(sourceName string, tasks map[string]*utils.TaskDescription) (err error)

SendSourcesCapabilities send the currently supported capabilities of the configured source to the API

func (*Controller) UpdateTasks added in v0.2.0

func (c *Controller) UpdateTasks(task utils.Task) (err error)

UpdateTasks update the status (and results if needed) of a task that has been executed by sending it to the API

type ControllerConfig

type ControllerConfig struct {
	BaseURL      string `mapstructure:"base_url" json:"base_url"`
	PollerTicker string `mapstructure:"poller_ticker" json:"poller_ticker"`
	Worker       int    `json:"worker"`

ControllerConfig representation of controller config

type DeMultiplexer added in v0.2.0

type DeMultiplexer struct {
	// contains filtered or unexported fields

DeMultiplexer is used to send offset from sink to source

func NewDemultiplexer added in v0.2.0

func NewDemultiplexer(ins []chan interface{}, out chan interface{}) (demux *DeMultiplexer)

NewDemultiplexer creates a new DeMultiplexer and returns an initialized DeMultiplexer object.

type Multiplexer

type Multiplexer struct {
	// contains filtered or unexported fields

Multiplexer represents the multiplexer of the collector.

func NewMultiplexer

func NewMultiplexer(in chan events.LookatchEvent, outs []chan events.LookatchEvent) (multiplexer *Multiplexer)

NewMultiplexer create a new multiplexer

type Schema added in v0.2.0

type Schema struct {
	Key    string                     `json:"key"`
	Values map[string]*sources.Column `json:"values"`

Schema is used to send the schema to the API.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL