ingest

package
v0.0.0-...-08598bc
Warning

This package is not in the latest version of its module.

Published: Sep 13, 2021 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var RetryError = regexp.MustCompile(`(?i)Please retry`)

RetryError A regex that matches a healthy "transaction aborted" error in dgraph. It is part of normal operation for a transaction to be aborted and need retrying, because transactions don't hold locks in dgraph. If we get an error matching this regex then there wasn't anything wrong with the request; it just wants us to try again later.
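As a minimal sketch of how a caller might use this pattern, the snippet below re-declares the same regex locally and checks error text against it. The helper names (`isRetryable`, `isRetryableMessage`) and the sample error messages are assumptions made for illustration; they are not part of this package or of dgraph's documented error strings.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// retryError mirrors the RetryError pattern documented above.
var retryError = regexp.MustCompile(`(?i)Please retry`)

// isRetryableMessage reports whether an error message matches the pattern.
// Hypothetical helper, not part of the ingest package.
func isRetryableMessage(msg string) bool {
	return retryError.MatchString(msg)
}

// isRetryable reports whether err looks like a healthy aborted transaction.
// A nil error is never retryable.
func isRetryable(err error) bool {
	return err != nil && isRetryableMessage(err.Error())
}

func main() {
	// Made-up error texts, used only to exercise the pattern.
	aborted := errors.New("Transaction has been aborted. Please retry")
	other := errors.New("connection refused")

	fmt.Println(isRetryable(aborted))
	fmt.Println(isRetryable(other))
	fmt.Println(isRetryable(nil))
}
```

Because the pattern uses the `(?i)` flag, the match is case-insensitive, so "please RETRY" would also count as retryable.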

var Schema = `` /* 1412-byte string literal not displayed */

Schema Stores the overall schema. I'm sure this is a bad way to do things from a future compatibility perspective...

Functions

func BatchToUpsert

func BatchToUpsert(batch ItemNodes) *api.Request

BatchToUpsert converts a set of ItemNodes to an Upsert request

func InitConfig

func InitConfig(cfgFile string)

InitConfig reads in config file and ENV variables if set and initialises all other config

func InitConfigFunc

func InitConfigFunc(cfgFile string) func()

InitConfigFunc Returns a function that calls InitConfig(cfgFile)

func MessageToItem

func MessageToItem(msg *nats.Msg) (*sdp.Item, error)

MessageToItem Converts a NATS message to an SDP Item

func NewDGraphClient

func NewDGraphClient(hostname string, port int, connectTimeout time.Duration) (*dgo.Dgraph, error)

NewDGraphClient Create a dgraph client connection

func NewNATSConnection

func NewNATSConnection(urls []string, retries int, sleep int, timeout time.Duration) *nats.Conn

NewNATSConnection connects to a given NATS URL. It also supports retries. Servers should be supplied as a slice of URLs, e.g. (where timeout is a time.Duration)

ingest.NewNATSConnection([]string{"nats://127.0.0.1:1222", "nats://127.0.0.1:1223"}, 5, 5, timeout)
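The retry behaviour is not spelled out here, so the following is only a sketch of a generic retry-with-sleep loop using the standard library. `connectWithRetry`, `errNotReady`, and the `connect` callback are hypothetical names; the sketch treats the count as the total number of attempts and takes the sleep as a `time.Duration`, both of which are assumptions rather than the documented semantics of `retries` and `sleep`.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNotReady is a made-up error standing in for a failed connection attempt.
var errNotReady = errors.New("server not ready")

// connectWithRetry calls connect until it succeeds or attempts run out.
// It returns the number of attempts made and the last error, if any.
func connectWithRetry(attempts int, sleep time.Duration, connect func() error) (int, error) {
	if attempts < 1 {
		return 0, errors.New("at least one attempt is required")
	}
	var err error
	for n := 1; n <= attempts; n++ {
		if err = connect(); err == nil {
			return n, nil
		}
		// Only sleep when another attempt will follow.
		if n < attempts {
			time.Sleep(sleep)
		}
	}
	return attempts, err
}

func main() {
	calls := 0
	// Fake connection that fails twice before succeeding.
	n, err := connectWithRetry(5, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errNotReady
		}
		return nil
	})
	fmt.Println(n, err)
}
```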

func SetConfigDefaults

func SetConfigDefaults()

SetConfigDefaults Registers default values for config with Viper. This should always be called at some point before trying to do anything with this library

func SetupSchemas

func SetupSchemas(dg *dgo.Dgraph) error

SetupSchemas Will create the schemas required for ingest to work. This will need to be run before anything can actually be inserted into the database

Types

type DGraph

type DGraph struct {
	Conn *dgo.Dgraph
}

DGraph Stores details about the DGraph connection

type Ingestor

type Ingestor struct {
	BatchSize     int           // The number of items to batch before inserting
	MaxWait       time.Duration // Max amount of time to wait before inserting
	Dgraph        *dgo.Dgraph   // The DGraph connection to use
	DebugChannel  chan UpsertResult
	IngestRetries int
	// contains filtered or unexported fields
}

Ingestor is capable of ingesting items into the database

func (*Ingestor) AsyncHandle

func (i *Ingestor) AsyncHandle(msg *nats.Msg)

AsyncHandle Creates a NATS message handler that upserts items into the given database

func (*Ingestor) EnsureItemChannel

func (i *Ingestor) EnsureItemChannel()

EnsureItemChannel Ensures that the item channel exists

func (*Ingestor) ProcessBatches

func (i *Ingestor) ProcessBatches(ctx context.Context)

ProcessBatches will start inserting items into the database in batches. This will block forever
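To illustrate how BatchSize and MaxWait could interact in a loop like this, here is a minimal, standard-library-only sketch. It is not the package's implementation: `batchLoop`, `collectBatches`, the string item type, and the `flush` callback are all hypothetical, and the real method works on the Ingestor's unexported item channel.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// batchLoop buffers values from items and hands them to flush when either
// batchSize values are buffered or maxWait elapses. It returns when ctx is
// cancelled or items is closed, flushing whatever is left first.
func batchLoop(ctx context.Context, items <-chan string, batchSize int, maxWait time.Duration, flush func([]string)) {
	batch := make([]string, 0, batchSize)
	ticker := time.NewTicker(maxWait)
	defer ticker.Stop()

	emit := func() {
		if len(batch) > 0 {
			flush(batch)
			// Start a fresh slice so flushed batches are never overwritten.
			batch = make([]string, 0, batchSize)
		}
	}

	for {
		select {
		case <-ctx.Done():
			emit()
			return
		case item, ok := <-items:
			if !ok {
				emit()
				return
			}
			batch = append(batch, item)
			if len(batch) >= batchSize {
				emit()
				ticker.Reset(maxWait)
			}
		case <-ticker.C:
			emit()
		}
	}
}

// collectBatches runs batchLoop over a fixed input and returns the batches.
func collectBatches(values []string, batchSize int) [][]string {
	items := make(chan string, len(values))
	for _, v := range values {
		items <- v
	}
	close(items)

	var out [][]string
	batchLoop(context.Background(), items, batchSize, time.Hour, func(b []string) {
		out = append(out, b)
	})
	return out
}

func main() {
	fmt.Println(collectBatches([]string{"a", "b", "c", "d", "e"}, 2))
	// prints [[a b] [c d] [e]]
}
```

In the sketch the loop only ends on cancellation or a closed channel, which matches the "block forever" description when neither happens.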

func (*Ingestor) RetryUpsert

func (i *Ingestor) RetryUpsert(insertions []ItemInsertion)

RetryUpsert Will do something about retrying upserts. Maybe put them back in the queue using a TTL, maybe just sleep and retry... TODO: Decide on the retry functionality

func (*Ingestor) Upsert

func (i *Ingestor) Upsert(req *api.Request) (*api.Response, error)

Upsert Runs an upsert request using the specified timeouts from the config

type ItemInsertion

type ItemInsertion struct {
	Item ItemNode
	TTL  int
}

ItemInsertion Represents an item to be inserted. It includes an item and the TTL. The TTL will be reduced each time it is retried
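Since the retry strategy is still marked as undecided, the following is only a sketch of one way a TTL could bound retries. The `insertion` type is a simplified stand-in for ItemInsertion, `requeue` is a hypothetical helper, and dropping an insertion once its TTL reaches zero is an assumption.

```go
package main

import "fmt"

// insertion is a simplified stand-in for ItemInsertion.
type insertion struct {
	Name string // stands in for the wrapped ItemNode
	TTL  int    // remaining retries
}

// requeue reduces the TTL of every failed insertion by one. Insertions with
// TTL left are returned for another try; the rest are reported as dropped.
func requeue(failed []insertion) (retry, dropped []insertion) {
	for _, ins := range failed {
		ins.TTL-- // ins is a copy, so the caller's slice is untouched
		if ins.TTL > 0 {
			retry = append(retry, ins)
		} else {
			dropped = append(dropped, ins)
		}
	}
	return retry, dropped
}

func main() {
	retry, dropped := requeue([]insertion{
		{Name: "a", TTL: 3},
		{Name: "b", TTL: 1},
	})
	fmt.Println(retry, dropped)
	// prints [{a 2}] [{b 0}]
}
```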

type ItemNode

type ItemNode struct {
	Type                 string        `json:"Type,omitempty"`
	UniqueAttribute      string        `json:"UniqueAttribute,omitempty"`
	Context              string        `json:"Context,omitempty"`
	Attributes           string        `json:"Attributes,omitempty"`
	UniqueAttributeValue string        `json:"UniqueAttributeValue,omitempty"`
	GloballyUniqueName   string        `json:"GloballyUniqueName,omitempty"`
	Hash                 string        `json:"Hash,omitempty"`
	Metadata             *sdp.Metadata `json:"-"`
	LinkedItems          ItemNodes     `json:"-"`
}

ItemNode Represents an item. It can also return a full list of mutations

## Attributes Predicate

Currently attributes are stored as a JSON string. This has made the database queries very easy but will likely cause performance issues in future. This is due to the fact that dgraph does predicate-based sharding, i.e. data is sharded by predicate and not by UID. This means that all values of "attributes" (which will represent the vast majority of the database) will be stored in the same shard, as it's all in the one predicate. Initially I had tried storing each attribute as its own predicate with links between them. This would be much better from a sharding perspective, but would make life much harder in a number of ways, as detailed below.
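As a rough illustration of what "stored as a JSON string" means in practice, the sketch below flattens an arbitrary attribute map into one string and reads it back. The helper names and the sample attributes are hypothetical; the package's own conversion (see ItemToItemNode) may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// attributesToString serialises arbitrary attributes into one JSON string,
// suitable for storing under a single predicate.
func attributesToString(attrs map[string]interface{}) (string, error) {
	b, err := json.Marshal(attrs)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// attributesFromString reverses attributesToString.
func attributesFromString(s string) (map[string]interface{}, error) {
	var attrs map[string]interface{}
	err := json.Unmarshal([]byte(s), &attrs)
	return attrs, err
}

func main() {
	// Made-up attributes, nested to show that structure survives the round trip.
	s, err := attributesToString(map[string]interface{}{
		"name":   "nginx",
		"status": map[string]interface{}{"phase": "Running"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
	// prints {"name":"nginx","status":{"phase":"Running"}}

	attrs, err := attributesFromString(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(attrs["name"])
}
```

Nested values stay inside the one string, which is why no extra nodes or predicates are needed, at the cost of everything landing in the same predicate.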

### Predicate Expansion

When querying we would need to expand the predicates which would mean that we would either need to know the predicates in advance (difficult due to the fact that attributes are arbitrary) or dynamically generate a named type in dgraph for each and store this in the schema.

### Orphaned Nodes

When updating data we could very easily orphan nodes since we are creating nodes arbitrarily with arbitrary relationships. Think of a Kubernetes pod with many statuses: each of these will require a node, and once the pod is deleted they would need to be deleted too. Also, if a nested hash changed it would probably need to be re-created as opposed to updated, since we don't know what makes it unique. This would mean that the old node would still exist but would now be an orphan. There would probably need to be some regular cleanup of these orphaned nodes.

func ItemToItemNode

func ItemToItemNode(item *sdp.Item) (ItemNode, error)

ItemToItemNode converts an item to an ItemNode

func MessageToItemNode

func MessageToItemNode(msg *nats.Msg) (ItemNode, error)

MessageToItemNode Converts a NATS message to a DGraph ItemNode

func QueryItem

func QueryItem(d *dgo.Dgraph, globallyUniqueName string) (ItemNode, error)

QueryItem Queries a single item from the database

func (ItemNode) IsPlaceholder

func (i ItemNode) IsPlaceholder() bool

IsPlaceholder Returns true if the item is just a placeholder

func (ItemNode) MarshalJSON

func (i ItemNode) MarshalJSON() ([]byte, error)

MarshalJSON Custom marshalling functionality that adds derived fields required for DGraph

func (*ItemNode) Mutation

func (i *ItemNode) Mutation() *api.Mutation

Mutation Returns a list of mutations that can be

func (*ItemNode) Queries

func (i *ItemNode) Queries() Queries

Queries Returns the queries that should match specifically this item. It will also export the following variables:

  • `{Hash}.item`: UID of this item
  • `{Hash}.item.older`: UID of this item, if it is older than the supplied one
  • `{Hash}.linkedItemsCount`: count() of the linked items
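The naming pattern in the list above can be sketched as follows. `exportedVariables` is a hypothetical helper and "abc123" is a made-up hash; the sketch only shows how the three names derive from an item's hash, not how the package builds its queries.

```go
package main

import "fmt"

// exportedVariables returns the variable names documented above for an item
// with the given hash.
func exportedVariables(hash string) []string {
	return []string{
		hash + ".item",
		hash + ".item.older",
		hash + ".linkedItemsCount",
	}
}

func main() {
	// "abc123" is a made-up hash used only for illustration.
	for _, name := range exportedVariables("abc123") {
		fmt.Println(name)
	}
}
```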

func (*ItemNode) UnmarshalJSON

func (i *ItemNode) UnmarshalJSON(value []byte) error

UnmarshalJSON Converts from JSON to ItemNode

type ItemNodes

type ItemNodes []ItemNode

ItemNodes Represents a list of ItemNodes in dgraph

func (ItemNodes) Deduplicate

func (i ItemNodes) Deduplicate() ItemNodes

Deduplicate Removes duplicate items, with clashes being resolved as follows:

  • Newer items beat older items
  • Complete items beat items that are only references/placeholders (i.e. those that do not have attributes and metadata)
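The two rules above can be sketched as below. This is not the package's code: `node` is a simplified stand-in for ItemNode, the integer `Timestamp` stands in for whatever time information the item metadata carries, a placeholder is approximated as "no attributes", and applying the completeness rule before the age rule is an assumption.

```go
package main

import "fmt"

// node is a simplified stand-in for ItemNode.
type node struct {
	Name       string // stands in for GloballyUniqueName
	Attributes string // empty for placeholders (simplification)
	Timestamp  int64  // assumed Unix time taken from the item's metadata
}

func (n node) isPlaceholder() bool { return n.Attributes == "" }

// beats reports whether candidate should replace current.
func beats(candidate, current node) bool {
	// Complete items beat placeholders, regardless of age (assumed order).
	if candidate.isPlaceholder() != current.isPlaceholder() {
		return current.isPlaceholder()
	}
	// Otherwise newer items beat older ones.
	return candidate.Timestamp > current.Timestamp
}

// deduplicate keeps one node per Name, preserving first-seen order.
func deduplicate(nodes []node) []node {
	index := make(map[string]int) // Name -> position in out
	var out []node
	for _, n := range nodes {
		pos, seen := index[n.Name]
		if !seen {
			index[n.Name] = len(out)
			out = append(out, n)
			continue
		}
		if beats(n, out[pos]) {
			out[pos] = n
		}
	}
	return out
}

func main() {
	out := deduplicate([]node{
		{Name: "a", Attributes: "", Timestamp: 5},
		{Name: "a", Attributes: `{"x":0}`, Timestamp: 1},
		{Name: "a", Attributes: `{"x":1}`, Timestamp: 3},
		{Name: "b", Attributes: "", Timestamp: 1},
	})
	fmt.Println(len(out), out[0].Attributes, out[0].Timestamp)
	// prints 2 {"x":1} 3
}
```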

func (ItemNodes) LinkedItems

func (i ItemNodes) LinkedItems() ItemNodes

LinkedItems Returns the linked items as ItemNodes

type Queries

type Queries []Query

Queries Is a list of dgraph queries

func (Queries) Deduplicate

func (q Queries) Deduplicate() Queries

Deduplicate Removes duplicate queries

func (Queries) String

func (q Queries) String() string

type Query

type Query struct {
	QueryFunc string
	Variables []Variable
}

Query Represents a dgraph query

func (Query) String

func (q Query) String() string

type UpsertResult

type UpsertResult struct {
	Context              string
	Type                 string
	UniqueAttributeValue string
	Attributes           string
	Request              *api.Request
	Error                error
}

UpsertResult Represents the result of handling an upsert

type Variable

type Variable struct {
	Name   string
	Source string
}

Variable represents a query variable where Name is the actual name of the variable and Source is the thing in the query that populates it, such as "count(uid)"
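As a sketch of how Name and Source might combine, the snippet below renders a variable in DQL's `name as source` form. The output of this package's own String method is not shown here, so the format is an assumption; `variable` and `render` are hypothetical stand-ins.

```go
package main

import "fmt"

// variable is a simplified stand-in for Variable.
type variable struct {
	Name   string // name of the variable
	Source string // expression that populates it, such as "count(uid)"
}

// render formats the variable in DQL's "name as source" form. This is an
// assumption; the package's String method may produce something different.
func (v variable) render() string {
	return fmt.Sprintf("%s as %s", v.Name, v.Source)
}

func main() {
	v := variable{Name: "total", Source: "count(uid)"}
	fmt.Println(v.render())
	// prints total as count(uid)
}
```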

func (Variable) String

func (v Variable) String() string
