query

v1.3.5
Published: Sep 26, 2023 License: MIT Imports: 6 Imported by: 6

README

Go Query Bus

A query bus to fetch all the things.

Installation

go get github.com/io-da/query

Overview

  1. Queries
  2. Handlers
  3. Result
  4. Iterator Handlers
  5. Iterator Result
  6. Error Handlers
    1. Available Errors
  7. Cache Adapters
  8. The Bus
    1. Tweaking Performance
    2. Shutting Down
    3. Available Errors
  9. Benchmarks
  10. Examples

Introduction

This library is intended for anyone looking to query for data in a decoupled architecture. No reflection, no closures.

Getting Started

Queries

Queries are any type that implements the Query interface. Ideally they should contain immutable data.

type Query interface {
    ID() []byte
}

Queries can optionally implement the Cacheable interface to enable built-in caching.

type Cacheable interface {
    CacheKey() []byte
    CacheDuration() time.Duration
}

Handlers

Handlers are any type that implements the Handler interface. Handlers must be instantiated and provided to the bus using the bus.Handlers function.

type Handler interface {
    Handle(qry Query, res *Result) error
}

Handlers catch the query (stop propagation) only when they explicitly call res.Done(). Otherwise the query is passed to every handler that expects it. This strategy can be used to register multiple fallback handlers for the same query, or to have the Result populated by multiple handlers.
Whenever a query fails to be handled, the bus returns an error. A query is considered handled once any data is provided to the result or once res.Handled() is explicitly called.
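
The propagation rules above can be sketched with a simplified dispatch loop. This is an illustration of the described behaviour, not the library's implementation: result, handler and dispatch are hypothetical stand-ins, handlers are plain functions for brevity, and stopping at the first handler error is an assumption.

```go
package main

import "fmt"

// result is a simplified stand-in for the library's Result.
type result struct {
	data    []interface{}
	handled bool
	done    bool
}

// Add stores a value; providing data marks the query as handled.
func (r *result) Add(v interface{}) { r.data = append(r.data, v); r.handled = true }

// Handled marks the query as handled without providing data.
func (r *result) Handled() { r.handled = true }

// Done marks the result as handled and final, which stops propagation.
func (r *result) Done() { r.handled = true; r.done = true }

// handler is a hypothetical function form of a query handler.
type handler func(qry string, res *result) error

// dispatch passes qry to each handler in order until one calls Done.
// It returns an error if no handler handled the query.
func dispatch(qry string, handlers ...handler) (*result, error) {
	res := &result{}
	for _, h := range handlers {
		if err := h(qry, res); err != nil {
			return nil, err
		}
		if res.done {
			break
		}
	}
	if !res.handled {
		return nil, fmt.Errorf("no handler handled query %q", qry)
	}
	return res, nil
}

func main() {
	miss := func(qry string, res *result) error { return nil } // adds nothing
	primary := func(qry string, res *result) error {
		res.Add("from primary")
		res.Done() // later handlers are skipped
		return nil
	}
	fallback := func(qry string, res *result) error {
		res.Add("from fallback")
		return nil
	}

	res, err := dispatch("foo", miss, primary, fallback)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(res.data) // [from primary]
}
```

Without the call to Done in primary, the fallback handler would also run and the result would hold both values.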

Result

Result is the struct returned from bus.Query. This is where the data fetched will reside.
The handlers provide the data to the result using the functions res.Add or res.Set.
This data can then be retrieved using the functions res.First (to retrieve only the first result) or res.All (to return the whole data slice).
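
Because First returns interface{} and All returns []interface{}, callers usually need a type assertion before using the values. The helper below is a sketch of one way to do that for string results; toStrings is a hypothetical name, and the slice in main stands in for the return value of res.All().

```go
package main

import "fmt"

// toStrings converts an untyped slice, such as the one returned by res.All(),
// into []string. It fails on the first value that is not a string.
func toStrings(values []interface{}) ([]string, error) {
	out := make([]string, 0, len(values))
	for i, v := range values {
		s, ok := v.(string)
		if !ok {
			return nil, fmt.Errorf("value %d has type %T, want string", i, v)
		}
		out = append(out, s)
	}
	return out, nil
}

func main() {
	// all stands in for the slice returned by res.All().
	all := []interface{}{"Foo", "Bar"}

	names, err := toStrings(all)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(names) // [Foo Bar]
}
```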

Iterator Handlers

Iterator handlers are any type that implements the IteratorHandler interface. Iterator handlers must be instantiated and provided to the bus using the bus.InitializeIteratorHandlers function.

type IteratorHandler interface {
    Handle(qry Query, res *IteratorResult) error
}

These behave nearly identically to normal handlers. However, there are a couple of differences:

  • They expect an IteratorResult instead of a Result.
  • Their results cannot be cached.

Iterator handlers are intended for large sets of data: they make it possible to iterate over the data without preloading it first.

Iterator Result

IteratorResult is the struct returned from bus.IteratorQuery. This struct acts as a proxy between the handlers and the consumer.
The handlers provide the data to the result using the function res.Yield.
This data can then be processed while it is being populated using the function res.Iterate.
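
The proxy role of IteratorResult can be sketched with a plain channel. The stand-in below only mirrors the documented Yield and Iterate signatures; iteratorResult and stream are hypothetical, and closing the channel once the producer finishes is an assumption about how iteration ends.

```go
package main

import "fmt"

// iteratorResult is a simplified stand-in for the library's IteratorResult.
type iteratorResult struct {
	ch chan interface{}
}

// Yield hands one value to the consumer.
func (r *iteratorResult) Yield(v interface{}) { r.ch <- v }

// Iterate exposes the values as a receive-only channel.
func (r *iteratorResult) Iterate() <-chan interface{} { return r.ch }

// stream plays the role of a handler: it yields n values from its own
// goroutine and closes the channel when it is finished.
func stream(n int) *iteratorResult {
	res := &iteratorResult{ch: make(chan interface{})} // unbuffered
	go func() {
		defer close(res.ch)
		for i := 0; i < n; i++ {
			res.Yield(i)
		}
	}()
	return res
}

func main() {
	sum := 0
	// Each value is consumed as soon as it is yielded; nothing is preloaded.
	for v := range stream(5).Iterate() {
		sum += v.(int)
	}
	fmt.Println(sum) // 10
}
```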

Error Handlers

Error handlers are any type that implements the ErrorHandler interface. Error handlers are optional (but advised) and are provided to the bus using the bus.ErrorHandlers function.

type ErrorHandler interface {
    Handle(qry Query, err error)
}

Any time an error occurs within the bus, it will be passed on to the error handlers. This strategy can be used for decoupled error handling.

Available Errors

Below is a list of errors that can occur.

// query.InvalidQueryError
// query.BusNotInitializedError
// query.BusIsShuttingDownError
// query.ErrorNoQueryHandlersFound
// query.ErrorQueryTimedOut

type errorHandler struct{}

func (e errorHandler) Handle(qry query.Query, err error) {
    // a type switch matches on the error types, not on the constants above
    switch err.(type) {
    case query.ErrorInvalidQuery:
        // do something
    case query.ErrorBusNotInitialized:
        // do something
    case query.ErrorBusIsShuttingDown:
        // do something
    case query.ErrorQueryTimedOut, query.ErrorNoQueryHandlersFound:
        // do something
    default:
        // do something
    }
}

bus.ErrorHandlers(errorHandler{})

Cache Adapters

Cache adapters are any type that implements the CacheAdapter interface. Cache adapters are optional (but advised) and are provided to the bus using the bus.CacheAdapters function.

type CacheAdapter interface {
    Set(qry Cacheable, res *Result) bool
    Get(qry Cacheable) *Result
    Expire(qry Cacheable)
    Shutdown()
}

Just as with query handlers, this approach allows different cache adapters to be used for different query types.
If the cache adapter returns true on Set, the bus will assume the result was successfully cached.
On retrieval the bus will return the results from the first adapter that returns data for the given query. The order of the adapters is always respected.
By default the bus comes with a MemoryCacheAdapter. This adapter will cache the results in memory and supports duration specification on the order of microseconds (accuracy depends on server load). Expired results will be automatically cleared from memory.
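
To illustrate the shape of a custom adapter, here is a minimal map-based sketch with the four documented methods. It is not the library's MemoryCacheAdapter: MapCacheAdapter, entry and barQuery are hypothetical, Result is a local placeholder, returning nil from Get to signal a miss is an assumption, and expired entries are only skipped on read rather than cleaned up in the background.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Cacheable is a local copy of the documented interface.
type Cacheable interface {
	CacheKey() []byte
	CacheDuration() time.Duration
}

// Result is a placeholder for the library's Result type.
type Result struct {
	Data []interface{}
}

// entry pairs a cached result with its expiry time.
type entry struct {
	res       *Result
	expiresAt time.Time
}

// MapCacheAdapter is a hypothetical adapter backed by a mutex-guarded map.
type MapCacheAdapter struct {
	mu      sync.RWMutex
	entries map[string]entry
}

func NewMapCacheAdapter() *MapCacheAdapter {
	return &MapCacheAdapter{entries: make(map[string]entry)}
}

// Set stores the result and reports success.
func (a *MapCacheAdapter) Set(qry Cacheable, res *Result) bool {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.entries[string(qry.CacheKey())] = entry{
		res:       res,
		expiresAt: time.Now().Add(qry.CacheDuration()),
	}
	return true
}

// Get returns the cached result, or nil when it is missing or expired.
func (a *MapCacheAdapter) Get(qry Cacheable) *Result {
	a.mu.RLock()
	defer a.mu.RUnlock()
	e, ok := a.entries[string(qry.CacheKey())]
	if !ok || time.Now().After(e.expiresAt) {
		return nil
	}
	return e.res
}

// Expire removes the entry for the given query, if any.
func (a *MapCacheAdapter) Expire(qry Cacheable) {
	a.mu.Lock()
	defer a.mu.Unlock()
	delete(a.entries, string(qry.CacheKey()))
}

// Shutdown has nothing to stop: this sketch runs no background goroutine.
func (a *MapCacheAdapter) Shutdown() {}

// barQuery is a hypothetical cacheable query used for the demonstration.
type barQuery struct{}

func (barQuery) CacheKey() []byte             { return []byte("BAR-CACHE-KEY") }
func (barQuery) CacheDuration() time.Duration { return time.Minute }

func main() {
	ad := NewMapCacheAdapter()
	ad.Set(barQuery{}, &Result{Data: []interface{}{"Foo"}})
	fmt.Println(ad.Get(barQuery{}) != nil) // true
	ad.Expire(barQuery{})
	fmt.Println(ad.Get(barQuery{}) != nil) // false
}
```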

The Bus

Bus is the struct that will be used for all the application's queries.
The Bus should be instantiated (NewBus()) and initialized (bus.InitializeIteratorHandlers) on application startup.
The initialization is only required for iterator queries and is separated from the instantiation for dependency injection purposes.
The application should instantiate the Bus once and then use its reference for all the queries.
The order in which the handlers are provided to the Bus is always respected. This is the order used when propagating queries.

Tweaking Performance

The number of workers for iterator queries can be adjusted.

bus.IteratorWorkerPoolSize(10)

If used, this function must be called before bus.InitializeIteratorHandlers. It specifies the number of goroutines used to handle iterator queries.
In some scenarios increasing this value can drastically improve performance.
It defaults to the value returned by runtime.GOMAXPROCS(0).

The buffer size of the iterator query queue can also be adjusted.
Depending on the use case, this value may greatly impact performance.

bus.IteratorQueueBuffer(100)

If used, this function must be called before the call to bus.InitializeIteratorHandlers.
It defaults to 100.

The buffer size of the iterator results channel can also be adjusted.
Depending on the use case, this value may greatly impact performance.

bus.IteratorResultBuffer(0)

If used, this function may be called before any iterator query is performed.
It defaults to 0.

Shutting Down

The Bus also provides a shutdown function that attempts to gracefully stop the query bus and all its routines.

bus.Shutdown()

This function will block until the bus is fully stopped.
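
The general pattern of a worker pool with a buffered queue and a blocking shutdown can be sketched as follows. This is a generic illustration, not the library's code: pool, newPool and Submit are hypothetical names. Note that the sketch drains queued jobs before stopping, whereas the library documents that queries handled while shutting down are disregarded.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical worker pool with a buffered job queue.
type pool struct {
	queue chan int
	wg    sync.WaitGroup
	mu    sync.Mutex
	done  int // number of processed jobs, guarded by mu
}

// newPool starts the given number of workers reading from a queue
// with the given buffer size.
func newPool(workers, queueBuffer int) *pool {
	p := &pool{queue: make(chan int, queueBuffer)}
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.wg.Done()
			for range p.queue {
				p.mu.Lock()
				p.done++
				p.mu.Unlock()
			}
		}()
	}
	return p
}

// Submit enqueues a job; it blocks while the queue buffer is full.
func (p *pool) Submit(job int) { p.queue <- job }

// Shutdown closes the queue and blocks until every worker has returned.
func (p *pool) Shutdown() {
	close(p.queue)
	p.wg.Wait()
}

func main() {
	p := newPool(4, 100)
	for i := 0; i < 1000; i++ {
		p.Submit(i)
	}
	p.Shutdown()
	fmt.Println(p.done) // 1000
}
```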

Benchmarks

The query handler returns a single value for simulation purposes.

Benchmark Type     Time
Queries            201 ns/op
IteratorQueries    783 ns/op

Iterator queries add a small overhead and are not worthwhile for small sets of data (partly because they cannot be cached). They are better suited to iterating over large sets of data while avoiding preloading.

Examples

Example Queries

A struct query.

type Foo struct {
    bar string
}
func (*Foo) ID() []byte {
    return []byte("FOO-UUID")
}

A string query that also implements caching.

type Bar string
func (Bar) ID() []byte {
    return []byte("BAR-UUID")
}
func (Bar) CacheKey() []byte {
    return []byte("BAR-CACHE-KEY")
}
func (Bar) CacheDuration() time.Duration {
    return time.Minute * 5
}

Example Handlers

A query handler that listens to multiple query types.

type FooBarHandler struct {
}

func (hdl *FooBarHandler) Handle(qry query.Query, res *query.Result) error {
    // check the query type (no variable is bound, since it would be unused)
    switch qry.(type) {
    case *Foo:
        // handler logic
        res.Add("Bar")
    case Bar:
        // handler logic
        res.Add("Foo")
    }
    return nil
}

An iterator query handler.

type FooBarIteratorHandler struct {
}

func (hdl *FooBarIteratorHandler) Handle(qry query.Query, res *query.IteratorResult) error {
    // check the query type (no variable is bound, since it would be unused)
    switch qry.(type) {
    case *Foo:
        // handler logic
        res.Yield("Bar")
    }
    return nil
}

Putting it together

Initialization and usage of the exemplified queries and handlers:

package main

import (
    "fmt"
    "log"

    "github.com/io-da/query"
)

func main() {
    // instantiate the bus (returns *query.Bus)
    bus := query.NewBus()

    // provide the bus with all of the application's query handlers
    bus.Handlers(
        &FooBarHandler{},
    )

    // initialize the bus with all of the application's iterator query handlers
    bus.InitializeIteratorHandlers(
        &FooBarIteratorHandler{},
    )

    // query away!
    res1, err := bus.Query(&Foo{})
    if err != nil {
        log.Fatal(err)
    }
    // get the first result only
    val := res1.First() // "Bar"
    fmt.Println(val)

    res2, err := bus.Query(Bar("Bar"))
    if err != nil {
        log.Fatal(err)
    }
    // get all the results
    vals := res2.All() // ["Foo"]
    fmt.Println(vals)

    res3, err := bus.IteratorQuery(&Foo{})
    if err != nil {
        log.Fatal(err)
    }
    // range over the values, processing them while they are being populated
    for val := range res3.Iterate() {
        // do something with the val
        fmt.Println(val) // "Bar"
    }
}

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

License

MIT

Documentation

Constants

const (
	// InvalidQueryError is a constant equivalent of the ErrorInvalidQuery error.
	InvalidQueryError = ErrorInvalidQuery("query: invalid query")
	// BusNotInitializedError is a constant equivalent of the ErrorBusNotInitialized error.
	BusNotInitializedError = ErrorBusNotInitialized("query: the bus is not initialized")
	// BusIsShuttingDownError is a constant equivalent of the ErrorBusIsShuttingDown error.
	BusIsShuttingDownError = ErrorBusIsShuttingDown("query: the bus is shutting down")
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is the only struct exported and required for the query bus usage. The Bus should be instantiated using the NewBus function.

func NewBus

func NewBus() *Bus

NewBus instantiates the Bus struct. The Initialization of IteratorHandlers is performed separately (InitializeIteratorHandlers function) for dependency injection purposes.

func (*Bus) CacheAdapters

func (bus *Bus) CacheAdapters(adps ...CacheAdapter)

CacheAdapters may optionally be provided. They will be used instead of the default MemoryCacheAdapter.

func (*Bus) ErrorHandlers

func (bus *Bus) ErrorHandlers(hdls ...ErrorHandler)

ErrorHandlers may optionally be provided. They will receive any error thrown during the querying process.

func (*Bus) Handlers

func (bus *Bus) Handlers(hdls ...Handler)

Handlers for the regular queries.

func (*Bus) InitializeIteratorHandlers

func (bus *Bus) InitializeIteratorHandlers(hdls ...IteratorHandler)

InitializeIteratorHandlers initializes the query bus to support iterator queries.

func (*Bus) IteratorQuery

func (bus *Bus) IteratorQuery(qry Query) (*IteratorResult, error)

IteratorQuery uses a channel to iterate the results while they are being populated. *Iterator queries are not cached*.

func (*Bus) IteratorQueueBuffer

func (bus *Bus) IteratorQueueBuffer(buf int)

IteratorQueueBuffer may optionally be provided to tweak the buffer size of the iterator query queue. This value may have high impact on performance depending on the use case. It can only be adjusted *before* the bus is initialized. It defaults to 100.

func (*Bus) IteratorResultBuffer

func (bus *Bus) IteratorResultBuffer(buf int)

IteratorResultBuffer may optionally be provided to tweak the buffer size of the results channel for iterator queries. This value may have high impact on performance depending on the use case. It defaults to 1.

func (*Bus) IteratorWorkerPoolSize

func (bus *Bus) IteratorWorkerPoolSize(workerPoolSize int)

IteratorWorkerPoolSize may optionally be provided to tweak the iteratorWorker pool size for iterator query queue. It can only be adjusted *before* the bus is initialized. It defaults to the value returned by runtime.GOMAXPROCS(0).

func (*Bus) Query

func (bus *Bus) Query(qry Query) (*Result, error)

Query for a single result or a pre-populated collection.

func (*Bus) Shutdown

func (bus *Bus) Shutdown()

Shutdown the query bus gracefully. *Queries handled while shutting down will be disregarded*.

type CacheAdapter

type CacheAdapter interface {
	Set(qry Cacheable, res *Result) bool
	Get(qry Cacheable) *Result
	Expire(qry Cacheable)
	Shutdown()
}

CacheAdapter must be implemented for a type to qualify as a cache adapter.

type Cacheable

type Cacheable interface {
	CacheKey() []byte
	CacheDuration() time.Duration
}

Cacheable is an interface used to allow queries to cache their results.

type ErrorBusIsShuttingDown

type ErrorBusIsShuttingDown string

ErrorBusIsShuttingDown is used when queries are handled but the bus is shutting down.

func (ErrorBusIsShuttingDown) Error

func (e ErrorBusIsShuttingDown) Error() string

Error returns the string message of ErrorBusIsShuttingDown.

type ErrorBusNotInitialized

type ErrorBusNotInitialized string

ErrorBusNotInitialized is used when queries are handled but the bus is not initialized.

func (ErrorBusNotInitialized) Error

func (e ErrorBusNotInitialized) Error() string

Error returns the string message of ErrorBusNotInitialized.

type ErrorHandler

type ErrorHandler interface {
	Handle(qry Query, err error)
}

ErrorHandler must be implemented for a type to qualify as an error handler.

type ErrorInvalidQuery

type ErrorInvalidQuery string

ErrorInvalidQuery is used when invalid queries are handled.

func (ErrorInvalidQuery) Error

func (e ErrorInvalidQuery) Error() string

Error returns the string message of ErrorInvalidQuery.

type ErrorNoQueryHandlersFound

type ErrorNoQueryHandlersFound struct {
	// contains filtered or unexported fields
}

ErrorNoQueryHandlersFound is used when not a single handler is found for a specific query.

func NewErrorNoQueryHandlersFound

func NewErrorNoQueryHandlersFound(query Query) ErrorNoQueryHandlersFound

NewErrorNoQueryHandlersFound creates a new ErrorNoQueryHandlersFound.

func (ErrorNoQueryHandlersFound) Error

func (e ErrorNoQueryHandlersFound) Error() string

Error returns the string message of ErrorNoQueryHandlersFound.

type ErrorQueryTimedOut

type ErrorQueryTimedOut struct {
	// contains filtered or unexported fields
}

ErrorQueryTimedOut is used when the handling of a query times out.

func NewErrorQueryTimedOut

func NewErrorQueryTimedOut(query Query) ErrorQueryTimedOut

NewErrorQueryTimedOut creates a new ErrorQueryTimedOut.

func (ErrorQueryTimedOut) Error

func (e ErrorQueryTimedOut) Error() string

Error returns the string message of ErrorQueryTimedOut.

type Handler

type Handler interface {
	Handle(qry Query, res *Result) error
}

Handler must be implemented for a type to qualify as a query handler.

type IteratorHandler

type IteratorHandler interface {
	Handle(qry Query, res *IteratorResult) error
}

IteratorHandler must be implemented for a type to qualify as an iterator query handler.

type IteratorResult

type IteratorResult struct {
	// contains filtered or unexported fields
}

IteratorResult is the struct returned from iterator queries.

func (*IteratorResult) Done

func (res *IteratorResult) Done()

Done explicitly marks this result as handled and final. Meaning it will not be propagated to other handlers.

func (*IteratorResult) Handled

func (res *IteratorResult) Handled()

Handled explicitly marks this result as handled (so no error is thrown if no data is found)

func (*IteratorResult) IsCached

func (res *IteratorResult) IsCached() bool

IsCached can be used to verify if this result was retrieved from cache.

func (*IteratorResult) IsFresh

func (res *IteratorResult) IsFresh() bool

IsFresh can be used to verify if this result is fresh.

func (*IteratorResult) Iterate

func (res *IteratorResult) Iterate() <-chan interface{}

Iterate is used to process the values that are being yielded

func (*IteratorResult) Yield

func (res *IteratorResult) Yield(data interface{})

Yield is used to provide values while they are being processed

type MemoryCacheAdapter

type MemoryCacheAdapter struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MemoryCacheAdapter is the struct used for memory caching purposes.

func NewMemoryCacheAdapter

func NewMemoryCacheAdapter() *MemoryCacheAdapter

NewMemoryCacheAdapter initializes a new *MemoryCacheAdapter. This function will also initialize the respective cleaner routine.

func (*MemoryCacheAdapter) Expire

func (ad *MemoryCacheAdapter) Expire(qry Cacheable)

Expire can optionally be used to forcibly expire a query cache.

func (*MemoryCacheAdapter) Get

func (ad *MemoryCacheAdapter) Get(qry Cacheable) *Result

Get retrieves the cached result for the provided query.

func (*MemoryCacheAdapter) Set

func (ad *MemoryCacheAdapter) Set(qry Cacheable, res *Result) bool

Set stores the cache value for the given query.

func (*MemoryCacheAdapter) Shutdown

func (ad *MemoryCacheAdapter) Shutdown()

Shutdown is used to stop the cleaner routine.

type Query

type Query interface {
	ID() []byte
}

Query is the interface that must be implemented by any type to be considered a query.

type Result

type Result struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Result is the struct returned from regular queries.

func (*Result) Add

func (res *Result) Add(data interface{})

Add an entry to the data slice

func (*Result) All

func (res *Result) All() []interface{}

All returns the data slice

func (*Result) CacheKey

func (res *Result) CacheKey() []byte

CacheKey is used to identify which key was used to cache this result

func (*Result) CachedAt

func (res *Result) CachedAt() time.Time

CachedAt is used to identify at which point this result was cached

func (*Result) Done

func (res *Result) Done()

Done explicitly marks this result as handled and final. Meaning it will not be propagated to other handlers.

func (*Result) ExpiresAt

func (res *Result) ExpiresAt() time.Time

ExpiresAt is used to identify at which point this result expires

func (*Result) First

func (res *Result) First() interface{}

First returns the first value of the data slice

func (*Result) Handled

func (res *Result) Handled()

Handled explicitly marks this result as handled (so no error is thrown if no data is found)

func (*Result) IsCached

func (res *Result) IsCached() bool

IsCached can be used to verify if this result was retrieved from cache.

func (*Result) IsFresh

func (res *Result) IsFresh() bool

IsFresh can be used to verify if this result is fresh.

func (*Result) Set

func (res *Result) Set(data []interface{})

Set all the data of this result
