v0.9.0-alpha

This package is not in the latest version of its module.

Published: Nov 25, 2018 License: Apache-2.0 Imports: 15 Imported by: 0




Etre is an entity API for managing primitive entities with labels.

This project is still under development and should not be used in production yet. We are not seeking external contributors at this time.


Apache 2.0



Package etre provides API clients and low-level primitive data types.



const (
	API_ROOT          string = "/api/v1"
	META_LABEL_ID            = "_id"
	META_LABEL_TYPE          = "_type"
	VERSION                  = "0.9.0-alpha"
	CDC_WRITE_TIMEOUT int    = 5 // seconds
)

var (
	ErrTypeMismatch   = errors.New("entity _type and Client entity type are different")
	ErrIdSet          = errors.New("entity _id is set but not allowed on insert")
	ErrIdNotSet       = errors.New("entity _id is not set")
	ErrNoEntity       = errors.New("empty entity or id slice; at least one required")
	ErrNoLabel        = errors.New("empty label slice; at least one required")
	ErrNoQuery        = errors.New("empty query string")
	ErrBadData        = errors.New("data from CDC feed is not event or control")
	ErrCallerBlocked  = errors.New("caller blocked")
	ErrEntityNotFound = errors.New("entity not found")
)

var Debug = false


func IsMetalabel

func IsMetalabel(label string) bool
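
The page gives only the signature of IsMetalabel. As a rough sketch, assuming that meta-labels are exactly the labels that begin with an underscore (as _id and _type do), a caller could separate user labels from meta-labels as follows. The names isMetalabel and userLabels are hypothetical local stand-ins, not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// isMetalabel is a hypothetical stand-in for etre.IsMetalabel. It ASSUMES
// that a meta-label is any label starting with an underscore.
func isMetalabel(label string) bool {
	return strings.HasPrefix(label, "_")
}

// userLabels returns the labels that are not meta-labels, in input order.
func userLabels(labels []string) []string {
	out := []string{}
	for _, l := range labels {
		if !isMetalabel(l) {
			out = append(out, l)
		}
	}
	return out
}

func main() {
	fmt.Println(userLabels([]string{"_id", "_type", "env", "zone"}))
}
```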


type ByRev

type ByRev []CDCEvent

func (ByRev) Len

func (a ByRev) Len() int

func (ByRev) Less

func (a ByRev) Less(i, j int) bool

func (ByRev) Swap

func (a ByRev) Swap(i, j int)
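
Len, Less, and Swap are the three methods of the standard sort.Interface, so a ByRev value can presumably be passed to sort.Sort. The sketch below uses local stand-in types (cdcEvent, byRev) and assumes that Less orders events by ascending Rev; the page does not show the method bodies.

```go
package main

import (
	"fmt"
	"sort"
)

// cdcEvent mirrors only the CDCEvent fields this sketch needs.
type cdcEvent struct {
	EntityId string
	Rev      uint
}

// byRev is a local stand-in for etre.ByRev. The Less body is an ASSUMPTION:
// ascending revision order.
type byRev []cdcEvent

func (a byRev) Len() int           { return len(a) }
func (a byRev) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a byRev) Less(i, j int) bool { return a[i].Rev < a[j].Rev }

// sortedRevs returns the revisions of events in ascending order without
// modifying the caller's slice.
func sortedRevs(events []cdcEvent) []uint {
	sorted := append([]cdcEvent(nil), events...)
	sort.Sort(byRev(sorted))
	revs := make([]uint, len(sorted))
	for i, e := range sorted {
		revs[i] = e.Rev
	}
	return revs
}

func main() {
	fmt.Println(sortedRevs([]cdcEvent{{"e1", 3}, {"e1", 1}, {"e1", 2}}))
}
```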

type CDCClient

type CDCClient interface {
	// Start starts the CDC feed from the given time. On success, a feed channel
	// is returned on which the caller can receive CDC events for as long as
	// the feed remains connected. Calling Start again returns the same feed
	// channel if already started. To restart the feed, call Stop then Start
	// again. On error or abnormal shutdown, the feed channel is closed, and
	// Error returns the error.
	Start(time.Time) (<-chan CDCEvent, error)

	// Stop stops the feed and closes the feed channel returned by Start. It is
	// safe to call multiple times.
	Stop()

	// Ping pings the API and reports latency. Latency values are all zero on
	// timeout or error. On error, the feed is most likely closed.
	Ping(timeout time.Duration) Latency

	// Error returns the error that caused the feed channel to be closed. Start
	// resets the error.
	Error() error
}

A CDCClient consumes a change feed of Change Data Capture (CDC) events for all entity types. It handles all control messages. The caller should call Stop when done to shut down the feed. On error or abnormal shutdown, Error returns the last error.
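
The lifecycle described above (Start, receive until the channel closes, then check Error, and always Stop) can be sketched with a small in-memory fake. Everything here is a local stand-in: feedClient is a subset of the interface, and fakeFeed, errFeedLost, consume, and consumeFromNow are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// cdcEvent mirrors only the CDCEvent fields this sketch needs.
type cdcEvent struct {
	EntityId string
	Op       string
}

// feedClient is the subset of the CDCClient interface used below.
type feedClient interface {
	Start(time.Time) (<-chan cdcEvent, error)
	Stop()
	Error() error
}

// errFeedLost is a hypothetical error used to simulate an abnormal shutdown.
var errFeedLost = errors.New("feed connection lost")

// fakeFeed is a hypothetical in-memory feed: it delivers its events, closes
// the channel, and then reports err from Error.
type fakeFeed struct {
	events []cdcEvent
	err    error
}

func (f *fakeFeed) Start(time.Time) (<-chan cdcEvent, error) {
	ch := make(chan cdcEvent, len(f.events))
	for _, e := range f.events {
		ch <- e
	}
	close(ch)
	return ch, nil
}
func (f *fakeFeed) Stop()        {}
func (f *fakeFeed) Error() error { return f.err }

// consume handles events until the feed channel closes, then returns the
// number of events handled and whatever Error reports.
func consume(c feedClient, since time.Time, handle func(cdcEvent)) (int, error) {
	feed, err := c.Start(since)
	if err != nil {
		return 0, err
	}
	defer c.Stop() // documented as safe to call multiple times
	n := 0
	for e := range feed {
		handle(e)
		n++
	}
	return n, c.Error()
}

// consumeFromNow starts the feed at the current time.
func consumeFromNow(c feedClient, handle func(cdcEvent)) (int, error) {
	return consume(c, time.Now(), handle)
}

func main() {
	f := &fakeFeed{events: []cdcEvent{{"e1", "i"}, {"e1", "u"}}, err: errFeedLost}
	n, err := consumeFromNow(f, func(e cdcEvent) { fmt.Println(e.Op, e.EntityId) })
	fmt.Println(n, err)
}
```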

func NewCDCClient

func NewCDCClient(addr string, tlsConfig *tls.Config, bufferSize int, debug bool) CDCClient

NewCDCClient creates a CDC feed consumer on the given websocket address. addr must be ws://host:port or wss://host:port.

bufferSize causes Start to create and return a buffered feed channel. A value of 10 is reasonable. If the channel blocks, it is closed and Error returns ErrCallerBlocked.

Enabling debug prints extensive low-level feed/websocket logging to STDERR.

The client does not automatically ping the server. The caller should run a separate goroutine to periodically call Ping. Every 10-60s is reasonable.
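
One way to follow this advice is a ticker-driven goroutine that stops when a channel is closed. The sketch below is an illustration under assumptions: pinger is a one-method subset of the interface, and fakeClient, pingLoop, and collectPings are hypothetical names. A real caller would use an interval of 10-60s rather than the few milliseconds used here to keep the demonstration short.

```go
package main

import (
	"fmt"
	"time"
)

// latency mirrors the Latency struct (values in milliseconds).
type latency struct{ Send, Recv, RTT int64 }

// pinger is the one-method subset of CDCClient that the loop needs.
type pinger interface {
	Ping(timeout time.Duration) latency
}

// fakeClient is a hypothetical client that always reports the same latency.
type fakeClient struct{}

func (fakeClient) Ping(time.Duration) latency {
	return latency{Send: 1, Recv: 1, RTT: 2}
}

// pingLoop calls Ping once per interval and passes each result to report.
// It returns when stop is closed.
func pingLoop(c pinger, interval, timeout time.Duration, stop <-chan struct{}, report func(latency)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			report(c.Ping(timeout))
		case <-stop:
			return
		}
	}
}

// collectPings runs pingLoop in a goroutine until n results have arrived,
// then stops the loop and waits for it to exit.
func collectPings(c pinger, n int) []latency {
	results := make(chan latency)
	stop := make(chan struct{})
	done := make(chan struct{})
	go func() {
		defer close(done)
		pingLoop(c, 5*time.Millisecond, time.Second, stop, func(l latency) {
			select {
			case results <- l:
			case <-stop: // nobody is listening anymore
			}
		})
	}()
	out := make([]latency, 0, n)
	for len(out) < n {
		out = append(out, <-results)
	}
	close(stop)
	<-done
	return out
}

func main() {
	fmt.Println(len(collectPings(fakeClient{}, 3)))
}
```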

type CDCEvent

type CDCEvent struct {
	EventId    string  `json:"eventId" bson:"eventId"`
	EntityId   string  `json:"entityId" bson:"entityId"`     // _id of entity
	EntityType string  `json:"entityType" bson:"entityType"` // user-defined
	Rev        uint    `json:"rev" bson:"rev"`               // entity revision as of this op, 0 on insert
	Ts         int64   `json:"ts" bson:"ts"`                 // Unix nanoseconds
	User       string  `json:"user" bson:"user"`
	Op         string  `json:"op" bson:"op"`                       // i=insert, u=update, d=delete
	Old        *Entity `json:"old,omitempty" bson:"old,omitempty"` // old values of affected labels, null on insert
	New        *Entity `json:"new,omitempty" bson:"new,omitempty"` // new values of affected labels, null on delete

	// Set op fields are optional, copied from entity if set. The three
	// fields are all or nothing: all should be set, or none should be set.
	// Etre has no semantic awareness of set op values, nor does it validate
	// them. The caller is responsible for ensuring they're correct.
	SetId   string `json:"setId,omitempty" bson:"setId,omitempty"`
	SetOp   string `json:"setOp,omitempty" bson:"setOp,omitempty"`
	SetSize int    `json:"setSize,omitempty" bson:"setSize,omitempty"`
}

type Entity

type Entity map[string]interface{}

Entity represents a single Etre entity. The caller is responsible for knowing or determining the type of value for each key.

If label _type is set, the Client verifies that it matches its type. For example, if _type = "foo", Insert or Update with a Client bound to entity type "bar" returns ErrTypeMismatch. If label _type is not set, the Client entity type is presumed.

Label _id cannot be set on insert. If set, Insert returns ErrIdSet. On update, label _id must be set; if not, Update returns ErrIdNotSet. _id corresponds to WriteResult.Writes[].Id.
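
The rules in the two paragraphs above can be written out as small checks. This is only a sketch of the documented behavior, not the client's actual code: the helper names (checkInsert, checkUpdate, checkType) and the order in which the checks run are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

type entity map[string]interface{}

// Local copies of the documented error values.
var (
	errTypeMismatch = errors.New("entity _type and Client entity type are different")
	errIdSet        = errors.New("entity _id is set but not allowed on insert")
	errIdNotSet     = errors.New("entity _id is not set")
)

// checkType allows a missing _type (the client type is presumed) but rejects
// a _type that differs from the client's entity type.
func checkType(clientType string, e entity) error {
	if t, ok := e["_type"]; ok && t != clientType {
		return errTypeMismatch
	}
	return nil
}

// checkInsert rejects entities that already carry an _id.
func checkInsert(clientType string, e entity) error {
	if _, ok := e["_id"]; ok {
		return errIdSet
	}
	return checkType(clientType, e)
}

// checkUpdate requires an _id.
func checkUpdate(clientType string, e entity) error {
	if _, ok := e["_id"]; !ok {
		return errIdNotSet
	}
	return checkType(clientType, e)
}

func main() {
	fmt.Println(checkInsert("bar", entity{"_type": "foo"}))
	fmt.Println(checkInsert("bar", entity{"_id": "abc"}))
	fmt.Println(checkUpdate("bar", entity{"name": "x"}))
	fmt.Println(checkInsert("bar", entity{"name": "x"}))
}
```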

func (Entity) Has

func (e Entity) Has(label string) bool

Has returns true if the entity has the label, regardless of its value.

func (Entity) Id

func (e Entity) Id() string

func (Entity) Labels

func (e Entity) Labels() []string

Labels returns all labels, sorted, including meta-labels (_id, _type, etc.)

func (Entity) Set

func (e Entity) Set() Set

func (Entity) String

func (e Entity) String(label string) string

String returns the string value of the label. If the label is not set or its value is not a string, an empty string is returned.
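
Because Entity is a plain map, the documented behavior of Has, Labels, and String can be approximated in a few lines. The lowercase methods below are hypothetical local equivalents written from the descriptions on this page, not the package's source.

```go
package main

import (
	"fmt"
	"sort"
)

type entity map[string]interface{}

// has reports whether the label is present, whatever its value.
func (e entity) has(label string) bool {
	_, ok := e[label]
	return ok
}

// str returns the label's value if it is a string, else an empty string.
func (e entity) str(label string) string {
	s, _ := e[label].(string)
	return s
}

// labels returns all labels, meta-labels included, in sorted order.
func (e entity) labels() []string {
	out := make([]string, 0, len(e))
	for l := range e {
		out = append(out, l)
	}
	sort.Strings(out)
	return out
}

func main() {
	e := entity{"_id": "abc", "host": "db1", "port": 3306}
	fmt.Println(e.has("port"), e.str("port") == "", e.str("host"), e.labels())
}
```

Note that a non-string value such as the integer port is indistinguishable from a missing label when read through the string accessor, which is why the caller needs to know the value type for each key.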

func (Entity) Type

func (e Entity) Type() string

type EntityClient

type EntityClient interface {
	// Query returns entities that match the query and pass the filter.
	Query(query string, filter QueryFilter) ([]Entity, error)

	// Insert is a bulk operation that creates the given entities.
	Insert([]Entity) (WriteResult, error)

	// Update is a bulk operation that patches entities that match the query.
	Update(query string, patch Entity) (WriteResult, error)

	// UpdateOne patches the given entity by internal ID.
	UpdateOne(id string, patch Entity) (WriteResult, error)

	// Delete is a bulk operation that removes all entities that match the query.
	Delete(query string) (WriteResult, error)

	// DeleteOne removes the given entity by internal ID.
	DeleteOne(id string) (WriteResult, error)

	// Labels returns all labels on the given entity by internal ID.
	Labels(id string) ([]string, error)

	// DeleteLabel removes the given label from the given entity by internal ID.
	// Labels should be stable, long-lived. Consequently, there's no bulk label delete.
	DeleteLabel(id string, label string) (WriteResult, error)

	// EntityType returns the entity type of the client.
	EntityType() string

	// WithSet returns a new EntityClient that uses the given Set for all write operations.
	// The Set cannot be removed. Therefore, when the set is complete, discard the new
	// EntityClient (let its reference count become zero). On insert, the given Set is added
	// to entities that do not have explicit set labels (_setOp, _setId, and _setSize).
	// On update and delete, the given Set is passed as URL query parameters (setOp, setId,
	// and setSize). Sets do not apply to queries. The Set is not checked or validated; the
	// caller must ensure that Set.Size is greater than zero and Set.Op and Set.Id are nonempty
	// strings.
	WithSet(Set) EntityClient
}

EntityClient represents an entity type-specific client. No interface method has an entity type argument because a client is bound to only one entity type. Use an EntityClients map to pass multiple clients for different entity types.

func NewEntityClient

func NewEntityClient(entityType, addr string, httpClient *http.Client) EntityClient

NewEntityClient creates a new type-specific Etre API client that makes requests with the given http.Client. An Etre client is bound to the specified entity type. Use an etre.EntityClients map to pass multiple type-specific clients. Like the given http.Client, an Etre client is safe for use by multiple goroutines, so only one client per entity type should be created.

type EntityClients

type EntityClients map[string]EntityClient

EntityClients represents type-specific entity clients keyed on user-defined constants that define each entity type. For example:

const (
  ENTITY_TYPE_FOO string = "foo"
  ENTITY_TYPE_BAR        = "bar"
)

Pass an etre.EntityClients to use like:

func CreateFoo(ec etre.EntityClients) {
}

Using EntityClients and const entity types is optional but helps avoid typos.
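
To make the pattern concrete without depending on the real package, the sketch below uses a local map type keyed by the same constants. The inserter interface, the recorder fake, and createFoo are hypothetical and only mirror the shape of EntityClients; in particular, Insert here returns just an error instead of a WriteResult.

```go
package main

import "fmt"

const (
	ENTITY_TYPE_FOO string = "foo"
	ENTITY_TYPE_BAR        = "bar"
)

type entity map[string]interface{}

// inserter is a hypothetical one-method stand-in for EntityClient.
type inserter interface {
	Insert([]entity) error
}

// clients mirrors the shape of EntityClients: one client per entity type.
type clients map[string]inserter

// recorder is a fake client that remembers what was inserted.
type recorder struct{ inserted []entity }

func (r *recorder) Insert(es []entity) error {
	r.inserted = append(r.inserted, es...)
	return nil
}

// createFoo looks up the client by constant, so a misspelled entity type is a
// compile error instead of a runtime surprise.
func createFoo(ec clients, name string) error {
	c, ok := ec[ENTITY_TYPE_FOO]
	if !ok {
		return fmt.Errorf("no client for entity type %q", ENTITY_TYPE_FOO)
	}
	return c.Insert([]entity{{"name": name}})
}

func main() {
	foo := &recorder{}
	ec := clients{ENTITY_TYPE_FOO: foo, ENTITY_TYPE_BAR: &recorder{}}
	fmt.Println(createFoo(ec, "first"), len(foo.inserted))
}
```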

type Error

type Error struct {
	Message    string `json:"message"`    // human-readable and loggable error message
	Type       string `json:"type"`       // error slug (e.g. db-error, missing-param, etc.)
	EntityId   string `json:"entityId"`   // entity ID that caused error, if any
	HTTPStatus int    `json:"httpStatus"` // HTTP status code
}

Error is the standard response for all handled errors. Client errors (HTTP 400 codes) and internal errors (HTTP 500 codes) are returned as an Error, if handled. If not handled (API crash, panic, etc.), Etre returns an HTTP 500 code and the response data is undefined; the client should print any response data as a string.
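
A client can handle both cases by trying to decode the body as an Error and falling back to the raw string. The sketch below uses a local copy of the struct; decodeError is a hypothetical helper, and treating an empty Type as "not a handled error" is an assumption made for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// apiError is a local copy of the Error struct and its JSON tags.
type apiError struct {
	Message    string `json:"message"`
	Type       string `json:"type"`
	EntityId   string `json:"entityId"`
	HTTPStatus int    `json:"httpStatus"`
}

// decodeError returns the decoded error and true if body is a handled error.
// Otherwise it returns false, and the caller should print the body as a string.
// ASSUMPTION: a handled error always has a non-empty Type.
func decodeError(body []byte) (apiError, bool) {
	var e apiError
	if err := json.Unmarshal(body, &e); err != nil || e.Type == "" {
		return apiError{}, false
	}
	return e, true
}

func main() {
	handled := []byte(`{"message":"query is required","type":"missing-param","httpStatus":400}`)
	if e, ok := decodeError(handled); ok {
		fmt.Println(e.HTTPStatus, e.Type, e.Message)
	}
	crash := []byte("panic: runtime error")
	if _, ok := decodeError(crash); !ok {
		fmt.Println("unhandled:", string(crash))
	}
}
```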

func (Error) Error

func (e Error) Error() string

func (Error) IsZero

func (e Error) IsZero() bool

func (Error) New

func (e Error) New(msgFmt string, msgArgs ...interface{}) Error

func (Error) String

func (e Error) String() string

type Latency

type Latency struct {
	Send int64 // client -> server
	Recv int64 // server -> client
	RTT  int64 // client -> server -> client
}

Latency represents network latencies in milliseconds.

type MockCDCClient

type MockCDCClient struct {
	StartFunc func(time.Time) (<-chan CDCEvent, error)
	StopFunc  func()
	PingFunc  func(time.Duration) Latency
	ErrorFunc func() error
}

func (MockCDCClient) Error

func (c MockCDCClient) Error() error

func (MockCDCClient) Ping

func (c MockCDCClient) Ping(timeout time.Duration) Latency

func (MockCDCClient) Start

func (c MockCDCClient) Start(startTs time.Time) (<-chan CDCEvent, error)

func (MockCDCClient) Stop

func (c MockCDCClient) Stop()

type MockEntityClient

type MockEntityClient struct {
	QueryFunc       func(string, QueryFilter) ([]Entity, error)
	InsertFunc      func([]Entity) (WriteResult, error)
	UpdateFunc      func(query string, patch Entity) (WriteResult, error)
	UpdateOneFunc   func(id string, patch Entity) (WriteResult, error)
	DeleteFunc      func(query string) (WriteResult, error)
	DeleteOneFunc   func(id string) (WriteResult, error)
	LabelsFunc      func(id string) ([]string, error)
	DeleteLabelFunc func(id string, label string) (WriteResult, error)
	EntityTypeFunc  func() string
	WithSetFunc     func(Set) EntityClient
}

MockEntityClient implements EntityClient for testing. If a callback func is defined, it is called for the respective interface method; otherwise, the default methods return empty slices and no error. Defining a callback function allows tests to intercept, save, and inspect Client calls and simulate Etre API returns.
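
The callback pattern described here can be sketched for a single method. The types below are local stand-ins (mockClient, querier, hostnames are hypothetical names); only the pattern, a nil check that falls back to an empty result, is taken from the description above.

```go
package main

import "fmt"

type entity map[string]interface{}

type queryFilter struct{ ReturnLabels []string }

// mockClient follows the pattern for one method only.
type mockClient struct {
	QueryFunc func(string, queryFilter) ([]entity, error)
}

// Query calls the callback if defined, else returns an empty slice and no error.
func (c mockClient) Query(query string, filter queryFilter) ([]entity, error) {
	if c.QueryFunc != nil {
		return c.QueryFunc(query, filter)
	}
	return []entity{}, nil
}

// querier is the subset of EntityClient that the code under test depends on.
type querier interface {
	Query(query string, filter queryFilter) ([]entity, error)
}

// hostnames is hypothetical code under test: it queries and extracts one label.
func hostnames(c querier, query string) ([]string, error) {
	entities, err := c.Query(query, queryFilter{ReturnLabels: []string{"hostname"}})
	if err != nil {
		return nil, err
	}
	names := []string{}
	for _, e := range entities {
		if h, ok := e["hostname"].(string); ok {
			names = append(names, h)
		}
	}
	return names, nil
}

func main() {
	var gotQuery string
	mock := mockClient{QueryFunc: func(q string, f queryFilter) ([]entity, error) {
		gotQuery = q // intercept the call for later inspection
		return []entity{{"hostname": "db1"}}, nil
	}}
	names, err := hostnames(mock, "env=prod")
	fmt.Println(names, err, gotQuery)
}
```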

func (MockEntityClient) Delete

func (c MockEntityClient) Delete(query string) (WriteResult, error)

func (MockEntityClient) DeleteLabel

func (c MockEntityClient) DeleteLabel(id string, label string) (WriteResult, error)

func (MockEntityClient) DeleteOne

func (c MockEntityClient) DeleteOne(id string) (WriteResult, error)

func (MockEntityClient) EntityType

func (c MockEntityClient) EntityType() string

func (MockEntityClient) Insert

func (c MockEntityClient) Insert(entities []Entity) (WriteResult, error)

func (MockEntityClient) Labels

func (c MockEntityClient) Labels(id string) ([]string, error)

func (MockEntityClient) Query

func (c MockEntityClient) Query(query string, filter QueryFilter) ([]Entity, error)

func (MockEntityClient) Update

func (c MockEntityClient) Update(query string, patch Entity) (WriteResult, error)

func (MockEntityClient) UpdateOne

func (c MockEntityClient) UpdateOne(id string, patch Entity) (WriteResult, error)

func (MockEntityClient) WithSet

func (c MockEntityClient) WithSet(set Set) EntityClient

type QueryFilter

type QueryFilter struct {
	// ReturnLabels defines labels included in matching entities. An empty slice
	// returns all labels, including meta-labels. Else, only labels in the slice
	// are returned.
	ReturnLabels []string
}

QueryFilter represents filtering options for EntityClient.Query().
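
The effect of ReturnLabels on a single entity can be illustrated with a small projection function. This shows only the documented semantics (empty slice means all labels, otherwise only the listed ones); where and how Etre actually applies the filter is not stated on this page, and project is a hypothetical helper.

```go
package main

import "fmt"

type entity map[string]interface{}

// project returns a copy of e restricted to returnLabels. An empty slice
// keeps every label, including meta-labels.
func project(e entity, returnLabels []string) entity {
	out := entity{}
	if len(returnLabels) == 0 {
		for l, v := range e {
			out[l] = v
		}
		return out
	}
	for _, l := range returnLabels {
		if v, ok := e[l]; ok {
			out[l] = v
		}
	}
	return out
}

func main() {
	e := entity{"_id": "abc", "_type": "host", "hostname": "db1", "env": "prod"}
	fmt.Println(len(project(e, nil)), project(e, []string{"hostname"}))
}
```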

type RevOrder

type RevOrder struct {
	// contains filtered or unexported fields
}

RevOrder handles entity revision ordering. Normally, an Etre CDC feed sends entities in revision order (ascending revision numbers). It is possible, although extremely unlikely, that revisions can be received out of order. A RevOrder handles this by buffering the out-of-order revisions until a complete, in-order sequence is received. It can be used by a CDC feed consumer like:

revo := etre.NewRevOrder(0) // 0 = DEFAULT_MAX_ENTITIES

// Handle out-of-order revisions (per entity ID)
ok, prev := revo.InOrder(event)
if !ok {
    // This rev is out of order, skip (buffered in revo)
    continue // assumes this code runs inside the loop that receives events
}

// Sync ordered set of previously out-of-order revs
if prev != nil {
    for _, p := range prev {
        if err := sync(p); err != nil {
            return err
        }
    }
}

// Sync the current rev (in order)
if err := sync(event); err != nil {
    return err
}

Using a RevOrder is optional but a best practice to be safe.

func NewRevOrder

func NewRevOrder(maxEntities uint) *RevOrder

NewRevOrder returns a new RevOrder that tracks the last revision of at most maxEntities using an LRU cache. If maxEntities is zero, DEFAULT_MAX_ENTITIES is used. It's neither possible nor necessary to track all entities. If an entity hasn't been seen in a while (e.g. 1,000 entities later), we presume that it won't be seen again; if it is, we further presume that its next revision correctly follows the previous revision that was seen but evicted from the LRU cache.

func (*RevOrder) InOrder

func (r *RevOrder) InOrder(e CDCEvent) (bool, []CDCEvent)

InOrder returns true if the given event is in order. If not (false), the caller should skip and ignore the event. If true, a non-nil slice of previous revisions is also returned when they complete an in-order sequence up to but not including the given event. In this case, the caller should sync all previous revisions first, then sync the given event.
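
The general idea of buffering out-of-order revisions until the gap is filled can be sketched as follows. This is deliberately not RevOrder: the reorderer type is hypothetical, it has a different return shape (one slice of events that are ready to sync), it has no LRU eviction, and it assumes revisions for an entity increase by exactly one.

```go
package main

import "fmt"

// cdcEvent mirrors only the CDCEvent fields this sketch needs.
type cdcEvent struct {
	EntityId string
	Rev      uint
}

// reorderer is a simplified, hypothetical reorder buffer keyed by entity ID.
type reorderer struct {
	next    map[string]uint              // next expected rev per entity
	pending map[string]map[uint]cdcEvent // buffered out-of-order events
}

func newReorderer() *reorderer {
	return &reorderer{
		next:    map[string]uint{},
		pending: map[string]map[uint]cdcEvent{},
	}
}

// push returns the events that are now safe to sync, in ascending revision
// order. It returns nil if e was buffered (arrived early) or dropped (stale).
func (r *reorderer) push(e cdcEvent) []cdcEvent {
	want, seen := r.next[e.EntityId]
	if !seen {
		want = e.Rev // first sighting: presume this is the correct next rev
	}
	if e.Rev < want {
		return nil // stale or duplicate revision
	}
	if e.Rev > want {
		if r.pending[e.EntityId] == nil {
			r.pending[e.EntityId] = map[uint]cdcEvent{}
		}
		r.pending[e.EntityId][e.Rev] = e
		return nil
	}
	// e is the expected revision: release it plus any buffered successors.
	ready := []cdcEvent{e}
	want++
	for {
		p, ok := r.pending[e.EntityId][want]
		if !ok {
			break
		}
		ready = append(ready, p)
		delete(r.pending[e.EntityId], want)
		want++
	}
	r.next[e.EntityId] = want
	return ready
}

func main() {
	r := newReorderer()
	for _, rev := range []uint{0, 2, 3, 1} {
		fmt.Println(rev, r.push(cdcEvent{EntityId: "e1", Rev: rev}))
	}
}
```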

type Set

type Set struct {
	Id   string
	Op   string
	Size int
}

A Set is a user-defined logical grouping of writes (insert, update, delete).
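
The EntityClient documentation says that on update and delete a Set is passed as the URL query parameters setOp, setId, and setSize, and that the caller must ensure the values are valid. A minimal sketch of that encoding, with the validation done on the caller's side, might look like this. The set type and setQuery helper are hypothetical, and the example values are made up.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// set is a local copy of the Set struct.
type set struct {
	Id   string
	Op   string
	Size int
}

// setQuery validates s the way the documentation asks callers to (non-empty
// Id and Op, Size greater than zero) and returns it as an encoded query string.
func setQuery(s set) (string, error) {
	if s.Id == "" || s.Op == "" || s.Size <= 0 {
		return "", fmt.Errorf("invalid set: %+v", s)
	}
	v := url.Values{}
	v.Set("setId", s.Id)
	v.Set("setOp", s.Op)
	v.Set("setSize", strconv.Itoa(s.Size))
	return v.Encode(), nil // Encode sorts parameters by key
}

func main() {
	q, err := setQuery(set{Id: "abc123", Op: "provision", Size: 3})
	fmt.Println(q, err)
}
```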

type Write

type Write struct {
	Id    string `json:"id"`              // internal _id of entity (all write ops)
	URI   string `json:"uri,omitempty"`   // fully-qualified address of new entity (insert)
	Diff  Entity `json:"diff,omitempty"`  // previous entity label values (update)
	Error string `json:"error,omitempty"` // v0.8 backward-compatibility
}

Write represents the successful write of one entity.

type WriteResult

type WriteResult struct {
	Writes []Write `json:"writes"`          // successful writes
	Error  *Error  `json:"error,omitempty"` // error before, during, or after writes
}

WriteResult represents the result of a write operation (insert, update, delete). On success or failure, all write ops return a WriteResult.

If Error is set (not nil), some or all writes failed. Writes stop on the first error, so len(Writes) is the index, in the slice of entities sent by the client, of the entity that failed. For example, if the first entity causes an error, len(Writes) = 0. If the third entity fails, len(Writes) = 2 (zero indexed).
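
The indexing rule above can be captured in a small helper. The types are trimmed local copies, and failedIndex is a hypothetical name; returning -1 when there is no error is a convention chosen for this sketch.

```go
package main

import "fmt"

// Trimmed local copies of the Write, Error, and WriteResult types.
type write struct{ Id string }

type apiError struct{ Message string }

type writeResult struct {
	Writes []write
	Error  *apiError
}

// failedIndex returns the index, in the slice of entities the client sent, of
// the entity that failed, or -1 if the write result carries no error.
func failedIndex(wr writeResult) int {
	if wr.Error == nil {
		return -1
	}
	return len(wr.Writes)
}

func main() {
	wr := writeResult{
		Writes: []write{{Id: "a"}, {Id: "b"}},
		Error:  &apiError{Message: "duplicate entity"},
	}
	fmt.Println(failedIndex(wr)) // the third entity (index 2) failed
}
```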

func (WriteResult) IsZero

func (wr WriteResult) IsZero() bool


Path Synopsis
Package api provides API endpoints and controllers.
Package cdc provides interfaces for reading and writing change data capture (CDC) events.
Package entity is a connector to execute CRUD commands for a single entity and many entities on a DB instance.
Package es provides a framework for integration with other programs.
Package app provides app-wide data structs and functions.
Package config handles config files, -config, and env vars at startup.
Package kls provides a light-weight parser for Kubernetes label selectors.
Package query is a wrapper for Kubernetes Labels Selector (KLS).
Package test provides helper functions for tests.
