helper

package
v0.15.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2023 License: Apache-2.0, MIT Imports: 33 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DateTimeFormats []DateTimeFormatItem

DateTimeFormats FIXME ...

View Source
var ErrFinishCollect = errors.Default.New("finish collect")

ErrFinishCollect is a error which will finish this collector

View Source
var ErrIgnoreAndContinue = errors.Default.New("ignore and continue")

ErrIgnoreAndContinue is a error which should be ignored

View Source
var HttpMinStatusRetryCode = http.StatusBadRequest

HttpMinStatusRetryCode is which status will retry

Functions

func AddMissingSlashToURL

func AddMissingSlashToURL(baseUrl *string)

AddMissingSlashToURL FIXME ...

func ConvertStringToTime

func ConvertStringToTime(timeString string) (t time.Time, err error)

ConvertStringToTime FIXME ...

func Decode added in v0.14.0

func Decode(source interface{}, target interface{}, vld *validator.Validate) errors.Error

Decode decodes `source` into `target`. Pass an optional validator to validate the target.

func DecodeMapStruct

func DecodeMapStruct(input map[string]interface{}, result interface{}) errors.Error

DecodeMapStruct with time.Time and Iso8601Time support

func DecodeStruct

func DecodeStruct(output *viper.Viper, input interface{}, data map[string]interface{}, tag string) errors.Error

DecodeStruct validates `input` struct with `validator` and set it into viper `tag` represent the fields when setting config, and the fields with `tag` shall prevail. `input` must be a pointer

func EncodeStruct

func EncodeStruct(input *viper.Viper, output interface{}, tag string) errors.Error

EncodeStruct encodes struct from viper `tag` represent the fields when setting config, and the fields with `tag` shall prevail. `object` must be a pointer

func GetLimitOffset added in v0.15.0

func GetLimitOffset(q url.Values, pageSizeKey, pageKey string) (limit int, offset int)

GetLimitOffset extract page and page size, then calculus the limit and offset from them

func GetRawMessageArrayFromResponse

func GetRawMessageArrayFromResponse(res *http.Response) ([]json.RawMessage, errors.Error)

GetRawMessageArrayFromResponse FIXME ...

func GetRawMessageDirectFromResponse

func GetRawMessageDirectFromResponse(res *http.Response) ([]json.RawMessage, errors.Error)

GetRawMessageDirectFromResponse FIXME ...

func GetURIStringPointer

func GetURIStringPointer(baseUrl string, relativePath string, query url.Values) (*string, errors.Error)

GetURIStringPointer FIXME ...

func Iso8601TimeToTime

func Iso8601TimeToTime(iso8601Time *Iso8601Time) *time.Time

Iso8601TimeToTime FIXME ...

func MakePipelinePlanSubtasks added in v0.12.0

func MakePipelinePlanSubtasks(subtaskMetas []core.SubTaskMeta, entities []string) ([]string, errors.Error)

MakePipelinePlanSubtasks generates subtasks list based on sub-task meta information and entities wanted by user

func NewDefaultTaskContext

func NewDefaultTaskContext(
	ctx context.Context,
	basicRes core.BasicRes,
	name string,
	subtasks map[string]bool,
	progress chan core.RunningProgress,
) core.TaskContext

NewDefaultTaskContext holds everything needed by the task execution.

func NewStandaloneSubTaskContext

func NewStandaloneSubTaskContext(
	ctx context.Context,
	basicRes core.BasicRes,
	name string,
	data interface{},
) core.SubTaskContext

NewStandaloneSubTaskContext returns a stand-alone core.SubTaskContext, not attached to any core.TaskContext. Use this if you need to run/debug a subtask without going through the usual workflow.

func RemoveStartingSlashFromPath

func RemoveStartingSlashFromPath(relativePath string) string

RemoveStartingSlashFromPath FIXME ...

func UnmarshalResponse

func UnmarshalResponse(res *http.Response, v interface{}) errors.Error

UnmarshalResponse FIXME ...

func UpdateEncryptFields added in v0.12.0

func UpdateEncryptFields(val interface{}, update func(in string) (string, errors.Error)) errors.Error

UpdateEncryptFields update fields of val with tag `encrypt:"yes|true"`

Types

type AccessToken added in v0.12.0

type AccessToken struct {
	Token string `mapstructure:"token" validate:"required" json:"token" encrypt:"yes"`
}

AccessToken FIXME ...

type ApiAsyncClient

type ApiAsyncClient struct {
	*ApiClient
	// contains filtered or unexported fields
}

ApiAsyncClient is built on top of ApiClient, to provide a asynchronous semantic You may submit multiple requests at once by calling `DoGetAsync`, and those requests will be performed in parallel with rate-limit support

func CreateAsyncApiClient

func CreateAsyncApiClient(
	taskCtx core.TaskContext,
	apiClient *ApiClient,
	rateLimiter *ApiRateLimitCalculator,
) (*ApiAsyncClient, errors.Error)

CreateAsyncApiClient creates a new ApiAsyncClient

func (*ApiAsyncClient) DoAsync

func (apiClient *ApiAsyncClient) DoAsync(
	method string,
	path string,
	query url.Values,
	body interface{},
	header http.Header,
	handler common.ApiAsyncCallback,
	retry int,
)

DoAsync would carry out an asynchronous request

func (*ApiAsyncClient) DoGetAsync added in v0.12.0

func (apiClient *ApiAsyncClient) DoGetAsync(
	path string,
	query url.Values,
	header http.Header,
	handler common.ApiAsyncCallback,
)

DoGetAsync Enqueue an api get request, the request may be sent sometime in future in parallel with other api requests

func (*ApiAsyncClient) DoPostAsync added in v0.14.0

func (apiClient *ApiAsyncClient) DoPostAsync(
	path string,
	query url.Values,
	body interface{},
	header http.Header,
	handler common.ApiAsyncCallback,
)

DoPostAsync Enqueue an api post request, the request may be sent sometime in future in parallel with other api requests

func (*ApiAsyncClient) GetMaxRetry

func (apiClient *ApiAsyncClient) GetMaxRetry() int

GetMaxRetry returns the maximum retry attempts for a request

func (*ApiAsyncClient) GetNumOfWorkers added in v0.12.0

func (apiClient *ApiAsyncClient) GetNumOfWorkers() int

GetNumOfWorkers to return the Workers count if scheduler.

func (*ApiAsyncClient) HasError added in v0.12.0

func (apiClient *ApiAsyncClient) HasError() bool

HasError to return if the scheduler has Error

func (*ApiAsyncClient) NextTick added in v0.12.0

func (apiClient *ApiAsyncClient) NextTick(task func() errors.Error)

NextTick to return the NextTick of scheduler

func (*ApiAsyncClient) Release added in v0.12.0

func (apiClient *ApiAsyncClient) Release()

Release will release the ApiAsyncClient with scheduler

func (*ApiAsyncClient) SetMaxRetry

func (apiClient *ApiAsyncClient) SetMaxRetry(
	maxRetry int,
)

SetMaxRetry sets the maximum retry attempts for a request

func (*ApiAsyncClient) WaitAsync

func (apiClient *ApiAsyncClient) WaitAsync() errors.Error

WaitAsync blocks until all async requests were done

type ApiClient

type ApiClient struct {
	// contains filtered or unexported fields
}

ApiClient is designed for simple api requests

func NewApiClient

func NewApiClient(
	ctx context.Context,
	endpoint string,
	headers map[string]string,
	timeout time.Duration,
	proxy string,
	br core.BasicRes,
) (*ApiClient, errors.Error)

NewApiClient FIXME ...

func (*ApiClient) Do

func (apiClient *ApiClient) Do(
	method string,
	path string,
	query url.Values,
	body interface{},
	headers http.Header,
) (*http.Response, errors.Error)

Do FIXME ...

func (*ApiClient) Get

func (apiClient *ApiClient) Get(
	path string,
	query url.Values,
	headers http.Header,
) (*http.Response, errors.Error)

Get FIXME ...

func (*ApiClient) GetAfterFunction added in v0.13.0

func (apiClient *ApiClient) GetAfterFunction() common.ApiClientAfterResponse

GetAfterFunction return afterResponseFunction

func (*ApiClient) GetBeforeFunction added in v0.13.0

func (apiClient *ApiClient) GetBeforeFunction() common.ApiClientBeforeRequest

GetBeforeFunction return beforeResponseFunction

func (*ApiClient) GetEndpoint

func (apiClient *ApiClient) GetEndpoint() string

GetEndpoint FIXME ...

func (*ApiClient) GetHeaders

func (apiClient *ApiClient) GetHeaders() map[string]string

GetHeaders FIXME ...

func (*ApiClient) GetTimeout added in v0.14.0

func (apiClient *ApiClient) GetTimeout() time.Duration

GetTimeout FIXME ...

func (*ApiClient) Post

func (apiClient *ApiClient) Post(
	path string,
	query url.Values,
	body interface{},
	headers http.Header,
) (*http.Response, errors.Error)

Post FIXME ...

func (*ApiClient) SetAfterFunction

func (apiClient *ApiClient) SetAfterFunction(callback common.ApiClientAfterResponse)

SetAfterFunction will set afterResponseFunction don't call this function directly in collector, use Collector.AfterResponse instead.

func (*ApiClient) SetBeforeFunction

func (apiClient *ApiClient) SetBeforeFunction(callback common.ApiClientBeforeRequest)

SetBeforeFunction will set beforeResponseFunction

func (*ApiClient) SetContext

func (apiClient *ApiClient) SetContext(ctx context.Context)

SetContext FIXME ...

func (*ApiClient) SetEndpoint

func (apiClient *ApiClient) SetEndpoint(endpoint string)

SetEndpoint FIXME ...

func (*ApiClient) SetHeaders

func (apiClient *ApiClient) SetHeaders(headers map[string]string)

SetHeaders FIXME ...

func (*ApiClient) SetLogger

func (apiClient *ApiClient) SetLogger(logger core.Logger)

SetLogger FIXME ...

func (*ApiClient) SetProxy

func (apiClient *ApiClient) SetProxy(proxyUrl string) errors.Error

SetProxy FIXME ...

func (*ApiClient) SetTimeout

func (apiClient *ApiClient) SetTimeout(timeout time.Duration)

SetTimeout FIXME ...

func (*ApiClient) Setup

func (apiClient *ApiClient) Setup(
	endpoint string,
	headers map[string]string,
	timeout time.Duration,

)

Setup FIXME ...

type ApiClientGetter added in v0.15.0

type ApiClientGetter interface {
	Get(
		path string,
		query url.Values,
		headers http.Header,
	) (*http.Response, errors.Error)
}

ApiClientGetter will be used for uint test

type ApiCollector

type ApiCollector struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

ApiCollector FIXME ...

func NewApiCollector

func NewApiCollector(args ApiCollectorArgs) (*ApiCollector, errors.Error)

NewApiCollector allocates a new ApiCollector with the given args. ApiCollector can help us collecting data from api with ease, pass in a AsyncApiClient and tell it which part of response we want to save, ApiCollector will collect them from remote server and store them into database.

func (*ApiCollector) Execute

func (collector *ApiCollector) Execute() errors.Error

Execute will start collection

func (*ApiCollector) GetAfterResponse added in v0.13.0

func (collector *ApiCollector) GetAfterResponse() common.ApiClientAfterResponse

GetAfterResponse return apiClient's afterResponseFunction

func (*ApiCollector) SetAfterResponse

func (collector *ApiCollector) SetAfterResponse(f common.ApiClientAfterResponse)

SetAfterResponse set apiClient's afterResponseFunction

type ApiCollectorArgs

type ApiCollectorArgs struct {
	RawDataSubTaskArgs
	// UrlTemplate is used to generate the final URL for Api Collector to request
	// i.e. `api/3/issue/{{ .Input.IssueId }}/changelog`
	// For detail of what variables can be used, please check `RequestData`
	UrlTemplate string `comment:"GoTemplate for API url"`
	// Query would be sent out as part of the request URL
	Query func(reqData *RequestData) (url.Values, errors.Error) ``
	// Header would be sent out along with request
	Header func(reqData *RequestData) (http.Header, errors.Error)
	// PageSize tells ApiCollector the page size
	PageSize int
	// Incremental indicate if this is a incremental collection, the existing data won't get deleted if it was true
	Incremental bool `comment:"indicate if this collection is incremental update"`
	// ApiClient is a asynchronize api request client with qps
	ApiClient RateLimitedApiClient
	// Input helps us collect data based on previous collected data, like collecting changelogs based on jira
	// issue ids
	Input Iterator
	// GetTotalPages is to tell `ApiCollector` total number of pages based on response of the first page.
	// so `ApiCollector` could collect those pages in parallel for us
	GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, errors.Error)
	// Concurrency specify qps for api that doesn't return total number of pages/records
	// NORMALLY, DO NOT SPECIFY THIS PARAMETER, unless you know what it means
	Concurrency    int
	ResponseParser func(res *http.Response) ([]json.RawMessage, errors.Error)
	AfterResponse  common.ApiClientAfterResponse
	RequestBody    func(reqData *RequestData) map[string]interface{}
	Method         string
}

ApiCollectorArgs FIXME ...

type ApiCollectorStateManager added in v0.15.0

type ApiCollectorStateManager struct {
	RawDataSubTaskArgs
	*ApiCollector
	*GraphqlCollector
	LatestState      models.CollectorLatestState
	CreatedDateAfter *time.Time
	ExecuteStart     time.Time
}

ApiCollectorStateManager save collector state in framework table

func NewApiCollectorWithState added in v0.15.0

func NewApiCollectorWithState(args RawDataSubTaskArgs, createdDateAfter *time.Time) (*ApiCollectorStateManager, errors.Error)

NewApiCollectorWithState create a new ApiCollectorStateManager

func (ApiCollectorStateManager) Execute added in v0.15.0

Execute the embedded collector and record execute state

func (ApiCollectorStateManager) ExecuteGraphQL added in v0.15.0

func (m ApiCollectorStateManager) ExecuteGraphQL() errors.Error

ExecuteGraphQL the embedded collector and record execute state

func (*ApiCollectorStateManager) InitCollector added in v0.15.0

func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs) (err errors.Error)

InitCollector init the embedded collector

func (*ApiCollectorStateManager) InitGraphQLCollector added in v0.15.0

func (m *ApiCollectorStateManager) InitGraphQLCollector(args GraphqlCollectorArgs) (err errors.Error)

InitGraphQLCollector init the embedded collector

func (ApiCollectorStateManager) IsIncremental added in v0.15.0

func (m ApiCollectorStateManager) IsIncremental() bool

IsIncremental return if the old data can support collect incrementally. only when latest collection is success && (m.LatestState.CreatedDateAfter == nil means all data have been collected || CreatedDateAfter at this time exists and no before than in the LatestState) if CreatedDateAfter at this time not exists, collect incrementally only when "m.LatestState.CreatedDateAfter == nil"

type ApiExtractor

type ApiExtractor struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

ApiExtractor helps you extract Raw Data from api responses to Tool Layer Data It reads rows from specified raw data table, and feed it into `Extract` handler you can return arbitrary tool layer entities in this handler, ApiExtractor would first delete old data by their RawDataOrigin information, and then perform a batch save for you.

func NewApiExtractor

func NewApiExtractor(args ApiExtractorArgs) (*ApiExtractor, errors.Error)

NewApiExtractor creates a new ApiExtractor

func (*ApiExtractor) Execute

func (extractor *ApiExtractor) Execute() errors.Error

Execute sub-task

type ApiExtractorArgs

type ApiExtractorArgs struct {
	RawDataSubTaskArgs
	Params    interface{}
	Extract   func(row *RawData) ([]interface{}, errors.Error)
	BatchSize int
}

ApiExtractorArgs FIXME ...

type ApiRateLimitCalculator

type ApiRateLimitCalculator struct {
	UserRateLimitPerHour   int
	GlobalRateLimitPerHour int
	MaxRetry               int
	Method                 string
	ApiPath                string
	DynamicRateLimit       func(res *http.Response) (int, time.Duration, errors.Error)
}

ApiRateLimitCalculator is A helper to calculate api rate limit dynamically, assuming api returning remaining/resettime information

func (*ApiRateLimitCalculator) Calculate

func (c *ApiRateLimitCalculator) Calculate(apiClient *ApiClient) (int, time.Duration, errors.Error)

Calculate FIXME ...

type AppKey added in v0.12.0

type AppKey struct {
	AppId     string `mapstructure:"app_id" validate:"required" json:"appId"`
	SecretKey string `mapstructure:"secret_key" validate:"required" json:"secretKey" encrypt:"yes"`
}

AppKey FIXME ...

type AsyncResponseHandler

type AsyncResponseHandler func(res *http.Response) error

AsyncResponseHandler FIXME ...

type BaseConnection added in v0.12.0

type BaseConnection struct {
	Name string `gorm:"type:varchar(100);uniqueIndex" json:"name" validate:"required"`
	common.Model
}

BaseConnection FIXME ...

type BasicAuth added in v0.12.0

type BasicAuth struct {
	Username string `mapstructure:"username" validate:"required" json:"username"`
	Password string `mapstructure:"password" validate:"required" json:"password" encrypt:"yes"`
}

BasicAuth FIXME ...

func (BasicAuth) GetEncodedToken added in v0.12.0

func (ba BasicAuth) GetEncodedToken() string

GetEncodedToken FIXME ...

type BatchSave

type BatchSave struct {
	// contains filtered or unexported fields
}

BatchSave performs mulitple records persistence of a specific type in one sql query to improve the performance

func NewBatchSave

func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int, tableName ...string) (*BatchSave, errors.Error)

NewBatchSave creates a new BatchSave instance

func (*BatchSave) Add

func (c *BatchSave) Add(slot interface{}) errors.Error

Add record to cache. BatchSave would flush them into Database when cache is max out

func (*BatchSave) Close

func (c *BatchSave) Close() errors.Error

Close would flash the cache and release resources

func (*BatchSave) Flush

func (c *BatchSave) Flush() errors.Error

Flush save cached records into database

type BatchSaveDivider

type BatchSaveDivider struct {
	// contains filtered or unexported fields
}

BatchSaveDivider creates and caches BatchSave, this is helpful when dealing with massive amount of data records with arbitrary types.

func NewBatchSaveDivider

func NewBatchSaveDivider(basicRes core.BasicRes, batchSize int, table string, params string) *BatchSaveDivider

NewBatchSaveDivider create a new BatchInsertDivider instance

func (*BatchSaveDivider) Close

func (d *BatchSaveDivider) Close() errors.Error

Close all batches so the rest records get saved into db

func (*BatchSaveDivider) ForType

func (d *BatchSaveDivider) ForType(rowType reflect.Type) (*BatchSave, errors.Error)

ForType returns a `BatchSave` instance for specific type

type CSTTime

type CSTTime time.Time

CSTTime FIXME ...

func (*CSTTime) Scan

func (jt *CSTTime) Scan(v interface{}) error

Scan FIXME ...

func (*CSTTime) UnmarshalJSON

func (jt *CSTTime) UnmarshalJSON(b []byte) error

UnmarshalJSON FIXME ...

func (CSTTime) Value

func (jt CSTTime) Value() (driver.Value, error)

Value FIXME ...

type ConnectionApiHelper added in v0.12.0

type ConnectionApiHelper struct {
	// contains filtered or unexported fields
}

ConnectionApiHelper is used to write the CURD of connection

func NewConnectionHelper added in v0.12.0

func NewConnectionHelper(
	basicRes core.BasicRes,
	vld *validator.Validate,
) *ConnectionApiHelper

NewConnectionHelper FIXME ...

func (*ConnectionApiHelper) Create added in v0.12.0

func (c *ConnectionApiHelper) Create(connection interface{}, input *core.ApiResourceInput) errors.Error

Create a connection record based on request body

func (*ConnectionApiHelper) Delete added in v0.12.0

func (c *ConnectionApiHelper) Delete(connection interface{}) errors.Error

Delete connection

func (*ConnectionApiHelper) First added in v0.12.0

func (c *ConnectionApiHelper) First(connection interface{}, params map[string]string) errors.Error

First finds connection from db by parsing request input and decrypt it

func (*ConnectionApiHelper) FirstById added in v0.12.0

func (c *ConnectionApiHelper) FirstById(connection interface{}, id uint64) errors.Error

FirstById finds connection from db by id and decrypt it

func (*ConnectionApiHelper) List added in v0.12.0

func (c *ConnectionApiHelper) List(connections interface{}) errors.Error

List returns all connections with password/token decrypted

func (*ConnectionApiHelper) Patch added in v0.12.0

func (c *ConnectionApiHelper) Patch(connection interface{}, input *core.ApiResourceInput) errors.Error

Patch (Modify) a connection record based on request body

type CursorPager added in v0.13.0

type CursorPager struct {
	SkipCursor *string
	Size       int
}

CursorPager contains pagination information for a graphql request

type DalCursorIterator added in v0.12.0

type DalCursorIterator struct {
	// contains filtered or unexported fields
}

DalCursorIterator FIXME ...

func NewBatchedDalCursorIterator added in v0.13.0

