mgr

package
v1.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 18, 2020 License: Apache-2.0 Imports: 47 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StatusOK   = "ok"
	StatusBad  = "bad"
	StatusLost = "lost"
)
View Source
const (
	DefaultTryTimes = 3
	MetaTmp         = "meta_tmp/"

	DefaultRawDataBatchLen = 10
	RawDataMaxBatchLines   = 100
	DefaultRawDataSize     = 16 * 1024
)
View Source
const (
	StatsShell = "stats"
	PREFIX     = "/logkit"
)
View Source
const (
	SpeedUp     = "up"
	SpeedDown   = "down"
	SpeedStable = "stable"

	RunnerRunning = "running"
	RunnerStopped = "stopped"
)
View Source
const DefaultMyTag = "default"
View Source
const (
	KeyMetricType = "type"
)
View Source
const KeyRouterConfig = "router"
View Source
const KeySendConfig = "senders"

Variables

View Source
var DEFAULT_LOGKIT_REST_DIR = ".logkitconfs"
View Source
var DEFAULT_PORT = 3000
View Source
var DIR_NOT_EXIST_SLEEP_TIME = "300" //300 s
View Source
var KeySampleLog = "sampleLog"
View Source
var RawDataTimeOut = 30 * time.Second

Functions

func GetMySlaveUrl added in v1.3.6

func GetMySlaveUrl(address, schema string) (uri string, err error)

func MergeExtraInfoTags added in v1.4.6

func MergeExtraInfoTags(meta *reader.Meta, prefix string, tags map[string]interface{}) map[string]interface{}

func NewMetric

func NewMetric(tp string) (metric.Collector, error)

func ParseData added in v1.4.3

func ParseData(parserConfig conf.MapConf) (parsedData []Data, err error)

parse模块中各种type的日志都能获取解析后的数据

func RawData added in v1.4.3

func RawData(readerConfig conf.MapConf) ([]string, error)

RawData 从 reader 模块中根据 type 获取多条字符串形式的样例日志

func Register added in v1.3.6

func Register(masters []string, myhost, tag string) error

func RespError added in v1.4.0

func RespError(c echo.Context, respCode int, errCode, errMsg string) error

func RespSuccess added in v1.4.0

func RespSuccess(c echo.Context, data interface{}) error

func SendData added in v1.4.3

func SendData(senderConfig map[string]interface{}) error

func TransformData added in v1.4.3

func TransformData(transformerConfig map[string]interface{}) ([]Data, error)

Types

type CleanInfo

type CleanInfo struct {
	// contains filtered or unexported fields
}

type Cluster added in v1.3.6

type Cluster struct {
	ClusterConfig
	// contains filtered or unexported fields
}

func NewCluster added in v1.3.6

func NewCluster(cc *ClusterConfig) *Cluster

func (*Cluster) AddSlave added in v1.3.6

func (cc *Cluster) AddSlave(url, tag string)

func (*Cluster) RunRegisterLoop added in v1.3.6

func (cc *Cluster) RunRegisterLoop() error

func (*Cluster) UpdateSlaveStatus added in v1.3.6

func (cc *Cluster) UpdateSlaveStatus()

type ClusterConfig added in v1.3.6

type ClusterConfig struct {
	MasterUrl []string `json:"master_url"`
	IsMaster  bool     `json:"is_master"`
	Enable    bool     `json:"enable"`
	Address   string   `json:"address"`
	Tag       string   `json:"tag"`
}

type ClusterStatus added in v1.3.6

type ClusterStatus struct {
	Status map[string]RunnerStatus `json:"status"`
	Tag    string                  `json:"tag"`
	Err    string                  `json:"error"`
}

type CollectLog added in v1.5.4

type CollectLog struct {
	CollectLogPath   string `json:"collect_log_path"`
	CollectLogEnable bool   `json:"collect_log_enable"`
	ReadFrom         string `json:"read_from"`
	EnvTag           string `json:"-"`
	Pandora
}

type CompatibleErrorResult added in v1.5.4

type CompatibleErrorResult struct {
	ReadErrors      *ErrorStatistic            `json:"read_errors"`
	ParseErrors     *ErrorStatistic            `json:"parse_errors"`
	TransformErrors map[string]*ErrorStatistic `json:"transform_errors"`
	SendErrors      map[string]*ErrorStatistic `json:"send_errors"`
}

为了兼容之前的消息传递是errorqueue的结构

type ErrorsList added in v1.5.2

type ErrorsList struct {
	ReadErrors      *equeue.ErrorQueue            `json:"read_errors"`
	ParseErrors     *equeue.ErrorQueue            `json:"parse_errors"`
	TransformErrors map[string]*equeue.ErrorQueue `json:"transform_errors"`
	SendErrors      map[string]*equeue.ErrorQueue `json:"send_errors"`
}

func NewErrorsList added in v1.5.4

func NewErrorsList() *ErrorsList

func (*ErrorsList) Clone added in v1.5.2

func (list *ErrorsList) Clone() *ErrorsList

Clone 返回当前 ErrorList 的完整拷贝,若无数据则会返回 nil

func (*ErrorsList) Empty added in v1.5.4

func (list *ErrorsList) Empty() bool

Empty 检查列表是否为空

func (*ErrorsList) HasParseErr added in v1.5.4

func (list *ErrorsList) HasParseErr() bool

func (*ErrorsList) HasReadErr added in v1.5.4

func (list *ErrorsList) HasReadErr() bool

func (*ErrorsList) HasSendErr added in v1.5.4

func (list *ErrorsList) HasSendErr() bool

func (*ErrorsList) HasTransformErr added in v1.5.4

func (list *ErrorsList) HasTransformErr() bool

func (*ErrorsList) List added in v1.5.4

func (list *ErrorsList) List() (dst ErrorsResult)

List 复制出一个顺序的 Errors

func (*ErrorsList) Reset added in v1.5.2

func (list *ErrorsList) Reset()

Reset 清空列表

type ErrorsResult added in v1.5.2

type ErrorsResult struct {
	ReadErrors      []equeue.ErrorInfo            `json:"read_errors"`
	ParseErrors     []equeue.ErrorInfo            `json:"parse_errors"`
	TransformErrors map[string][]equeue.ErrorInfo `json:"transform_errors"`
	SendErrors      map[string][]equeue.ErrorInfo `json:"send_errors"`
}

type LogExportRunner

type LogExportRunner struct {
	RunnerInfo
	// contains filtered or unexported fields
}

func NewLogExportRunner

func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, rr *reader.Registry, pr *parser.Registry, sr *sender.Registry) (runner *LogExportRunner, err error)

func NewLogExportRunnerWithService

func NewLogExportRunnerWithService(info RunnerInfo, reader reader.Reader, cleaner *cleaner.Cleaner, parser parser.Parser,
	transformers []transforms.Transformer, senders []sender.Sender, router *router.Router, meta *reader.Meta) (runner *LogExportRunner, err error)

func (*LogExportRunner) Cleaner

func (r *LogExportRunner) Cleaner() CleanInfo

func (*LogExportRunner) Delete added in v1.5.4

func (r *LogExportRunner) Delete() (err error)

func (*LogExportRunner) GetErrors added in v1.5.2

func (r *LogExportRunner) GetErrors() ErrorsResult

func (*LogExportRunner) LagStats

func (r *LogExportRunner) LagStats() (rl *LagInfo, err error)

func (*LogExportRunner) Name

func (r *LogExportRunner) Name() string

func (*LogExportRunner) Reset added in v1.3.1

func (r *LogExportRunner) Reset() (err error)

func (*LogExportRunner) Run

func (r *LogExportRunner) Run()

func (*LogExportRunner) Status

func (r *LogExportRunner) Status() (rs RunnerStatus)

func (*LogExportRunner) StatusBackup added in v1.3.5

func (r *LogExportRunner) StatusBackup()

StatusBackup 除了备份Status的数据之外,还会备份historyError数据,因为重构前混到一起,导致备份写到同一个statistics.meta文件中

func (*LogExportRunner) StatusRestore added in v1.3.5

func (r *LogExportRunner) StatusRestore()

StatusRestore 除了恢复Status的数据之外,还会恢复historyError数据,因为重构前混到一起,导致备份写到同一个statistics.meta文件中

func (*LogExportRunner) Stop

func (r *LogExportRunner) Stop()

Stop 清理所有使用到的资源, 等待10秒尝试读取完毕 先停Reader,不再读取,然后停Run函数,让读取的都转到发送,最后停Sender结束整个过程。 Parser 无状态,无需stop。

func (*LogExportRunner) TokenRefresh added in v1.4.4

func (r *LogExportRunner) TokenRefresh(tokens AuthTokens) error

type Manager

type Manager struct {
	ManagerConfig
	DefaultDir string

	Version    string
	SystemInfo string

	CollectLogRunner *self.LogRunner
	// contains filtered or unexported fields
}

func NewCustomManager

func NewCustomManager(conf ManagerConfig, rr *reader.Registry, pr *parser.Registry, sr *sender.Registry) (*Manager, error)

func NewManager

func NewManager(conf ManagerConfig) (*Manager, error)

func (*Manager) Add

func (m *Manager) Add(confPath string)

func (*Manager) AddRunner added in v1.4.3

func (m *Manager) AddRunner(name string, conf RunnerConfig, createTime time.Time) (err error)

func (*Manager) Configs added in v1.4.2

func (m *Manager) Configs() (rss map[string]RunnerConfig)

func (*Manager) DeleteRunner added in v1.4.3

func (m *Manager) DeleteRunner(name string) (err error)

func (*Manager) Error added in v1.5.2

func (m *Manager) Error(name string) (rss ErrorsResult, err error)

func (*Manager) Errors added in v1.5.2

func (m *Manager) Errors() (es map[string]ErrorsResult)

func (*Manager) ForkRunner added in v1.2.3

func (m *Manager) ForkRunner(confPath string, config RunnerConfig, returnOnErr bool) error

func (*Manager) GetRunnerNames added in v1.5.4

func (m *Manager) GetRunnerNames() []string

func (*Manager) GetRunnerPath added in v1.5.4

func (m *Manager) GetRunnerPath(name string) (string, bool)

func (*Manager) IsRunning added in v1.5.1

func (m *Manager) IsRunning(confPath string) bool

func (*Manager) Remove

func (m *Manager) Remove(confPath string) (err error)

func (*Manager) RemoveWithConfig added in v1.3.5

func (m *Manager) RemoveWithConfig(confPath string, isDelete bool) (err error)

func (*Manager) ResetRunner added in v1.4.3

func (m *Manager) ResetRunner(name string) (err error)

ResetRunner 必须在runner实例存在下才可以reset, reset是调用runner本身的方法, 而runner stop实际上是销毁实例,所以先要启动runner

func (*Manager) RestoreWebDir added in v1.2.3

func (m *Manager) RestoreWebDir()

func (*Manager) StartRunner added in v1.4.3

func (m *Manager) StartRunner(name string) (err error)

func (*Manager) StartRunnerWithFilename added in v1.5.4

func (m *Manager) StartRunnerWithFilename(filename string) (err error)

func (*Manager) Status

func (m *Manager) Status() (rss map[string]RunnerStatus)

func (*Manager) StatusAndConfig added in v1.5.4

func (m *Manager) StatusAndConfig() (rs map[string]RunnerStatus, rc map[string]RunnerConfig)

func (*Manager) Stop

func (m *Manager) Stop() error

func (*Manager) StopRunner added in v1.4.3

func (m *Manager) StopRunner(name string) (err error)

func (*Manager) StopRunnerWithFilename added in v1.5.4

func (m *Manager) StopRunnerWithFilename(filename string) error

func (*Manager) UpdateReaderRegister added in v1.5.4

func (m *Manager) UpdateReaderRegister()

func (*Manager) UpdateRunner added in v1.4.3

func (m *Manager) UpdateRunner(name string, conf RunnerConfig) (err error)

func (*Manager) UpdateSenderRegister added in v1.5.5

func (m *Manager) UpdateSenderRegister()

func (*Manager) UpdateToken added in v1.4.4

func (m *Manager) UpdateToken(tokens []AuthTokens) (err error)

func (*Manager) Watch

func (m *Manager) Watch(confsPath []string) (err error)

type ManagerConfig

type ManagerConfig struct {
	BindHost string `json:"bind_host"`

	Idc          string        `json:"idc"`
	Zone         string        `json:"zone"`
	RestDir      string        `json:"rest_dir"`
	Cluster      ClusterConfig `json:"cluster"`
	DisableWeb   bool          `json:"disable_web"`
	ServerBackup bool          `json:"-"`
	AuditDir     string        `json:"audit_dir"`

	CollectLog
}

type MetricConfig added in v1.3.6

type MetricConfig struct {
	MetricType string                 `json:"type"`
	Attributes map[string]bool        `json:"attributes"`
	Config     map[string]interface{} `json:"config"`
}

type MetricRunner

type MetricRunner struct {
	RunnerName string `json:"name"`
	// contains filtered or unexported fields
}

func NewMetricRunner

func NewMetricRunner(rc RunnerConfig, sr *sender.Registry) (runner *MetricRunner, err error)

func (*MetricRunner) Cleaner

func (*MetricRunner) Cleaner() CleanInfo

func (*MetricRunner) Delete added in v1.5.4

func (mr *MetricRunner) Delete() error

func (*MetricRunner) Name

func (mr *MetricRunner) Name() string

func (*MetricRunner) Reset added in v1.3.6

func (mr *MetricRunner) Reset() (err error)

func (*MetricRunner) Run

func (r *MetricRunner) Run()

func (*MetricRunner) Status

func (mr *MetricRunner) Status() (rs RunnerStatus)

func (*MetricRunner) StatusBackup added in v1.3.6

func (mr *MetricRunner) StatusBackup()

func (*MetricRunner) StatusRestore added in v1.3.6

func (mr *MetricRunner) StatusRestore()

func (*MetricRunner) Stop

func (mr *MetricRunner) Stop()

func (*MetricRunner) TokenRefresh added in v1.4.4

func (mr *MetricRunner) TokenRefresh(tokens AuthTokens) error

type PostParseRet added in v1.2.3

type PostParseRet struct {
	SamplePoints []Data `json:"SamplePoints"`
}

PostParseRet 返回值

type RegisterReq added in v1.3.6

type RegisterReq struct {
	Url string `json:"url"`
	Tag string `json:"tag"`
}

type RestService

type RestService struct {
	// contains filtered or unexported fields
}

func NewRestService

func NewRestService(mgr *Manager, router *echo.Echo) *RestService

func (*RestService) ClusterStatus added in v1.3.6

func (rs *RestService) ClusterStatus() echo.HandlerFunc

master API GET /logkit/cluster/status?tag=tagValue&url=urlValue

func (*RestService) DeleteClusterConfig added in v1.3.6

func (rs *RestService) DeleteClusterConfig() echo.HandlerFunc

DELETE /logkti/cluster/configs/<name>?tag=tagValue&url=urlValue

func (*RestService) DeleteConfig added in v1.2.3

func (rs *RestService) DeleteConfig() echo.HandlerFunc

Delete /logkit/configs/<name>

func (*RestService) DeleteSlaves added in v1.3.6

func (rs *RestService) DeleteSlaves() echo.HandlerFunc

DELETE /logkit/cluster/slaves?tag=tagValue&url=urlValue

func (*RestService) GetCleanerKeyOptions added in v1.4.8

func (rs *RestService) GetCleanerKeyOptions() echo.HandlerFunc

get /logkit/cleaner/options 获取解析选项

func (*RestService) GetClusterConfig added in v1.4.1

func (rs *RestService) GetClusterConfig() echo.HandlerFunc

master API Get /logkit/cluster/configs:name?tag=tagValue&url=urlValue

func (*RestService) GetClusterConfigs added in v1.3.6

func (rs *RestService) GetClusterConfigs() echo.HandlerFunc

master API Get /logkit/cluster/configs?tag=tagValue&url=urlValue

func (*RestService) GetClusterRunners added in v1.4.0

func (rs *RestService) GetClusterRunners() echo.HandlerFunc

master API GET /logkit/cluster/runners?tag=tagValue&url=urlValue

func (*RestService) GetConfig added in v1.2.3

func (rs *RestService) GetConfig() echo.HandlerFunc

get /logkit/configs/:name

func (*RestService) GetConfigs added in v1.2.3

func (rs *RestService) GetConfigs() echo.HandlerFunc

get /logkit/configs

func (*RestService) GetError added in v1.5.2

func (rs *RestService) GetError() echo.HandlerFunc

get /logkit/errors/<name>

func (*RestService) GetErrorCodeHumanize added in v1.4.0

func (rs *RestService) GetErrorCodeHumanize() echo.HandlerFunc

get /logkit/errorcode

func (*RestService) GetErrors added in v1.5.2

func (rs *RestService) GetErrors() echo.HandlerFunc

get /logkit/errors

func (*RestService) GetMetricKeys added in v1.3.6

func (rs *RestService) GetMetricKeys() echo.HandlerFunc

GET /logkit/metric/keys

func (*RestService) GetMetricOptions added in v1.3.6

func (rs *RestService) GetMetricOptions() echo.HandlerFunc

GET /logkit/metric/options

func (*RestService) GetMetricUsages added in v1.3.6

func (rs *RestService) GetMetricUsages() echo.HandlerFunc

GET /logkit/metric/usages

func (*RestService) GetParserKeyOptions added in v1.2.3

func (rs *RestService) GetParserKeyOptions() echo.HandlerFunc

get /logkit/parser/options 获取解析选项

func (*RestService) GetParserSampleLogs added in v1.2.3

func (rs *RestService) GetParserSampleLogs() echo.HandlerFunc

get /logkit/parser/samplelogs 获取样例日志

func (*RestService) GetParserTooltips added in v1.5.1

func (rs *RestService) GetParserTooltips() echo.HandlerFunc

get /logkit/parser/tooltips 获取解析用途提示

func (*RestService) GetParserUsages added in v1.2.3

func (rs *RestService) GetParserUsages() echo.HandlerFunc

get /logkit/parser/usages 获得解析用途说明

func (*RestService) GetReaderKeyOptions added in v1.2.3

func (rs *RestService) GetReaderKeyOptions() echo.HandlerFunc

get /logkit/reader/options 获取Reader参数配置

func (*RestService) GetReaderTooltips added in v1.5.1

func (rs *RestService) GetReaderTooltips() echo.HandlerFunc

get /logkit/reader/tooltips 获取Reader用途提示

func (*RestService) GetReaderUsages added in v1.2.3

func (rs *RestService) GetReaderUsages() echo.HandlerFunc

get /logkit/reader/usages 获取Reader用途

func (*RestService) GetRunners added in v1.4.0

func (rs *RestService) GetRunners() echo.HandlerFunc

get /logkit/runners

func (*RestService) GetSenderKeyOptions added in v1.2.3

func (rs *RestService) GetSenderKeyOptions() echo.HandlerFunc

get /logkit/sender/options 获取sender配置参数

func (*RestService) GetSenderRouterOption added in v1.4.2

func (rs *RestService) GetSenderRouterOption() echo.HandlerFunc

get /logkit/sender/router/option 获取所有sender router的配置项

func (*RestService) GetSenderRouterUsage added in v1.4.2

func (rs *RestService) GetSenderRouterUsage() echo.HandlerFunc

get /logkit/sender/router/usage 获取所有sender router匹配方式的名字和作用

func (*RestService) GetSenderUsages added in v1.2.3

func (rs *RestService) GetSenderUsages() echo.HandlerFunc

get /logkit/sender/usages 获取sender用途说明

func (*RestService) GetTransformerOptions added in v1.3.2

func (rs *RestService) GetTransformerOptions() echo.HandlerFunc

GET /logkit/transformer/options

func (*RestService) GetTransformerSampleConfigs added in v1.3.2

func (rs *RestService) GetTransformerSampleConfigs() echo.HandlerFunc

GET /logkit/transformer/sampleconfigs

func (*RestService) GetTransformerUsages added in v1.3.2

func (rs *RestService) GetTransformerUsages() echo.HandlerFunc

GET /logkit/transformer/usages

func (*RestService) GetVersion added in v1.3.1

func (rs *RestService) GetVersion() echo.HandlerFunc

func (*RestService) IsMaster added in v1.4.0

func (rs *RestService) IsMaster() echo.HandlerFunc

master API GET /logkit/cluster/ismaster

func (*RestService) Ping added in v1.3.6

func (rs *RestService) Ping() echo.HandlerFunc

master API GET /logkit/cluster/ping

func (*RestService) PostClusterConfig added in v1.3.6

func (rs *RestService) PostClusterConfig() echo.HandlerFunc

POST /logkit/cluster/configs/<name>?tag=tagValue&url=urlValue

func (*RestService) PostClusterConfigReset added in v1.3.6

func (rs *RestService) PostClusterConfigReset() echo.HandlerFunc

POST /logkit/cluster/configs/<name>/reset?tag=tagValue&url=urlValue

func (*RestService) PostClusterConfigStart added in v1.3.6

func (rs *RestService) PostClusterConfigStart() echo.HandlerFunc

POST /logkit/cluster/configs/<name>/start?tag=tagValue&url=urlValue

func (*RestService) PostClusterConfigStop added in v1.3.6

func (rs *RestService) PostClusterConfigStop() echo.HandlerFunc

POST /logkit/cluster/configs/<name>/stop?tag=tagValue&url=urlValue

func (*RestService) PostConfig added in v1.2.3

func (rs *RestService) PostConfig() echo.HandlerFunc

post /logkit/configs/<name>

func (*RestService) PostConfigReset added in v1.3.1

func (rs *RestService) PostConfigReset() echo.HandlerFunc

POST /logkit/configs/<name>/reset

func (*RestService) PostConfigStart added in v1.3.5

func (rs *RestService) PostConfigStart() echo.HandlerFunc

POST /logkit/configs/<name>/start

func (*RestService) PostConfigStop added in v1.3.5

func (rs *RestService) PostConfigStop() echo.HandlerFunc

POST /logkit/configs/<name>/stop

func (*RestService) PostParse added in v1.2.3

func (rs *RestService) PostParse() echo.HandlerFunc

post /logkit/parser/parse 接受解析请求

func (*RestService) PostParserCheck added in v1.2.4

func (rs *RestService) PostParserCheck() echo.HandlerFunc

POST /logkit/parser/check

func (*RestService) PostRead added in v1.4.3

func (rs *RestService) PostRead() echo.HandlerFunc

POST /logkit/reader/read 请求校验reader配置

func (*RestService) PostReaderCheck added in v1.2.4

func (rs *RestService) PostReaderCheck() echo.HandlerFunc

POST /logkit/reader/check 请求校验reader配置

func (*RestService) PostRegister added in v1.3.6

func (rs *RestService) PostRegister() echo.HandlerFunc

master API POST /logkit/cluster/register

func (*RestService) PostSend added in v1.4.3

func (rs *RestService) PostSend() echo.HandlerFunc

POST /logkit/sender/send 请求校验sender配置

func (*RestService) PostSenderCheck added in v1.2.4

func (rs *RestService) PostSenderCheck() echo.HandlerFunc

POST /logkit/sender/check 请求校验sender配置

func (*RestService) PostSlaveTag added in v1.3.6

func (rs *RestService) PostSlaveTag() echo.HandlerFunc

POST /logkit/cluster/slaves/tag?tag=tagValue&url=urlValue

func (*RestService) PostTag added in v1.3.6

func (rs *RestService) PostTag() echo.HandlerFunc

slave API POST /logkit/cluster/tag

func (*RestService) PostTransform added in v1.4.1

func (rs *RestService) PostTransform() echo.HandlerFunc

POST /logkit/transformer/transform Transform (multiple logs/single log) in (json array/json object) format with registered transformers Return result string in json array format

func (*RestService) PostTransformerCheck added in v1.4.3

func (rs *RestService) PostTransformerCheck() echo.HandlerFunc

POST /logkit/transformer/check

func (*RestService) PutClusterConfig added in v1.3.6

func (rs *RestService) PutClusterConfig() echo.HandlerFunc

PUT /logkit/cluster/configs/<name>?tag=tagValue&url=urlValue

func (*RestService) PutConfig added in v1.3.1

func (rs *RestService) PutConfig() echo.HandlerFunc

put /logkit/configs/<name>

func (*RestService) Register added in v1.3.6

func (rs *RestService) Register() error

func (*RestService) Slaves added in v1.3.6

func (rs *RestService) Slaves() echo.HandlerFunc

master API GET /logkit/cluster/slaves?tag=tagValue&url=urlValue

func (*RestService) Status added in v1.2.3

func (rs *RestService) Status() echo.HandlerFunc

get /logkit/status

func (*RestService) Stop

func (rs *RestService) Stop()

Stop will stop RestService

type Runner

type Runner interface {
	Name() string
	Run()
	Stop()
	Cleaner() CleanInfo
	Status() RunnerStatus
}

func NewCustomRunner

func NewCustomRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, rr *reader.Registry, pr *parser.Registry, sr *sender.Registry) (runner Runner, err error)

func NewRunner

func NewRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal) (runner Runner, err error)

NewRunner 创建Runner

func NewRunnerWithService

func NewRunnerWithService(info RunnerInfo, reader reader.Reader, cleaner *cleaner.Cleaner, parser parser.Parser, transformers []transforms.Transformer,
	senders []sender.Sender, router *router.Router, meta *reader.Meta) (runner Runner, err error)

type RunnerConfig

type RunnerConfig struct {
	RunnerInfo
	SourceData    string                   `json:"sourceData,omitempty"`
	MetricConfig  []MetricConfig           `json:"metric,omitempty"`
	ReaderConfig  conf.MapConf             `json:"reader"`
	CleanerConfig conf.MapConf             `json:"cleaner,omitempty"`
	ParserConf    conf.MapConf             `json:"parser"`
	Transforms    []map[string]interface{} `json:"transforms,omitempty"`
	SendersConfig []conf.MapConf           `json:"senders"`
	Router        router.RouterConfig      `json:"router,omitempty"`
	IsInWebFolder bool                     `json:"web_folder,omitempty"`
	IsStopped     bool                     `json:"is_stopped,omitempty"`
	IsFromServer  bool                     `json:"from_server,omitempty"` // 判读是否从服务器拉取的配置
	AuditChan     chan<- audit.Message     `json:"-"`
}

RunnerConfig 从多数据源读取,经过解析后,发往多个数据目的地

func Compatible

func Compatible(rc RunnerConfig) RunnerConfig

Compatible 用于新老配置的兼容

func TrimSecretInfo added in v1.4.6

func TrimSecretInfo(conf RunnerConfig, trimSk bool) RunnerConfig

TrimSecretInfo 将配置文件中的 token 等鉴权相关信息去掉

type RunnerErrors added in v1.5.2

type RunnerErrors interface {
	GetErrors() ErrorsResult
}

type RunnerInfo

type RunnerInfo struct {
	RunnerName             string `json:"name"`
	Note                   string `json:"note,omitempty"`
	CollectInterval        int    `json:"collect_interval,omitempty"`           // metric runner收集的频率
	MaxBatchLen            int    `json:"batch_len,omitempty"`                  // 每个read batch的行数
	MaxBatchSize           int    `json:"batch_size,omitempty"`                 // 每个read batch的字节数
	MaxBatchInterval       int    `json:"batch_interval,omitempty"`             // 最大发送时间间隔
	MaxBatchTryTimes       int    `json:"batch_try_times,omitempty"`            // 最大发送次数,小于等于0代表无限重试
	MaxReaderCloseWaitTime int    `json:"max_reader_close_wait_time,omitempty"` // runner 等待reader close时间,
	ErrorsListCap          int    `json:"errors_list_cap"`                      // 记录错误信息的最大条数
	SyncEvery              int    `json:"sync_every,omitempty"`                 // 每多少次sync一下,填小于的0数字表示stop时sync,正整数表示发送成功多少次以后同步,填0或1就是每次发送成功都同步,兼容原来不配置的逻辑
	CreateTime             string `json:"createtime"`
	EnvTag                 string `json:"env_tag,omitempty"` // 用这个字段的值来获取环境变量, 作为 tag 添加到数据中
	ExtraInfo              bool   `json:"extra_info"`
	LogAudit               bool   `json:"log_audit"`
	SendRaw                bool   `json:"send_raw"`            //使用发送原始字符串的接口,而不是Data
	ReadTime               bool   `json:"read_time"`           // 读取时间
	InternalKeyPrefix      string `json:"internal_key_prefix"` // 内置字段名前缀
	MaxLineLen             int64  `json:"max_line_len"`        // 限制单条数据/每个key对应的value大小,防止读取数据/发送数据出错时内存/磁盘占用过大
	IsBlock                bool   `json:"is_block"`            // 阻塞式发送
}

type RunnerStatus

type RunnerStatus struct {
	Name           string               `json:"name"`
	Logpath        string               `json:"logpath"`
	ReadDataSize   int64                `json:"readDataSize"`
	ReadDataCount  int64                `json:"readDataCount"`
	Elaspedtime    float64              `json:"elaspedtime"`
	Lag            LagInfo              `json:"lag"`
	ReaderStats    StatsInfo            `json:"readerStats"`
	ParserStats    StatsInfo            `json:"parserStats"`
	SenderStats    map[string]StatsInfo `json:"senderStats"`
	TransformStats map[string]StatsInfo `json:"transformStats"`
	Error          string               `json:"error,omitempty"`

	ReadSpeedKB      int64  `json:"readspeed_kb"`
	ReadSpeed        int64  `json:"readspeed"`
	ReadSpeedTrendKb string `json:"readspeedtrend_kb"`
	ReadSpeedTrend   string `json:"readspeedtrend"`
	RunningStatus    string `json:"runningStatus"`
	Tag              string `json:"tag,omitempty"`
	Url              string `json:"url,omitempty"`

	//仅作为将history error同步上传到服务端时使用
	HistorySyncErrors CompatibleErrorResult `json:"history_errors"`
	// contains filtered or unexported fields
}

RunnerStatus runner运行状态,添加字段请在clone函数中相应添加

func (*RunnerStatus) Clone added in v1.4.8

func (src *RunnerStatus) Clone() RunnerStatus

Clone 复制出一个完整的RunnerStatus

type Service added in v1.2.3

type Service struct {
	Prefix string
}

type Slave added in v1.3.6

type Slave struct {
	Url       string    `json:"url"`
	Tag       string    `json:"tag"`
	Status    string    `json:"status"`
	LastTouch time.Time `json:"last_touch"`
}

type SlaveConfig added in v1.3.6

type SlaveConfig struct {
	Configs map[string]RunnerConfig `json:"configs"`
	Tag     string                  `json:"tag"`
	Err     string                  `json:"error"`
}

type StatusPersistable added in v1.3.5

type StatusPersistable interface {
	StatusBackup()
	StatusRestore()
}

type TagReq added in v1.3.6

type TagReq struct {
	Tag string `json:"tag"`
}

type TokenRefreshable added in v1.4.4

type TokenRefreshable interface {
	TokenRefresh(AuthTokens) error
}

type Version added in v1.3.1

type Version struct {
	Version string `json:"version"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL