reader

package
v1.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 18, 2020 License: Apache-2.0 Imports: 18 Imported by: 74

Documentation

Index

Constants

View Source
const (
	// DoneFileName is the fixed file name "file.done".
	// NOTE(review): presumably the base name of the done-file record kept by Meta — confirm against callers.
	DoneFileName = "file.done"

	FtSaveLogPath = "ft_log" // name of the ft log folder inside meta
)
View Source
const (
	// DefautFileRetention is the value 7.
	// NOTE(review): presumably the default for the donefileRetention argument of NewMeta;
	// the unit is not shown here — TODO confirm. The identifier is spelled "Defaut" (sic);
	// it is exported, so renaming it would break callers.
	DefautFileRetention = 7

	// ModeMetrics is the mode string "metrics".
	ModeMetrics = "metrics"
)

Variables

View Source
var ErrFileNotDir = errors.New("file is not directory")
View Source
var ErrFileNotRegular = errors.New("file is not regular")
View Source
var ErrMetaFileRead = errors.New("cannot read meta file")
View Source
var ErrNoFileChosen = errors.New("no files found")
View Source
var ErrStopped = errors.New("runner stopped")
View Source
var WaitNoSuchFile = 100 * time.Millisecond

Functions

func AndCondition added in v1.5.4

func AndCondition(f1, f2 func(os.FileInfo) bool) func(os.FileInfo) bool

func CompressedFile added in v1.5.4

func CompressedFile(path string) bool

func GetLogPathAbs added in v1.5.4

func GetLogPathAbs(conf conf.MapConf) (logpath, oldLogPath string, err error)

func GetMaxFile added in v1.5.4

func GetMaxFile(logdir string, condition func(os.FileInfo) bool, gte func(f1, f2 os.FileInfo) bool) (chosen os.FileInfo, err error)

GetMaxFile 在指定的限制条件 condition 下,根据比较函数 gte 选择最大的 os.FileInfo。condition:文件必须满足的条件;gte:f1 >= f2 则返回 true。

func GetMetaOption added in v1.5.4

func GetMetaOption(conf conf.MapConf) (string, string, string, error)

func GetMinFile added in v1.5.4

func GetMinFile(logdir string, condition func(os.FileInfo) bool, gte func(f1, f2 os.FileInfo) bool) (os.FileInfo, error)

GetMinFile 与 GetMaxFile 相反,返回最小的文件

func HeadPatternMode

func HeadPatternMode(mode string, v interface{}) (reg *regexp.Regexp, err error)

func IgnoreFileSuffixes added in v1.5.4

func IgnoreFileSuffixes(file string, suffixes []string) bool

IgnoreFileSuffixes returns true if file ends with one of the given suffixes.

func IgnoreHidden added in v1.5.4

func IgnoreHidden(file string, ignoreHidden bool) bool

IgnoreHidden returns true if file has a dot (.), which denotes ignored (hidden) files on *nix systems.

func InRunTime added in v1.5.4

func InRunTime(hour, minute int, runTime RunTime) bool

func JoinFileInode added in v1.5.4

func JoinFileInode(filename, inode string) string

func NoCondition added in v1.5.4

func NoCondition(f os.FileInfo) bool

NoCondition 无限制条件

func NotCondition added in v1.5.4

func NotCondition(f1 func(os.FileInfo) bool) func(os.FileInfo) bool

func OrCondition added in v1.5.4

func OrCondition(f1, f2 func(os.FileInfo) bool) func(os.FileInfo) bool

func ParseLoopDuration added in v1.5.0

func ParseLoopDuration(cronSched string) (dur time.Duration, err error)

func ParseNumber added in v1.5.4

func ParseNumber(str string) (number int, err error)

func ParseTime added in v1.5.4

func ParseTime(timeStr string) (hour, minute int, err error)

func RegisterConstructor added in v1.5.0

func RegisterConstructor(typ string, c Constructor)

RegisterConstructor adds a new constructor for a given type of reader.

func ValidFileRegex added in v1.5.4

func ValidFileRegex(file, validFilePattern string) bool

ValidFileRegex returns true if file matches validFilePattern.

Types

type Constructor added in v1.5.0

type Constructor func(*Meta, conf.MapConf) (Reader, error)

type DaemonReader added in v1.5.2

// DaemonReader represents a reader that requires a daemon thread.
type DaemonReader interface {
	// Start launches, without blocking, the daemon thread that backs the reader;
	// the reader itself is responsible for managing that thread's lifecycle.
	Start() error
}

DaemonReader 代表了一个需要守护线程的读取器

type DataReader added in v1.5.0

// DataReader represents a reader that can directly read in-memory data structures.
type DataReader interface {
	// ReadData reads one piece of data and returns it together with the number
	// of bytes actually read.
	ReadData() (Data, int64, error)
}

DataReader 代表了一个可直接读取内存数据结构的读取器

type FileReader

// FileReader lists the methods of the reader interface.
type FileReader interface {
	Name() string
	Source() string
	// Read has the same signature as io.Reader's Read method.
	Read(p []byte) (n int, err error)
	Close() error
	SyncMeta() error
}

FileReader reader 接口方法

type LagReader added in v1.4.4

// LagReader is the interface for obtaining the data lag.
type LagReader interface {
	// Lag returns a *LagInfo or an error.
	Lag() (*LagInfo, error)
}

获取数据lag的接口

type Meta

type Meta struct {
	Dir string // path where file-processing progress is recorded

	DoneFilePath string // file that records which files have already been scanned

	TagFile string // tag name that records the path of the tag file

	Readlimit int // disk read rate limit, in MB/s

	RunnerName string

	LastKey string // the file most recently pulled from s3
	// contains filtered or unexported fields
}

func NewMeta

func NewMeta(metadir, filedonedir, logpath, mode, tagfile string, donefileRetention int) (m *Meta, err error)

func NewMetaWithConf

func NewMetaWithConf(conf conf.MapConf) (meta *Meta, err error)

func NewMetaWithRunnerName added in v1.5.4

func NewMetaWithRunnerName(runnerName, metadir, filedonedir, logpath, mode, tagfile string, donefileRetention int) (m *Meta, err error)

func (*Meta) AddSubMeta added in v1.5.0

func (m *Meta) AddSubMeta(key string, meta *Meta) error

func (*Meta) AppendDeleteFile

func (m *Meta) AppendDeleteFile(path string) (err error)

func (*Meta) AppendDoneFile

func (m *Meta) AppendDoneFile(path string) (err error)

AppendDoneFile 将处理完的文件写入doneFile中

func (*Meta) AppendDoneFileInode added in v1.4.8

func (m *Meta) AppendDoneFileInode(path string, inode uint64) (err error)

AppendDoneFileInode 将处理完的文件路径、inode以及完成时间写入doneFile中

func (*Meta) BufFile

func (m *Meta) BufFile() string

BufFile 返回buf的文件路径

func (*Meta) BufMetaFile

func (m *Meta) BufMetaFile() string

BufMetaFile 返回buf的meta文件路径

func (*Meta) CacheLineFile

func (m *Meta) CacheLineFile() string

func (*Meta) CheckExpiredSubMetas added in v1.5.3

func (m *Meta) CheckExpiredSubMetas(expire time.Duration)

CheckExpiredSubMetas 仅用于轮询收集所有过期的 submeta,清理操作应通过调用 CleanExpiredSubMetas 方法完成。 一般情况下,应由 reader 实现启动 goroutine 单独调用以避免 submeta 数量过多导致进程被长时间阻塞。 另外,如果 submeta 没有存放在该 meta 的子目录则调用此方法无效

func (*Meta) CleanExpiredSubMetas added in v1.5.3

func (m *Meta) CleanExpiredSubMetas(expire time.Duration)

CleanExpiredSubMetas 清除超过指定过期时长的 submeta 目录,清理数目单次调用存在上限以减少阻塞时间

func (*Meta) Clear

func (m *Meta) Clear() error

Clear 删除所有meta信息

func (*Meta) Delete added in v1.5.4

func (m *Meta) Delete() error

func (*Meta) DeleteDoneFile

func (m *Meta) DeleteDoneFile(path string) error

func (*Meta) DeleteFile

func (m *Meta) DeleteFile() string

DeleteFile 处理完成文件地址,按日进行rotate

func (*Meta) DoneFile

func (m *Meta) DoneFile() string

DoneFile 处理完成文件地址,按日进行rotate

func (*Meta) ExtraInfo added in v1.4.6

func (m *Meta) ExtraInfo() map[string]string

func (*Meta) FtSaveLogPath added in v1.3.5

func (m *Meta) FtSaveLogPath() string

FtSaveLogPath 返回 ft_sender 日志信息记录文件夹路径

func (*Meta) GetDataSourceTag

func (m *Meta) GetDataSourceTag() string

func (*Meta) GetDoneFileContent added in v1.4.8

func (m *Meta) GetDoneFileContent() ([]string, error)

func (*Meta) GetDoneFileInode added in v1.4.8

func (m *Meta) GetDoneFileInode(inodeSensitive bool) map[string]bool

func (*Meta) GetDoneFiles

func (m *Meta) GetDoneFiles() ([]File, error)

func (*Meta) GetEncodeTag added in v1.5.4

func (m *Meta) GetEncodeTag() string

func (*Meta) GetEncodingWay

func (m *Meta) GetEncodingWay() (e string)

GetEncodingWay 获取文件编码方式

func (*Meta) GetMode

func (m *Meta) GetMode() string

func (*Meta) GetTagFile added in v1.4.3

func (m *Meta) GetTagFile() string

func (*Meta) GetTags added in v1.4.3

func (m *Meta) GetTags() map[string]interface{}

func (*Meta) IsDoneFile

func (m *Meta) IsDoneFile(file string) bool

IsDoneFile 返回是否是Donefile格式的文件

func (*Meta) IsExist

func (m *Meta) IsExist() bool

func (*Meta) IsFileMode added in v1.3.1

func (m *Meta) IsFileMode() bool

func (*Meta) IsNotExist

func (m *Meta) IsNotExist() bool

IsNotExist meta 不存在,用来判断是第一次创建

func (*Meta) IsNotValid

func (m *Meta) IsNotValid() bool

IsNotValid meta 数据已经过时,用来判断offset文件是否已经不存在,或者meta文件是否损坏

func (*Meta) IsStatisticFileExist added in v1.5.4

func (m *Meta) IsStatisticFileExist() bool

func (*Meta) IsStatisticFileNotExist added in v1.5.4

func (m *Meta) IsStatisticFileNotExist() bool

IsStatisticFileNotExist 统计文件不存在,用来判断是第一次创建

func (*Meta) IsValid

func (m *Meta) IsValid() bool

func (*Meta) LogPath

func (m *Meta) LogPath() string

func (*Meta) MetaFile

func (m *Meta) MetaFile() string

MetaFile 返回metaFileoffset 的meta文件地址

func (*Meta) ReadBuf

func (m *Meta) ReadBuf(buf []byte) (n int, err error)

func (*Meta) ReadBufMeta

func (m *Meta) ReadBufMeta() (r, w, bufsize int, err error)

func (*Meta) ReadCacheLine

func (m *Meta) ReadCacheLine() ([]byte, error)

func (*Meta) ReadDBDoneFile added in v1.4.8

func (m *Meta) ReadDBDoneFile(database string) (content []string, err error)

ReadDBDoneFile 读取当前Database已经读取的表

func (*Meta) ReadOffset

func (m *Meta) ReadOffset() (currFile string, offset int64, err error)

ReadOffset 读取当前读取的文件和offset

func (*Meta) ReadRecordsFile added in v1.5.0

func (m *Meta) ReadRecordsFile(recordsFile string) ([]string, error)

ReadRecordsFile 读取当前runner已经读取的表

func (*Meta) ReadStatistic added in v1.3.5

func (m *Meta) ReadStatistic() (stat Statistic, err error)

func (*Meta) RemoveSubMeta added in v1.5.0

func (m *Meta) RemoveSubMeta(key string)

func (*Meta) Reset added in v1.2.4

func (m *Meta) Reset() error

func (*Meta) SetEncodingWay

func (m *Meta) SetEncodingWay(e string)

SetEncodingWay 设置文件编码方式,默认为 utf-8

func (*Meta) StatisticFile added in v1.3.5

func (m *Meta) StatisticFile() string

StatisticFile 返回 Runner 统计信息的文件路径

func (*Meta) WriteBuf

func (m *Meta) WriteBuf(buf []byte, r, w, bufsize int) (err error)

func (*Meta) WriteCacheLine

func (m *Meta) WriteCacheLine(lines string) error

func (*Meta) WriteOffset

func (m *Meta) WriteOffset(currFile string, offset int64) (err error)

WriteOffset 将当前文件和offset写入meta中

func (*Meta) WriteStatistic added in v1.3.5

func (m *Meta) WriteStatistic(stat *Statistic) error

type NewSourceRecorder added in v1.5.4

// NewSourceRecorder is implemented by types that can report a list of SourceIndex values.
// NOTE(review): the source gives no description of when entries are produced — confirm with implementers.
type NewSourceRecorder interface {
	// NewSourceIndex returns a slice of SourceIndex.
	NewSourceIndex() []SourceIndex
}

type OnceReader added in v1.5.4

// OnceReader exposes a single boolean query, ReadDone.
type OnceReader interface {
	// ReadDone returns a bool.
	// NOTE(review): presumably true once a one-shot read has finished — TODO confirm.
	ReadDone() bool
}

type Reader

// Reader represents a generic line reader.
type Reader interface {
	// Name returns the concrete name of the reader.
	Name() string
	// SetMode sets the matching mode of the reader.
	SetMode(mode string, v interface{}) error
	// Source returns the data source currently being read.
	Source() string
	// ReadLine asks the reader to return one line of data.
	ReadLine() (string, error)
	// SyncMeta tells the reader to save its synchronization-related metadata.
	SyncMeta()
	// Close closes the reader.
	Close() error
}

Reader 代表了一个通用的行读取器

func NewFileBufReader deprecated

func NewFileBufReader(conf conf.MapConf, errDirectReturn bool) (reader Reader, err error)

Deprecated: NewFileBufReader 名字上有歧义,实际上就是NewReader,包括任何类型,保证兼容性,保留

func NewReader added in v1.5.1

func NewReader(conf conf.MapConf, errDirectReturn bool) (reader Reader, err error)

type Registry added in v1.5.0

// Registry is the factory for readers; custom readers can be registered with it.
type Registry struct {
	// contains filtered or unexported fields
}

Registry reader 的工厂类。可以注册自定义reader

func NewRegistry added in v1.5.0

func NewRegistry() *Registry

func (*Registry) NewReader added in v1.5.0

func (reg *Registry) NewReader(conf conf.MapConf, errDirectReturn bool) (reader Reader, err error)

func (*Registry) NewReaderWithMeta added in v1.5.0

func (reg *Registry) NewReaderWithMeta(conf conf.MapConf, meta *Meta, errDirectReturn bool) (Reader, error)

func (*Registry) RegisterReader added in v1.5.0

func (reg *Registry) RegisterReader(readerType string, constructor Constructor) error

type RunTime added in v1.5.4

// RunTime holds a start and an end point, each expressed as an hour and a minute.
// NOTE(review): presumably a time-of-day window checked by InRunTime — confirm.
type RunTime struct {
	StartHour, StartMin int
	EndHour, EndMin     int
}

func ParseRunTime added in v1.5.4

func ParseRunTime(runTimeStr string) (runTime RunTime, err error)

func ParseRunTimeWithMode added in v1.5.4

func ParseRunTimeWithMode(mode string, v interface{}) (runTime RunTime, err error)

func (RunTime) Equal added in v1.5.4

func (r RunTime) Equal() bool

func (RunTime) GreaterThanStart added in v1.5.4

func (r RunTime) GreaterThanStart(hour, minute int) bool

func (RunTime) LessThanEnd added in v1.5.4

func (r RunTime) LessThanEnd(hour, minute int) bool

type RunTimeReader added in v1.5.4

// RunTimeReader is implemented by readers that accept a run-time setting.
type RunTimeReader interface {
	// SetRunTime takes a mode string and an arbitrary value and returns an error.
	// NOTE(review): arguments look parallel to ParseRunTimeWithMode(mode, v) — confirm.
	SetRunTime(mode string, v interface{}) error
}

type SourceIndex added in v1.4.8

// SourceIndex pairs a source string with an integer index.
// NOTE(review): what Index indexes into is not shown here — confirm with callers.
type SourceIndex struct {
	Source string
	Index  int
}

type Statistic added in v1.3.5

// Statistic holds per-stage counters and error statistics.
// NOTE(review): the JSON tag on ParserCnt is spelled "parser_connt" (sic); it is part of
// the serialized format, so correcting it would change what is read and written.
type Statistic struct {
	ReaderCnt       int64                     `json:"reader_count"`    // total number of records read
	ParserCnt       [2]int64                  `json:"parser_connt"`    // [parse succeeded, parse failed]
	SenderCnt       map[string][2]int64       `json:"sender_count"`    // [send succeeded, send failed]
	TransCnt        map[string][2]int64       `json:"transform_count"` // [parse succeeded, parse failed] per the original comment; NOTE(review): presumably transform succeeded/failed — confirm
	ReadErrors      ErrorStatistic            `json:"read_errors"`
	ParseErrors     ErrorStatistic            `json:"parse_errors"`
	TransformErrors map[string]ErrorStatistic `json:"transform_errors"`
	SendErrors      map[string]ErrorStatistic `json:"send_errors"`
}

type StatsReader added in v1.3.1

// StatsReader is a generic reader that also exposes a statistics interface.
type StatsReader interface {
	// Name returns the reader's name.
	Name() string
	// Status returns a StatsInfo value.
	Status() StatsInfo
}

StatsReader 是一个通用的带有统计接口的reader

Directories

Path Synopsis
Package bufio implements buffered I/O. It wraps a FileReader or io.Writer object, creating another object (Reader or Writer) that also implements the interface but provides buffering and some help for textual I/O.
Package bufio implements buffered I/O. It wraps a FileReader or io.Writer object, creating another object (Reader or Writer) that also implements the interface but provides buffering and some help for textual I/O.
Package builtin does nothing but import all builtin readers to execute their init functions.
Package builtin does nothing but import all builtin readers to execute their init functions.
sql

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL