Documentation ¶
Index ¶
- Variables
- type LogRecordDto
- type LoggedTask
- type Map
- type NotificationChannel
- type NotificationContext
- type NotificationService
- type Priority
- type ResultSaver
- type Status
- type TaskCloser
- type TaskDto
- type TaskExecutor
- type TaskExecutorContext
- type TaskLogger
- func (tl *TaskLogger) Collect() []string
- func (tl *TaskLogger) ERROR(format string, v ...interface{})
- func (tl *TaskLogger) INFO(format string, v ...interface{})
- func (tl *TaskLogger) LOG(format, system string, level logging.Level, v ...interface{})
- func (tl *TaskLogger) WARN(format string, v ...interface{})
- func (tl *TaskLogger) Write(p []byte) (n int, err error)
- type TaskService
- func (ts *TaskService) CancelTask(taskID string) error
- func (ts *TaskService) GetTask(id string) (*TaskDto, error)
- func (ts *TaskService) GetTaskLogs(taskID string, start, end time.Time) ([]LogRecordDto, error)
- func (ts *TaskService) GetTasks(sourceID, collectionID string, statusFilter *Status, start, end time.Time, ...) ([]TaskDto, error)
- func (ts *TaskService) IsConfigured() bool
- func (ts *TaskService) ScheduleSyncFunc(source, collection string, retryCount int)
- func (ts *TaskService) Sync(sourceID, collection string, priority Priority) (string, error)
Constants ¶
This section is empty.
Variables ¶
var ( ErrSourceCollectionIsSyncing = errors.New("Source collection is syncing now") ErrSourceCollectionIsStartingToSync = errors.New("Source collection is starting to sync") ErrMetaStorageRequired = errors.New("meta.storage configuration is required for sources synchronization tasks features") )
var ErrTaskHasBeenCanceled = errors.New("Synchronization has been canceled!")
Functions ¶
This section is empty.
Types ¶
type LogRecordDto ¶
type LogRecordDto struct { Time string `json:"time,omitempty"` Message string `json:"message,omitempty"` Level string `json:"level,omitempty"` System string `json:"system,omitempty"` }
LogRecordDto is used in Task API (handlers.TaskHandler)
type LoggedTask ¶
type NotificationChannel ¶
type NotificationChannel func(ctx context.Context, nctx *NotificationContext, configValue interface{}, task LoggedTask) error
var Slack NotificationChannel = func(ctx context.Context, nctx *NotificationContext, configValue interface{}, task LoggedTask) error { var config struct { URL string `mapstructure:"url"` } if err := mapstructure.Decode(configValue, &config); err != nil { return errors.Wrapf(err, "decode config: %+v", configValue) } if config.URL == "" { return nil } projectText := "" sourceID := task.Source projectID := "" if dot := strings.Index(sourceID, "."); dot >= 0 && dot < len(sourceID)-1 { projectID = sourceID[:dot] projectName := task.ProjectName if projectName == "" { projectName = projectID } projectText = fmt.Sprintf("*Project*: %s\n", projectName) sourceID = sourceID[dot+1:] } var source, logs string if nctx.UIBaseURL != "" { source = fmt.Sprintf("<%s/prj-%s/sources/edit/%s|%s>", nctx.UIBaseURL, projectID, sourceID, sourceID) logs = fmt.Sprintf("<%s/prj-%s/sources/logs/%s/%s|See logs>", nctx.UIBaseURL, projectID, sourceID, task.ID) } else { source = sourceID logs = "*Logs:*\n" + strings.Join(task.Collect(), "\n") } if task.Status == SUCCESS.String() { logs = "" } color := grey switch task.Status { case SUCCESS.String(): color = green case FAILED.String(): color = red } return requests.URL(config.URL). Method(http.MethodPost). BodyJSON(Map{ "text": fmt.Sprintf("*%s %s* [%s]: Synchronization %s", nctx.ServiceName, nctx.Version, nctx.ServerName, task.Status), "attachments": []Map{{ "color": color, "blocks": []Map{ { "type": "divider", }, { "type": "section", "text": Map{ "type": "mrkdwn", "text": fmt.Sprintf("%s*Connector type:* %s\n*Connector:* %s\n*Collection:* %s\n%s", projectText, task.SourceType, source, task.Collection, logs), }, }, }, }}, }). CheckStatus(http.StatusOK). Fetch(ctx) }
type NotificationContext ¶
type NotificationService ¶
type NotificationService struct { *NotificationContext // contains filtered or unexported fields }
func NewNotificationService ¶
func NewNotificationService(nctx *NotificationContext, config map[string]interface{}) *NotificationService
func (*NotificationService) Notify ¶
func (s *NotificationService) Notify(task LoggedTask)
type Priority ¶
type Priority int64
func PriorityFromString ¶
type ResultSaver ¶
type ResultSaver struct {
// contains filtered or unexported fields
}
ResultSaver is a Singer/Airbyte result consumer tap is a Singer tap or Airbyte source docker image
func NewResultSaver ¶
func NewResultSaver(task *meta.Task, tap, collectionMetaKey, tableNamePrefix string, taskLogger *TaskLogger, destinations []storages.Storage, metaStorage meta.Storage, streamTableNames map[string]string, configPath string) *ResultSaver
NewResultSaver returns configured ResultSaver instance
func (*ResultSaver) CleanupAfterError ¶
func (rs *ResultSaver) CleanupAfterError(representation *driversbase.CLIOutputRepresentation)
CleanupAfterError do cleanup if necessary. Like deleting temporary tables after errors
func (*ResultSaver) Consume ¶
func (rs *ResultSaver) Consume(representation *driversbase.CLIOutputRepresentation) error
Consume consumes result batch and writes it to destinations and saves the State
func (*ResultSaver) Tap ¶
func (rs *ResultSaver) Tap() string
type TaskCloser ¶
TaskCloser is responsible for graceful task closing (writing Redis status) checks if task is canceled
func (*TaskCloser) CloseWithError ¶
func (tc *TaskCloser) CloseWithError(msg string, systemErr bool)
CloseWithError writes closing with error logs if task isn't canceled - updates task status to FAILED in Redis
func (*TaskCloser) CloseWithSuccess ¶
func (tc *TaskCloser) CloseWithSuccess() error
func (*TaskCloser) HandleCanceling ¶
func (tc *TaskCloser) HandleCanceling() error
HandleCanceling checks if task is canceled and if so, returns ErrTaskHasBeenCanceled otherwise returns nil
type TaskDto ¶
type TaskDto struct { ID string `json:"id,omitempty"` SourceType string `json:"source_type,omitempty"` Source string `json:"source,omitempty"` Collection string `json:"collection,omitempty"` Priority int64 `json:"priority,omitempty"` CreatedAt string `json:"created_at,omitempty"` StartedAt string `json:"started_at,omitempty"` FinishedAt string `json:"finished_at,omitempty"` Status string `json:"status,omitempty"` }
TaskDto is used in Task API (handlers.TaskHandler)
type TaskExecutor ¶
type TaskExecutor struct { *TaskExecutorContext // contains filtered or unexported fields }
func NewTaskExecutor ¶
func NewTaskExecutor(poolSize int, ctx *TaskExecutorContext, sourcesLogWriter io.Writer) (*TaskExecutor, error)
NewTaskExecutor returns TaskExecutor and starts 2 goroutines (monitoring and queue observer)
func (*TaskExecutor) Close ¶
func (te *TaskExecutor) Close() error
type TaskExecutorContext ¶
type TaskExecutorContext struct { SourceService *sources.Service DestinationService *destinations.Service MetaStorage meta.Storage CoordinationService *coordination.Service NotificationService *NotificationService StalledThreshold time.Duration LastActivityThreshold time.Duration ObserverStalledEvery time.Duration }
type TaskLogger ¶
type TaskLogger struct {
// contains filtered or unexported fields
}
TaskLogger is a logger for writing logs to underlying Redis (meta.Storage)
func NewTaskLogger ¶
NewTaskLogger returns configured TaskLogger instance
func (*TaskLogger) Collect ¶
func (tl *TaskLogger) Collect() []string
func (*TaskLogger) ERROR ¶
func (tl *TaskLogger) ERROR(format string, v ...interface{})
ERROR writes record into meta.storage with log level ERROR
func (*TaskLogger) INFO ¶
func (tl *TaskLogger) INFO(format string, v ...interface{})
INFO writes record into meta.storage with log level INFO
func (*TaskLogger) LOG ¶
func (tl *TaskLogger) LOG(format, system string, level logging.Level, v ...interface{})
func (*TaskLogger) WARN ¶
func (tl *TaskLogger) WARN(format string, v ...interface{})
WARN writes record into meta.storage with log level WARN
type TaskService ¶
type TaskService struct {
// contains filtered or unexported fields
}
TaskService handle get all tasks/ task logs requests
func NewTaskService ¶
func NewTaskService(sourceService *sources.Service, destinationService *destinations.Service, metaStorage meta.Storage, coordinationService *coordination.Service, storeTasksLogsForLastRuns int) *TaskService
NewTaskService returns configured TaskService instance
func NewTestTaskService ¶
func NewTestTaskService() *TaskService
NewTestTaskService returns TaskService test instance (only for tests)
func (*TaskService) CancelTask ¶
func (ts *TaskService) CancelTask(taskID string) error
CancelTask saves CANCEL status into the task in Redis
func (*TaskService) GetTask ¶
func (ts *TaskService) GetTask(id string) (*TaskDto, error)
GetTask return task by id
func (*TaskService) GetTaskLogs ¶
func (ts *TaskService) GetTaskLogs(taskID string, start, end time.Time) ([]LogRecordDto, error)
GetTaskLogs return task logs with input filters
func (*TaskService) GetTasks ¶
func (ts *TaskService) GetTasks(sourceID, collectionID string, statusFilter *Status, start, end time.Time, limit int) ([]TaskDto, error)
GetTasks return all tasks with input filters
func (*TaskService) IsConfigured ¶
func (ts *TaskService) IsConfigured() bool
func (*TaskService) ScheduleSyncFunc ¶
func (ts *TaskService) ScheduleSyncFunc(source, collection string, retryCount int)
ScheduleSyncFunc is used in scheduling.CronScheduler for scheduling sync of source&collection with retry and for avoiding dependency cycle