synchronization

package
v0.0.0-...-8aeb8a1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2023 License: MIT Imports: 40 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exposed by the synchronization package.
var (
	// ErrSourceCollectionIsSyncing signals that the source collection is being synced right now.
	// NOTE(review): presumably returned by TaskService.Sync when a sync is already in progress — confirm.
	ErrSourceCollectionIsSyncing        = errors.New("Source collection is syncing now")
	// ErrSourceCollectionIsStartingToSync signals that a sync of the source collection is about to start.
	// NOTE(review): presumably returned by TaskService.Sync when the task is already scheduled — confirm.
	ErrSourceCollectionIsStartingToSync = errors.New("Source collection is starting to sync")
	// ErrMetaStorageRequired signals that the meta.storage configuration needed by the
	// sources synchronization task features is missing.
	ErrMetaStorageRequired              = errors.New("meta.storage configuration is required for sources synchronization tasks features")
)
View Source
// ErrTaskHasBeenCanceled is the sentinel error that TaskCloser.HandleCanceling is
// documented to return when it finds that the task has been canceled.
var ErrTaskHasBeenCanceled = errors.New("Synchronization has been canceled!")

Functions

This section is empty.

Types

type LogRecordDto

// LogRecordDto is a single task log record in the shape returned by
// TaskService.GetTaskLogs. Every field is a string that is serialized to JSON
// under its lowercase name and left out of the output when empty.
type LogRecordDto struct {
	// Time is the record time as a string (the exact layout is not visible here).
	Time    string `json:"time,omitempty"`
	// Message is the log text.
	Message string `json:"message,omitempty"`
	// Level is the log level name.
	Level   string `json:"level,omitempty"`
	// System presumably mirrors the system argument of TaskLogger.LOG — TODO confirm.
	System  string `json:"system,omitempty"`
}

LogRecordDto is used in Task API (handlers.TaskHandler)

type LoggedTask

// LoggedTask bundles a task with its logger and the extra data that
// notification channels need; it is the value passed to
// NotificationService.Notify and to every NotificationChannel.
type LoggedTask struct {
	// Task is the embedded task. The Slack channel reads ID, Source, SourceType
	// and Collection from the LoggedTask (presumably promoted from here).
	*meta.Task
	// TaskLogger is the embedded task logger; its Collect method supplies the
	// log lines that the Slack channel attaches to a message.
	*TaskLogger
	// Notifications presumably holds the notification configuration that
	// applies to this task — TODO confirm its key/value schema.
	Notifications map[string]interface{}
	// ProjectName is the project name shown in notifications. The Slack channel
	// falls back to the project ID parsed from Source when it is empty.
	ProjectName   string
	// Status is the task status in string form; the Slack channel compares it
	// with SUCCESS.String() and FAILED.String().
	Status        string
}

type Map

// Map is a shorthand for a string-keyed map of arbitrary values. The Slack
// channel uses it to assemble the JSON body of the webhook request.
type Map map[string]interface{}

type NotificationChannel

// NotificationChannel is a function that delivers a notification about task
// through one particular channel (Slack is one such channel).
//
// ctx is the request context, nctx carries the service-wide data shown in
// messages, configValue is the raw, channel-specific configuration that the
// channel decodes itself, and task is the task being reported. A non-nil
// error means the notification could not be prepared or delivered.
type NotificationChannel func(ctx context.Context, nctx *NotificationContext, configValue interface{}, task LoggedTask) error
// Slack is the NotificationChannel that posts the outcome of a synchronization
// task to a Slack webhook.
//
// configValue is decoded into a struct with a single "url" key holding the
// webhook URL. When the URL is empty the channel is treated as not configured
// and nil is returned without sending anything.
//
// The message consists of a title line (service name, version, server name and
// task status) and one attachment colored by status: green for SUCCESS, red
// for FAILED, grey otherwise. The attachment lists the project (when
// task.Source has a "<projectID>." prefix), the connector type, the connector
// and the collection. For every status other than SUCCESS it also carries
// either a link to the task logs in the UI (when nctx.UIBaseURL is set) or the
// collected log lines themselves. Inlined logs are shortened from the front so
// that the section text stays within Slack's 3000-character limit; without
// that, a long log would make Slack reject the whole notification.
//
// It returns an error when the config cannot be decoded, when the request
// fails, or when Slack answers with a status other than 200 OK.
var Slack NotificationChannel = func(ctx context.Context, nctx *NotificationContext, configValue interface{}, task LoggedTask) error {
	const (
		// maxSectionText is the longest text Slack accepts in a section block.
		maxSectionText = 3000
		// logsTitle precedes log lines that are inlined into the message.
		logsTitle = "*Logs:*\n"
		// truncatedMark is put in front of logs that had to be shortened.
		truncatedMark = "...\n"
	)

	var config struct {
		URL string `mapstructure:"url"`
	}

	if err := mapstructure.Decode(configValue, &config); err != nil {
		return errors.Wrapf(err, "decode config: %+v", configValue)
	}

	if config.URL == "" {
		// No webhook URL configured for this channel: nothing to send.
		return nil
	}

	// A source of the form "<projectID>.<sourceID>" is split into its parts;
	// otherwise the project line is omitted and projectID stays empty.
	projectText := ""
	sourceID := task.Source
	projectID := ""
	if dot := strings.Index(sourceID, "."); dot >= 0 && dot < len(sourceID)-1 {
		projectID = sourceID[:dot]
		projectName := task.ProjectName
		if projectName == "" {
			projectName = projectID
		}

		projectText = fmt.Sprintf("*Project*: %s\n", projectName)
		sourceID = sourceID[dot+1:]
	}

	hasUI := nctx.UIBaseURL != ""

	// With a UI base URL the connector is rendered as a link to its edit page.
	source := sourceID
	if hasUI {
		source = fmt.Sprintf("<%s/prj-%s/sources/edit/%s|%s>", nctx.UIBaseURL, projectID, sourceID, sourceID)
	}

	details := fmt.Sprintf("%s*Connector type:* %s\n*Connector:* %s\n*Collection:* %s\n",
		projectText, task.SourceType, source, task.Collection)

	logs := ""
	switch {
	case task.Status == SUCCESS.String():
		// Successful tasks are reported without logs, so none are collected.
	case hasUI:
		logs = fmt.Sprintf("<%s/prj-%s/sources/logs/%s/%s|See logs>", nctx.UIBaseURL, projectID, sourceID, task.ID)
	default:
		joined := strings.Join(task.Collect(), "\n")
		// Counting runes (not bytes) guarantees that a message Slack would have
		// accepted before is never shortened.
		budget := maxSectionText - len([]rune(details)) - len([]rune(logsTitle))
		if runes := []rune(joined); len(runes) > budget {
			keep := budget - len([]rune(truncatedMark))
			if keep < 0 {
				keep = 0
			}
			// Keep the tail: the most recent records are the ones that
			// usually explain why the task did not succeed.
			joined = truncatedMark + string(runes[len(runes)-keep:])
		}
		logs = logsTitle + joined
	}

	color := grey
	switch task.Status {
	case SUCCESS.String():
		color = green
	case FAILED.String():
		color = red
	}

	return requests.URL(config.URL).
		Method(http.MethodPost).
		BodyJSON(Map{
			"text": fmt.Sprintf("*%s %s* [%s]: Synchronization %s", nctx.ServiceName, nctx.Version, nctx.ServerName, task.Status),
			"attachments": []Map{{
				"color": color,
				"blocks": []Map{
					{
						"type": "divider",
					},
					{
						"type": "section",
						"text": Map{
							"type": "mrkdwn",
							"text": details + logs,
						},
					},
				},
			}},
		}).
		CheckStatus(http.StatusOK).
		Fetch(ctx)
}

type NotificationContext

// NotificationContext holds the service-wide values that notification
// channels put into their messages.
type NotificationContext struct {
	// ServiceName, Version and ServerName appear in the title line of a Slack
	// message as "<ServiceName> <Version> [<ServerName>]".
	ServiceName string
	Version     string
	ServerName  string
	// UIBaseURL is the base URL used to build links to the source edit page
	// and the task logs page. When it is empty the Slack channel sends plain
	// text instead of links.
	UIBaseURL   string
}

type NotificationService

// NotificationService embeds a NotificationContext and exposes Notify, which
// takes the LoggedTask to report. Instances are created with
// NewNotificationService from a NotificationContext and a configuration map.
type NotificationService struct {
	*NotificationContext
	// contains filtered or unexported fields
}

func NewNotificationService

func NewNotificationService(nctx *NotificationContext, config map[string]interface{}) *NotificationService

func (*NotificationService) Notify

func (s *NotificationService) Notify(task LoggedTask)

type Priority

// Priority is the priority level of a synchronization task.
type Priority int64
// Priority levels. According to the GetValue documentation the level is
// combined with the task creation time as level * 10^12 - created_at_unix.
const (
	// UNKNOWN is the placeholder for a priority that is not one of the known levels.
	// NOTE(review): presumably what PriorityFromString returns alongside an error — confirm.
	UNKNOWN Priority = -1

	LOW  Priority = 100
	HIGH Priority = 200
	NOW  Priority = 300
)

func PriorityFromString

func PriorityFromString(value string) (Priority, error)

func (Priority) GetValue

func (p Priority) GetValue(t time.Time) int64

GetValue returns the Priority value adjusted by the task creation time (created_at): task_priority * 10^12 - created_at_unix.

func (Priority) String

func (p Priority) String() string

type ResultSaver

// ResultSaver consumes the results of a Singer tap or an Airbyte source (see
// Consume and CleanupAfterError). Instances are created with NewResultSaver.
type ResultSaver struct {
	// contains filtered or unexported fields
}

ResultSaver is a Singer/Airbyte result consumer. Its tap is either a Singer tap or an Airbyte source Docker image.

func NewResultSaver

func NewResultSaver(task *meta.Task, tap, collectionMetaKey, tableNamePrefix string, taskLogger *TaskLogger, destinations []storages.Storage, metaStorage meta.Storage, streamTableNames map[string]string, configPath string) *ResultSaver

NewResultSaver returns configured ResultSaver instance

func (*ResultSaver) CleanupAfterError

func (rs *ResultSaver) CleanupAfterError(representation *driversbase.CLIOutputRepresentation)

CleanupAfterError performs cleanup if necessary, such as deleting temporary tables after errors.

func (*ResultSaver) Consume

func (rs *ResultSaver) Consume(representation *driversbase.CLIOutputRepresentation) error

Consume consumes a result batch, writes it to the destinations and saves the State.

func (*ResultSaver) Tap

func (rs *ResultSaver) Tap() string

type Status

// Status is the state of a synchronization task, stored as its upper-case name.
type Status string
// Task statuses.
const (
	SCHEDULED Status = "SCHEDULED"
	RUNNING   Status = "RUNNING"
	// FAILED is the status that TaskCloser.CloseWithError is documented to
	// write when the task has not been canceled.
	FAILED    Status = "FAILED"
	SUCCESS   Status = "SUCCESS"
	// CANCELED is presumably the status that TaskService.CancelTask saves — TODO confirm.
	CANCELED  Status = "CANCELED"
)

func StatusFromString

func StatusFromString(value string) (Status, error)

func (Status) String

func (s Status) String() string

type TaskCloser

// TaskCloser embeds the task it is responsible for and offers the methods
// that finish it (CloseWithSuccess, CloseWithError) or detect its
// cancellation (HandleCanceling).
type TaskCloser struct {
	*meta.Task
	// contains filtered or unexported fields
}

TaskCloser is responsible for graceful task closing (writing the task status to Redis). It also checks whether the task has been canceled.

func (*TaskCloser) CloseWithError

func (tc *TaskCloser) CloseWithError(msg string, systemErr bool)

CloseWithError writes the closing error logs and, if the task isn't canceled, updates the task status to FAILED in Redis.

func (*TaskCloser) CloseWithSuccess

func (tc *TaskCloser) CloseWithSuccess() error

func (*TaskCloser) HandleCanceling

func (tc *TaskCloser) HandleCanceling() error

HandleCanceling checks whether the task is canceled; if so, it returns ErrTaskHasBeenCanceled, otherwise it returns nil.

func (*TaskCloser) TaskID

func (tc *TaskCloser) TaskID() string

TaskID returns task ID

type TaskDto

// TaskDto is the JSON representation of a synchronization task returned by
// TaskService.GetTask and TaskService.GetTasks. Every field is omitted from
// the JSON output when it holds its zero value.
type TaskDto struct {
	ID         string `json:"id,omitempty"`
	SourceType string `json:"source_type,omitempty"`
	Source     string `json:"source,omitempty"`
	Collection string `json:"collection,omitempty"`
	// Priority is a numeric priority.
	// NOTE(review): presumably the result of Priority.GetValue rather than the bare level — confirm.
	Priority   int64  `json:"priority,omitempty"`
	// CreatedAt, StartedAt and FinishedAt are timestamps carried as strings
	// (the exact layout is not visible here).
	CreatedAt  string `json:"created_at,omitempty"`
	StartedAt  string `json:"started_at,omitempty"`
	FinishedAt string `json:"finished_at,omitempty"`
	// Status is the task status in string form.
	Status     string `json:"status,omitempty"`
}

TaskDto is used in Task API (handlers.TaskHandler)

type TaskExecutor

// TaskExecutor embeds the TaskExecutorContext it was built from. It is
// created with NewTaskExecutor, which is documented to start two goroutines
// (monitoring and queue observer), and is shut down with Close.
type TaskExecutor struct {
	*TaskExecutorContext
	// contains filtered or unexported fields
}

func NewTaskExecutor

func NewTaskExecutor(poolSize int, ctx *TaskExecutorContext, sourcesLogWriter io.Writer) (*TaskExecutor, error)

NewTaskExecutor returns TaskExecutor and starts 2 goroutines (monitoring and queue observer)

func (*TaskExecutor) Close

func (te *TaskExecutor) Close() error

type TaskExecutorContext

// TaskExecutorContext groups the services and timing settings handed to
// NewTaskExecutor; the resulting TaskExecutor embeds it.
type TaskExecutorContext struct {
	SourceService       *sources.Service
	DestinationService  *destinations.Service
	MetaStorage         meta.Storage
	CoordinationService *coordination.Service
	NotificationService *NotificationService

	// NOTE(review): the three durations below presumably tune how stalled
	// tasks are detected and how often that check runs; their exact semantics
	// are not visible here — confirm against the executor implementation.
	StalledThreshold      time.Duration
	LastActivityThreshold time.Duration
	ObserverStalledEvery  time.Duration
}

type TaskLogger

// TaskLogger writes the log records of one task; it offers leveled helpers
// (INFO, WARN, ERROR, LOG), a Write method and Collect, which returns log
// lines as strings. Instances are created with NewTaskLogger.
type TaskLogger struct {
	// contains filtered or unexported fields
}

TaskLogger is a logger for writing logs to underlying Redis (meta.Storage)

func NewTaskLogger

func NewTaskLogger(taskID string, metaStorage meta.Storage, sourcesLogWriter io.Writer) *TaskLogger

NewTaskLogger returns configured TaskLogger instance

func (*TaskLogger) Collect

func (tl *TaskLogger) Collect() []string

func (*TaskLogger) ERROR

func (tl *TaskLogger) ERROR(format string, v ...interface{})

ERROR writes record into meta.storage with log level ERROR

func (*TaskLogger) INFO

func (tl *TaskLogger) INFO(format string, v ...interface{})

INFO writes record into meta.storage with log level INFO

func (*TaskLogger) LOG

func (tl *TaskLogger) LOG(format, system string, level logging.Level, v ...interface{})

func (*TaskLogger) WARN

func (tl *TaskLogger) WARN(format string, v ...interface{})

WARN writes record into meta.storage with log level WARN

func (*TaskLogger) Write

func (tl *TaskLogger) Write(p []byte) (n int, err error)

Write writes Singer bytes as a record into meta.Storage

type TaskService

// TaskService is the entry point for working with synchronization tasks:
// creating them (Sync, ScheduleSyncFunc), canceling them (CancelTask) and
// reading tasks and their logs (GetTask, GetTasks, GetTaskLogs). Instances are
// created with NewTaskService, or NewTestTaskService in tests.
type TaskService struct {
	// contains filtered or unexported fields
}

TaskService handles requests for getting tasks and task logs.

func NewTaskService

func NewTaskService(sourceService *sources.Service, destinationService *destinations.Service,
	metaStorage meta.Storage, coordinationService *coordination.Service, storeTasksLogsForLastRuns int) *TaskService

NewTaskService returns configured TaskService instance

func NewTestTaskService

func NewTestTaskService() *TaskService

NewTestTaskService returns TaskService test instance (only for tests)

func (*TaskService) CancelTask

func (ts *TaskService) CancelTask(taskID string) error

CancelTask saves the CANCELED status into the task in Redis.

func (*TaskService) GetTask

func (ts *TaskService) GetTask(id string) (*TaskDto, error)

GetTask returns the task with the given ID.

func (*TaskService) GetTaskLogs

func (ts *TaskService) GetTaskLogs(taskID string, start, end time.Time) ([]LogRecordDto, error)

GetTaskLogs returns the task logs that match the input filters.

func (*TaskService) GetTasks

func (ts *TaskService) GetTasks(sourceID, collectionID string, statusFilter *Status, start, end time.Time, limit int) ([]TaskDto, error)

GetTasks returns all tasks that match the input filters.

func (*TaskService) IsConfigured

func (ts *TaskService) IsConfigured() bool

func (*TaskService) ScheduleSyncFunc

func (ts *TaskService) ScheduleSyncFunc(source, collection string, retryCount int)

ScheduleSyncFunc is used by scheduling.CronScheduler to schedule the sync of a source and collection with retries. It is passed as a function to avoid a dependency cycle.

func (*TaskService) Sync

func (ts *TaskService) Sync(sourceID, collection string, priority Priority) (string, error)

Sync creates a task and returns its ID. It returns an error if the task has already been scheduled or is already in progress (lock in the coordination service).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL