scramjet

package module
v0.0.17

Warning: this package is not in the latest version of its module.
Published: Feb 20, 2023 License: MIT Imports: 15 Imported by: 0

README

Vivo Scramjet

module github.com/OIT-ADS-Web/scramjet

A persistent cache of arbitrary JSON objects, each associated with an id, that can be validated against a service and, when valid, examined for changes.

This makes it possible to gather entities for ingest into a store of some sort, and then send only the adds, updates, or deletes to that store.

** NOTE ** This is in very early development and likely to change substantially. For the time being it is here for instructional purposes only; I would not recommend using it in any project.

General Idea (how it works)

There are two tables: staging and resources.

  • staging:

    columns:

    • id (text - example "per000001")
    • type (text - example "person")

    Together, these form the primary key.

    • json (JSON representation of the object)
    • is_valid (whether it has been validated)
    • to_delete (whether it is passing through as a record to delete)

    actions to do:

    • stash -> put records in
    • validate -> mark is_valid = (true|false)
    • delete -> stash with to_delete = true
  • resources:

    columns (from staging):

    • id (text - example "per000001")
    • type (text - example "person")

    Together, these form the primary key.

    • hash (hash of json data)

    This allows a quick check of whether a record has changed after importing (and so whether to change updated_at).

    • data (json representation, from staging)
    • data_b (json representation, from staging - but binary for indexing)

    See the PostgreSQL documentation on the jsonb data type.

    • created_at (when record created)

    • updated_at (when record last updated)

      NOTE: CONSTRAINT uniq_id_hash UNIQUE (id, type, hash)

    actions:

    • traject -> move records with is_valid = true over from staging (these may be updates)
    • list -> all records, or only actual updates, etc.
    • delete -> remove records flagged to_delete in staging

Once a record has made it to resources, it is removed from staging.
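The hash column is what makes it cheap to tell an add from an update from an unchanged record. The following is an in-memory sketch of that decision, not scramjet's code: the `key` type, `hashJSON`, and `classify` are hypothetical, and SHA-256 over the raw JSON bytes is an assumption (the README does not say which hash is used).

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// key is a hypothetical stand-in for the (id, type) primary key.
type key struct{ ID, Type string }

// hashJSON hashes the raw JSON bytes. Assumption: SHA-256; the library
// may use a different hash or normalize the JSON first.
func hashJSON(data []byte) string {
	sum := sha256.Sum256(data)
	return hex.EncodeToString(sum[:])
}

// classify compares an incoming JSON document with the stored hash for
// the same (id, type) and reports what an import would do with it.
func classify(stored map[key]string, k key, data []byte) string {
	h := hashJSON(data)
	old, ok := stored[k]
	switch {
	case !ok:
		return "add" // new (id, type): insert with created_at and updated_at
	case old != h:
		return "update" // content changed: store new data, bump updated_at
	default:
		return "unchanged" // same hash: leave updated_at alone
	}
}

func main() {
	stored := map[key]string{
		{"per0000001", "person"}: hashJSON([]byte(`{"id":"per0000001","name":"Test1"}`)),
	}
	fmt.Println(classify(stored, key{"per0000001", "person"}, []byte(`{"id":"per0000001","name":"Test1"}`)))   // unchanged
	fmt.Println(classify(stored, key{"per0000001", "person"}, []byte(`{"id":"per0000001","name":"Changed"}`))) // update
	fmt.Println(classify(stored, key{"per0000002", "person"}, []byte(`{"id":"per0000002","name":"Test2"}`)))   // add
}
```

Because the hash covers exact bytes, documents that differ only in key order or whitespace would count as changed. Go's `encoding/json` marshals struct fields in declaration order and sorts map keys, so re-serializing the same struct yields the same bytes.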

Simplest example

This would be typical usage: bulk importing a type of record, for example from a nightly CSV feed or nightly table dump that contains the full set of records each time.


import (
	sj "github.com/OIT-ADS-Web/scramjet"
)

...

	typeName := "person"

	// 1. typically you would start with a list from a database
	dbList := func() []IntakePerson {
		person1 := IntakePerson{Id: "per0000001", Name: "Test1"}
		person2 := IntakePerson{Id: "per0000002", Name: "Test2"}
		return []IntakePerson{person1, person2}
	}

	// 2. then turn it into a list of 'Storeable' objects
	//    (the int parameter is the chunk offset; this small
	//    example ignores it and returns every record)
	listMaker := func(offset int) ([]sj.Storeable, error) {
		var people []sj.Storeable
		for _, person := range dbList() {
			pass := sj.MakePacket(person.Id, typeName, person)
			people = append(people, pass)
		}
		return people, nil
	}

	// 3. create a validator function that validates the json representation
	alwaysOkay := func(json string) bool { return true }

	// 4. then construct configs for intake and trajectory
	//    (moving from staging to resources)
	intake := sj.IntakeConfig{TypeName: typeName, Count: 2, ChunkSize: 1, ListMaker: listMaker}
	move := sj.TrajectConfig{TypeName: typeName, Validator: alwaysOkay}

	// 5. and for finding deletes (records in resources no longer valid);
	//    this would typically be a database call for all ids of 'type',
	//    compared against the resources ids of that 'type'
	ids := func() ([]string, error) {
		var ids []string
		for _, person := range dbList() {
			ids = append(ids, person.Id)
		}
		return ids, nil
	}
	outake := sj.OutakeConfig{TypeName: typeName, ListMaker: ids}

	// 6. the main function does all 3 actions on the data in one sequence
	err := sj.Scramjet(intake, move, outake)

	...
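Step 5 amounts to comparing two id lists: ids of the type that are still in resources but no longer in the source are the deletes. A minimal sketch of that comparison as a set difference; `findDeletes` is a hypothetical helper, and treating the library's check as a plain set difference is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// findDeletes returns ids present in existing (e.g. resources of one type)
// but absent from source (e.g. the nightly feed). Hypothetical helper.
func findDeletes(source, existing []string) []string {
	keep := make(map[string]struct{}, len(source))
	for _, id := range source {
		keep[id] = struct{}{}
	}
	var deletes []string
	for _, id := range existing {
		if _, ok := keep[id]; !ok {
			deletes = append(deletes, id)
		}
	}
	sort.Strings(deletes) // deterministic order for callers
	return deletes
}

func main() {
	source := []string{"per0000001", "per0000002"}
	existing := []string{"per0000001", "per0000002", "per0000003"}
	fmt.Println(findDeletes(source, existing)) // [per0000003]
}
```

If the source list comes back empty (say, a failed feed), this difference flags every existing record. The `AllowDeleteAll` field on `DiffProcessConfig` suggests the library guards against that case, though its exact behavior is not documented here.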

Other common use cases

A service that gives updates only

	... // per example above
	intake := sj.IntakeConfig{TypeName: typeName, Count: 2, ChunkSize: 1, ListMaker: listMaker}
	move := sj.TrajectConfig{TypeName: typeName, Validator: alwaysOkay}

	// except no 'outake' (deferred until later, since deletes are not part of this import)
	err := sj.ScramjetIntake(intake, move)

	...
	// then later, process the deletes
	outake := sj.OutakeConfig{TypeName: typeName, ListMaker: ids}

	err = sj.ScramjetOutake(outake)


One record at a time (for example, save/delete triggers)


	...
	// much the same as above, but a filter must be added to the 'traject' config
	// NOTE: intake can be simplified if chunking is not necessary
	intake := sj.IntakeConfig{TypeName: typeName, ListMaker: listMaker}

	// for instance, when the database saves/updates person id = per0000001
	filter := sj.Filter{Field: "id", Value: "per0000001", Compare: sj.Eq}
	move := sj.TrajectConfig{TypeName: typeName, Validator: alwaysOkay, Filter: &filter}

	// this will only process records matching the filter
	err := sj.ScramjetIntake(intake, move)


	// to delete, for instance when the database deletes person id = per0000001
	stub := sj.MakeStub("per0000001", "person")
	err = sj.RemoveRecords(stub)

	...

	// a group of stubs can also be deleted, e.g.
	var deletes []sj.Stub
	deletes = append(deletes, sj.MakeStub("per0000001", "person"))
	deletes = append(deletes, sj.MakeStub("per0000002", "person"))

	err = sj.RemoveRecords(deletes...)

A group of records per person

  ...

	// same as above (single), but the filter matches a field shared by
	// all of the person's records (the field name here is illustrative)
	filter := sj.Filter{Field: "externalId", Value: "x200", Compare: sj.Eq}

	move := sj.TrajectConfig{TypeName: typeName, Validator: alwaysOkay, Filter: &filter}

Controlling each stage of import

It's also possible to run any of these stages individually, if that is more useful.

  • Staging Table

import (
	sj "github.com/OIT-ADS-Web/scramjet"
)
...

	typeName := "person"
	// 1) add data
	person1 := TestPerson{Id: "per0000001", Name: "Test1"}
	person2 := TestPerson{Id: "per0000002", Name: "Test2"}
	// anything implementing the 'Storeable' interface can be used
	pass1 := sj.Packet{Id: sj.Identifier{Id: person1.Id, Type: typeName}, Obj: person1}
	// there is also a MakePacket wrapper
	pass2 := sj.MakePacket(person2.Id, typeName, person2)

	people := []sj.Storeable{pass1, pass2}
	err := sj.BulkAddStaging(people...)

	// 2) run through a 'validator' function (likely
	//    a JSON Schema validator)
	alwaysOkay := func(json string) bool { return true }
	valid, rejects, err := sj.FilterTypeStaging(typeName, alwaysOkay)

	// 3) mark them yourself, if you want
	err = sj.BatchMarkValidInStaging(valid)
	err = sj.BatchMarkInvalidInStaging(rejects)

	// 4) now the valid ones are marked and ready to go into
	//    the 'resources' table
	...
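Step 2 suggests a JSON Schema validator. Short of adding a schema library, a hand-written check with the same shape as `ValidatorFunc` (`func(json string) bool`) can confirm that the document parses and has its required fields. `requireFields` is a hypothetical helper, and the lowercase `id`/`name` keys assume the person struct carries `json:"id"` / `json:"name"` tags (untagged fields `Id` and `Name` would marshal as `"Id"` and `"Name"`). The parameter is named `doc` rather than `json` so it does not shadow the `encoding/json` package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// requireFields builds a validator with the same signature as
// ValidatorFunc. Hypothetical helper: it accepts a document only if it is
// a JSON object whose named fields all hold non-empty strings.
func requireFields(fields ...string) func(string) bool {
	return func(doc string) bool {
		var obj map[string]interface{}
		if err := json.Unmarshal([]byte(doc), &obj); err != nil {
			return false // not JSON, or not a JSON object
		}
		for _, f := range fields {
			s, ok := obj[f].(string) // missing keys yield nil, so ok is false
			if !ok || s == "" {
				return false
			}
		}
		return true
	}
}

func main() {
	validPerson := requireFields("id", "name")
	fmt.Println(validPerson(`{"id":"per0000001","name":"Test1"}`)) // true
	fmt.Println(validPerson(`{"id":"per0000002"}`))                 // false
	fmt.Println(validPerson(`not json`))                            // false
}
```

Since `func(string) bool` is assignable to `ValidatorFunc`, the result could be passed straight in, e.g. `sj.FilterTypeStaging(typeName, requireFields("id", "name"))`.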


    
  • Resources Table

import (
	sj "github.com/OIT-ADS-Web/scramjet"
)

...

	typeName := "person"
	list, err := sj.RetrieveValidStaging(typeName)
	err = sj.BulkMoveStagingTypeToResources(typeName, list...)

    ...

Example: moving an entire 'type' in bulk, in incremental steps


import (
	sj "github.com/OIT-ADS-Web/scramjet"
)

	typeName := "person"
	// see above - gather data however it can be gathered
	err := sj.StashStaging(people...)
	// your own validator function ...
	alwaysOkay := func(json string) bool { return true }
	valid, rejects, err := sj.FilterTypeStaging(typeName, alwaysOkay)

	err = sj.BatchMarkValidInStaging(valid)
	err = sj.BatchMarkInvalidInStaging(rejects)

	list, err := sj.RetrieveValidStaging(typeName)
	err = sj.BulkMoveStagingTypeToResources(typeName, list...)


Example: moving by id (single items)


import (
	sj "github.com/OIT-ADS-Web/scramjet"
)

	typeName := "person"
	// see above - grab a single record however necessary
	// and stash it in the staging table (it can be just one)
	err := sj.StashStaging(person)
	// only the basic 'id' is needed to find the record to validate
	// (long form: stub := sj.Stub{Id: sj.Identifier{Id: id, Type: typeName}})
	stub := sj.MakeStub(id, typeName)
	// validate however you want
	alwaysOkay := func(json string) bool { return true }
	err = sj.ProcessSingleStaging(stub, alwaysOkay)

	// move it over
	staging, err := sj.RetrieveSingleStagingValid(id, typeName)
	err = sj.BulkMoveStagingTypeToResources(typeName, staging)

Example: moving by query (for instance, per person)


import (
	sj "github.com/OIT-ADS-Web/scramjet"
)

	typeName := "person"
	// see above - get the person's records and stash them in the staging table
	err := sj.StashStaging(people...)
	// make a validator
	alwaysOkay := func(json string) bool { return true }
	// make a filter - a fairly crude field matcher at this point
	filter := sj.Filter{Field: "externalId", Value: "x200", Compare: sj.Eq}
	// validate only the records that match the filter
	valid, rejects, err := sj.FilterTypeStagingByQuery(typeName, filter, alwaysOkay)
	err = sj.BatchMarkValidInStaging(valid)
	err = sj.BatchMarkInvalidInStaging(rejects)
	// move over to resources, based on the same filter
	list, err := sj.RetrieveValidStagingFiltered(typeName, filter)
	err = sj.BulkMoveStagingTypeToResources(typeName, list...)


Basic structure

image of basic structure

Tables

image of tables

Documentation

Index

Constants

This section is empty.

Variables

var DBPool *pgxpool.Pool
var Name string

Functions

func AddStagingResource

func AddStagingResource(obj interface{}, id string, typeName string) error

only add (presumed existence already checked)

func BatchDeleteResourcesFromResources

func BatchDeleteResourcesFromResources(resources ...Identifiable) error

func BatchDeleteStagingFromResources

func BatchDeleteStagingFromResources(resources ...Identifiable) error

func BatchMarkInvalidInStaging

func BatchMarkInvalidInStaging(resources []Identifiable) error

func BatchMarkValidInStaging

func BatchMarkValidInStaging(resources []Identifiable) error

func BulkAddStaging

func BulkAddStaging(items ...Storeable) error

func BulkAddStagingForDelete

func BulkAddStagingForDelete(items ...Identifiable) error

func BulkAddStagingResources

func BulkAddStagingResources(resources ...StagingResource) error

func BulkMoveStagingToResourcesByFilter added in v0.0.8

func BulkMoveStagingToResourcesByFilter(typeName string, filter Filter, items ...StagingResource) error

NOTE: still need typeName to clear from staging

func BulkMoveStagingTypeToResources

func BulkMoveStagingTypeToResources(typeName string, items ...StagingResource) error

NOTE: only need 'typeName' param for clearing out from staging

func BulkRemoveResources

func BulkRemoveResources(items ...Identifiable) error

func BulkRemoveStagingDeletedFromResources

func BulkRemoveStagingDeletedFromResources(typeName string) error

func ClearAllResources

func ClearAllResources() error

func ClearAllStaging

func ClearAllStaging() error

func ClearDeletedFromStaging

func ClearDeletedFromStaging(id string, typeName string) error

func ClearMultipleDeletedFromStaging added in v0.0.10

func ClearMultipleDeletedFromStaging(items ...Identifiable) error

func ClearResourceType

func ClearResourceType(typeName string) error

func ClearStagingType

func ClearStagingType(typeName string) error

call where valid = true? (after transferring to resources)

func ClearStagingTypeDeletes

func ClearStagingTypeDeletes(typeName string) error

func ClearStagingTypeValid

func ClearStagingTypeValid(typeName string) error

leave the is_valid = false for investigation

func ClearStagingTypeValidByFilter added in v0.0.8

func ClearStagingTypeValidByFilter(typeName string, filter Filter) error

func Configure added in v0.0.10

func Configure(conf Config)

func DeleteFromStaging

func DeleteFromStaging(res StagingResource) error

func DropResources

func DropResources() error

func DropStaging

func DropStaging() error

func Eject added in v0.0.10

func Eject(config OutakeConfig) error

func FilterTypeStaging

func FilterTypeStaging(typeName string, validator ValidatorFunc) ([]Identifiable, []Identifiable, error)

func FilterTypeStagingByQuery added in v0.0.6

func FilterTypeStagingByQuery(typeName string,
	filter Filter, validator ValidatorFunc) ([]Identifiable, []Identifiable, error)

NOTE: this needs a 'typeName' param because it assumes validator is different per type

func FlagDeletes added in v0.0.9

func FlagDeletes(sourceDataIds []string, existingData []Resource, config DiffProcessConfig) error

func GetDbName

func GetDbName() string

func GetMaxUpdatedAt

func GetMaxUpdatedAt(typeName string) time.Time

func GetPool

func GetPool() *pgxpool.Pool

func Inject added in v0.0.10

func Inject(config IntakeConfig) error

func IntakeInChunks

func IntakeInChunks(ins IntakeConfig) error

func MakeConnectionPool

func MakeConnectionPool(conf Config) error

NOTE: Prepared statements can be manually created with the Prepare method. However, this is rarely necessary because pgx includes an automatic statement cache by default

func MakeResourceSchema

func MakeResourceSchema()

NOTE: this calls Fatalf with errors

func MakeStagingSchema

func MakeStagingSchema()

func MarkInvalidInStaging

func MarkInvalidInStaging(res Storeable) error

TODO: should probably batch these when validating and mark valid, invalid in groups of 500 or something

func MarkValidInStaging

func MarkValidInStaging(res StagingResource) error

func ProcessDiff added in v0.0.9

func ProcessDiff(config DiffProcessConfig) error

func ProcessOutake

func ProcessOutake(config OutakeConfig) error

func ProcessSingleStaging

func ProcessSingleStaging(item Identifiable, validator ValidatorFunc) error

func ProcessTypeStaging

func ProcessTypeStaging(typeName string, validator ValidatorFunc) error

func ProcessTypeStagingFiltered added in v0.0.6

func ProcessTypeStagingFiltered(typeName string, filter Filter, validator ValidatorFunc) error

TODO: no test for this so far

func RemoveRecords added in v0.0.10

func RemoveRecords(stubs ...Stub) error

func RemoveStagingDeletedFromResources

func RemoveStagingDeletedFromResources(id string, typeName string) error

func ResourceCount

func ResourceCount(typeName string) int

func ResourceTableExists

func ResourceTableExists() bool

TODO: the 'table_catalog' changes

func SaveResource

func SaveResource(obj Storeable) error

only does one at a time (not typically used)

func SaveStagingResource

func SaveStagingResource(obj Storeable) error

is there a need for this function?

func SaveStagingResourceDirect

func SaveStagingResourceDirect(res StagingResource, typeName string) error

func Scramjet added in v0.0.10

func Scramjet(in IntakeConfig, process TrajectConfig, out OutakeConfig) error

func ScramjetIntake added in v0.0.10

func ScramjetIntake(in IntakeConfig, process TrajectConfig) error

func ScramjetOutake added in v0.0.10

func ScramjetOutake(out OutakeConfig) error

func SetConfig added in v0.0.10

func SetConfig(c *Config)

func SetLogLevel added in v0.0.10

func SetLogLevel(lvl LogLevel)

func SetLogger added in v0.0.10

func SetLogger(log Logger)

func Shutdown added in v0.0.10

func Shutdown()

func StagingCount added in v0.0.8

func StagingCount() int

just for verification

func StagingDeleteCount

func StagingDeleteCount(typeName string) int

NOTE: only used in test - for verification

func StagingResourceExists

func StagingResourceExists(id string, typeName string) bool

returns false if error - maybe should not

func StagingTableExists

func StagingTableExists() bool

NOTE: could call Fatalf

func StashStaging

func StashStaging(docs ...Storeable) error

func TimestampString added in v0.0.17

func TimestampString() string

func Traject added in v0.0.10

func Traject(config TrajectConfig) error

func TransferAll added in v0.0.10

func TransferAll(typeName string, validator ValidatorFunc) error

func TransferSubset added in v0.0.10

func TransferSubset(typeName string, filter Filter, validator ValidatorFunc) error

Types

type CompareOpt added in v0.0.6

type CompareOpt string
const (
	Eq  CompareOpt = "="
	Gt  CompareOpt = ">"
	Lt  CompareOpt = "<"
	Gte CompareOpt = ">="
	Lte CompareOpt = "<="
	In  CompareOpt = "IN"
)

type Config

type Config struct {
	Database DatabaseInfo
	Logger   *Logger
	LogLevel LogLevel
}

more flexible?

var Cfg *Config

func GetConfig added in v0.0.10

func GetConfig() *Config

type DatabaseInfo

type DatabaseInfo struct {
	Server         string
	Port           int
	Database       string
	User           string
	Password       string
	MaxConnections int
	AcquireTimeout int
	Application    string
}

type DiffProcessConfig

type DiffProcessConfig struct {
	TypeName          string
	ExistingListMaker ExistingListMaker
	ListMaker         OutakeListMaker
	AllowDeleteAll    bool
}

To look for diffs (for instance, by duid), both lists have to be sent in.

type ExistingListMaker

type ExistingListMaker func() ([]Resource, error)

type Filter added in v0.0.6

type Filter struct {
	Field     string
	Value     string
	Compare   CompareOpt
	SubFilter *SubFilter
}
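The README calls filtering a fairly crude field matcher. One plausible way a `Filter` could become a query predicate is a comparison against a top-level key of the `data_b` column, with the value passed as a bind parameter. This is a hypothetical sketch: the local `filter` type mirrors the documented fields, and `toPredicate` and the `data_b->>'field'` form are assumptions, not scramjet's actual SQL.

```go
package main

import (
	"fmt"
	"strings"
)

type compareOpt string

const (
	eq  compareOpt = "="
	gt  compareOpt = ">"
	lt  compareOpt = "<"
	gte compareOpt = ">="
	lte compareOpt = "<="
)

// filter mirrors the Field/Value/Compare fields of Filter.
type filter struct {
	Field   string
	Value   string
	Compare compareOpt
}

// toPredicate renders f as a SQL fragment plus its bind argument;
// paramIdx is the $n placeholder number to use for the value.
func toPredicate(f filter, paramIdx int) (string, []interface{}, error) {
	switch f.Compare {
	case eq, gt, lt, gte, lte: // whitelist: operators are spliced into SQL
	default:
		return "", nil, fmt.Errorf("unsupported comparison %q", f.Compare)
	}
	// A JSON key cannot be a bind parameter, so escape single quotes
	// before embedding it in a SQL string literal.
	k := strings.ReplaceAll(f.Field, "'", "''")
	sql := fmt.Sprintf("data_b->>'%s' %s $%d", k, f.Compare, paramIdx)
	return sql, []interface{}{f.Value}, nil
}

func main() {
	sql, args, err := toPredicate(filter{Field: "externalId", Value: "x200", Compare: eq}, 2)
	fmt.Println(sql, args, err) // data_b->>'externalId' = $2 [x200] <nil>
}
```

`->>` yields text, so `>` and `<` here compare strings lexically rather than numerically. `In` is left out because how a list would be packed into the single `Value` string is not documented.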

type Identifiable

type Identifiable interface {
	Identifier() Identifier
}

func RetrieveDeletedStaging

func RetrieveDeletedStaging(typeName string) ([]Identifiable, error)

type Identifier

type Identifier struct {
	Id, Type string
}

type IntakeConfig added in v0.0.10

type IntakeConfig struct {
	TypeName  string
	ListMaker IntakeListMaker
	Count     int
	ChunkSize int
}

type IntakeListMaker

type IntakeListMaker func(int) ([]Storeable, error)

the parameter (int) is 'offset'
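Given `Count` and `ChunkSize` on `IntakeConfig` and this offset parameter, chunked intake presumably walks offsets from 0 up to `Count` in steps of `ChunkSize`, calling the list maker once per chunk. This sketch shows that loop under that assumption; `chunkOffsets` and `page` are hypothetical helpers, and the library's `IntakeInChunks` may differ (for instance in how it treats a zero `ChunkSize`).

```go
package main

import "fmt"

// chunkOffsets lists the offsets a chunked intake would pass to an
// IntakeListMaker, assuming offsets advance by chunkSize until count.
func chunkOffsets(count, chunkSize int) []int {
	if chunkSize <= 0 || count <= 0 {
		return []int{0} // assumption: no chunking means one call at offset 0
	}
	var offsets []int
	for offset := 0; offset < count; offset += chunkSize {
		offsets = append(offsets, offset)
	}
	return offsets
}

// page returns the records for one offset, the way a list maker that
// honors its offset argument might slice a full result set.
func page(all []string, offset, size int) []string {
	if offset >= len(all) {
		return nil
	}
	end := offset + size
	if size <= 0 || end > len(all) {
		end = len(all)
	}
	return all[offset:end]
}

func main() {
	ids := []string{"per0000001", "per0000002", "per0000003"}
	for _, off := range chunkOffsets(len(ids), 2) {
		fmt.Println(off, page(ids, off, 2))
	}
	// 0 [per0000001 per0000002]
	// 2 [per0000003]
}
```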

type LogLevel added in v0.0.10

type LogLevel int
const (
	INFO  LogLevel = 0
	DEBUG LogLevel = 1
)

func GetLogLevel added in v0.0.10

func GetLogLevel() LogLevel

type Logger added in v0.0.10

type Logger interface {
	Info(msg string)
	Debug(msg string)
}

func GetLogger added in v0.0.10

func GetLogger() Logger

type OutakeConfig added in v0.0.10

type OutakeConfig struct {
	TypeName  string
	ListMaker OutakeListMaker
	Filter    *Filter
}

type OutakeListMaker

type OutakeListMaker func() ([]string, error)

maybe interface instead of func type in struct?

type Packet

type Packet struct {
	Id  Identifier
	Obj interface{} // this will be serialized
}

func MakePacket added in v0.0.10

func MakePacket(id string, typeName string, obj interface{}) Packet

func (Packet) Identifier

func (p Packet) Identifier() Identifier

func (Packet) Object

func (ps Packet) Object() interface{}

type Resource

type Resource struct {
	Id        string       `db:"id"`
	Type      string       `db:"type"`
	Hash      string       `db:"hash"`
	Data      pgtype.JSON  `db:"data"`
	DataB     pgtype.JSONB `db:"data_b"`
	CreatedAt time.Time    `db:"created_at"`
	UpdatedAt time.Time    `db:"updated_at"`
}

This is the raw structure in the database, with two json columns:
  • 'data' can be used for change comparison with the hash
  • 'data_b' can be used for searches

func RetrieveSingleResource added in v0.0.3

func RetrieveSingleResource(id string, typeName string) (Resource, error)

func RetrieveTypeResources

func RetrieveTypeResources(typeName string) ([]Resource, error)

func RetrieveTypeResourcesByQuery added in v0.0.6

func RetrieveTypeResourcesByQuery(typeName string, filter Filter) ([]Resource, error)

func RetrieveTypeResourcesLimited

func RetrieveTypeResourcesLimited(typeName string, limit int) ([]Resource, error)

func ScanResources added in v0.0.6

func ScanResources(rows pgx.Rows) ([]Resource, error)

TODO: could just send in date - leave it up to library user to determine how it's figured out

func (Resource) Identifier

func (res Resource) Identifier() Identifier

type ResourceListMaker added in v0.0.9

type ResourceListMaker func() ([]Resource, error)

type StagingResource

type StagingResource struct {
	Id       string       `db:"id"`
	Type     string       `db:"type"`
	Data     []byte       `db:"data"`
	IsValid  sql.NullBool `db:"is_valid"`
	ToDelete sql.NullBool `db:"to_delete"`
}

NOTE: just making json []byte instead of pgtype.JSON

func RetrieveAllStaging added in v0.0.8

func RetrieveAllStaging() ([]StagingResource, error)

just in case we need to look at all records there

func RetrieveInvalidStaging

func RetrieveInvalidStaging(typeName string) ([]StagingResource, error)

func RetrieveSingleStaging

func RetrieveSingleStaging(id string, typeName string) (StagingResource, error)

func RetrieveSingleStagingDelete

func RetrieveSingleStagingDelete(id string, typeName string) (StagingResource, error)

func RetrieveSingleStagingValid

func RetrieveSingleStagingValid(id string, typeName string) (StagingResource, error)

func RetrieveTypeStaging

func RetrieveTypeStaging(typeName string) ([]StagingResource, error)

func RetrieveTypeStagingFiltered added in v0.0.6

func RetrieveTypeStagingFiltered(typeName string, filter Filter) ([]StagingResource, error)

func RetrieveValidStaging

func RetrieveValidStaging(typeName string) ([]StagingResource, error)

func RetrieveValidStagingFiltered added in v0.0.6

func RetrieveValidStagingFiltered(typeName string, filter Filter) ([]StagingResource, error)

func ScanStaging added in v0.0.6

func ScanStaging(rows pgx.Rows) ([]StagingResource, error)

func (StagingResource) Identifier

func (res StagingResource) Identifier() Identifier

kind of like a composite (id, type) primary key

type Storeable

type Storeable interface {
	Identifier() Identifier
	Object() interface{}
}

type Stub

type Stub struct {
	Id Identifier
}

func MakeStub added in v0.0.10

func MakeStub(id string, typeName string) Stub

func (Stub) Identifier

func (s Stub) Identifier() Identifier

type SubFilter added in v0.0.7

type SubFilter struct {
	Typename    string
	MatchField  string
	Value       string
	ParentMatch string /* could be different than 'Field' */
}

type TrajectConfig added in v0.0.10

type TrajectConfig struct {
	TypeName  string
	Validator ValidatorFunc
	Filter    *Filter
}

type ValidatorFunc

type ValidatorFunc func(json string) bool

Directories

Path Synopsis
cmd
