package helper

v0.11.0-testdockerpush
Published: Jun 16, 2022 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

const BATCH_SAVE_UPDATE_ONLY = 0

Variables

var DateTimeFormats []DateTimeFormatItem
var HttpMinStatusRetryCode = http.StatusBadRequest

Functions

func AddMissingSlashToURL

func AddMissingSlashToURL(baseUrl *string)

func ConvertStringToTime

func ConvertStringToTime(timeString string) (t time.Time, err error)

func DecodeMapStruct

func DecodeMapStruct(input map[string]interface{}, result interface{}) error

DecodeMapStruct wraps mapstructure.Decode with time.Time and Iso8601Time support
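
A minimal usage sketch — the struct and input values are hypothetical; note the RFC 3339 string is decoded straight into a time.Time field:

	var issue struct {
		Title   string
		Created time.Time
	}
	input := map[string]interface{}{
		"title":   "fix login bug",        // hypothetical values
		"created": "2022-06-16T08:00:00Z", // parsed into time.Time automatically
	}
	if err := DecodeMapStruct(input, &issue); err != nil {
		return err
	}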

func DecodeStruct

func DecodeStruct(output *viper.Viper, input interface{}, data map[string]interface{}, tag string) error

DecodeStruct validates the `input` struct with `validator` and sets it into viper. `tag` specifies the field names used when setting the config, and fields carrying the `tag` take precedence. `input` must be a pointer.

func EncodeStruct

func EncodeStruct(input *viper.Viper, output interface{}, tag string) error

EncodeStruct encodes a struct from viper. `tag` specifies the field names used when reading the config, and fields carrying the `tag` take precedence. `output` must be a pointer.
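
A hedged round-trip sketch — the anonymous connection struct and the `env` tag name are assumptions, as is passing nil for `data`:

	v := viper.New()
	conn := &struct {
		Endpoint string `env:"ENDPOINT" validate:"required"`
	}{Endpoint: "https://example.com"}
	// validate conn, then store its fields into v under their `env` tag names
	if err := DecodeStruct(v, conn, nil, "env"); err != nil {
		return err
	}
	// later, load the values from v back into the struct
	if err := EncodeStruct(v, conn, "env"); err != nil {
		return err
	}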

func GetRawMessageArrayFromResponse

func GetRawMessageArrayFromResponse(res *http.Response) ([]json.RawMessage, error)

func GetRawMessageDirectFromResponse

func GetRawMessageDirectFromResponse(res *http.Response) ([]json.RawMessage, error)

func GetURIStringPointer

func GetURIStringPointer(baseUrl string, relativePath string, query url.Values) (*string, error)

func Iso8601TimeToTime

func Iso8601TimeToTime(iso8601Time *Iso8601Time) *time.Time

func NewDefaultTaskContext

func NewDefaultTaskContext(
	cfg *viper.Viper,
	logger core.Logger,
	db *gorm.DB,
	ctx context.Context,
	name string,
	subtasks map[string]bool,
	progress chan core.RunningProgress,
) core.TaskContext

func NewDefaultTaskLogger

func NewDefaultTaskLogger(log *logrus.Logger, prefix string, loggerPool map[string]*logrus.Logger) core.Logger

func NewStandaloneSubTaskContext

func NewStandaloneSubTaskContext(
	cfg *viper.Viper,
	logger core.Logger,
	db *gorm.DB,
	ctx context.Context,
	name string,
	data interface{},
) core.SubTaskContext

NewStandaloneSubTaskContext returns a stand-alone core.SubTaskContext, not attached to any core.TaskContext. Use this if you need to run/debug a subtask without going through the usual workflow.
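
For instance, to run one subtask from a test or a throwaway main — `cfg`, `logger`, `db`, `taskData`, and `CollectIssues` are placeholders you supply:

	subtaskCtx := NewStandaloneSubTaskContext(
		cfg,                  // *viper.Viper
		logger,               // core.Logger
		db,                   // *gorm.DB
		context.Background(), // context.Context
		"collectIssues",      // subtask name
		taskData,             // whatever subtaskCtx.GetData() should return
	)
	if err := CollectIssues(subtaskCtx); err != nil {
		panic(err) // surface the failure while debugging
	}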

func RemoveStartingSlashFromPath

func RemoveStartingSlashFromPath(relativePath string) string

func UnmarshalResponse

func UnmarshalResponse(res *http.Response, v interface{}) error

func UpdateEncryptFields added in v0.12.0

func UpdateEncryptFields(val interface{}, update func(in string) (string, error)) error

UpdateEncryptFields updates the fields of `val` that carry the tag `encrypt:"yes"` or `encrypt:"true"`
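
A sketch — the anonymous connection type and the cipher call inside the callback are hypothetical:

	conn := &struct {
		Endpoint string
		Token    string `encrypt:"yes"`
	}{Endpoint: "https://example.com", Token: "plain-secret"}
	// rewrite every field tagged encrypt:"yes" (or encrypt:"true") in place
	err := UpdateEncryptFields(conn, func(in string) (string, error) {
		return encrypt(encryptionKey, in) // hypothetical cipher helper
	})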

Types

type AccessToken added in v0.12.0

type AccessToken struct {
	Token string `mapstructure:"token" validate:"required" json:"token" encrypt:"yes"`
}

type ApiAsyncClient

type ApiAsyncClient struct {
	*ApiClient
	// contains filtered or unexported fields
}

ApiAsyncClient is built on top of ApiClient to provide asynchronous semantics. You may submit multiple requests at once by calling `GetAsync`; those requests will be performed in parallel with rate-limit support.
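
A hedged usage sketch — it assumes `common.ApiAsyncCallback` has the `func(*http.Response) error` shape suggested by AsyncResponseHandler below, and `issueIds` is hypothetical:

	for _, issueId := range issueIds {
		path := fmt.Sprintf("issue/%d/changelog", issueId)
		apiClient.GetAsync(path, nil, nil, func(res *http.Response) error {
			// runs when this request completes; parse and store res here
			return nil
		})
	}
	// block until every queued request has finished
	if err := apiClient.WaitAsync(); err != nil {
		return err
	}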

func CreateAsyncApiClient

func CreateAsyncApiClient(
	taskCtx core.TaskContext,
	apiClient *ApiClient,
	rateLimiter *ApiRateLimitCalculator,
) (*ApiAsyncClient, error)

CreateAsyncApiClient creates a new ApiAsyncClient

func (*ApiAsyncClient) DoAsync

func (apiClient *ApiAsyncClient) DoAsync(
	method string,
	path string,
	query url.Values,
	body interface{},
	header http.Header,
	handler common.ApiAsyncCallback,
	retry int,
)

DoAsync carries out an asynchronous request

func (*ApiAsyncClient) GetAsync

func (apiClient *ApiAsyncClient) GetAsync(
	path string,
	query url.Values,
	header http.Header,
	handler common.ApiAsyncCallback,
)

GetAsync enqueues an API GET request; the request may be sent at some time in the future, in parallel with other API requests

func (*ApiAsyncClient) GetMaxRetry

func (apiClient *ApiAsyncClient) GetMaxRetry() int

GetMaxRetry returns the maximum retry attempts for a request

func (*ApiAsyncClient) GetNumOfWorkers added in v0.12.0

func (apiClient *ApiAsyncClient) GetNumOfWorkers() int

func (*ApiAsyncClient) HasError added in v0.12.0

func (apiClient *ApiAsyncClient) HasError() bool

func (*ApiAsyncClient) NextTick added in v0.12.0

func (apiClient *ApiAsyncClient) NextTick(task func() error)

func (*ApiAsyncClient) SetMaxRetry

func (apiClient *ApiAsyncClient) SetMaxRetry(
	maxRetry int,
)

SetMaxRetry sets the maximum retry attempts for a request

func (*ApiAsyncClient) WaitAsync

func (apiClient *ApiAsyncClient) WaitAsync() error

WaitAsync blocks until all async requests are done

type ApiClient

type ApiClient struct {
	// contains filtered or unexported fields
}

ApiClient is designed for simple API requests

func NewApiClient

func NewApiClient(
	endpoint string,
	headers map[string]string,
	timeout time.Duration,
	proxy string,
	ctx context.Context,
) (*ApiClient, error)

func (*ApiClient) Do

func (apiClient *ApiClient) Do(
	method string,
	path string,
	query url.Values,
	body interface{},
	headers http.Header,
) (*http.Response, error)

func (*ApiClient) Get

func (apiClient *ApiClient) Get(
	path string,
	query url.Values,
	headers http.Header,
) (*http.Response, error)

func (*ApiClient) GetEndpoint

func (apiClient *ApiClient) GetEndpoint() string

func (*ApiClient) GetHeaders

func (apiClient *ApiClient) GetHeaders() map[string]string

func (*ApiClient) Post

func (apiClient *ApiClient) Post(
	path string,
	query url.Values,
	body interface{},
	headers http.Header,
) (*http.Response, error)

func (*ApiClient) SetAfterFunction

func (apiClient *ApiClient) SetAfterFunction(callback ApiClientAfterResponse)

func (*ApiClient) SetBeforeFunction

func (apiClient *ApiClient) SetBeforeFunction(callback ApiClientBeforeRequest)

func (*ApiClient) SetContext

func (apiClient *ApiClient) SetContext(ctx context.Context)

func (*ApiClient) SetEndpoint

func (apiClient *ApiClient) SetEndpoint(endpoint string)

func (*ApiClient) SetHeaders

func (apiClient *ApiClient) SetHeaders(headers map[string]string)

func (*ApiClient) SetLogger

func (apiClient *ApiClient) SetLogger(logger core.Logger)

func (*ApiClient) SetProxy

func (apiClient *ApiClient) SetProxy(proxyUrl string) error

func (*ApiClient) SetTimeout

func (ApiClient *ApiClient) SetTimeout(timeout time.Duration)

func (*ApiClient) Setup

func (apiClient *ApiClient) Setup(
	endpoint string,
	headers map[string]string,
	timeout time.Duration,
)

type ApiClientAfterResponse

type ApiClientAfterResponse func(res *http.Response) error

type ApiClientBeforeRequest

type ApiClientBeforeRequest func(req *http.Request) error

type ApiCollector

type ApiCollector struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

func NewApiCollector

func NewApiCollector(args ApiCollectorArgs) (*ApiCollector, error)

NewApiCollector allocates a new ApiCollector with the given args. ApiCollector helps collect data from an API with ease: pass in an ApiAsyncClient and tell it which part of the response to save, and ApiCollector will collect the records from the remote server and store them in the database.
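
A condensed wiring sketch, loosely modeled on a Jira-style plugin — the raw table name, `JiraApiParams` struct, URL template, and response shape are all assumptions, not part of this package:

	collector, err := NewApiCollector(ApiCollectorArgs{
		RawDataSubTaskArgs: RawDataSubTaskArgs{
			Ctx:    subtaskCtx,
			Table:  "_raw_jira_api_issues",                     // assumed raw table name
			Params: JiraApiParams{ConnectionId: 1, BoardId: 8}, // hypothetical params struct
		},
		ApiClient:   asyncApiClient,
		PageSize:    50,
		UrlTemplate: "agile/1.0/board/{{ .Params.BoardId }}/issue",
		Query: func(reqData *RequestData) (url.Values, error) {
			query := url.Values{}
			query.Set("startAt", fmt.Sprintf("%d", reqData.Pager.Skip))
			query.Set("maxResults", fmt.Sprintf("%d", reqData.Pager.Size))
			return query, nil
		},
		ResponseParser: func(res *http.Response) ([]json.RawMessage, error) {
			var body struct {
				Issues []json.RawMessage `json:"issues"` // assumed payload shape
			}
			if err := UnmarshalResponse(res, &body); err != nil {
				return nil, err
			}
			return body.Issues, nil
		},
	})
	if err != nil {
		return err
	}
	return collector.Execute()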

func (*ApiCollector) Execute

func (collector *ApiCollector) Execute() error

Execute starts the collection

type ApiCollectorArgs

type ApiCollectorArgs struct {
	RawDataSubTaskArgs
	// UrlTemplate is used to generate the final URL for the Api Collector to request,
	// i.e. `api/3/issue/{{ .Input.IssueId }}/changelog`.
	// For details of what variables can be used, please check `RequestData`
	UrlTemplate string `comment:"GoTemplate for API url"`
	// Query would be sent out as part of the request URL
	Query func(reqData *RequestData) (url.Values, error)
	// Header would be sent out along with the request
	Header func(reqData *RequestData) (http.Header, error)
	// PageSize tells ApiCollector the page size
	PageSize int
	// Incremental indicates whether this is an incremental collection; existing data won't be deleted if it is true
	Incremental bool `comment:""`
	// ApiClient is an asynchronous API request client with rate-limit (QPS) support
	ApiClient RateLimitedApiClient
	// Input helps us collect data based on previously collected data, like collecting changelogs based on jira
	// issue ids
	Input Iterator
	// GetTotalPages tells `ApiCollector` the total number of pages based on the response of the first page,
	// so `ApiCollector` can collect those pages in parallel for us
	GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, error)
	// Concurrency specifies the QPS for APIs that don't return the total number of pages/records.
	// NORMALLY, DO NOT SPECIFY THIS PARAMETER, unless you know what it means
	Concurrency    int
	ResponseParser func(res *http.Response) ([]json.RawMessage, error)
}

type ApiExtractor

type ApiExtractor struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

ApiExtractor helps you extract raw data from API responses into Tool Layer data. It reads rows from the specified raw data table and feeds them into the `Extract` handler; you may return arbitrary Tool Layer entities from this handler. ApiExtractor first deletes old data by its RawDataOrigin information, then performs a batch save for you.
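
A sketch of the matching extraction step — `toolJiraIssue` is a hypothetical Tool Layer model, and the table/params mirror the collector sketch above:

	extractor, err := NewApiExtractor(ApiExtractorArgs{
		RawDataSubTaskArgs: RawDataSubTaskArgs{
			Ctx:    subtaskCtx,
			Table:  "_raw_jira_api_issues", // the table the collector wrote to
			Params: JiraApiParams{ConnectionId: 1, BoardId: 8},
		},
		Extract: func(row *RawData) ([]interface{}, error) {
			issue := &toolJiraIssue{} // hypothetical Tool Layer model
			if err := json.Unmarshal(row.Data, issue); err != nil {
				return nil, err
			}
			return []interface{}{issue}, nil
		},
	})
	if err != nil {
		return err
	}
	return extractor.Execute()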

func NewApiExtractor

func NewApiExtractor(args ApiExtractorArgs) (*ApiExtractor, error)

NewApiExtractor creates a new ApiExtractor

func (*ApiExtractor) Execute

func (extractor *ApiExtractor) Execute() error

Execute runs the sub-task

type ApiExtractorArgs

type ApiExtractorArgs struct {
	RawDataSubTaskArgs
	Params    interface{}
	Extract   func(row *RawData) ([]interface{}, error)
	BatchSize int
}

type ApiRateLimitCalculator

type ApiRateLimitCalculator struct {
	UserRateLimitPerHour   int
	GlobalRateLimitPerHour int
	MaxRetry               int
	Method                 string
	ApiPath                string
	DynamicRateLimit       func(res *http.Response) (int, time.Duration, error)
}

ApiRateLimitCalculator is a helper to calculate the API rate limit dynamically, assuming the API returns remaining/reset-time information
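
A sketch that derives the budget from GitHub-style `X-RateLimit-*` headers — the header names and fallback value are illustrative, not prescribed by this package:

	rateLimiter := &ApiRateLimitCalculator{
		UserRateLimitPerHour: 3600, // fixed fallback budget
		DynamicRateLimit: func(res *http.Response) (int, time.Duration, error) {
			remaining, err := strconv.Atoi(res.Header.Get("X-RateLimit-Remaining"))
			if err != nil {
				return 0, 0, err
			}
			reset, err := strconv.ParseInt(res.Header.Get("X-RateLimit-Reset"), 10, 64)
			if err != nil {
				return 0, 0, err
			}
			// spend the remaining budget over the window ending at the reset time
			return remaining, time.Until(time.Unix(reset, 0)), nil
		},
	}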

func (*ApiRateLimitCalculator) Calculate

func (c *ApiRateLimitCalculator) Calculate(apiClient *ApiClient) (int, time.Duration, error)

type AsyncResponseHandler

type AsyncResponseHandler func(res *http.Response) error

type BaseConnection added in v0.12.0

type BaseConnection struct {
	Name string `gorm:"type:varchar(100);uniqueIndex" json:"name" validate:"required"`
	common.Model
}

type BasicAuth added in v0.12.0

type BasicAuth struct {
	Username string `mapstructure:"username" validate:"required" json:"username"`
	Password string `mapstructure:"password" validate:"required" json:"password" encrypt:"yes"`
}

func (BasicAuth) GetEncodedToken added in v0.12.0

func (ba BasicAuth) GetEncodedToken() string

type BatchSave

type BatchSave struct {
	// contains filtered or unexported fields
}

BatchSave persists multiple records of a specific type in one SQL query to improve performance

func NewBatchSave

func NewBatchSave(basicRes core.BasicRes, slotType reflect.Type, size int) (*BatchSave, error)

NewBatchSave creates a new BatchSave instance

func (*BatchSave) Add

func (c *BatchSave) Add(slot interface{}) error

Add adds a record to the cache; BatchSave flushes the cache into the database when it maxes out
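
A minimal lifecycle sketch — `basicRes`, `issues`, and `toolJiraIssue` are placeholders:

	batch, err := NewBatchSave(basicRes, reflect.TypeOf(&toolJiraIssue{}), 100)
	if err != nil {
		return err
	}
	defer batch.Close() // flushes anything still cached
	for _, issue := range issues {
		if err := batch.Add(issue); err != nil { // auto-flushes once 100 records accumulate
			return err
		}
	}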

func (*BatchSave) Close

func (c *BatchSave) Close() error

Close flushes the cache and releases resources

func (*BatchSave) Flush

func (c *BatchSave) Flush() error

Flush saves the cached records into the database

type BatchSaveDivider

type BatchSaveDivider struct {
	// contains filtered or unexported fields
}

BatchSaveDivider creates and caches BatchSave instances; this is helpful when dealing with a massive amount of data records of arbitrary types.

func NewBatchSaveDivider

func NewBatchSaveDivider(basicRes core.BasicRes, batchSize int, table string, params string) *BatchSaveDivider

NewBatchSaveDivider creates a new BatchSaveDivider instance

func (*BatchSaveDivider) Close

func (d *BatchSaveDivider) Close() error

Close closes all batches so the remaining records get saved into the db

func (*BatchSaveDivider) ForType

func (d *BatchSaveDivider) ForType(rowType reflect.Type) (*BatchSave, error)

ForType returns a `BatchSave` instance for a specific type
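
When row types are only known at runtime, the divider hands out one batch per type — a sketch, with `entities` as a hypothetical mixed-type slice and `paramsJson` as the JSON-encoded params string:

	divider := NewBatchSaveDivider(basicRes, 100, "_raw_jira_api_issues", paramsJson)
	defer divider.Close() // flush every batch it created
	for _, entity := range entities {
		batch, err := divider.ForType(reflect.TypeOf(entity))
		if err != nil {
			return err
		}
		if err := batch.Add(entity); err != nil {
			return err
		}
	}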

type CSTTime

type CSTTime time.Time

func (*CSTTime) Scan

func (jt *CSTTime) Scan(v interface{}) error

func (*CSTTime) UnmarshalJSON

func (jt *CSTTime) UnmarshalJSON(b []byte) error

func (CSTTime) Value

func (jt CSTTime) Value() (driver.Value, error)

type ConnectionApiHelper added in v0.12.0

type ConnectionApiHelper struct {
	// contains filtered or unexported fields
}

func NewConnectionHelper added in v0.12.0

func NewConnectionHelper(
	basicRes core.BasicRes,
	vld *validator.Validate,
) *ConnectionApiHelper

func (*ConnectionApiHelper) Create added in v0.12.0

func (c *ConnectionApiHelper) Create(connection interface{}, input *core.ApiResourceInput) error

Create creates a connection record based on the request body

func (*ConnectionApiHelper) Delete added in v0.12.0

func (c *ConnectionApiHelper) Delete(connection interface{}) error

Delete deletes the connection record

func (*ConnectionApiHelper) First added in v0.12.0

func (c *ConnectionApiHelper) First(connection interface{}, params map[string]string) error

First finds the connection in the db by parsing the request input, and decrypts it

func (*ConnectionApiHelper) FirstById added in v0.12.0

func (c *ConnectionApiHelper) FirstById(connection interface{}, id uint64) error

func (*ConnectionApiHelper) List added in v0.12.0

func (c *ConnectionApiHelper) List(connections interface{}) error

List returns all connections with password/token decrypted

func (*ConnectionApiHelper) Patch added in v0.12.0

func (c *ConnectionApiHelper) Patch(connection interface{}, input *core.ApiResourceInput) error

type CursorIterator deprecated

type CursorIterator struct {
	// contains filtered or unexported fields
}

Deprecated: use DalCursorIterator instead

func NewCursorIterator deprecated

func NewCursorIterator(db *gorm.DB, cursor *sql.Rows, elemType reflect.Type) (*CursorIterator, error)

Deprecated: use NewDalCursorIterator instead

func (*CursorIterator) Close

func (c *CursorIterator) Close() error

func (*CursorIterator) Fetch

func (c *CursorIterator) Fetch() (interface{}, error)

func (*CursorIterator) HasNext

func (c *CursorIterator) HasNext() bool

type DalCursorIterator added in v0.12.0

type DalCursorIterator struct {
	// contains filtered or unexported fields
}

DalCursorIterator iterates over a database cursor (`*sql.Rows`) through the `dal.Dal` abstraction

func NewDalCursorIterator added in v0.12.0

func NewDalCursorIterator(db dal.Dal, cursor *sql.Rows, elemType reflect.Type) (*DalCursorIterator, error)

func (*DalCursorIterator) Close added in v0.12.0

func (c *DalCursorIterator) Close() error

func (*DalCursorIterator) Fetch added in v0.12.0

func (c *DalCursorIterator) Fetch() (interface{}, error)

func (*DalCursorIterator) HasNext added in v0.12.0

func (c *DalCursorIterator) HasNext() bool

type DataConvertHandler

type DataConvertHandler func(row interface{}) ([]interface{}, error)

DataConvertHandler accepts a row from the source cursor and returns a list of entities that need to be stored

type DataConverter

type DataConverter struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

DataConverter helps you convert data from Tool Layer tables to Domain Layer tables. It reads rows from the specified Iterator and feeds them into the `Convert` handler; you may return arbitrary Domain Layer entities from this handler. DataConverter first deletes old data by its RawDataOrigin information, then performs a batch save operation for you.
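
A sketch — `toolJiraIssue` and `domainIssue` stand in for real Tool and Domain Layer models, and the query is illustrative:

	cursor, err := db.Model(&toolJiraIssue{}).Rows() // Tool Layer rows to convert
	if err != nil {
		return err
	}
	converter, err := NewDataConverter(DataConverterArgs{
		RawDataSubTaskArgs: RawDataSubTaskArgs{
			Ctx:    subtaskCtx,
			Table:  "_raw_jira_api_issues",
			Params: JiraApiParams{ConnectionId: 1, BoardId: 8},
		},
		InputRowType: reflect.TypeOf(toolJiraIssue{}),
		Input:        cursor,
		Convert: func(row interface{}) ([]interface{}, error) {
			issue := row.(*toolJiraIssue)
			domain := &domainIssue{Key: issue.Key} // map fields across layers
			return []interface{}{domain}, nil
		},
	})
	if err != nil {
		return err
	}
	return converter.Execute()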

func NewDataConverter

func NewDataConverter(args DataConverterArgs) (*DataConverter, error)

func (*DataConverter) Execute

func (converter *DataConverter) Execute() error

type DataConverterArgs

type DataConverterArgs struct {
	RawDataSubTaskArgs
	// Domain layer entity Id prefix, i.e. `jira:JiraIssue:1`, `github:GithubIssue`
	InputRowType reflect.Type
	// Cursor to a set of Tool Layer Records
	Input     *sql.Rows
	Convert   DataConvertHandler
	BatchSize int
}

type DateIterator

type DateIterator struct {
	Days    int
	Current int
	// contains filtered or unexported fields
}

DateIterator iterates over the last `Days` days, one day at a time, yielding a `DatePair` for each

func NewDateIterator

func NewDateIterator(days int) (*DateIterator, error)

func (*DateIterator) Close

func (c *DateIterator) Close() error

func (*DateIterator) Fetch

func (c *DateIterator) Fetch() (interface{}, error)

func (*DateIterator) HasNext

func (c *DateIterator) HasNext() bool

type DatePair

type DatePair struct {
	PairStartTime time.Time
	PairEndTime   time.Time
}

type DateTimeFormatItem

type DateTimeFormatItem struct {
	Matcher *regexp.Regexp
	Format  string
}

TODO: move this to helper

type DefaultBasicRes added in v0.12.0

type DefaultBasicRes struct {
	// contains filtered or unexported fields
}

func NewDefaultBasicRes added in v0.12.0

func NewDefaultBasicRes(
	cfg *viper.Viper,
	logger core.Logger,
	db *gorm.DB,
) *DefaultBasicRes

func (*DefaultBasicRes) GetConfig added in v0.12.0

func (c *DefaultBasicRes) GetConfig(name string) string

func (*DefaultBasicRes) GetDal added in v0.12.0

func (c *DefaultBasicRes) GetDal() dal.Dal

func (*DefaultBasicRes) GetDb added in v0.12.0

func (c *DefaultBasicRes) GetDb() *gorm.DB

func (*DefaultBasicRes) GetLogger added in v0.12.0

func (c *DefaultBasicRes) GetLogger() core.Logger

type DefaultSubTaskContext

type DefaultSubTaskContext struct {
	LastProgressTime time.Time
	// contains filtered or unexported fields
}

DefaultSubTaskContext is the default implementation of core.SubTaskContext

func (DefaultSubTaskContext) GetContext

func (c DefaultSubTaskContext) GetContext() context.Context

func (DefaultSubTaskContext) GetData

func (c DefaultSubTaskContext) GetData() interface{}

func (DefaultSubTaskContext) GetName

func (c DefaultSubTaskContext) GetName() string

func (*DefaultSubTaskContext) IncProgress

func (c *DefaultSubTaskContext) IncProgress(quantity int)

func (*DefaultSubTaskContext) SetProgress

func (c *DefaultSubTaskContext) SetProgress(current int, total int)

func (*DefaultSubTaskContext) TaskContext

func (c *DefaultSubTaskContext) TaskContext() core.TaskContext

type DefaultTaskContext

type DefaultTaskContext struct {
	// contains filtered or unexported fields
}

DefaultTaskContext is the default implementation of core.TaskContext

func (DefaultTaskContext) GetContext

func (c DefaultTaskContext) GetContext() context.Context

func (DefaultTaskContext) GetData

func (c DefaultTaskContext) GetData() interface{}

func (DefaultTaskContext) GetName

func (c DefaultTaskContext) GetName() string

func (*DefaultTaskContext) IncProgress

func (c *DefaultTaskContext) IncProgress(quantity int)

func (*DefaultTaskContext) SetData

func (c *DefaultTaskContext) SetData(data interface{})

func (*DefaultTaskContext) SetProgress

func (c *DefaultTaskContext) SetProgress(current int, total int)

func (*DefaultTaskContext) SubTaskContext

func (c *DefaultTaskContext) SubTaskContext(subtask string) (core.SubTaskContext, error)

type Iso8601Time

type Iso8601Time struct {
	// contains filtered or unexported fields
}

Iso8601Time wraps time.Time so it can be (de)serialized in ISO 8601 format

func (Iso8601Time) MarshalJSON

func (jt Iso8601Time) MarshalJSON() ([]byte, error)

func (*Iso8601Time) String

func (jt *Iso8601Time) String() string

func (*Iso8601Time) ToNullableTime

func (jt *Iso8601Time) ToNullableTime() *time.Time

func (*Iso8601Time) ToTime

func (jt *Iso8601Time) ToTime() time.Time

func (*Iso8601Time) UnmarshalJSON

func (jt *Iso8601Time) UnmarshalJSON(b []byte) error

type Iterator

type Iterator interface {
	HasNext() bool
	Fetch() (interface{}, error)
	Close() error
}

type Pager

type Pager struct {
	Page int
	Skip int
	Size int
}

Pager contains pagination information for an API request

type RateLimitedApiClient

type RateLimitedApiClient interface {
	GetAsync(path string, query url.Values, header http.Header, handler common.ApiAsyncCallback)
	WaitAsync() error
	HasError() bool
	NextTick(task func() error)
	GetNumOfWorkers() int
}

type RawData

type RawData struct {
	ID        uint64 `gorm:"primaryKey"`
	Params    string `gorm:"type:varchar(255);index"`
	Data      []byte
	Url       string
	Input     datatypes.JSON
	CreatedAt time.Time
}

RawData is the table structure for raw data storage

type RawDataSubTask

type RawDataSubTask struct {
	// contains filtered or unexported fields
}

RawDataSubTask provides common features for raw data sub-tasks

type RawDataSubTaskArgs

type RawDataSubTaskArgs struct {
	Ctx core.SubTaskContext

	//	Table specifies the raw data table name
	Table string `comment:"Raw data table name"`

	//	This struct will be JSON-encoded and stored into the database along with the raw data itself, to identify
	//	a minimal set of data to be processed; for example, we process JiraIssues by Board
	Params interface{} `comment:"To identify a set of records with same UrlTemplate, i.e. {ConnectionId, BoardId} for jira entities"`
}

type RequestData

type RequestData struct {
	Pager     *Pager
	Params    interface{}
	Input     interface{}
	InputJSON []byte
}

RequestData is the input to `UrlTemplate`, `Query` and `Header`, so they can be generated dynamically

type RestConnection added in v0.12.0

type RestConnection struct {
	BaseConnection `mapstructure:",squash"`
	Endpoint       string `mapstructure:"endpoint" validate:"required" json:"endpoint"`
	Proxy          string `mapstructure:"proxy" json:"proxy"`
	RateLimit      int    `comment:"api request rate limit per hour" json:"rateLimit"`
}

type WorkerScheduler

type WorkerScheduler struct {
	// contains filtered or unexported fields
}

WorkerScheduler runs asynchronous tasks in parallel with throttling support

func NewWorkerScheduler

func NewWorkerScheduler(
	workerNum int,
	maxWork int,
	maxWorkDuration time.Duration,
	ctx context.Context,
	maxRetry int,
	logger core.Logger,
) (*WorkerScheduler, error)

NewWorkerScheduler creates a WorkerScheduler

func (*WorkerScheduler) HasError added in v0.12.0

func (s *WorkerScheduler) HasError() bool

HasError returns true if any error has occurred

func (*WorkerScheduler) NextTick added in v0.12.0

func (s *WorkerScheduler) NextTick(task func() error)

NextTick enqueues a task in a non-blocking manner; you should only call this method within a task submitted via SubmitBlocking. IMPORTANT: do NOT call this method with a huge number of tasks; it is likely to eat up all available memory

func (*WorkerScheduler) Release

func (s *WorkerScheduler) Release()

Release releases the scheduler's resources

func (*WorkerScheduler) SubmitBlocking added in v0.12.0

func (s *WorkerScheduler) SubmitBlocking(task func() error)

SubmitBlocking enqueues an async task to ants; the task will be executed in the future when the timing is right. It doesn't return an error because there wouldn't be one with blocking semantics: a returned error would do nothing but cause confusion, as people would more often assume it came from the task itself. Since tasks are async, call frames are not available in production mode; export the environment variable ASYNC_CF=true to enable call-frame capturing when debugging. IMPORTANT: do NOT call SubmitBlocking inside an async task; it is likely to cause a deadlock. Call NextTick instead when the number of tasks is relatively small.
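
A sketch of the intended pattern — fan out work with SubmitBlocking, then chain follow-up work from inside a task via NextTick; `pages`, `fetchPage`, and `store` are hypothetical:

	scheduler, err := NewWorkerScheduler(10, 100, time.Minute, ctx, 3, logger)
	if err != nil {
		return err
	}
	defer scheduler.Release()
	for _, page := range pages {
		page := page // capture loop variable
		scheduler.SubmitBlocking(func() error {
			res, err := fetchPage(page) // hypothetical fetch
			if err != nil {
				return err
			}
			// enqueue follow-up work without deadlocking the worker pool
			scheduler.NextTick(func() error { return store(res) })
			return nil
		})
	}
	return scheduler.Wait()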

func (*WorkerScheduler) Wait added in v0.12.0

func (s *WorkerScheduler) Wait() error

Wait blocks the current goroutine until all workers have returned

