coherence

package
v2.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2024 License: UPL-1.0 Imports: 40 Imported by: 2

Documentation

Overview

Package coherence provides a set of functions and interfaces for Go programs to act as cache clients to a Coherence Cluster using gRPC for the network transport.

Your cluster must be running Coherence Community Edition (CE) 22.06.4+ or Coherence commercial version 14.1.1.2206.4+ and must be running a gRPC Proxy.

Two interfaces, NamedMap and NamedCache, are available to access Coherence caches. NamedCache is syntactically identical in behaviour to a NamedMap, but additionally implements the PutWithExpiry operation.

Introduction

The Coherence Go client provides the following features:

  • Familiar Map-like interface for manipulating cache entries including but not limited to Put, PutWithExpiry, PutIfAbsent, PutAll, Get, GetAll, Remove, Clear, GetOrDefault, Replace, ReplaceMapping, Size, IsEmpty, ContainsKey, ContainsValue, ContainsEntry
  • Cluster-side querying, aggregation and filtering of map entries
  • Cluster-side manipulation of map entries using EntryProcessors
  • Registration of listeners to be notified of mutations such as
  • insert, update and delete on Maps, map lifecycle events such as truncated, released or destroyed and session lifecycle events such as connected, disconnected, reconnected and closed
  • Support for storing Go structs as JSON as well as the ability to serialize to Java objects on the server for access from other Coherence language API's
  • Near cache support to cache frequently accessed data in the Go client to avoid sending requests across the network
  • Support for simple and double-ended queues in Coherence Community Edition 24.09+ and commercial version 14.1.2.0+
  • Full support for Go generics in all Coherence API's

For more information on Coherence caches, please see the Coherence Documentation.

Supported Go versions

This API fully supports Go Generics and is only supported for use with Go versions 1.19 and above.

Obtaining a Session

Example:

import (
    coherence "github.com/oracle/coherence-go-client/v2/coherence"
)

...

session, err := coherence.NewSession(ctx)
if err != nil {
    log.Fatal(err)
}
defer session.Close()

The NewSession function creates a new session that will connect to a gRPC proxy server on "localhost:1408" by default.

You can specify the host and port to connect to by specifying the environment variable COHERENCE_SERVER_ADDRESS. See gRPC Naming for information on values for this.

You can also pass coherence.WithAddress("host:port") to specify the gRPC host and port to connect to. The default connection mode is with SSL enabled, but you can use plan-text via using coherence.WithPlainText().

session, err := coherence.NewSession(ctx, coherence.WithPlainText(), coherence.WithAddress("my-host:7574"))

You are also able to use the 'coherence' gRPC resolver address of "coherence:///host:port" to connect to the Coherence Name Service, running on the cluster port, and automatically discover the gRPC endpoints. For example:

session, err := coherence.NewSession(ctx, coherence.WithPlainText(), coherence.WithAddress("coherence:///localhost:7574"))

If you have multiple clusters on the same port, you can also append the cluster name to specify which cluster you wish to contact.

coherence.WithAddress("coherence:///localhost:7574/cluster2")

When using the 'coherence' gRPC resolver, the Go client randomizes any addresses that are returned to help load balance across gRPC proxies. You can turn this off by setting the environment variable COHERENCE_RESOLVER_RANDOMIZER=false.

To Configure SSL, you must first enable SSL on the gRPC Proxy, see gRPC Proxy documentation for details. Refer to the section on NewSession for more information on setting up a SSL connection on the client.

See SessionOptions which lists all the options supported by the Session API.

Controlling timeouts

Most operations you call require you to supply a context.Context. If your context does not contain a deadline, the operation will wrap your context in a new context.WithTimeout using either the default timeout of 30,000 millis or the value you set using option coherence.WithRequestTimeout when you called NewSession.

For example, to override the default request timeout of 30,000 millis with one of 5 seconds for a Session you can do the following:

session, err = coherence.NewSession(ctx, coherence.WithRequestTimeout(time.Duration(5) * time.Second))

You can also override the default request timeout using the environment variable COHERENCE_CLIENT_REQUEST_TIMEOUT.

By default, if an endpoint is not ready, the Go client will fail-fast. You can change this behaviour by setting the option coherence.WithReadyTimeout to a value millis value greater than zero which will cause the Go client to wait until up to the timeout specified until it fails if no endpoint is available. You can also use the environment variable COHERENCE_READY_TIMEOUT.

You also have the ability to control maximum amount of time, in milliseconds, a Session may remain in a disconnected state without successfully reconnecting. For this you use the option coherence.WithDisconnectTimeout or the environment variable COHERENCE_SESSION_DISCONNECT_TIMEOUT.

Obtaining a NamedMap or NamedCache

Once a session has been created, the GetNamedMap(session, name, ...options) or GetNamedCache(session, name, ...options) can be used to obtain an instance of a NamedMap or NamedCache. The key and value types must be provided as generic type arguments. This identifier may be shared across clients. It's also possible to have many [NamedMap]s or [NamedCache]s defined and in use simultaneously.

Example:

session, err := coherence.NewSession(ctx)
if err != nil {
    log.Fatal(err)
}
defer session.Close()

namedMap, err := coherence.GetNamedMap[int, string](session, "customers")
if err != nil {
    log.Fatal(err)
}

If you wish to create a NamedCache, which supports expiry, you can use the GetNamedCache function and then use the PutWithExpiry function call.

namedCache, err := coherence.GetNamedCache[int, string](session, "customers")
if err != nil {
    log.Fatal(err)
}

_, err = namedCache.PutWithExpiry(ctx, person1.ID, person1, time.Duration(5)*time.Second)

If your NamedCache requires the same expiry for every entry, you can use the coherence.WithExpiry cache option. Each call to Put will use the default expiry you have specified. If you use PutWithExpiry, this will override the default expiry for that key.

namedCache, err := coherence.GetNamedCache[int, Person](session, "cache-expiry", coherence.WithExpiry(time.Duration(5)*time.Second))

See CacheOptions which lists all the options supported by the GetNamedCache or GetNamedMap API.

Basic CRUD operations

Note: See the examples on GitHub for detailed examples.

Assuming a very trivial NamedMap with integer keys and string values.

session, err := coherence.NewSession(coherence.WithPlainText())
if err != nil {
    log.Fatal(err)
}

namedMap, err := coherence.GetNamedMap[int, string](session, "my-map")
if err != nil {
    log.Fatal(err)
}

ctx := context.Background()

// put a new key / value
if _, err = namedMap.Put(ctx, 1, "one"); err != nil {
    log.Fatal(err)
}

// get the value for the given key
if value, err = namedMap.Get(ctx, 1); err != nil {
    log.Fatal(err)
}
fmt.Println("Value for key 1 is", *value)

// update the value for key 1
if _, err = namedMap.Put(ctx, 1, "ONE"); err != nil {
    log.Fatal(err)
}

// retrieve the updated value for the given key
if value, err = namedMap.Get(ctx, 1); err != nil {
    log.Fatal(err)
}
fmt.Println("Updated value is", *value)

if _, err = namedMap.Remove(ctx, 1); err != nil {
    log.Fatal(err)
}

Note: Keys and values are serialized to JSON and stored in Coherence as a com.oracle.coherence.io.json.JsonObject. if you wish to store structs as native Java objects, then please see the section further down on "Serializing to Java Objects on the Server".

Working with structs

type Person struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
    Age  int    `json:"age"`
}

// create a new NamedMap of Person with key int
namedMap, err := coherence.GetNamedMap[int, Person](session, "test")
if err != nil {
    log.Fatal(err)
}

// clear the Map
if err = namedMap.Clear(ctx); err != nil {
    log.Fatal(err)
}

newPerson := Person{ID: 1, Name: "Tim", Age: 21}
fmt.Println("Add new Person", newPerson)
if _, err = namedMap.Put(ctx, newPerson.Id, newPerson); err != nil {
    log.Fatal(err)
}

// retrieve the Person
if person, err = namedMap.Get(ctx, 1); err != nil {
    log.Fatal(err)
}
fmt.Println("Person from Get() is", *person)

// Update the age using and entry processor for in-place processing
_, err = coherence.Invoke[int, Person, bool](ctx, namedMap, 1, processors.Update("age", 56))
if err != nil {
    log.Fatal(err)
}

// retrieve the updatedPerson
if person, err = namedMap.Get(ctx, 1); err != nil {
    log.Fatal(err)
}
fmt.Println("Person is", *person)

Querying and filtering using channels

Channels are used to deal with individual keys, values or entries streamed from the backend using a filter or an open query. Depending upon the operation, each result element is wrapped in one of the structs StreamedEntry, StreamedValue or StreamedKey which wraps an error and a Key and/or a Value. As always, the Err object must be checked for errors before accessing the Key or Value fields. All functions that return channels are EntrySetFilter, KeySetFilter, ValuesFilter, EntrySet, KeySet, Values, InvokeAll and InvokeAllFilter.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

// extractors
age := extractors.Extract[int]("age")
name := extractors.Extract[string]("name")

// retrieve all people aged > 30
ch := namedMap.EntrySetFilter(ctx, filters.Greater(age, 20))
for result := range ch {
    if result.Err != nil {
        log.Fatal(result.Err)
    }
    fmt.Println("Key:", result.Key, "Value:", result.Value)
}

// we can also do more complex filtering such as looking for people > 30 and where there name begins with 'T'
ch := namedMap.EntrySetFilter(ctx, filters.Greater(age, 20).And(filters.Like(name, "T%", true)))

Using entry processors for in-place processing

A Processor is an object that allows you to process (update) one or more NamedMap entries on the NamedMap itself, instead of moving the entries to the client across the network. In other words, using processors we send the processing to where the data resides thus avoiding massive data movement across the network. Processors can be executed against all entries, a single key or against a set of entries that match a Filter.

To demonstrate this, lets assume we have a NamedMap populated with Person struct below, and we want to run various scenarios to increase peoples salary by using a processors.Multiply processor.

type Person struct {
    Id     int     `json:"id"`
    Name   string  `json:"name"`
    Salary float32 `json:"salary"`
    Age    int     `json:"age"`
    City   string  `json:"city"`
}

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")

// 1. Increase the salary of the person with Id = 1
newSalary, err = coherence.Invoke[int, Person, float32](ctx, namedMap, 1, processors.Multiply("salary", 1.1, true))

city := extractors.Extract[string]("city")

// 2. Increase the salary of all people in Perth
ch2 := coherence.InvokeAllFilter[int, Person, float32](ctx, namedMap, filters.Equal(city, "Perth"), processors.Multiply("salary", 1.1, true))
for result := range ch2 {
    if result.Err != nil {
        log.Fatal(result.Err)
    }
}

// 3. Increase the salary of people with Id 1 and 5
ch2 := coherence.InvokeAllKeys[int, Person, float32](ctx, namedMap, []int{1, 5}, processors.Multiply("salary", 1.1, true))
for result := range ch2 {
    if result.Err != nil {
        log.Fatal(result.Err)
    }
}

Aggregating cache data

Aggregators can be used to perform operations against a subset of entries to obtain a single result. Entry aggregation occurs in parallel across the grid to provide map-reduce support when working with large amounts of data.

To demonstrate this, lets assume we have a NamedMap populated with Person struct as per the previous example, and we want to run various scenarios to perform aggregations.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

// Retrieve the distinct cities from all people
citiesValues, err := coherence.Aggregate(ctx, namedMap, extractors.Extract[string]("city"))
if err != nil {
    log.Fatal(err)
}
fmt.Println(*citiesValues)
// output: [Perth, Melbourne, Brisbane]

age := extractors.Extract[int]("age")

// minimum age across keys 3 and 4
ageResult, err = coherence.AggregateKeys(ctx, namedMap, []int{3, 4}, aggregators.Min(age))

// top 2 people by salary using filter
var salaryResult *[]Person
salaryResult, err = coherence.AggregateFilter[int, Person, []Person](ctx, namedMap, filters.Greater(age, 40),
    aggregators.TopN[float32, Person](extractors.Extract[float32]("salary"), false, 2))

Responding to cache events

The Coherence Go client provides the ability to add a MapListener that will receive events (inserts, updates, deletes) that occur against a NamedMap or NamedCache. You can listen for all events, events based upon a filter or vents based upon a key.

// in your main code, create a new NamedMap and register the listener
namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

listener := coherence.NewMapListener[int, Person]().OnUpdated(
func(e coherence.MapEvent[int, Person]) {
    key, err := e.Key()
    if err != nil {
        panic("unable to deserialize key")
    }

    newValue, err := e.NewValue()
    if err != nil {
        panic("unable to deserialize new value")
    }

    oldValue, err := e.OldValue()
    if err != nil {
        panic("unable to deserialize old value")
    }

    fmt.Printf("**EVENT=Updated: key=%v, oldValue=%v, newValue=%v\n", *key, *oldValue, *newValue)
})

if err = namedMap.AddListener(ctx, listener); err != nil {
    panic(err)
}

// ensure we unregister the listener
defer func(ctx context.Context, namedMap coherence.NamedMap[int, Person], listener coherence.MapListener[int, Person]) {
    _ = namedMap.RemoveListener(ctx, listener)
}(ctx, namedMap, listener)

// As you carry out operations that will mutate the cache entries, update the age to 56, you will see the events printed
_, err = coherence.Invoke[int, Person, bool](ctx, namedMap, 1, processors.Update("age", 56))
if err != nil {
    log.Fatal(err)
}

// output:
// **EVENT=Updated: key=1, oldValue={1 Tim 53}, newValue={1 Tim 53}

// you can also listen based upon filters, for example the following would create a
// listener for all entries where the salary is > 17000
if err = namedMap.AddFilterListener(ctx, listener,
    filters.Greater(extractors.Extract[int]("salary"), 17000)); err != nil {
    log.Fatal("unable to add listener", listener, err)
}

// You can also listen on a specific key, e.g. list on key 1.
listener := NewUpdateEventsListener[int, Person]()
if err = namedMap.AddKeyListener(ctx, listener, 1); err != nil {
    log.Fatal("unable to add listener", listener, err)
}

Responding to cache lifecycle events

The Coherence Go client provides the ability to add a MapLifecycleListener that will receive events (truncated and destroyed) that occur against a NamedMap or NamedCache.

// consider the example below where we want to listen for all 'truncate' events for a NamedMap.
// in your main code, create a new NamedMap and register the listener
namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

// Create a listener and add to the cache
listener := coherence.NewMapLifecycleListener[int, Person]().
    OnTruncated(func(e coherence.MapLifecycleEvent[int, Person]) {
        fmt.Printf("**EVENT=%s: source=%v\n", e.Type(), e.Source())
    })

namedMap.AddLifecycleListener(listener)
defer namedMap.RemoveLifecycleListener(listener)

newPerson := Person{ID: 1, Name: "Tim", Age: 21}
fmt.Println("Add new Person", newPerson)
if _, err = namedMap.Put(ctx, newPerson.Id, newPerson); err != nil {
    log.Fatal(err)
}

if size, err = namedMap.Size(ctx); err != nil {
    log.Fatal(err)
}
fmt.Println("Cache size is", size, "truncating cache")

if err = namedMap.Truncate(ctx); err != nil {
    log.Fatal(err)
}

time.Sleep(time.Duration(5) * time.Second)

// output:
// Add new Person {1 Tim 53}
// Cache size is 1 truncating cache
// **EVENT=Truncated: value=NamedMap{name=people, format=json}

Responding to session lifecycle events

The Coherence Go client provides the ability to add a SessionLifecycleListener that will receive events (connected, closed, disconnected or reconnected) that occur against the Session. Note: These events use and experimental gRPC API so may not be reliable or may change in the future. This is due to the experimental nature of the underlying gRPC API.

Consider the example below where we want to listen for all 'All' events for a Session. in your main code, create a new Session and register the listener

// create a new Session
session, err := coherence.NewSession(ctx, coherence.WithPlainText())
if err != nil {
    log.Fatal(err)
}

// Create a listener to listen for session events
listener := coherence.NewSessionLifecycleListener().
    OnAny(func(e coherence.SessionLifecycleEvent) {
        fmt.Printf("**EVENT=%s: source=%v\n", e.Type(), e.Source())
})

session.AddSessionLifecycleListener(listener)
defer session.RemoveSessionLifecycleListener(listener)

// create a new NamedMap of Person with key int
namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

// clear the Map
if err = namedMap.Clear(ctx); err != nil {
    log.Fatal(err)
}

session.Close()

time.Sleep(time.Duration(5) * time.Second)

// output:
// 2023/01/31 11:15:37 connected session 59f3ec81-dda1-41b7-92de-70aad3d26615 to address localhost:1408
// 2023/01/31 11:15:38 closed session 59f3ec81-dda1-41b7-92de-70aad3d26615
// **EVENT=session_closed: source=SessionID=59f3ec81-dda1-41b7-92de-70aad3d26615, closed=true, caches=0, maps=0

Working with Queues

When connecting to a Coherence CE cluster versions 24.09 or above or commercial 14.1.2.0.+, you have the ability to create two main types of queues, a NamedQueue or NamedDequeue.

A NamedQueue is a simple FIFO queue which can be one of two types: either Queue - a simple queue which stores data in a single partition and is limited to approx 2GB of storage, or PagedQueue which distributes data over the cluster and is only limited by the cluster capacity.

A NamedDequeue is a simple double-ended queue that stores data in a single partition.

Queues in general have the following methods. See NamedQueue for the full list.

- PeekHead(ctx context.Context) (*V, error) - retrieve but not remove the value at the head of this queue

- PollHead(ctx context.Context) (*V, error - retrieves and removes the head of this queue

- OfferTail(ctx context.Context, value V) error - inserts the specified value to the end of this queue if it is possible to do so

Consider the example below where we want to create a standard queue and add 10 entries, and then retrieve 10 entries. We have specified coherence.Queue as the type but this could also be coherence.PagedQueue.

namedQueue, err := coherence.GetNamedQueue[string](ctx, session, "my-queue", coherence.Queue)
if err != nil {
    panic(err)
}

// add an entry to the tail of the queue
for i := 1; i <= iterations; i++ {
    v := fmt.Sprintf("value-%v", i)
    log.Printf("OfferTail() %s to the queue\n", v)
    err = namedQueue.OfferTail(ctx, v)
    if err != nil {
        panic(err)
    }
}
// output:
// Offer() value-1 to the queue
// ...
// Offer() value-10 to the queue

// Poll 10 entries from the head of the queue
for i := 1; i <= iterations; i++ {
    value, err = namedQueue.PollHead(ctx)
    if err != nil {
        panic(err)
    }
    log.Printf("Poll() returned: %s\n", *value)
}

// output:
// Poll() returned: value-1
// ...
// Poll() returned: value-10

// try to read again should get nil as nothing left on the queue
value, err = namedQueue.PollHead(ctx)
if err != nil {
    panic(err)
}
log.Println("last value is", value)
// output: last value is nil

The NamedDequeue is a double-ended queue and has the following additional functions:

- OfferHead(ctx context.Context, value V) error - inserts the specific value at the head of this queue

- PollTail(ctx context.Context) (*V, error) - retrieves and removes the tail of this queue

- PeekTail(ctx context.Context) (*V, error) - retrieves, but does not remove, the tail of this queue

In the following example, we are using a NamedDequeue or double-ended queue, where we have the ability to add or offer data to the head of the queue as well as the end of the queue, and also poll and peek the the end of the queue.

namedQueue, err := coherence.GetNamedDeQueue[string](ctx, session, "double-ended-queue")
if err != nil {
    panic(err)
}

// add 10 entries to the end (tail) of the queue
for i := 1; i <= iterations; i++ {
    v := fmt.Sprintf("value-%v", i)
    log.Printf("OfferTail() %s to the queue\n", v)
    err = namedQueue.OfferTail(ctx, v)
    if err != nil {
        panic(err)
    }
}

// output:
// 2024/11/27 11:05:37 OfferTail() value-1 to the queue
// ..
// 2024/11/27 11:05:37 OfferTail() value-10 to the queue

// Offer a value to the head
err = namedQueue.OfferHead(ctx, "value-head")
if err != nil {
    panic(err)
}

// peek the tail of the queue
value, err = namedQueue.PeekTail(ctx)
if err != nil {
    panic(err)
}
log.Printf("PeekTail() returned: %s\n", *value)

// output:
// 2024/11/27 11:05:37 PeekTail() returned: value-10

// poll for iterations +1 because we added another entry to the head
for i := 1; i <= iterations+1; i++ {
    value, err = namedQueue.PollHead(ctx)
    if err != nil {
        panic(err)
    }
    log.Printf("PollHead() returned: %s\n", *value)
}

// output:
// 2024/11/27 11:05:37 PollHead() returned: value-head (the value we added to the head)
// 2024/11/27 11:05:37 PollHead() returned: value-1
// ..
// 2024/11/27 11:05:37 PollHead() returned: value-10

// try to read again should get nil
value, err = namedQueue.PollHead(ctx)
if err != nil {
    panic(err)
}
log.Println("last value is", value)

// output:
// 2024/11/27 11:05:37 last value is <nil>

See the Queues documentation for more information on using queues on the Coherence Server.

Responding to queue lifecycle events

The Coherence Go client provides the ability to add a QueueLifecycleListener that will receive events (truncated, released and destroyed) that occur against a NamedQueue.

// consider the example below where we want to listen for all 'QueueReleased' events for a NamedQueue.
// in your main code, create a new NamedQueue and register the listener.
// Note: this is a contrived example, but you can listen for QueueDestroyed and QueueTruncated events
// in a similar way.

namedQueue, err := coherence.GetNamedQueue[string](session, "queue", coherence.Queue)
if err != nil {
    log.Fatal(err)
}

// Create a listener to monitor
listener := coherence.NewQueueLifecycleListener[string]().
    OnTruncated(func(e coherence.QueueLifecycleEvent[string]) {
        fmt.Printf("**EVENT=%s: source=%v\n", e.Type(), e.Source())
    })

_ = namedQueue.AddLifecycleListener(listener)
defer namedQueue.RemoveLifecycleListener(listener)

namedQueue.Release()

// sleep to ensure we receive the event before we close
time.Sleep(5 * time.Second)

// output:
// 2024/11/28 11:40:58 INFO: Session [b1435a16-f210-4289-97e4-e1654947acd5] connected to [localhost:1408] Coherence version: 24.09, serverProtocolVersion: 1, proxyMemberId: 1
// **EVENT=queue_released: source=NamedQueue{name=queue-events, type=Queue, queueID=1198559040}
// 2024/11/28 11:41:03 INFO: Session [b1435a16-f210-4289-97e4-e1654947acd5] closed

Serializing to Java objects on the server

By default, the Coherence Go client serializes any keys and values to JSON and then stores them as JsonObjects in Coherence. This is usually sufficient for most applications where you are only accessing your data via the Go client.

If you wish to access your data via other clients such as Java, JavaScript, C++, .NET or Python, it's best to use Java classes, known to Coherence server, representing the data model. The following describes how to achieve interoperability with Java.

Step 1. Create your Java Classes

Firstly you must define your data model for all Java classes and configure for JSON serialization. You do not need to annotate all the attributes with @JsonbProperty, but it is a good practice so that you have consistent names with Go. Below is a shorted version of a Customer class without all the extras such as getters, setters, hashCode, etc, that you know you need. In the example below I am using standard Java serialization, but you can use POF serialization if you have that configured.

package com.oracle.demo;

public class Customer implements Serializable {
	public Customer() {} // required

	@JsonbProperty("id")
	private int id;

	@JsonbProperty("customerName")
	private String customerName;

	@JsonbProperty("outstandingBalance")
	private double outstandingBalance;

	...

Step 2. Define your type alias.

In the code deployed to your Coherence storage-nodes, you need to create a file in your resources root called META-INF/type-aliases.properties which contains an alias and fully qualified class name for each of your classes.

# Example META-INF/type-aliases.properties file
customer=com.oracle.demo.Customer
order=com.oracle.demo.Order

Step 3. Define your Go structs

Next you need to define your Go structs with JSON names matching your Java objects. You also need to include a Class attribute with the JSON attribute name of "@class". We will set this in our object to the value "customer" matching the value in the type-aliases.properties on the server.

type Customer struct {
    Class              string   `json:"@class"`
    ID                 int      `json:"id"`
    CustomerName       string   `json:"customerName"`
    OutstandingBalance float32  `json:"outstandingBalance"`
}

Step 4. Create and put the value

Lastly, when you create a Customer object you must set the Class value matching the alias above.

customer := Customer{
    Class:              "customer",
    ID:                 1,
    CustomerName:       "Tim",
    OutstandingBalance: 10000,
}

// store the entry in Coherence, it will be stored as a com.oracle.demo.Customer POJO!

_, err = namedMap.Put(ctx, customer.ID, customer)
if err != nil {
    log.Fatal(err)
}

Using Near Caches

The Coherence Go client allows you to specify a near cache to cache frequently accessed data in your Go application. When you access data using Get() or GetAll() operations, returned entries are stored in the near cache and subsequent data access for keys in the near cache is almost instant where without a near cache each operation above always results in a network call.

On creating a near cache, Coherence automatically adds a MapListener to your NamedMap or NamedCache which listens on all cache events and updates or invalidates entries in the near cache that have been changed or removed on the server.

To manage the amount of memory used by the near cache, the following options are supported when creating one:

  • time-to-live (TTL) – objects expired after time in near cache, e.g. 5 minutes
  • High-Units – maximum number of cache entries in the near cache
  • Memory – maximum amount of memory used by cache entries

Note: You can specify either High-Units or Memory and in either case, optionally, a TTL.

Note: The minimum expiry time for a near cache entry is 1/4 second. This is to ensure that expiry of elements is as efficient as possible. You will receive an error if you try to set the TTL to a lower value.

The above can be specified by passing NearCacheOptions within WithNearCache when creating a NamedMap or NamedCache. See below for various ways of creating near caches.

You can ask a NamedMap or NamedCache for its near cache statistics by calling GetNearCacheStats(). Various statistics are recorded in regard to the near cache and can be seen via the CacheStats interface. If the NamedMap or NamedCache does not have a near cache, nil will be returned.

1. Creating a Near Cache specifying time-to-live (TTL)

The following example shows how to get a named cache that will cache entries from Get() or GetAll() for up to 30 seconds.

// specify a TTL of 30 seconds
nearCacheOptions := coherence.NearCacheOptions{TTL: time.Duration(30) * time.Second}

namedMap, err := coherence.GetNamedMap[int, string](session, "customers", coherence.WithNearCache(&nearCacheOptions))
if err != nil {
    log.Fatal(err)
}

// issue first Get for data in the cache on the storage-nodes. Entries found will be stored in near cache
value, err = namedMap.Get(ctx, 1)
if err != nil {
    panic(err)
}

// subsequent access will be almost instant from near cache
value, err = namedMap.Get(ctx, 1)

// you can check the near cache stats
fmt.Println("Near cache size is", namedMap.GetNearCacheStats().Size())

// output "Near cache size is 1"

2. Creating a Near Cache specifying maximum number of entries to store

The following example shows how to get a named cache that will cache up to 100 entries from Get() or GetAll(). When the threshold of HighUnits is reached, the near cache is pruned to the default of 80% of its size and evicts least recently accessed and created entries.

Note: The default prune percentage is 0.8 (80%) which indicates the percentage of the total number of units that will remain after the cache manager prunes the near cache( i.e. this is the "low watermark" value). This can be changed by setting the PruneFactory to a value in the range 0.1 to 1.0 in NearCacheOptions.

// specify HighUnits of 1000
nearCacheOptions := coherence.NearCacheOptions{HighUnits: 1000}

namedMap, err := coherence.GetNamedMap[int, string](session, "customers", coherence.WithNearCache(&nearCacheOptions))
if err != nil {
    log.Fatal(err)
}

// assume we have 2000 entries in the coherence cache, issue 1000 gets and the near cache will have 100 entries
for i := 1; i <= 1000; i++ {
	_, err = namedMap.Get(ctx, i)
	if err != nil {
		panic(err)
	}
}

fmt.Println("Near cache size is", namedMap.GetNearCacheStats().Size())
// output: "Near cache size is 1000"

// issue a subsequent Get() for an entry not in the near cache and the cache will be pruned to 80%
customer, err = namedMap.Get(ctx, 1)

fmt.Println("Near cache size is", namedCache.GetNearCacheStats().Size())
// output: "Near cache size is 800"

3. Creating a Near Cache specifying maximum memory to use

The following example shows how to get a named cache that will cache up to 10KB of entries from Get() or GetAll(). When the threshold of HighUnits is reached, the near cache is pruned to 80% of its size and evicts least recently accessed and created entries.

// specify HighUnits of 1000
nearCacheOptions := coherence.NearCacheOptions{HighUnitsMemory: 10 * 1024}

namedMap, err := coherence.GetNamedMap[int, string](session, "customers", coherence.WithNearCache(&nearCacheOptions))
if err != nil {
    log.Fatal(err)
}

// assume we have 5000 entries in the coherence cache, issue 5000 gets and the near cache will be pruned and
// not have the full 5000 entries as it does not fit within 10KB.
for i := 1; i <= 5000; i++ {
	_, err = namedMap.Get(ctx, i)
	if err != nil {
		panic(err)
	}
}

// print the near cache stats via String()
fmt.Println(namedMap.GetNearCacheStats())
// localCache{name=my-near-cache-high-units, options=localCacheOptions{ttl=0s, highUnits=1000, highUnitsMemory=0B, pruneFactor=0.80, invalidation=ListenAll}, stats=CacheStats{puts=1001, gets=1002, hits=1, misses=1001, missesDuration=4.628931138s,
// hitRate=0.0998004, prunes=1, prunesDuration=181.533µs, expires=0, expiresDuration=0s, size=200, memoryUsed=53.2KB}}

Index

Constants

View Source
const (
	// EntryInserted this event indicates that an entry has been added to the cache.
	EntryInserted MapEventType = "insert"

	// EntryUpdated this event indicates that an entry has been updated in the cache.
	EntryUpdated MapEventType = "update"

	// EntryDeleted this event indicates that an entry has been removed from the cache.
	EntryDeleted MapEventType = "delete"

	// Destroyed raised when a storage for a given cache is destroyed
	// usually as a result of a call to NamedMap.Destroy().
	Destroyed MapLifecycleEventType = "map_destroyed"

	// Truncated raised when a storage for a given cache is truncated
	// as a result of a call to NamedMap.Truncate().
	Truncated MapLifecycleEventType = "map_truncated"

	// Released raised when the local resources for a cache has been released
	// as a result of a call to NamedMap.Release().
	Released MapLifecycleEventType = "map_released"

	// Connected raised when the session has connected.
	Connected SessionLifecycleEventType = "session_connected"

	// Disconnected raised when the session has disconnected.
	Disconnected SessionLifecycleEventType = "session_disconnected"

	// Reconnected raised when the session has re-connected.
	Reconnected SessionLifecycleEventType = "session_reconnected"

	// Closed raised when the session has been closed.
	Closed SessionLifecycleEventType = "session_closed"
)
View Source
const (
	KB = 1024
	MB = KB * KB
	GB = MB * KB
)

Variables

View Source
var (
	// ErrDestroyed indicates that the NamedMap or NamedCache has been destroyed and can no-longer be used.
	ErrDestroyed = errors.New("the NamedMap or NamedCache has been destroyed and is not usable")

	// ErrReleased indicates that the NamedMap or NamedCache has been released and can no-longer be used.
	ErrReleased = errors.New("the NamedMap or NamedCache has been released and is not usable")

	// ErrClosed indicates that the session has been closed.
	ErrClosed = errors.New("the session is closed and is not usable")

	// ErrShutdown indicates the gRPC channel has been shutdown.
	ErrShutdown = errors.New("gRPC channel has been shutdown")
)
View Source
var (
	ErrQueueFailedOffer         = errors.New("did not return success for offer")
	ErrQueueDestroyedOrReleased = errors.New("this queue has been destroyed or released")
	ErrQueueNoSupported         = errors.New("the coherence server version must support protocol version 1 or above to use queues")
)
View Source
var (
	ErrInvalidFormat             = errors.New("format can only be 'json'")
	ErrInvalidNearCache          = errors.New("you must specify at least one near cache option")
	ErrInvalidNearCacheWithTTL   = errors.New("when using TTL for near cache you can only specify highUnits or highUnitsMemory")
	ErrInvalidNearCacheTTL       = errors.New("minimum near cache TTL is 1/4 of a second")
	ErrInvalidNearCacheWithNoTTL = errors.New("you can only specify highUnits or highUnitsMemory, not both")
	ErrNegativeNearCacheOptions  = errors.New("you cannot specify negative values for near cache options")
	ErrInvalidPruneFactor        = errors.New("prune factor must be between 0.1 and 1.0")
)

ErrInvalidFormat indicates that the serialization format can only be JSON.

View Source
var (
	INFO    logLevel = "INFO:"
	WARNING logLevel = "WARN:"
	ERROR   logLevel = "ERROR:"
	DEBUG   logLevel = "DEBUG:"
	RCV              = "RCV"
	SND              = "SND"
)
View Source
var (

	// ErrDone indicates that there are no more entries to return.
	ErrDone = errors.New("iterator done")
)
View Source
var (
	ErrNotSupported = errors.New("this attribute is not support for gRPC v0 clients")
)

Functions

func AddIndex

func AddIndex[K comparable, V, T, E any](ctx context.Context, nm NamedMap[K, V], extractor extractors.ValueExtractor[T, E], sorted bool) error

AddIndex adds the index based upon the supplied extractors.ValueExtractor. The type parameters are T = type to extract from and E = type of the extracted value.

The example below shows how to add a sorted index (on age) on the age attribute.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

if err = coherence.AddIndex(ctx, namedMap, extractors.Extract[int]("age"), true); err != nil {
    log.Fatal(err)
}

func AddIndexWithComparator

func AddIndexWithComparator[K comparable, V, T, E any](ctx context.Context, nm NamedMap[K, V], extractor extractors.ValueExtractor[T, E], comparator extractors.ValueExtractor[T, E]) error

AddIndexWithComparator adds the index based upon the supplied extractors.ValueExtractor and comparator. The type parameters are T = type to extract from and E = type of the extracted value.

The example below shows how to add an index on the age attribute sorted by name.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

err = coherence.AddIndexWithComparator(ctx, namedMap, extractors.Extract[int]("age"), extractors.Extract[int]("name"))
if err != nil {
    log.Fatal(err)
}

func Aggregate

func Aggregate[K comparable, V, R any](ctx context.Context, nm NamedMap[K, V], aggr aggregators.Aggregator[R]) (*R, error)

Aggregate performs an aggregating operation (identified by aggregator) against all the entries in a NamedMap or NamedCache. The type parameter is R = type of the result of the aggregation.

The example below shows how to get the average age of all people.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

// Note: the Average aggregator returns a big.Rat
bigRat, err = coherence.Aggregate(ctx, namedMap aggregators.Average(extractors.Extract[int]("age")))
if err != nil {
    log.Fatal(err)
}
value, _ := bigRat.Float32()
fmt.Printf("Average age of people is %.2f\n", value)

func AggregateFilter

func AggregateFilter[K comparable, V, R any](ctx context.Context, nm NamedMap[K, V], filter filters.Filter, aggr aggregators.Aggregator[R]) (*R, error)

AggregateFilter performs an aggregating operation (identified by aggregator) against the set of entries selected by the specified filter. The type parameter is R = type of the result of the aggregation.

The example below shows how to get the count of people ages older than 19.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

count, err = coherence.AggregateFilter(ctx, namedMap, filters.Greater(extractors.Extract[int]("age"), 19), aggregators.Count())
if err != nil {
    log.Fatal(err)
}
fmt.Println("Number of people aged greater than 19 is", *count)

func AggregateKeys

func AggregateKeys[K comparable, V, R any](ctx context.Context, nm NamedMap[K, V], keys []K, aggr aggregators.Aggregator[R]) (*R, error)

AggregateKeys performs an aggregating operation (identified by aggregator) against the set of entries selected by the specified keys. The type parameter is R = type of the result of the aggregation.

The example below shows how to get the minimum age across the people with keys 3, 4, and 5.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

minAge, err = coherence.AggregateKeys(ctx, namedMap, []int{3, 4, 5}, aggregators.Min(extractors.Extract[int]("age")))
if err != nil {
    log.Fatal(err)
}
fmt.Println("Minimum age of people with keys 3, 4 and 5 is", *minAge)

func GetFilterListenerGroupMap

func GetFilterListenerGroupMap[K comparable, V any](namedMap NamedMap[K, V]) map[filters.Filter]*listenerGroupV1[K, V]

revive:disable:unexported-return

func GetKeyListenerGroupMap

func GetKeyListenerGroupMap[K comparable, V any](namedMap NamedMap[K, V]) map[K]*listenerGroupV1[K, V]

revive:disable:unexported-return

func GetNearCachePruneFactor

func GetNearCachePruneFactor[K comparable, V any](namedMap NamedMap[K, V]) float32

func GetSessionCacheID

func GetSessionCacheID(session *Session, cache string) *int32

GetSessionCacheID returns the cache id for a cache name

func GetSessionQueueID

func GetSessionQueueID(session *Session, queue string) *int32

GetSessionQueueID returns the queue id for a cache name

func Invoke

func Invoke[K comparable, V, R any](ctx context.Context, nm NamedMap[K, V], key K, proc processors.Processor) (*R, error)

Invoke the specified processor against the entry mapped to the specified key. Processors are invoked atomically against a specific entry as the process may mutate the entry. The type parameter is R = type of the result of the invocation.

The example below shows how to run an entry processor to increment the age of person identified by the key 1.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

newAge, err := coherence.Invoke[int, Person, int](ctx, namedMap, 1, processors.Increment("age", 1))
fmt.Println("New age is", *newAge)

func InvokeAll

func InvokeAll[K comparable, V any, R any](ctx context.Context, nm NamedMap[K, V], proc processors.Processor) <-chan *StreamedEntry[K, R]

InvokeAll invokes the specified function against all entries in a NamedMap. Functions are invoked atomically against a specific entry as the function may mutate the entry. The type parameter is R = type of the result of the invocation.

The example below shows how to run an entry processor to increment the age of all people. This function returns a stream of StreamedEntry[K, R] of the values changed.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := coherence.InvokeAll[int, Person, int](ctx, namedMap, processors.Increment("age", 1))
for se := range ch {
    // Check the error
    if se.Err != nil {
        // process the error
        log.Println(se.Err)
    } else {
        // process the result which will be the key of the person changed
        fmt.Println(se.Key, se.Value)
    }
}

func InvokeAllBlind

func InvokeAllBlind[K comparable, V any](ctx context.Context, nm NamedMap[K, V], proc processors.Processor) error

InvokeAllBlind invokes the specified function against all entries in a NamedMap but does not return results via a channel. This is a utility function that differs from InvokeAll and is useful when you do not care about the result of the InvokeAll, just if it succeeded or not. If error is nil then the operation was successful, otherwise the first error encountered is returned. If you wish to know all the errors use the InvokeAll function. When an error occurs the result of all the operations is undefined.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

err := coherence.InvokeAllBlind[int, Person](ctx, namedMap, processors.Increment("age", 1))
if err != nil {
    log.Fatal(err)
}

func InvokeAllFilter

func InvokeAllFilter[K comparable, V any, R any](ctx context.Context, nm NamedMap[K, V], fltr filters.Filter, proc processors.Processor) <-chan *StreamedEntry[K, R]

InvokeAllFilter invokes the specified function against the entries matching the specified filter. Functions are invoked atomically against a specific entry as the function may mutate the entry. The type parameter is R = type of the result of the invocation.

The example below shows how to run an entry processor to increment the age of any people older than 1. This function returns a stream of StreamedValue[R] of the values changed.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

age := extractors.Extract[int]("age")

ch := coherence.InvokeAllFilter[int, Person, int](ctx, namedMap, filters.Greater(age, 1), processors.Increment("age", 1))
for se := range ch {
    // Check the error
    if se.Err != nil {
        // process the error
        log.Println(se.Err)
    } else {
        // process the result which will be the person changed
        fmt.Println(se.Value)
    }
}

func InvokeAllFilterBlind

func InvokeAllFilterBlind[K comparable, V any](ctx context.Context, nm NamedMap[K, V], fltr filters.Filter, proc processors.Processor) error

InvokeAllFilterBlind invokes the specified function against the entries matching the specified filter but does not return results via a channel. This is a utility function that differs from InvokeAllFilter and is useful when you do not care about the result of the InvokeAllFilter, just if it succeeded or not. If error is nil then the operation was successful, otherwise the first error encountered is returned. If you wish to know all the errors use the InvokeAllFilter function. When an error occurs the result of all the operations is undefined.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

age := extractors.Extract[int]("age")

err := coherence.InvokeAllFilter[int, Person](ctx, namedMap, filters.Greater(age, 1), processors.Increment("age", 1))
if err != nil {
    log.Fatal(err)
}

func InvokeAllKeys

func InvokeAllKeys[K comparable, V any, R any](ctx context.Context, nm NamedMap[K, V], keys []K, proc processors.Processor) <-chan *StreamedEntry[K, R]

InvokeAllKeys invokes the specified function against the entries matching the specified keys. Functions are invoked atomically against a specific entry as the function may mutate the entry. The type parameter is R = type of the result of the invocation.

The example below shows how to run an entry processor to increment the age of any people with keys 1 and 2. This function returns a stream of StreamedEntry[K, R] of the values changed.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := coherence.InvokeAllKeys[int, Person, int](ctx, namedMap, []int{1, 2}, processors.Increment("age", 1))
for se := range ch {
    // Check the error
    if se.Err != nil {
        // process the error
        log.Println(se.Err)
    } else {
        // process the result which will be the key of the person changed
        fmt.Println(se.Key, se.Value)
    }
}

func InvokeAllKeysBlind

func InvokeAllKeysBlind[K comparable, V any](ctx context.Context, nm NamedMap[K, V], keys []K, proc processors.Processor) error

InvokeAllKeysBlind invokes the specified function against the entries matching the specified keys but does not return results via a channel. This is a utility function that differs from InvokeAllKeys and is useful when you do not care about the result of the InvokeAllKeys, just if it succeeded or not. If error is nil then the operation was successful, otherwise the first error encountered is returned. If you wish to know all the errors use the InvokeAllKeys function. When an error occurs the result of all the operations is undefined.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := coherence.InvokeAllKeysBlind[int, Person](ctx, namedMap, []int{1, 2}, processors.Increment("age", 1))
if err != nil {
    log.Fatal(err)
}

func NewEnsureCacheRequest

func NewEnsureCacheRequest(session *Session, cache string) (*pb1.ProxyRequest, error)

func NsLookupGrpcAddresses

func NsLookupGrpcAddresses(address string) ([]string, error)

NsLookupGrpcAddresses looks up grpc proxy server addresses based upon the provided name service address provided as host:port, e.g. localhost:7574[/cluster].

func RemoveIndex

func RemoveIndex[K comparable, V, T, E any](ctx context.Context, nm NamedMap[K, V], extractor extractors.ValueExtractor[T, E]) error

RemoveIndex removes index based upon the supplied extractors.ValueExtractor. The type parameters are T = type to extract from and E = type of the extracted value.

The example below shows how to remove and index on the age attribute.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

if err = coherence.RemoveIndex(ctx, namedMap, extractors.Extract[int]("age")); err != nil {
    log.Fatal(err)
}

func TestAggregate

func TestAggregate(ctx context.Context, session *Session, cache string, agent []byte, keysOrFilter *pb1.KeysOrFilter) (*[]byte, error)

func TestClearCache

func TestClearCache(ctx context.Context, session *Session, cache string) error

func TestContainsEntry

func TestContainsEntry(ctx context.Context, session *Session, cache string, key []byte, value []byte) (bool, error)

func TestContainsKey

func TestContainsKey(ctx context.Context, session *Session, cache string, key []byte) (bool, error)

func TestContainsValue

func TestContainsValue(ctx context.Context, session *Session, cache string, value []byte) (bool, error)

func TestDestroyCache

func TestDestroyCache(ctx context.Context, session *Session, cache string) error

func TestEnsureCache

func TestEnsureCache(ctx context.Context, session *Session, cache string) (*int32, error)

func TestGet

func TestGet(ctx context.Context, session *Session, cache string, key []byte) (*[]byte, error)

func TestGetAll

func TestGetAll(ctx context.Context, session *Session, cache string, keys [][]byte) (<-chan BinaryKeyAndValue, error)

func TestInvoke

func TestInvoke(ctx context.Context, session *Session, cache string, agent []byte, keysOrFilter *pb1.KeysOrFilter) (<-chan BinaryKeyAndValue, error)

func TestIsEmpty

func TestIsEmpty(ctx context.Context, session *Session, cache string) (bool, error)

func TestIsReady

func TestIsReady(ctx context.Context, session *Session, cache string) (bool, error)

func TestKeyAndValuePage

func TestKeyAndValuePage(ctx context.Context, session *Session, cache string, cookie []byte) (<-chan BinaryKeyAndValue, error)

func TestMapListenerRequest

func TestMapListenerRequest(ctx context.Context, session *Session, cache string, subscribe bool, keyOrFilter *pb1.KeyOrFilter,
	lite bool, synchronous bool, priming bool, filterID int64) error

func TestPut

func TestPut(ctx context.Context, session *Session, cache string, key []byte, value []byte, ttl time.Duration) (*[]byte, error)

func TestPutAll

func TestPutAll(ctx context.Context, session *Session, cache string, entries []*pb1.BinaryKeyAndValue, ttl time.Duration) error

func TestPutIfAbsent

func TestPutIfAbsent(ctx context.Context, session *Session, cache string, key []byte, value []byte) (*[]byte, error)

func TestRemove

func TestRemove(ctx context.Context, session *Session, cache string, key []byte) (*[]byte, error)

func TestRemoveMapping

func TestRemoveMapping(ctx context.Context, session *Session, cache string, key []byte, value []byte) (bool, error)

func TestReplace

func TestReplace(ctx context.Context, session *Session, cache string, key []byte, value []byte) (*[]byte, error)

func TestReplaceMapping

func TestReplaceMapping(ctx context.Context, session *Session, cache string, key []byte, prevValue []byte, newValue []byte) (bool, error)

func TestSize

func TestSize(ctx context.Context, session *Session, cache string) (int32, error)

func TestTruncateCache

func TestTruncateCache(ctx context.Context, session *Session, cache string) error

func WithAddress

func WithAddress(host string) func(sessionOptions *SessionOptions)

WithAddress returns a function to set the address for session.

func WithDisconnectTimeout

func WithDisconnectTimeout(timeout time.Duration) func(sessionOptions *SessionOptions)

WithDisconnectTimeout returns a function to set the maximum amount of time, in millis, a Session may remain in a disconnected state without successfully reconnecting.

func WithExpiry

func WithExpiry(ttl time.Duration) func(cacheOptions *CacheOptions)

WithExpiry returns a function to set the default expiry for a NamedCache. This option is not valid on NamedMap.

func WithFormat

func WithFormat(format string) func(sessionOptions *SessionOptions)

WithFormat returns a function to set the format for a session. Currently, only "json" is supported.

func WithIgnoreInvalidCerts

func WithIgnoreInvalidCerts() func(sessionOptions *SessionOptions)

WithIgnoreInvalidCerts returns a function to set the connection to ignore invalid certificates for a session.

func WithNearCache

func WithNearCache(options *NearCacheOptions) func(cacheOptions *CacheOptions)

WithNearCache returns a function to set NearCacheOptions.

func WithPlainText

func WithPlainText() func(sessionOptions *SessionOptions)

WithPlainText returns a function to set the connection to plan text (insecure) for a session.

func WithReadyTimeout

func WithReadyTimeout(timeout time.Duration) func(sessionOptions *SessionOptions)

WithReadyTimeout returns a function to set the maximum amount of time an NamedMap or NamedCache operations may wait for the underlying gRPC channel to be ready. This is independent of the request timeout which sets a deadline on how long the call may take after being dispatched.

func WithRequestTimeout

func WithRequestTimeout(timeout time.Duration) func(sessionOptions *SessionOptions)

WithRequestTimeout returns a function to set the request timeout in millis.

func WithScope

func WithScope(scope string) func(sessionOptions *SessionOptions)

WithScope returns a function to set the scope for a session. This will prefix all caches with the provided scope to make them unique within a scope.

func WithTLSCertsPath

func WithTLSCertsPath(path string) func(sessionOptions *SessionOptions)

WithTLSCertsPath returns a function to set the (CA) certificates to be added for a session.

func WithTLSClientCert

func WithTLSClientCert(path string) func(sessionOptions *SessionOptions)

WithTLSClientCert returns a function to set the client certificate to be added for a session.

func WithTLSClientKey

func WithTLSClientKey(path string) func(sessionOptions *SessionOptions)

WithTLSClientKey returns a function to set the client key to be added for a session.

func WithTLSConfig

func WithTLSConfig(tlsConfig *tls.Config) func(sessionOptions *SessionOptions)

WithTLSConfig returns a function to set the tls.Config directly. This is typically used when you require fine-grained control over these options.

Types

type BinaryKey

type BinaryKey struct {
	Key    []byte
	Err    error
	Cookie []byte
}

BinaryKey is an internal type exported only for serialization.

type BinaryKeyAndValue

type BinaryKeyAndValue struct {
	Key    []byte
	Value  []byte
	Err    error
	Cookie []byte
}

BinaryKeyAndValue is an internal type exported only for serialization.

type BinaryValue

type BinaryValue struct {
	Value  []byte
	Err    error
	Cookie []byte
}

BinaryValue is an internal type exported only for serialization.

type CacheOptions

type CacheOptions struct {
	DefaultExpiry    time.Duration
	NearCacheOptions *NearCacheOptions
}

CacheOptions holds various cache options.

type CacheStats

type CacheStats interface {
	GetCacheHits() int64                    // the number of entries served from the near cache
	GetCacheMisses() int64                  // the number of entries that had to be retrieved from the cluster
	GetCacheMissesDuration() time.Duration  // the total duration of all misses
	GetHitRate() float32                    // the hit rate of the near cache
	GetCachePuts() int64                    // the number of entries put in the near cache
	GetTotalGets() int64                    // the number of gets against the near cache
	GetCachePrunes() int64                  // the number of times the near cache was pruned
	GetCachePrunesDuration() time.Duration  // the duration of all prunes
	GetCacheEntriesPruned() int64           // the actual number of cache entries that were pruned
	GetCacheExpires() int64                 // the number of times the near cache expired entries
	GetCacheExpiresDuration() time.Duration // the duration of all expires
	GetCacheEntriesExpired() int64          // the actual number of cache entries that were expired
	Size() int                              // the number of entries in the near cache
	SizeBytes() int64                       // the number of bytes used by the entries (keys and values) in the near cache
	ResetStats()                            // reset the stats for the near cache, not including Size() or SizeBytes()
}

CacheStats contains various statistics for near caches.

type Entry

type Entry[K comparable, V any] struct {
	Key   K
	Value V
}

Entry represents a returned entry from entryPageIterator.

type EventSubmitter

type EventSubmitter interface {
	// contains filtered or unexported methods
}

type InvalidationStrategyType

type InvalidationStrategyType int

InvalidationStrategyType described the type if invalidation strategies for near cache.

const (
	ListenAll InvalidationStrategyType = 0
)

type JSONSerializer

type JSONSerializer[T any] struct {
	// contains filtered or unexported fields
}

JSONSerializer serializes data using JSON.

func (JSONSerializer[T]) Deserialize

func (s JSONSerializer[T]) Deserialize(data []byte) (*T, error)

Deserialize deserialized an object and returns the correct type of T.

func (JSONSerializer[T]) Format

func (s JSONSerializer[T]) Format() string

Format returns the format used for the serializer.

func (JSONSerializer[T]) Serialize

func (s JSONSerializer[T]) Serialize(object T) ([]byte, error)

Serialize serializes an object of type T and returns the []byte representation.

type MapEvent

type MapEvent[K comparable, V any] interface {
	// Source returns the source of this MapEvent.
	Source() NamedMap[K, V]

	// Key returns the key of the entry for which this event was raised.
	Key() (*K, error)

	// OldValue returns the old value, if any, of the entry for which this event
	// was raised.
	OldValue() (*V, error)

	// NewValue returns the new value, if any, of the entry for which this event
	// was raised.
	NewValue() (*V, error)

	// Type returns the MapEventType for this MapEvent.
	Type() MapEventType

	// IsExpired returns true if the event was generated from an expiry event. Only valid for gRPC v1 connections.
	IsExpired() (bool, error)

	// IsPriming returns true if the event is a priming event. Only valid for gRPC v1 connections.
	IsPriming() (bool, error)

	// IsSynthetic returns true if the event is a synthetic event. Only valid for gRPC v1 connections.
	IsSynthetic() (bool, error)
}

MapEvent an event which indicates that the content of the NamedMap or NamedCache has changed (i.e., an entry has been added, updated, and/or removed).

type MapEventType

type MapEventType string

MapEventType describes an event raised by a cache mutation.

type MapLifecycleEvent

type MapLifecycleEvent[K comparable, V any] interface {
	// Source returns the source of this MapLifecycleEvent.
	Source() NamedMap[K, V]

	// Type returns the MapLifecycleEventType for this MapLifecycleEvent.
	Type() MapLifecycleEventType
}

MapLifecycleEvent describes an event that may be raised during the lifecycle of a cache.

type MapLifecycleEventType

type MapLifecycleEventType string

MapLifecycleEventType describes an event type that may be raised during the lifecycle of a cache.

type MapLifecycleListener

type MapLifecycleListener[K comparable, V any] interface {
	OnAny(callback func(MapLifecycleEvent[K, V])) MapLifecycleListener[K, V]
	OnDestroyed(callback func(MapLifecycleEvent[K, V])) MapLifecycleListener[K, V]
	OnTruncated(callback func(MapLifecycleEvent[K, V])) MapLifecycleListener[K, V]
	OnReleased(callback func(MapLifecycleEvent[K, V])) MapLifecycleListener[K, V]
	// contains filtered or unexported methods
}

MapLifecycleListener allows registering callbacks to be notified when lifecycle events (truncated or released) occur against a NamedMap or NamedCache.

func NewMapLifecycleListener

func NewMapLifecycleListener[K comparable, V any]() MapLifecycleListener[K, V]

NewMapLifecycleListener creates and returns a pointer to a new MapLifecycleListener instance.

type MapListener

type MapListener[K comparable, V any] interface {
	// OnInserted registers a callback that will be notified when an entry is inserted.
	OnInserted(callback func(MapEvent[K, V])) MapListener[K, V]

	// OnUpdated registers a callback that will be notified when an entry is updated.
	OnUpdated(callback func(MapEvent[K, V])) MapListener[K, V]

	// OnDeleted registers a callback that will be notified when an entry is deleted.
	OnDeleted(callback func(MapEvent[K, V])) MapListener[K, V]

	// OnAny registers a callback that will be notified when any entry mutation has occurred.
	OnAny(callback func(MapEvent[K, V])) MapListener[K, V]

	// SetSynchronous sets this [MapListener] as synchronous.
	SetSynchronous()

	// SetPriming sets this [MapListener] as priming.
	SetPriming()

	// IsSynchronous indicates this [MapListener] is synchronous.
	IsSynchronous() bool

	// IsPriming indicates this [MapListener] is priming.
	IsPriming() bool
	// contains filtered or unexported methods
}

MapListener allows registering callbacks to be notified when mutations events occur within a NamedMap or NamedCache.

func GetFilterListenerGroupListeners

func GetFilterListenerGroupListeners[K comparable, V any](namedMap NamedMap[K, V], f filters.Filter) []MapListener[K, V]

func GetKeyListenerGroupListeners

func GetKeyListenerGroupListeners[K comparable, V any](namedMap NamedMap[K, V], key K) []MapListener[K, V]

revive:disable:unexported-return

func NewMapListener

func NewMapListener[K comparable, V any]() MapListener[K, V]

NewMapListener creates and returns a pointer to a new MapListener instance.

type NamedCache

type NamedCache[K comparable, V any] interface {
	NamedMap[K, V]

	// PutWithExpiry associates the specified value with the specified key. If the cache
	// previously contained a value for this key, the old value is replaced.
	// This variation of the Put(ctx context.Context, key K, value V)
	// function allows the caller to specify an expiry (or "time to live")
	// for the cache entry.  If coherence.ExpiryNever < ttl < 1 millisecond,
	// ttl is set to 1 millisecond.
	// V will be nil if there was no previous value.
	PutWithExpiry(ctx context.Context, key K, value V, ttl time.Duration) (*V, error)
}

NamedCache is syntactically identical in behaviour to a NamedMap, but additionally implements the PutWithExpiry operation. The type parameters are K = type of the key and V = type of the value.

func GetNamedCache

func GetNamedCache[K comparable, V any](session *Session, cacheName string, options ...func(session *CacheOptions)) (NamedCache[K, V], error)

GetNamedCache returns a NamedCache from a session. NamedCache is syntactically identical in behaviour to a NamedMap, but additionally implements the PutWithExpiry function. If there is an existing NamedCache defined with the same type parameters and name it will be returned, otherwise a new instance will be returned. An error will be returned if there already exists a NamedCache with the same name and different type parameters.

// connect to the default address localhost:1408
session, err = coherence.NewSession(ctx)
if err != nil {
    log.Fatal(err)
}

namedMap, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

If you wish to create a NamedCache that has the same expiry for each entry you can use the coherence.WithExpiry cache option. Each call to Put will use the default expiry you have specified. If you use PutWithExpiry, this will override the default expiry for that key.

namedCache, err := coherence.GetNamedCache[int, Person](session, "cache-expiry", coherence.WithExpiry(time.Duration(5)*time.Second))

type NamedCacheClient

type NamedCacheClient[K comparable, V any] struct {
	NamedCache[K, V]
	// contains filtered or unexported fields
}

NamedCacheClient is the implementation of the NamedCache interface. The type parameters are K = type of the key and V= type of the value.

func (*NamedCacheClient[K, V]) AddFilterListener

func (nc *NamedCacheClient[K, V]) AddFilterListener(ctx context.Context, listener MapListener[K, V], filter filters.Filter) error

AddFilterListener Adds a MapListener that will receive events (inserts, updates, deletes) that occur against the map where entries satisfy the specified filters.Filter, with the key, and optionally, the old-value and new-value included.

func (*NamedCacheClient[K, V]) AddFilterListenerLite

func (nc *NamedCacheClient[K, V]) AddFilterListenerLite(ctx context.Context, listener MapListener[K, V], filter filters.Filter) error

AddFilterListenerLite Adds a MapListener that will receive events (inserts, updates, deletes) that occur against the map where entries satisfy the specified filters.Filter, with the key, the old-value and new-value included.

func (*NamedCacheClient[K, V]) AddKeyListener

func (nc *NamedCacheClient[K, V]) AddKeyListener(ctx context.Context, listener MapListener[K, V], key K) error

AddKeyListener Adds a MapListener that will receive events (inserts, updates, deletes) that occur against the specified key within the map, with the key, old-value and new-value included.

func (*NamedCacheClient[K, V]) AddKeyListenerLite

func (nc *NamedCacheClient[K, V]) AddKeyListenerLite(ctx context.Context, listener MapListener[K, V], key K) error

AddKeyListenerLite Adds a ]MapListener] that will receive events (inserts, updates, deletes) that occur against the specified key within the map, with the key, and optionally, the old-value and new-value included.

func (*NamedCacheClient[K, V]) AddLifecycleListener

func (nc *NamedCacheClient[K, V]) AddLifecycleListener(listener MapLifecycleListener[K, V])

AddLifecycleListener Adds a MapLifecycleListener that will receive events (truncated or released) that occur against the NamedCache.

func (*NamedCacheClient[K, V]) AddListener

func (nc *NamedCacheClient[K, V]) AddListener(ctx context.Context, listener MapListener[K, V]) error

AddListener Adds a MapListener that will receive events (inserts, updates, deletes) that occur against the map, with the key, old-value and new-value included. This call is equivalent to calling AddFilterListener with filters.Always as the filter.

func (*NamedCacheClient[K, V]) AddListenerLite

func (nc *NamedCacheClient[K, V]) AddListenerLite(ctx context.Context, listener MapListener[K, V]) error

AddListenerLite Adds a MapListener that will receive events (inserts, updates, deletes) that occur against the map, with the key, and optionally, the old-value and new-value included. This call is equivalent to calling [AddFilterListenerLite] with filters.Always as the filter.

func (*NamedCacheClient[K, V]) Clear

func (nc *NamedCacheClient[K, V]) Clear(ctx context.Context) error

Clear removes all mappings from this NamedCache. This operation is observable and will trigger any registered events.

func (*NamedCacheClient[K, V]) ContainsEntry

func (nc *NamedCacheClient[K, V]) ContainsEntry(ctx context.Context, key K, value V) (bool, error)

ContainsEntry returns true if this NamedCache contains a mapping for the specified key and value.

The example below shows how to check if a NamedCache contains a mapping for the key 1 and person.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}
person := Person{ID: 1, Name: "Tim"}

if found, err = namedCache.ContainsEntry(ctx, person.ID, person); err != nil {
   log.Fatal(err)
}

func (*NamedCacheClient[K, V]) ContainsKey

func (nc *NamedCacheClient[K, V]) ContainsKey(ctx context.Context, key K) (bool, error)

ContainsKey returns true if this NamedCache contains a mapping for the specified key.

The example below shows how to check if a NamedCache contains a mapping for the key 1.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

if found, err = namedCache.ContainsKey(ctx, 1); err != nil {
   log.Fatal(err)
}

func (*NamedCacheClient[K, V]) ContainsValue

func (nc *NamedCacheClient[K, V]) ContainsValue(ctx context.Context, value V) (bool, error)

ContainsValue returns true if this NamedCache contains a mapping for the specified value.

The example below shows how to check if a NamedCache contains a mapping for the person.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}
person := Person{ID: 1, Name: "Tim"}

if found, err = namedCache.ContainsValue(ctx, person); err != nil {
   log.Fatal(err)
}

func (*NamedCacheClient[K, V]) Destroy

func (nc *NamedCacheClient[K, V]) Destroy(ctx context.Context) error

Destroy releases and destroys this instance of NamedCache. Warning This method is used to completely destroy the specified NamedCache across the cluster. All references in the entire cluster to this cache will be invalidated, the data will be cleared, and all internal resources will be released. Note: the removal of entries caused by this operation will not be observable.

func (*NamedCacheClient[K, V]) EntrySet

func (nc *NamedCacheClient[K, V]) EntrySet(ctx context.Context) <-chan *StreamedEntry[K, V]

EntrySet returns a channel from which all entries can be obtained.

Note: the entries are paged internally to avoid excessive memory usage, but you need to be careful when running this operation against NamedCaches with large number of entries.

The example below shows how to iterate the entries in a NamedCache.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedCache.EntrySet(ctx)
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key, "Value:", result.Value)
    }
}

func (*NamedCacheClient[K, V]) EntrySetFilter

func (nc *NamedCacheClient[K, V]) EntrySetFilter(ctx context.Context, fltr filters.Filter) <-chan *StreamedEntry[K, V]

EntrySetFilter returns a channel from which entries satisfying the specified filter can be obtained. Each entry in the channel is of type *StreamEntry which wraps an error and the result. As always, the result must be accessed (and will be valid) only if the error is nil.

The example below shows how to iterate the entries in a NamedCache where the age > 20.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedCache.EntrySetFilter(ctx, filters.GreaterEqual(extractors.Extract[int]("age"), 20))
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key, "Value:", result.Value)
    }
}

func (*NamedCacheClient[K, V]) Get

func (nc *NamedCacheClient[K, V]) Get(ctx context.Context, key K) (*V, error)

Get returns the value to which the specified key is mapped. V will be nil if this NamedCache contains no mapping for the key.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

person, err = namedCache.Get(1)
if err != nil {
    log.Fatal(err)
}
if person != nil {
    fmt.Println("Person is", *value)
} else {
    fmt.Println("No person found")
}

func (*NamedCacheClient[K, V]) GetAll

func (nc *NamedCacheClient[K, V]) GetAll(ctx context.Context, keys []K) <-chan *StreamedEntry[K, V]

GetAll returns a channel from which entries satisfying the specified filter can be obtained. Each entry in the channel is of type *StreamedEntry which wraps an error and the result. As always, the result must be accessed (and will be valid) only if the error is nil.

The example below shows how to get all the entries for keys 1, 3 and 4.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedCache.GetAll(ctx, []int{1, 3, 4})
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key, "Value:", result.Value)
    }
}

func (*NamedCacheClient[K, V]) GetCacheName

func (nc *NamedCacheClient[K, V]) GetCacheName() string

GetCacheName returns the cache name of the NamedCache.

func (*NamedCacheClient[K, V]) GetNearCacheStats

func (nc *NamedCacheClient[K, V]) GetNearCacheStats() CacheStats

GetNearCacheStats returns the CacheStats for a near cache for a NamedMap. If no near cache is defined, nil is returned.

func (*NamedCacheClient[K, V]) GetOrDefault

func (nc *NamedCacheClient[K, V]) GetOrDefault(ctx context.Context, key K, def V) (*V, error)

GetOrDefault will return the value mapped to the specified key, or if there is no mapping, it will return the specified default.

func (*NamedCacheClient[K, V]) GetSession

func (nc *NamedCacheClient[K, V]) GetSession() *Session

GetSession returns the session.

func (*NamedCacheClient[K, V]) IsEmpty

func (nc *NamedCacheClient[K, V]) IsEmpty(ctx context.Context) (bool, error)

IsEmpty returns true if this NamedCache contains no mappings.

func (*NamedCacheClient[K, V]) IsReady

func (nc *NamedCacheClient[K, V]) IsReady(ctx context.Context) (bool, error)

IsReady returns whether this NamedCache is ready to be used. An example of when this method would return false would be where a partitioned cache service that owns this cache has no storage-enabled members. If it is not supported by the gRPC proxy, an error will be returned.

func (*NamedCacheClient[K, V]) KeySet

func (nc *NamedCacheClient[K, V]) KeySet(ctx context.Context) <-chan *StreamedKey[K]

KeySet returns a channel from which keys of all entries can be obtained.

Note: the entries are paged internally to avoid excessive memory usage, but you need to be careful when running this operation against [NamedCache]s with large number of entries.

The example below shows how to iterate the keys in a NamedCache.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedCache.KeySet(ctx)
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key)
    }
}

func (*NamedCacheClient[K, V]) KeySetFilter

func (nc *NamedCacheClient[K, V]) KeySetFilter(ctx context.Context, fltr filters.Filter) <-chan *StreamedKey[K]

KeySetFilter returns a channel from which keys of the entries that satisfy the filter can be obtained. Each entry in the channel is of type *StreamEntry which wraps an error and the key. As always, the result must be accessed (and will be valid) only if the error is nil.

The example below shows how to iterate the keys in a NamedCache where the age > 20.

namedCache, err := coherence.GetNamedCache[int, Person](session,"people")
if err != nil {
    log.Fatal(err)
}

ch := namedCache.KeySetFilter(ctx, filters.GreaterEqual(extractors.Extract[int]("age"), 20))
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key)
    }
}

func (*NamedCacheClient[K, V]) Name

func (nc *NamedCacheClient[K, V]) Name() string

Name returns the name of the NamedCache].

func (*NamedCacheClient[K, V]) Put

func (nc *NamedCacheClient[K, V]) Put(ctx context.Context, key K, value V) (*V, error)

Put associates the specified value with the specified key returning the previously mapped value, if any. V will be nil if there was no previous value.

func (*NamedCacheClient[K, V]) PutAll

func (nc *NamedCacheClient[K, V]) PutAll(ctx context.Context, entries map[K]V) error

PutAll copies all the mappings from the specified map to this NamedCache. This is the most efficient way to add multiple entries into a NamedCache as it is carried out in parallel and no previous values are returned.

var peopleData = map[int]Person{
    1: {ID: 1, Name: "Tim", Age: 21},
    2: {ID: 2, Name: "Andrew", Age: 44},
    3: {ID: 3, Name: "Helen", Age: 20},
    4: {ID: 4, Name: "Alexa", Age: 12},
}

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

if err = namedCache.PutAll(ctx, peopleData); err != nil {
    log.Fatal(err)
}

func (*NamedCacheClient[K, V]) PutAllWithExpiry

func (nc *NamedCacheClient[K, V]) PutAllWithExpiry(ctx context.Context, entries map[K]V, ttl time.Duration) error

PutAllWithExpiry copies all the mappings from the specified map to this NamedCache and sets the ttl for each entry. This is the most efficient way to add multiple entries into a NamedCache as it is carried out in parallel and no previous values are returned.

var peopleData = map[int]Person{
    1: {ID: 1, Name: "Tim", Age: 21},
    2: {ID: 2, Name: "Andrew", Age: 44},
    3: {ID: 3, Name: "Helen", Age: 20},
    4: {ID: 4, Name: "Alexa", Age: 12},
}

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

if err = namedCache.PutAll(ctx, peopleData, time.Duration(5) * time.Second); err != nil {
    log.Fatal(err)
}

Note: If PutAllWithExpiry is not supported by the Coherence server, an error will be thrown

func (*NamedCacheClient[K, V]) PutIfAbsent

func (nc *NamedCacheClient[K, V]) PutIfAbsent(ctx context.Context, key K, value V) (*V, error)

PutIfAbsent adds the specified mapping if the key is not already associated with a value in the NamedCache and returns nil, else returns the current value.

func (*NamedCacheClient[K, V]) PutWithExpiry

func (nc *NamedCacheClient[K, V]) PutWithExpiry(ctx context.Context, key K, value V, ttl time.Duration) (*V, error)

PutWithExpiry associates the specified value with the specified key. If the NamedCache previously contained a value for this key, the old value is replaced. This variation of the Put(ctx context.Context, key K, value V) function allows the caller to specify an expiry (or "time to live") for the cache entry. If coherence.ExpiryNever < ttl < 1 millisecond, ttl is set to 1 millisecond. V will be nil if there was no previous value.

func (*NamedCacheClient[K, V]) Release

func (nc *NamedCacheClient[K, V]) Release()

Release releases the instance of NamedCache. This operation does not affect the contents of the NamedCache, but only releases the client resources. To access the NamedCache, you must get a new instance.

func (*NamedCacheClient[K, V]) Remove

func (nc *NamedCacheClient[K, V]) Remove(ctx context.Context, key K) (*V, error)

Remove removes the mapping for a key from this NamedCache if it is present and returns the previous value or nil if there wasn't one.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

oldValue, err = namedCache.Remove(ctx, 1)
if err != nil {
    log.Fatal(err)
}

if oldValue == nil {
    fmt.Println("No previous person was found")
} else {
    fmt.Println("Previous person was", *oldValue)
}

func (*NamedCacheClient[K, V]) RemoveFilterListener

func (nc *NamedCacheClient[K, V]) RemoveFilterListener(ctx context.Context, listener MapListener[K, V], f filters.Filter) error

RemoveFilterListener removes the listener that was previously registered to receive events where entries satisfy the specified filters.Filter.

func (*NamedCacheClient[K, V]) RemoveKeyListener

func (nc *NamedCacheClient[K, V]) RemoveKeyListener(ctx context.Context, listener MapListener[K, V], key K) error

RemoveKeyListener removes the listener that was previously registered to receive events against the specified key.

func (*NamedCacheClient[K, V]) RemoveLifecycleListener

func (nc *NamedCacheClient[K, V]) RemoveLifecycleListener(listener MapLifecycleListener[K, V])

RemoveLifecycleListener removes the lifecycle listener that was previously registered to receive events.

func (*NamedCacheClient[K, V]) RemoveListener

func (nc *NamedCacheClient[K, V]) RemoveListener(ctx context.Context, listener MapListener[K, V]) error

RemoveListener removes the listener that was previously registered to receive events.

func (*NamedCacheClient[K, V]) RemoveMapping

func (nc *NamedCacheClient[K, V]) RemoveMapping(ctx context.Context, key K, value V) (bool, error)

RemoveMapping removes the entry for the specified key only if it is currently mapped to the specified value. Returns true if the entry was removed.

func (*NamedCacheClient[K, V]) Replace

func (nc *NamedCacheClient[K, V]) Replace(ctx context.Context, key K, value V) (*V, error)

Replace replaces the entry for the specified key only if it is currently mapped to some value.

func (*NamedCacheClient[K, V]) ReplaceMapping

func (nc *NamedCacheClient[K, V]) ReplaceMapping(ctx context.Context, key K, prevValue V, newValue V) (bool, error)

ReplaceMapping replaces the entry for the specified key only if it is currently mapped to some value. Returns true if the value was replaced.

func (*NamedCacheClient[K, V]) Size

func (nc *NamedCacheClient[K, V]) Size(ctx context.Context) (int, error)

Size returns the number of mappings contained within this NamedCache.

func (*NamedCacheClient[K, V]) String

func (nc *NamedCacheClient[K, V]) String() string

String returns a string representation of a NamedCacheClient.

func (*NamedCacheClient[K, V]) Truncate

func (nc *NamedCacheClient[K, V]) Truncate(ctx context.Context) error

Truncate removes all mappings from this NamedCache. Note: the removal of entries caused by this truncate operation will not be observable.

func (*NamedCacheClient[K, V]) Values

func (nc *NamedCacheClient[K, V]) Values(ctx context.Context) <-chan *StreamedValue[V]

Values returns a view of all values contained in this NamedCache.

Note: the entries are paged internally to avoid excessive memory usage, but you need to be careful when running this operation against NamedCaches with large number of entries.

The example below shows how to iterate the values in a NamedCache.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedCache.Values(ctx)
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Value:", result.Value)
    }
}

func (*NamedCacheClient[K, V]) ValuesFilter

func (nc *NamedCacheClient[K, V]) ValuesFilter(ctx context.Context, fltr filters.Filter) <-chan *StreamedValue[V]

ValuesFilter returns a view of filtered values contained in this NamedCache. The returned channel will be asynchronously filled with values in the NamedCache that satisfy the filter.

The example below shows how to iterate the values in a NamedCache where the age > 20.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedCache.ValuesFilter(ctx, filters.GreaterEqual(extractors.Extract[int]("age"), 20))
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Value:", result.Value)
    }
}

type NamedDequeue

type NamedDequeue[V any] interface {
	NamedQueue[V]

	// OfferHead inserts the specific value at the head of this queue if it is possible to do
	// so immediately without violating capacity restrictions. If queue is full then
	// [ErrQueueFailedCapacity] is returned, if error is nil the element was added to the queue.
	OfferHead(ctx context.Context, value V) error

	// PollTail retrieves and removes the tail of this queue. If error is nil and the returned
	// value and error is nil this means that there was no entry on the head of the queue.
	PollTail(ctx context.Context) (*V, error)

	// PeekTail retrieves, but does not remove, the tail of this queue. If error is nil and nil value
	// return then there is no entry on the tail of the queue.
	PeekTail(ctx context.Context) (*V, error)
}

func GetNamedDeQueue

func GetNamedDeQueue[V any](ctx context.Context, session *Session, queueName string) (NamedDequeue[V], error)

type NamedMap

type NamedMap[K comparable, V any] interface {
	// AddLifecycleListener Adds a MapLifecycleListener that will receive events (truncated, released) that occur
	// against the NamedMap.
	AddLifecycleListener(listener MapLifecycleListener[K, V])

	// AddFilterListener adds a MapListener that will receive events (inserts, updates, deletes) that occur
	// against the NamedMap where entries satisfy the specified filters.Filter, with the key, and optionally,
	// the old-value and new-value included.
	AddFilterListener(ctx context.Context, listener MapListener[K, V], filter filters.Filter) error

	// AddFilterListenerLite adds a MapListener that will receive events (inserts, updates, deletes) that occur
	// against the NamedMap where entries satisfy the specified filters.Filter, with the key,
	// the old-value and new-value included.
	AddFilterListenerLite(ctx context.Context, listener MapListener[K, V], filter filters.Filter) error

	// AddKeyListener adds a MapListener that will receive events (inserts, updates, deletes) that occur
	// against the specified key within the NamedMap, with the key, old-value and new-value included.
	AddKeyListener(ctx context.Context, listener MapListener[K, V], key K) error

	// AddKeyListenerLite adds a MapListener that will receive events (inserts, updates, deletes) that occur
	// against the specified key within the NamedMap, with the key, and optionally, the old-value and new-value included.
	AddKeyListenerLite(ctx context.Context, listener MapListener[K, V], key K) error

	// AddListener adds a MapListener that will receive events (inserts, updates, deletes) that occur
	// against the NamedMap, with the key, old-value and new-value included.
	// This call is equivalent to calling AddFilterListener with filters.Always as the filter.
	AddListener(ctx context.Context, listener MapListener[K, V]) error

	// AddListenerLite adds a MapListener that will receive events (inserts, updates, deletes) that occur
	// against the NamedMap, with the key, and optionally, the old-value and new-value included.
	// This call is equivalent to calling AddFilterListenerLite with filters.Always as the filter.
	AddListenerLite(ctx context.Context, listener MapListener[K, V]) error

	// Clear removes all mappings from the NamedMap.
	Clear(ctx context.Context) error

	// Truncate removes all mappings from the NamedMap.
	// Note: the removal of entries caused by this truncate operation will not be observable.
	Truncate(ctx context.Context) error

	// Destroy releases and destroys this instance of NamedMap.
	// Warning This method is used to completely destroy the specified
	// NamedMap across the cluster. All references in the entire cluster to this
	// cache will be invalidated, the data will be cleared, and all
	// internal resources will be released.
	// Note: the removal of entries caused by this truncate operation will not be observable.
	Destroy(ctx context.Context) error

	// Release releases the instance of NamedMap.
	// This operation does not affect the contents of the NamedMap, but only releases the client
	// resources. To access the NamedMap, you must get a new instance.
	Release()

	// ContainsKey returns true if the NamedMap contains a mapping for the specified key.
	ContainsKey(ctx context.Context, key K) (bool, error)

	// ContainsValue returns true if the NamedMap maps one or more keys to the specified value
	ContainsValue(ctx context.Context, value V) (bool, error)

	// ContainsEntry returns true if the NamedMap contains a mapping for the specified key and value.
	ContainsEntry(ctx context.Context, key K, value V) (bool, error)

	// IsEmpty returns true if the NamedMap contains no mappings.
	IsEmpty(ctx context.Context) (bool, error)

	// EntrySetFilter returns a channel from which entries satisfying the specified filter can be obtained.
	// Each entry in the channel is of type *StreamEntry which basically wraps an error and the result.
	// As always, the result must be accessed (and will be valid) only if the error is nil.
	EntrySetFilter(ctx context.Context, filter filters.Filter) <-chan *StreamedEntry[K, V]

	// EntrySet returns a channel from which all entries can be obtained.
	// Note: the entries are paged internally to avoid excessive memory usage, but you need to be
	// careful when running this operation against NamedMaps with large number of entries.
	EntrySet(ctx context.Context) <-chan *StreamedEntry[K, V]

	// Get returns the value to which the specified key is mapped. V will be nil if there was no previous value.
	Get(ctx context.Context, key K) (*V, error)

	// GetAll returns a channel from which entries satisfying the specified filter can be obtained.
	// Each entry in the channel is of type *StreamEntry which basically wraps an error and the result.
	// As always, the result must be accessed (and will be valid) only of the error is nil.
	GetAll(ctx context.Context, keys []K) <-chan *StreamedEntry[K, V]

	// GetOrDefault will return the value mapped to the specified key,
	// or if there is no mapping, it will return the specified default.
	GetOrDefault(ctx context.Context, key K, def V) (*V, error)

	// InvokeAll invokes the specified processor against the entries matching the specified keys or filter.  If no
	// keys or filter are specified, then the function will be run against all entries.
	// Functions are invoked atomically against a specific entry as the function may mutate the entry.
	InvokeAll(ctx context.Context, keysOrFilter any, proc processors.Processor) <-chan *StreamedValue[V]

	// KeySetFilter returns a channel from which keys of the entries that satisfy the filter can be obtained.
	// Each entry in the channel is of type *StreamEntry which basically wraps an error and the key.
	// As always, the result must be accessed (and will be valid) only of the error is nil.
	KeySetFilter(ctx context.Context, filter filters.Filter) <-chan *StreamedKey[K]

	// KeySet returns a channel from which keys of all entries can be obtained.
	// Note: the entries are paged internally to avoid excessive memory usage, but you need to be
	// careful when running this operation against NamedMaps with large number of entries.
	KeySet(ctx context.Context) <-chan *StreamedKey[K]

	// Name returns the name of the NamedMap.
	Name() string

	// Put associates the specified value with the specified key returning the previously
	// mapped value. V will be nil if there was no previous value.
	Put(ctx context.Context, key K, value V) (*V, error)

	// PutAll copies all the mappings from the specified map to the NamedMap.
	PutAll(ctx context.Context, entries map[K]V) error

	// PutAllWithExpiry copies all the mappings from the specified map to the NamedMap and set the ttl.
	PutAllWithExpiry(ctx context.Context, entries map[K]V, ttl time.Duration) error

	// PutIfAbsent adds the specified mapping if the key is not already associated with a value in the NamedMap.
	// Error will be equal to coherence. V will be nil if there was no previous value.
	PutIfAbsent(ctx context.Context, key K, value V) (*V, error)

	// Remove removes the mapping for a key from the NamedMap if it is present and returns the previously
	// mapped value, if any. V will be nil if there was no previous value.
	Remove(ctx context.Context, key K) (*V, error)

	// RemoveLifecycleListener removes the lifecycle listener that was previously registered to receive events.
	RemoveLifecycleListener(listener MapLifecycleListener[K, V])

	// RemoveFilterListener removes the listener that was previously registered to receive events.
	RemoveFilterListener(ctx context.Context, listener MapListener[K, V], filter filters.Filter) error

	// RemoveKeyListener removes the listener that was previously registered to receive events.
	RemoveKeyListener(ctx context.Context, listener MapListener[K, V], key K) error

	// RemoveListener removes the listener that was previously registered to receive events.
	RemoveListener(ctx context.Context, listener MapListener[K, V]) error

	// RemoveMapping removes the entry for the specified key only if it is currently
	// mapped to the specified value. Returns true if the value was removed.
	RemoveMapping(ctx context.Context, key K, value V) (bool, error)

	// Replace replaces the entry for the specified key only if it is
	// currently mapped to some value.
	Replace(ctx context.Context, key K, value V) (*V, error)

	// ReplaceMapping replaces the entry for the specified key only if it is
	// currently mapped to the value. Returns true if the value was replaced.
	ReplaceMapping(ctx context.Context, key K, prevValue V, newValue V) (bool, error)

	// Size returns the number of mappings contained within the NamedMap.
	Size(ctx context.Context) (int, error)

	// GetSession returns the Session associated with the NamedMap.
	GetSession() *Session

	// ValuesFilter returns a view of filtered values contained in the NamedMap.
	// The returned channel will be asynchronously filled with values in the
	// NamedMap that satisfy the filter.
	ValuesFilter(ctx context.Context, filter filters.Filter) <-chan *StreamedValue[V]

	// Values return a view of all values contained in the NamedMap.
	// Note: the entries are paged internally to avoid excessive memory usage, but you need to be
	// careful when running this operation against NamedMaps with large number of entries.
	Values(ctx context.Context) <-chan *StreamedValue[V]

	// IsReady returns whether this NamedMap is ready to be used.
	// An example of when this method would return false would
	// be where a partitioned cache service that owns this cache has no
	// storage-enabled members.
	// If it is not supported by the gRPC proxy, an error will be returned.
	IsReady(ctx context.Context) (bool, error)

	// GetNearCacheStats returns the [CacheStats] for a near cache for a [NamedMap] or [NamedCache].
	// If no near cache is defined, nil is returned.
	GetNearCacheStats() CacheStats

	// GetCacheName returns the cache name of the [NamedMap] or [NamedCache].
	GetCacheName() string
	// contains filtered or unexported methods
}

NamedMap defines the APIs to cache data, mapping keys to values, supporting full concurrency of retrievals and high expected concurrency for updates. Like traditional maps, this object cannot contain duplicate keys; each key can map to at most one value.

As keys and values must be serializable in some manner. The current supported serialization method is JSON.

Instances of this interface are typically acquired via a coherence.Session.

Although all operations are thread-safe, retrieval operations do not entail locking, and there is no support for locking an entire map in a way to prevent all access. Retrievals reflect the results of the most recently completed update operations holding upon their onset.

The type parameters are K = type of the key and V = type of the value.

func GetNamedMap

func GetNamedMap[K comparable, V any](session *Session, cacheName string, options ...func(session *CacheOptions)) (NamedMap[K, V], error)

GetNamedMap returns a NamedMap from a session. If there is an existing NamedMap defined with the same type parameters and name it will be returned, otherwise a new instance will be returned. An error will be returned if there already exists a NamedMap with the same name and different type parameters.

// connect to the default address localhost:1408
session, err = coherence.NewSession(ctx)
if err != nil {
    log.Fatal(err)
}

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

type NamedMapClient

type NamedMapClient[K comparable, V any] struct {
	NamedMap[K, V]
	// contains filtered or unexported fields
}

NamedMapClient is the implementation of the NamedMap interface. The type parameters are K = type of the key and V = type of the value.

func (*NamedMapClient[K, V]) AddFilterListener

func (nm *NamedMapClient[K, V]) AddFilterListener(ctx context.Context, listener MapListener[K, V], filter filters.Filter) error

AddFilterListener adds a MapListener that will receive events (inserts, updates, deletes) that occur against the NamedMap where entries satisfy the specified filters.Filter, with the key, and optionally, the old-value and new-value included.

func (*NamedMapClient[K, V]) AddFilterListenerLite

func (nm *NamedMapClient[K, V]) AddFilterListenerLite(ctx context.Context, listener MapListener[K, V], filter filters.Filter) error

AddFilterListenerLite adds a MapListener that will receive events (inserts, updates, deletes) that occur against the NamedMap where entries satisfy the specified filters.Filter, with the key, the old-value and new-value included.

func (*NamedMapClient[K, V]) AddKeyListener

func (nm *NamedMapClient[K, V]) AddKeyListener(ctx context.Context, listener MapListener[K, V], key K) error

AddKeyListener adds a MapListener that will receive events (inserts, updates, deletes) that occur against the specified key within the NamedMap, with the key, old-value and new-value included.

func (*NamedMapClient[K, V]) AddKeyListenerLite

func (nm *NamedMapClient[K, V]) AddKeyListenerLite(ctx context.Context, listener MapListener[K, V], key K) error

AddKeyListenerLite adds a MapListener that will receive events (inserts, updates, deletes) that occur against the specified key within the NamedMap, with the key, and optionally, the old-value and new-value included.

func (*NamedMapClient[K, V]) AddLifecycleListener

func (nm *NamedMapClient[K, V]) AddLifecycleListener(listener MapLifecycleListener[K, V])

AddLifecycleListener adds a MapLifecycleListener that will receive events (truncated or released) that occur against the NamedMap.

func (*NamedMapClient[K, V]) AddListener

func (nm *NamedMapClient[K, V]) AddListener(ctx context.Context, listener MapListener[K, V]) error

AddListener adds a MapListener that will receive events (inserts, updates, deletes) that occur against the map, with the key, old-value and new-value included. This call is equivalent to calling [AddFilterListener] with filters.Always as the filter.

func (*NamedMapClient[K, V]) AddListenerLite

func (nm *NamedMapClient[K, V]) AddListenerLite(ctx context.Context, listener MapListener[K, V]) error

AddListenerLite adds a MapListener that will receive events (inserts, updates, deletes) that occur against the map, with the key, and optionally, the old-value and new-value included. This call is equivalent to calling [AddFilterListenerLite] with filters.Always as the filter.

func (*NamedMapClient[K, V]) Clear

func (nm *NamedMapClient[K, V]) Clear(ctx context.Context) error

Clear removes all mappings from the NamedMap. This operation is observable and will trigger any registered events.

func (*NamedMapClient[K, V]) ContainsEntry

func (nm *NamedMapClient[K, V]) ContainsEntry(ctx context.Context, key K, value V) (bool, error)

ContainsEntry returns true if the NamedMap contains a mapping for the specified key and value.

The example below shows how to check if a NamedMap contains a mapping for the key 1 and person.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}
person := Person{ID: 1, Name: "Tim"}

if found, err = namedMap.ContainsEntry(ctx, person.ID, person); err != nil {
   log.Fatal(err)
}

func (*NamedMapClient[K, V]) ContainsKey

func (nm *NamedMapClient[K, V]) ContainsKey(ctx context.Context, key K) (bool, error)

ContainsKey returns true if the NamedMap contains a mapping for the specified key.

The example below shows how to check if a NamedMap contains a mapping for the key 1.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

if found, err = namedMap.ContainsKey(ctx, 1); err != nil {
   log.Fatal(err)
}

func (*NamedMapClient[K, V]) ContainsValue

func (nm *NamedMapClient[K, V]) ContainsValue(ctx context.Context, value V) (bool, error)

ContainsValue returns true if the NamedMap contains a mapping for the specified value.

The example below shows how to check if a NamedMap contains a mapping for the person.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}
person := Person{ID: 1, Name: "Tim"}

if found, err = namedMap.ContainsValue(ctx, person); err != nil {
   log.Fatal(err)
}

func (*NamedMapClient[K, V]) Destroy

func (nm *NamedMapClient[K, V]) Destroy(ctx context.Context) error

Destroy releases and destroys this instance of NamedMap. Warning This method is used to completely destroy the specified NamedMap across the cluster. All references in the entire cluster to this cache will be invalidated, the data will be cleared, and all internal resources will be released. Note: the removal of entries caused by this operation will not be observable.

func (*NamedMapClient[K, V]) EntrySet

func (nm *NamedMapClient[K, V]) EntrySet(ctx context.Context) <-chan *StreamedEntry[K, V]

EntrySet returns a channel from which all entries can be obtained.

Note: the entries are paged internally to avoid excessive memory usage, but you need to be careful when running this operation against [NamedMap]s with large number of entries.

The example below shows how to iterate the entries in a NamedMap.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedMap.EntrySet(ctx)
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key, "Value:", result.Value)
    }
}

func (*NamedMapClient[K, V]) EntrySetFilter

func (nm *NamedMapClient[K, V]) EntrySetFilter(ctx context.Context, fltr filters.Filter) <-chan *StreamedEntry[K, V]

EntrySetFilter returns a channel from which entries satisfying the specified filter can be obtained. Each entry in the channel is of type *StreamedEntry which wraps an error and the result. As always, the result must be accessed (and will be valid) only if the error is nil.

The example below shows how to iterate the entries in a NamedMap where the age > 20.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedMap.EntrySetFilter(ctx, filters.GreaterEqual(extractors.Extract[int]("age"), 20))
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key, "Value:", result.Value)
    }
}

func (*NamedMapClient[K, V]) Get

func (nm *NamedMapClient[K, V]) Get(ctx context.Context, key K) (*V, error)

Get returns the value to which the specified key is mapped. V will be nil if the NamedMap contains no mapping for the key.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

person, err = namedMap.Get(1)
if err != nil {
    log.Fatal(err)
}
if person != nil {
    fmt.Println("Person is", *value)
} else {
    fmt.Println("No person found")
}

func (*NamedMapClient[K, V]) GetAll

func (nm *NamedMapClient[K, V]) GetAll(ctx context.Context, keys []K) <-chan *StreamedEntry[K, V]

GetAll returns a channel from which entries satisfying the specified filter can be obtained. Each entry in the channel is of type *StreamedEntry which wraps an error and the result. As always, the result must be accessed (and will be valid) only if the error is nil.

The example below shows how to get all the entries for keys 1, 3 and 4.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedMap.GetAll(ctx, []int{1, 3, 4})
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key, "Value:", result.Value)
    }
}

func (*NamedMapClient[K, V]) GetCacheName

func (nm *NamedMapClient[K, V]) GetCacheName() string

GetCacheName returns the cache name of the NamedMap.

func (*NamedMapClient[K, V]) GetNearCacheStats

func (nm *NamedMapClient[K, V]) GetNearCacheStats() CacheStats

GetNearCacheStats returns the CacheStats for a near cache for a NamedMap. If no near cache is defined, nil is returned.

func (*NamedMapClient[K, V]) GetOrDefault

func (nm *NamedMapClient[K, V]) GetOrDefault(ctx context.Context, key K, def V) (*V, error)

GetOrDefault will return the value mapped to the specified key, or if there is no mapping, it will return the specified default.

func (*NamedMapClient[K, V]) GetSession

func (nm *NamedMapClient[K, V]) GetSession() *Session

GetSession returns the session.

func (*NamedMapClient[K, V]) IsEmpty

func (nm *NamedMapClient[K, V]) IsEmpty(ctx context.Context) (bool, error)

IsEmpty returns true if the NamedMap contains no mappings.

func (*NamedMapClient[K, V]) IsReady

func (nm *NamedMapClient[K, V]) IsReady(ctx context.Context) (bool, error)

IsReady returns whether this NamedMap is ready to be used. An example of when this method would return false would be where a partitioned cache service that owns this cache has no storage-enabled members. If it is not supported by the gRPC proxy, an error will be returned.

func (*NamedMapClient[K, V]) KeySet

func (nm *NamedMapClient[K, V]) KeySet(ctx context.Context) <-chan *StreamedKey[K]

KeySet returns a channel from which keys of all entries can be obtained.

Note: the entries are paged internally to avoid excessive memory usage, but you need to be careful when running this operation against [NamedMap]s with large number of entries.

The example below shows how to iterate the keys in a NamedMap.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedMap.KeySet(ctx)
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key)
    }
}

func (*NamedMapClient[K, V]) KeySetFilter

func (nm *NamedMapClient[K, V]) KeySetFilter(ctx context.Context, fltr filters.Filter) <-chan *StreamedKey[K]

KeySetFilter returns a channel from which keys of the entries that satisfy the filter can be obtained. Each entry in the channel is of type *StreamEntry which wraps an error and the key. As always, the result must be accessed (and will be valid) only if the error is nil.

The example below shows how to iterate the keys in a NamedMap where the age > 20.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedMap.KeySetFilter(ctx, filters.GreaterEqual(extractors.Extract[int]("age"), 20))
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key)
    }
}

func (*NamedMapClient[K, V]) Name

func (nm *NamedMapClient[K, V]) Name() string

Name returns the name of the NamedMap.

func (*NamedMapClient[K, V]) Put

func (nm *NamedMapClient[K, V]) Put(ctx context.Context, key K, value V) (*V, error)

Put associates the specified value with the specified key returning the previously mapped value, if any. V will be nil if there was no previous value.

func (*NamedMapClient[K, V]) PutAll

func (nm *NamedMapClient[K, V]) PutAll(ctx context.Context, entries map[K]V) error

PutAll copies all the mappings from the specified map to the NamedMap. This is the most efficient way to add multiple entries into a NamedMap as it is carried out in parallel and no previous values are returned.

var peopleData = map[int]Person{
    1: {ID: 1, Name: "Tim", Age: 21},
    2: {ID: 2, Name: "Andrew", Age: 44},
    3: {ID: 3, Name: "Helen", Age: 20},
    4: {ID: 4, Name: "Alexa", Age: 12},
}

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

if err = namedMap.PutAll(ctx, peopleData); err != nil {
    log.Fatal(err)
}

func (*NamedMapClient[K, V]) PutIfAbsent

func (nm *NamedMapClient[K, V]) PutIfAbsent(ctx context.Context, key K, value V) (*V, error)

PutIfAbsent adds the specified mapping if the key is not already associated with a value in the NamedMap and returns nil, else returns the current value.

func (*NamedMapClient[K, V]) Release

func (nm *NamedMapClient[K, V]) Release()

Release releases the instance of NamedMap. This operation does not affect the contents of the NamedMap, but only releases the client resources. To access the NamedMap, you must get a new instance.

func (*NamedMapClient[K, V]) Remove

func (nm *NamedMapClient[K, V]) Remove(ctx context.Context, key K) (*V, error)

Remove removes the mapping for a key from the NamedMap if it is present and returns the previous value or nil if there wasn't one.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

oldValue, err = namedMap.Remove(ctx, 1)
if err != nil {
    log.Fatal(err)
}

if oldValue == nil {
    fmt.Println("No previous person was found")
} else {
    fmt.Println("Previous person was", *oldValue)
}

func (*NamedMapClient[K, V]) RemoveFilterListener

func (nm *NamedMapClient[K, V]) RemoveFilterListener(ctx context.Context, listener MapListener[K, V], f filters.Filter) error

RemoveFilterListener removes the listener that was previously registered to receive events where entries satisfy the specified filters.Filter.

func (*NamedMapClient[K, V]) RemoveKeyListener

func (nm *NamedMapClient[K, V]) RemoveKeyListener(ctx context.Context, listener MapListener[K, V], key K) error

RemoveKeyListener removes the listener that was previously registered to receive events against the specified key.

func (*NamedMapClient[K, V]) RemoveLifecycleListener

func (nm *NamedMapClient[K, V]) RemoveLifecycleListener(listener MapLifecycleListener[K, V])

RemoveLifecycleListener removes the lifecycle listener that was previously registered to receive events.

func (*NamedMapClient[K, V]) RemoveListener

func (nm *NamedMapClient[K, V]) RemoveListener(ctx context.Context, listener MapListener[K, V]) error

RemoveListener removes the listener that was previously registered to receive events.

func (*NamedMapClient[K, V]) RemoveMapping

func (nm *NamedMapClient[K, V]) RemoveMapping(ctx context.Context, key K, value V) (bool, error)

RemoveMapping removes the entry for the specified key only if it is currently mapped to the specified value.

func (*NamedMapClient[K, V]) Replace

func (nm *NamedMapClient[K, V]) Replace(ctx context.Context, key K, value V) (*V, error)

Replace replaces the entry for the specified key only if it is currently mapped to some value.

func (*NamedMapClient[K, V]) ReplaceMapping

func (nm *NamedMapClient[K, V]) ReplaceMapping(ctx context.Context, key K, prevValue V, newValue V) (bool, error)

ReplaceMapping replaces the entry for the specified key only if it is currently mapped to some value. Returns true if the value was replaced.

func (*NamedMapClient[K, V]) Size

func (nm *NamedMapClient[K, V]) Size(ctx context.Context) (int, error)

Size returns the number of mappings contained within the NamedMap.

func (*NamedMapClient[K, V]) String

func (nm *NamedMapClient[K, V]) String() string

String returns a string representation of a NamedMapClient.

func (*NamedMapClient[K, V]) Truncate

func (nm *NamedMapClient[K, V]) Truncate(ctx context.Context) error

Truncate removes all mappings from the NamedMap. Note: the removal of entries caused by this truncate operation will not be observable.

func (*NamedMapClient[K, V]) Values

func (nm *NamedMapClient[K, V]) Values(ctx context.Context) <-chan *StreamedValue[V]

Values returns a view of all values contained in the NamedMap.

Note: the entries are paged internally to avoid excessive memory usage, but you need to be careful when running this operation against [NamedMap]s with large number of entries.

The example below shows how to iterate the values in a NamedMap.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedMap.Values(ctx)
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Value:", result.Value)
    }
}

func (*NamedMapClient[K, V]) ValuesFilter

func (nm *NamedMapClient[K, V]) ValuesFilter(ctx context.Context, fltr filters.Filter) <-chan *StreamedValue[V]

ValuesFilter returns a view of filtered values contained in the NamedMap. The returned channel will be asynchronously filled with values in the NamedMap that satisfy the filter.

The example below shows how to iterate the values in a NamedMap where the age > 20.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedMap.ValuesFilter(ctx, filters.GreaterEqual(extractors.Extract[int]("age"), 20))
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Value:", result.Value)
    }
}

type NamedQueue

type NamedQueue[V any] interface {
	// Clear clears all the entries from the queue.
	Clear(ctx context.Context) error

	// Destroy destroys this queue on the server and releases all resources. After this operation it is no longer usable.
	Destroy(ctx context.Context) error

	// IsEmpty returns true if this queue is empty.
	IsEmpty(ctx context.Context) (bool, error)

	// IsReady returns true if this queue is ready to receive requests.
	IsReady(ctx context.Context) (bool, error)

	// Size returns the current size of this queue.
	Size(ctx context.Context) (int32, error)

	// OfferTail inserts the specified value to the end of this queue if it is possible to do
	// so immediately without violating capacity restrictions. If queue is full then
	// [ErrQueueFailedCapacity] is returned, if error is nil the element was added to the queue.
	OfferTail(ctx context.Context, value V) error

	// PeekHead retrieves, but does not remove, the head of this queue. If error is nil and nil value
	// return then there is no entry on the head of the queue.
	PeekHead(ctx context.Context) (*V, error)

	// PollHead retrieves and removes the head of this queue. If error is nil and the returned
	// value and error is nil this means that there was no entry on the head of the queue.
	PollHead(ctx context.Context) (*V, error)

	// GetName returns the cache name used for this queue.
	GetName() string

	// Release releases a queue and removes any resources associated with it on the client side.
	Release()

	// GetType returns the type of the [NamedQueue].
	GetType() NamedQueueType

	// AddLifecycleListener Adds a MapLifecycleListener that will receive events (truncated, destroyed) that occur
	// against the [NamedQueue].
	AddLifecycleListener(listener QueueLifecycleListener[V]) error

	// RemoveLifecycleListener removes the lifecycle listener that was previously registered to receive events.
	RemoveLifecycleListener(listener QueueLifecycleListener[V]) error
}

NamedQueue defines a non-blocking Queue implementation.

func GetNamedQueue

func GetNamedQueue[V any](ctx context.Context, session *Session, queueName string, queueType NamedQueueType) (NamedQueue[V], error)

GetNamedQueue returns a new NamedQueue.

type NamedQueueType

type NamedQueueType int
const (
	// Queue defines a simple queue which stores data in a single partition and is limited to approx 2GB of storage.
	Queue NamedQueueType = NamedQueueType(pb1.NamedQueueType_Queue)

	// PagedQueue defines a queue which distributes data over the cluster and is only limited by the cluster capacity.
	PagedQueue NamedQueueType = NamedQueueType(pb1.NamedQueueType_PagedQueue)

	// Dequeue defines a simple double-ended queue that stores data in a single partition.
	Dequeue NamedQueueType = NamedQueueType(pb1.NamedQueueType_Deque)
)

func (NamedQueueType) String

func (qt NamedQueueType) String() string

type NearCacheOptions

type NearCacheOptions struct {
	// TTL is the maximum time to keep the entry in the near cache. When this time has been reached it will be expired.
	TTL time.Duration

	// HighUnits is the maximum number of cache entries to keep in the near cache.
	HighUnits int64

	// HighUnitsMemory is the maximum amount of memory to use for entries in the near cache.
	HighUnitsMemory int64

	InvalidationStrategy InvalidationStrategyType // currently only supports ListenAll

	// PruneFactor indicates the percentage of the total number of units that will remain
	// after the cache manager prunes the near cache(i.e. this is the "low watermark" value)
	// this value is in the range 0.1 to 1.0 and the default is 0.8 or 80%.
	PruneFactor float32
}

NearCacheOptions defines options when creating a near cache.

func (NearCacheOptions) String

func (n NearCacheOptions) String() string

type QueueEventSubmitter

type QueueEventSubmitter interface {
	// contains filtered or unexported methods
}

type QueueLifecycleEvent

type QueueLifecycleEvent[V any] interface {
	Source() NamedQueue[V]
	Type() QueueLifecycleEventType
}

type QueueLifecycleEventType

type QueueLifecycleEventType string

QueueLifecycleEventType describes an event that may be raised during the lifecycle of a queue.

const (
	// QueueDestroyed raised when a queue is destroyed usually as a result of a call to NamedQueue.Destroy().
	QueueDestroyed QueueLifecycleEventType = "queue_destroyed"

	// QueueTruncated raised when a queue is truncated.
	QueueTruncated QueueLifecycleEventType = "queue_truncated"

	// QueueReleased raised when a queue is released but the session.
	QueueReleased QueueLifecycleEventType = "queue_released"
)

type QueueLifecycleListener

type QueueLifecycleListener[V any] interface {
	// OnAny registers a callback that will be notified when any [NamedQueue] event occurs.
	OnAny(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]

	// OnDestroyed registers a callback that will be notified when a [NamedQueue] is destroyed.
	OnDestroyed(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]

	// OnTruncated registers a callback that will be notified when a [Queue] is truncated.
	OnTruncated(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]

	// OnReleased registers a callback that will be notified when a [NamedQueue] is released.
	OnReleased(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]
	// contains filtered or unexported methods
}

QueueLifecycleListener allows registering callbacks to be notified when lifecycle events (truncated, released or destroyed) occur against a NamedQueue.

func NewQueueLifecycleListener

func NewQueueLifecycleListener[V any]() QueueLifecycleListener[V]

NewQueueLifecycleListener creates and returns a pointer to a new QueueLifecycleListener instance.

type Serializer

type Serializer[T any] interface {
	// Serialize serializes an object of type T and returns the []byte representation.
	Serialize(object T) ([]byte, error)

	// Deserialize deserialized an object and returns the correct type of T.
	Deserialize(data []byte) (*T, error)

	// Format returns the format used for the serializer.
	Format() string
}

Serializer defines how to serialize/ de-serialize objects.

func NewSerializer

func NewSerializer[T any](format string) Serializer[T]

NewSerializer returns a new Serializer based upon the format and the type.

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session provides APIs to create NamedCaches. The NewSession method creates a new instance of a Session. This method also takes a variable number of arguments, called options, that can be passed to configure the Session.

func NewSession

func NewSession(ctx context.Context, options ...func(session *SessionOptions)) (*Session, error)

NewSession creates a new Session with the specified sessionOptions.

Example 1: Create a Session that will eventually connect to host "localhost" and gRPC port: 1408 using an insecure connection.

ctx := context.Background()
session, err = coherence.NewSession(ctx, coherence.WithPlainText())
if err != nil {
    log.Fatal(err)
}

Example 2: Create a Session that will eventually connect to host "acme.com" and gRPC port: 1408 using an insecure connection.

session, err := coherence.NewSession(ctx, coherence.WithAddress("acme.com:1408"), coherence.WithPlainText())

You can also set the environment variable COHERENCE_SERVER_ADDRESS to specify the address.

Example 3: Create a Session that will eventually connect to default "localhost:1408" using a secured connection.

session, err := coherence.NewSession(ctx)

Example 4: Create a Session that will use the Coherence Name Service to discover the gRPC endpoints using a secured connection.

session, err := coherence.NewSession(ctx, coherence.WithAddress("coherence:///localhost:7574"))

Example 5: Create a Session that will use the Coherence Name Service to discover the gRPC endpoints when multiple clusters are listening on the same cluster port using a secured connection.

session, err := coherence.NewSession(ctx, coherence.WithAddress("coherence:///localhost:7574/cluster2"))

A Session can also be configured using environment variable COHERENCE_SERVER_ADDRESS. See gRPC Naming for information on values for this.

To Configure SSL, you must first enable SSL on the gRPC Proxy, see gRPC Proxy Server for details.

There are a number of ways to set the TLS options when creating a session. You can use WithTLSConfig to specify a custom tls.Config or specify the client certificate, key and trust certificate using additional session options or using environment variables. See below for more details.

myTlSConfig = &tls.Config{....}
session, err := coherence.NewSession(ctx, coherence.WithTLSConfig(myTLSConfig))

You can also use the following to set the required TLS options when creating a session:

session, err := coherence.NewSession(ctx, coherence.WithTLSClientCert("/path/to/client/certificate"),
                                          coherence.WithTLSClientKey("/path/path/to/client/key"),
                                          coherence.WithTLSCertsPath("/path/to/cert/to/be/added/for/trust"))

You can also use coherence.WithIgnoreInvalidCerts() to ignore self-signed certificates for testing only, not to be used in production.

The following environment variables can also be set for the client, and will override any options.

export COHERENCE_TLS_CLIENT_CERT=/path/to/client/certificate
export COHERENCE_TLS_CLIENT_KEY=/path/path/to/client/key
export COHERENCE_TLS_CERTS_PATH=/path/to/cert/to/be/added/for/trust
export COHERENCE_IGNORE_INVALID_CERTS=true    // option to ignore self-signed certificates for testing only, not to be used in production

Finally, the Close() method can be used to close the Session. Once a Session is closed, no APIs on the NamedMap instances should be invoked. If invoked they will return an error.

func (*Session) AddSessionLifecycleListener

func (s *Session) AddSessionLifecycleListener(listener SessionLifecycleListener)

AddSessionLifecycleListener adds a SessionLifecycleListener that will receive events (connected, closed, disconnected or reconnected) that occur against the session.

func (*Session) Close

func (s *Session) Close()

Close closes a connection.

func (*Session) GetDisconnectTimeout

func (s *Session) GetDisconnectTimeout() time.Duration

GetDisconnectTimeout returns the session disconnect timeout in millis.

func (*Session) GetOptions

func (s *Session) GetOptions() *SessionOptions

GetOptions returns the options that were passed during this session creation.

func (*Session) GetProtocolVersion

func (s *Session) GetProtocolVersion() int32

GetProtocolVersion returns the protocol version used by the server.

func (*Session) GetReadyTimeout

func (s *Session) GetReadyTimeout() time.Duration

GetReadyTimeout returns the session disconnect timeout in millis.

func (*Session) GetRequestTimeout

func (s *Session) GetRequestTimeout() time.Duration

GetRequestTimeout returns the session timeout in millis.

func (*Session) ID

func (s *Session) ID() string

ID returns the identifier of a session.

func (*Session) IsClosed

func (s *Session) IsClosed() bool

IsClosed returns true if the Session is closed. Returns false otherwise.

func (*Session) NextFilterID

func (s *Session) NextFilterID() int64

func (*Session) NextRequestID

func (s *Session) NextRequestID() int64

func (*Session) RemoveSessionLifecycleListener

func (s *Session) RemoveSessionLifecycleListener(listener SessionLifecycleListener)

RemoveSessionLifecycleListener removes SessionLifecycleListener for a session.

func (*Session) String

func (s *Session) String() string

type SessionLifecycleEvent

type SessionLifecycleEvent interface {
	Type() SessionLifecycleEventType
	Source() *Session
}

SessionLifecycleEvent defines a session lifecycle event

type SessionLifecycleEventType

type SessionLifecycleEventType string

SessionLifecycleEventType describes an event that may be raised during the lifecycle of a Session.

type SessionLifecycleListener

type SessionLifecycleListener interface {
	OnAny(callback func(SessionLifecycleEvent)) SessionLifecycleListener
	OnConnected(callback func(SessionLifecycleEvent)) SessionLifecycleListener
	OnClosed(callback func(SessionLifecycleEvent)) SessionLifecycleListener
	OnDisconnected(callback func(SessionLifecycleEvent)) SessionLifecycleListener
	OnReconnected(callback func(SessionLifecycleEvent)) SessionLifecycleListener
	// contains filtered or unexported methods
}

func NewSessionLifecycleListener

func NewSessionLifecycleListener() SessionLifecycleListener

NewSessionLifecycleListener creates and returns a pointer to a new SessionLifecycleListener instance.

type SessionOptions

type SessionOptions struct {
	Address            string
	Scope              string
	Format             string
	ClientCertPath     string
	ClientKeyPath      string
	CaCertPath         string
	PlainText          bool
	IgnoreInvalidCerts bool
	RequestTimeout     time.Duration
	DisconnectTimeout  time.Duration
	ReadyTimeout       time.Duration
	TlSConfig          *tls.Config
}

SessionOptions holds the session attributes like host, port, tls attributes etc.

func (*SessionOptions) IsPlainText

func (s *SessionOptions) IsPlainText() bool

IsPlainText returns true if plain text, e.g. Non TLS. Returns false otherwise.

func (*SessionOptions) String

func (s *SessionOptions) String() string

String returns a string representation of SessionOptions.

type StreamedEntry

type StreamedEntry[K comparable, V any] struct {
	// Err contains the error (if any) while obtaining the value.
	Err error
	// Key contains the key of the entry.
	Key K
	// Value contains the value of the entry.
	Value V
}

StreamedEntry is wrapper object that wraps an error and a Key and a Value . As always, the Err object must be checked for errors before accessing the Key or the Value fields.

type StreamedKey

type StreamedKey[K comparable] struct {
	// Err contains the error (if any) while obtaining the key.
	Err error
	// Key contains the key of the entry.
	Key K
}

StreamedKey is wrapper object that wraps an error and a key. The Err object must be checked for errors before accessing the Key field.

type StreamedValue

type StreamedValue[V any] struct {
	// Err contains the error (if any) while obtaining the value.
	Err error
	// Value contains the value of the entry.
	Value V
	// indicates if the value is empty
	IsValueEmpty bool
}

StreamedValue is wrapper object that wraps an error and a value. The Err object must be checked for errors before accessing the Value field.

type V1ProxyProtocol

type V1ProxyProtocol string

V1ProxyProtocol defines the types of proxy protocols such as "CacheService" or "QueueService".

func GetCacheServiceProtocol

func GetCacheServiceProtocol() V1ProxyProtocol

Directories

Path Synopsis
Package aggregators provides various aggregator functions and types.
Package aggregators provides various aggregator functions and types.
Package discovery provides an implementation of Coherence NSLookup.
Package discovery provides an implementation of Coherence NSLookup.
Package extractors provides various extractor functions and types.
Package extractors provides various extractor functions and types.
Package filters provides various filter functions and types.
Package filters provides various filter functions and types.
Package processors provides various entry processor functions and types.
Package processors provides various entry processor functions and types.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL