watch

package
Version: v0.8.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2018 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package watch provides the ability to set a watch on a Rego query. A watch will monitor the query and determine when any of it's dependencies change, notifying the client of the new results of the query evaluation whenever this occurs.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Event

type Event struct {
	Query string         `json:"query"`
	Value rego.ResultSet `json:"value"`
	Error error          `json:"error,omitempty"`

	Metrics metrics.Metrics      `json:"-"`
	Tracer  topdown.BufferTracer `json:"-"`
}

Event represents a change to a query. Query is the query in question and Value is the JSON encoded results of the new query evaluation. Error will be populated if evaluating the new query results encountered an error for any reason. If Error is not nil, the contents of Value are undefined.

Metrics and Tracer represent the metrics and trace from the evaluation of the query.

type Handle

type Handle struct {
	C <-chan Event
	// contains filtered or unexported fields
}

Handle allows a user to listen to and end a watch on a query.

func (*Handle) Start added in v0.7.0

func (h *Handle) Start() error

Start registers and starts the watch.

func (*Handle) Stop

func (h *Handle) Stop()

Stop ends the watch on the query associated with the Handle. It will close the channel that was delivering notifications through the Handle. This may happen before or after Stop returns.

func (*Handle) WithInstrumentation added in v0.7.0

func (h *Handle) WithInstrumentation(yes bool) *Handle

WithInstrumentation enables instrumentation on the query to diagnose performance issues.

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher allows for watches to be registed on queries.

func New

func New(ctx context.Context, s storage.Store, c *ast.Compiler, txn storage.Transaction) (w *Watcher, err error)

New creates and returns a new Watcher on the store using the compiler provided. Once a compiler is provided to create a Watcher, it must not be modified, or else the results produced by the Watcher are undefined.

func (*Watcher) Close

func (w *Watcher) Close(txn storage.Transaction)

Close ends the watches on all queries this Watcher has.

Further attempts to register or end watches will result in an error after Close() is called.

func (*Watcher) Migrate

func (w *Watcher) Migrate(c *ast.Compiler, txn storage.Transaction) (*Watcher, error)

Migrate creates a new Watcher with the same watches as w, but using the new compiler. Like when creating a Watcher with New, the provided compiler must not be modified after being passed to Migrate, or else behavior is undefined.

After Migrate returns, the old watcher will be closed, and the new will be ready for use. All Handles from the old watcher will still be active, via the returned Watcher, with the exception of those Handles who's query is no longer valid with the new compiler. Such Handles will be shutdown and a final Event sent along their channel indicating the cause of the error.

If an error occurs creating the new Watcher, the state of the old Watcher will not be changed.

Example
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/open-policy-agent/opa/ast"
	"github.com/open-policy-agent/opa/storage"
	"github.com/open-policy-agent/opa/storage/inmem"
	"github.com/open-policy-agent/opa/util"
	"github.com/open-policy-agent/opa/watch"
)

func main() {
	ctx := context.Background()
	store := inmem.NewFromObject(loadSmallTestData())

	// This example syncs the reader and writing to make the output deterministic.
	var notifyAlert chan struct{}
	done := make(chan struct{})
	gotNotification1 := make(chan struct{})
	gotNotification2 := make(chan struct{})

	txn, err := store.NewTransaction(ctx, storage.WriteParams)
	if err != nil {
		// Handle error
	}

	// Create a new Watcher that uses the given store and compiler to monitor
	// queries. The watcher must be creating inside a transaction so that it can
	// properly hook into the store.
	w, err := watch.New(ctx, store, ast.NewCompiler(), txn)
	if err != nil {
		// Handle error
	}

	if err := store.Commit(ctx, txn); err != nil {
		// Handle error
	}

	handle1, err := w.Query("x = data.y")
	if err != nil {
		// Handle error
	}

	go func() {
		for e := range handle1.C {
			value := fmt.Sprint(e.Value)
			if len(e.Value) > 0 {
				value = fmt.Sprint(e.Value[0].Bindings)
			}
			fmt.Printf("%s: %s (%v)\n", e.Query, value, e.Error)

			if notifyAlert != nil {
				notifyAlert <- struct{}{}
			}
			gotNotification1 <- struct{}{}
		}
		done <- struct{}{}
	}()

	// One notification will be sent on watch creation with the initial query
	// value. It will be empty since the document we are watching is not yet defined.
	<-gotNotification1

	mod, err := ast.ParseModule("example", "package y\nr = s { s = data.a }")
	if err != nil {
		// Handle error
	}

	compiler := ast.NewCompiler()
	if compiler.Compile(map[string]*ast.Module{"example": mod}); compiler.Failed() {
		// Handle error
	}

	if txn, err = store.NewTransaction(ctx, storage.WriteParams); err != nil {
		// Handle error
	}

	// The handle from before will still be active after we migrate to the
	// new compiler. Changes to data.a will now cause notifications since data.y now
	// exists.
	m, err := w.Migrate(compiler, txn)
	if err != nil {
		// Handle error
	}

	if err := store.Commit(ctx, txn); err != nil {
		// Handle error
	}

	// After migrating, all existing watches will get a notification, as if they had
	// just started.
	<-gotNotification1

	// The old watcher will be closed. Watches can no longer be registered on it.
	_, err = w.Query("foo")
	fmt.Println(err)

	handle2, err := m.Query("y = data.a")
	if err != nil {
		// Handle error
	}

	go func() {
		for e := range handle2.C {
			if notifyAlert != nil {
				<-notifyAlert
			} else {
				notifyAlert = make(chan struct{})
			}

			value := fmt.Sprint(e.Value)
			if len(e.Value) > 0 {
				value = fmt.Sprint(e.Value[0].Bindings)
			}
			fmt.Printf("%s: %s (%v)\n", e.Query, value, e.Error)
			gotNotification2 <- struct{}{}
		}
		done <- struct{}{}
	}()
	<-gotNotification2

	for i := 0; i < 4; i++ {
		path, _ := storage.ParsePath(fmt.Sprintf("/a/%d", i))
		if err := storage.WriteOne(ctx, store, storage.ReplaceOp, path, json.Number(fmt.Sprint(i))); err != nil {
			// Handle error
		}

		<-gotNotification1
		<-gotNotification2
	}

	// Ending the queries will close the notification channels.
	handle1.Stop()
	handle2.Stop()
	<-done
	<-done

}

func loadSmallTestData() map[string]interface{} {
	var data map[string]interface{}
	err := util.UnmarshalJSON([]byte(`{
        "a": [1,2,3,4],
        "b": {
            "v1": "hello",
            "v2": "goodbye"
        },
        "c": [{
            "x": [true, false, "foo"],
            "y": [null, 3.14159],
            "z": {"p": true, "q": false}
        }],
        "d": {
            "e": ["bar", "baz"]
        },
		"g": {
			"a": [1, 0, 0, 0],
			"b": [0, 2, 0, 0],
			"c": [0, 0, 0, 4]
		},
		"h": [
			[1,2,3],
			[2,3,4]
		]
    }`), &data)
	if err != nil {
		panic(err)
	}
	return data
}
Output:


x = data.y: [] (<nil>)
x = data.y: map[x:map[r:[1 2 3 4]]] (<nil>)
cannot start query watch with closed Watcher
y = data.a: map[y:[1 2 3 4]] (<nil>)
x = data.y: map[x:map[r:[0 2 3 4]]] (<nil>)
y = data.a: map[y:[0 2 3 4]] (<nil>)
x = data.y: map[x:map[r:[0 1 3 4]]] (<nil>)
y = data.a: map[y:[0 1 3 4]] (<nil>)
x = data.y: map[x:map[r:[0 1 2 4]]] (<nil>)
y = data.a: map[y:[0 1 2 4]] (<nil>)
x = data.y: map[x:map[r:[0 1 2 3]]] (<nil>)
y = data.a: map[y:[0 1 2 3]] (<nil>)

func (*Watcher) NewQuery added in v0.7.0

func (w *Watcher) NewQuery(query string) *Handle

NewQuery returns a new watch Handle that can be run. Callers must invoke the Run function on the handle to start the watch.

func (*Watcher) Query

func (w *Watcher) Query(query string) (*Handle, error)

Query registers a watch on the provided Rego query. Whenever changes are made to a base or virtual document that the query depends on, an Event describing the new result of the query will be sent through the Handle.

Query will return an error if registering the watch fails for any reason.

Example
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/open-policy-agent/opa/ast"
	"github.com/open-policy-agent/opa/storage"
	"github.com/open-policy-agent/opa/storage/inmem"
	"github.com/open-policy-agent/opa/util"
	"github.com/open-policy-agent/opa/watch"
)

func main() {
	ctx := context.Background()
	store := inmem.NewFromObject(loadSmallTestData())

	// This example syncs the reader and writing to make the output deterministic.
	done := make(chan struct{})
	gotNotification := make(chan struct{})

	mod, err := ast.ParseModule("example", "package y\nr = s { s = data.a }")
	if err != nil {
		// Handle error
	}

	compiler := ast.NewCompiler()
	if compiler.Compile(map[string]*ast.Module{"example": mod}); compiler.Failed() {
		// Handle error
	}

	txn, err := store.NewTransaction(ctx, storage.WriteParams)
	if err != nil {
		// Handle error
	}

	// Create a new Watcher that uses the given store and compiler to monitor
	// queries. The watcher must be creating inside a transaction so that it can
	// properly hook into the store.
	w, err := watch.New(ctx, store, compiler, txn)
	if err != nil {
		// Handle error
	}
	if err := store.Commit(ctx, txn); err != nil {
		// Handle error
	}

	// Create a new watch on the query. Whenever its result changes, the result of
	// the change will be sent to `handle.C`.
	handle, err := w.Query("x = data.y")
	if err != nil {
		// Handle error
	}

	go func() {
		for e := range handle.C {
			fmt.Printf("%s: %v (%v)\n", e.Query, e.Value[0].Bindings, e.Error)

			gotNotification <- struct{}{}
		}
		close(done)
	}()
	<-gotNotification // One notification will be sent on watch creation with the initial query value.

	for i := 0; i < 4; i++ {
		path, _ := storage.ParsePath(fmt.Sprintf("/a/%d", i))
		if err := storage.WriteOne(ctx, store, storage.ReplaceOp, path, json.Number(fmt.Sprint(i))); err != nil {
			// Handle error
		}

		<-gotNotification
	}

	// Ending the query will close `handle.C`.
	handle.Stop()
	<-done

}

func loadSmallTestData() map[string]interface{} {
	var data map[string]interface{}
	err := util.UnmarshalJSON([]byte(`{
        "a": [1,2,3,4],
        "b": {
            "v1": "hello",
            "v2": "goodbye"
        },
        "c": [{
            "x": [true, false, "foo"],
            "y": [null, 3.14159],
            "z": {"p": true, "q": false}
        }],
        "d": {
            "e": ["bar", "baz"]
        },
		"g": {
			"a": [1, 0, 0, 0],
			"b": [0, 2, 0, 0],
			"c": [0, 0, 0, 4]
		},
		"h": [
			[1,2,3],
			[2,3,4]
		]
    }`), &data)
	if err != nil {
		panic(err)
	}
	return data
}
Output:


x = data.y: map[x:map[r:[1 2 3 4]]] (<nil>)
x = data.y: map[x:map[r:[0 2 3 4]]] (<nil>)
x = data.y: map[x:map[r:[0 1 3 4]]] (<nil>)
x = data.y: map[x:map[r:[0 1 2 4]]] (<nil>)
x = data.y: map[x:map[r:[0 1 2 3]]] (<nil>)

Source Files

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL