Documentation ¶
Index ¶
- Constants
- func AddOutput(name string, factory OutputFactory)
- func CreateMessageAttributes(modelId mdl.ModelId, typ string, version int) map[string]string
- func FixtureSetFactory(transformerFactoryMap TransformerMapTypeVersionFactories) fixtures.FixtureSetsFactory
- func GetSubscriberConfigKey(name string) string
- func GetSubscriberFQN(name string, sourceModel SubscriberModel) string
- func GetSubscriberOutputConfigKey(name string) string
- func IsDelayOpError(err error) bool
- func NewGenericTransformer(transformer ModelTransformer) func(context.Context, cfg.Config, log.Logger) (ModelTransformer, error)
- func NewSubscriberCallbackFactory(core SubscriberCore) stream.ConsumerCallbackFactory
- func NewSubscriberFactory(transformerFactoryMap TransformerMapTypeVersionFactories) kernel.ModuleMultiFactory
- func PublisherConfigPostProcessor(config cfg.GosoConf) (bool, error)
- func SubscriberConfigPostProcessor(config cfg.GosoConf) (bool, error)
- func SubscriberFactory(ctx context.Context, config cfg.Config, logger log.Logger, ...) (map[string]kernel.ModuleFactory, error)
- type DelayOpError
- type FetchData
- type FixtureSet
- type FixtureSettings
- type FixtureSettingsDataset
- type Model
- type ModelDb
- type ModelSpecification
- type ModelTransformer
- type ModelTransformers
- type Output
- type OutputDb
- type OutputDdb
- type OutputFactory
- type OutputKvstore
- type Outputs
- type Publisher
- func NewPublisher(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Publisher, error)
- func NewPublisherWithInterfaces(logger log.Logger, producer stream.Producer, settings *PublisherSettings) Publisher
- func NewPublisherWithSettings(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Publisher, error)
- type PublisherSettings
- type Settings
- type SubscriberCallback
- type SubscriberCore
- type SubscriberInputConfigPostProcessor
- type SubscriberModel
- type SubscriberOutputConfigPostProcessor
- type SubscriberSettings
- type TransformerFactory
- type TransformerMapTypeVersionFactories
- type TransformerMapVersionFactories
- type TypesProvider
- type VersionedModelTransformers
Constants ¶
View Source
const (
AttributeModelId = "modelId"
AttributeType = "type"
AttributeVersion = "version"
ConfigKeyMdlSubPublishers = "mdlsub.publishers"
TypeCreate = "create"
TypeUpdate = "update"
TypeDelete = "delete"
)
View Source
const (
ConfigKeyMdlSub = "mdlsub"
ConfigKeyMdlSubSubscribers = "mdlsub.subscribers"
)
View Source
const (
MetricNameSuccess = "ModelEventConsumeSuccess"
MetricNameFailure = "ModelEventConsumeFailure"
)
View Source
const (
OutputTypeDb = "db"
)
View Source
const (
OutputTypeDdb = "ddb"
)
View Source
const (
OutputTypeKvstore = "kvstore"
)
Variables ¶
This section is empty.
Functions ¶
func AddOutput ¶ added in v0.26.0
func AddOutput(name string, factory OutputFactory)
func CreateMessageAttributes ¶
func CreateMessageAttributes(modelId mdl.ModelId, typ string, version int) map[string]string
func FixtureSetFactory ¶ added in v0.26.0
func FixtureSetFactory(transformerFactoryMap TransformerMapTypeVersionFactories) fixtures.FixtureSetsFactory
func GetSubscriberConfigKey ¶
func GetSubscriberConfigKey(name string) string
func GetSubscriberFQN ¶
func GetSubscriberFQN(name string, sourceModel SubscriberModel) string
func IsDelayOpError ¶
func IsDelayOpError(err error) bool
func NewGenericTransformer ¶
func NewGenericTransformer(transformer ModelTransformer) func(context.Context, cfg.Config, log.Logger) (ModelTransformer, error)
func NewSubscriberCallbackFactory ¶
func NewSubscriberCallbackFactory(core SubscriberCore) stream.ConsumerCallbackFactory
func NewSubscriberFactory ¶
func NewSubscriberFactory(transformerFactoryMap TransformerMapTypeVersionFactories) kernel.ModuleMultiFactory
func SubscriberFactory ¶
Types ¶
type DelayOpError ¶
type DelayOpError struct {
Err error
}
func NewDelayOpError ¶
func NewDelayOpError(err error) DelayOpError
func (DelayOpError) As ¶
func (e DelayOpError) As(target interface{}) bool
func (DelayOpError) Error ¶
func (e DelayOpError) Error() string
func (DelayOpError) Unwrap ¶
func (e DelayOpError) Unwrap() error
type FetchData ¶ added in v0.26.0
type FetchData struct {
Data json.RawMessage `json:"data"`
}
type FixtureSet ¶ added in v0.26.0
type FixtureSet struct {
// contains filtered or unexported fields
}
func NewFixtureSet ¶ added in v0.26.0
func NewFixtureSet(logger log.Logger, source SubscriberModel, core SubscriberCore, settings *FixtureSettings) *FixtureSet
func NewFixtureSetWithInterfaces ¶ added in v0.26.0
func NewFixtureSetWithInterfaces(logger log.Logger, source SubscriberModel, core SubscriberCore, settings *FixtureSettings, httpClient *resty.Client) *FixtureSet
type FixtureSettings ¶ added in v0.26.0
type FixtureSettings struct {
Dataset FixtureSettingsDataset `cfg:"dataset"`
Host string `cfg:"host"`
Path string `cfg:"path"`
}
type FixtureSettingsDataset ¶ added in v0.37.0
type FixtureSettingsDataset struct {
Id int `cfg:"id"`
}
type ModelSpecification ¶
func (ModelSpecification) String ¶ added in v0.26.0
func (m ModelSpecification) String() string
type ModelTransformer ¶
type ModelTransformers ¶
type ModelTransformers map[string]VersionedModelTransformers
type OutputDb ¶
type OutputDb struct {
// contains filtered or unexported fields
}
func NewOutputDb ¶
type OutputDdb ¶
type OutputDdb struct {
// contains filtered or unexported fields
}
func NewOutputDdb ¶
type OutputFactory ¶
type OutputKvstore ¶
type OutputKvstore struct {
// contains filtered or unexported fields
}
func NewOutputKvstore ¶
func NewOutputKvstore(ctx context.Context, config cfg.Config, logger log.Logger, settings *SubscriberSettings) (*OutputKvstore, error)
type Publisher ¶
type Publisher interface {
PublishBatch(ctx context.Context, typ string, version int, values []interface{}, customAttributes ...map[string]string) error
Publish(ctx context.Context, typ string, version int, value interface{}, customAttributes ...map[string]string) error
}
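func NewPublisher ¶
func NewPublisher(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Publisher, error)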
type PublisherSettings ¶
type Settings ¶
type Settings struct {
Subscribers map[string]*SubscriberSettings `cfg:"subscribers"`
}
type SubscriberCallback ¶
type SubscriberCallback struct {
// contains filtered or unexported fields
}
type SubscriberCore ¶ added in v0.26.0
type SubscriberCore interface {
GetModelIds() []string
GetLatestModelIdVersion(modelId mdl.ModelId) (int, error)
GetTransformer(spec *ModelSpecification) (ModelTransformer, error)
GetOutput(spec *ModelSpecification) (Output, error)
Persist(ctx context.Context, spec *ModelSpecification, model Model) error
Transform(ctx context.Context, spec *ModelSpecification, input any) (Model, error)
}
func NewSubscriberCore ¶ added in v0.26.0
func NewSubscriberCore(ctx context.Context, config cfg.Config, logger log.Logger, subscriberSettings map[string]*SubscriberSettings, transformerFactories TransformerMapTypeVersionFactories) (SubscriberCore, error)
func NewSubscriberCoreWithInterfaces ¶ added in v0.26.0
func NewSubscriberCoreWithInterfaces(transformers ModelTransformers, outputs Outputs) SubscriberCore
type SubscriberModel ¶
func UnmarshalSubscriberSourceModel ¶
func UnmarshalSubscriberSourceModel(config cfg.Config, name string) SubscriberModel
type SubscriberSettings ¶
type SubscriberSettings struct {
Input string `cfg:"input" default:"sns"`
Output string `cfg:"output"`
RunnerCount int `cfg:"runner_count" default:"10" validate:"min=1"`
SourceModel SubscriberModel `cfg:"source"`
TargetModel SubscriberModel `cfg:"target"`
}
type TransformerFactory ¶
type TransformerMapTypeVersionFactories ¶
type TransformerMapTypeVersionFactories map[string]TransformerMapVersionFactories
type TransformerMapVersionFactories ¶
type TransformerMapVersionFactories map[int]TransformerFactory
type TypesProvider ¶ added in v0.37.1
func (TypesProvider[I, M]) GetInput ¶ added in v0.37.1
func (t TypesProvider[I, M]) GetInput() any
func (TypesProvider[I, M]) GetModel ¶ added in v0.37.1
func (t TypesProvider[I, M]) GetModel() any
type VersionedModelTransformers ¶
type VersionedModelTransformers map[int]ModelTransformer
Source Files ¶
Click to show internal directories.
Click to hide internal directories.