package helper

v0.10.1
Published: Apr 24, 2022 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddMissingSlashToURL

func AddMissingSlashToURL(baseUrl *string)

func GetRawMessageDirectFromResponse

func GetRawMessageDirectFromResponse(res *http.Response) ([]json.RawMessage, error)

func GetURIStringPointer

func GetURIStringPointer(baseUrl string, relativePath string, query url.Values) (*string, error)

func NewDefaultTaskContext

func NewDefaultTaskContext(
	cfg *viper.Viper,
	logger core.Logger,
	db *gorm.DB,
	ctx context.Context,
	name string,
	subtasks map[string]bool,
	progress chan core.RunningProgress,
) core.TaskContext

func NewDefaultTaskLogger

func NewDefaultTaskLogger(log *logrus.Logger, prefix string) core.Logger

func NewStandaloneSubTaskContext

func NewStandaloneSubTaskContext(
	cfg *viper.Viper,
	logger core.Logger,
	db *gorm.DB,
	ctx context.Context,
	name string,
	data interface{},
) core.SubTaskContext

This returns a stand-alone core.SubTaskContext, not attached to any core.TaskContext. Use this if you need to run/debug a subtask without going through the usual workflow.
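
For example, a minimal sketch of wiring a subtask entry point to a stand-alone context. The import paths (and the use of core.SubTaskEntryPoint) are assumptions about the surrounding module, not something this page documents:

package example

import (
	"context"

	"github.com/merico-dev/lake/plugins/core"   // assumed module path
	"github.com/merico-dev/lake/plugins/helper" // assumed module path
	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
	"gorm.io/gorm"
)

// debugSubtask runs a single subtask entry point against a stand-alone
// context, bypassing the usual task workflow. cfg, db, and taskData are
// whatever the subtask normally receives.
func debugSubtask(cfg *viper.Viper, db *gorm.DB, taskData interface{}, entryPoint core.SubTaskEntryPoint) error {
	logger := helper.NewDefaultTaskLogger(logrus.New(), "debug")
	subtaskCtx := helper.NewStandaloneSubTaskContext(
		cfg, logger, db, context.Background(), "my-subtask", taskData,
	)
	return entryPoint(subtaskCtx)
}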

func RemoveStartingSlashFromPath

func RemoveStartingSlashFromPath(relativePath string) string

func UnmarshalResponse

func UnmarshalResponse(res *http.Response, v interface{}) error

Types

type ApiAsyncCallback

type ApiAsyncCallback func(*http.Response, error) error

type ApiAsyncClient

type ApiAsyncClient struct {
	*ApiClient
	// contains filtered or unexported fields
}

ApiAsyncClient is built on top of ApiClient to provide asynchronous semantics. You may submit multiple requests at once by calling `GetAsync`; those requests will be performed in parallel, with rate-limit support.

func CreateAsyncApiClient

func CreateAsyncApiClient(
	taskCtx core.TaskContext,
	apiClient *ApiClient,
	rateLimiter *ApiRateLimitCalculator,
) (*ApiAsyncClient, error)

func (*ApiAsyncClient) DoAsync

func (apiClient *ApiAsyncClient) DoAsync(
	method string,
	path string,
	query url.Values,
	body interface{},
	header http.Header,
	handler ApiAsyncCallback,
	retry int,
) error

func (*ApiAsyncClient) GetAsync

func (apiClient *ApiAsyncClient) GetAsync(
	path string,
	query url.Values,
	header http.Header,
	handler ApiAsyncCallback,
) error

GetAsync enqueues an api GET request; the request may be sent at some time in the future, in parallel with other api requests.

func (*ApiAsyncClient) GetQps

func (apiClient *ApiAsyncClient) GetQps() float64

func (*ApiAsyncClient) WaitAsync

func (apiClient *ApiAsyncClient) WaitAsync() error

WaitAsync blocks until all queued async requests are done.
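
A hedged sketch of the enqueue-then-wait pattern, combining GetAsync and WaitAsync; the endpoint path, response handling, and import path are placeholders:

package example

import (
	"net/http"

	"github.com/merico-dev/lake/plugins/helper" // assumed module path
)

// fetchAll enqueues one GET per id, then blocks until every queued
// request (including retries) has been handled.
func fetchAll(client *helper.ApiAsyncClient, ids []string) error {
	for _, id := range ids {
		err := client.GetAsync("issues/"+id, nil, nil, func(res *http.Response, err error) error {
			if err != nil {
				return err
			}
			defer res.Body.Close()
			// parse and store res.Body here
			return nil
		})
		if err != nil {
			return err
		}
	}
	return client.WaitAsync()
}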

type ApiClient

type ApiClient struct {
	// contains filtered or unexported fields
}

ApiClient is designed for simple api requests

func NewApiClient

func NewApiClient(
	endpoint string,
	headers map[string]string,
	timeout time.Duration,
	proxy string,
	ctx context.Context,
) (*ApiClient, error)
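
A minimal usage sketch; the endpoint, header, and import path are placeholder assumptions:

package example

import (
	"context"
	"encoding/json"
	"net/url"
	"time"

	"github.com/merico-dev/lake/plugins/helper" // assumed module path
)

// listProjects performs a one-off synchronous GET and decodes the JSON
// array in the response body.
func listProjects(ctx context.Context) ([]json.RawMessage, error) {
	client, err := helper.NewApiClient(
		"https://example.com/api/v1/", // endpoint (hypothetical)
		map[string]string{"Authorization": "Bearer <TOKEN>"}, // headers
		30*time.Second, // timeout
		"",             // proxy; empty means none
		ctx,
	)
	if err != nil {
		return nil, err
	}
	res, err := client.Get("projects", url.Values{"page": {"1"}}, nil)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()
	var projects []json.RawMessage
	err = helper.UnmarshalResponse(res, &projects)
	return projects, err
}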

func (*ApiClient) Do

func (apiClient *ApiClient) Do(
	method string,
	path string,
	query url.Values,
	body interface{},
	headers http.Header,
) (*http.Response, error)

func (*ApiClient) Get

func (apiClient *ApiClient) Get(
	path string,
	query url.Values,
	headers http.Header,
) (*http.Response, error)

func (*ApiClient) GetEndpoint

func (apiClient *ApiClient) GetEndpoint() string

func (*ApiClient) GetHeaders

func (apiClient *ApiClient) GetHeaders() map[string]string

func (*ApiClient) Post

func (apiClient *ApiClient) Post(
	path string,
	query url.Values,
	body interface{},
	headers http.Header,
) (*http.Response, error)

func (*ApiClient) SetAfterFunction

func (apiClient *ApiClient) SetAfterFunction(callback ApiClientAfterResponse)

func (*ApiClient) SetBeforeFunction

func (apiClient *ApiClient) SetBeforeFunction(callback ApiClientBeforeRequest)

func (*ApiClient) SetContext

func (apiClient *ApiClient) SetContext(ctx context.Context)

func (*ApiClient) SetEndpoint

func (apiClient *ApiClient) SetEndpoint(endpoint string)

func (*ApiClient) SetHeaders

func (apiClient *ApiClient) SetHeaders(headers map[string]string)

func (*ApiClient) SetLogger

func (apiClient *ApiClient) SetLogger(logger core.Logger)

func (*ApiClient) SetProxy

func (apiClient *ApiClient) SetProxy(proxyUrl string) error

func (*ApiClient) SetTimeout

func (ApiClient *ApiClient) SetTimeout(timeout time.Duration)

func (*ApiClient) Setup

func (apiClient *ApiClient) Setup(
	endpoint string,
	headers map[string]string,
	timeout time.Duration,
)

type ApiClientAfterResponse

type ApiClientAfterResponse func(res *http.Response) error

type ApiClientBeforeRequest

type ApiClientBeforeRequest func(req *http.Request) error

type ApiCollector

type ApiCollector struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

func NewApiCollector

func NewApiCollector(args ApiCollectorArgs) (*ApiCollector, error)

NewApiCollector allocates a new ApiCollector with the given args. ApiCollector helps you collect data from an api with ease: pass in an ApiAsyncClient and tell it which part of the response you want to save, and ApiCollector will collect the data from the remote server and store it in the database.

func (*ApiCollector) Execute

func (collector *ApiCollector) Execute() error

Execute starts the collection.

type ApiCollectorArgs

type ApiCollectorArgs struct {
	RawDataSubTaskArgs
	/*
		url may use arbitrary variables from different sources in any order; we rely on GoTemplate to allow
		the flexibility to cover all kinds of possibilities.
		Pager contains the information for a particular page, calculated by ApiCollector, and will be passed
		into the GoTemplate to generate the url for that page.
		We want to do page-fetching in ApiCollector because the logic is highly similar across tasks; by doing
		so, we avoid duplicating it for every task, and when we have a better idea, like improving performance,
		we can do it in one place
	*/
	UrlTemplate string `comment:"GoTemplate for API url"`
	// (Optional) Returns the query string for a request; alternatively, you can plug the values into UrlTemplate directly
	Query func(reqData *RequestData) (url.Values, error) `comment:"Extra query string when requesting API, like 'Since' option for jira issues collection"`
	// Some apis might paginate via http headers
	Header      func(reqData *RequestData) (http.Header, error)
	PageSize    int
	Incremental bool `comment:"Indicates this is an incremental collection, so the existing data won't get flushed"`
	ApiClient   RateLimitedApiClient
	/*
		Sometimes we need to collect data based on previously collected data; a jira changelog, for example,
		requires an issue_id as part of the url.
		We mimic the `stdin` design by accepting an `Input` function which produces an `Iterator`; the collector
		iterates over all records and does the data-fetching for each one, in either parallel or sequential order
		UrlTemplate: "api/3/issue/{{ .Input.ID }}/changelog"
	*/
	Input          Iterator
	InputRateLimit int
	/*
		For api endpoints that return the total number of pages, ApiCollector can collect pages in parallel
		with ease; other techniques are required if this information is missing.
	*/
	GetTotalPages  func(res *http.Response, args *ApiCollectorArgs) (int, error)
	Concurrency    int
	ResponseParser func(res *http.Response) ([]json.RawMessage, error)
}
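
A hedged sketch of wiring up a collector; the table name, params, url, and import paths are hypothetical:

package example

import (
	"fmt"
	"net/url"

	"github.com/merico-dev/lake/plugins/core"   // assumed module path
	"github.com/merico-dev/lake/plugins/helper" // assumed module path
)

// collectIssues pulls a paginated issue list and stores each element of
// the top-level JSON array as one raw record.
func collectIssues(taskCtx core.SubTaskContext, apiClient helper.RateLimitedApiClient) error {
	collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
			Ctx:    taskCtx,
			Table:  "_raw_example_issues",              // hypothetical raw table
			Params: map[string]string{"boardId": "42"}, // identifies this record set
		},
		ApiClient:   apiClient,
		PageSize:    100,
		UrlTemplate: "boards/42/issues", // hypothetical endpoint
		Query: func(reqData *helper.RequestData) (url.Values, error) {
			query := url.Values{}
			query.Set("page", fmt.Sprintf("%d", reqData.Pager.Page))
			query.Set("per_page", fmt.Sprintf("%d", reqData.Pager.Size))
			return query, nil
		},
		ResponseParser: helper.GetRawMessageDirectFromResponse,
	})
	if err != nil {
		return err
	}
	return collector.Execute()
}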

type ApiExtractor

type ApiExtractor struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

ApiExtractor helps you extract raw data from api responses into Tool Layer Data. It reads rows from the specified raw data table and feeds them into the `Extract` handler; you can return arbitrary tool layer entities from this handler. ApiExtractor will first delete old data by its RawDataOrigin information, and then perform a batch insertion for you.

func NewApiExtractor

func NewApiExtractor(args ApiExtractorArgs) (*ApiExtractor, error)

func (*ApiExtractor) Execute

func (extractor *ApiExtractor) Execute() error

type ApiExtractorArgs

type ApiExtractorArgs struct {
	RawDataSubTaskArgs
	Params    interface{}
	Extract   RawDataExtractor
	BatchSize int
}
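
A hedged sketch of the matching extractor; ExampleIssue, the table name, and the import paths are hypothetical, and Table/Params must match whatever the collector used:

package example

import (
	"encoding/json"

	"github.com/merico-dev/lake/plugins/core"   // assumed module path
	"github.com/merico-dev/lake/plugins/helper" // assumed module path
)

// ExampleIssue is a hypothetical tool-layer gorm model.
type ExampleIssue struct {
	Id    string `gorm:"primaryKey"`
	Title string
}

// extractIssues re-reads the raw table written by the collector and turns
// each raw record into tool layer rows.
func extractIssues(taskCtx core.SubTaskContext) error {
	extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
			Ctx:    taskCtx,
			Table:  "_raw_example_issues",
			Params: map[string]string{"boardId": "42"},
		},
		Extract: func(row *helper.RawData) ([]interface{}, error) {
			issue := &ExampleIssue{}
			if err := json.Unmarshal(row.Data, issue); err != nil {
				return nil, err
			}
			return []interface{}{issue}, nil
		},
	})
	if err != nil {
		return err
	}
	return extractor.Execute()
}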

type ApiRateLimitCalculator

type ApiRateLimitCalculator struct {
	UserRateLimitPerHour   int
	GlobalRateLimitPerHour int
	MaxRetry               int
	Method                 string
	ApiPath                string
	DynamicRateLimit       func(res *http.Response) (int, time.Duration, error)
}

A helper to calculate the api rate limit dynamically, assuming the api returns remaining/reset-time information.
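
For example, a sketch assuming the target api exposes the common X-RateLimit-Remaining header (the header name is an assumption about the remote api); the result can be passed as the rateLimiter argument of CreateAsyncApiClient:

package example

import (
	"net/http"
	"strconv"
	"time"

	"github.com/merico-dev/lake/plugins/helper" // assumed module path
)

// newRateLimiter derives the rate limit from response headers, falling
// back to a fixed hourly budget when they are missing.
func newRateLimiter() *helper.ApiRateLimitCalculator {
	return &helper.ApiRateLimitCalculator{
		UserRateLimitPerHour: 3600, // fallback budget
		DynamicRateLimit: func(res *http.Response) (int, time.Duration, error) {
			remaining, err := strconv.Atoi(res.Header.Get("X-RateLimit-Remaining"))
			if err != nil {
				return 0, 0, err
			}
			// spread the remaining quota over the next hour
			return remaining, time.Hour, nil
		},
	}
}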

func (*ApiRateLimitCalculator) Calculate

func (c *ApiRateLimitCalculator) Calculate(apiClient *ApiClient) (int, time.Duration, error)

type AsyncResponseHandler

type AsyncResponseHandler func(res *http.Response) error

type BatchSave

type BatchSave struct {
	// contains filtered or unexported fields
}

Inserting data in batches can increase database performance drastically; this class aims to make batch-saving easier. It takes care of the database operations for the specified `slotType`; records are saved into the database whenever the cache hits the `size` limit. Remember to call the `Close` method to save the last batch.
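
A minimal sketch, reusing the hypothetical ExampleIssue model from the extractor sketch above and assuming the slot type should be the model's pointer type:

package example

import (
	"reflect"

	"github.com/merico-dev/lake/plugins/helper" // assumed module path
	"gorm.io/gorm"
)

// saveIssues buffers rows and writes them to the database 500 at a time.
func saveIssues(db *gorm.DB, issues []*ExampleIssue) error {
	batch, err := helper.NewBatchSave(db, reflect.TypeOf(&ExampleIssue{}), 500)
	if err != nil {
		return err
	}
	for _, issue := range issues {
		if err := batch.Add(issue); err != nil {
			return err
		}
	}
	return batch.Close() // saves the last, partially filled batch
}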

func NewBatchSave

func NewBatchSave(db *gorm.DB, slotType reflect.Type, size int) (*BatchSave, error)

func (*BatchSave) Add

func (c *BatchSave) Add(slot interface{}) error

func (*BatchSave) Close

func (c *BatchSave) Close() error

func (*BatchSave) Flush

func (c *BatchSave) Flush() error

type BatchSaveDivider

type BatchSaveDivider struct {
	// contains filtered or unexported fields
}

Holds a map of BatchSave instances and returns the `*BatchSave` for a specific record type, so the caller can perform batch operations on it.

func NewBatchSaveDivider

func NewBatchSaveDivider(db *gorm.DB, batchSize int) *BatchSaveDivider

Returns a new BatchSaveDivider instance.

func (*BatchSaveDivider) Close

func (d *BatchSaveDivider) Close() error

Close closes all batches, so the remaining records get saved into the db as well.

func (*BatchSaveDivider) ForType

func (d *BatchSaveDivider) ForType(rowType reflect.Type) (*BatchSave, error)

ForType returns the *BatchSave for the specified type.
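
A minimal sketch of batching heterogeneous rows through the divider (import paths assumed as before):

package example

import (
	"reflect"

	"github.com/merico-dev/lake/plugins/helper" // assumed module path
	"gorm.io/gorm"
)

// saveMixed batches rows of any model type; the divider lazily creates
// one BatchSave per concrete row type.
func saveMixed(db *gorm.DB, rows []interface{}) error {
	divider := helper.NewBatchSaveDivider(db, 500)
	for _, row := range rows {
		batch, err := divider.ForType(reflect.TypeOf(row))
		if err != nil {
			return err
		}
		if err := batch.Add(row); err != nil {
			return err
		}
	}
	return divider.Close() // flush the remaining records of every batch
}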

func (*BatchSaveDivider) OnNewBatchSave

func (d *BatchSaveDivider) OnNewBatchSave(cb OnNewBatchSave)

type CursorIterator

type CursorIterator struct {
	// contains filtered or unexported fields
}

func NewCursorIterator

func NewCursorIterator(db *gorm.DB, cursor *sql.Rows, elemType reflect.Type) (*CursorIterator, error)

func (*CursorIterator) Close

func (c *CursorIterator) Close() error

func (*CursorIterator) Fetch

func (c *CursorIterator) Fetch() (interface{}, error)

func (*CursorIterator) HasNext

func (c *CursorIterator) HasNext() bool
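
A hedged sketch of draining a gorm cursor through the iterator, reusing the hypothetical ExampleIssue model; it assumes Fetch returns a pointer to a freshly scanned element:

package example

import (
	"reflect"

	"github.com/merico-dev/lake/plugins/helper" // assumed module path
	"gorm.io/gorm"
)

// iterateIssues walks a result set one typed row at a time.
func iterateIssues(db *gorm.DB) error {
	cursor, err := db.Model(&ExampleIssue{}).Rows()
	if err != nil {
		return err
	}
	iter, err := helper.NewCursorIterator(db, cursor, reflect.TypeOf(ExampleIssue{}))
	if err != nil {
		return err
	}
	defer iter.Close()
	for iter.HasNext() {
		row, err := iter.Fetch()
		if err != nil {
			return err
		}
		issue := row.(*ExampleIssue) // assumed: Fetch yields *ExampleIssue
		_ = issue                    // process the row here
	}
	return nil
}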

type DataConvertHandler

type DataConvertHandler func(row interface{}) ([]interface{}, error)

DataConvertHandler accepts a row from the source cursor and returns a list of entities to be stored.

type DataConverter

type DataConverter struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

DataConverter helps you convert data from Tool Layer Tables to Domain Layer Tables. It reads rows from the specified Iterator and feeds them into the `Convert` handler; you can return arbitrary domain layer entities from this handler. DataConverter will first delete old data by its RawDataOrigin information, and then perform a batch save operation for you.

func NewDataConverter

func NewDataConverter(args DataConverterArgs) (*DataConverter, error)

func (*DataConverter) Execute

func (converter *DataConverter) Execute() error

type DataConverterArgs

type DataConverterArgs struct {
	RawDataSubTaskArgs
	// Domain layer entity Id prefix, e.g. `jira:JiraIssue:1`, `github:GithubIssue`
	InputRowType reflect.Type
	// Cursor to a set of Tool Layer Records
	Input     *sql.Rows
	Convert   DataConvertHandler
	BatchSize int
}
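
A hedged sketch of a converter; DomainIssue, the table name, and the id prefix scheme are hypothetical, and ExampleIssue is the model from the extractor sketch:

package example

import (
	"reflect"

	"github.com/merico-dev/lake/plugins/core"   // assumed module path
	"github.com/merico-dev/lake/plugins/helper" // assumed module path
	"gorm.io/gorm"
)

// DomainIssue is a hypothetical domain-layer gorm model.
type DomainIssue struct {
	Id    string `gorm:"primaryKey"`
	Title string
}

// convertIssues reads tool layer rows from a cursor and emits domain
// layer rows in batches.
func convertIssues(taskCtx core.SubTaskContext, db *gorm.DB) error {
	cursor, err := db.Model(&ExampleIssue{}).Rows()
	if err != nil {
		return err
	}
	defer cursor.Close()
	converter, err := helper.NewDataConverter(helper.DataConverterArgs{
		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
			Ctx:    taskCtx,
			Table:  "_raw_example_issues",
			Params: map[string]string{"boardId": "42"},
		},
		InputRowType: reflect.TypeOf(ExampleIssue{}),
		Input:        cursor,
		Convert: func(row interface{}) ([]interface{}, error) {
			issue := row.(*ExampleIssue)
			return []interface{}{
				&DomainIssue{Id: "example:" + issue.Id, Title: issue.Title},
			}, nil
		},
	})
	if err != nil {
		return err
	}
	return converter.Execute()
}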

type DateInterator

type DateInterator struct {
	Days    int
	Current int
	// contains filtered or unexported fields
}

func NewDateInterator

func NewDateInterator(days int) (*DateInterator, error)

func (*DateInterator) Close

func (c *DateInterator) Close() error

func (*DateInterator) Fetch

func (c *DateInterator) Fetch() (interface{}, error)

func (*DateInterator) HasNext

func (c *DateInterator) HasNext() bool

type DatePair

type DatePair struct {
	PairStartTime time.Time
	PairEndTime   time.Time
}

type DefaultLogger

type DefaultLogger struct {
	// contains filtered or unexported fields
}

func NewDefaultLogger

func NewDefaultLogger(log *logrus.Logger, prefix string) *DefaultLogger

func (*DefaultLogger) Debug

func (l *DefaultLogger) Debug(format string, a ...interface{})

func (*DefaultLogger) Error

func (l *DefaultLogger) Error(format string, a ...interface{})

func (*DefaultLogger) Info

func (l *DefaultLogger) Info(format string, a ...interface{})

func (*DefaultLogger) IsLevelEnabled

func (l *DefaultLogger) IsLevelEnabled(level core.LogLevel) bool

func (*DefaultLogger) Log

func (l *DefaultLogger) Log(level core.LogLevel, format string, a ...interface{})

func (*DefaultLogger) Nested

func (l *DefaultLogger) Nested(name string) core.Logger

func (*DefaultLogger) Printf

func (l *DefaultLogger) Printf(format string, a ...interface{})

func (*DefaultLogger) Warn

func (l *DefaultLogger) Warn(format string, a ...interface{})

type DefaultSubTaskContext

type DefaultSubTaskContext struct {
	// contains filtered or unexported fields
}

SubTaskContext default implementation

func (DefaultSubTaskContext) GetConfig

func (c DefaultSubTaskContext) GetConfig(name string) string

func (DefaultSubTaskContext) GetContext

func (c DefaultSubTaskContext) GetContext() context.Context

func (DefaultSubTaskContext) GetData

func (c DefaultSubTaskContext) GetData() interface{}

func (DefaultSubTaskContext) GetDb

func (c DefaultSubTaskContext) GetDb() *gorm.DB

func (DefaultSubTaskContext) GetLogger

func (c DefaultSubTaskContext) GetLogger() core.Logger

func (DefaultSubTaskContext) GetName

func (c DefaultSubTaskContext) GetName() string

func (*DefaultSubTaskContext) IncProgress

func (c *DefaultSubTaskContext) IncProgress(quantity int)

func (*DefaultSubTaskContext) SetProgress

func (c *DefaultSubTaskContext) SetProgress(current int, total int)

func (*DefaultSubTaskContext) TaskContext

func (c *DefaultSubTaskContext) TaskContext() core.TaskContext

type DefaultTaskContext

type DefaultTaskContext struct {
	// contains filtered or unexported fields
}

TaskContext default implementation

func (DefaultTaskContext) GetConfig

func (c DefaultTaskContext) GetConfig(name string) string

func (DefaultTaskContext) GetContext

func (c DefaultTaskContext) GetContext() context.Context

func (DefaultTaskContext) GetData

func (c DefaultTaskContext) GetData() interface{}

func (DefaultTaskContext) GetDb

func (c DefaultTaskContext) GetDb() *gorm.DB

func (DefaultTaskContext) GetLogger

func (c DefaultTaskContext) GetLogger() core.Logger

func (DefaultTaskContext) GetName

func (c DefaultTaskContext) GetName() string

func (*DefaultTaskContext) IncProgress

func (c *DefaultTaskContext) IncProgress(quantity int)

func (*DefaultTaskContext) SetData

func (c *DefaultTaskContext) SetData(data interface{})

func (*DefaultTaskContext) SetProgress

func (c *DefaultTaskContext) SetProgress(current int, total int)

func (*DefaultTaskContext) SubTaskContext

func (c *DefaultTaskContext) SubTaskContext(subtask string) (core.SubTaskContext, error)

type Iterator

type Iterator interface {
	HasNext() bool
	Fetch() (interface{}, error)
	Close() error
}
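
Any type with these three methods satisfies the interface; for instance, a minimal in-memory implementation (hypothetical, for illustration) that could feed a fixed list of inputs into an ApiCollector:

package example

// sliceIterator iterates over an in-memory slice.
type sliceIterator struct {
	items []interface{}
	index int
}

func (it *sliceIterator) HasNext() bool {
	return it.index < len(it.items)
}

func (it *sliceIterator) Fetch() (interface{}, error) {
	item := it.items[it.index]
	it.index++
	return item, nil
}

func (it *sliceIterator) Close() error {
	it.items = nil
	return nil
}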

type OnNewBatchSave

type OnNewBatchSave func(rowType reflect.Type) error

type Pager

type Pager struct {
	Page int
	Skip int
	Size int
}

type RateLimitedApiClient

type RateLimitedApiClient interface {
	GetAsync(path string, query url.Values, header http.Header, handler ApiAsyncCallback) error
	WaitAsync() error
	GetQps() float64
}

type RawData

type RawData struct {
	ID        uint64 `gorm:"primaryKey"`
	Params    string `gorm:"type:varchar(255);index"`
	Data      datatypes.JSON
	Url       string
	Input     datatypes.JSON
	CreatedAt time.Time
}

Table structure for raw data storage

type RawDataExtractor

type RawDataExtractor func(row *RawData) ([]interface{}, error)

RawDataExtractor accepts a raw json body and params, and returns a list of entities to be stored.

type RawDataSubTask

type RawDataSubTask struct {
	// contains filtered or unexported fields
}

Common features for raw data sub tasks

type RawDataSubTaskArgs

type RawDataSubTaskArgs struct {
	Ctx core.SubTaskContext

	//	Table stores the raw data
	Table string `comment:"Raw data table name"`

	//	This struct will be JSON-encoded and stored into the database along with the raw data itself, to identify
	//	the minimal set of data to be processed; for example, we process JiraIssues by Board
	Params interface{} `comment:"To identify a set of records with same UrlTemplate, i.e. {SourceId, BoardId} for jira entities"`
}

type RequestData

type RequestData struct {
	Pager  *Pager
	Params interface{}
	Input  interface{}
}

type WorkerScheduler

type WorkerScheduler struct {
	// contains filtered or unexported fields
}

func NewWorkerScheduler

func NewWorkerScheduler(workerNum int, maxWork int, maxWorkDuration time.Duration, ctx context.Context, maxRetry int) (*WorkerScheduler, error)

NewWorkerScheduler creates a parallel scheduler to control the maximum number of concurrent workers and the maximum number of runs per second. Warning: task execution is out of order.
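
A minimal sketch, with parameter meanings inferred from the signature above (up to 10 workers, up to 50 submissions per second, 3 retries); this reading is an assumption:

package example

import (
	"context"
	"time"

	"github.com/merico-dev/lake/plugins/helper" // assumed module path
)

// runJobs executes jobs in parallel and propagates the first error.
func runJobs(ctx context.Context, jobs []func() error) error {
	scheduler, err := helper.NewWorkerScheduler(10, 50, time.Second, ctx, 3)
	if err != nil {
		return err
	}
	defer scheduler.Release()
	for _, job := range jobs {
		if err := scheduler.Submit(job); err != nil {
			return err
		}
	}
	return scheduler.WaitUntilFinish()
}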

func (*WorkerScheduler) Release

func (s *WorkerScheduler) Release()

func (*WorkerScheduler) Submit

func (s *WorkerScheduler) Submit(task func() error, pool ...*ants.Pool) error

func (*WorkerScheduler) WaitUntilFinish

func (s *WorkerScheduler) WaitUntilFinish() error
