streamer

package
v0.2.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2021 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
// Untyped integer constants numbered by iota in declaration order:
// Static = 0, Dynamic = 1, Increment = 2, DynInc = 3.
// NOTE(review): these look like the values intended for UpdatMode, but they
// are declared without a type here — confirm how callers use them.
const (
	Static = iota
	Dynamic
	Increment
	DynInc
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DataParser

// DataParser is the parser abstraction the streamer configurations accept
// (LocalFileStreamerCfg.DataParser, MongoStreamerCfg.BaseParser and
// MongoStreamerCfg.IncParser are all of this type).
type DataParser interface {
	// Parse receives a byte slice and one opaque extra argument and returns a
	// slice of ParserResult values.
	// NOTE(review): the second argument is presumably the UserData value from
	// the streamer config — confirm against the callers.
	Parse([]byte, interface{}) []ParserResult
}

type DefaultTextParser

// DefaultTextParser is a field-less parser type. Its pointer-receiver Parse
// method has the same signature as DataParser.Parse, so *DefaultTextParser
// can be used wherever a DataParser is expected.
// NOTE(review): the text format it understands is not visible here — see the
// Parse implementation.
type DefaultTextParser struct {
}

func (*DefaultTextParser) Parse

func (*DefaultTextParser) Parse(data []byte, userData interface{}) []ParserResult

type Info

// Info is the status record a Streamer returns from GetInfo. Every field
// carries a snake_case JSON tag, so the struct can be marshalled directly.
// NOTE(review): the per-field descriptions below are inferred from the field
// names only — confirm against the code that fills them in.
type Info struct {
	// Name: presumably the name of the streamer this record describes.
	Name         string        `json:"name"`
	// TotalNum: presumably the total number of records handled.
	TotalNum     int           `json:"total_num"`
	// AddNum: presumably the number of records added.
	AddNum       int           `json:"add_num"`
	// ErrorNum: presumably the number of records that failed.
	ErrorNum     int           `json:"error_num"`
	// LastBaseTime: presumably when the last base (full) load happened.
	LastBaseTime time.Time     `json:"last_base_time"`
	// LastIncTime: presumably when the last incremental load happened.
	LastIncTime  time.Time     `json:"last_inc_time"`
	// BaseTimeUsed: presumably how long the last base load took.
	// time.Duration is an int64 nanosecond count and is emitted as such in JSON.
	BaseTimeUsed time.Duration `json:"base_time_used"`
	// IncTimeUsed: presumably how long the last incremental load took.
	IncTimeUsed  time.Duration `json:"inc_time_used"`
}

type LocalFileStreamer

// LocalFileStreamer is the streamer type built by NewFileStreamer from a
// LocalFileStreamerCfg. Its pointer-receiver methods include SetContainer,
// GetContainer, GetSchedInfo, UpdateData and GetInfo, which is the full
// method set of the Streamer interface; it additionally exposes HasNext,
// Next, InfoStatus and WarnStatus.
// NOTE(review): presumably it reads its data from the file at
// LocalFileStreamerCfg.Path — confirm in the implementation.
type LocalFileStreamer struct {
	// contains filtered or unexported fields
}

func NewFileStreamer

func NewFileStreamer(cfg *LocalFileStreamerCfg) *LocalFileStreamer

func (*LocalFileStreamer) GetContainer

func (fs *LocalFileStreamer) GetContainer() container.Container

func (*LocalFileStreamer) GetInfo

func (fs *LocalFileStreamer) GetInfo() *Info

func (*LocalFileStreamer) GetSchedInfo

func (fs *LocalFileStreamer) GetSchedInfo() *SchedInfo

func (*LocalFileStreamer) HasNext

func (fs *LocalFileStreamer) HasNext() (bool, error)

func (*LocalFileStreamer) InfoStatus

func (fs *LocalFileStreamer) InfoStatus(s string)

func (*LocalFileStreamer) Next

func (fs *LocalFileStreamer) Next() (container.DataMode, container.MapKey, interface{}, error)

func (*LocalFileStreamer) SetContainer

func (fs *LocalFileStreamer) SetContainer(container container.Container)

func (*LocalFileStreamer) UpdateData

func (fs *LocalFileStreamer) UpdateData(ctx context.Context) error

func (*LocalFileStreamer) WarnStatus

func (fs *LocalFileStreamer) WarnStatus(s string)

type LocalFileStreamerCfg

// LocalFileStreamerCfg is the configuration passed to NewFileStreamer.
// NOTE(review): field semantics below are inferred from names and types —
// confirm against NewFileStreamer and LocalFileStreamer.UpdateData.
type LocalFileStreamerCfg struct {
	// Name: presumably the streamer's name.
	Name         string
	// Path: presumably the location of the local file to read.
	Path         string
	// UpdatMode selects the update mode. Note the spelling: this field is
	// "UpdatMode", whereas MongoStreamerCfg names the equivalent field
	// "UpdateMode". Renaming either would break existing callers.
	UpdatMode    UpdatMode
	// Interval: presumably the refresh interval; the unit is not visible
	// here — TODO confirm.
	Interval     int
	// IsSync: presumably toggles synchronous loading — TODO confirm.
	IsSync       bool
	// DataParser is the parser implementation supplied by the caller.
	DataParser   DataParser
	// UserData is an opaque caller-supplied value; presumably forwarded to
	// DataParser.Parse as its second argument — TODO confirm.
	UserData     interface{}
	// Logger is the log.BiLogger supplied by the caller.
	Logger       log.BiLogger
	// OnBeforeBase is a callback taking the Streamer and returning an error;
	// presumably invoked before a base load — TODO confirm what a non-nil
	// error does.
	OnBeforeBase func(streamer Streamer) error
	// OnFinishBase is a callback taking the Streamer; presumably invoked
	// after a base load completes.
	OnFinishBase func(streamer Streamer)
}

type MongoStreamer

// MongoStreamer is the streamer type built by NewMongoStreamer from a
// MongoStreamerCfg (the constructor can also return an error). Its
// pointer-receiver methods include SetContainer, GetContainer, GetSchedInfo,
// UpdateData and GetInfo, which is the full method set of the Streamer
// interface; it additionally exposes HasNext, Next, InfoStatus and
// WarnStatus.
// NOTE(review): presumably backed by a MongoDB collection, judging by the
// URI/DB/Collection fields of MongoStreamerCfg — confirm in the implementation.
type MongoStreamer struct {
	// contains filtered or unexported fields
}

func NewMongoStreamer

func NewMongoStreamer(mongoConfig *MongoStreamerCfg) (*MongoStreamer, error)

func (*MongoStreamer) GetContainer

func (ms *MongoStreamer) GetContainer() container.Container

func (*MongoStreamer) GetInfo

func (ms *MongoStreamer) GetInfo() *Info

func (*MongoStreamer) GetSchedInfo

func (ms *MongoStreamer) GetSchedInfo() *SchedInfo

func (*MongoStreamer) HasNext

func (ms *MongoStreamer) HasNext() (bool, error)

func (*MongoStreamer) InfoStatus

func (ms *MongoStreamer) InfoStatus(s string)

func (*MongoStreamer) Next

func (ms *MongoStreamer) Next() (container.DataMode, container.MapKey, interface{}, error)

func (*MongoStreamer) SetContainer

func (ms *MongoStreamer) SetContainer(container container.Container)

func (*MongoStreamer) UpdateData

func (ms *MongoStreamer) UpdateData(ctx context.Context) error

func (*MongoStreamer) WarnStatus

func (ms *MongoStreamer) WarnStatus(s string)

type MongoStreamerCfg

// MongoStreamerCfg is the configuration passed to NewMongoStreamer.
// NOTE(review): field semantics below are inferred from names and types —
// confirm against NewMongoStreamer and MongoStreamer.UpdateData.
type MongoStreamerCfg struct {
	// Name: presumably the streamer's name.
	Name           string
	// UpdateMode selects the update mode. LocalFileStreamerCfg spells the
	// equivalent field "UpdatMode".
	UpdateMode     UpdatMode
	// IncInterval / BaseInterval: presumably the intervals between
	// incremental and base loads; units are not visible here — TODO confirm.
	IncInterval    int
	BaseInterval   int
	// IsSync: presumably toggles synchronous loading — TODO confirm.
	IsSync         bool
	// TryTimes: presumably a retry count — TODO confirm.
	TryTimes       int
	// URI, DB, Collection: presumably the connection string, database name
	// and collection name of the data source.
	URI            string
	DB             string
	Collection     string
	// ConnectTimeout / ReadTimeout: plain ints; units are not visible here —
	// TODO confirm.
	ConnectTimeout int
	ReadTimeout    int
	// BaseParser / IncParser: DataParser implementations, presumably applied
	// to base and incremental results respectively.
	BaseParser     DataParser
	IncParser      DataParser
	// BaseQuery / IncQuery: opaque query values, presumably the filters for
	// base and incremental loads.
	BaseQuery      interface{}
	IncQuery       interface{}
	// UserData is an opaque caller-supplied value; presumably forwarded to
	// the parsers — TODO confirm.
	UserData       interface{}
	// FindOpt: find options from the imported options package (presumably
	// the MongoDB driver's) — TODO confirm whether nil is accepted.
	FindOpt        *options.FindOptions
	// OnBeforeBase / OnBeforeInc map one opaque value to another. Note that
	// this differs from LocalFileStreamerCfg.OnBeforeBase, which takes a
	// Streamer and returns an error.
	// NOTE(review): presumably they rewrite the query before each load — confirm.
	OnBeforeBase   func(interface{}) interface{}
	OnBeforeInc    func(interface{}) interface{}
	// OnFinishBase / OnFinishInc are callbacks taking the Streamer;
	// presumably invoked after a base / incremental load completes.
	OnFinishBase   func(streamer Streamer)
	OnFinishInc    func(streamer Streamer)
	// Logger is the log.BiLogger supplied by the caller.
	Logger         log.BiLogger
}

type ParserResult

// ParserResult is the element type of the slice returned by DataParser.Parse.
// Its DataMode, Key and Value fields have the same types as the first three
// results of LocalFileStreamer.Next / MongoStreamer.Next, and Err matches the
// fourth.
type ParserResult struct {
	// DataMode is a container.DataMode value.
	// NOTE(review): presumably tells the container how to apply this entry — confirm.
	DataMode container.DataMode
	// Key is the container.MapKey for this entry.
	Key      container.MapKey
	// Value is the opaque parsed payload.
	Value    interface{}
	// Err: presumably non-nil when this particular entry failed to parse —
	// TODO confirm whether the other fields are meaningful in that case.
	Err      error
}

type Sched

// Sched is a slice of *SchedUnit. Together its Len, Less, Swap (value
// receivers) and Push, Pop (pointer receivers) methods match the method set
// of container/heap.Interface, so *Sched can be handed to the heap package.
// Streamers are registered with AddStreamer and Schedule takes a context.
// NOTE(review): presumably used as a priority queue ordered by next run
// time — confirm against Less and Schedule.
type Sched []*SchedUnit

func (*Sched) AddStreamer

func (s *Sched) AddStreamer(name string, dataStreamer Streamer)

func (Sched) Len

func (s Sched) Len() int

func (Sched) Less

func (s Sched) Less(i, j int) bool

func (*Sched) Pop

func (s *Sched) Pop() interface{}

func (*Sched) Push

func (s *Sched) Push(x interface{})

func (*Sched) Schedule

func (s *Sched) Schedule(ctx context.Context)

func (Sched) Swap

func (s Sched) Swap(i, j int)

func (*Sched) Top

func (s *Sched) Top() *SchedUnit

type SchedInfo

// SchedInfo is the scheduling descriptor a Streamer returns from GetSchedInfo.
type SchedInfo struct {
	// TimeInterval: presumably the interval between scheduled updates; the
	// unit is not visible here — TODO confirm.
	TimeInterval int
}

type SchedUnit

// SchedUnit is the element type stored (by pointer) in Sched and returned by
// Sched.Top. All of its fields are unexported.
type SchedUnit struct {
	// contains filtered or unexported fields
}

type Streamer

// Streamer is the interface accepted by Sched.AddStreamer and passed to the
// OnBefore*/OnFinish* callbacks of the streamer configs. Both
// *LocalFileStreamer and *MongoStreamer declare every method listed here.
type Streamer interface {
	// SetContainer stores the container.Container associated with the streamer.
	SetContainer(container.Container)
	// GetContainer returns the associated container.Container.
	GetContainer() container.Container
	// GetSchedInfo returns the streamer's scheduling descriptor.
	GetSchedInfo() *SchedInfo
	// UpdateData performs an update under the given context and reports any
	// failure as an error.
	// NOTE(review): presumably this is what the scheduler invokes on each
	// tick — confirm against Sched.Schedule.
	UpdateData(ctx context.Context) error

	// GetInfo returns the streamer's status record.
	GetInfo() *Info
}

type UpdatMode

// UpdatMode is the int64-based type of the update-mode fields in
// LocalFileStreamerCfg and MongoStreamerCfg.
// NOTE(review): the name looks like a misspelling of "UpdateMode"; it is part
// of the exported API, so renaming it would break callers. The Static,
// Dynamic, Increment and DynInc constants are presumably its intended
// values — confirm.
type UpdatMode int64

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL