Documentation
¶
Index ¶
- Constants
- func GetSink() api.Sink
- func GetSource() api.Source
- type FileDirSourceRewindMeta
- type FileType
- type Source
- func (fs *Source) Close(ctx api.StreamContext) error
- func (fs *Source) Connect(_ api.StreamContext, sch api.StatusChangeHandler) error
- func (fs *Source) GetOffset() (any, error)
- func (fs *Source) Info() (i model.NodeInfo)
- func (fs *Source) Load(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest)
- func (fs *Source) Provision(ctx api.StreamContext, props map[string]any) error
- func (fs *Source) Pull(ctx api.StreamContext, _ time.Time, ingest api.TupleIngest, ...)
- func (fs *Source) ResetOffset(_ map[string]any) error
- func (fs *Source) Rewind(offset any) error
- func (fs *Source) SetEofIngest(eof api.EOFIngest)
- func (fs *Source) TransformType() api.Source
- type SourceConfig
- type WatchWrapper
- func (f *WatchWrapper) Close(ctx api.StreamContext) error
- func (f *WatchWrapper) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
- func (f *WatchWrapper) Provision(ctx api.StreamContext, configs map[string]any) error
- func (f *WatchWrapper) SetEofIngest(eof api.EOFIngest)
- func (f *WatchWrapper) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) error
- type WithTime
- type WithTimeSlice
Constants ¶
View Source
const ( GZIP = "gzip" ZSTD = "zstd" )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type FileDirSourceRewindMeta ¶ added in v2.1.0
type Source ¶
type Source struct {
// contains filtered or unexported fields
}
Source load data from file system. Depending on file types, it may read line by line like lines, csv. Otherwise, it reads the file as a whole and send to company reader node to read and split. The planner need to plan according to the file type.
func (*Source) Connect ¶
func (fs *Source) Connect(_ api.StreamContext, sch api.StatusChangeHandler) error
func (*Source) Load ¶
func (fs *Source) Load(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest)
func (*Source) Pull ¶
func (fs *Source) Pull(ctx api.StreamContext, _ time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest)
Pull file source may ingest bytes or tuple For stream source, it ingest one line For batch source, it ingest the whole file, thus it need a reader node to coordinate and read the content into lines/array
func (*Source) SetEofIngest ¶
func (*Source) TransformType ¶ added in v2.1.0
TransformType must call after provision
type SourceConfig ¶
type SourceConfig struct { FileName string `json:"datasource"` FileType string `json:"fileType"` Path string `json:"path"` Interval cast.DurationConf `json:"interval"` IsTable bool `json:"isTable"` SendInterval cast.DurationConf `json:"sendInterval"` ActionAfterRead int `json:"actionAfterRead"` MoveTo string `json:"moveTo"` IgnoreStartLines int `json:"ignoreStartLines"` IgnoreEndLines int `json:"ignoreEndLines"` // Only use for planning Decompression string `json:"decompression"` // contains filtered or unexported fields }
type WatchWrapper ¶ added in v2.1.0
type WatchWrapper struct {
// contains filtered or unexported fields
}
func (*WatchWrapper) Close ¶ added in v2.1.0
func (f *WatchWrapper) Close(ctx api.StreamContext) error
func (*WatchWrapper) Connect ¶ added in v2.1.0
func (f *WatchWrapper) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
func (*WatchWrapper) Provision ¶ added in v2.1.0
func (f *WatchWrapper) Provision(ctx api.StreamContext, configs map[string]any) error
func (*WatchWrapper) SetEofIngest ¶ added in v2.1.0
func (f *WatchWrapper) SetEofIngest(eof api.EOFIngest)
func (*WatchWrapper) Subscribe ¶ added in v2.1.0
func (f *WatchWrapper) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) error
type WithTimeSlice ¶ added in v2.1.0
type WithTimeSlice []WithTime
func (WithTimeSlice) Len ¶ added in v2.1.0
func (f WithTimeSlice) Len() int
func (WithTimeSlice) Less ¶ added in v2.1.0
func (f WithTimeSlice) Less(i, j int) bool
func (WithTimeSlice) Swap ¶ added in v2.1.0
func (f WithTimeSlice) Swap(i, j int)
Source Files
¶
Click to show internal directories.
Click to hide internal directories.