reader

package
v1.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2018 License: Apache-2.0 Imports: 46 Imported by: 0

Documentation

Overview

Package bufio implements buffered I/O. It wraps an FileReader or io.Writer object, creating another object (Reader or Writer) that also implements the interface but provides buffering and some help for textual I/O.

Index

Constants

View Source
const (
	KeyHttpServiceAddress = "http_service_address"
	KeyHttpServicePath    = "http_service_path"

	DefaultHttpServiceAddress = ":4000"
	DefaultHttpServicePath    = "/logkit/data"

	DefaultSyncEvery       = 10
	DefaultMaxBodySize     = 100 * 1024 * 1024
	DefaultMaxBytesPerFile = 500 * 1024 * 1024
	DefaultWriteSpeedLimit = 10 * 1024 * 1024 // 默认写速限制为10MB

	ContentTypeHeader     = "Content-Type"
	ContentEncodingHeader = "Content-Encoding"
)
View Source
const (
	KeyLogPath       = "log_path"
	KeyMetaPath      = "meta_path"
	KeyFileDone      = "file_done"
	KeyMode          = "mode"
	KeyBufSize       = "reader_buf_size"
	KeyWhence        = "read_from"
	KeyEncoding      = "encoding"
	KeyReadIOLimit   = "readio_limit"
	KeyDataSourceTag = "datasource_tag"
	KeyHeadPattern   = "head_pattern"
	KeyRunnerName    = "runner_name"

	// 忽略隐藏文件
	KeyIgnoreHiddenFile = "ignore_hidden"
	KeyIgnoreFileSuffix = "ignore_file_suffix"
	KeyValidFilePattern = "valid_file_pattern"

	KeyExpire       = "expire"
	KeyMaxOpenFiles = "max_open_files"
	KeyStatInterval = "stat_interval"

	KeyMysqlOffsetKey   = "mysql_offset_key"
	KeyMysqlReadBatch   = "mysql_limit_batch"
	KeyMysqlDataSource  = "mysql_datasource"
	KeyMysqlDataBase    = "mysql_database"
	KeyMysqlSQL         = "mysql_sql"
	KeyMysqlCron        = "mysql_cron"
	KeyMysqlExecOnStart = "mysql_exec_onstart"

	KeySQLSchema = "sql_schema"

	KeyMssqlOffsetKey   = "mssql_offset_key"
	KeyMssqlReadBatch   = "mssql_limit_batch"
	KeyMssqlDataSource  = "mssql_datasource"
	KeyMssqlDataBase    = "mssql_database"
	KeyMssqlSQL         = "mssql_sql"
	KeyMssqlCron        = "mssql_cron"
	KeyMssqlExecOnStart = "mssql_exec_onstart"

	KeyPGsqlOffsetKey   = "postgres_offset_key"
	KeyPGsqlReadBatch   = "postgres_limit_batch"
	KeyPGsqlDataSource  = "postgres_datasource"
	KeyPGsqlDataBase    = "postgres_database"
	KeyPGsqlSQL         = "postgres_sql"
	KeyPGsqlCron        = "postgres_cron"
	KeyPGsqlExecOnStart = "postgres_exec_onstart"

	KeyESReadBatch = "es_limit_batch"
	KeyESIndex     = "es_index"
	KeyESType      = "es_type"
	KeyESHost      = "es_host"
	KeyESKeepAlive = "es_keepalive"
	KeyESVersion   = "es_version"

	KeyMongoHost        = "mongo_host"
	KeyMongoDatabase    = "mongo_database"
	KeyMongoCollection  = "mongo_collection"
	KeyMongoOffsetKey   = "mongo_offset_key"
	KeyMongoReadBatch   = "mongo_limit_batch"
	KeyMongoCron        = "mongo_cron"
	KeyMongoExecOnstart = "mongo_exec_onstart"
	KeyMongoFilters     = "mongo_filters"
	KeyMongoCert        = "mongo_cacert"

	KeyKafkaGroupID          = "kafka_groupid"
	KeyKafkaTopic            = "kafka_topic"
	KeyKafkaZookeeper        = "kafka_zookeeper"
	KeyKafkaZookeeperTimeout = "kafka_zookeeper_timeout"
)

FileReader's conf keys

View Source
const (
	ModeDir      = "dir"
	ModeFile     = "file"
	ModeTailx    = "tailx"
	ModeFileAuto = "fileauto"
	ModeMysql    = "mysql"
	ModeMssql    = "mssql"
	ModePG       = "postgres"
	ModeElastic  = "elastic"
	ModeMongo    = "mongo"
	ModeKafka    = "kafka"
	ModeRedis    = "redis"
	ModeSocket   = "socket"
	ModeHttp     = "http"
)

FileReader's modes

View Source
const (
	ReadModeHeadPatternString = "mode_head_pattern_string"
	ReadModeHeadPatternRegexp = "mode_head_pattern_regexp"
)
View Source
const (
	WhenceOldest = "oldest"
	WhenceNewest = "newest"
)

KeyWhence 的可选项

View Source
const (
	DateTypeHash          = "hash"
	DateTypeSortedSet     = "sortedSet"
	DataTypeSet           = "set"
	DataTypeString        = "string"
	DataTypeList          = "list"
	DataTypeChannel       = "channel"
	DataTypePatterChannel = "pattern_channel"
)
View Source
const (
	KeyRedisDataType   = "redis_datatype" // 必填
	KeyRedisDB         = "redis_db"       //默认 是0
	KeyRedisKey        = "redis_key"      //必填
	KeyRedisHashArea   = "redisHash_area"
	KeyRedisAddress    = "redis_address" // 默认127.0.0.1:6379
	KeyRedisPassword   = "redis_password"
	KeyTimeoutDuration = "redis_timeout"
)
View Source
const (

	// 监听的url形式包括:
	// socket_service_address = "tcp://:3110"
	// socket_service_address = "tcp://127.0.0.1:http"
	// socket_service_address = "tcp4://:3110"
	// socket_service_address = "tcp6://:3110"
	// socket_service_address = "tcp6://[2001:db8::1]:3110"
	// socket_service_address = "udp://:3110"
	// socket_service_address = "udp4://:3110"
	// socket_service_address = "udp6://:3110"
	// socket_service_address = "unix:///tmp/sys.sock"
	// socket_service_address = "unixgram:///tmp/sys.sock"
	KeySocketServiceAddress = "socket_service_address"

	// 最大并发连接数
	// 仅用于 stream sockets (e.g. TCP).
	// 0 (default) 为无限制.
	// socket_max_connections = 1024
	KeySocketMaxConnections = "socket_max_connections"

	// 读的超时时间
	// 仅用于 stream sockets (e.g. TCP).
	// 0 (default) 为没有超时
	// socket_read_timeout = "30s"
	KeySocketReadTimeout = "socket_read_timeout"

	// Socket的Buffer大小,默认65535
	// socket_read_buffer_size = 65535
	KeySocketReadBufferSize = "socket_read_buffer_size"

	// TCP连接的keep_alive时长
	// 0 表示关闭keep_alive
	// 默认5分钟
	KeySocketKeepAlivePeriod = "socket_keep_alive_period"
)
View Source
const (
	StatusInit int32 = iota
	StatusStopped
	StatusStopping
	StatusRunning
)
View Source
const DirMode = "dir"

DirMode 按时间顺序顺次读取文件夹下所有文件的模式

View Source
const FileMode = "file"

FileMode 读取单个文件模式

View Source
const (
	Loop = "loop"
)
View Source
const (
	MaxHeadPatternBufferSize = 20 * 1024 * 1024
)
View Source
const (
	ModeMetrics = "metrics"
)
View Source
const (
	MongoDefaultOffsetKey = "_id"
)
View Source
const (
	SQL_SPLITER = ";"
)

Variables

View Source
var (
	ErrInvalidUnreadByte = errors.New("bufio: invalid use of UnreadByte")
	ErrInvalidUnreadRune = errors.New("bufio: invalid use of UnreadRune")
	ErrBufferFull        = errors.New("bufio: buffer full")
	ErrNegativeCount     = errors.New("bufio: negative count")
)
View Source
var (
	ElasticVersion3 = "3.x"
	ElasticVersion5 = "5.x"
	ElasticVersion6 = "6.x"
)
View Source
var (
	OptionMetaPath = utils.Option{
		KeyName:      KeyMetaPath,
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Description:  "logkit元数据路径(meta_path)",
	}
	OptionDataSourceTag = utils.Option{
		KeyName:      KeyDataSourceTag,
		ChooseOnly:   false,
		Default:      "datasource",
		DefaultNoUse: false,
		Description:  "数据来源标签(datasource_tag)",
	}
	OptionBuffSize = utils.Option{
		KeyName:      KeyBufSize,
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Description:  "文件缓存数据大小(reader_buf_size)",
		CheckRegex:   "\\d+",
	}
	OptionEncoding = utils.Option{
		KeyName:    KeyEncoding,
		ChooseOnly: true,
		ChooseOptions: []interface{}{"UTF-8", "UTF-16", "US-ASCII", "ISO-8859-1",
			"GBK", "GB18030", "EUC-JP", "UTF-16BE", "UTF-16LE", "Big5", "Shift_JIS",
			"ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7",
			"ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11", "ISO-8859-12", "ISO-8859-13",
			"ISO-8859-14", "ISO-8859-15", "ISO-8859-16", "macos-0_2-10.2", "macos-6_2-10.4",
			"macos-7_3-10.2", "macos-29-10.2", "macos-35-10.2", "windows-1250", "windows-1251",
			"windows-1252", "windows-1253", "windows-1254", "windows-1255", "windows-1256",
			"windows-1257", "windows-1258", "windows-874", "IBM037", "ibm-273_P100-1995",
			"ibm-277_P100-1995", "ibm-278_P100-1995", "ibm-280_P100-1995", "ibm-284_P100-1995",
			"ibm-285_P100-1995", "ibm-290_P100-1995", "ibm-297_P100-1995", "ibm-420_X120-1999",

			"KOI8-R", "KOI8-U", "ebcdic-xml-us"},
		Default:      "UTF-8",
		DefaultNoUse: false,
		Description:  "编码方式(encoding)",
	}
	OptionWhence = utils.Option{
		KeyName:       KeyWhence,
		ChooseOnly:    true,
		ChooseOptions: []interface{}{WhenceOldest, WhenceNewest},
		Default:       WhenceOldest,
		Description:   "读取的起始位置(read_from)",
	}
	OptionReadIoLimit = utils.Option{
		KeyName:      KeyReadIOLimit,
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Description:  "读取速度限制(MB/s)(readio_limit)",
		CheckRegex:   "\\d+",
	}
	OptionHeadPattern = utils.Option{
		KeyName:      KeyHeadPattern,
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Description:  "多行读取的起始行正则表达式(head_pattern)",
	}
	OptionSQLSchema = utils.Option{
		KeyName:      KeySQLSchema,
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Description:  "SQL字段类型定义(sql_schema)",
	}
)
View Source
var ErrFileNotDir = errors.New("file is not directory")
View Source
var ErrFileNotRegular = errors.New("file is not regular")
View Source
var ErrMetaFileRead = errors.New("cannot read meta file")
View Source
var ErrNoFileChosen = errors.New("no files found")
View Source
var ErrStopped = errors.New("runner stopped")
View Source
var ModeKeyOptions = map[string][]utils.Option{
	ModeDir: {
		{
			KeyName:      KeyLogPath,
			ChooseOnly:   false,
			Default:      "/home/users/john/log/",
			DefaultNoUse: true,
			Description:  "日志文件夹路径(log_path)",
		},
		OptionMetaPath,
		OptionBuffSize,
		OptionWhence,
		OptionEncoding,
		OptionDataSourceTag,
		OptionReadIoLimit,
		OptionHeadPattern,
		{
			KeyName:      KeyFileDone,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "读取过的文件信息保存路径(file_done)",
		},
		{
			KeyName:       KeyIgnoreHiddenFile,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "是否忽略隐藏文件(ignore_hidden)",
		},
		{
			KeyName:      KeyIgnoreFileSuffix,
			ChooseOnly:   false,
			Default:      strings.Join(defaultIgnoreFileSuffix, ","),
			DefaultNoUse: false,
			Description:  "根据后缀忽略文件(ignore_file_suffix)",
		},
		{
			KeyName:      KeyValidFilePattern,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "根据正则表达式匹配文件(valid_file_pattern)",
		},
	},
	ModeFile: {
		{
			KeyName:      KeyLogPath,
			ChooseOnly:   false,
			Default:      "/home/users/john/log/my.log",
			DefaultNoUse: true,
			Description:  "日志文件路径(log_path)",
		},
		OptionMetaPath,
		OptionBuffSize,
		OptionWhence,
		OptionDataSourceTag,
		OptionEncoding,
		OptionReadIoLimit,
		OptionHeadPattern,
	},
	ModeTailx: {
		{
			KeyName:      KeyLogPath,
			ChooseOnly:   false,
			Default:      "/home/users/*/mylog/*.log",
			DefaultNoUse: true,
			Description:  "日志文件路径模式串(log_path)",
		},
		OptionMetaPath,
		OptionBuffSize,
		OptionWhence,
		OptionEncoding,
		OptionReadIoLimit,
		OptionDataSourceTag,
		OptionHeadPattern,
		{
			KeyName:      KeyExpire,
			ChooseOnly:   false,
			Default:      "24h",
			DefaultNoUse: false,
			Description:  "文件过期时间(时h,分m,秒s)(expire)",
			CheckRegex:   "\\d+[hms]",
		},
		{
			KeyName:      KeyMaxOpenFiles,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "最大的打开文件数(max_open_files)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyStatInterval,
			ChooseOnly:   false,
			Default:      "3m",
			DefaultNoUse: false,
			Description:  "文件扫描间隔(stat_interval)",
			CheckRegex:   "\\d+[hms]",
		},
	},
	ModeFileAuto: {
		{
			KeyName:      KeyLogPath,
			ChooseOnly:   false,
			Default:      "/your/log/dir/or/path*.log",
			DefaultNoUse: true,
			Description:  "日志文件夹路径(log_path)",
		},
		OptionMetaPath,
		OptionWhence,
		OptionEncoding,
		OptionDataSourceTag,
		{
			KeyName:      KeyHeadPattern,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "多行读取的起始行正则表达式(head_pattern)",
		},
		{
			KeyName:      KeyValidFilePattern,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "根据正则表达式匹配文件(valid_file_pattern)",
		},
	},
	ModeMysql: {
		{
			KeyName:      KeyMysqlDataSource,
			ChooseOnly:   false,
			Default:      "<username>:<password>@tcp(<hostname>:<port>)",
			DefaultNoUse: true,
			Description:  "数据库地址(mysql_datasource)",
		},
		{
			KeyName:      KeyMysqlDataBase,
			ChooseOnly:   false,
			Default:      "<database>",
			DefaultNoUse: true,
			Description:  "数据库名称(mysql_database)",
		},
		{
			KeyName:      KeyMysqlSQL,
			ChooseOnly:   false,
			Default:      "select * from <table>;",
			DefaultNoUse: true,
			Description:  "数据查询语句(mysql_sql)",
		},
		{
			KeyName:      KeyMysqlOffsetKey,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "递增的列名称(mysql_offset_key)",
		},
		{
			KeyName:      KeyMysqlReadBatch,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "分批查询的单批次大小(mysql_limit_batch)",
			CheckRegex:   "\\d+",
		},
		OptionMetaPath,
		OptionDataSourceTag,
		{
			KeyName:      KeyMysqlCron,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "定时任务调度Cron(mysql_cron)",
		},
		{
			KeyName:       KeyMysqlExecOnStart,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "启动时立即执行(mysql_exec_onstart)",
		},
		OptionSQLSchema,
	},
	ModeMssql: {
		{
			KeyName:      KeyMssqlDataSource,
			ChooseOnly:   false,
			Default:      "server=<hostname or instance>;user id=<username>;password=<password>;port=<port>",
			DefaultNoUse: true,
			Description:  "数据库地址(mssql_datasource)",
		},
		{
			KeyName:      KeyMssqlDataBase,
			ChooseOnly:   false,
			Default:      "<database>",
			DefaultNoUse: true,
			Description:  "数据库名称(mssql_database)",
		},
		{
			KeyName:      KeyMssqlSQL,
			ChooseOnly:   false,
			Default:      "select * from <table>;",
			DefaultNoUse: true,
			Description:  "数据查询语句(mssql_sql)",
		},
		{
			KeyName:      KeyMssqlOffsetKey,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: true,
			Description:  "递增的列名称(mssql_offset_key)",
		},
		OptionMetaPath,
		OptionDataSourceTag,
		{
			KeyName:      KeyMssqlReadBatch,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "分批查询的单批次大小(mssql_limit_batch)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyMssqlCron,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "定时任务调度Crontab(mssql_cron)",
		},
		{
			KeyName:       KeyMssqlExecOnStart,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "启动时立即执行(mssql_exec_onstart)",
		},
		OptionSQLSchema,
	},
	ModePG: {
		{
			KeyName:      KeyPGsqlDataSource,
			ChooseOnly:   false,
			Default:      "host=localhost port=5432 connect_timeout=10 user=pqgotest password=123456 sslmode=disable",
			DefaultNoUse: true,
			Description:  "数据库地址(postgres_datasource)",
		},
		{
			KeyName:      KeyPGsqlDataBase,
			ChooseOnly:   false,
			Default:      "<database>",
			DefaultNoUse: true,
			Description:  "数据库名称(postgres_database)",
		},
		{
			KeyName:      KeyPGsqlSQL,
			ChooseOnly:   false,
			Default:      "select * from <table>;",
			DefaultNoUse: true,
			Description:  "数据查询语句(postgres_sql)",
		},
		{
			KeyName:      KeyPGsqlOffsetKey,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: true,
			Description:  "递增的列名称(postgres_offset_key)",
		},
		OptionMetaPath,
		OptionDataSourceTag,
		{
			KeyName:      KeyPGsqlReadBatch,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "分批查询的单批次大小(postgres_limit_batch)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyPGsqlCron,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "定时任务调度Crontab(postgres_cron)",
		},
		{
			KeyName:       KeyPGsqlExecOnStart,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "启动时立即执行(postgres_exec_onstart)",
		},
		OptionSQLSchema,
	},
	ModeElastic: {
		{
			KeyName:      KeyESHost,
			ChooseOnly:   false,
			Default:      "http://localhost:9200",
			DefaultNoUse: true,
			Description:  "数据库地址(es_host)",
		},
		{
			KeyName:       KeyESVersion,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{ElasticVersion3, ElasticVersion5, ElasticVersion6},
			Description:   "ES版本号(es_version)",
		},
		{
			KeyName:      KeyESIndex,
			ChooseOnly:   false,
			Default:      "app-repo-123",
			DefaultNoUse: true,
			Description:  "ES索引名称(es_index)",
		},
		{
			KeyName:      KeyESType,
			ChooseOnly:   false,
			Default:      "type_app",
			DefaultNoUse: true,
			Description:  "ES的app名称(es_type)",
		},
		OptionMetaPath,
		OptionDataSourceTag,
		{
			KeyName:      KeyESReadBatch,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "分批查询的单批次大小(es_limit_batch)",
		},
		{
			KeyName:      KeyESKeepAlive,
			ChooseOnly:   false,
			Default:      "1d",
			DefaultNoUse: false,
			Description:  "ES的Offset保存时间(es_keepalive)",
			CheckRegex:   "\\d+[dms]",
		},
	},
	ModeMongo: {
		{
			KeyName:      KeyMongoHost,
			ChooseOnly:   false,
			Default:      "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]",
			DefaultNoUse: true,
			Description:  "数据库地址(mongo_host)",
		},
		{
			KeyName:      KeyMongoDatabase,
			ChooseOnly:   false,
			Default:      "app123",
			DefaultNoUse: true,
			Description:  "数据库名称(mongo_database)",
		},
		{
			KeyName:      KeyMongoCollection,
			ChooseOnly:   false,
			Default:      "collection1",
			DefaultNoUse: true,
			Description:  "数据表名称(mongo_collection)",
		},
		{
			KeyName:      KeyMongoOffsetKey,
			ChooseOnly:   false,
			Default:      "_id",
			DefaultNoUse: true,
			Description:  "递增的主键(mongo_offset_key)",
		},
		OptionMetaPath,
		OptionDataSourceTag,
		{
			KeyName:      KeyMongoReadBatch,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "分批查询的单批次大小(mongo_limit_batch)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyMongoCron,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "定时任务调度Cron(mongo_cron)",
		},
		{
			KeyName:       KeyMongoExecOnstart,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "启动时立即执行(mongo_exec_onstart)",
		},
		{
			KeyName:      KeyMongoFilters,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "数据过滤方式(mongo_filters)",
		},
	},
	ModeKafka: {
		{
			KeyName:      KeyKafkaGroupID,
			ChooseOnly:   false,
			Default:      "logkit1",
			DefaultNoUse: true,
			Description:  "Kafka的consumer组名称(kafka_groupid)",
		},
		{
			KeyName:      KeyKafkaTopic,
			ChooseOnly:   false,
			Default:      "test_topic1",
			DefaultNoUse: true,
			Description:  "Kafka的topic名称(kafka_topic)",
		},
		{
			KeyName:      KeyKafkaZookeeper,
			ChooseOnly:   false,
			Default:      "localhost:2181",
			DefaultNoUse: true,
			Description:  "Zookeeper地址(kafka_zookeeper)",
		},
		OptionWhence,
		{
			KeyName:      KeyKafkaZookeeperTimeout,
			ChooseOnly:   false,
			Default:      "1",
			DefaultNoUse: false,
			Description:  "zookeeper超时时间(秒)(kafka_zookeeper_timeout)",
		},
		OptionDataSourceTag,
	},
	ModeRedis: {
		{
			KeyName:       KeyRedisDataType,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{DataTypeList, DataTypeChannel, DataTypePatterChannel, DataTypeString, DataTypeSet, DateTypeSortedSet, DateTypeHash},
			Description:   "Redis的数据读取模式(redis_datatype)",
		},
		{
			KeyName:      KeyRedisDB,
			ChooseOnly:   false,
			Default:      "0",
			DefaultNoUse: true,
			Description:  "数据库名称(redis_db)",
		},
		{
			KeyName:      KeyRedisKey,
			ChooseOnly:   false,
			Default:      "key1",
			DefaultNoUse: true,
			Description:  "redis键(redis_key)",
		},
		{
			KeyName:      KeyRedisHashArea,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "hash模式对应redis的hash数据结构的域(redisHash_area)",
		},
		{
			KeyName:      KeyRedisAddress,
			ChooseOnly:   false,
			Default:      "127.0.0.1:6379",
			DefaultNoUse: false,
			Description:  "数据库地址(redis_address)",
		},
		{
			KeyName:      KeyRedisPassword,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "密码(redis_password)",
		},
		{
			KeyName:      KeyTimeoutDuration,
			ChooseOnly:   false,
			Default:      "5s",
			DefaultNoUse: false,
			Description:  "单次读取超时时间(m(分)、s(秒))(redis_timeout)",
			CheckRegex:   "\\d+[ms]",
		},
		OptionDataSourceTag,
	},
	ModeSocket: {
		{
			KeyName:      KeySocketServiceAddress,
			ChooseOnly:   false,
			Default:      "tcp://127.0.0.1:3110",
			DefaultNoUse: true,
			Description:  "socket监听的地址(协议://端口)(socket_service_address)",
		},
		{
			KeyName:      KeySocketMaxConnections,
			ChooseOnly:   false,
			Default:      "0",
			DefaultNoUse: false,
			Description:  "最大并发连接数(tcp)(socket_max_connections)",
		},
		{
			KeyName:      KeySocketReadTimeout,
			ChooseOnly:   false,
			Default:      "1m",
			DefaultNoUse: false,
			Description:  "连接超时时间(0为不超时)(socket_read_timeout)",
		},
		{
			KeyName:      KeySocketReadBufferSize,
			ChooseOnly:   false,
			Default:      "65535",
			DefaultNoUse: false,
			Description:  "连接缓存大小(udp)(socket_read_buffer_size)",
		},
		{
			KeyName:      KeySocketKeepAlivePeriod,
			ChooseOnly:   false,
			Default:      "5m",
			DefaultNoUse: false,
			Description:  "连接保持时长(0为关闭)(socket_keep_alive_period)",
		},
		OptionDataSourceTag,
	},
	ModeHttp: {
		{
			KeyName:      KeyHttpServiceAddress,
			ChooseOnly:   false,
			Default:      DefaultHttpServiceAddress,
			DefaultNoUse: true,
			Description:  "监听的地址和端口(<ip 或者 host 或者留空>:port)(http_service_address)",
		},
		{
			KeyName:      KeyHttpServicePath,
			ChooseOnly:   false,
			Default:      DefaultHttpServicePath,
			DefaultNoUse: true,
			Description:  "监听地址前缀(http_service_path)",
		},
	},
}
View Source
var ModeUsages = []utils.KeyValue{
	{ModeFileAuto, "从文件读取( fileauto 模式)"},
	{ModeDir, "从文件读取( dir 模式)"},
	{ModeFile, "从文件读取( file 模式)"},
	{ModeTailx, "从文件读取( tailx 模式)"},
	{ModeMysql, "从 MySQL 读取"},
	{ModeMssql, "从 MSSQL 读取"},
	{ModePG, "从 PostgreSQL 读取"},
	{ModeElastic, "从 Elasticsearch 读取"},
	{ModeMongo, "从 MongoDB 读取"},
	{ModeKafka, "从 Kafka 读取"},
	{ModeRedis, "从 Redis 读取"},
	{ModeSocket, "从 Socket 读取"},
	{ModeHttp, "从 http 请求中读取"},
}

ModeUsages 用途说明

View Source
var WaitNoSuchFile = time.Second

Functions

func HeadPatternMode

func HeadPatternMode(mode string, v interface{}) (reg *regexp.Regexp, err error)

Types

type ActiveReader

type ActiveReader struct {
	// contains filtered or unexported fields
}

func NewActiveReader

func NewActiveReader(originPath, realPath, whence string, meta *Meta, msgChan chan<- Result) (ar *ActiveReader, err error)

func (*ActiveReader) Close

func (ar *ActiveReader) Close() error

func (*ActiveReader) Run

func (ar *ActiveReader) Run()

func (*ActiveReader) Status added in v1.3.1

func (ar *ActiveReader) Status() utils.StatsInfo

func (*ActiveReader) SyncMeta

func (ar *ActiveReader) SyncMeta() string

除了sync自己的bufreader,还要sync一行linecache

type BufReader

type BufReader struct {
	// contains filtered or unexported fields
}

BufReader implements buffering for an FileReader object.

func NewReaderSize

func NewReaderSize(rd FileReader, meta *Meta, size int) (*BufReader, error)

NewReaderSize returns a new Reader whose buffer has at least the specified size. If the argument FileReader is already a Reader with large enough size, it returns the underlying Reader.

func (*BufReader) Close

func (b *BufReader) Close() error

func (*BufReader) Name

func (b *BufReader) Name() string

func (*BufReader) ReadLine

func (b *BufReader) ReadLine() (ret string, err error)

ReadLine returns a string line as a normal Reader

func (*BufReader) ReadPattern

func (b *BufReader) ReadPattern() (string, error)

ReadPattern读取日志直到匹配行首模式串

func (*BufReader) ReadString

func (b *BufReader) ReadString(delim byte) (ret string, err error)

ReadString reads until the first occurrence of delim in the input, returning a string containing the data up to and including the delimiter. If ReadString encounters an error before finding a delimiter, it returns the data read before the error and the error itself (often io.EOF). ReadString returns err != nil if and only if the returned data does not end in delim. For simple uses, a Scanner may be more convenient.

func (*BufReader) SetMode

func (b *BufReader) SetMode(mode string, v interface{}) (err error)

func (*BufReader) Source

func (b *BufReader) Source() string

func (*BufReader) Status added in v1.3.1

func (b *BufReader) Status() utils.StatsInfo

func (*BufReader) SyncMeta

func (b *BufReader) SyncMeta()

type CollectionFilter

type CollectionFilter map[string]interface{}

CollectionFilter is just a typed map of strings of map[string]interface{}

type ElasticReader

type ElasticReader struct {
	// contains filtered or unexported fields
}

func NewESReader

func NewESReader(meta *Meta, readBatch int, estype, esindex, eshost, esVersion, keepAlive string) (er *ElasticReader, err error)

func (*ElasticReader) Close

func (er *ElasticReader) Close() (err error)

func (*ElasticReader) Name

func (er *ElasticReader) Name() string

func (*ElasticReader) ReadLine

func (er *ElasticReader) ReadLine() (data string, err error)

func (*ElasticReader) SetMode

func (er *ElasticReader) SetMode(mode string, v interface{}) error

func (*ElasticReader) Source

func (er *ElasticReader) Source() string

func (*ElasticReader) Start

func (er *ElasticReader) Start()

Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题

func (*ElasticReader) Status added in v1.3.1

func (er *ElasticReader) Status() utils.StatsInfo

func (*ElasticReader) SyncMeta

func (er *ElasticReader) SyncMeta()

SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。

type FileReader

type FileReader interface {
	Name() string
	Source() string
	Read(p []byte) (n int, err error)
	Close() error
	SyncMeta() error
}

FileReader reader 接口方法

type HttpReader added in v1.4.2

type HttpReader struct {
	// contains filtered or unexported fields
}

func NewHttpReader added in v1.4.2

func NewHttpReader(meta *Meta, conf conf.MapConf) (*HttpReader, error)

func (*HttpReader) Close added in v1.4.2

func (h *HttpReader) Close() error

func (*HttpReader) Name added in v1.4.2

func (h *HttpReader) Name() string

func (*HttpReader) ReadLine added in v1.4.2

func (h *HttpReader) ReadLine() (data string, err error)

func (*HttpReader) SetMode added in v1.4.2

func (h *HttpReader) SetMode(mode string, v interface{}) error

func (*HttpReader) Source added in v1.4.2

func (h *HttpReader) Source() string

func (*HttpReader) Start added in v1.4.2

func (h *HttpReader) Start() error

func (*HttpReader) SyncMeta added in v1.4.2

func (h *HttpReader) SyncMeta()

type KafkaReader

type KafkaReader struct {
	ConsumerGroup    string
	Topics           []string
	ZookeeperPeers   []string
	ZookeeperChroot  string
	ZookeeperTimeout time.Duration
	Whence           string

	Consumer *consumergroup.ConsumerGroup
	// contains filtered or unexported fields
}

func NewKafkaReader

func NewKafkaReader(meta *Meta, consumerGroup string,
	topics []string, zookeeper []string, zookeeperTimeout time.Duration, whence string) (kr *KafkaReader, err error)

func (*KafkaReader) Close

func (kr *KafkaReader) Close() (err error)

func (*KafkaReader) Name

func (kr *KafkaReader) Name() string

func (*KafkaReader) ReadLine

func (kr *KafkaReader) ReadLine() (data string, err error)

func (*KafkaReader) SetMode

func (kr *KafkaReader) SetMode(mode string, v interface{}) error

func (*KafkaReader) Source

func (kr *KafkaReader) Source() string

func (*KafkaReader) Start

func (kr *KafkaReader) Start()

func (*KafkaReader) Status added in v1.3.1

func (kr *KafkaReader) Status() utils.StatsInfo

func (*KafkaReader) SyncMeta

func (kr *KafkaReader) SyncMeta()

type LastSync

type LastSync struct {
	// contains filtered or unexported fields
}

type Meta

type Meta struct {
	RunnerName string
	// contains filtered or unexported fields
}

func NewMeta

func NewMeta(metadir, filedonedir, logpath, mode string, donefileRetention int) (m *Meta, err error)

func NewMetaWithConf

func NewMetaWithConf(conf conf.MapConf) (meta *Meta, err error)

func (*Meta) AppendDeleteFile

func (m *Meta) AppendDeleteFile(path string) (err error)

func (*Meta) AppendDoneFile

func (m *Meta) AppendDoneFile(path string) (err error)

AppendDoneFile 将处理完的文件写入doneFile中

func (*Meta) BufFile

func (m *Meta) BufFile() string

BufFile 返回buf的文件路径

func (*Meta) BufMetaFile

func (m *Meta) BufMetaFile() string

BufMetaFile 返回buf的meta文件路径

func (*Meta) CacheLineFile

func (m *Meta) CacheLineFile() string

func (*Meta) Clear

func (m *Meta) Clear() error

Clear 删除所有meta信息

func (*Meta) DeleteDoneFile

func (m *Meta) DeleteDoneFile(path string) error

func (*Meta) DeleteFile

func (m *Meta) DeleteFile() string

DeleteFile 处理完成文件地址,按日进行rotate

func (*Meta) DoneFile

func (m *Meta) DoneFile() string

DoneFile 处理完成文件地址,按日进行rotate

func (*Meta) DoneFilePath

func (m *Meta) DoneFilePath() string

DoneFilePath 返回meta的filedone文件的存放目录

func (*Meta) FtSaveLogPath added in v1.3.5

func (m *Meta) FtSaveLogPath() string

FtSaveLogPath 返回 ft_sender 日志信息记录文件夹路径

func (*Meta) GetDataSourceTag

func (m *Meta) GetDataSourceTag() string

func (*Meta) GetDoneFiles

func (m *Meta) GetDoneFiles() (doneFiles []utils.File, err error)

func (*Meta) GetEncodingWay

func (m *Meta) GetEncodingWay() (e string)

GetEncodingWay 获取文件编码方式

func (*Meta) GetMode

func (m *Meta) GetMode() string

func (*Meta) IsDoneFile

func (m *Meta) IsDoneFile(file string) bool

IsDoneFile 返回是否是Donefile格式的文件

func (*Meta) IsExist

func (m *Meta) IsExist() bool

func (*Meta) IsFileMode added in v1.3.1

func (m *Meta) IsFileMode() bool

func (*Meta) IsNotExist

func (m *Meta) IsNotExist() bool

IsNotExist meta 不存在,用来判断是第一次创建

func (*Meta) IsNotValid

func (m *Meta) IsNotValid() bool

IsNotValid meta 数据已经过时,用来判断offset文件是否已经不存在,或者meta文件是否损坏

func (*Meta) IsValid

func (m *Meta) IsValid() bool

func (*Meta) LogPath

func (m *Meta) LogPath() string

func (*Meta) MetaFile

func (m *Meta) MetaFile() string

MetaFile 返回metaFileoffset 的meta文件地址

func (*Meta) ReadBuf

func (m *Meta) ReadBuf(buf []byte) (n int, err error)

func (*Meta) ReadBufMeta

func (m *Meta) ReadBufMeta() (r, w, bufsize int, err error)

func (*Meta) ReadCacheLine

func (m *Meta) ReadCacheLine() ([]byte, error)

func (*Meta) ReadOffset

func (m *Meta) ReadOffset() (currFile string, offset int64, err error)

ReadOffset 读取当前读取的文件和offset

func (*Meta) ReadStatistic added in v1.3.5

func (m *Meta) ReadStatistic() (stat Statistic, err error)

func (*Meta) Reset added in v1.2.4

func (b *Meta) Reset() error

func (*Meta) SetEncodingWay

func (m *Meta) SetEncodingWay(e string)

SetEncodingWay 设置文件编码方式,默认为 utf-8

func (*Meta) StatisticFile added in v1.3.5

func (m *Meta) StatisticFile() string

StatisticFile 返回 Runner 统计信息的文件路径

func (*Meta) WriteBuf

func (m *Meta) WriteBuf(buf []byte, r, w, bufsize int) (err error)

func (*Meta) WriteCacheLine

func (m *Meta) WriteCacheLine(lines string) error

func (*Meta) WriteOffset

func (m *Meta) WriteOffset(currFile string, offset int64) (err error)

WriteOffset 将当前文件和offset写入meta中

func (*Meta) WriteStatistic added in v1.3.5

func (m *Meta) WriteStatistic(stat *Statistic) error

type MongoReader

type MongoReader struct {
	Cron *cron.Cron //定时任务
	// contains filtered or unexported fields
}

func NewMongoReader

func NewMongoReader(meta *Meta, readBatch int, host, database, collection, offsetkey, cronSched, filters, certfile string, execOnStart bool) (mr *MongoReader, err error)

func (*MongoReader) Close

func (mr *MongoReader) Close() (err error)

func (*MongoReader) LoopRun added in v1.3.3

func (mr *MongoReader) LoopRun()

func (*MongoReader) Name

func (mr *MongoReader) Name() string

func (*MongoReader) ReadLine

func (mr *MongoReader) ReadLine() (data string, err error)

func (*MongoReader) SetMode

func (mr *MongoReader) SetMode(mode string, v interface{}) error

func (*MongoReader) Source

func (mr *MongoReader) Source() string

func (*MongoReader) Start

func (mr *MongoReader) Start()

Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题

func (*MongoReader) Status added in v1.3.1

func (mr *MongoReader) Status() utils.StatsInfo

func (*MongoReader) SyncMeta

func (mr *MongoReader) SyncMeta()

SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。

type MultiReader

type MultiReader struct {
	// contains filtered or unexported fields
}

func NewMultiReader

func NewMultiReader(meta *Meta, logPathPattern, whence, expireDur, statIntervalDur string, maxOpenFiles int) (mr *MultiReader, err error)

func (*MultiReader) Close

func (mr *MultiReader) Close() (err error)

func (*MultiReader) Expire

func (mr *MultiReader) Expire()

Expire 函数关闭过期的文件,再更新

func (*MultiReader) Name

func (mr *MultiReader) Name() string

func (*MultiReader) ReadLine

func (mr *MultiReader) ReadLine() (data string, err error)

func (*MultiReader) SetMode

func (mr *MultiReader) SetMode(mode string, value interface{}) (err error)

func (*MultiReader) Source

func (mr *MultiReader) Source() string

func (*MultiReader) Start

func (mr *MultiReader) Start()

Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题 处理StatIntervel以及Expire两大循环任务

func (*MultiReader) StatLogPath

func (mr *MultiReader) StatLogPath()

func (*MultiReader) Status added in v1.3.1

func (mr *MultiReader) Status() utils.StatsInfo

func (*MultiReader) SyncMeta

func (mr *MultiReader) SyncMeta()

SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。

type Reader

type Reader interface {
	//Name reader名称
	Name() string
	//Source 读取的数据源
	Source() string
	ReadLine() (string, error)
	SetMode(mode string, v interface{}) error
	Close() error
	SyncMeta()
}

Reader 是一个通用的行读取reader接口

func NewFileAutoReader added in v1.4.1

func NewFileAutoReader(conf conf.MapConf, meta *Meta, isFromWeb bool, bufSize int, whence string, logpath string, fr FileReader) (reader Reader, err error)

func NewFileBufReader

func NewFileBufReader(conf conf.MapConf, isFromWeb bool) (reader Reader, err error)

NewFileReader 创建FileReader

func NewFileBufReaderWithMeta

func NewFileBufReaderWithMeta(conf conf.MapConf, meta *Meta, isFromWeb bool) (reader Reader, err error)

type RedisOptionn added in v1.2.1

type RedisOptionn struct {
	// contains filtered or unexported fields
}

type RedisReader added in v1.2.1

type RedisReader struct {
	// contains filtered or unexported fields
}

func NewRedisReader added in v1.2.1

func NewRedisReader(meta *Meta, conf conf.MapConf) (rr *RedisReader, err error)

func (*RedisReader) Close added in v1.2.1

func (rr *RedisReader) Close() (err error)

func (*RedisReader) Name added in v1.2.1

func (rr *RedisReader) Name() string

func (*RedisReader) ReadLine added in v1.2.1

func (rr *RedisReader) ReadLine() (data string, err error)

func (*RedisReader) SetMode added in v1.2.1

func (rr *RedisReader) SetMode(mode string, v interface{}) error

func (*RedisReader) Source added in v1.2.1

func (rr *RedisReader) Source() string

func (*RedisReader) Start added in v1.2.1

func (rr *RedisReader) Start()

func (*RedisReader) Status added in v1.3.1

func (rr *RedisReader) Status() utils.StatsInfo

func (*RedisReader) SyncMeta added in v1.2.1

func (rr *RedisReader) SyncMeta()

type Result added in v1.2.2

type Result struct {
	// contains filtered or unexported fields
}

type SeqFile

type SeqFile struct {
	// contains filtered or unexported fields
}

SeqFile 按最终修改时间依次读取文件的Reader类型

func NewSeqFile

func NewSeqFile(meta *Meta, path string, ignoreHidden bool, suffixes []string, validFileRegex, whence string) (sf *SeqFile, err error)

func (*SeqFile) Close

func (sf *SeqFile) Close() (err error)

func (*SeqFile) Name

func (sf *SeqFile) Name() string

func (*SeqFile) Read

func (sf *SeqFile) Read(p []byte) (n int, err error)

func (*SeqFile) Source

func (sf *SeqFile) Source() string

func (*SeqFile) SyncMeta

func (sf *SeqFile) SyncMeta() (err error)

type ServerReader added in v1.2.1

type ServerReader interface {
	//Name reader名称
	Name() string
	//Source 读取的数据源
	Source() string
	Start()
	ReadLine() (string, error)
	Close() error
	SyncMeta()
}

TODO 构建统一的 Server reader框架, 减少重复的编码

type SingleFile

type SingleFile struct {
	// contains filtered or unexported fields
}

func NewSingleFile

func NewSingleFile(meta *Meta, path, whence string, isFromWeb bool) (sf *SingleFile, err error)

func (*SingleFile) Close

func (sf *SingleFile) Close() (err error)

func (*SingleFile) Name

func (sf *SingleFile) Name() string

func (*SingleFile) Read

func (sf *SingleFile) Read(p []byte) (n int, err error)

func (*SingleFile) Reopen

func (sf *SingleFile) Reopen() (err error)

func (*SingleFile) Source

func (sf *SingleFile) Source() string

func (*SingleFile) SyncMeta

func (sf *SingleFile) SyncMeta() error

type SocketReader added in v1.3.6

type SocketReader struct {
	ServiceAddress  string
	MaxConnections  int
	ReadBufferSize  int
	ReadTimeout     time.Duration
	KeepAlivePeriod time.Duration

	// resource need  close
	ReadChan chan string
	Closer   io.Closer
	// contains filtered or unexported fields
}

func NewSocketReader added in v1.3.6

func NewSocketReader(meta *Meta, conf conf.MapConf) (*SocketReader, error)

func (*SocketReader) Close added in v1.3.6

func (sr *SocketReader) Close() (err error)

func (*SocketReader) Name added in v1.3.6

func (sr *SocketReader) Name() string

func (*SocketReader) ReadLine added in v1.3.6

func (sr *SocketReader) ReadLine() (data string, err error)

func (*SocketReader) SetMode added in v1.3.6

func (sr *SocketReader) SetMode(mode string, v interface{}) error

func (*SocketReader) Source added in v1.3.6

func (sr *SocketReader) Source() string

func (*SocketReader) Start added in v1.3.6

func (sr *SocketReader) Start() error

func (*SocketReader) SyncMeta added in v1.3.6

func (sr *SocketReader) SyncMeta()

type SqlReader

type SqlReader struct {
	Cron *cron.Cron //定时任务
	// contains filtered or unexported fields
}

func NewSQLReader

func NewSQLReader(meta *Meta, conf conf.MapConf) (mr *SqlReader, err error)

func (*SqlReader) Close

func (mr *SqlReader) Close() (err error)

func (*SqlReader) LoopRun added in v1.3.3

func (mr *SqlReader) LoopRun()

func (*SqlReader) Name

func (mr *SqlReader) Name() string

func (*SqlReader) ReadLine

func (mr *SqlReader) ReadLine() (data string, err error)

func (*SqlReader) SetMode

func (mr *SqlReader) SetMode(mode string, v interface{}) error

func (*SqlReader) Source

func (mr *SqlReader) Source() string

func (*SqlReader) Start

func (mr *SqlReader) Start()

Start 仅调用一次,借用ReadLine启动,不能在new实例的时候启动,会有并发问题

func (*SqlReader) Status added in v1.3.1

func (mr *SqlReader) Status() utils.StatsInfo

func (*SqlReader) SyncMeta

func (mr *SqlReader) SyncMeta()

SyncMeta 从队列取数据时同步队列,作用在于保证数据不重复。

type Statistic added in v1.3.5

type Statistic struct {
	ReaderCnt int64               `json:"reader_count"` // 读取总条数
	ParserCnt [2]int64            `json:"parser_connt"` // [解析成功, 解析失败]
	SenderCnt map[string][2]int64 `json:"sender_count"` // [发送成功, 发送失败]
}

type StatsReader added in v1.3.1

type StatsReader interface {
	//Name reader名称
	Name() string
	Status() utils.StatsInfo
}

StatsReader 是一个通用的带有统计接口的reader

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL