Documentation ¶
Overview ¶
Package hazelcast provides the Hazelcast Go client.
Hazelcast is an open-source distributed in-memory data store and computation platform. It provides a wide variety of distributed data structures and concurrency primitives.
The Hazelcast Go client lets applications communicate with Hazelcast IMDG clusters and access the cluster data.
Configuration ¶
If you run Hazelcast and the Go client on the same computer, the default configuration is generally sufficient, which makes it convenient for trying out the client. However, if the client runs on a different computer than the cluster members, you may need some simple configuration, such as specifying the member addresses.
The Hazelcast IMDG members and clients have their own configuration options. You may need to reflect some of the member side configurations on the client side to properly connect to the cluster.
To configure the client, create a new `hazelcast.Config{}` and pass it to the `hazelcast.StartNewClientWithConfig` function:
config := hazelcast.Config{}
client, err := hazelcast.StartNewClientWithConfig(context.TODO(), config)
Calling hazelcast.StartNewClientWithConfig with the default configuration is equivalent to calling hazelcast.StartNewClient. The default configuration assumes Hazelcast is running at localhost:5701 with the cluster name set to dev. If you run Hazelcast members on a different server than the client, you need to change the client settings accordingly.
Assuming Hazelcast members are running at hz1.server.com:5701, hz2.server.com:5701 and hz3.server.com:5701 with cluster name production, you would use the configuration below. Note that addresses must include port numbers:
config := hazelcast.Config{}
config.Cluster.Name = "production"
config.Cluster.Network.SetAddresses("hz1.server.com:5701", "hz2.server.com:5701", "hz3.server.com:5701")
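Since addresses must include port numbers, it can help to check them before handing them to the configuration. The following is a minimal sketch using only the standard library; `checkAddresses` is a hypothetical helper and not part of the Hazelcast client.

```go
package main

import (
	"fmt"
	"net"
)

// checkAddresses is a hypothetical helper, not part of the Hazelcast client.
// It returns an error for the first address that is not in host:port form.
func checkAddresses(addrs ...string) error {
	for _, addr := range addrs {
		host, port, err := net.SplitHostPort(addr)
		if err != nil {
			return fmt.Errorf("invalid member address %q: %w", addr, err)
		}
		// SplitHostPort accepts "host:" and ":port", so reject empty parts here.
		if host == "" || port == "" {
			return fmt.Errorf("invalid member address %q: empty host or port", addr)
		}
	}
	return nil
}

func main() {
	// Both addresses carry a port, so no error is reported.
	fmt.Println(checkAddresses("hz1.server.com:5701", "hz2.server.com:5701"))
	// The port is missing, so an error is reported.
	fmt.Println(checkAddresses("hz3.server.com"))
}
```

A check like this only validates the form of each address. It does not verify that a member is actually reachable there.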
You can also load configuration from JSON:
text := `
{
    "Cluster": {
        "Name": "production",
        "Network": {
            "Addresses": [
                "hz1.server.com:5701",
                "hz2.server.com:5701",
                "hz3.server.com:5701"
            ]
        }
    }
}`
var config hazelcast.Config
if err := json.Unmarshal([]byte(text), &config); err != nil {
    panic(err)
}
If you are changing several options in a configuration section, you may have to repeatedly specify the configuration section:
config := hazelcast.Config{}
config.Cluster.Name = "dev"
config.Cluster.HeartbeatInterval = types.Duration(60 * time.Second)
config.Cluster.Unisocket = true
config.Cluster.SetLoadBalancer(cluster.NewRandomLoadBalancer())
You can simplify the code above by getting a reference to config.Cluster and updating it:
config := hazelcast.Config{}
cc := &config.Cluster // Note that we are getting a reference to config.Cluster!
cc.Name = "dev"
cc.HeartbeatInterval = types.Duration(60 * time.Second)
cc.Unisocket = true
cc.SetLoadBalancer(cluster.NewRandomLoadBalancer())
Note that you should get a reference to the configuration section you are updating, otherwise you would update a copy of it, which doesn't modify the configuration.
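The difference between a reference and a copy is plain Go struct semantics. The sketch below shows it with hypothetical stand-in types (`Config` and `ClusterConfig` here are local to the example, not the client's types).

```go
package main

import "fmt"

// ClusterConfig and Config are hypothetical stand-ins for the client's
// configuration structs; only the value-vs-pointer behavior matters here.
type ClusterConfig struct {
	Name      string
	Unisocket bool
}

type Config struct {
	Cluster ClusterConfig
}

// renameViaCopy assigns the section to a local variable, which copies the
// struct. The change is visible only in the returned copy.
func renameViaCopy(config *Config, name string) ClusterConfig {
	cc := config.Cluster
	cc.Name = name
	return cc
}

// renameViaReference takes the address of the section, so the change is
// written into config itself.
func renameViaReference(config *Config, name string) {
	cc := &config.Cluster
	cc.Name = name
}

func main() {
	config := Config{}
	renameViaCopy(&config, "production")
	fmt.Printf("after copy: %q\n", config.Cluster.Name) // after copy: ""
	renameViaReference(&config, "production")
	fmt.Printf("after reference: %q\n", config.Cluster.Name) // after reference: "production"
}
```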
There are a few options that require a duration, such as config.Cluster.HeartbeatInterval, config.Cluster.Network.ConnectionTimeout and others. You must use types.Duration instead of time.Duration with those options, since types.Duration values support human readable durations when deserialized from text:
import "github.com/hazelcast/hazelcast-go-client/types"
// ...
config := hazelcast.Config{}
config.Cluster.InvocationTimeout = types.Duration(3 * time.Minute)
config.Cluster.Network.ConnectionTimeout = types.Duration(10 * time.Second)
That corresponds to the following JSON configuration. Refer to https://golang.org/pkg/time/#ParseDuration for the available duration strings:
{
    "Cluster": {
        "InvocationTimeout": "3m",
        "Network": {
            "ConnectionTimeout": "10s"
        }
    }
}
Here are all configuration items with their default values:
config := hazelcast.Config{}
config.ClientName = ""
config.SetLabels()
cc := &config.Cluster
cc.Name = "dev"
cc.HeartbeatInterval = types.Duration(5 * time.Second)
cc.HeartbeatTimeout = types.Duration(60 * time.Second)
cc.InvocationTimeout = types.Duration(120 * time.Second)
cc.RedoOperation = false
cc.Unisocket = false
cc.SetLoadBalancer(cluster.NewRoundRobinLoadBalancer())
cc.Network.SetAddresses("127.0.0.1:5701")
cc.Network.SSL.Enabled = false
cc.Network.SSL.SetTLSConfig(&tls.Config{})
cc.Network.ConnectionTimeout = types.Duration(5 * time.Second)
cc.Security.Credentials.Username = ""
cc.Security.Credentials.Password = ""
cc.Discovery.UsePublicIP = false
cc.Cloud.Enabled = false
cc.Cloud.Token = ""
cc.ConnectionStrategy.ReconnectMode = cluster.ReconnectModeOn
cc.ConnectionStrategy.Timeout = types.Duration(1<<63 - 1)
cc.ConnectionStrategy.Retry.InitialBackoff = types.Duration(1 * time.Second)
cc.ConnectionStrategy.Retry.MaxBackoff = types.Duration(30 * time.Second)
cc.ConnectionStrategy.Retry.Multiplier = 1.05
cc.ConnectionStrategy.Retry.Jitter = 0.0
config.Serialization.PortableVersion = 0
config.Serialization.LittleEndian = false
config.Serialization.SetPortableFactories()
config.Serialization.SetIdentifiedDataSerializableFactories()
config.Serialization.SetCustomSerializer()
config.Serialization.SetClassDefinitions()
config.Serialization.SetGlobalSerializer() // Gob serializer
config.Stats.Enabled = false
config.Stats.Period = types.Duration(5 * time.Second)
config.Logger.Level = logger.InfoLevel
Listening for Distributed Object Events ¶
You can listen to creation and destruction events for distributed objects by attaching a listener to the client. A distributed object is created when it is first referenced, unless it already exists. Here is an example:
// Error handling is omitted for brevity.
handler := func(e hazelcast.DistributedObjectNotified) {
    isMapEvent := e.ServiceName == hazelcast.ServiceNameMap
    isCreationEvent := e.EventType == hazelcast.DistributedObjectCreated
    log.Println(e.EventType, e.ServiceName, e.ObjectName, "creation?", isCreationEvent, "isMap?", isMapEvent)
}
subscriptionID, _ := client.AddDistributedObjectListener(ctx, handler)
myMap, _ := client.GetMap(ctx, "my-map")
// handler is called with: ServiceName=ServiceNameMap; ObjectName="my-map"; EventType=DistributedObjectCreated
myMap.Destroy(ctx)
// handler is called with: ServiceName=ServiceNameMap; ObjectName="my-map"; EventType=DistributedObjectDestroyed
To stop receiving distributed object events, remove the listener with client.RemoveDistributedObjectListener, passing the subscription ID returned when the listener was added:
client.RemoveDistributedObjectListener(ctx, subscriptionID)
Running SQL Queries ¶
Running SQL queries requires Hazelcast 5.0 or later. See the Hazelcast SQL documentation: https://docs.hazelcast.com/hazelcast/latest/sql/sql-overview
SQL support must be enabled in the Hazelcast server configuration:
<hazelcast>
    <jet enabled="true" />
</hazelcast>
The client supports two kinds of queries: those that return rows (select statements and a few others) and the rest (insert, update, etc.). The former are executed with the QuerySQL method and the latter with the ExecSQL method.
Use the question mark (?) for placeholders.
To connect to a data source and query it as if it were a table, you must first create a mapping. Currently, mappings for Map, Kafka and file data sources are supported.
You can read the details about mappings here: https://docs.hazelcast.com/hazelcast/latest/sql/sql-overview#mappings
The following data types are supported when inserting/updating. The names in parentheses correspond to SQL types:
- string (varchar)
- int8 (tinyint)
- int16 (smallint)
- int32 (integer)
- int64 (bigint)
- bool (boolean)
- float32 (real)
- float64 (double)
- types.Decimal (decimal)
- time.Time is not supported; use one of types.LocalDate, types.LocalTime, types.LocalDateTime or types.OffsetDateTime instead
- types.LocalDate (date)
- types.LocalTime (time)
- types.LocalDateTime (timestamp)
- types.OffsetDateTime (timestamp with time zone)
- serialization.JSON (json)
Using Date/Time
To force the use of a specific date/time type, create a time.Time value and convert it to the target type:
t := time.Now()
dateValue := types.LocalDate(t)
timeValue := types.LocalTime(t)
dateTimeValue := types.LocalDateTime(t)
dateTimeWithTimezoneValue := types.OffsetDateTime(t)
Management Center Integration ¶
Hazelcast Management Center can monitor your clients if client-side statistics are enabled.
You can enable statistics by setting config.Stats.Enabled to true. Optionally, the period of statistics collection can be set using the config.Stats.Period setting. The labels set in the configuration appear in the Management Center console:
config := hazelcast.Config{}
config.SetLabels("fast-cache", "staging")
config.Stats.Enabled = true
config.Stats.Period = types.Duration(1 * time.Second)
client, err := hazelcast.StartNewClientWithConfig(context.TODO(), config)
Example ¶
// Create the configuration
config := hazelcast.Config{}
config.Cluster.Name = "dev"
config.Cluster.Network.SetAddresses("localhost:5701")
// Start the client with the configuration provider.
ctx := context.TODO()
client, err := hazelcast.StartNewClientWithConfig(ctx, config)
if err != nil {
    log.Fatal(err)
}
// Retrieve a map.
peopleMap, err := client.GetMap(ctx, "people")
if err != nil {
    log.Fatal(err)
}
// Call map functions.
err = peopleMap.Set(ctx, "jane", "doe")
if err != nil {
    log.Fatal(err)
}
// Stop the client once you are done with it.
client.Shutdown(ctx)
Index ¶
- Constants
- type Client
- func (c *Client) AddDistributedObjectListener(ctx context.Context, handler DistributedObjectNotifiedHandler) (types.UUID, error)
- func (c *Client) AddLifecycleListener(handler LifecycleStateChangeHandler) (types.UUID, error)
- func (c *Client) AddMembershipListener(handler cluster.MembershipStateChangeHandler) (types.UUID, error)
- func (c *Client) GetDistributedObjectsInfo(ctx context.Context) ([]types.DistributedObjectInfo, error)
- func (c *Client) GetFlakeIDGenerator(ctx context.Context, name string) (*FlakeIDGenerator, error)
- func (c *Client) GetList(ctx context.Context, name string) (*List, error)
- func (c *Client) GetMap(ctx context.Context, name string) (*Map, error)
- func (c *Client) GetMultiMap(ctx context.Context, name string) (*MultiMap, error)
- func (c *Client) GetPNCounter(ctx context.Context, name string) (*PNCounter, error)
- func (c *Client) GetQueue(ctx context.Context, name string) (*Queue, error)
- func (c *Client) GetReplicatedMap(ctx context.Context, name string) (*ReplicatedMap, error)
- func (c *Client) GetSet(ctx context.Context, name string) (*Set, error)
- func (c *Client) GetTopic(ctx context.Context, name string) (*Topic, error)
- func (c *Client) Name() string
- func (c *Client) RemoveDistributedObjectListener(ctx context.Context, subscriptionID types.UUID) error
- func (c *Client) RemoveLifecycleListener(subscriptionID types.UUID) error
- func (c *Client) RemoveMembershipListener(subscriptionID types.UUID) error
- func (c *Client) Running() bool
- func (c *Client) SQL() sql.Service
- func (c *Client) Shutdown(ctx context.Context) error
- type Config
- func (c *Config) AddFlakeIDGenerator(name string, prefetchCount int32, prefetchExpiry types.Duration) error
- func (c *Config) AddLifecycleListener(handler LifecycleStateChangeHandler) types.UUID
- func (c *Config) AddMembershipListener(handler cluster.MembershipStateChangeHandler) types.UUID
- func (c *Config) Clone() Config
- func (c *Config) SetLabels(labels ...string)
- func (c *Config) Validate() error
- type DistributedObjectEventType
- type DistributedObjectNotified
- type DistributedObjectNotifiedHandler
- type EntryEventType
- type EntryNotified
- type EntryNotifiedHandler
- type FlakeIDGenerator
- type FlakeIDGeneratorConfig
- type ItemEventType
- type LifecycleState
- type LifecycleStateChangeHandler
- type LifecycleStateChanged
- type List
- func (l *List) Add(ctx context.Context, element interface{}) (bool, error)
- func (l *List) AddAll(ctx context.Context, elements ...interface{}) (bool, error)
- func (l *List) AddAllAt(ctx context.Context, index int, elements ...interface{}) (bool, error)
- func (l *List) AddAt(ctx context.Context, index int, element interface{}) error
- func (l *List) AddListener(ctx context.Context, includeValue bool, handler ListItemNotifiedHandler) (types.UUID, error)
- func (l *List) Clear(ctx context.Context) error
- func (l *List) Contains(ctx context.Context, element interface{}) (bool, error)
- func (l *List) ContainsAll(ctx context.Context, elements ...interface{}) (bool, error)
- func (p List) Destroy(ctx context.Context) error
- func (l *List) Get(ctx context.Context, index int) (interface{}, error)
- func (l *List) GetAll(ctx context.Context) ([]interface{}, error)
- func (l *List) IndexOf(ctx context.Context, element interface{}) (int, error)
- func (l *List) IsEmpty(ctx context.Context) (bool, error)
- func (l *List) LastIndexOf(ctx context.Context, element interface{}) (int, error)
- func (l *List) Remove(ctx context.Context, element interface{}) (bool, error)
- func (l *List) RemoveAll(ctx context.Context, elements ...interface{}) (bool, error)
- func (l *List) RemoveAt(ctx context.Context, index int) (interface{}, error)
- func (l *List) RemoveListener(ctx context.Context, subscriptionID types.UUID) error
- func (l *List) RetainAll(ctx context.Context, elements ...interface{}) (bool, error)
- func (l *List) Set(ctx context.Context, index int, element interface{}) (interface{}, error)
- func (l *List) Size(ctx context.Context) (int, error)
- func (l *List) SubList(ctx context.Context, start int, end int) ([]interface{}, error)
- type ListItemNotified
- type ListItemNotifiedHandler
- type Map
- func (m *Map) AddEntryListener(ctx context.Context, config MapEntryListenerConfig, ...) (types.UUID, error)
- func (m *Map) AddIndex(ctx context.Context, indexConfig types.IndexConfig) error
- func (m *Map) AddInterceptor(ctx context.Context, interceptor interface{}) (string, error)
- func (m *Map) Aggregate(ctx context.Context, agg aggregate.Aggregator) (interface{}, error)
- func (m *Map) AggregateWithPredicate(ctx context.Context, agg aggregate.Aggregator, pred predicate.Predicate) (interface{}, error)
- func (m *Map) Clear(ctx context.Context) error
- func (m *Map) ContainsKey(ctx context.Context, key interface{}) (bool, error)
- func (m *Map) ContainsValue(ctx context.Context, value interface{}) (bool, error)
- func (m *Map) Delete(ctx context.Context, key interface{}) error
- func (p Map) Destroy(ctx context.Context) error
- func (m *Map) Evict(ctx context.Context, key interface{}) (bool, error)
- func (m *Map) EvictAll(ctx context.Context) error
- func (m *Map) ExecuteOnEntries(ctx context.Context, entryProcessor interface{}) ([]types.Entry, error)
- func (m *Map) ExecuteOnEntriesWithPredicate(ctx context.Context, entryProcessor interface{}, pred predicate.Predicate) ([]types.Entry, error)
- func (m *Map) ExecuteOnKey(ctx context.Context, entryProcessor interface{}, key interface{}) (interface{}, error)
- func (m *Map) ExecuteOnKeys(ctx context.Context, entryProcessor interface{}, keys ...interface{}) ([]interface{}, error)
- func (m *Map) Flush(ctx context.Context) error
- func (m *Map) ForceUnlock(ctx context.Context, key interface{}) error
- func (m *Map) Get(ctx context.Context, key interface{}) (interface{}, error)
- func (m *Map) GetAll(ctx context.Context, keys ...interface{}) ([]types.Entry, error)
- func (m *Map) GetEntrySet(ctx context.Context) ([]types.Entry, error)
- func (m *Map) GetEntrySetWithPredicate(ctx context.Context, predicate predicate.Predicate) ([]types.Entry, error)
- func (m *Map) GetEntryView(ctx context.Context, key interface{}) (*types.SimpleEntryView, error)
- func (m *Map) GetKeySet(ctx context.Context) ([]interface{}, error)
- func (m *Map) GetKeySetWithPredicate(ctx context.Context, predicate predicate.Predicate) ([]interface{}, error)
- func (m *Map) GetValues(ctx context.Context) ([]interface{}, error)
- func (m *Map) GetValuesWithPredicate(ctx context.Context, predicate predicate.Predicate) ([]interface{}, error)
- func (m *Map) IsEmpty(ctx context.Context) (bool, error)
- func (m *Map) IsLocked(ctx context.Context, key interface{}) (bool, error)
- func (m *Map) LoadAllReplacing(ctx context.Context, keys ...interface{}) error
- func (m *Map) LoadAllWithoutReplacing(ctx context.Context, keys ...interface{}) error
- func (m *Map) Lock(ctx context.Context, key interface{}) error
- func (m *Map) LockWithLease(ctx context.Context, key interface{}, leaseTime time.Duration) error
- func (m *Map) NewLockContext(ctx context.Context) context.Context
- func (m *Map) Put(ctx context.Context, key interface{}, value interface{}) (interface{}, error)
- func (m *Map) PutAll(ctx context.Context, entries ...types.Entry) error
- func (m *Map) PutIfAbsent(ctx context.Context, key interface{}, value interface{}) (interface{}, error)
- func (m *Map) PutIfAbsentWithTTL(ctx context.Context, key interface{}, value interface{}, ttl time.Duration) (interface{}, error)
- func (m *Map) PutIfAbsentWithTTLAndMaxIdle(ctx context.Context, key interface{}, value interface{}, ttl time.Duration, ...) (interface{}, error)
- func (m *Map) PutTransient(ctx context.Context, key interface{}, value interface{}) error
- func (m *Map) PutTransientWithMaxIdle(ctx context.Context, key interface{}, value interface{}, maxIdle time.Duration) error
- func (m *Map) PutTransientWithTTL(ctx context.Context, key interface{}, value interface{}, ttl time.Duration) error
- func (m *Map) PutTransientWithTTLAndMaxIdle(ctx context.Context, key interface{}, value interface{}, ttl time.Duration, ...) error
- func (m *Map) PutWithMaxIdle(ctx context.Context, key interface{}, value interface{}, maxIdle time.Duration) (interface{}, error)
- func (m *Map) PutWithTTL(ctx context.Context, key interface{}, value interface{}, ttl time.Duration) (interface{}, error)
- func (m *Map) PutWithTTLAndMaxIdle(ctx context.Context, key interface{}, value interface{}, ttl time.Duration, ...) (interface{}, error)
- func (m *Map) Remove(ctx context.Context, key interface{}) (interface{}, error)
- func (m *Map) RemoveAll(ctx context.Context, predicate predicate.Predicate) error
- func (m *Map) RemoveEntryListener(ctx context.Context, subscriptionID types.UUID) error
- func (m *Map) RemoveIfSame(ctx context.Context, key interface{}, value interface{}) (bool, error)
- func (m *Map) RemoveInterceptor(ctx context.Context, registrationID string) (bool, error)
- func (m *Map) Replace(ctx context.Context, key interface{}, value interface{}) (interface{}, error)
- func (m *Map) ReplaceIfSame(ctx context.Context, key interface{}, oldValue interface{}, ...) (bool, error)
- func (m *Map) Set(ctx context.Context, key interface{}, value interface{}) error
- func (m *Map) SetTTL(ctx context.Context, key interface{}, ttl time.Duration) error
- func (m *Map) SetTTLAffected(ctx context.Context, key interface{}, ttl time.Duration) (bool, error)
- func (m *Map) SetWithTTL(ctx context.Context, key interface{}, value interface{}, ttl time.Duration) error
- func (m *Map) SetWithTTLAndMaxIdle(ctx context.Context, key interface{}, value interface{}, ttl time.Duration, ...) error
- func (m *Map) Size(ctx context.Context) (int, error)
- func (m *Map) TryLock(ctx context.Context, key interface{}) (bool, error)
- func (m *Map) TryLockWithLease(ctx context.Context, key interface{}, lease time.Duration) (bool, error)
- func (m *Map) TryLockWithLeaseAndTimeout(ctx context.Context, key interface{}, lease time.Duration, ...) (bool, error)
- func (m *Map) TryLockWithTimeout(ctx context.Context, key interface{}, timeout time.Duration) (bool, error)
- func (m *Map) TryPut(ctx context.Context, key interface{}, value interface{}) (bool, error)
- func (m *Map) TryPutWithTimeout(ctx context.Context, key interface{}, value interface{}, timeout time.Duration) (bool, error)
- func (m *Map) TryRemove(ctx context.Context, key interface{}) (interface{}, error)
- func (m *Map) TryRemoveWithTimeout(ctx context.Context, key interface{}, timeout time.Duration) (interface{}, error)
- func (m *Map) Unlock(ctx context.Context, key interface{}) error
- type MapEntryListenerConfig
- func (c *MapEntryListenerConfig) NotifyEntryAdded(enable bool)
- func (c *MapEntryListenerConfig) NotifyEntryAllCleared(enable bool)
- func (c *MapEntryListenerConfig) NotifyEntryAllEvicted(enable bool)
- func (c *MapEntryListenerConfig) NotifyEntryEvicted(enable bool)
- func (c *MapEntryListenerConfig) NotifyEntryExpired(enable bool)
- func (c *MapEntryListenerConfig) NotifyEntryInvalidated(enable bool)
- func (c *MapEntryListenerConfig) NotifyEntryLoaded(enable bool)
- func (c *MapEntryListenerConfig) NotifyEntryMerged(enable bool)
- func (c *MapEntryListenerConfig) NotifyEntryRemoved(enable bool)
- func (c *MapEntryListenerConfig) NotifyEntryUpdated(enable bool)
- type MessagePublished
- type MultiMap
- func (m *MultiMap) Clear(ctx context.Context) error
- func (m *MultiMap) ContainsEntry(ctx context.Context, key interface{}, value interface{}) (bool, error)
- func (m *MultiMap) ContainsKey(ctx context.Context, key interface{}) (bool, error)
- func (m *MultiMap) ContainsValue(ctx context.Context, value interface{}) (bool, error)
- func (m *MultiMap) Delete(ctx context.Context, key interface{}) error
- func (p MultiMap) Destroy(ctx context.Context) error
- func (m *MultiMap) ForceUnlock(ctx context.Context, key interface{}) error
- func (m *MultiMap) Get(ctx context.Context, key interface{}) ([]interface{}, error)
- func (m *MultiMap) GetEntrySet(ctx context.Context) ([]types.Entry, error)
- func (m *MultiMap) GetKeySet(ctx context.Context) ([]interface{}, error)
- func (m *MultiMap) GetValues(ctx context.Context) ([]interface{}, error)
- func (m *MultiMap) IsLocked(ctx context.Context, key interface{}) (bool, error)
- func (m *MultiMap) Lock(ctx context.Context, key interface{}) error
- func (m *MultiMap) LockWithLease(ctx context.Context, key interface{}, leaseTime time.Duration) error
- func (m *MultiMap) NewLockContext(ctx context.Context) context.Context
- func (m *MultiMap) Put(ctx context.Context, key interface{}, value interface{}) (bool, error)
- func (m *MultiMap) PutAll(ctx context.Context, key interface{}, values ...interface{}) error
- func (m *MultiMap) Remove(ctx context.Context, key interface{}) ([]interface{}, error)
- func (m *MultiMap) RemoveEntry(ctx context.Context, key interface{}, value interface{}) (bool, error)
- func (m *MultiMap) Size(ctx context.Context) (int, error)
- func (m *MultiMap) TryLock(ctx context.Context, key interface{}) (bool, error)
- func (m *MultiMap) TryLockWithLease(ctx context.Context, key interface{}, lease time.Duration) (bool, error)
- func (m *MultiMap) TryLockWithLeaseAndTimeout(ctx context.Context, key interface{}, lease time.Duration, ...) (bool, error)
- func (m *MultiMap) TryLockWithTimeout(ctx context.Context, key interface{}, timeout time.Duration) (bool, error)
- func (m *MultiMap) Unlock(ctx context.Context, key interface{}) error
- func (m *MultiMap) ValueCount(ctx context.Context, key interface{}) (int, error)
- type PNCounter
- func (pn *PNCounter) AddAndGet(ctx context.Context, delta int64) (int64, error)
- func (pn *PNCounter) DecrementAndGet(ctx context.Context) (int64, error)
- func (p PNCounter) Destroy(ctx context.Context) error
- func (pn *PNCounter) Get(ctx context.Context) (int64, error)
- func (pn *PNCounter) GetAndAdd(ctx context.Context, delta int64) (int64, error)
- func (pn *PNCounter) GetAndDecrement(ctx context.Context) (int64, error)
- func (pn *PNCounter) GetAndIncrement(ctx context.Context) (int64, error)
- func (pn *PNCounter) GetAndSubtract(ctx context.Context, delta int64) (int64, error)
- func (pn *PNCounter) IncrementAndGet(ctx context.Context) (int64, error)
- func (pn *PNCounter) Reset()
- func (pn *PNCounter) SubtractAndGet(ctx context.Context, delta int64) (int64, error)
- type Queue
- func (q *Queue) Add(ctx context.Context, value interface{}) (bool, error)
- func (q *Queue) AddAll(ctx context.Context, values ...interface{}) (bool, error)
- func (q *Queue) AddItemListener(ctx context.Context, includeValue bool, handler QueueItemNotifiedHandler) (types.UUID, error)
- func (q *Queue) AddWithTimeout(ctx context.Context, value interface{}, timeout time.Duration) (bool, error)
- func (q *Queue) Clear(ctx context.Context) error
- func (q *Queue) Contains(ctx context.Context, value interface{}) (bool, error)
- func (q *Queue) ContainsAll(ctx context.Context, values ...interface{}) (bool, error)
- func (p Queue) Destroy(ctx context.Context) error
- func (q *Queue) Drain(ctx context.Context) ([]interface{}, error)
- func (q *Queue) DrainWithMaxSize(ctx context.Context, maxSize int) ([]interface{}, error)
- func (q *Queue) GetAll(ctx context.Context) ([]interface{}, error)
- func (q *Queue) IsEmpty(ctx context.Context) (bool, error)
- func (q *Queue) Peek(ctx context.Context) (interface{}, error)
- func (q *Queue) Poll(ctx context.Context) (interface{}, error)
- func (q *Queue) PollWithTimeout(ctx context.Context, timeout time.Duration) (interface{}, error)
- func (q *Queue) Put(ctx context.Context, value interface{}) error
- func (q *Queue) RemainingCapacity(ctx context.Context) (int, error)
- func (q *Queue) Remove(ctx context.Context, value interface{}) (bool, error)
- func (q *Queue) RemoveAll(ctx context.Context, values ...interface{}) (bool, error)
- func (q *Queue) RemoveListener(ctx context.Context, subscriptionID types.UUID) error
- func (q *Queue) RetainAll(ctx context.Context, values ...interface{}) (bool, error)
- func (q *Queue) Size(ctx context.Context) (int, error)
- func (q *Queue) Take(ctx context.Context) (interface{}, error)
- type QueueItemNotified
- type QueueItemNotifiedHandler
- type ReplicatedMap
- func (m *ReplicatedMap) AddEntryListener(ctx context.Context, handler EntryNotifiedHandler) (types.UUID, error)
- func (m *ReplicatedMap) AddEntryListenerToKey(ctx context.Context, key interface{}, handler EntryNotifiedHandler) (types.UUID, error)
- func (m *ReplicatedMap) AddEntryListenerToKeyWithPredicate(ctx context.Context, key interface{}, predicate predicate.Predicate, ...) (types.UUID, error)
- func (m *ReplicatedMap) AddEntryListenerWithPredicate(ctx context.Context, predicate predicate.Predicate, ...) (types.UUID, error)
- func (m *ReplicatedMap) Clear(ctx context.Context) error
- func (m *ReplicatedMap) ContainsKey(ctx context.Context, key interface{}) (bool, error)
- func (m *ReplicatedMap) ContainsValue(ctx context.Context, value interface{}) (bool, error)
- func (p ReplicatedMap) Destroy(ctx context.Context) error
- func (m *ReplicatedMap) Get(ctx context.Context, key interface{}) (interface{}, error)
- func (m *ReplicatedMap) GetEntrySet(ctx context.Context) ([]types.Entry, error)
- func (m *ReplicatedMap) GetKeySet(ctx context.Context) ([]interface{}, error)
- func (m *ReplicatedMap) GetValues(ctx context.Context) ([]interface{}, error)
- func (m *ReplicatedMap) IsEmpty(ctx context.Context) (bool, error)
- func (m *ReplicatedMap) Put(ctx context.Context, key interface{}, value interface{}) (interface{}, error)
- func (m *ReplicatedMap) PutAll(ctx context.Context, keyValuePairs ...types.Entry) error
- func (m *ReplicatedMap) Remove(ctx context.Context, key interface{}) (interface{}, error)
- func (m *ReplicatedMap) RemoveEntryListener(ctx context.Context, subscriptionID types.UUID) error
- func (m *ReplicatedMap) Size(ctx context.Context) (int, error)
- type Set
- func (s *Set) Add(ctx context.Context, item interface{}) (bool, error)
- func (s *Set) AddAll(ctx context.Context, values ...interface{}) (bool, error)
- func (s *Set) AddItemListener(ctx context.Context, includeValue bool, handler SetItemNotifiedHandler) (types.UUID, error)
- func (s *Set) Clear(ctx context.Context) error
- func (s *Set) Contains(ctx context.Context, value interface{}) (bool, error)
- func (s *Set) ContainsAll(ctx context.Context, values ...interface{}) (bool, error)
- func (p Set) Destroy(ctx context.Context) error
- func (s *Set) GetAll(ctx context.Context) ([]interface{}, error)
- func (s *Set) IsEmpty(ctx context.Context) (bool, error)
- func (s *Set) Remove(ctx context.Context, value interface{}) (bool, error)
- func (s *Set) RemoveAll(ctx context.Context, values ...interface{}) (bool, error)
- func (s *Set) RemoveListener(ctx context.Context, subscriptionID types.UUID) error
- func (s *Set) RetainAll(ctx context.Context, values ...interface{}) (bool, error)
- func (s *Set) Size(ctx context.Context) (int, error)
- type SetItemNotified
- type SetItemNotifiedHandler
- type StatsConfig
- type Topic
- func (t *Topic) AddMessageListener(ctx context.Context, handler TopicMessageHandler) (types.UUID, error)
- func (p Topic) Destroy(ctx context.Context) error
- func (t *Topic) Publish(ctx context.Context, message interface{}) error
- func (t *Topic) PublishAll(ctx context.Context, messages ...interface{}) error
- func (t *Topic) RemoveListener(ctx context.Context, subscriptionID types.UUID) error
- type TopicMessageHandler
Constants ¶
const (
    ServiceNameMap              = "hz:impl:mapService"
    ServiceNameReplicatedMap    = "hz:impl:replicatedMapService"
    ServiceNameMultiMap         = "hz:impl:multiMapService"
    ServiceNameQueue            = "hz:impl:queueService"
    ServiceNameTopic            = "hz:impl:topicService"
    ServiceNameList             = "hz:impl:listService"
    ServiceNameSet              = "hz:impl:setService"
    ServiceNamePNCounter        = "hz:impl:PNCounterService"
    ServiceNameFlakeIDGenerator = "hz:impl:flakeIdGeneratorService"
)
const (
ClientVersion = internal.ClientVersion
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client enables you to do all Hazelcast operations without being a member of the cluster. It connects to one or more of the cluster members and delegates all cluster wide operations to them.
func StartNewClient ¶ added in v1.0.0
StartNewClient creates and starts a new client with the default configuration. The default configuration is tuned to connect to a Hazelcast cluster running on the same computer as the client.
func StartNewClientWithConfig ¶ added in v1.0.0
StartNewClientWithConfig creates and starts a new client with the given configuration.
func (*Client) AddDistributedObjectListener ¶ added in v1.0.0
func (c *Client) AddDistributedObjectListener(ctx context.Context, handler DistributedObjectNotifiedHandler) (types.UUID, error)
AddDistributedObjectListener adds a distributed object listener and returns a unique subscription ID. Use the returned subscription ID to remove the listener.
func (*Client) AddLifecycleListener ¶ added in v1.0.0
func (c *Client) AddLifecycleListener(handler LifecycleStateChangeHandler) (types.UUID, error)
AddLifecycleListener adds a lifecycle state change handler after the client starts. Use the returned subscription ID to remove the listener. The handler must not block.
func (*Client) AddMembershipListener ¶ added in v1.0.0
func (c *Client) AddMembershipListener(handler cluster.MembershipStateChangeHandler) (types.UUID, error)
AddMembershipListener adds a member state change handler and returns a unique subscription ID. Use the returned subscription ID to remove the listener.
func (*Client) GetDistributedObjectsInfo ¶ added in v1.1.0
func (c *Client) GetDistributedObjectsInfo(ctx context.Context) ([]types.DistributedObjectInfo, error)
GetDistributedObjectsInfo returns the information of all objects created cluster-wide.
func (*Client) GetFlakeIDGenerator ¶
GetFlakeIDGenerator returns a FlakeIDGenerator instance.
func (*Client) GetMultiMap ¶
GetMultiMap returns a MultiMap instance.
func (*Client) GetPNCounter ¶
GetPNCounter returns a PNCounter instance.
func (*Client) GetReplicatedMap ¶
GetReplicatedMap returns a replicated map instance.
func (*Client) Name ¶
Name returns the client's name. Use config.ClientName to set the client name. If not set manually, an automatically generated name is used.
func (*Client) RemoveDistributedObjectListener ¶ added in v1.0.0
func (c *Client) RemoveDistributedObjectListener(ctx context.Context, subscriptionID types.UUID) error
RemoveDistributedObjectListener removes the distributed object listener handler with the given subscription ID.
func (*Client) RemoveLifecycleListener ¶ added in v1.0.0
RemoveLifecycleListener removes the lifecycle state change handler with the given subscription ID.
func (*Client) RemoveMembershipListener ¶ added in v1.0.0
RemoveMembershipListener removes the member state change handler with the given subscription ID.
type Config ¶ added in v1.0.0
type Config struct {
	FlakeIDGenerators map[string]FlakeIDGeneratorConfig `json:",omitempty"`
	Labels            []string                          `json:",omitempty"`
	ClientName        string                            `json:",omitempty"`
	Logger            logger.Config                     `json:",omitempty"`
	Failover          cluster.FailoverConfig            `json:",omitempty"`
	Serialization     serialization.Config              `json:",omitempty"`
	Cluster           cluster.Config                    `json:",omitempty"`
	Stats             StatsConfig                       `json:",omitempty"`
	// contains filtered or unexported fields
}
Config contains configuration for a client. Zero value of Config is the default configuration.
func (*Config) AddFlakeIDGenerator ¶ added in v1.1.0
func (c *Config) AddFlakeIDGenerator(name string, prefetchCount int32, prefetchExpiry types.Duration) error
AddFlakeIDGenerator validates the values and adds new FlakeIDGeneratorConfig with the given name.
func (*Config) AddLifecycleListener ¶ added in v1.0.0
func (c *Config) AddLifecycleListener(handler LifecycleStateChangeHandler) types.UUID
AddLifecycleListener adds a lifecycle listener. The listener is attached to the client before the client starts, so all lifecycle events can be received. Use the returned subscription ID to remove the listener. The handler must not block.
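One way to honor the "must not block" requirement is to have the handler only hand the event to a buffered channel and return. The sketch below is a minimal, standard-library-only illustration of that idea. StateChanged and NewNonBlockingHandler are hypothetical local stand-ins, not part of the client API, and dropping events when the buffer is full is an assumption that may not suit every application.

```go
package main

import "fmt"

// StateChanged is a local stand-in for hazelcast.LifecycleStateChanged.
type StateChanged struct {
	State int
}

// NewNonBlockingHandler returns a handler that forwards events to a buffered
// channel. When the buffer is full the event is dropped instead of waiting.
func NewNonBlockingHandler(buffer int) (func(StateChanged), <-chan StateChanged) {
	events := make(chan StateChanged, buffer)
	handler := func(e StateChanged) {
		select {
		case events <- e:
		default: // buffer full: drop rather than block the caller
		}
	}
	return handler, events
}

func main() {
	handler, events := NewNonBlockingHandler(2)
	for state := 0; state < 3; state++ {
		handler(StateChanged{State: state}) // the third call is dropped
	}
	fmt.Println(len(events)) // 2
}
```

A separate goroutine can then drain the channel and do any slow work there.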
func (*Config) AddMembershipListener ¶ added in v1.0.0
func (c *Config) AddMembershipListener(handler cluster.MembershipStateChangeHandler) types.UUID
AddMembershipListener adds a membership listener. The listener is attached to the client before the client starts, so all membership events can be received. Use the returned subscription ID to remove the listener.
type DistributedObjectEventType ¶ added in v1.0.0
type DistributedObjectEventType string
DistributedObjectEventType describes event type of a distributed object.
const (
	// DistributedObjectCreated is the event type when a distributed object is created.
	DistributedObjectCreated DistributedObjectEventType = "CREATED"
	// DistributedObjectDestroyed is the event type when a distributed object is destroyed.
	DistributedObjectDestroyed DistributedObjectEventType = "DESTROYED"
)
type DistributedObjectNotified ¶ added in v1.0.0
type DistributedObjectNotified struct {
	ServiceName string
	ObjectName  string
	EventType   DistributedObjectEventType
}
DistributedObjectNotified contains information about the distributed object event.
func (DistributedObjectNotified) EventName ¶ added in v1.0.0
func (d DistributedObjectNotified) EventName() string
type DistributedObjectNotifiedHandler ¶ added in v1.0.0
type DistributedObjectNotifiedHandler func(event DistributedObjectNotified)
DistributedObjectNotifiedHandler is called when a distributed object event occurs.
type EntryEventType ¶ added in v1.0.0
type EntryEventType int32
EntryEventType is the type of an entry event.
const (
	// EntryAdded is dispatched if an entry is added.
	EntryAdded EntryEventType = 1 << 0
	// EntryRemoved is dispatched if an entry is removed.
	EntryRemoved EntryEventType = 1 << 1
	// EntryUpdated is dispatched if an entry is updated.
	EntryUpdated EntryEventType = 1 << 2
	// EntryEvicted is dispatched if an entry is evicted.
	EntryEvicted EntryEventType = 1 << 3
	// EntryExpired is dispatched if an entry is expired.
	EntryExpired EntryEventType = 1 << 4
	// EntryAllEvicted is dispatched if all entries are evicted.
	EntryAllEvicted EntryEventType = 1 << 5
	// EntryAllCleared is dispatched if all entries are cleared.
	EntryAllCleared EntryEventType = 1 << 6
	// EntryMerged is dispatched if an entry is merged after a network partition.
	EntryMerged EntryEventType = 1 << 7
	// EntryInvalidated is dispatched if an entry is invalidated.
	EntryInvalidated EntryEventType = 1 << 8
	// EntryLoaded is dispatched if an entry is loaded.
	EntryLoaded EntryEventType = 1 << 9
)
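Each constant above occupies its own bit, so the values can be combined into and tested against a single bit mask. The sketch below shows that arithmetic with local copies of four of the constants. EventType and Has are hypothetical names used only for illustration and are not part of the client.

```go
package main

import "fmt"

// EventType is a local stand-in for hazelcast.EntryEventType.
type EventType int32

// Local copies of a few of the constants listed above, with the same values.
const (
	Added   EventType = 1 << 0
	Removed EventType = 1 << 1
	Updated EventType = 1 << 2
	Evicted EventType = 1 << 3
)

// Has reports whether mask contains every bit of flag.
func Has(mask, flag EventType) bool {
	return mask&flag == flag
}

func main() {
	mask := Added | Updated
	fmt.Println(int32(mask))        // 5
	fmt.Println(Has(mask, Added))   // true
	fmt.Println(Has(mask, Removed)) // false
}
```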
type EntryNotified ¶ added in v1.0.0
type EntryNotified struct {
	MergingValue            interface{}
	Key                     interface{}
	Value                   interface{}
	OldValue                interface{}
	MapName                 string
	Member                  cluster.MemberInfo
	NumberOfAffectedEntries int
	EventType               EntryEventType
}
EntryNotified contains information about an entry event. Member may have the zero value of cluster.MemberInfo if the member is not known at the time the corresponding callback runs. You can check that situation by checking whether Member.UUID is the default UUID.
func (*EntryNotified) EventName ¶ added in v1.0.0
func (e *EntryNotified) EventName() string
type EntryNotifiedHandler ¶ added in v1.0.0
type EntryNotifiedHandler func(event *EntryNotified)
EntryNotifiedHandler is called when an entry event happens.
type FlakeIDGenerator ¶ added in v1.1.0
type FlakeIDGenerator struct {
// contains filtered or unexported fields
}
FlakeIDGenerator is a cluster-wide unique ID generator.
Generated IDs are k-ordered (roughly ordered) and in the range [0, math.MaxInt64]. They can be negative only if members are explicitly configured with a future epoch start value. For details, see: https://docs.hazelcast.com/imdg/latest/data-structures/flake-id-generator.html
Instead of asking cluster for each ID, they are fetched in batches and then served. Batch size and expiry duration can be configured via FlakeIDGeneratorConfig.
type FlakeIDGeneratorConfig ¶ added in v1.1.0
type FlakeIDGeneratorConfig struct {
	// PrefetchCount defines the number of pre-fetched IDs from cluster.
	// The allowed range is [1, 100_000] and defaults to 100.
	PrefetchCount int32 `json:",omitempty"`
	// PrefetchExpiry defines the expiry duration of pre-fetched IDs. Defaults to 10 minutes.
	PrefetchExpiry types.Duration `json:",omitempty"`
}
FlakeIDGeneratorConfig contains configuration for the pre-fetching behavior of FlakeIDGenerator.
func (*FlakeIDGeneratorConfig) Validate ¶ added in v1.1.0
func (f *FlakeIDGeneratorConfig) Validate() error
type ItemEventType ¶ added in v1.0.0
type ItemEventType int32
ItemEventType describes event types for item related events.
const (
	// ItemAdded stands for item added event.
	ItemAdded ItemEventType = 1
	// ItemRemoved stands for item removed event.
	ItemRemoved ItemEventType = 2
)
type LifecycleState ¶ added in v1.0.0
type LifecycleState int
LifecycleState indicates the state of the lifecycle event.
const (
	// LifecycleStateStarting signals that the client is starting.
	LifecycleStateStarting LifecycleState = iota
	// LifecycleStateStarted signals that the client started.
	LifecycleStateStarted
	// LifecycleStateShuttingDown signals that the client is shutting down.
	LifecycleStateShuttingDown
	// LifecycleStateShutDown signals that the client shut down.
	LifecycleStateShutDown
	// LifecycleStateConnected signals that the client connected to the cluster.
	LifecycleStateConnected
	// LifecycleStateDisconnected signals that the client disconnected from the cluster.
	LifecycleStateDisconnected
	// LifecycleStateChangedCluster signals that the client is connected to a new cluster.
	LifecycleStateChangedCluster
)
func (LifecycleState) String ¶ added in v1.0.0
func (s LifecycleState) String() string
type LifecycleStateChangeHandler ¶ added in v1.0.0
type LifecycleStateChangeHandler func(event LifecycleStateChanged)
LifecycleStateChangeHandler is called when a lifecycle event occurs.
type LifecycleStateChanged ¶ added in v1.0.0
type LifecycleStateChanged struct {
State LifecycleState
}
LifecycleStateChanged contains information about a lifecycle event.
func (*LifecycleStateChanged) EventName ¶ added in v1.0.0
func (e *LifecycleStateChanged) EventName() string
type List ¶ added in v1.0.0
type List struct {
// contains filtered or unexported fields
}
List is a concurrent, distributed, ordered collection. The user of this data structure has precise control over where in the list each element is inserted. The user can access elements by their integer index (position in the list), and search for elements in the list.
List is not a partitioned Hazelcast data structure: all the contents of a List are stored on a single machine (and in its backup). As a result, a single List does not scale by adding more members to the cluster.
For details, see https://docs.hazelcast.com/imdg/latest/data-structures/list.html
Example ¶
ctx := context.TODO()
client, err := hazelcast.StartNewClient(ctx)
if err != nil {
	log.Fatal(err)
}
// Get a random list
list, err := client.GetList(ctx, "list-1")
if err != nil {
	log.Fatal(err)
}
// Get and print list size
size, err := list.Size(ctx)
if err != nil {
	log.Fatal(err)
}
fmt.Println(size)
// Add data, error handling is omitted here for brevity
list.Add(ctx, "Item 1")
list.Add(ctx, "Item 2")
// Get and print list size
size, err = list.Size(ctx)
if err != nil {
	log.Fatal(err)
}
fmt.Println(size)
// Shutdown client
client.Shutdown(ctx)
func (*List) Add ¶ added in v1.0.0
Add appends the specified element to the end of this list. Returns true if the list has changed as a result of this operation, false otherwise.
func (*List) AddAll ¶ added in v1.0.0
AddAll appends all elements in the specified slice to the end of this list. Returns true if the list has changed as a result of this operation, false otherwise.
func (*List) AddAllAt ¶ added in v1.0.0
AddAllAt inserts all elements in the specified slice at specified index, keeping the order of the slice. Shifts the subsequent elements to the right. Returns true if the list has changed as a result of this operation, false otherwise.
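The insert-and-shift behavior described for AddAllAt can be modeled on a plain Go slice. This is a local sketch of the semantics only. InsertAllAt is a hypothetical helper, not a client function, and it does not contact a cluster.

```go
package main

import "fmt"

// InsertAllAt returns a new slice with items inserted at index, keeping their
// order and shifting the elements from index onward to the right.
func InsertAllAt(list []string, index int, items ...string) []string {
	out := make([]string, 0, len(list)+len(items))
	out = append(out, list[:index]...)
	out = append(out, items...)
	out = append(out, list[index:]...)
	return out
}

func main() {
	fmt.Println(InsertAllAt([]string{"a", "d"}, 1, "b", "c")) // [a b c d]
}
```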
func (*List) AddAt ¶ added in v1.0.0
AddAt inserts the specified element at the specified index. Shifts the subsequent elements to the right.
func (*List) AddListener ¶ added in v1.0.0
func (l *List) AddListener(ctx context.Context, includeValue bool, handler ListItemNotifiedHandler) (types.UUID, error)
AddListener adds an item listener for this list. The listener will be invoked whenever an item is added to or removed from this list. Received events include the updated item if includeValue is true. Returns subscription ID of the listener.
func (*List) Contains ¶ added in v1.0.0
Contains checks if the list contains the given element. Returns true if the list contains the element, false otherwise.
func (*List) ContainsAll ¶ added in v1.0.0
ContainsAll checks if the list contains all of the given elements. Returns true if the list contains all of the elements, otherwise false.
func (List) Destroy ¶ added in v1.0.0
Destroy removes this object cluster-wide. Clears and releases all resources for this object.
func (*List) GetAll ¶ added in v1.0.0
GetAll returns a slice that contains all elements of this list in proper sequence.
func (*List) IndexOf ¶ added in v1.0.0
IndexOf returns the index of the first occurrence of the given element in this list.
func (*List) LastIndexOf ¶ added in v1.0.0
LastIndexOf returns the index of the last occurrence of the given element in this list.
func (*List) Remove ¶ added in v1.0.0
Remove removes the given element from this list. Returns true if the list has changed as the result of this operation, false otherwise.
func (*List) RemoveAll ¶ added in v1.0.0
RemoveAll removes the given elements from the list. Returns true if the list has changed as the result of this operation, false otherwise.
func (*List) RemoveAt ¶ added in v1.0.0
RemoveAt removes the element at the given index. Returns the removed element.
func (*List) RemoveListener ¶ added in v1.0.0
RemoveListener removes the item listener with the given subscription ID.
func (*List) RetainAll ¶ added in v1.0.0
RetainAll removes all elements from this list except the ones contained in the given slice. Returns true if the list has changed as a result of this operation, false otherwise.
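As a local illustration of the RetainAll semantics, including the "has changed" result, the following sketch applies the same rule to a plain slice. RetainAll here is a hypothetical free function written for this example and is unrelated to the client's implementation.

```go
package main

import "fmt"

// RetainAll keeps only the elements of list that also appear in keep.
// It returns the filtered slice and whether anything was removed.
func RetainAll(list []string, keep ...string) ([]string, bool) {
	allowed := make(map[string]struct{}, len(keep))
	for _, k := range keep {
		allowed[k] = struct{}{}
	}
	out := make([]string, 0, len(list))
	for _, item := range list {
		if _, ok := allowed[item]; ok {
			out = append(out, item)
		}
	}
	return out, len(out) != len(list)
}

func main() {
	out, changed := RetainAll([]string{"a", "b", "c"}, "a", "c")
	fmt.Println(out, changed) // [a c] true
}
```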
func (*List) Set ¶ added in v1.0.0
Set replaces the element at the specified index in this list with the specified element. Returns the previous element from the list.
type ListItemNotified ¶ added in v1.0.0
type ListItemNotified struct {
	Value     interface{}
	ListName  string
	Member    cluster.MemberInfo
	EventType ItemEventType
}
ListItemNotified describes the List item event. Member may have the zero value of cluster.MemberInfo if the member is not known at the time the corresponding callback runs. You can check that situation by checking whether Member.UUID is the default UUID.
func (ListItemNotified) EventName ¶ added in v1.0.0
func (q ListItemNotified) EventName() string
EventName returns generic event name, common for all List item listeners.
type ListItemNotifiedHandler ¶ added in v1.0.0
type ListItemNotifiedHandler func(event *ListItemNotified)
ListItemNotifiedHandler is a handler function for the List item listener.
type Map ¶ added in v1.0.0
type Map struct {
// contains filtered or unexported fields
}
Map is a distributed map. Hazelcast Go client enables you to perform operations like reading and writing from/to a Hazelcast Map with methods like Get and Put. For details, see https://docs.hazelcast.com/imdg/latest/data-structures/map.html
Listening for Map Entry Events ¶
The first step of listening to entry-based events is creating an instance of MapEntryListenerConfig. MapEntryListenerConfig contains options to filter the events by key and/or predicate and has an option to include the value of the entry, not just the key. You should also choose which types of events you want to receive. In the example below, a listener configuration for added and updated entries is created. Only entries with the key "somekey" that match the predicate year > 2000 are considered:
entryListenerConfig := hazelcast.MapEntryListenerConfig{
	Key:          "somekey",
	Predicate:    predicate.Greater("year", 2000),
	IncludeValue: true,
}
entryListenerConfig.NotifyEntryAdded(true)
entryListenerConfig.NotifyEntryUpdated(true)
m, err := client.GetMap(ctx, "somemap")
After creating the configuration, the second step is adding an event listener and a handler to act on received events:
subscriptionID, err := m.AddEntryListener(ctx, entryListenerConfig, func(event *hazelcast.EntryNotified) {
	switch event.EventType {
	case hazelcast.EntryAdded:
		fmt.Println("Entry Added:", event.Value)
	case hazelcast.EntryRemoved:
		fmt.Println("Entry Removed:", event.Value)
	case hazelcast.EntryUpdated:
		fmt.Println("Entry Updated:", event.Value)
	case hazelcast.EntryEvicted:
		fmt.Println("Entry Evicted:", event.Value)
	case hazelcast.EntryLoaded:
		fmt.Println("Entry Loaded:", event.Value)
	}
})
Adding an event listener returns a subscription ID, which you can later use to remove the listener:
err = m.RemoveEntryListener(ctx, subscriptionID)
Using Locks ¶
You can lock entries in a Map. When an entry is locked, only the owner of that lock can access that entry in the cluster until it is unlocked by the owner or force unlocked. See https://docs.hazelcast.com/imdg/latest/data-structures/map.html#locking-maps for details.
Locks are reentrant. The owner of a lock can acquire the lock again without waiting for the lock to be unlocked. If the key is locked N times, it should be unlocked N times before another goroutine can acquire it.
Lock ownership in Hazelcast Go Client is explicit. The first step to own a lock is creating a lock context, which is similar to a key. The lock context is a regular context.Context which carries a special value that uniquely identifies the lock context in the cluster. Once the lock context is created, it can be used to lock and unlock entries, and it can be passed to any function that is lock aware, such as Put.
m, err := client.GetMap(ctx, "my-map")
lockCtx := m.NewLockContext(ctx)
// block acquiring the lock
err = m.Lock(lockCtx, "some-key")
// pass lock context to use the locked entry
err = m.Set(lockCtx, "some-key", "some-value")
// release the lock once done with it
err = m.Unlock(lockCtx, "some-key")
As mentioned before, a lock context is a regular context.Context which carries a special lock ID. You can pass any context.Context to any Map function, but in that case lock ownership between operations using the same hazelcast.Client instance is not possible.
Example ¶
ctx := context.TODO()
client, err := hazelcast.StartNewClient(ctx)
if err != nil {
	log.Fatal(err)
}
// Retrieve a map.
peopleMap, err := client.GetMap(ctx, "people")
if err != nil {
	log.Fatal(err)
}
// Call map functions.
err = peopleMap.Set(ctx, "jane", "doe")
if err != nil {
	log.Fatal(err)
}
// Stop the client once you are done with it.
client.Shutdown(ctx)
func (*Map) AddEntryListener ¶ added in v1.0.0
func (m *Map) AddEntryListener(ctx context.Context, config MapEntryListenerConfig, handler EntryNotifiedHandler) (types.UUID, error)
AddEntryListener adds a continuous entry listener to this map.
Example ¶
ctx := context.TODO()
client, err := hazelcast.StartNewClient(ctx)
if err != nil {
	log.Fatal(err)
}
entryListenerConfig := hazelcast.MapEntryListenerConfig{
	IncludeValue: true,
}
m, err := client.GetMap(ctx, "somemap")
if err != nil {
	log.Fatal(err)
}
// enable receiving entry added events
entryListenerConfig.NotifyEntryAdded(true)
// enable receiving entry removed events
entryListenerConfig.NotifyEntryRemoved(true)
// enable receiving entry updated events
entryListenerConfig.NotifyEntryUpdated(true)
// enable receiving entry evicted events
entryListenerConfig.NotifyEntryEvicted(true)
// enable receiving entry loaded events
entryListenerConfig.NotifyEntryLoaded(true)
subscriptionID, err := m.AddEntryListener(ctx, entryListenerConfig, func(event *hazelcast.EntryNotified) {
	switch event.EventType {
	// this is an entry added event
	case hazelcast.EntryAdded:
		fmt.Println("Entry Added:", event.Value)
	// this is an entry removed event
	case hazelcast.EntryRemoved:
		fmt.Println("Entry Removed:", event.Value)
	// this is an entry updated event
	case hazelcast.EntryUpdated:
		fmt.Println("Entry Updated:", event.Value)
	// this is an entry evicted event
	case hazelcast.EntryEvicted:
		fmt.Println("Entry Evicted:", event.Value)
	// this is an entry loaded event
	case hazelcast.EntryLoaded:
		fmt.Println("Entry Loaded:", event.Value)
	}
})
if err != nil {
	log.Fatal(err)
}
// performing modifications on the map entries
key := strconv.Itoa(int(time.Now().Unix()))
if err := m.Set(ctx, key, "1"); err != nil {
	log.Fatal(err)
}
if err := m.Set(ctx, key, "2"); err != nil {
	log.Fatal(err)
}
if err := m.Delete(ctx, key); err != nil {
	log.Fatal(err)
}
// you can use the subscriptionID later to remove the event listener.
if err := m.RemoveEntryListener(ctx, subscriptionID); err != nil {
	log.Fatal(err)
}
func (*Map) AddIndex ¶ added in v1.0.0
AddIndex adds an index to this map for the specified entries so that queries can run faster.
func (*Map) AddInterceptor ¶ added in v1.0.0
AddInterceptor adds an interceptor for this map.
func (*Map) Aggregate ¶ added in v1.0.0
Aggregate runs the given aggregator and returns the result.
Example ¶
ctx := context.TODO()
client, err := hazelcast.StartNewClient(ctx)
if err != nil {
	log.Fatal(err)
}
myMap, err := client.GetMap(ctx, "my-map")
if err != nil {
	log.Fatal(err)
}
if err = myMap.Set(ctx, "k1", serialization.JSON(`{"A": "foo", "B": 10}`)); err != nil {
	log.Fatal(err)
}
if err = myMap.Set(ctx, "k2", serialization.JSON(`{"A": "bar", "B": 30}`)); err != nil {
	log.Fatal(err)
}
result, err := myMap.Aggregate(ctx, aggregate.LongSum("B"))
if err != nil {
	log.Fatal(err)
}
fmt.Println(result)
func (*Map) AggregateWithPredicate ¶ added in v1.0.0
func (m *Map) AggregateWithPredicate(ctx context.Context, agg aggregate.Aggregator, pred predicate.Predicate) (interface{}, error)
AggregateWithPredicate runs the given aggregator and returns the result. Only the entries that match the given predicate are aggregated.
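To make the filter-then-aggregate order concrete, here is a local model that sums an integer attribute over the entries accepted by a predicate. Entry and SumWhere are hypothetical names for this sketch; the real aggregation runs on the cluster members, not in client code like this.

```go
package main

import "fmt"

// Entry is a local stand-in for a map entry with an integer attribute B.
type Entry struct {
	Key string
	B   int64
}

// SumWhere adds up B over the entries for which match returns true.
func SumWhere(entries []Entry, match func(Entry) bool) int64 {
	var total int64
	for _, e := range entries {
		if match(e) {
			total += e.B
		}
	}
	return total
}

func main() {
	entries := []Entry{{"k1", 10}, {"k2", 30}, {"k3", 50}}
	atLeast30 := func(e Entry) bool { return e.B >= 30 }
	fmt.Println(SumWhere(entries, atLeast30)) // 80
}
```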
func (*Map) ContainsKey ¶ added in v1.0.0
ContainsKey returns true if the map contains an entry with the given key.
func (*Map) ContainsValue ¶ added in v1.0.0
ContainsValue returns true if the map contains an entry with the given value.
func (*Map) Delete ¶ added in v1.0.0
Delete removes the mapping for a key from this map if it is present. Unlike Remove, this operation does not return the removed value, which avoids the serialization cost of the returned value. If the removed value will not be used, Delete is preferred over Remove for better performance.
func (Map) Destroy ¶ added in v1.0.0
Destroy removes this object cluster-wide. Clears and releases all resources for this object.
func (*Map) Evict ¶ added in v1.0.0
Evict evicts the mapping for a key from this map. Returns true if the key is evicted.
func (*Map) ExecuteOnEntries ¶ added in v1.0.0
func (m *Map) ExecuteOnEntries(ctx context.Context, entryProcessor interface{}) ([]types.Entry, error)
ExecuteOnEntries applies the user defined EntryProcessor to all the entries in the map.
func (*Map) ExecuteOnEntriesWithPredicate ¶ added in v1.1.1
func (m *Map) ExecuteOnEntriesWithPredicate(ctx context.Context, entryProcessor interface{}, pred predicate.Predicate) ([]types.Entry, error)
ExecuteOnEntriesWithPredicate applies the user defined EntryProcessor to all the entries in the map which satisfies the predicate.
func (*Map) ExecuteOnKey ¶ added in v1.2.0
func (m *Map) ExecuteOnKey(ctx context.Context, entryProcessor interface{}, key interface{}) (interface{}, error)
ExecuteOnKey applies the user defined EntryProcessor to the entry with the specified key in the map.
func (*Map) ExecuteOnKeys ¶ added in v1.2.0
func (m *Map) ExecuteOnKeys(ctx context.Context, entryProcessor interface{}, keys ...interface{}) ([]interface{}, error)
ExecuteOnKeys applies the user defined EntryProcessor to the entries with the specified keys in the map.
func (*Map) ForceUnlock ¶ added in v1.0.0
ForceUnlock releases the lock for the specified key regardless of the lock owner. It always successfully unlocks the key, never blocks, and returns immediately.
func (*Map) Get ¶ added in v1.0.0
Get returns the value for the specified key, or nil if this map does not contain this key. Warning: This method returns a clone of the original value, so modifying the returned value does not change the actual value in the map. Put the modified value back to make the changes visible to all nodes.
func (*Map) GetEntrySet ¶ added in v1.0.0
GetEntrySet returns a clone of the mappings contained in this map.
func (*Map) GetEntrySetWithPredicate ¶ added in v1.0.0
func (m *Map) GetEntrySetWithPredicate(ctx context.Context, predicate predicate.Predicate) ([]types.Entry, error)
GetEntrySetWithPredicate returns a clone of the mappings contained in this map that match the given predicate.
Example ¶
// Start the client with defaults
ctx := context.TODO()
client, err := hazelcast.StartNewClient(ctx)
if err != nil {
	log.Fatal(err)
}
// Get a random map
m, err := client.GetMap(ctx, "map-1")
if err != nil {
	log.Fatal(err)
}
// Populate the map, error handling is omitted here for brevity
m.Put(ctx, "key-1", serialization.JSON(`{"property": 5}`))
m.Put(ctx, "key-2", serialization.JSON(`{"property": 10}`))
m.Put(ctx, "key-3", serialization.JSON(`{"property": 15}`))
// Filter the entries in the map based on a predicate and print those
pred := predicate.And(predicate.Less("property", 12), predicate.Greater("property", 8))
entries, err := m.GetEntrySetWithPredicate(ctx, pred)
if err != nil {
	log.Fatal(err)
}
fmt.Println(entries)
// Shutdown client
client.Shutdown(ctx)
func (*Map) GetEntryView ¶ added in v1.0.0
GetEntryView returns the SimpleEntryView for the specified key. If there is no entry view for the key, nil is returned.
func (*Map) GetKeySetWithPredicate ¶ added in v1.0.0
func (m *Map) GetKeySetWithPredicate(ctx context.Context, predicate predicate.Predicate) ([]interface{}, error)
GetKeySetWithPredicate returns the keys contained in this map whose entries match the given predicate.
func (*Map) GetValues ¶ added in v1.0.0
GetValues returns a list clone of the values contained in this map.
func (*Map) GetValuesWithPredicate ¶ added in v1.0.0
func (m *Map) GetValuesWithPredicate(ctx context.Context, predicate predicate.Predicate) ([]interface{}, error)
GetValuesWithPredicate returns a list clone of the values contained in this map that match the given predicate.
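The way combined predicates narrow a result set can be modeled locally with plain functions. In the sketch below, Pred, Less, Greater, And and Filter are hypothetical helpers that only mimic the shape of the predicate package. They are evaluated in client code here, whereas real predicates are evaluated by the cluster.

```go
package main

import "fmt"

// Pred is a local stand-in for a predicate over an integer attribute.
type Pred func(v int) bool

// Less matches values strictly below n.
func Less(n int) Pred { return func(v int) bool { return v < n } }

// Greater matches values strictly above n.
func Greater(n int) Pred { return func(v int) bool { return v > n } }

// And matches only when every given predicate matches.
func And(preds ...Pred) Pred {
	return func(v int) bool {
		for _, p := range preds {
			if !p(v) {
				return false
			}
		}
		return true
	}
}

// Filter returns the values accepted by p, in their original order.
func Filter(values []int, p Pred) []int {
	var out []int
	for _, v := range values {
		if p(v) {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	fmt.Println(Filter([]int{5, 10, 15}, And(Less(12), Greater(8)))) // [10]
}
```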
func (*Map) IsEmpty ¶ added in v1.0.0
IsEmpty returns true if this map contains no key-value mappings.
func (*Map) LoadAllReplacing ¶ added in v1.0.0
LoadAllReplacing loads all keys from the store at server side or loads the given keys if provided. Replaces existing keys.
func (*Map) LoadAllWithoutReplacing ¶ added in v1.0.0
LoadAllWithoutReplacing loads all keys from the store at server side or loads the given keys if provided.
func (*Map) Lock ¶ added in v1.0.0
Lock acquires the lock for the specified key, with no lease time. If the lock is not available, the current goroutine blocks until the lock can be acquired with the given lock context.
You get a lock whether the value is present in the map or not. Other goroutines, or clients on other systems, block on their calls to Lock until the non-existent key is unlocked. If the lock holder introduces the key to the map, the Put operation is not blocked. If a goroutine not holding a lock on the non-existent key tries to introduce the key while a lock exists on the non-existent key, the Put operation blocks until it is unlocked.
Scope of the lock is this Map only. Acquired lock is only for the key in this map.
Locks are re-entrant. If the key is locked N times, it should be unlocked N times before another goroutine can acquire it.
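The "locked N times, unlocked N times" rule amounts to a per-owner hold counter. The following is a local, single-goroutine model of that bookkeeping, not the cluster's implementation. ReentrantLock, TryAcquire and Release are hypothetical names, and the owner string plays the role of the lock ID carried by a lock context.

```go
package main

import "fmt"

// ReentrantLock is a local model of a lock that counts acquisitions per owner.
type ReentrantLock struct {
	owner string // empty when the lock is free
	holds int
}

// TryAcquire succeeds if the lock is free or already held by owner.
func (l *ReentrantLock) TryAcquire(owner string) bool {
	if l.holds > 0 && l.owner != owner {
		return false
	}
	l.owner = owner
	l.holds++
	return true
}

// Release undoes one acquisition; the lock is free once holds reaches zero.
func (l *ReentrantLock) Release(owner string) bool {
	if l.holds == 0 || l.owner != owner {
		return false
	}
	l.holds--
	if l.holds == 0 {
		l.owner = ""
	}
	return true
}

func main() {
	var l ReentrantLock
	l.TryAcquire("ctx-1")
	l.TryAcquire("ctx-1") // re-entrant: same owner
	l.Release("ctx-1")
	fmt.Println(l.TryAcquire("ctx-2")) // false: one hold remains
	l.Release("ctx-1")
	fmt.Println(l.TryAcquire("ctx-2")) // true
}
```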
func (*Map) LockWithLease ¶ added in v1.0.0
LockWithLease acquires the lock for the specified lease time. Otherwise, it behaves the same as Lock function.
func (*Map) NewLockContext ¶ added in v1.0.0
NewLockContext augments the passed parent context with a unique lock ID. If passed context is nil, context.Background is used as the parent context.
Example ¶
// lockAndIncrement locks the given key, reads the value from it and sets back the incremented value.
lockAndIncrement := func(myMap *hazelcast.Map, key string, wg *sync.WaitGroup) {
	// Signal completion before this goroutine exits.
	defer wg.Done()
	intValue := int64(0)
	// Create a new unique lock context.
	lockCtx := myMap.NewLockContext(context.Background())
	// Lock the key.
	// The key cannot be unlocked without the same lock context.
	if err := myMap.Lock(lockCtx, key); err != nil {
		panic(err)
	}
	// Remember to unlock the key, otherwise it won't be accessible elsewhere.
	defer myMap.Unlock(lockCtx, key)
	// The same lock context, or a derived one from that lock context must be used,
	// otherwise the Get operation below will block.
	v, err := myMap.Get(lockCtx, key)
	if err != nil {
		panic(err)
	}
	// If v is not nil, then there's already a value for the key.
	if v != nil {
		intValue = v.(int64)
	}
	// Increment and set the value back.
	intValue++
	// The same lock context, or a derived one from that lock context must be used,
	// otherwise the Set operation below will block.
	if err = myMap.Set(lockCtx, key, intValue); err != nil {
		panic(err)
	}
}
const goroutineCount = 100
const key = "counter"
ctx := context.TODO()
client, err := hazelcast.StartNewClient(ctx)
if err != nil {
	log.Fatal(err)
}
// Get a random map.
myMap, err := client.GetMap(ctx, "map")
if err != nil {
	log.Fatal(err)
}
// Lock and increment the value stored in key for goroutineCount times.
wg := &sync.WaitGroup{}
wg.Add(goroutineCount)
for i := 0; i < goroutineCount; i++ {
	go lockAndIncrement(myMap, key, wg)
}
// Wait for all goroutines to complete.
wg.Wait()
// Retrieve the final value.
// A lock context is not needed, since the key is unlocked.
if lastValue, err := myMap.Get(context.Background(), key); err != nil {
	panic(err)
} else {
	fmt.Println("lastValue", lastValue)
}
client.Shutdown(ctx)
func (*Map) PutAll ¶ added in v1.0.0
PutAll copies all the mappings from the specified map to this map. No atomicity guarantees are given. In the case of a failure, some key-value tuples may get written, while others are not.
func (*Map) PutIfAbsent ¶ added in v1.0.0
func (m *Map) PutIfAbsent(ctx context.Context, key interface{}, value interface{}) (interface{}, error)
PutIfAbsent associates the specified key with the given value if it is not already associated.
func (*Map) PutIfAbsentWithTTL ¶ added in v1.0.0
func (m *Map) PutIfAbsentWithTTL(ctx context.Context, key interface{}, value interface{}, ttl time.Duration) (interface{}, error)
PutIfAbsentWithTTL associates the specified key with the given value if it is not already associated. Entry will expire and get evicted after the ttl.
func (*Map) PutIfAbsentWithTTLAndMaxIdle ¶ added in v1.0.0
func (m *Map) PutIfAbsentWithTTLAndMaxIdle(ctx context.Context, key interface{}, value interface{}, ttl time.Duration, maxIdle time.Duration) (interface{}, error)
PutIfAbsentWithTTLAndMaxIdle associates the specified key with the given value if it is not already associated. Entry will expire and get evicted after the ttl. Given max idle time (maximum time for this entry to stay idle in the map) is used.
func (*Map) PutTransient ¶ added in v1.0.0
PutTransient sets the value for the given key. MapStore defined at the server side will not be called. The TTL defined on the server-side configuration will be used. Max idle time defined on the server-side configuration will be used.
func (*Map) PutTransientWithMaxIdle ¶ added in v1.0.0
func (m *Map) PutTransientWithMaxIdle(ctx context.Context, key interface{}, value interface{}, maxIdle time.Duration) error
PutTransientWithMaxIdle sets the value for the given key. MapStore defined at the server side will not be called. Given max idle time (maximum time for this entry to stay idle in the map) is used. Set maxIdle to 0 for infinite idle time.
func (*Map) PutTransientWithTTL ¶ added in v1.0.0
func (m *Map) PutTransientWithTTL(ctx context.Context, key interface{}, value interface{}, ttl time.Duration) error
PutTransientWithTTL sets the value for the given key. MapStore defined at the server side will not be called. Given TTL (maximum time for this entry to stay in the map) is used. Set ttl to 0 for infinite timeout.
func (*Map) PutTransientWithTTLAndMaxIdle ¶ added in v1.0.0
func (m *Map) PutTransientWithTTLAndMaxIdle(ctx context.Context, key interface{}, value interface{}, ttl time.Duration, maxIdle time.Duration) error
PutTransientWithTTLAndMaxIdle sets the value for the given key. MapStore defined at the server side will not be called. Given TTL (maximum time for this entry to stay in the map) is used. Set ttl to 0 for infinite timeout. Given max idle time (maximum time for this entry to stay idle in the map) is used. Set maxIdle to 0 for infinite idle time.
func (*Map) PutWithMaxIdle ¶ added in v1.0.0
func (m *Map) PutWithMaxIdle(ctx context.Context, key interface{}, value interface{}, maxIdle time.Duration) (interface{}, error)
PutWithMaxIdle sets the value for the given key and returns the old value. maxIdle is the maximum time for this entry to stay idle in the map.
func (*Map) PutWithTTL ¶ added in v1.0.0
func (m *Map) PutWithTTL(ctx context.Context, key interface{}, value interface{}, ttl time.Duration) (interface{}, error)
PutWithTTL sets the value for the given key and returns the old value. Entry will expire and get evicted after the ttl.
func (*Map) PutWithTTLAndMaxIdle ¶ added in v1.0.0
func (m *Map) PutWithTTLAndMaxIdle(ctx context.Context, key interface{}, value interface{}, ttl time.Duration, maxIdle time.Duration) (interface{}, error)
PutWithTTLAndMaxIdle sets the value for the given key and returns the old value. Entry will expire and get evicted after the ttl. maxIdle is the maximum time for this entry to stay idle in the map.
func (*Map) RemoveEntryListener ¶ added in v1.0.0
RemoveEntryListener removes the specified entry listener.
func (*Map) RemoveIfSame ¶ added in v1.0.0
RemoveIfSame removes the entry for a key only if it is currently mapped to a given value. Returns true if the entry was removed.
func (*Map) RemoveInterceptor ¶ added in v1.0.0
RemoveInterceptor removes the interceptor.
func (*Map) Replace ¶ added in v1.0.0
Replace replaces the entry for a key only if it is currently mapped to some value and returns the previous value.
func (*Map) ReplaceIfSame ¶ added in v1.0.0
func (m *Map) ReplaceIfSame(ctx context.Context, key interface{}, oldValue interface{}, newValue interface{}) (bool, error)
ReplaceIfSame replaces the entry for a key only if it is currently mapped to a given value. Returns true if the value was replaced.
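The conditional behaviour of ReplaceIfSame resembles a compare-and-set. The sketch below models it on an ordinary Go map so it runs without a cluster. The helper `replaceIfSame` is a hypothetical local function, not part of the client API, and it compares plain strings, which may differ from how the cluster compares values.

```go
package main

import "fmt"

// replaceIfSame is a hypothetical local helper that mimics the documented
// ReplaceIfSame behaviour on an ordinary Go map: the value is swapped only
// when the key is currently mapped to oldValue.
func replaceIfSame(m map[string]string, key, oldValue, newValue string) bool {
	current, ok := m[key]
	if !ok || current != oldValue {
		return false
	}
	m[key] = newValue
	return true
}

func main() {
	m := map[string]string{"color": "red"}
	fmt.Println(replaceIfSame(m, "color", "blue", "green")) // false: current value is "red"
	fmt.Println(replaceIfSame(m, "color", "red", "green"))  // true: value swapped
	fmt.Println(m["color"])                                 // green
}
```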
func (*Map) SetTTL ¶ added in v1.0.0
SetTTL updates the TTL value of the entry specified by the given key with a new TTL value. The given TTL is the maximum time for this entry to stay in the map. Set ttl to 0 for infinite timeout.
func (*Map) SetTTLAffected ¶ added in v1.2.0
SetTTLAffected updates the TTL value of the entry specified by the given key with a new TTL value. The given TTL is the maximum time for this entry to stay in the map. Returns true if the entry is affected. Set ttl to 0 for infinite timeout.
func (*Map) SetWithTTL ¶ added in v1.0.0
func (m *Map) SetWithTTL(ctx context.Context, key interface{}, value interface{}, ttl time.Duration) error
SetWithTTL sets the value for the given key. The given TTL is the maximum time for this entry to stay in the map. Set ttl to 0 for infinite timeout.
func (*Map) SetWithTTLAndMaxIdle ¶ added in v1.0.0
func (m *Map) SetWithTTLAndMaxIdle(ctx context.Context, key interface{}, value interface{}, ttl time.Duration, maxIdle time.Duration) error
SetWithTTLAndMaxIdle sets the value for the given key. The given TTL is the maximum time for this entry to stay in the map. Set ttl to 0 for infinite timeout. The given max idle time is the maximum time for this entry to stay idle in the map. Set maxIdle to 0 for infinite idle time.
func (*Map) TryLock ¶ added in v1.0.0
TryLock tries to acquire the lock for the specified key. When the lock is not available, the current goroutine doesn't wait and returns false immediately.
func (*Map) TryLockWithLease ¶ added in v1.0.0
func (m *Map) TryLockWithLease(ctx context.Context, key interface{}, lease time.Duration) (bool, error)
TryLockWithLease tries to acquire the lock for the specified key. Lock will be released after lease time passes.
func (*Map) TryLockWithLeaseAndTimeout ¶ added in v1.0.0
func (m *Map) TryLockWithLeaseAndTimeout(ctx context.Context, key interface{}, lease time.Duration, timeout time.Duration) (bool, error)
TryLockWithLeaseAndTimeout tries to acquire the lock for the specified key. The current goroutine is blocked until the lock is acquired using the same lock context, or the specified waiting time elapses. The lock will be released after the lease time passes.
func (*Map) TryLockWithTimeout ¶ added in v1.0.0
func (m *Map) TryLockWithTimeout(ctx context.Context, key interface{}, timeout time.Duration) (bool, error)
TryLockWithTimeout tries to acquire the lock for the specified key. The current goroutine is blocked until the lock is acquired using the same lock context, or the specified waiting time elapses.
func (*Map) TryPut ¶ added in v1.0.0
TryPut tries to put the given key and value into this map and returns immediately.
func (*Map) TryPutWithTimeout ¶ added in v1.0.0
func (m *Map) TryPutWithTimeout(ctx context.Context, key interface{}, value interface{}, timeout time.Duration) (bool, error)
TryPutWithTimeout tries to put the given key and value into this map and waits until operation is completed or the given timeout is reached.
func (*Map) TryRemove ¶ added in v1.0.0
TryRemove tries to remove the given key from this map and returns immediately.
type MapEntryListenerConfig ¶ added in v1.0.0
type MapEntryListenerConfig struct { Predicate predicate.Predicate Key interface{} IncludeValue bool // contains filtered or unexported fields }
MapEntryListenerConfig contains configuration for a map entry listener.
func (*MapEntryListenerConfig) NotifyEntryAdded ¶ added in v1.0.0
func (c *MapEntryListenerConfig) NotifyEntryAdded(enable bool)
NotifyEntryAdded enables receiving an entry event when an entry is added.
func (*MapEntryListenerConfig) NotifyEntryAllCleared ¶ added in v1.0.0
func (c *MapEntryListenerConfig) NotifyEntryAllCleared(enable bool)
NotifyEntryAllCleared enables receiving an entry event when all entries are cleared.
func (*MapEntryListenerConfig) NotifyEntryAllEvicted ¶ added in v1.0.0
func (c *MapEntryListenerConfig) NotifyEntryAllEvicted(enable bool)
NotifyEntryAllEvicted enables receiving an entry event when all entries are evicted.
func (*MapEntryListenerConfig) NotifyEntryEvicted ¶ added in v1.0.0
func (c *MapEntryListenerConfig) NotifyEntryEvicted(enable bool)
NotifyEntryEvicted enables receiving an entry event when an entry is evicted.
func (*MapEntryListenerConfig) NotifyEntryExpired ¶ added in v1.0.0
func (c *MapEntryListenerConfig) NotifyEntryExpired(enable bool)
NotifyEntryExpired enables receiving an entry event when an entry is expired.
func (*MapEntryListenerConfig) NotifyEntryInvalidated ¶ added in v1.0.0
func (c *MapEntryListenerConfig) NotifyEntryInvalidated(enable bool)
NotifyEntryInvalidated enables receiving an entry event when an entry is invalidated.
func (*MapEntryListenerConfig) NotifyEntryLoaded ¶ added in v1.0.0
func (c *MapEntryListenerConfig) NotifyEntryLoaded(enable bool)
NotifyEntryLoaded enables receiving an entry event when an entry is loaded.
func (*MapEntryListenerConfig) NotifyEntryMerged ¶ added in v1.0.0
func (c *MapEntryListenerConfig) NotifyEntryMerged(enable bool)
NotifyEntryMerged enables receiving an entry event when an entry is merged.
func (*MapEntryListenerConfig) NotifyEntryRemoved ¶ added in v1.0.0
func (c *MapEntryListenerConfig) NotifyEntryRemoved(enable bool)
NotifyEntryRemoved enables receiving an entry event when an entry is removed.
func (*MapEntryListenerConfig) NotifyEntryUpdated ¶ added in v1.0.0
func (c *MapEntryListenerConfig) NotifyEntryUpdated(enable bool)
NotifyEntryUpdated enables receiving an entry event when an entry is updated.
type MessagePublished ¶ added in v1.0.0
type MessagePublished struct { PublishTime time.Time Value interface{} TopicName string Member cluster.MemberInfo }
MessagePublished contains information about a message published event. Member may have the zero value of cluster.MemberInfo if the member is not known at the time the corresponding callback runs. You can check that situation by checking whether Member.UUID is the default UUID.
func (*MessagePublished) EventName ¶ added in v1.0.0
func (m *MessagePublished) EventName() string
type MultiMap ¶ added in v1.2.0
type MultiMap struct {
// contains filtered or unexported fields
}
MultiMap is a distributed map. Hazelcast Go client enables you to perform operations like reading and writing from/to a Hazelcast MultiMap with methods like Get and Put. For details, see https://docs.hazelcast.com/hazelcast/latest/data-structures/multimap.html
Using Locks ¶
You can lock entries in a MultiMap. When an entry is locked, only the owner of that lock can access that entry in the cluster until it is unlocked by the owner or force-unlocked. See https://docs.hazelcast.com/imdg/latest/data-structures/map.html#locking-maps for details; usage is identical.
Locks are reentrant. The owner of a lock can acquire the lock again without waiting for the lock to be unlocked. If the key is locked N times, it should be unlocked N times before another goroutine can acquire it.
Lock ownership in Hazelcast Go Client is explicit. The first step to own a lock is creating a lock context, which is similar to a key. The lock context is a regular context.Context that carries a special value that uniquely identifies the lock context in the cluster. Once the lock context is created, it can be used to lock/unlock entries and used with any function that is lock aware, such as Put.
    m, err := client.GetMultiMap(ctx, "my-map")
    lockCtx := m.NewLockContext(ctx)
    // Block until the lock is acquired.
    err = m.Lock(lockCtx, "some-key")
    // Pass the lock context to use the locked entry.
    // MultiMap.Put returns (bool, error); the boolean is ignored here.
    _, err = m.Put(lockCtx, "some-key", "some-value")
    // Release the lock once done with it.
    err = m.Unlock(lockCtx, "some-key")
As mentioned before, the lock context is a regular context.Context that carries a special lock ID. You can pass any context.Context to any MultiMap function, but in that case lock ownership between operations using the same hazelcast.Client instance is not possible.
Example ¶
    ctx := context.TODO()
    client, err := hazelcast.StartNewClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    // Retrieve a multi-map.
    peopleMap, err := client.GetMultiMap(ctx, "people")
    if err != nil {
        log.Fatal(err)
    }
    // Call multi-map functions.
    success, err := peopleMap.Put(ctx, "jane", "doe")
    if err != nil {
        log.Fatal(err)
    }
    if !success {
        log.Fatal("map operation failed")
    }
    // Add multiple values to the existing key.
    if err = peopleMap.PutAll(ctx, "jane", "smith", "mason"); err != nil {
        log.Fatal(err)
    }
    values, err := peopleMap.Get(ctx, "jane")
    if err != nil {
        log.Fatal(err)
    }
    // ["smith", "mason", "doe"]; the order of values may not be preserved.
    fmt.Println(values)
    // Stop the client once you are done with it.
    client.Shutdown(ctx)
func (*MultiMap) Clear ¶ added in v1.2.0
Clear deletes all entries one by one and fires related events.
func (*MultiMap) ContainsEntry ¶ added in v1.2.0
func (m *MultiMap) ContainsEntry(ctx context.Context, key interface{}, value interface{}) (bool, error)
ContainsEntry returns true if the multi-map contains an entry with the given key and value.
func (*MultiMap) ContainsKey ¶ added in v1.2.0
ContainsKey returns true if the map contains an entry with the given key.
func (*MultiMap) ContainsValue ¶ added in v1.2.0
ContainsValue returns true if the map contains an entry with the given value.
func (*MultiMap) Delete ¶ added in v1.2.0
Delete removes the mapping for a key from this multi-map if it is present. Unlike Remove, this operation does not return the removed values, which avoids the serialization cost of the returned values. If the removed values will not be used, Delete is preferred over Remove for better performance.
func (MultiMap) Destroy ¶ added in v1.2.0
Destroy removes this object cluster-wide. Clears and releases all resources for this object.
func (*MultiMap) ForceUnlock ¶ added in v1.2.0
ForceUnlock releases the lock for the specified key regardless of the lock owner. It always successfully unlocks the key, never blocks, and returns immediately.
func (*MultiMap) Get ¶ added in v1.2.0
Get returns values for the specified key or an empty slice if this multi-map does not contain this key. Warning: This method returns a clone of the original value, so modifying the returned value does not change the actual value in the multi-map. One should put the modified value back to make changes visible to all nodes.
func (*MultiMap) GetEntrySet ¶ added in v1.2.0
GetEntrySet returns a clone of the mappings contained in this multi-map.
func (*MultiMap) GetValues ¶ added in v1.2.0
GetValues returns a list clone of the values contained in this map.
func (*MultiMap) Lock ¶ added in v1.2.0
Lock acquires the lock for the specified key, waiting indefinitely. If the lock is not available, the current goroutine is blocked until the lock is acquired using the same lock context.
You get a lock whether the value is present in the multi-map or not. Other goroutines or threads on other systems would block on their invocation of Lock until the non-existent key is unlocked. If the lock holder introduces the key to the map, the Put operation is not blocked. If a goroutine not holding a lock on the non-existent key tries to introduce the key while a lock exists on the non-existent key, the Put operation blocks until it is unlocked.
Scope of the lock is this MultiMap only. Acquired lock is only for the key in this map.
Locks are re-entrant. If the key is locked N times, it should be unlocked N times before another goroutine can acquire it.
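The N-locks, N-unlocks rule can be pictured with a small hold counter. The following is a minimal single-process sketch, assuming owners are identified by an integer lock ID. `reentrantLock` is a hypothetical type; it never blocks and is not safe for concurrent use, unlike the distributed lock.

```go
package main

import (
	"errors"
	"fmt"
)

// reentrantLock is a hypothetical, single-process model of the documented
// reentrancy rule. It is not safe for concurrent use and never blocks.
type reentrantLock struct {
	owner int64 // lock ID of the current owner; meaningful only when count > 0
	count int   // number of times the owner has acquired the lock
}

// tryLock acquires the lock if it is free or already held by id.
func (l *reentrantLock) tryLock(id int64) bool {
	if l.count > 0 && l.owner != id {
		return false
	}
	l.owner = id
	l.count++
	return true
}

// unlock releases one acquisition; only the owner may call it.
func (l *reentrantLock) unlock(id int64) error {
	if l.count == 0 || l.owner != id {
		return errors.New("lock is not held by this owner")
	}
	l.count--
	return nil
}

func main() {
	var l reentrantLock
	l.tryLock(1)
	l.tryLock(1)              // same owner: reentrant acquisition
	fmt.Println(l.tryLock(2)) // false: held twice by owner 1
	_ = l.unlock(1)
	fmt.Println(l.tryLock(2)) // false: one acquisition still outstanding
	_ = l.unlock(1)
	fmt.Println(l.tryLock(2)) // true: fully released
}
```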
func (*MultiMap) LockWithLease ¶ added in v1.2.0
func (m *MultiMap) LockWithLease(ctx context.Context, key interface{}, leaseTime time.Duration) error
LockWithLease acquires the lock for the specified lease time. Otherwise, it behaves the same as Lock function.
func (*MultiMap) NewLockContext ¶ added in v1.2.0
NewLockContext augments the passed parent context with a unique lock ID. If passed context is nil, context.Background is used as the parent context.
func (*MultiMap) Put ¶ added in v1.2.0
Put appends the value for the given key to the corresponding value list and returns true if the operation is successful.
func (*MultiMap) PutAll ¶ added in v1.2.0
PutAll appends given values to the value list of given key. No atomicity guarantees are given. In the case of a failure, some key-value tuples may get written, while others are not.
func (*MultiMap) Remove ¶ added in v1.2.0
Remove deletes all the values corresponding to the given key and returns them as a slice.
func (*MultiMap) RemoveEntry ¶ added in v1.2.0
func (m *MultiMap) RemoveEntry(ctx context.Context, key interface{}, value interface{}) (bool, error)
RemoveEntry removes the specified value for the given key and returns true if call had an effect.
func (*MultiMap) TryLock ¶ added in v1.2.0
TryLock tries to acquire the lock for the specified key. When the lock is not available, the current goroutine doesn't wait and returns false immediately.
func (*MultiMap) TryLockWithLease ¶ added in v1.2.0
func (m *MultiMap) TryLockWithLease(ctx context.Context, key interface{}, lease time.Duration) (bool, error)
TryLockWithLease tries to acquire the lock for the specified key. Lock will be released after lease time passes.
func (*MultiMap) TryLockWithLeaseAndTimeout ¶ added in v1.2.0
func (m *MultiMap) TryLockWithLeaseAndTimeout(ctx context.Context, key interface{}, lease time.Duration, timeout time.Duration) (bool, error)
TryLockWithLeaseAndTimeout tries to acquire the lock for the specified key. The current goroutine is blocked until the lock is acquired using the same lock context, or the specified waiting time elapses. The lock will be released after the lease time passes.
func (*MultiMap) TryLockWithTimeout ¶ added in v1.2.0
func (m *MultiMap) TryLockWithTimeout(ctx context.Context, key interface{}, timeout time.Duration) (bool, error)
TryLockWithTimeout tries to acquire the lock for the specified key. The current goroutine is blocked until the lock is acquired using the same lock context, or the specified waiting time elapses.
type PNCounter ¶ added in v1.0.0
type PNCounter struct {
// contains filtered or unexported fields
}
PNCounter is a PN (Positive-Negative) CRDT counter.
The counter supports adding and subtracting values as well as retrieving the current counter value. Each replica of this counter can perform operations locally without coordination with the other replicas, thus increasing availability. The counter guarantees that whenever two nodes have received the same set of updates, possibly in a different order, their state is identical, and any conflicting updates are merged automatically. If no new updates are made to the shared state, all nodes that can communicate will eventually have the same data.
When invoking updates from the client, the invocation is remote. This may lead to indeterminate state - the update may be applied but the response has not been received. In this case, the caller will be notified with hzerrors.ErrTargetDisconnected.
The read and write methods provide monotonic read and RYW (read-your-write) guarantees. These guarantees are session guarantees which means that if no replica with the previously observed state is reachable, the session guarantees are lost and the method invocation will return hzerrors.ErrConsistencyLostException. This does not mean that an update is lost. All of the updates are part of some replica and will be eventually reflected in the state of all other replicas. This error just means that you cannot observe your own writes because all replicas that contain your updates are currently unreachable. After you receive hzerrors.ErrConsistencyLostException, you can either wait for a sufficiently up-to-date replica to become reachable in which case the session can be continued or you can reset the session by calling the Reset function. If you have called the Reset function, a new session is started with the next invocation to a CRDT replica.
Note that the CRDT state is kept entirely on non-lite (data) members. If there aren't any and the methods here are invoked on a lite member, they will fail with hzerrors.ErrNoDataMember.
For details see https://docs.hazelcast.com/imdg/latest/data-structures/pn-counter.html
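The convergence property described above can be seen in the textbook state-based PN counter, sketched below with the standard library only. This illustrates the general CRDT technique and is not the Hazelcast member-side implementation. The type `pnCounter` and its methods are hypothetical names.

```go
package main

import "fmt"

// pnCounter is a minimal state-based PN counter. Each replica only ever
// increases its own slot in p (increments) and n (decrements).
type pnCounter struct {
	p, n map[string]int64
}

func newPNCounter() *pnCounter {
	return &pnCounter{p: map[string]int64{}, n: map[string]int64{}}
}

// add records a positive or negative delta on behalf of the given replica.
func (c *pnCounter) add(replica string, delta int64) {
	if delta >= 0 {
		c.p[replica] += delta
	} else {
		c.n[replica] -= delta
	}
}

// value is the sum of all increments minus the sum of all decrements.
func (c *pnCounter) value() int64 {
	var total int64
	for _, v := range c.p {
		total += v
	}
	for _, v := range c.n {
		total -= v
	}
	return total
}

// merge takes the per-replica maximum, which makes merging commutative,
// associative and idempotent.
func (c *pnCounter) merge(other *pnCounter) {
	for r, v := range other.p {
		if v > c.p[r] {
			c.p[r] = v
		}
	}
	for r, v := range other.n {
		if v > c.n[r] {
			c.n[r] = v
		}
	}
}

func main() {
	a, b := newPNCounter(), newPNCounter()
	a.add("a", 43) // replica a adds 43
	b.add("b", -1) // replica b subtracts 1
	a.merge(b)
	b.merge(a)
	fmt.Println(a.value(), b.value()) // 42 42
}
```

Because merging keeps the maximum of each replica's slot, applying the same merges in a different order, or more than once, leaves every replica with the same value.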
Example ¶
    ctx := context.TODO()
    client, err := hazelcast.StartNewClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    // Retrieve the PN counter named my-pn.
    pn, err := client.GetPNCounter(ctx, "my-pn")
    if err != nil {
        log.Fatal(err)
    }
    // Add the given value and retrieve the result.
    _, err = pn.AddAndGet(ctx, 43)
    if err != nil {
        log.Fatal(err)
    }
    // Decrement the counter by one and retrieve the result.
    value, err := pn.DecrementAndGet(ctx)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(value)
func (*PNCounter) AddAndGet ¶ added in v1.0.0
AddAndGet adds the given value to the current value and returns the updated value.
func (*PNCounter) DecrementAndGet ¶ added in v1.0.0
DecrementAndGet decrements the counter value by one and returns the updated value.
func (PNCounter) Destroy ¶ added in v1.0.0
Destroy removes this object cluster-wide. Clears and releases all resources for this object.
func (*PNCounter) GetAndAdd ¶ added in v1.0.0
GetAndAdd adds the given value to the current value and returns the previous value.
func (*PNCounter) GetAndDecrement ¶ added in v1.0.0
GetAndDecrement decrements the counter value by one and returns the previous value.
func (*PNCounter) GetAndIncrement ¶ added in v1.0.0
GetAndIncrement increments the counter value by one and returns the previous value.
func (*PNCounter) GetAndSubtract ¶ added in v1.0.0
GetAndSubtract subtracts the given value from the current value and returns the previous value.
func (*PNCounter) IncrementAndGet ¶ added in v1.0.0
IncrementAndGet increments the counter value by one and returns the updated value.
type Queue ¶ added in v1.0.0
type Queue struct {
// contains filtered or unexported fields
}
Queue is a concurrent, blocking, distributed, observable queue.
Queue is not a partitioned data-structure. All of the Queue content is stored in a single machine (and in the backup). Queue will not scale by adding more members in the cluster.
For details see https://docs.hazelcast.com/imdg/latest/data-structures/queue.html
Example ¶
    ctx := context.TODO()
    client, err := hazelcast.StartNewClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    // Get the queue named queue-1.
    q, err := client.GetQueue(ctx, "queue-1")
    if err != nil {
        log.Fatal(err)
    }
    // Add an item to the queue if space is available (non-blocking).
    added, err := q.Add(ctx, "item 1")
    if err != nil {
        log.Fatal(err)
    }
    if added {
        fmt.Println("Added item 1")
    }
    // Get the head of the queue if available and print the item.
    item, err := q.Poll(ctx)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(item)
    // Add an item, waiting for capacity until the timeout.
    added, err = q.AddWithTimeout(ctx, "item 2", 2*time.Second)
    if err != nil {
        log.Fatal(err)
    }
    if added {
        fmt.Println("Added item 2")
    }
    // Wait indefinitely to add an item.
    err = q.Put(ctx, "item 3")
    if err != nil {
        log.Fatal(err)
    }
    // Wait indefinitely to take the head and print the item.
    item, err = q.Take(ctx)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(item)
    // Shut down the client.
    client.Shutdown(ctx)
func (*Queue) Add ¶ added in v1.0.0
Add adds the specified item to this queue if there is available space. Returns true when the element is successfully added.
func (*Queue) AddAll ¶ added in v1.0.0
AddAll adds the elements in the specified collection to this queue. Returns true if the queue is changed after the call.
func (*Queue) AddItemListener ¶ added in v1.0.0
func (q *Queue) AddItemListener(ctx context.Context, includeValue bool, handler QueueItemNotifiedHandler) (types.UUID, error)
AddItemListener adds an item listener for this queue. Listener will be notified for all queue add/remove events. Received events include the updated item if includeValue is true.
func (*Queue) AddWithTimeout ¶ added in v1.0.0
func (q *Queue) AddWithTimeout(ctx context.Context, value interface{}, timeout time.Duration) (bool, error)
AddWithTimeout adds the specified item to this queue, waiting up to the given timeout for space to become available. Returns true when the element is successfully added.
func (*Queue) Contains ¶ added in v1.0.0
Contains returns true if the queue includes the given value.
func (*Queue) ContainsAll ¶ added in v1.0.0
ContainsAll returns true if the queue includes all given values.
func (Queue) Destroy ¶ added in v1.0.0
Destroy removes this object cluster-wide. Clears and releases all resources for this object.
func (*Queue) DrainWithMaxSize ¶ added in v1.0.0
DrainWithMaxSize returns at most maxSize items in the queue and removes the returned items from the queue.
func (*Queue) Peek ¶ added in v1.0.0
Peek retrieves the head of queue without removing it from the queue.
func (*Queue) PollWithTimeout ¶ added in v1.0.0
PollWithTimeout retrieves and removes the head of this queue, waiting up to the given timeout if necessary, and returns the result.
func (*Queue) Put ¶ added in v1.0.0
Put adds the specified element into this queue. If there is no space, it waits until necessary space becomes available.
func (*Queue) RemainingCapacity ¶ added in v1.0.0
RemainingCapacity returns the remaining capacity of this queue.
func (*Queue) Remove ¶ added in v1.0.0
Remove removes the specified element from the queue if it exists.
func (*Queue) RemoveAll ¶ added in v1.0.0
RemoveAll removes all of the elements of the specified collection from this queue.
func (*Queue) RemoveListener ¶ added in v1.0.0
RemoveListener removes the specified listener.
func (*Queue) RetainAll ¶ added in v1.0.0
RetainAll removes the items which are not contained in the specified collection.
type QueueItemNotified ¶ added in v1.0.0
type QueueItemNotified struct { Value interface{} QueueName string Member cluster.MemberInfo EventType ItemEventType }
QueueItemNotified contains information about an item notified event. Member may have the zero value of cluster.MemberInfo if the member is not known at the time the corresponding callback runs. You can check that situation by checking whether Member.UUID is the default UUID.
func (QueueItemNotified) EventName ¶ added in v1.0.0
func (q QueueItemNotified) EventName() string
type QueueItemNotifiedHandler ¶ added in v1.0.0
type QueueItemNotifiedHandler func(event *QueueItemNotified)
QueueItemNotifiedHandler is called when an item notified event is generated for a Queue.
type ReplicatedMap ¶ added in v1.0.0
type ReplicatedMap struct {
// contains filtered or unexported fields
}
ReplicatedMap is a distributed key-value data structure where the data is replicated to all members in the cluster. It provides full replication of entries to all members for high speed access.
See https://docs.hazelcast.com/imdg/latest/data-structures/replicated-map.html for details.
Example ¶
    ctx := context.TODO()
    client, err := hazelcast.StartNewClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    // Get the replicated map named replicated-map-1.
    replicatedMap, err := client.GetReplicatedMap(ctx, "replicated-map-1")
    if err != nil {
        log.Fatal(err)
    }
    // Populate the map.
    replacedValue, err := replicatedMap.Put(ctx, "key", "value")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(replacedValue)
    // Get the value and print it.
    value, err := replicatedMap.Get(ctx, "key")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(value)
    // Shut down the client.
    client.Shutdown(ctx)
func (*ReplicatedMap) AddEntryListener ¶ added in v1.0.0
func (m *ReplicatedMap) AddEntryListener(ctx context.Context, handler EntryNotifiedHandler) (types.UUID, error)
AddEntryListener adds a continuous entry listener to this map.
func (*ReplicatedMap) AddEntryListenerToKey ¶ added in v1.0.0
func (m *ReplicatedMap) AddEntryListenerToKey(ctx context.Context, key interface{}, handler EntryNotifiedHandler) (types.UUID, error)
AddEntryListenerToKey adds a continuous entry listener to this map for the given key.
func (*ReplicatedMap) AddEntryListenerToKeyWithPredicate ¶ added in v1.0.0
func (m *ReplicatedMap) AddEntryListenerToKeyWithPredicate(ctx context.Context, key interface{}, predicate predicate.Predicate, handler EntryNotifiedHandler) (types.UUID, error)
AddEntryListenerToKeyWithPredicate adds a continuous entry listener to this map for the given key and predicate.
func (*ReplicatedMap) AddEntryListenerWithPredicate ¶ added in v1.0.0
func (m *ReplicatedMap) AddEntryListenerWithPredicate(ctx context.Context, predicate predicate.Predicate, handler EntryNotifiedHandler) (types.UUID, error)
AddEntryListenerWithPredicate adds a continuous entry listener to this map for the given predicate.
func (*ReplicatedMap) Clear ¶ added in v1.0.0
func (m *ReplicatedMap) Clear(ctx context.Context) error
Clear deletes all entries one by one and fires related events.
func (*ReplicatedMap) ContainsKey ¶ added in v1.0.0
func (m *ReplicatedMap) ContainsKey(ctx context.Context, key interface{}) (bool, error)
ContainsKey returns true if the map contains an entry with the given key.
func (*ReplicatedMap) ContainsValue ¶ added in v1.0.0
func (m *ReplicatedMap) ContainsValue(ctx context.Context, value interface{}) (bool, error)
ContainsValue returns true if the map contains an entry with the given value.
func (ReplicatedMap) Destroy ¶ added in v1.0.0
Destroy removes this object cluster-wide. Clears and releases all resources for this object.
func (*ReplicatedMap) Get ¶ added in v1.0.0
func (m *ReplicatedMap) Get(ctx context.Context, key interface{}) (interface{}, error)
Get returns the value for the specified key, or nil if this map does not contain this key. This function returns a clone of the original value, so modifying the returned value does not change the actual value in the map. One should put the modified value back to make changes visible to all nodes.
func (*ReplicatedMap) GetEntrySet ¶ added in v1.0.0
GetEntrySet returns a clone of the mappings contained in this map.
func (*ReplicatedMap) GetKeySet ¶ added in v1.0.0
func (m *ReplicatedMap) GetKeySet(ctx context.Context) ([]interface{}, error)
GetKeySet returns the keys contained in this map.
func (*ReplicatedMap) GetValues ¶ added in v1.0.0
func (m *ReplicatedMap) GetValues(ctx context.Context) ([]interface{}, error)
GetValues returns a list clone of the values contained in this map.
func (*ReplicatedMap) IsEmpty ¶ added in v1.0.0
func (m *ReplicatedMap) IsEmpty(ctx context.Context) (bool, error)
IsEmpty returns true if this map contains no key-value mappings.
func (*ReplicatedMap) Put ¶ added in v1.0.0
func (m *ReplicatedMap) Put(ctx context.Context, key interface{}, value interface{}) (interface{}, error)
Put sets the value for the given key and returns the old value.
func (*ReplicatedMap) PutAll ¶ added in v1.0.0
PutAll copies all the mappings from the specified map to this map. No atomicity guarantees are given. In the case of a failure, some key-value tuples may get written, while others are not.
func (*ReplicatedMap) Remove ¶ added in v1.0.0
func (m *ReplicatedMap) Remove(ctx context.Context, key interface{}) (interface{}, error)
Remove deletes the value for the given key and returns it.
func (*ReplicatedMap) RemoveEntryListener ¶ added in v1.0.0
RemoveEntryListener removes the specified entry listener.
type Set ¶ added in v1.0.0
type Set struct {
// contains filtered or unexported fields
}
Set is a concurrent, distributed set implementation.
Hazelcast Set is a distributed set which does not allow duplicate elements. For details, see: https://docs.hazelcast.com/imdg/latest/data-structures/set.html
Example ¶
    ctx := context.TODO()
    client, err := hazelcast.StartNewClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    // Retrieve the set named my-set.
    set, err := client.GetSet(ctx, "my-set")
    if err != nil {
        log.Fatal(err)
    }
    _, err = set.AddAll(ctx, "item1", "item2", "item3", "item2", "item1")
    if err != nil {
        log.Fatal(err)
    }
    // Get the items. Note that there are no duplicates.
    items, err := set.GetAll(ctx)
    if err != nil {
        log.Fatal(err)
    }
    for _, item := range items {
        fmt.Println("Item:", item)
    }
func (*Set) Add ¶ added in v1.0.0
Add adds the given item to the set. Returns true if the item was not already in the set.
func (*Set) AddAll ¶ added in v1.0.0
AddAll adds the elements in the specified collection to this set. Returns true if the set is changed after the call.
func (*Set) AddItemListener ¶ added in v1.0.0
func (s *Set) AddItemListener(ctx context.Context, includeValue bool, handler SetItemNotifiedHandler) (types.UUID, error)
AddItemListener adds an item listener for this set. Listener will be notified for all set add/remove events. Received events include the updated item if includeValue is true.
func (*Set) ContainsAll ¶ added in v1.0.0
ContainsAll returns true if the set includes all given values.
func (Set) Destroy ¶ added in v1.0.0
Destroy removes this object cluster-wide. Clears and releases all resources for this object.
func (*Set) Remove ¶ added in v1.0.0
Remove removes the specified element from the set if it exists.
func (*Set) RemoveAll ¶ added in v1.0.0
RemoveAll removes all of the elements of the specified collection from this set. Returns true if the set was changed.
func (*Set) RemoveListener ¶ added in v1.0.0
RemoveListener removes the specified listener.
type SetItemNotified ¶ added in v1.0.0
type SetItemNotified struct { Value interface{} SetName string Member cluster.MemberInfo EventType ItemEventType }
SetItemNotified contains information about an item notified event. Member may have the zero value of cluster.MemberInfo if the member is not known at the time the corresponding callback runs. You can check that situation by checking whether Member.UUID is the default UUID.
func (SetItemNotified) EventName ¶ added in v1.0.0
func (q SetItemNotified) EventName() string
type SetItemNotifiedHandler ¶ added in v1.0.0
type SetItemNotifiedHandler func(event *SetItemNotified)
SetItemNotifiedHandler is called when an item notified event is generated for a Set.
type StatsConfig ¶ added in v1.0.0
type StatsConfig struct { // Enabled enables collecting statistics. Enabled bool `json:",omitempty"` // Period is the period of statistics collection. Period types.Duration `json:",omitempty"` }
StatsConfig contains configuration for Management Center.
func (*StatsConfig) Validate ¶ added in v1.0.0
func (c *StatsConfig) Validate() error
Validate validates the stats configuration and replaces missing configuration with defaults.
type Topic ¶ added in v1.0.0
type Topic struct {
// contains filtered or unexported fields
}
Topic is a distribution mechanism for publishing messages that are delivered to multiple subscribers, which is also known as a publish/subscribe (pub/sub) messaging model.
Publishing and subscriptions are cluster-wide. When a member subscribes to a topic, it is actually registering for messages published by any member in the cluster, including the new members that joined after you added the listener.
Messages are ordered, meaning that listeners (subscribers) will process the messages in the order they are actually published.
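The publish/subscribe model can be sketched in-process with a list of handler functions. This is a simplified local illustration, assuming synchronous delivery, and not how the cluster routes messages. The type `topic` and its methods are hypothetical names.

```go
package main

import "fmt"

// topic is a hypothetical in-process model of publish/subscribe: every
// handler registered before a publish receives that message, in order.
type topic struct {
	handlers []func(msg string)
}

// addMessageListener registers a subscriber callback.
func (t *topic) addMessageListener(h func(msg string)) {
	t.handlers = append(t.handlers, h)
}

// publish delivers msg to every subscriber synchronously, so each
// subscriber observes messages in publication order.
func (t *topic) publish(msg string) {
	for _, h := range t.handlers {
		h(msg)
	}
}

func main() {
	var t topic
	var first, second []string
	t.addMessageListener(func(m string) { first = append(first, m) })
	t.addMessageListener(func(m string) { second = append(second, m) })
	for i := 0; i < 3; i++ {
		t.publish(fmt.Sprintf("Message %d", i))
	}
	fmt.Println(first)  // [Message 0 Message 1 Message 2]
	fmt.Println(second) // [Message 0 Message 1 Message 2]
}
```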
Example ¶
    // messageListener handles incoming messages to the topic.
    messageListener := func(event *hazelcast.MessagePublished) {
        fmt.Println("Received message: ", event.Value)
    }
    messageCount := 10
    // Start the client with defaults.
    ctx := context.TODO()
    client, err := hazelcast.StartNewClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    // Get a random topic.
    rand.Seed(time.Now().Unix())
    topicName := fmt.Sprintf("sample-%d", rand.Int())
    topic, err := client.GetTopic(ctx, topicName)
    if err != nil {
        log.Fatal(err)
    }
    // Add a message listener to the topic.
    topic.AddMessageListener(ctx, messageListener)
    // Publish messages to the topic.
    for i := 0; i < messageCount; i++ {
        topic.Publish(ctx, fmt.Sprintf("Message %d", i))
    }
    // Shut down the client.
    client.Shutdown(ctx)
func (*Topic) AddMessageListener ¶ added in v1.0.0
func (t *Topic) AddMessageListener(ctx context.Context, handler TopicMessageHandler) (types.UUID, error)
AddMessageListener adds a subscriber to this topic.
func (Topic) Destroy ¶ added in v1.0.0
Destroy removes this object cluster-wide. Clears and releases all resources for this object.
func (*Topic) Publish ¶ added in v1.0.0
Publish publishes the given message to all subscribers of this topic.
func (*Topic) PublishAll ¶ added in v1.0.0
PublishAll publishes all given messages to all subscribers of this topic.
type TopicMessageHandler ¶ added in v1.0.0
type TopicMessageHandler func(event *MessagePublished)
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
aggregate | Package aggregate provides aggregation functions. |
cluster | Package cluster contains functions and types needed to connect to a Hazelcast cluster. |
examples | |
hzerrors | Package hzerrors provides sentinel errors. |
proto/codec | * Copyright (c) 2008-2021, Hazelcast, Inc. |
logger | Package logger contains logging related API. |
predicate | Package predicate provides built-in predicates to use with distributed queries. |
serialization | Package serialization contains serialization functions and types for Hazelcast Go client. |
types | Package types contains various helper types. |