transform

package
v0.2.4
Warning

This package is not in the latest version of its module.

Published: Apr 23, 2022 License: MIT Imports: 24 Imported by: 0

Documentation

Constants

const (
	TransformOnce       = "once"
	TransformRepeating  = "repeating"
	StepGroupSequential = "sequential"
	StepGroupRepeating  = "repeating"
	StepGroupBackground = "background"
)

Variables

This section is empty.

Functions

func CleanupHandlerDefault

func CleanupHandlerDefault(log logger.Logger, t TransformManager, s StatsManager, cancelFunc context.CancelFunc)

CleanupHandlerDefault handles CTRL-C and SIGTERM. It sends shutdown requests to any steps that have registered themselves as capable of handling shutdown.

func GetPanicHandlerWithChannelsFunc

func GetPanicHandlerWithChannelsFunc(tc *TransformCloser) components.PanicHandlerFunc

GetPanicHandlerWithChannelsFunc will create a func that can be deferred to handle recovery and send the final TransformStatus{} error info to channel chanStatus.
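The deferred-recovery pattern described above can be sketched as follows. This is a minimal, self-contained illustration and not the package's implementation: the `status` type, `newPanicHandler` and `runStep` are hypothetical stand-ins for TransformStatus, the returned components.PanicHandlerFunc and a step worker.

```go
package main

import "fmt"

// status is a hypothetical stand-in for the package's TransformStatus.
type status struct {
	Error string
}

// newPanicHandler returns a func that is meant to be deferred.
// If the surrounding function panics, it recovers and sends the
// panic value as error text on chanStatus.
func newPanicHandler(chanStatus chan<- status) func() {
	return func() {
		if r := recover(); r != nil {
			chanStatus <- status{Error: fmt.Sprint(r)}
		}
	}
}

// runStep runs work with the handler deferred and reports whether
// a final error status was sent.
func runStep(work func()) (status, bool) {
	chanStatus := make(chan status, 1) // buffered so the handler never blocks
	func() {
		defer newPanicHandler(chanStatus)()
		work()
	}()
	select {
	case s := <-chanStatus:
		return s, true
	default:
		return status{}, false
	}
}

func main() {
	s, ok := runStep(func() { panic("step failed") })
	fmt.Println(ok, s.Error)
}
```

Note that recover only works when called directly by the deferred function, which is why the handler is returned as a closure and deferred with `defer newPanicHandler(chanStatus)()`.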

func LaunchTransform

func LaunchTransform(log logger.Logger,
	transformDefn *TransformDefinition,
	transformGuid string,
	stepGroupLaunchFn stepGroupLaunchFunc,
	stats StatsManager,
	cleanupHandlerFn CleanupHandlerFunc,
	panicHandlerFn components.PanicHandlerFunc,
)

LaunchTransform starts all transform groups, and the steps they contain, found in the TransformDefinition transformDefn.

func LaunchTransformDefinition

func LaunchTransformDefinition(log logger.Logger, ti *SafeMapTransformInfo, t *TransformDefinition, blockUntilComplete bool, statsDumpFrequencySeconds int) (guid string, err error)

LaunchTransformDefinition validates the supplied TransformDefinition and launches the transform. It stores the GUID of the new transform in ti and returns it. An error is returned if validation fails. If blockUntilComplete is false then the transform is launched in a goroutine.

func LaunchTransformJson

func LaunchTransformJson(log logger.Logger, ti *SafeMapTransformInfo, transformJson string, blockUntilComplete bool, statsDumpFrequencySeconds int,
) (guid string, err error)

func LaunchTransformWithControlChannels

func LaunchTransformWithControlChannels(log logger.Logger,
	transformDefn *TransformDefinition,
	transformGuid string,
	s StatsManager,
	tc *TransformCloser,
	cleanupHandlerFn CleanupHandlerFunc,
	panicHandlerFn components.PanicHandlerFunc,
	launcherFn LaunchTransformFunc,
)

LaunchTransformWithControlChannels launches a transform that can be stopped by sending to chanStop. After the transform is complete it responds on chanResponse with a success/failure status message.

func NewMetadataInjection

func NewMetadataInjection(i interface{}) (outputChan chan stream.Record, controlChan chan components.ControlAction)

NewMetadataInjection will launch transform group T after replacing variables with values found in cfg.ReplacementVariableWithFieldNameCSV. It does this by marshalling T to JSON, doing the string replacement and then un-marshalling again. For example, where:

ReplacementVariableWithFieldNameCSV = fromDateFieldName:${fromDate}

We do this:

  1. pull a record off InputChan (call it 'rec').
  2. convert TransformDefinition to JSON.
  3. search for '${fromDate}' and replace it with the value found in rec["fromDateFieldName"].
  4. execute the transform group and wait for completion.
  5. emit the replaced JSON string on output channel, outputChan.

OutputChan rows are the replaced JSON only. Where date-time values are used for replacements, we insert the time.RFC3339 equivalent string with time zone preserved.
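Steps 2 and 3 above (marshal to JSON, then string replacement) can be sketched as below. This is an illustration under stated assumptions, not the package's code: `parsePairs`, `inject` and `demo` are hypothetical helpers, the record is modelled as a plain map, and the definition is a simple map rather than a real TransformDefinition.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"
)

// parsePairs splits "field1:${var1},field2:${var2}" into a map of
// variable -> field name.
func parsePairs(csv string) (map[string]string, error) {
	pairs := make(map[string]string)
	for _, tok := range strings.Split(csv, ",") {
		parts := strings.SplitN(strings.TrimSpace(tok), ":", 2)
		if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
			return nil, fmt.Errorf("bad token %q, want <field>:<variable>", tok)
		}
		pairs[parts[1]] = parts[0]
	}
	return pairs, nil
}

// inject marshals defn to JSON and replaces each variable with the
// value of the matching field in rec. time.Time values are rendered
// as time.RFC3339, which keeps the zone offset.
func inject(defn interface{}, rec map[string]interface{}, csv string) (string, error) {
	pairs, err := parsePairs(csv)
	if err != nil {
		return "", err
	}
	b, err := json.Marshal(defn)
	if err != nil {
		return "", err
	}
	s := string(b)
	for variable, field := range pairs {
		val, ok := rec[field]
		if !ok {
			return "", fmt.Errorf("field %q not found in record", field)
		}
		repl := fmt.Sprint(val)
		if t, isTime := val.(time.Time); isTime {
			repl = t.Format(time.RFC3339)
		}
		s = strings.ReplaceAll(s, variable, repl)
	}
	return s, nil
}

// demo builds a one-field definition and a record, then injects.
func demo() (string, error) {
	defn := map[string]string{"fromDate": "${fromDate}"}
	rec := map[string]interface{}{
		"fromDateFieldName": time.Date(2022, 4, 23, 10, 0, 0, 0, time.FixedZone("", 3600)),
	}
	return inject(defn, rec, "fromDateFieldName:${fromDate}")
}

func main() {
	s, err := demo()
	fmt.Println(s, err)
}
```

Because the replacement is done on the JSON text, a value containing a double quote or backslash would produce invalid JSON in this sketch; a real implementation would need to escape such values or reject them.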

func NewStepGroupManager

func NewStepGroupManager(log logger.Logger, g TransformManager, transformGroupName string) *stepGroup

NewStepGroupManager constructs a new stepGroup{}, which satisfies interface StepGroupManager{}. TODO: return the concrete type instead!

func StartStepGroup

func StartStepGroup(
	log logger.Logger,
	sg *StepGroup,
	sgm StepGroupManager,
	stats StatsManager,
	funcs MapComponentFuncs,
	panicHandlerFn components.PanicHandlerFunc)

StartStepGroup launches all steps defined in StepGroup sg. It dynamically launches a worker function for each step; the worker function is found by using the step type to look up registered function metadata. TODO: figure out whether DB connections are thread safe or whether each component needs to open its own connection. TODO: this will change how connections are passed into StartStepGroup.
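The lookup of a worker function by step type can be sketched as below. The names here (`step`, `workerFunc`, `startSteps`, and the step types "read" and "write") are hypothetical; the real registry is a MapComponentFuncs of ComponentRegistration values whose fields are unexported, and real workers would be launched concurrently rather than called in turn.

```go
package main

import "fmt"

// step is a cut-down, hypothetical version of the package's Step.
type step struct {
	Type string
}

// workerFunc is a hypothetical worker signature used for illustration.
type workerFunc func(stepName string) string

// startSteps walks the sequence, finds each step's worker by its
// type in funcs, and runs it. It fails fast on an undefined step or
// an unregistered step type.
func startSteps(steps map[string]step, sequence []string, funcs map[string]workerFunc) ([]string, error) {
	var results []string
	for _, name := range sequence {
		s, ok := steps[name]
		if !ok {
			return nil, fmt.Errorf("step %q is in the sequence but not defined", name)
		}
		fn, ok := funcs[s.Type]
		if !ok {
			return nil, fmt.Errorf("no component registered for step type %q", s.Type)
		}
		results = append(results, fn(name))
	}
	return results, nil
}

func main() {
	funcs := map[string]workerFunc{
		"read":  func(n string) string { return "reading in " + n },
		"write": func(n string) string { return "writing in " + n },
	}
	steps := map[string]step{"a": {Type: "read"}, "b": {Type: "write"}}
	out, err := startSteps(steps, []string{"a", "b"}, funcs)
	fmt.Println(out, err)
}
```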

Types

type CleanupHandlerFunc

type CleanupHandlerFunc = func(log logger.Logger, g TransformManager, s StatsManager, cancelFunc context.CancelFunc)

func GetCleanupHandlerWithChannelsFunc

func GetCleanupHandlerWithChannelsFunc(log logger.Logger, transformGuid string, tc *TransformCloser) CleanupHandlerFunc

GetCleanupHandlerWithChannelsFunc returns a function that waits for CTRL-C (or a similar termination signal) and/or a stop signal on chanShutdown.
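Waiting on either an OS signal or a shutdown channel is typically done with a select. The sketch below shows that pattern using only the standard library; `waitForStop` and `demo` are hypothetical names, and the real handler also receives a logger, TransformManager and StatsManager that are omitted here.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// waitForStop blocks until an OS signal arrives on sigs or a value
// arrives on chanShutdown (or chanShutdown is closed). It then calls
// cancel and returns the reason it woke up.
func waitForStop(sigs <-chan os.Signal, chanShutdown <-chan error, cancel context.CancelFunc) string {
	defer cancel()
	select {
	case s := <-sigs:
		return "signal: " + s.String()
	case <-chanShutdown:
		return "shutdown requested"
	}
}

// demo simulates a stop request on chanShutdown instead of CTRL-C and
// reports the reason plus whether the context was cancelled.
func demo() (string, bool) {
	sigs := make(chan os.Signal, 1) // signal.Notify requires a buffered channel
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(sigs)

	ctx, cancel := context.WithCancel(context.Background())
	chanShutdown := make(chan error, 1)
	chanShutdown <- nil

	reason := waitForStop(sigs, chanShutdown, cancel)
	return reason, ctx.Err() != nil
}

func main() {
	reason, cancelled := demo()
	fmt.Println(reason, cancelled)
}
```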

type ComponentRegistration

type ComponentRegistration struct {
	// contains filtered or unexported fields
}

type ComponentRegistrationType1

type ComponentRegistrationType1 struct {
	// contains filtered or unexported fields
}

type ComponentRegistrationType2

type ComponentRegistrationType2 struct {
	// contains filtered or unexported fields
}

type ComponentRegistrationType3

type ComponentRegistrationType3 struct {
	// contains filtered or unexported fields
}

type LaunchTransformFunc

type LaunchTransformFunc = func(log logger.Logger,
	transformDefn *TransformDefinition,
	transformGuid string,
	stepGroupLaunchFn stepGroupLaunchFunc,
	stats StatsManager,
	cleanupHandlerFn CleanupHandlerFunc,
	panicHandlerFn components.PanicHandlerFunc,
)

type MapComponentFuncs

type MapComponentFuncs map[string]ComponentRegistration

type MetadataInjectionConfig

type MetadataInjectionConfig struct {
	Log                                 logger.Logger
	Name                                string
	InputChan                           chan stream.Record // input channel containing values to use with variable replacement.
	GlobalTransformManager              TransformManager   // manager able to spawn a new child StepGroup manager of type StepGroupManager.
	TransformGroupName                  string             // transform group to launch.
	ReplacementVariableWithFieldNameCSV string             // CSV string of tokens: "<field name on InputChan>:<variable to replace in T when converted to JSON>".
	ReplacementDateTimeFormat           string             // time.Format layout used to convert time.Time values found on InputChan to strings.
	OutputChanFieldName4JSON            string
	StepWatcher                         *stats.StepWatcher
	WaitCounter                         components.ComponentWaiter
	PanicHandlerFn                      components.PanicHandlerFunc
}

type MockStepGroupManager

type MockStepGroupManager struct {
	// contains filtered or unexported fields
}

type MockTransformManager

type MockTransformManager struct {
	// contains filtered or unexported fields
}

type RepeatMetadata

type RepeatMetadata struct {
	SleepSeconds int `json:"sleepSeconds"`
}

type SafeMapTransformInfo

type SafeMapTransformInfo struct {
	sync.RWMutex
	Internal map[string]TransformInfo
}

SafeMapTransformInfo wraps a map[string]TransformInfo with locking, via the Load(), Store() and Delete() methods.

func NewSafeMapTransformInfo

func NewSafeMapTransformInfo() *SafeMapTransformInfo

func (*SafeMapTransformInfo) ConsumeTransformStatusChanges

func (t *SafeMapTransformInfo) ConsumeTransformStatusChanges(transformGuid string, chanStatus chan TransformStatus)

ConsumeTransformStatusChanges loops until chanStatus is closed and updates t.Internal[transformGuid] with any statuses received.

func (*SafeMapTransformInfo) Delete

func (t *SafeMapTransformInfo) Delete(key string)

func (*SafeMapTransformInfo) Load

func (t *SafeMapTransformInfo) Load(key string) (ti TransformInfo, ok bool)

func (*SafeMapTransformInfo) Store

func (t *SafeMapTransformInfo) Store(key string, value TransformInfo)
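A map guarded by an embedded sync.RWMutex, with Load, Store and Delete methods like those listed above, is commonly written along these lines. This is a sketch with hypothetical local types (`info`, `safeMap`), not the package's source; in particular, which lock each real method takes is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// info is a hypothetical stand-in for TransformInfo.
type info struct {
	Status string
}

// safeMap mirrors the shape of SafeMapTransformInfo: an embedded
// RWMutex plus the map it protects.
type safeMap struct {
	sync.RWMutex
	Internal map[string]info
}

func newSafeMap() *safeMap {
	return &safeMap{Internal: make(map[string]info)}
}

// Load takes the read lock so concurrent readers do not block each other.
func (m *safeMap) Load(key string) (info, bool) {
	m.RLock()
	defer m.RUnlock()
	v, ok := m.Internal[key]
	return v, ok
}

// Store takes the write lock.
func (m *safeMap) Store(key string, value info) {
	m.Lock()
	defer m.Unlock()
	m.Internal[key] = value
}

// Delete takes the write lock.
func (m *safeMap) Delete(key string) {
	m.Lock()
	defer m.Unlock()
	delete(m.Internal, key)
}

// demo stores n entries from n goroutines and returns the map size.
func demo(n int) int {
	m := newSafeMap()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.Store(fmt.Sprintf("guid-%d", i), info{Status: "running"})
		}(i)
	}
	wg.Wait()
	m.RLock()
	defer m.RUnlock()
	return len(m.Internal)
}

func main() {
	fmt.Println(demo(10))
}
```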

type StatsManager

type StatsManager interface {
	StartDumping()
	StopDumping()
	AddStepWatcher(stepName string) *stats.StepWatcher // TODO: remove dependency on struct in stats package.
}

StatsManager abstracts stats capture for transform group steps. TODO: define an interface for the StepWatcher type, since depending on the concrete struct from the stats package is fragile.

type Status

type Status uint32

const (
	StatusMissing         = 0
	StatusStarting Status = iota + 1
	StatusRunning
	StatusComplete
	StatusCompleteWithError
	StatusShutdown
)
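The numeric values in this block are easy to misread, because iota counts every line in the const block, including the StatusMissing line. The sketch below redeclares the same block locally (assuming the declaration shown above is exactly what the package compiles) and prints the resulting values.

```go
package main

import "fmt"

type Status uint32

const (
	StatusMissing         = 0        // untyped constant; iota is 0 on this line
	StatusStarting Status = iota + 1 // iota is 1 on this line, so the value is 2
	StatusRunning                    // 3
	StatusComplete                   // 4
	StatusCompleteWithError          // 5
	StatusShutdown                   // 6
)

func main() {
	fmt.Println(StatusMissing, StatusStarting, StatusRunning,
		StatusComplete, StatusCompleteWithError, StatusShutdown)
	// prints: 0 2 3 4 5 6
}
```

So the value 1 is never assigned, and StatusMissing is an untyped constant rather than a Status, although it converts to Status implicitly wherever one is expected.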

func (Status) MarshalJSON

func (s Status) MarshalJSON() ([]byte, error)

type Step

type Step struct {
	Type           string                     `json:"type" errorTxt:"step type" mandatory:"yes"`
	Data           map[string]string          `json:"data" errorTxt:"step data" mandatory:"yes"`
	ComponentSteps []components.ComponentStep `json:"steps" errorTxt:"extra steps" mandatory:"no"`
}

type StepGroup

type StepGroup struct {
	Type       string          `json:"type" errorTxt:"step group type (sequential|repeating|background)" mandatory:"yes"` // const StepGroupSequential, StepGroupRepeating, StepGroupBackground
	RepeatMeta RepeatMetadata  `json:"repeatMetadata"`                                                                    // sleep interval between repeats
	Steps      map[string]Step `json:"steps" errorTxt:"step group steps" mandatory:"yes"`
	Sequence   []string        `json:"sequence" errorTxt:"step group sequence" mandatory:"yes"`
}

type StepGroupManager

type StepGroupManager interface {
	// contains filtered or unexported methods
}

StepGroupManager is used to track individual transform step groups.

type StepStatus

type StepStatus uint32

TODO: combine StepStatus and Status (at the transform level) if possible.

const (
	StepStatusStarting StepStatus = iota + 1
	StepStatusRunning
	StepStatusDone
)

type Transform

type Transform struct {
	// contains filtered or unexported fields
}

Transform manages consumers of the channels created by transform nodes.

func NewTransformManager

func NewTransformManager(log logger.Logger, t *TransformDefinition, transformGuid string) (gt *Transform)

NewTransformManager sets up a new top-level transform manager - consider this for global use.

type TransformCloser

type TransformCloser struct {
	// contains filtered or unexported fields
}

TransformCloser tracks the channels used to maintain transform status and whether it is shutdown or not.

func NewTransformCloser

func NewTransformCloser(chanStatus chan TransformStatus, chanShutdown chan error) *TransformCloser

func (*TransformCloser) ChannelsAreOpen

func (c *TransformCloser) ChannelsAreOpen() bool

ChannelsAreOpen inspects flagClosedChanStatusAndShutdown (0 = open; 1 = closed) and returns true if 0.

func (*TransformCloser) CloseChannels

func (c *TransformCloser) CloseChannels(statusToSend *TransformStatus)

CloseChannels closes chanStatus and chanShutdown inside a mutex. flagClosedChanStatusAndShutdown is set to 1 when the channels are closed.
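Closing channels exactly once behind a mutex, with a flag that other goroutines can check, might look like the sketch below. The `closer` type and its fields are hypothetical; the real TransformCloser has unexported fields, and whether it uses sync/atomic for the flag is an assumption here.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// closer is a hypothetical stand-in for TransformCloser.
type closer struct {
	mu           sync.Mutex
	closed       uint32 // 0 = open; 1 = closed
	chanStatus   chan string
	chanShutdown chan error
}

// channelsAreOpen reports whether closeChannels has not yet run.
func (c *closer) channelsAreOpen() bool {
	return atomic.LoadUint32(&c.closed) == 0
}

// closeChannels optionally sends a final status, then closes both
// channels. The mutex and flag make repeated calls safe: closing an
// already closed channel would otherwise panic.
func (c *closer) closeChannels(final *string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if atomic.LoadUint32(&c.closed) == 1 {
		return
	}
	if final != nil {
		c.chanStatus <- *final // assumes a buffered channel or an active receiver
	}
	close(c.chanStatus)
	close(c.chanShutdown)
	atomic.StoreUint32(&c.closed, 1)
}

func newCloser() *closer {
	return &closer{
		chanStatus:   make(chan string, 1),
		chanShutdown: make(chan error),
	}
}

func main() {
	c := newCloser()
	msg := "complete"
	c.closeChannels(&msg)
	c.closeChannels(&msg) // second call is a no-op
	fmt.Println(c.channelsAreOpen(), <-c.chanStatus)
}
```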

type TransformDefinition

type TransformDefinition struct {
	SchemaVersion int                  `json:"schemaVersion" errorTxt:"schema version" mandatory:"no"`
	Description   string               `json:"description" errorTxt:"description" mandatory:"no"`
	Connections   shared.DBConnections `json:"connections" errorTxt:"database connection" mandatory:"yes"`
	Type          string               `json:"type" errorTxt:"transform type (once|repeating)" mandatory:"yes"` // const TransformOnce, TransformRepeating
	RepeatMeta    RepeatMetadata       `json:"repeatMetadata"`                                                  // sleep interval between repeats
	StepGroups    map[string]StepGroup `json:"transformGroups" errorTxt:"step groups" mandatory:"yes"`
	Sequence      []string             `json:"sequence" errorTxt:"sequence" mandatory:"yes"`
}

TransformDefinition contains groupings of transform steps.
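To show how the JSON tags above map onto a document, the sketch below unmarshals a small definition into local mirror types. It is illustrative only: the mirror types omit Connections and ComponentSteps (their types live in other packages), the step type "exampleStepType" and the data key "exampleKey" are made up, and the `mandatory` tags are not enforced by encoding/json, so the package's own validation is not reproduced here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the documented structs, keeping the same JSON keys.
type repeatMetadata struct {
	SleepSeconds int `json:"sleepSeconds"`
}

type step struct {
	Type string            `json:"type"`
	Data map[string]string `json:"data"`
}

type stepGroup struct {
	Type       string          `json:"type"`
	RepeatMeta repeatMetadata  `json:"repeatMetadata"`
	Steps      map[string]step `json:"steps"`
	Sequence   []string        `json:"sequence"`
}

type transformDefinition struct {
	SchemaVersion int                  `json:"schemaVersion"`
	Description   string               `json:"description"`
	Type          string               `json:"type"`
	RepeatMeta    repeatMetadata       `json:"repeatMetadata"`
	StepGroups    map[string]stepGroup `json:"transformGroups"`
	Sequence      []string             `json:"sequence"`
}

// sampleJSON is a made-up document using the keys from the struct tags.
const sampleJSON = `{
  "schemaVersion": 1,
  "description": "example",
  "type": "once",
  "transformGroups": {
    "group1": {
      "type": "sequential",
      "steps": {
        "read": {"type": "exampleStepType", "data": {"exampleKey": "exampleValue"}}
      },
      "sequence": ["read"]
    }
  },
  "sequence": ["group1"]
}`

// parseDefinition decodes a JSON document into the mirror type.
func parseDefinition(s string) (*transformDefinition, error) {
	var d transformDefinition
	if err := json.Unmarshal([]byte(s), &d); err != nil {
		return nil, err
	}
	return &d, nil
}

func main() {
	d, err := parseDefinition(sampleJSON)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(d.Type, d.Sequence, d.StepGroups["group1"].Sequence)
}
```

Note that the Go field StepGroups is serialized under the key "transformGroups", which is easy to miss when writing a definition by hand.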

type TransformInfo

type TransformInfo struct {
	Transform TransformDefinition // TODO: implement transform "name" in TransformInfo{} and TransformDefinition{}
	ChanStop  chan error
	Status    TransformStatus `json:"transformStatus"`
	Stats     stats.StatsFetcher
}

type TransformManager

type TransformManager interface {
	// contains filtered or unexported methods
}

TransformManager can spawn child managers of type StepGroupManager, which are used to track individual transform step groups.

type TransformStatus

type TransformStatus struct {
	StartTime time.Time `json:"startTime"`
	EndTime   time.Time `json:"endTime"`
	Status    Status    `json:"pipeStatus"`
	Error     string    `json:"error"`
}

func (*TransformStatus) TransformIsFinished

func (t *TransformStatus) TransformIsFinished() bool