func NewBatchedDalCursorIterator(db dal.Dal, cursor dal.Rows, elemType reflect.Type, batchSize int) (*DalCursorIterator, errors.Error)

NewBatchedDalCursorIterator FIXME ...

func NewDalCursorIterator added in v0.12.0

func NewDalCursorIterator(db dal.Dal, cursor dal.Rows, elemType reflect.Type) (*DalCursorIterator, errors.Error)

NewDalCursorIterator FIXME ...

func (*DalCursorIterator) Close added in v0.12.0

func (c *DalCursorIterator) Close() errors.Error

Close iterator

func (*DalCursorIterator) Fetch added in v0.12.0

func (c *DalCursorIterator) Fetch() (interface{}, errors.Error)

Fetch if batching is disabled, it'll read a single row, otherwise it'll read as many rows up to the batch size, and the runtime return type will be []interface{}. Note, HasNext needs to have been called before invoking this.

func (*DalCursorIterator) HasNext added in v0.12.0

func (c *DalCursorIterator) HasNext() bool

HasNext increments the row curser. If we're at the end, it'll return false.

type DataConvertHandler

type DataConvertHandler func(row interface{}) ([]interface{}, errors.Error)

DataConvertHandler Accept row from source cursor, return list of entities that need to be stored

type DataConverter

type DataConverter struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

DataConverter helps you convert Data from Tool Layer Tables to Domain Layer Tables It reads rows from specified Iterator, and feed it into `Converter` handler you can return arbitrary domain layer entities from this handler, ApiConverter would first delete old data by their RawDataOrigin information, and then perform a batch save operation for you.

func NewDataConverter

func NewDataConverter(args DataConverterArgs) (*DataConverter, errors.Error)

NewDataConverter function helps you create a DataConverter using DataConverterArgs. You can see the usage in plugins/github/tasks/pr_issue_convertor.go or other convertor file.

func (*DataConverter) Execute

func (converter *DataConverter) Execute() errors.Error

Execute function implements Subtask interface. It loads data from Tool Layer Tables using `Ctx.GetDal()`, convert Data using `converter.args.Convert` handler Then save data to Domain Layer Tables using BatchSaveDivider

type DataConverterArgs

type DataConverterArgs struct {
	RawDataSubTaskArgs
	// Domain layer entity Id prefix, i.e. `jira:JiraIssue:1`, `github:GithubIssue`
	InputRowType reflect.Type
	Input        dal.Rows
	Convert      DataConvertHandler
	BatchSize    int
}

DataConverterArgs includes the arguments about DataConverter. This will be used in Creating a DataConverter.

DataConverterArgs {
			InputRowType: 		type of inputRow ,
			Input:        		dal cursor,
			RawDataSubTaskArgs: args about raw data task
			Convert: 			main function including conversion logic
			BatchSize: 			batch size

type DateIterator

type DateIterator struct {
	Days    int
	Current int
	// contains filtered or unexported fields
}

DateIterator FIXME ...

func NewDateIterator

func NewDateIterator(days int) (*DateIterator, errors.Error)

NewDateIterator FIXME ...

func (*DateIterator) Close

func (c *DateIterator) Close() errors.Error

Close iterator

func (*DateIterator) Fetch

func (c *DateIterator) Fetch() (interface{}, errors.Error)

Fetch FIXME ...

func (*DateIterator) HasNext

func (c *DateIterator) HasNext() bool

HasNext FIXME ...

type DatePair

type DatePair struct {
	PairStartTime time.Time
	PairEndTime   time.Time
}

DatePair FIXME ...

type DateTimeFormatItem

type DateTimeFormatItem struct {
	Matcher *regexp.Regexp
	Format  string
}

DateTimeFormatItem FIXME ... TODO: move this to helper

type DefaultSubTaskContext

type DefaultSubTaskContext struct {
	LastProgressTime time.Time
	// contains filtered or unexported fields
}

DefaultSubTaskContext is default implementation

func (DefaultSubTaskContext) GetContext

func (c DefaultSubTaskContext) GetContext() context.Context

func (DefaultSubTaskContext) GetData

func (c DefaultSubTaskContext) GetData() interface{}

func (DefaultSubTaskContext) GetName

func (c DefaultSubTaskContext) GetName() string

func (*DefaultSubTaskContext) IncProgress

func (c *DefaultSubTaskContext) IncProgress(quantity int)

IncProgress FIXME ...

func (*DefaultSubTaskContext) SetProgress

func (c *DefaultSubTaskContext) SetProgress(current int, total int)

SetProgress FIXME ...

func (*DefaultSubTaskContext) TaskContext

func (c *DefaultSubTaskContext) TaskContext() core.TaskContext

TaskContext FIXME ...

type DefaultTaskContext

type DefaultTaskContext struct {
	// contains filtered or unexported fields
}

DefaultTaskContext is TaskContext default implementation

func (DefaultTaskContext) GetContext

func (c DefaultTaskContext) GetContext() context.Context

func (DefaultTaskContext) GetData

func (c DefaultTaskContext) GetData() interface{}

func (DefaultTaskContext) GetName

func (c DefaultTaskContext) GetName() string

func (*DefaultTaskContext) IncProgress

func (c *DefaultTaskContext) IncProgress(quantity int)

IncProgress FIXME ...

func (*DefaultTaskContext) SetData

func (c *DefaultTaskContext) SetData(data interface{})

SetData FIXME ...

func (*DefaultTaskContext) SetProgress

func (c *DefaultTaskContext) SetProgress(current int, total int)

SetProgress FIXME ...

func (*DefaultTaskContext) SubTaskContext

func (c *DefaultTaskContext) SubTaskContext(subtask string) (core.SubTaskContext, errors.Error)

SubTaskContext FIXME ...

type GraphqlAsyncClient added in v0.13.0

type GraphqlAsyncClient struct {
	// contains filtered or unexported fields
}

GraphqlAsyncClient send graphql one by one

func CreateAsyncGraphqlClient added in v0.13.0

func CreateAsyncGraphqlClient(
	taskCtx core.TaskContext,
	graphqlClient *graphql.Client,
	logger core.Logger,
	getRateRemaining func(context.Context, *graphql.Client, core.Logger) (rateRemaining int, resetAt *time.Time, err errors.Error),
) (*GraphqlAsyncClient, errors.Error)

CreateAsyncGraphqlClient creates a new GraphqlAsyncClient

func (*GraphqlAsyncClient) GetMaxRetry added in v0.14.3

func (apiClient *GraphqlAsyncClient) GetMaxRetry() (int, time.Duration)

GetMaxRetry returns the maximum retry attempts for a request

func (*GraphqlAsyncClient) NextTick added in v0.13.0

func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error, taskErrorChecker func(err error))

NextTick to return the NextTick of scheduler

func (*GraphqlAsyncClient) Query added in v0.13.0

func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]interface{}) ([]graphql.DataError, error)

Query send a graphql request when get lock []graphql.DataError are the errors returned in response body errors.Error is other error

func (*GraphqlAsyncClient) Release added in v0.14.3

func (apiClient *GraphqlAsyncClient) Release()

Release will release the ApiAsyncClient with scheduler

func (*GraphqlAsyncClient) SetGetRateCost added in v0.13.0

func (apiClient *GraphqlAsyncClient) SetGetRateCost(getRateCost func(q interface{}) int)

SetGetRateCost to calculate how many rate cost if not set, all query just cost 1

func (*GraphqlAsyncClient) SetMaxRetry added in v0.14.3

func (apiClient *GraphqlAsyncClient) SetMaxRetry(
	maxRetry int,
	waitBeforeRetry time.Duration,
)

SetMaxRetry sets the maximum retry attempts for a request

func (*GraphqlAsyncClient) Wait added in v0.13.0

func (apiClient *GraphqlAsyncClient) Wait()

Wait blocks until all async requests were done

type GraphqlAsyncResponseHandler added in v0.13.0

type GraphqlAsyncResponseHandler func(res *http.Response) error

GraphqlAsyncResponseHandler callback function to handle the Response asynchronously

type GraphqlCollector added in v0.13.0

type GraphqlCollector struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

GraphqlCollector help you collect data from Graphql services

func NewGraphqlCollector added in v0.13.0

func NewGraphqlCollector(args GraphqlCollectorArgs) (*GraphqlCollector, errors.Error)

NewGraphqlCollector allocates a new GraphqlCollector with the given args. GraphqlCollector can help us collect data from api with ease, pass in a AsyncGraphqlClient and tell it which part of response we want to save, GraphqlCollector will collect them from remote server and store them into database.

func (*GraphqlCollector) Execute added in v0.13.0

func (collector *GraphqlCollector) Execute() errors.Error

Execute api collection

func (*GraphqlCollector) HasError added in v0.14.3

func (collector *GraphqlCollector) HasError() bool

HasError return if any error occurred

type GraphqlCollectorArgs added in v0.13.0

type GraphqlCollectorArgs struct {
	RawDataSubTaskArgs
	// BuildQuery would be sent out as part of the request URL
	BuildQuery func(reqData *GraphqlRequestData) (query interface{}, variables map[string]interface{}, err error)
	// PageSize tells ApiCollector the page size
	PageSize int
	// GraphqlClient is a asynchronize api request client with qps
	GraphqlClient *GraphqlAsyncClient
	// Input helps us collect data based on previous collected data, like collecting changelogs based on jira
	// issue ids
	Input Iterator
	// how many times fetched from input, default 1 means only fetch once
	// NOTICE: InputStep=1 will fill value as item and InputStep>1 will fill value as []item
	InputStep int
	// Incremental indicate if this is a incremental collection, the existing data won't get deleted if it was true
	Incremental bool `comment:"indicate if this collection is incremental update"`
	// GetPageInfo is to tell `GraphqlCollector` is page information
	GetPageInfo func(query interface{}, args *GraphqlCollectorArgs) (*GraphqlQueryPageInfo, error)
	BatchSize   int
	// one of ResponseParser and ResponseParserEvenWhenDataErrors is required to parse response
	ResponseParser               func(query interface{}, variables map[string]interface{}) ([]interface{}, error)
	ResponseParserWithDataErrors func(query interface{}, variables map[string]interface{}, dataErrors []graphql.DataError) ([]interface{}, error)
}

GraphqlCollectorArgs arguments needed by GraphqlCollector

type GraphqlQueryPageInfo added in v0.13.0

type GraphqlQueryPageInfo struct {
	EndCursor   string `json:"endCursor"`
	HasNextPage bool   `json:"hasNextPage"`
}

GraphqlQueryPageInfo contains the pagination data

type GraphqlRequestData added in v0.13.0

type GraphqlRequestData struct {
	Pager     *CursorPager
	Params    interface{}
	Input     interface{}
	InputJSON []byte
}

GraphqlRequestData is the input of `UrlTemplate` `BuildQuery` and `Header`, so we can generate them dynamically

type Iso8601Time

type Iso8601Time struct {
	// contains filtered or unexported fields
}

Iso8601Time is type time.Time

func (Iso8601Time) MarshalJSON

func (jt Iso8601Time) MarshalJSON() ([]byte, error)

MarshalJSON FIXME ...

func (*Iso8601Time) Scan added in v0.14.6

func (jt *Iso8601Time) Scan(v interface{}) error

Scan FIXME ...

func (*Iso8601Time) String

func (jt *Iso8601Time) String() string

func (*Iso8601Time) ToNullableTime

func (jt *Iso8601Time) ToNullableTime() *time.Time

ToNullableTime FIXME ...

func (*Iso8601Time) ToTime

func (jt *Iso8601Time) ToTime() time.Time

ToTime FIXME ...

func (*Iso8601Time) UnmarshalJSON

func (jt *Iso8601Time) UnmarshalJSON(b []byte) error

UnmarshalJSON FIXME ...

func (*Iso8601Time) Value added in v0.14.6

func (jt *Iso8601Time) Value() (driver.Value, error)

Value FIXME ...

type Iterator

type Iterator interface {
	HasNext() bool
	Fetch() (interface{}, errors.Error)
	Close() errors.Error
}

Iterator FIXME ...

type ListBaseNode added in v0.12.0

type ListBaseNode struct {
	// contains filtered or unexported fields
}

ListBaseNode 'abstract' base struct for Nodes that are chained in a linked list manner

func NewListBaseNode added in v0.12.0

func NewListBaseNode() *ListBaseNode

NewListBaseNode create and init a new node (only to be called by subclasses)

func (*ListBaseNode) Data added in v0.13.0

func (l *ListBaseNode) Data() interface{}

Data returns data of the node

func (*ListBaseNode) Next added in v0.12.0

func (l *ListBaseNode) Next() interface{}

Next return the next node

func (*ListBaseNode) SetNext added in v0.12.0

func (l *ListBaseNode) SetNext(next interface{})

SetNext updates the next pointer of the node

type Pager

type Pager struct {
	Page int
	Skip int
	Size int
}

Pager contains pagination information for a api request

type Queue added in v0.12.0

type Queue struct {
	// contains filtered or unexported fields
}

Queue represetns a queue

func NewQueue added in v0.12.0

func NewQueue() *Queue

NewQueue create and init a new Queue

func (*Queue) Clean added in v0.12.0

func (q *Queue) Clean()

Clean remove all node on queue

func (*Queue) CleanWithOutLock added in v0.12.0

func (q *Queue) CleanWithOutLock()

CleanWithOutLock is no lock mode of Clean

func (*Queue) GetCount added in v0.12.0

func (q *Queue) GetCount() int64

GetCount get the node count

func (*Queue) GetCountWithOutLock added in v0.12.0

func (q *Queue) GetCountWithOutLock() int64

GetCountWithOutLock is no lock mode of GetCount

func (*Queue) Pull added in v0.12.0

func (q *Queue) Pull(add *int64) QueueNode

Pull get a node from queue

func (*Queue) PullWithOutLock added in v0.12.0

func (q *Queue) PullWithOutLock() QueueNode

PullWithOutLock is no lock mode of Pull

func (*Queue) Push added in v0.12.0

func (q *Queue) Push(node QueueNode)

Push add a node to queue

func (*Queue) PushWithoutLock added in v0.13.0

func (q *Queue) PushWithoutLock(node QueueNode)

PushWithoutLock is no lock mode of Push

type QueueIterator added in v0.12.0

type QueueIterator struct {
	// contains filtered or unexported fields
}

QueueIterator implements Iterator based on Queue

func NewQueueIterator added in v0.12.0

func NewQueueIterator() *QueueIterator

NewQueueIterator creates a new QueueIterator

func (*QueueIterator) Close added in v0.12.0

func (q *QueueIterator) Close() errors.Error

Close releases resources

func (*QueueIterator) Fetch added in v0.12.0

func (q *QueueIterator) Fetch() (interface{}, errors.Error)

Fetch current item

func (*QueueIterator) HasNext added in v0.12.0

func (q *QueueIterator) HasNext() bool

HasNext increments the row curser. If we're at the end, it'll return false.

func (*QueueIterator) Push added in v0.12.0

func (q *QueueIterator) Push(data QueueNode)

Push a data into queue

type QueueIteratorNode added in v0.12.0

type QueueIteratorNode struct {
	// contains filtered or unexported fields
}

QueueIteratorNode implements the helper.Iterator interface with ability to accept new item when being iterated

func NewQueueIteratorNode added in v0.13.0

func NewQueueIteratorNode(data interface{}) *QueueIteratorNode

NewQueueIteratorNode creates a new QueueIteratorNode

func (*QueueIteratorNode) Data added in v0.13.0

func (q *QueueIteratorNode) Data() interface{}

Data returns data of the node

func (*QueueIteratorNode) Next added in v0.12.0

func (q *QueueIteratorNode) Next() interface{}

Next return the next node

func (*QueueIteratorNode) SetNext added in v0.12.0

func (q *QueueIteratorNode) SetNext(next interface{})

SetNext updates the next pointer of the node

type QueueNode added in v0.12.0

type QueueNode interface {
	Next() interface{}
	SetNext(next interface{})
	Data() interface{}
}

QueueNode represents a node in the queue

type RateLimitedApiClient

type RateLimitedApiClient interface {
	DoGetAsync(path string, query url.Values, header http.Header, handler common.ApiAsyncCallback)
	DoPostAsync(path string, query url.Values, body interface{}, header http.Header, handler common.ApiAsyncCallback)
	WaitAsync() errors.Error
	HasError() bool
	NextTick(task func() errors.Error)
	GetNumOfWorkers() int
	GetAfterFunction() common.ApiClientAfterResponse
	SetAfterFunction(callback common.ApiClientAfterResponse)
	Release()
}

RateLimitedApiClient FIXME ...

type RawData

type RawData struct {
	ID        uint64 `gorm:"primaryKey"`
	Params    string `gorm:"type:varchar(255);index"`
	Data      []byte
	Url       string
	Input     datatypes.JSON
	CreatedAt time.Time
}

RawData is raw data structure in DB storage

type RawDataSubTask

type RawDataSubTask struct {
	// contains filtered or unexported fields
}

RawDataSubTask is Common features for raw data sub-tasks

func NewRawDataSubTask added in v0.15.0

func NewRawDataSubTask(args RawDataSubTaskArgs) (*RawDataSubTask, errors.Error)

NewRawDataSubTask constructor for RawDataSubTask

func (*RawDataSubTask) GetParams added in v0.15.0

func (r *RawDataSubTask) GetParams() string

GetParams returns the raw params

func (*RawDataSubTask) GetTable added in v0.15.0

func (r *RawDataSubTask) GetTable() string

GetTable returns the raw table name

type RawDataSubTaskArgs

type RawDataSubTaskArgs struct {
	Ctx core.SubTaskContext

	//	Table store raw data
	Table string `comment:"Raw data table name"`

	//	This struct will be JSONEncoded and stored into database along with raw data itself, to identity minimal
	//	set of data to be process, for example, we process JiraIssues by Board
	Params interface{} `comment:"To identify a set of records with same UrlTemplate, i.e. {ConnectionId, BoardId} for jira entities"`
}

RawDataSubTaskArgs FIXME ...

type RegexEnricher added in v0.15.0

type RegexEnricher struct {
	// contains filtered or unexported fields
}

RegexEnricher process value with regex pattern

func NewRegexEnricher added in v0.15.0

func NewRegexEnricher() *RegexEnricher

NewRegexEnricher initialize a regexEnricher

func (*RegexEnricher) AddRegexp added in v0.15.0

func (r *RegexEnricher) AddRegexp(patterns ...string) errors.Error

AddRegexp will add compiled regular expression for pattern to regexpMap

func (*RegexEnricher) GetEnrichResult added in v0.15.0

func (r *RegexEnricher) GetEnrichResult(pattern string, v string, result string) string

GetEnrichResult will get compiled regular expression from map by pattern, and check if v matches compiled regular expression, lastly, will return corresponding value(result or empty)

type RequestData

type RequestData struct {
	Pager     *Pager
	Params    interface{}
	Input     interface{}
	InputJSON []byte
}

RequestData is the input of `UrlTemplate` `Query` and `Header`, so we can generate them dynamically

type RestConnection added in v0.12.0

type RestConnection struct {
	BaseConnection   `mapstructure:",squash"`
	Endpoint         string `mapstructure:"endpoint" validate:"required" json:"endpoint"`
	Proxy            string `mapstructure:"proxy" json:"proxy"`
	RateLimitPerHour int    `comment:"api request rate limit per hour" json:"rateLimitPerHour"`
}

RestConnection FIXME ...

type WorkerScheduler

type WorkerScheduler struct {
	// contains filtered or unexported fields
}

WorkerScheduler runs asynchronous tasks in parallel with throttling support

func NewWorkerScheduler

func NewWorkerScheduler(
	ctx context.Context,
	workerNum int,
	maxWork int,
	maxWorkDuration time.Duration,
	logger core.Logger,
) (*WorkerScheduler, errors.Error)

NewWorkerScheduler creates a WorkerScheduler

func (*WorkerScheduler) HasError added in v0.12.0

func (s *WorkerScheduler) HasError() bool

HasError return if any error occurred

func (*WorkerScheduler) NextTick added in v0.12.0

func (s *WorkerScheduler) NextTick(task func() errors.Error)

NextTick enqueues task in a NonBlocking manner, you should only call this method within task submitted by SubmitBlocking method IMPORTANT: do NOT call this method with a huge number of tasks, it is likely to eat up all available memory

func (*WorkerScheduler) Release

func (s *WorkerScheduler) Release()

Release resources

func (*WorkerScheduler) SubmitBlocking added in v0.12.0

func (s *WorkerScheduler) SubmitBlocking(task func() errors.Error)

SubmitBlocking enqueues a async task to ants, the task will be executed in future when timing is right. It doesn't return error because it wouldn't be any when with a Blocking semantic, returned error does nothing but causing confusion, more often, people thought it is returned by the task. Since it is async task, the callframes would not be available for production mode, you can export Environment Varaible ASYNC_CF=true to enable callframes capturing when debugging. IMPORTANT: do NOT call SubmitBlocking inside the async task, it is likely to cause a deadlock, call SubmitNonBlocking instead when number of tasks is relatively small.

func (*WorkerScheduler) Wait added in v0.12.0

func (s *WorkerScheduler) Wait() errors.Error

Wait blocks current go-routine until all workers returned

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL