Documentation ¶
Index ¶
- Variables
- func AddTempClassifierConfig(dataModel, n3id string, requiredPaths, links, unique []string) string
- func ClearTempClassifierConfig()
- func DecodeCRDT(encoded []byte) (*vvmap.Map, error)
- func EncodeCRDT(vvm *vvmap.Map) ([]byte, error)
- func Flatten(m map[string]interface{}) map[string]interface{}
- func GetCurClassifierConfig() string
- func MergeErrors(cs ...<-chan error) <-chan error
- func WaitForPipeline(errs ...<-chan error) error
- type CRDTData
- type CRDTManager
- func NewCRDTManager(userid string, topic string) (*CRDTManager, error)
- func (crdtm *CRDTManager) Close()
- func (crdtm *CRDTManager) SendFromFile(fname string) error
- func (crdtm *CRDTManager) SendFromHTTPRequest(r *http.Request) error
- func (crdtm *CRDTManager) SendFromReader(r io.Reader) error
- func (crdtm *CRDTManager) StartReceiver() (<-chan []byte, error)
- func (crdtm *CRDTManager) StopReceiver()
Constants ¶
This section is empty.
Variables ¶
var DefaultResolver = LexicographicConflictResolver
var LexicographicConflictResolver = func(key string, left, right vvmap.Record) bool {
	leftVal := fmt.Sprintf("%v", left.Value)
	rightVal := fmt.Sprintf("%v", right.Value)
	return strings.Compare(leftVal, rightVal) > 0
}
Conflict resolver to choose between versions of a value when the versions are the same.
Needs Go 1.12, where maps are printed with all elements in key order.
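The comparison can be tried without the vvmap dependency. The following is a minimal sketch, assuming a hypothetical `record` type that models only the `Value` field of `vvmap.Record`; the function name `lexicographicWins` is also made up for illustration. It shows why key-ordered map printing matters: two maps with the same entries format to the same string, so the result does not depend on map iteration order.

```go
package main

import (
	"fmt"
	"strings"
)

// record is a hypothetical stand-in for vvmap.Record; only the Value field
// used by the resolver is modelled here.
type record struct {
	Value interface{}
}

// lexicographicWins mirrors the comparison in LexicographicConflictResolver:
// it reports whether left's printed value sorts after right's.
func lexicographicWins(key string, left, right record) bool {
	leftVal := fmt.Sprintf("%v", left.Value)
	rightVal := fmt.Sprintf("%v", right.Value)
	return strings.Compare(leftVal, rightVal) > 0
}

func main() {
	a := record{Value: "apple"}
	b := record{Value: "banana"}
	fmt.Println(lexicographicWins("fruit", a, b)) // false: "apple" sorts before "banana"
	fmt.Println(lexicographicWins("fruit", b, a)) // true

	// From Go 1.12 on, fmt prints maps in key order, so these two values
	// format identically and neither side wins.
	m1 := record{Value: map[string]int{"x": 1, "y": 2}}
	m2 := record{Value: map[string]int{"y": 2, "x": 1}}
	fmt.Println(lexicographicWins("point", m1, m2)) // false
}
```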
Functions ¶
func AddTempClassifierConfig ¶ added in v0.1.2
AddTempClassifierConfig appends a "[[classifier]]" entry with the keys data_model, required_paths, n3id, links and unique to the default hard-coded classifierConfigText. It returns the temporary config text.
func ClearTempClassifierConfig ¶ added in v0.1.4
func ClearTempClassifierConfig()
ClearTempClassifierConfig :
func DecodeCRDT ¶
Binary decoding for messages coming from the datastore.
func EncodeCRDT ¶
Binary encoding for messages going to the internal datastore.
func Flatten ¶
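Flatten takes a map decoded from a JSON file and returns a new map in which nested maps are replaced by dot-delimited keys.
The comment also mentions returning a sorted list of keys, for cases where updating in the same order is important (as when setting versions in the crdt); the signature listed in the index returns only the map.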
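The flattening described here can be sketched as follows. This is a hypothetical re-implementation for illustration, not the package's code: `flattenSketch`, `flattenInto` and `sortedKeys` are made-up names, and the sketch leaves arrays untouched as plain values, which may differ from what Flatten does.

```go
package main

import (
	"fmt"
	"sort"
)

// flattenSketch returns a new map in which nested maps are replaced by
// dot-delimited keys, e.g. {"a": {"b": 1}} becomes {"a.b": 1}.
func flattenSketch(m map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{})
	flattenInto(out, "", m)
	return out
}

// flattenInto walks m recursively, writing leaf values into out under
// their full dotted path.
func flattenInto(out map[string]interface{}, prefix string, m map[string]interface{}) {
	for k, v := range m {
		key := k
		if prefix != "" {
			key = prefix + "." + k
		}
		if nested, ok := v.(map[string]interface{}); ok {
			flattenInto(out, key, nested)
			continue
		}
		out[key] = v
	}
}

// sortedKeys returns the keys of m in lexical order, for callers that
// need to process entries in a fixed order.
func sortedKeys(m map[string]interface{}) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	doc := map[string]interface{}{
		"name": "Ada",
		"address": map[string]interface{}{
			"city": "London",
			"geo":  map[string]interface{}{"lat": 51.5},
		},
	}
	flat := flattenSketch(doc)
	for _, k := range sortedKeys(flat) {
		fmt.Printf("%s = %v\n", k, flat[k])
	}
}
```

Iterating over the sorted keys gives a stable processing order even though Go map iteration order is random.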
func GetCurClassifierConfig ¶ added in v0.1.3
func GetCurClassifierConfig() string
GetCurClassifierConfig :
func MergeErrors ¶
MergeErrors merges multiple channels of errors. Based on https://blog.golang.org/pipelines.
func WaitForPipeline ¶
WaitForPipeline waits for results from all error channels. It returns early on the first error.
Types ¶
type CRDTData ¶
type CRDTData struct {
	//
	// unique id determined for this object
	// will be taken from the object if it has a
	// declared unique id (in config)
	// If no unique id present in the object one will be
	// assigned.
	//
	N3id string
	//
	// The data model this object is associated with through
	// classification
	//
	DataModel string
	//
	// Type of the object, derived from classifier
	//
	Type string
	//
	// map containing the original json
	//
	RawData map[string]interface{}
	//
	// the crdt to hold the data
	//
	CRDT *vvmap.Map
	//
	// encoded binary of the crdt
	//
	EncodedCRDT []byte
	//
	// user id to identify the owner of the
	// changes to the data
	//
	UserId string
	//
	// streaming topic to publish to
	//
	TopicName string
	//
	// Flag whether any new data was added
	//
	Updated bool
}
data type passed through all stages of the send pipeline
type CRDTManager ¶
type CRDTManager struct {
	//
	// set level of audit output, one of: none, basic, high
	//
	AuditLevel string
	//
	// user id to identify who is making changes
	//
	UserId string
	//
	// topic/context stream name used to exchange data
	//
	TopicName string
	//
	// context cancelFunc used to close the
	// stream-receiver cleanly
	//
	ReceiverCancelFunc func()
	// contains filtered or unexported fields
}
func NewCRDTManager ¶
func NewCRDTManager(userid string, topic string) (*CRDTManager, error)
Opens a crdt manager with supporting datastores. By default it will use the local paths ./contexts/[userid]/[topic]/crdt/send and ./contexts/[userid]/[topic]/crdt/recv.
func (*CRDTManager) Close ¶
func (crdtm *CRDTManager) Close()
safely shut down all databases & connections
func (*CRDTManager) SendFromFile ¶
func (crdtm *CRDTManager) SendFromFile(fname string) error
Sends a file of json objects through the crdt manager
func (*CRDTManager) SendFromHTTPRequest ¶
func (crdtm *CRDTManager) SendFromHTTPRequest(r *http.Request) error
sends an HTTP request containing json objects through the crdt manager
func (*CRDTManager) SendFromReader ¶
func (crdtm *CRDTManager) SendFromReader(r io.Reader) error
sends the content of the given reader (assumed to be a stream of json objects) through the crdt manager.
func (*CRDTManager) StartReceiver ¶
func (crdtm *CRDTManager) StartReceiver() (<-chan []byte, error)
starts a stream listener/processor for the topic associated with this manager
func (*CRDTManager) StopReceiver ¶
func (crdtm *CRDTManager) StopReceiver()
shuts down the receiver gracefully
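A typical consumer loop around StartReceiver and StopReceiver might look like the sketch below. The `receiver` interface restates the two method signatures listed above, so a *CRDTManager would satisfy it; `fakeReceiver` and `collect` are hypothetical names, and the fake stands in for the real manager so the example runs on its own. Whether the real channel is closed after StopReceiver is not stated in this documentation, so the loop also stops on a message count.

```go
package main

import "fmt"

// receiver captures the two receiver methods listed for *CRDTManager.
type receiver interface {
	StartReceiver() (<-chan []byte, error)
	StopReceiver()
}

// fakeReceiver is a hypothetical in-memory stand-in for the real manager.
type fakeReceiver struct {
	msgs    [][]byte
	stopped bool
}

// StartReceiver returns a closed, pre-filled channel of the fake's messages.
func (f *fakeReceiver) StartReceiver() (<-chan []byte, error) {
	out := make(chan []byte, len(f.msgs))
	for _, m := range f.msgs {
		out <- m
	}
	close(out)
	return out, nil
}

// StopReceiver records that the receiver was shut down.
func (f *fakeReceiver) StopReceiver() { f.stopped = true }

// collect reads up to n messages (n > 0) and always stops the receiver
// before returning.
func collect(r receiver, n int) ([][]byte, error) {
	msgs, err := r.StartReceiver()
	if err != nil {
		return nil, err
	}
	defer r.StopReceiver()

	var got [][]byte
	for msg := range msgs {
		got = append(got, msg)
		if len(got) == n {
			break
		}
	}
	return got, nil
}

func main() {
	f := &fakeReceiver{msgs: [][]byte{
		[]byte(`{"a":1}`), []byte(`{"b":2}`), []byte(`{"c":3}`),
	}}
	got, err := collect(f, 2)
	fmt.Println(len(got), err, f.stopped) // 2 <nil> true
}
```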
Source Files ¶
- codec.go
- config.go
- crdtdata.go
- crdtdecode.go
- crdtjson.go
- crdtmanager.go
- crdtmerge.go
- crdtsource.go
- crdtwrap.go
- find.go
- flatten.go
- jsonreader.go
- lexresolver.go
- objectclassifier.go
- package.go
- pipelineHelpers.go
- publishcrdt.go
- receivefromstream.go
- runreceive.go
- runsend.go
- savecrdt.go
- sendfromfile.go
- timetrack.go
- updatecrdt.go