reader

package
v1.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 18, 2020 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DoneFileName = "file.done"

	FtSaveLogPath = "ft_log" // ft log 在 meta 中的文件夹名字
)
View Source
const (
	DefautFileRetention = 7

	ModeMetrics = "metrics"
)

Variables

View Source
var ErrFileNotDir = errors.New("file is not directory")
View Source
var ErrFileNotRegular = errors.New("file is not regular")
View Source
var ErrMetaFileRead = errors.New("cannot read meta file")
View Source
var ErrNoFileChosen = errors.New("no files found")
View Source
var ErrStopped = errors.New("runner stopped")
View Source
var WaitNoSuchFile = 100 * time.Millisecond

Functions

func AndCondition

func AndCondition(f1, f2 func(os.FileInfo) bool) func(os.FileInfo) bool

func CompressedFile

func CompressedFile(path string) bool

func GetLogPathAbs

func GetLogPathAbs(conf conf.MapConf) (logpath, oldLogPath string, err error)

func GetMaxFile

func GetMaxFile(logdir string, condition func(os.FileInfo) bool, gte func(f1, f2 os.FileInfo) bool) (chosen os.FileInfo, err error)

GetMaxFile 在指定的限制条件 condition 下,根据比较函数 gte 选择最大的 os.FileInfo。参数 condition 为文件必须满足的条件;gte 在 f1 >= f2 时返回 true。

func GetMetaOption

func GetMetaOption(conf conf.MapConf) (string, string, string, error)

func GetMinFile

func GetMinFile(logdir string, condition func(os.FileInfo) bool, gte func(f1, f2 os.FileInfo) bool) (os.FileInfo, error)

GetMinFile 与 GetMaxFile 相反,返回最小的文件

func HeadPatternMode

func HeadPatternMode(mode string, v interface{}) (reg *regexp.Regexp, err error)

func IgnoreFileSuffixes

func IgnoreFileSuffixes(file string, suffixes []string) bool

IgnoreFileSuffixes returns true if file ends with any of the given suffixes.

func IgnoreHidden

func IgnoreHidden(file string, ignoreHidden bool) bool

IgnoreHidden returns true if file is a hidden file, i.e. its name starts with a dot (.), which marks hidden files on *nix systems.

func InRunTime

func InRunTime(hour, minute int, runTime RunTime) bool

func JoinFileInode

func JoinFileInode(filename, inode string) string

func NoCondition

func NoCondition(f os.FileInfo) bool

NoCondition 无限制条件

func NotCondition

func NotCondition(f1 func(os.FileInfo) bool) func(os.FileInfo) bool

func OrCondition

func OrCondition(f1, f2 func(os.FileInfo) bool) func(os.FileInfo) bool

func ParseLoopDuration

func ParseLoopDuration(cronSched string) (dur time.Duration, err error)

func ParseNumber

func ParseNumber(str string) (number int, err error)

func ParseTime

func ParseTime(timeStr string) (hour, minute int, err error)

func RegisterConstructor

func RegisterConstructor(typ string, c Constructor)

RegisterConstructor adds a new constructor for a given type of reader.

func ValidFileRegex

func ValidFileRegex(file, validFilePattern string) bool

ValidFileRegex returns true if file matches validFilePattern.

Types

type Constructor

type Constructor func(*Meta, conf.MapConf) (Reader, error)

type DaemonReader

type DaemonReader interface {
	// Start 用于非阻塞的启动读取器对应的守护线程,需要读取器自行负责其生命周期
	Start() error
}

DaemonReader 代表了一个需要守护线程的读取器

type DataReader

type DataReader interface {
	// ReadData 用于读取一条数据以及数据的实际读取字节
	ReadData() (Data, int64, error)
}

DataReader 代表了一个可直接读取内存数据结构的读取器

type FileReader

type FileReader interface {
	Name() string
	Source() string
	Read(p []byte) (n int, err error)
	Close() error
	SyncMeta() error
}

FileReader reader 接口方法

type LagReader

type LagReader interface {
	Lag() (*LagInfo, error)
}

LagReader 获取数据 lag 的接口

type Meta

type Meta struct {
	Dir string // 记录文件处理进度的路径

	DoneFilePath string // 记录扫描过文件记录的文件

	TagFile string //记录tag文件路径的标签名称

	Readlimit int //读取磁盘限速单位 MB/s

	RunnerName string

	LastKey string // 记录从s3 最近一次拉取的文件
	// contains filtered or unexported fields
}

func NewMeta

func NewMeta(metadir, filedonedir, logpath, mode, tagfile string, donefileRetention int) (m *Meta, err error)

func NewMetaWithConf

func NewMetaWithConf(conf conf.MapConf) (meta *Meta, err error)

func NewMetaWithRunnerName

func NewMetaWithRunnerName(runnerName, metadir, filedonedir, logpath, mode, tagfile string, donefileRetention int) (m *Meta, err error)

func (*Meta) AddSubMeta

func (m *Meta) AddSubMeta(key string, meta *Meta) error

func (*Meta) AppendDeleteFile

func (m *Meta) AppendDeleteFile(path string) (err error)

func (*Meta) AppendDoneFile

func (m *Meta) AppendDoneFile(path string) (err error)

AppendDoneFile 将处理完的文件写入doneFile中

func (*Meta) AppendDoneFileInode

func (m *Meta) AppendDoneFileInode(path string, inode uint64) (err error)

AppendDoneFileInode 将处理完的文件路径、inode以及完成时间写入doneFile中

func (*Meta) BufFile

func (m *Meta) BufFile() string

BufFile 返回buf的文件路径

func (*Meta) BufMetaFile

func (m *Meta) BufMetaFile() string

BufMetaFile 返回buf的meta文件路径

func (*Meta) CacheLineFile

func (m *Meta) CacheLineFile() string

func (*Meta) CheckExpiredSubMetas

func (m *Meta) CheckExpiredSubMetas(expire time.Duration)

CheckExpiredSubMetas 仅用于轮询收集所有过期的 submeta,清理操作应通过调用 CleanExpiredSubMetas 方法完成。 一般情况下,应由 reader 实现启动 goroutine 单独调用以避免 submeta 数量过多导致进程被长时间阻塞。 另外,如果 submeta 没有存放在该 meta 的子目录则调用此方法无效

func (*Meta) CleanExpiredSubMetas

func (m *Meta) CleanExpiredSubMetas(expire time.Duration)

CleanExpiredSubMetas 清除超过指定过期时长的 submeta 目录,清理数目单次调用存在上限以减少阻塞时间

func (*Meta) Clear

func (m *Meta) Clear() error

Clear 删除所有meta信息

func (*Meta) Delete

func (m *Meta) Delete() error

func (*Meta) DeleteDoneFile

func (m *Meta) DeleteDoneFile(path string) error

func (*Meta) DeleteFile

func (m *Meta) DeleteFile() string

DeleteFile 处理完成文件地址,按日进行rotate

func (*Meta) DoneFile

func (m *Meta) DoneFile() string

DoneFile 处理完成文件地址,按日进行rotate

func (*Meta) ExtraInfo

func (m *Meta) ExtraInfo() map[string]string

func (*Meta) FtSaveLogPath

func (m *Meta) FtSaveLogPath() string

FtSaveLogPath 返回 ft_sender 日志信息记录文件夹路径

func (*Meta) GetDataSourceTag

func (m *Meta) GetDataSourceTag() string

func (*Meta) GetDoneFileContent

func (m *Meta) GetDoneFileContent() ([]string, error)

func (*Meta) GetDoneFileInode

func (m *Meta) GetDoneFileInode(inodeSensitive bool) map[string]bool

func (*Meta) GetDoneFiles

func (m *Meta) GetDoneFiles() ([]File, error)

func (*Meta) GetEncodeTag

func (m *Meta) GetEncodeTag() string

func (*Meta) GetEncodingWay

func (m *Meta) GetEncodingWay() (e string)

GetEncodingWay 获取文件编码方式

func (*Meta) GetMode

func (m *Meta) GetMode() string

func (*Meta) GetTagFile

func (m *Meta) GetTagFile() string

func (*Meta) GetTags

func (m *Meta) GetTags() map[string]interface{}

func (*Meta) IsDoneFile

func (m *Meta) IsDoneFile(file string) bool

IsDoneFile 返回是否是Donefile格式的文件

func (*Meta) IsExist

func (m *Meta) IsExist() bool

func (*Meta) IsFileMode

func (m *Meta) IsFileMode() bool

func (*Meta) IsNotExist

func (m *Meta) IsNotExist() bool

IsNotExist meta 不存在,用来判断是第一次创建

func (*Meta) IsNotValid

func (m *Meta) IsNotValid() bool

IsNotValid meta 数据已经过时,用来判断offset文件是否已经不存在,或者meta文件是否损坏

func (*Meta) IsStatisticFileExist

func (m *Meta) IsStatisticFileExist() bool

func (*Meta) IsStatisticFileNotExist

func (m *Meta) IsStatisticFileNotExist() bool

IsStatisticFileNotExist 统计信息文件不存在,用来判断是第一次创建

func (*Meta) IsValid

func (m *Meta) IsValid() bool

func (*Meta) LogPath

func (m *Meta) LogPath() string

func (*Meta) MetaFile

func (m *Meta) MetaFile() string

MetaFile 返回记录 offset 的 meta 文件地址

func (*Meta) ReadBuf

func (m *Meta) ReadBuf(buf []byte) (n int, err error)

func (*Meta) ReadBufMeta

func (m *Meta) ReadBufMeta() (r, w, bufsize int, err error)

func (*Meta) ReadCacheLine

func (m *Meta) ReadCacheLine() ([]byte, error)

func (*Meta) ReadDBDoneFile

func (m *Meta) ReadDBDoneFile(database string) (content []string, err error)

ReadDBDoneFile 读取当前Database已经读取的表

func (*Meta) ReadOffset

func (m *Meta) ReadOffset() (currFile string, offset int64, err error)

ReadOffset 读取当前读取的文件和offset

func (*Meta) ReadRecordsFile

func (m *Meta) ReadRecordsFile(recordsFile string) ([]string, error)

ReadRecordsFile 读取当前runner已经读取的表

func (*Meta) ReadStatistic

func (m *Meta) ReadStatistic() (stat Statistic, err error)

func (*Meta) RemoveSubMeta

func (m *Meta) RemoveSubMeta(key string)

func (*Meta) Reset

func (m *Meta) Reset() error

func (*Meta) SetEncodingWay

func (m *Meta) SetEncodingWay(e string)

SetEncodingWay 设置文件编码方式,默认为 utf-8

func (*Meta) StatisticFile

func (m *Meta) StatisticFile() string

StatisticFile 返回 Runner 统计信息的文件路径

func (*Meta) WriteBuf

func (m *Meta) WriteBuf(buf []byte, r, w, bufsize int) (err error)

func (*Meta) WriteCacheLine

func (m *Meta) WriteCacheLine(lines string) error

func (*Meta) WriteOffset

func (m *Meta) WriteOffset(currFile string, offset int64) (err error)

WriteOffset 将当前文件和offset写入meta中

func (*Meta) WriteStatistic

func (m *Meta) WriteStatistic(stat *Statistic) error

type NewSourceRecorder

type NewSourceRecorder interface {
	NewSourceIndex() []SourceIndex
}

type OnceReader

type OnceReader interface {
	ReadDone() bool
}

type Reader

type Reader interface {
	// Name 用于返回读取器的具体名称
	Name() string
	// SetMode 用于设置读取器的匹配模式
	SetMode(mode string, v interface{}) error
	// Source 用于返回当前读取的数据源
	Source() string
	// ReadLine 用于向读取器请求返回一行数据
	ReadLine() (string, error)
	// SyncMeta 用于通知读取器保存同步相关元数据
	SyncMeta()
	// Close 用于关闭读取器
	Close() error
}

Reader 代表了一个通用的行读取器

func NewFileBufReader deprecated

func NewFileBufReader(conf conf.MapConf, errDirectReturn bool) (reader Reader, err error)

Deprecated: NewFileBufReader 名字上有歧义,实际上就是NewReader,包括任何类型,保证兼容性,保留

func NewReader

func NewReader(conf conf.MapConf, errDirectReturn bool) (reader Reader, err error)

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry reader 的工厂类。可以注册自定义reader

func NewRegistry

func NewRegistry() *Registry

func (*Registry) NewReader

func (reg *Registry) NewReader(conf conf.MapConf, errDirectReturn bool) (reader Reader, err error)

func (*Registry) NewReaderWithMeta

func (reg *Registry) NewReaderWithMeta(conf conf.MapConf, meta *Meta, errDirectReturn bool) (Reader, error)

func (*Registry) RegisterReader

func (reg *Registry) RegisterReader(readerType string, constructor Constructor) error

type RunTime

type RunTime struct {
	StartHour, StartMin int
	EndHour, EndMin     int
}

func ParseRunTime

func ParseRunTime(runTimeStr string) (runTime RunTime, err error)

func ParseRunTimeWithMode

func ParseRunTimeWithMode(mode string, v interface{}) (runTime RunTime, err error)

func (RunTime) Equal

func (r RunTime) Equal() bool

func (RunTime) GreaterThanStart

func (r RunTime) GreaterThanStart(hour, minute int) bool

func (RunTime) LessThanEnd

func (r RunTime) LessThanEnd(hour, minute int) bool

type RunTimeReader

type RunTimeReader interface {
	SetRunTime(mode string, v interface{}) error
}

type SourceIndex

type SourceIndex struct {
	Source string
	Index  int
}

type Statistic

type Statistic struct {
	ReaderCnt       int64                     `json:"reader_count"`    // 读取总条数
	ParserCnt       [2]int64                  `json:"parser_connt"`    // [解析成功, 解析失败]
	SenderCnt       map[string][2]int64       `json:"sender_count"`    // [发送成功, 发送失败]
	TransCnt        map[string][2]int64       `json:"transform_count"` // [解析成功, 解析失败]
	ReadErrors      ErrorStatistic            `json:"read_errors"`
	ParseErrors     ErrorStatistic            `json:"parse_errors"`
	TransformErrors map[string]ErrorStatistic `json:"transform_errors"`
	SendErrors      map[string]ErrorStatistic `json:"send_errors"`
}

type StatsReader

type StatsReader interface {
	//Name reader名称
	Name() string
	Status() StatsInfo
}

StatsReader 是一个通用的带有统计接口的reader

Directories

Path Synopsis
Package bufio implements buffered I/O. It wraps a FileReader or io.Writer object, creating another object (Reader or Writer) that also implements the interface but provides buffering and some help for textual I/O.
Package bufio implements buffered I/O. It wraps a FileReader or io.Writer object, creating another object (Reader or Writer) that also implements the interface but provides buffering and some help for textual I/O.
Package builtin does nothing but import all builtin readers to execute their init functions.
Package builtin does nothing but import all builtin readers to execute their init functions.
sql

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL