etre

v0.11.9
Published: Jun 24, 2021 License: Apache-2.0 Imports: 19 Imported by: 0

README

Etre

Etre is an entity API for managing primitive entities with labels.

This project is still under development and should not be used for anything in production yet. We are not seeking external contributors at this time.

License

Apache 2.0

Documentation

Overview

Package etre provides API clients and low-level primitive data types.

Constants

const (
	VERSION                  = "0.11.9"
	API_ROOT          string = "/api/v1"
	META_LABEL_ID            = "_id"
	META_LABEL_TYPE          = "_type"
	META_LABEL_REV           = "_rev"
	CDC_WRITE_TIMEOUT int    = 5 // seconds

	VERSION_HEADER       = "X-Etre-Version"
	TRACE_HEADER         = "X-Etre-Trace"
	QUERY_TIMEOUT_HEADER = "X-Etre-Query-Timeout"
)
const DEFAULT_MAX_ENTITIES = 1000

Variables

var (
	ErrTypeMismatch   = errors.New("entity _type and Client entity type are different")
	ErrIdSet          = errors.New("entity _id is set but not allowed on insert")
	ErrIdNotSet       = errors.New("entity _id is not set")
	ErrNoEntity       = errors.New("empty entity or id slice; at least one required")
	ErrNoLabel        = errors.New("empty label slice; at least one required")
	ErrNoQuery        = errors.New("empty query string")
	ErrBadData        = errors.New("data from CDC feed is not event or control")
	ErrCallerBlocked  = errors.New("caller blocked")
	ErrEntityNotFound = errors.New("entity not found")
	ErrClientTimeout  = errors.New("client timeout")
)
var (
	DebugEnabled = false
)

Functions

func Debug

func Debug(msg string, v ...interface{})

func IsMetalabel

func IsMetalabel(label string) bool

Types

type ByRev

type ByRev []CDCEvent

func (ByRev) Len

func (a ByRev) Len() int

func (ByRev) Less

func (a ByRev) Less(i, j int) bool

func (ByRev) Swap

func (a ByRev) Swap(i, j int)

type CDCClient

type CDCClient interface {
	// Start starts the CDC feed from the given time. On success, a feed channel
	// is returned on which the caller can receive CDC events for as long as
	// the feed remains connected. Calling Start again returns the same feed
	// channel if already started. To restart the feed, call Stop then Start
	// again. On error or abnormal shutdown, the feed channel is closed, and
	// Error returns the error.
	Start(time.Time) (<-chan CDCEvent, error)

	// Stop stops the feed and closes the feed channel returned by Start. It is
	// safe to call multiple times.
	Stop()

	// Ping pings the API and reports latency. Latency values are all zero on
	// timeout or error. On error, the feed is most likely closed.
	Ping(timeout time.Duration) Latency

	// Error returns the error that caused the feed channel to be closed. Start
	// resets the error.
	Error() error
}

A CDCClient consumes a change feed of Change Data Capture (CDC) events for all entity types. It handles all control messages. The caller should call Stop when done to shut down the feed. On error or abnormal shutdown, Error returns the last error.

func NewCDCClient

func NewCDCClient(addr string, tlsConfig *tls.Config, bufferSize int, debug bool) CDCClient

NewCDCClient creates a CDC feed consumer on the given websocket address. addr must be ws://host:port or wss://host:port.

bufferSize causes Start to create and return a buffered feed channel. A value of 10 is reasonable. If the channel blocks, it is closed and Error returns ErrCallerBlocked.

Enabling debug prints a lot of low-level feed/websocket logging to STDERR.

The client does not automatically ping the server. The caller should run a separate goroutine to periodically call Ping. Every 10-60s is reasonable.

type CDCEvent

type CDCEvent struct {
	Id     string `json:"eventId" bson:"_id,omitempty"`
	Ts     int64  `json:"ts" bson:"ts"` // Unix nanoseconds
	Op     string `json:"op" bson:"op"` // i=insert, u=update, d=delete
	Caller string `json:"user" bson:"caller"`

	EntityId   string  `json:"entityId" bson:"entityId"`           // _id of entity
	EntityType string  `json:"entityType" bson:"entityType"`       // user-defined
	EntityRev  int64   `json:"rev" bson:"entityRev"`               // entity revision as of this op, 0 on insert
	Old        *Entity `json:"old,omitempty" bson:"old,omitempty"` // old values of affected labels, null on insert
	New        *Entity `json:"new,omitempty" bson:"new,omitempty"` // new values of affected labels, null on delete

	// Set op fields are optional, copied from entity if set. The three
	// fields are all or nothing: all should be set, or none should be set.
	// Etre has no semantic awareness of set op values, nor does it validate
	// them. The caller is responsible for ensuring they're correct.
	SetId   string `json:"setId,omitempty" bson:"setId,omitempty"`
	SetOp   string `json:"setOp,omitempty" bson:"setOp,omitempty"`
	SetSize int    `json:"setSize,omitempty" bson:"setSize,omitempty"`
}

type Entity

type Entity map[string]interface{}

Entity represents a single Etre entity. The caller is responsible for knowing or determining the type of value for each key.

If label _type is set, the Client verifies that it matches its type. For example, if _type = "foo", Insert or Update with a Client bound to entity type "bar" returns ErrTypeMismatch. If label _type is not set, the Client entity type is presumed.

Label _id cannot be set on insert. If set, Insert returns ErrIdSet. On update, label _id must be set; if not, Update returns ErrIdNotSet. _id corresponds to WriteResult.Writes[].Id.
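The _type and _id rules above amount to a few map checks. The sketch below restates them with local stand-ins (entity, checkType, checkInsert, checkUpdate, and the lower-case error variables are hypothetical); it is not the client's actual validation code.

```go
package main

import (
	"errors"
	"fmt"
)

// entity is a local stand-in for etre.Entity.
type entity map[string]interface{}

// Local copies of the documented error values.
var (
	errTypeMismatch = errors.New("entity _type and Client entity type are different")
	errIdSet        = errors.New("entity _id is set but not allowed on insert")
	errIdNotSet     = errors.New("entity _id is not set")
)

// checkType returns errTypeMismatch if _type is set and differs from the
// client's entity type. An unset _type is presumed to match.
func checkType(e entity, clientType string) error {
	if t, ok := e["_type"]; ok && t != clientType {
		return errTypeMismatch
	}
	return nil
}

// checkInsert applies the insert rules: matching type and no _id.
func checkInsert(e entity, clientType string) error {
	if err := checkType(e, clientType); err != nil {
		return err
	}
	if _, ok := e["_id"]; ok {
		return errIdSet
	}
	return nil
}

// checkUpdate applies the update rules: matching type and _id set.
func checkUpdate(e entity, clientType string) error {
	if err := checkType(e, clientType); err != nil {
		return err
	}
	if _, ok := e["_id"]; !ok {
		return errIdNotSet
	}
	return nil
}

func main() {
	fmt.Println(checkInsert(entity{"_type": "foo", "x": 1}, "bar"))
	fmt.Println(checkInsert(entity{"_id": "abc"}, "foo"))
	fmt.Println(checkUpdate(entity{"x": 1}, "foo"))
}
```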

func (Entity) Has

func (e Entity) Has(label string) bool

Has returns true if the entity has the label, regardless of its value.

func (Entity) Id

func (e Entity) Id() string

func (Entity) Labels

func (e Entity) Labels() []string

Labels returns all labels, sorted, including meta-labels (_id, _type, etc.)

func (Entity) Rev added in v0.11.0

func (e Entity) Rev() int64

func (Entity) Set

func (e Entity) Set() Set

func (Entity) String

func (e Entity) String(label string) string

String returns the string value of the label. If the label is not set or its value is not a string, an empty string is returned.

func (Entity) Type

func (e Entity) Type() string

type EntityClient

type EntityClient interface {
	// Query returns entities that match the query and pass the filter.
	Query(query string, filter QueryFilter) ([]Entity, error)

	// Insert is a bulk operation that creates the given entities.
	Insert([]Entity) (WriteResult, error)

	// Update is a bulk operation that patches entities that match the query.
	Update(query string, patch Entity) (WriteResult, error)

	// UpdateOne patches the given entity by internal ID.
	UpdateOne(id string, patch Entity) (WriteResult, error)

	// Delete is a bulk operation that removes all entities that match the query.
	Delete(query string) (WriteResult, error)

	// DeleteOne removes the given entity by internal ID.
	DeleteOne(id string) (WriteResult, error)

	// Labels returns all labels on the given entity by internal ID.
	Labels(id string) ([]string, error)

	// DeleteLabel removes the given label from the given entity by internal ID.
	// Labels should be stable, long-lived. Consequently, there's no bulk label delete.
	DeleteLabel(id string, label string) (WriteResult, error)

	// EntityType returns the entity type of the client.
	EntityType() string

	// WithSet returns a new EntityClient that uses the given Set for all write operations.
	// The Set cannot be removed. Therefore, when the set is complete, discard the new
	// EntityClient (let its reference count become zero). On insert, the given Set is added
	// to entities that do not have explicit set labels (_setOp, _setId, and _setSize).
	// On update and delete, the given Set is passed as URL query parameters (setOp, setId,
	// and setSize). Sets do not apply to queries. The Set is not checked or validated; the
	// caller must ensure that Set.Size is greater than zero and Set.Op and Set.Id are nonempty
	// strings.
	WithSet(Set) EntityClient

	// WithTrace returns a new EntityClient that sends the trace string with every request
	// for server-side metrics. The trace string is a comma-separated list of key=value
	// pairs like: app=foo,host=bar. Invalid trace values are silently ignored by the server.
	WithTrace(string) EntityClient
}

EntityClient represents an entity type-specific client. No interface method has an entity type argument because a client is bound to only one entity type. Use an EntityClients map to pass multiple clients for different entity types.
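WithTrace expects a comma-separated list of key=value pairs such as app=foo,host=bar. A small helper for building that string from a map might look like the sketch below; traceString is a hypothetical name, and sorting the keys is a choice made here for stable output, not a documented requirement.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// traceString formats key=value pairs as a comma-separated trace string.
// Keys are sorted so the result is deterministic.
func traceString(kv map[string]string) string {
	keys := make([]string, 0, len(kv))
	for k := range kv {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, k+"="+kv[k])
	}
	return strings.Join(pairs, ",")
}

func main() {
	// Prints: app=foo,host=bar
	fmt.Println(traceString(map[string]string{"host": "bar", "app": "foo"}))
}
```

The result would be passed to WithTrace on an existing client. The helper does no validation, and the server silently ignores invalid trace values.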

func NewEntityClient

func NewEntityClient(entityType, addr string, httpClient *http.Client) EntityClient

NewEntityClient creates a new type-specific Etre API client that makes requests with the given http.Client. An Etre client is bound to the specified entity type. Use an etre.EntityClients map to pass multiple type-specific clients. Like the given http.Client, an Etre client is safe for use by multiple goroutines, so only one client per entity type needs to be created.

func NewEntityClientWithConfig added in v0.9.7

func NewEntityClientWithConfig(c EntityClientConfig) EntityClient

type EntityClientConfig added in v0.9.7

type EntityClientConfig struct {
	EntityType   string        // entity type name
	Addr         string        // Etre server address (e.g. https://localhost:3848)
	HTTPClient   *http.Client  // configured http.Client
	Retry        uint          // optional retry count on network or API error
	RetryWait    time.Duration // optional wait time between retries
	RetryLogging bool          // log error on retry to stderr
	QueryTimeout time.Duration // timeout passed to API via etre.QUERY_TIMEOUT_HEADER
	Debug        bool
}

EntityClientConfig represents required and optional configuration for an EntityClient. This is used to make an EntityClient by calling NewEntityClientWithConfig.

type EntityClients

type EntityClients map[string]EntityClient

EntityClients represents type-specific entity clients keyed on user-defined constants that define each entity type. For example:

const (
  ENTITY_TYPE_FOO string = "foo"
  ENTITY_TYPE_BAR        = "bar"
)

Pass an etre.EntityClients to use like:

func CreateFoo(ec etre.EntityClients) {
  ec[ENTITY_TYPE_FOO].Insert(...)
}

Using EntityClients and const entity types is optional but helps avoid typos.

type Error

type Error struct {
	Message    string `json:"message"`    // human-readable and loggable error message
	Type       string `json:"type"`       // error slug (e.g. db-error, missing-param, etc.)
	EntityId   string `json:"entityId"`   // entity ID that caused error, if any
	HTTPStatus int    `json:"httpStatus"` // HTTP status code
}

Error is the standard response for all handled errors. Client errors (HTTP 400 codes) and internal errors (HTTP 500 codes) are returned as an Error, if handled. If not handled (API crash, panic, etc.), Etre returns an HTTP 500 code and the response data is undefined; the client should print any response data as a string.
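Because handled errors share one JSON shape and unhandled ones have undefined bodies, a caller reading raw responses can try to decode the body and fall back to printing it as a string. The sketch below assumes that approach. apiError mirrors the documented JSON tags, while decodeError, the Error() format, and the sample message are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// apiError mirrors the JSON fields of etre.Error.
type apiError struct {
	Message    string `json:"message"`
	Type       string `json:"type"`
	EntityId   string `json:"entityId"`
	HTTPStatus int    `json:"httpStatus"`
}

// Error formats the error; this format is illustrative only.
func (e apiError) Error() string {
	return fmt.Sprintf("%s (type: %s, HTTP status: %d)", e.Message, e.Type, e.HTTPStatus)
}

// decodeError reports whether body is a handled error. A body that is not
// JSON, or that has no error type, is treated as unhandled.
func decodeError(body []byte) (apiError, bool) {
	var e apiError
	if err := json.Unmarshal(body, &e); err != nil || e.Type == "" {
		return apiError{}, false
	}
	return e, true
}

func main() {
	handled := []byte(`{"message":"query parameter is required","type":"missing-param","httpStatus":400}`)
	if e, ok := decodeError(handled); ok {
		fmt.Println(e)
	}
	unhandled := []byte("internal server error")
	if _, ok := decodeError(unhandled); !ok {
		fmt.Println(string(unhandled)) // print undefined response data as a string
	}
}
```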

func (Error) Error

func (e Error) Error() string

func (Error) New

func (e Error) New(msgFmt string, msgArgs ...interface{}) Error

func (Error) String

func (e Error) String() string

type Latency

type Latency struct {
	Send int64 // client -> server
	Recv int64 // server -> client
	RTT  int64 // client -> server -> client
}

Latency represents network latencies in milliseconds.

type Metrics

type Metrics struct {
	// System metrics are measurements related to the API, not an entity type.
	// For example, authentication failures are a system metric.
	System *MetricsSystemReport `json:"system"`

	// Groups metrics are measurements related to user-defined groups and entity types.
	// The auth plugin sets groups for each caller (HTTP request). Metrics for the caller
	// are added to each group in the list, so a single call can count toward one or more
	// metric groups. If no groups are specified, no group metrics are recorded.
	Groups []MetricsGroupReport `json:"groups"`
}

Metrics represents all metrics. It is the message returned by the /metrics endpoint. Metrics do not reset when reported (i.e. on GET /metrics). To reset samples (for non-counter metrics), specify URL query parameter "reset=true" (GET /metrics?reset=true).
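The reset behavior is controlled only by the URL query parameter, so building the request URL is the main step. In the sketch below, metricsURL is a hypothetical helper and the base address is supplied by the caller. Whether /metrics sits under API_ROOT on a given server is not stated here, so the sketch appends it to whatever base is given.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// metricsURL appends /metrics to base and, if reset is true, adds the
// reset=true query parameter.
func metricsURL(base string, reset bool) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	u.Path = strings.TrimRight(u.Path, "/") + "/metrics"
	if reset {
		q := u.Query()
		q.Set("reset", "true")
		u.RawQuery = q.Encode()
	}
	return u.String(), nil
}

func main() {
	report, _ := metricsURL("https://localhost:3848", false)
	resetting, _ := metricsURL("https://localhost:3848", true)
	fmt.Println(report)
	fmt.Println(resetting)
}
```

A monitoring job that owns the samples would use the second form. Other readers should use the first so that they do not reset non-counter samples for everyone else.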

type MetricsCDCReport

type MetricsCDCReport struct {
	Clients int64 `json:"clients"`
}

type MetricsEntityReport

type MetricsEntityReport struct {
	EntityType string                         `json:"entity-type"`
	Query      *MetricsQueryReport            `json:"query"`
	Label      map[string]*MetricsLabelReport `json:"label"`
	Trace      map[string]map[string]int64    `json:"trace,omitempty"`
}

MetricsEntityReport are measurements related to an entity type. It contains three sub-reports: Query, Label, Trace.

type MetricsGroupReport added in v0.9.2

type MetricsGroupReport struct {
	Ts      int64                           `json:"ts"`
	Group   string                          `json:"group"`
	Request *MetricsRequestReport           `json:"request"`
	Entity  map[string]*MetricsEntityReport `json:"entity"`
	CDC     *MetricsCDCReport               `json:"cdc"`
}

MetricsGroupReport is the top-level metric reporting structure for each metric group. It contains metadata (Ts and Group) and three sub-reports: Request, Entity, CDC.

type MetricsLabelReport

type MetricsLabelReport struct {
	Read   int64 `json:"read"`
	Update int64 `json:"update"`
	Delete int64 `json:"delete"`
}

type MetricsQueryReport

type MetricsQueryReport struct {
	// Query counter is the grand total number of queries. Every authenticated
	// query increments Query by 1. Query = Read + Write.
	Query int64 `json:"query"`

	// Read counter is the total number of read queries. All read queries
	// increment Read by 1. Read = ReadQuery + ReadId + ReadLabels.
	// Read is incremented after authentication and before authorization.
	// All other read metrics are incremented after authorization.
	Read int64 `json:"read"`

	// ReadQuery counter is the number of reads by query. It is a subset of Read.
	// These API endpoints increment ReadQuery by 1:
	//   GET  /api/v1/entities/:type
	//   POST /api/v1/query/:type
	// See Labels stats for the number of labels used in the query.
	ReadQuery int64 `json:"read-query"`

	// ReadId counter is the number of reads by entity ID. It is a subset of Read.
	// These API endpoints increment ReadId by 1:
	//   GET /api/v1/entity/:id
	ReadId int64 `json:"read-id"`

	// ReadLabels counter is the number of read label queries. It is a subset of Read.
	// These API endpoints increment ReadLabels by 1:
	//   GET /api/v1/entity/:type/:id/labels
	ReadLabels int64 `json:"read-labels"`

	// ReadMatch stats represent the number of entities that matched the read
	// query and were returned to the client. See Labels stats for the number
	// of labels used in the query.
	ReadMatch_min int64 `json:"read-match_min"`
	ReadMatch_max int64 `json:"read-match_max"`
	ReadMatch_avg int64 `json:"read-match_avg"`
	ReadMatch_med int64 `json:"read-match_med"`

	// Write counter is the grand total number of write queries. All write queries
	// increment Write by 1. Write = CreateOne + CreateMany + UpdateId +
	// UpdateQuery + DeleteId + DeleteQuery + DeleteLabel.
	//
	// Write is incremented after authentication and before authorization, so it
	// does not count successful writes. Successfully written entities are measured
	// by counters Created, Updated, and Deleted. All other write metrics are
	// incremented after authorization.
	Write int64 `json:"write"`

	// CreateOne and CreateMany counters are the number of create queries.
	// They are subsets of Write. These API endpoints increment the metrics:
	//   POST /api/v1/entity/:type   (one)
	//   POST /api/v1/entities/:type (many/bulk)
	CreateOne  int64 `json:"create-one"`
	CreateMany int64 `json:"create-many"`

	// CreateBulk stats represent the number of entities received for CreateMany
	// (API endpoint POST /api/v1/entities/:type). The Created counter measures
	// the number of entities successfully created. These stats measure the size
	// of bulk create requests.
	CreateBulk_min int64 `json:"create-bulk_min"`
	CreateBulk_max int64 `json:"create-bulk_max"`
	CreateBulk_avg int64 `json:"create-bulk_avg"`
	CreateBulk_med int64 `json:"create-bulk_med"`

	// UpdateId and UpdateQuery counters are the number of update (patch) queries.
	// They are a subset of Write. These API endpoints increment the metrics:
	//   PUT /api/v1/entity/:type/:id (id)
	//   PUT /api/v1/entities/:type   (query)
	// See Labels stats for the number of labels used in the UpdateQuery query.
	UpdateId    int64 `json:"update-id"`
	UpdateQuery int64 `json:"update-query"`

	// UpdateBulk stats represent the number of entities that matched the bulk
	// update query and were updated. The Updated counter measures the number
	// of entities successfully updated. These stats measure the size of bulk
	// update requests.
	UpdateBulk_min int64 `json:"update-bulk_min"`
	UpdateBulk_max int64 `json:"update-bulk_max"`
	UpdateBulk_avg int64 `json:"update-bulk_avg"`
	UpdateBulk_med int64 `json:"update-bulk_med"`

	// DeleteId and DeleteQuery counters are the number of delete queries.
	// They are a subset of Write. These API endpoints increment the metrics:
	//   DELETE /api/v1/entity/:type   (id)
	//   DELETE /api/v1/entities/:type (query)
	// See Labels stats for the number of labels used in the DeleteQuery query.
	DeleteId    int64 `json:"delete-id"`
	DeleteQuery int64 `json:"delete-query"`

	// DeleteBulk stats represent the number of entities that matched the bulk
	// delete query and were deleted. The Deleted counter measures the number
	// of entities successfully deleted. These stats measure the size of bulk
	// delete requests.
	DeleteBulk_min int64 `json:"delete-bulk_min"`
	DeleteBulk_max int64 `json:"delete-bulk_max"`
	DeleteBulk_avg int64 `json:"delete-bulk_avg"`
	DeleteBulk_med int64 `json:"delete-bulk_med"`

	// DeleteLabel counter is the number of delete label queries. It is a subset of Write.
	// These API endpoints increment DeleteLabel:
	//   DELETE /api/v1/entity/:type/:id/labels/:label
	DeleteLabel int64 `json:"delete-label"`

	// Created, Updated, and Deleted counters are the number of entities successfully
	// created, updated, and deleted. These metrics are incremented in their
	// corresponding metric API endpoints when entities are successfully created,
	// updated, or deleted.
	//
	// For example, a request to PUT /api/v1/entity/:type/:id always increments
	// UpdateId by 1, but it increments Updated by 1 only if successful.
	Created int64 `json:"created"`
	Updated int64 `json:"updated"`
	Deleted int64 `json:"deleted"`

	// SetOp counter is the number of queries that used a set op.
	SetOp int64 `json:"set-op"`

	// Labels stats represent the number of labels in read, update, and delete
	// queries. The metric is incremented in these API endpoints:
	//   GET    /api/v1/entities/:type (read)
	//   POST   /api/v1/query/:type    (read)
	//   PUT    /api/v1/entities/:type (update bulk)
	//   DELETE /api/v1/entities/:type (delete bulk)
	// The metric counts all labels in the query. See MetricsLabelReport for
	// label-specific counters.
	//
	// For example, with query "a=1,!b,c in (x,y)" the label count is 3.
	Labels_min int64 `json:"labels_min"`
	Labels_max int64 `json:"labels_max"`
	Labels_avg int64 `json:"labels_avg"`
	Labels_med int64 `json:"labels_med"`

	// LatencyMs stats represent query latency (response time) in milliseconds
	// for all queries (read and write). Low query latency is not a problem,
	// so stats only represent the worst case: high query latency. _p99 is the
	// 99th percentile (ignoring the top 1% as outliers). _p999 is the 99.9th
	// percentile (ignoring the top 0.1% as outliers).
	LatencyMs_max  float64 `json:"latency-ms_max"`
	LatencyMs_p99  float64 `json:"latency-ms_p99"`
	LatencyMs_p999 float64 `json:"latency-ms_p999"`

	// MissSLA counter is the number of queries with LatencyMs greater than
	// the configured query latency SLA (config.metrics.query_latency_sla).
	MissSLA int64 `json:"miss-sla"`

	// QueryTimeout counter is the number of queries which took too long
	// to execute and were cancelled. The default query timeout is set by
	// server config datasource.query.query_timeout, or by client header
	// X-Etre-Query-Timeout. QueryTimeout and MissSLA are independent.
	QueryTimeout int64 `json:"query-timeout"`
}

MetricsQueryReport are measurements related to querying an entity type. These are the most commonly used metrics: QPS, read size, insert/update/delete rates, etc.

Non-counter metrics are sampled and aggregated on report. The aggregate function name is added as a suffix like "_min" and "_max".
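To make the suffixes concrete, the sketch below aggregates a slice of samples into min, max, avg, and med values. It only illustrates the idea: aggregate is a hypothetical function, and integer division for the average and taking the upper-middle element as the median are assumptions, not the server's documented algorithm.

```go
package main

import (
	"fmt"
	"sort"
)

// aggregate returns the minimum, maximum, integer average, and median of
// samples. For an even number of samples the upper-middle element is used
// as the median. All results are zero for an empty slice.
func aggregate(samples []int64) (lo, hi, avg, med int64) {
	if len(samples) == 0 {
		return 0, 0, 0, 0
	}
	s := append([]int64(nil), samples...) // copy so the caller's slice is not reordered
	sort.Slice(s, func(i, j int) bool { return s[i] < s[j] })
	var sum int64
	for _, v := range s {
		sum += v
	}
	return s[0], s[len(s)-1], sum / int64(len(s)), s[len(s)/2]
}

func main() {
	// For example, the number of entities matched by three read queries.
	lo, hi, avg, med := aggregate([]int64{5, 1, 3})
	fmt.Println(lo, hi, avg, med) // 1 5 3 3
}
```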

type MetricsRequestReport added in v0.9.2

type MetricsRequestReport struct {
	DbError     int64 `json:"db-error"`
	APIError    int64 `json:"api-error"`
	ClientError int64 `json:"client-error"`

	// AuthorizationFailed counter is the number of authorization failures.
	// The caller authenticated, but ACLs do not allow the request.
	AuthorizationFailed int64 `json:"authorization-failed"`

	// InvalidEntityType counter is the number of invalid entity types the caller
	// tried to query. The API returns HTTP status 400 (bad request) and an etre.Error
	// message.
	InvalidEntityType int64 `json:"invalid-entity-type"`
}

MetricsRequestReport are measurements related to the request, not an entity type. For example, if the caller requests an invalid entity type, the InvalidEntityType counter is incremented and the API returns HTTP status 400 (bad request).

type MetricsSystemReport added in v0.9.2

type MetricsSystemReport struct {
	// Query counter is the grand total number of queries. This counts every API query
	// at the start of the HTTP request before authentication, validation, etc.
	Query int64 `json:"query"`

	// Load gauge is the current number of running queries.
	Load int64 `json:"load"`

	// Error counter is the grand total number of errors. This counts every
	// error regardless of type: auth, client, database, timeout, internal, etc.
	Error int64 `json:"error"`

	// AuthenticationFailed counter is the number of authentication failures.
	// The API returns HTTP status 401 (unauthorized). If the caller fails to
	// authenticate, only Query and AuthenticationFailed are incremented.
	AuthenticationFailed int64 `json:"authentication-failed"`
}

MetricsSystemReport is the report of system metrics.

type MockCDCClient

type MockCDCClient struct {
	StartFunc func(time.Time) (<-chan CDCEvent, error)
	StopFunc  func()
	PingFunc  func(time.Duration) Latency
	ErrorFunc func() error
}

func (MockCDCClient) Error

func (c MockCDCClient) Error() error

func (MockCDCClient) Ping

func (c MockCDCClient) Ping(timeout time.Duration) Latency

func (MockCDCClient) Start

func (c MockCDCClient) Start(startTs time.Time) (<-chan CDCEvent, error)

func (MockCDCClient) Stop

func (c MockCDCClient) Stop()

type MockEntityClient

type MockEntityClient struct {
	QueryFunc       func(string, QueryFilter) ([]Entity, error)
	InsertFunc      func([]Entity) (WriteResult, error)
	UpdateFunc      func(query string, patch Entity) (WriteResult, error)
	UpdateOneFunc   func(id string, patch Entity) (WriteResult, error)
	DeleteFunc      func(query string) (WriteResult, error)
	DeleteOneFunc   func(id string) (WriteResult, error)
	LabelsFunc      func(id string) ([]string, error)
	DeleteLabelFunc func(id string, label string) (WriteResult, error)
	EntityTypeFunc  func() string
	WithSetFunc     func(Set) EntityClient
	WithTraceFunc   func(string) EntityClient
}

MockEntityClient implements EntityClient for testing. Defined callback funcs are called for the respective interface method, otherwise the default methods return empty slices and no error. Defining a callback function allows tests to intercept, save, and inspect Client calls and simulate Etre API returns.
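The callback pattern can be shown with a single method. In the sketch below, entity, queryFilter, and mockClient are local stand-ins for the package types, reduced to Query only. Returning a nil slice as the default is this sketch's reading of "empty slices".

```go
package main

import "fmt"

// Local stand-ins for etre.Entity and etre.QueryFilter.
type entity map[string]interface{}

type queryFilter struct {
	ReturnLabels []string
	Distinct     bool
}

// mockClient shows the pattern for one method: call the callback if it is
// defined, otherwise return an empty result and no error.
type mockClient struct {
	QueryFunc func(string, queryFilter) ([]entity, error)
}

func (c mockClient) Query(q string, f queryFilter) ([]entity, error) {
	if c.QueryFunc != nil {
		return c.QueryFunc(q, f)
	}
	return nil, nil
}

func main() {
	var gotQuery string // saved by the callback so a test can inspect it
	c := mockClient{
		QueryFunc: func(q string, _ queryFilter) ([]entity, error) {
			gotQuery = q
			return []entity{{"_id": "1", "env": "prod"}}, nil
		},
	}
	entities, err := c.Query("env=prod", queryFilter{})
	fmt.Println(gotQuery, len(entities), err)
}
```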

func (MockEntityClient) Delete

func (c MockEntityClient) Delete(query string) (WriteResult, error)

func (MockEntityClient) DeleteLabel

func (c MockEntityClient) DeleteLabel(id string, label string) (WriteResult, error)

func (MockEntityClient) DeleteOne

func (c MockEntityClient) DeleteOne(id string) (WriteResult, error)

func (MockEntityClient) EntityType

func (c MockEntityClient) EntityType() string

func (MockEntityClient) Insert

func (c MockEntityClient) Insert(entities []Entity) (WriteResult, error)

func (MockEntityClient) Labels

func (c MockEntityClient) Labels(id string) ([]string, error)

func (MockEntityClient) Query

func (c MockEntityClient) Query(query string, filter QueryFilter) ([]Entity, error)

func (MockEntityClient) Update

func (c MockEntityClient) Update(query string, patch Entity) (WriteResult, error)

func (MockEntityClient) UpdateOne

func (c MockEntityClient) UpdateOne(id string, patch Entity) (WriteResult, error)

func (MockEntityClient) WithSet

func (c MockEntityClient) WithSet(set Set) EntityClient

func (MockEntityClient) WithTrace

func (c MockEntityClient) WithTrace(trace string) EntityClient

type QueryFilter

type QueryFilter struct {
	// ReturnLabels defines labels included in matching entities. An empty slice
	// returns all labels, including meta-labels. Else, only labels in the slice
	// are returned.
	ReturnLabels []string

	// Distinct returns unique entities if ReturnLabels contains a single value.
	// Etre returns an error if enabled and ReturnLabels has more than one value.
	Distinct bool
}

QueryFilter represents filtering options for EntityClient.Query().
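The Distinct rule can be checked on the client before sending a query. A minimal sketch, using a local queryFilter stand-in and a hypothetical validate function, follows. It rejects only the documented error case (Distinct with more than one return label).

```go
package main

import (
	"errors"
	"fmt"
)

// queryFilter is a local stand-in for etre.QueryFilter.
type queryFilter struct {
	ReturnLabels []string
	Distinct     bool
}

// validate returns an error if Distinct is enabled with more than one
// return label, the case for which Etre is documented to return an error.
func validate(f queryFilter) error {
	if f.Distinct && len(f.ReturnLabels) > 1 {
		return errors.New("distinct requires a single return label")
	}
	return nil
}

func main() {
	fmt.Println(validate(queryFilter{ReturnLabels: []string{"env"}, Distinct: true}))
	fmt.Println(validate(queryFilter{ReturnLabels: []string{"env", "host"}, Distinct: true}))
}
```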

type RevOrder

type RevOrder struct {
	// contains filtered or unexported fields
}

RevOrder handles entity revision ordering. Normally, an Etre CDC feed sends entities in revision order (ascending revision numbers). It is possible, although extremely unlikely, that revisions can be received out of order. A RevOrder handles this by buffering the out-of-order revisions until a complete, in-order sequence is received. It can be used by a CDC feed consumer like:

revo := etre.NewRevOrder(0, false) // 0 = DEFAULT_MAX_ENTITIES; false = strict (single source)

// Handle out-of-order revisions (per entity ID)
ok, prev := revo.InOrder(event)
if !ok {
    // This rev is out of order, skip (buffered in revo)
    continue
}

// Sync ordered set of previously out-of-order revs
if prev != nil {
    for _, p := range prev {
        if err := sync(p); err != nil {
            return err
        }
    }
}

// Sync the current rev (in order)
if err := sync(event); err != nil {
    return err
}

Using a RevOrder is optional but a best practice to be safe.

func NewRevOrder

func NewRevOrder(maxEntities uint, ignorePastRevs bool) *RevOrder

NewRevOrder returns a new RevOrder that tracks the last revision of at most maxEntities using an LRU cache. If maxEntities is zero, DEFAULT_MAX_ENTITIES is used. It's not possible or necessary to track all entities. If an entity hasn't been seen in a while (e.g. 1,000 entities later), we presume that it won't be seen again; if it is, we further presume that its next revision correctly follows the previous revision that was seen but evicted from the LRU cache.

If ignorePastRevs is true, a revision less than the current in-sync revision is ignored; else, it causes a panic. If ordering entities from a single source, such as cdc.Store.Read(), ignorePastRevs should be false. But if merging multiple sources that can overlap (i.e. return the same event more than once), ignorePastRevs should be true to ignore past revisions. RevOrder returns correct results in both cases; the only difference is that ignorePastRevs = false is more strict.

func (*RevOrder) InOrder

func (r *RevOrder) InOrder(e CDCEvent) (bool, []CDCEvent)

InOrder returns true if the given event is in order. If not (false), the caller should skip and ignore the event. If true, a non-nil slice of previous revisions is returned when they complete an in-order sequence including the given event. In this case, the caller should only sync the slice of events.
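The buffering idea behind revision ordering can be illustrated for a single entity with plain revision numbers. The sketch below is a simplified stand-in, not RevOrder's implementation or API: reorder, newReorder, and add are hypothetical, there is no LRU cache, and past revisions are always ignored (as with ignorePastRevs = true).

```go
package main

import "fmt"

// reorder buffers out-of-order revisions for one entity until the missing
// revisions arrive.
type reorder struct {
	next int64          // next expected revision
	held map[int64]bool // revisions received ahead of next
}

func newReorder(first int64) *reorder {
	return &reorder{next: first, held: map[int64]bool{}}
}

// add records rev and returns the revisions that are now ready to sync, in
// ascending order. It returns nil if rev is buffered (ahead of the expected
// revision) or ignored (already passed).
func (r *reorder) add(rev int64) []int64 {
	if rev < r.next {
		return nil // past revision: ignore
	}
	if rev > r.next {
		r.held[rev] = true // out of order: buffer
		return nil
	}
	ready := []int64{rev}
	r.next++
	// Release buffered revisions that now form a contiguous sequence.
	for r.held[r.next] {
		delete(r.held, r.next)
		ready = append(ready, r.next)
		r.next++
	}
	return ready
}

func main() {
	r := newReorder(0)
	for _, rev := range []int64{0, 2, 3, 1, 1} {
		fmt.Println(rev, r.add(rev))
	}
}
```

In this run, revisions 2 and 3 are held until revision 1 arrives, at which point 1, 2, and 3 are released together. The duplicate revision 1 is ignored.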

type Set

type Set struct {
	Id   string
	Op   string
	Size int
}

A Set is a user-defined logical grouping of writes (insert, update, delete).
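Since the client does not validate a Set, a caller may want to check it and see how it maps to the setOp, setId, and setSize URL query parameters used on update and delete. The sketch below uses a local set stand-in and a hypothetical setParams helper. The parameter names come from the WithSet documentation, and the sample values are made up.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strconv"
)

// set is a local stand-in for etre.Set.
type set struct {
	Id   string
	Op   string
	Size int
}

// setParams validates s and returns it as URL query parameters.
func setParams(s set) (url.Values, error) {
	if s.Size <= 0 || s.Op == "" || s.Id == "" {
		return nil, errors.New("set requires Size > 0 and nonempty Op and Id")
	}
	v := url.Values{}
	v.Set("setOp", s.Op)
	v.Set("setId", s.Id)
	v.Set("setSize", strconv.Itoa(s.Size))
	return v, nil
}

func main() {
	v, err := setParams(set{Id: "abc123", Op: "provision", Size: 3})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(v.Encode()) // setId=abc123&setOp=provision&setSize=3
}
```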

type Write

type Write struct {
	EntityId string `json:"entityId"`       // internal _id of entity (all write ops)
	URI      string `json:"uri,omitempty"`  // fully-qualified address of new entity (insert)
	Diff     Entity `json:"diff,omitempty"` // previous entity label values (update)
}

Write represents the successful write of one entity.

type WriteResult

type WriteResult struct {
	Writes []Write `json:"writes"`          // successful writes
	Error  *Error  `json:"error,omitempty"` // error before, during, or after writes
}

WriteResult represents the result of a write operation (insert, update, delete). On success or failure, all write ops return a WriteResult.

If Error is set (not nil), some or all writes failed. Writes stop on the first error, so len(Writes) = index into slice of entities sent by client that failed. For example, if the first entity causes an error, len(Writes) = 0. If the third entity fails, len(Writes) = 2 (zero indexed).
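The index rule above can be expressed as a small helper. The types below are local stand-ins reduced to the fields needed, and firstFailed is a hypothetical name.

```go
package main

import "fmt"

// Local stand-ins for etre.Write, etre.Error, and etre.WriteResult.
type write struct{ EntityId string }

type apiError struct{ Message string }

type writeResult struct {
	Writes []write
	Error  *apiError
}

// firstFailed returns the index, in the slice of entities sent by the
// client, of the entity that caused the error, or -1 if there was no error.
func firstFailed(wr writeResult) int {
	if wr.Error == nil {
		return -1
	}
	return len(wr.Writes)
}

func main() {
	// Two successful writes, then an error: the third entity (index 2) failed.
	wr := writeResult{
		Writes: []write{{"id1"}, {"id2"}},
		Error:  &apiError{Message: "duplicate entity"},
	}
	fmt.Println(firstFailed(wr)) // 2
}
```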

func (WriteResult) IsZero

func (wr WriteResult) IsZero() bool

Directories

Path        Synopsis
api         Package api provides API endpoints and controllers.
app         Package app provides app context and extensions: hooks and plugins.
auth        Package auth provides team-based authentication and authorization.
bin
cdc         Package cdc provides interfaces for reading and writing change data capture (CDC) events.
es          Package es provides a framework for integration with other programs.
es/app      Package app provides app-wide data structs and functions.
es/bin
es/config   Package config handles config files, -config, and env vars at startup.
metrics     Package metrics provides Etre metrics.
query       Package query is a wrapper for Kubernetes Labels Selector (KLS).
test        Package test provides helper functions for tests.
