pool

package
v1.0.0-rc.11
Warning

This package is not in the latest version of its module.

Published: Sep 7, 2023 License: Apache-2.0 Imports: 29 Imported by: 19

Documentation

Overview

Package pool provides a wrapper for several NeoFS API clients.

The main component is the Pool type. It is a virtual connection to the network and provides methods for executing operations on the server. It also supports weighted random selection of the underlying client used to make requests.
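The weighted random selection mentioned above can be illustrated with a small, self-contained sketch. It is not the Pool's actual implementation: the pickWeighted helper and the sample weights are hypothetical, and weights are assumed to be non-negative.

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickWeighted returns an index in [0, len(weights)) chosen with probability
// proportional to weights[i]. r must be a uniform value in [0, 1).
// It returns -1 when there is nothing to choose from.
// Hypothetical helper: it is not part of the pool package.
func pickWeighted(weights []float64, r float64) int {
	total := 0.0
	for _, w := range weights {
		total += w
	}
	if total <= 0 {
		return -1
	}

	target := r * total
	acc := 0.0
	for i, w := range weights {
		acc += w
		if target < acc {
			return i
		}
	}
	return len(weights) - 1 // guards against floating-point rounding
}

func main() {
	weights := []float64{9, 1} // hypothetical weights of two clients
	counts := make([]int, len(weights))
	for i := 0; i < 10000; i++ {
		counts[pickWeighted(weights, rand.Float64())]++
	}
	// The ratio is roughly 9:1; exact counts vary between runs.
	fmt.Println(counts)
}
```

With weights 9 and 1, the first client is expected to serve about 90% of the requests.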

Pool has an auto-session mechanism for object operations. It is enabled by default. The mechanism allows objects to be manipulated (uploaded, downloaded, deleted, etc.) without passing a session explicitly. This behavior may be disabled per request by calling IgnoreSession() on the appropriate Prm* argument. Note that if auto-session is disabled, the user MUST provide the appropriate session manually for PUT and DELETE object operations. For other object operations, providing a session is optional.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type InitParameters

type InitParameters struct {
	// contains filtered or unexported fields
}

InitParameters contains values used to initialize connection Pool.

func DefaultOptions

func DefaultOptions() InitParameters

DefaultOptions returns the default option preset for Pool creation. It may be used as a starting point for configuration or as the complete configuration.

func (*InitParameters) AddNode

func (x *InitParameters) AddNode(nodeParam NodeParam)

AddNode appends information about a node to connect to.

func (*InitParameters) SetClientRebalanceInterval

func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration)

SetClientRebalanceInterval specifies the interval for updating nodes health status.

See also Pool.Dial.

func (*InitParameters) SetErrorThreshold

func (x *InitParameters) SetErrorThreshold(threshold uint32)

SetErrorThreshold specifies the number of connection errors after which a node is considered unhealthy.
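The idea of an error threshold can be sketched as a simple counter. This is only an illustration under stated assumptions: the nodeHealth type and its methods are hypothetical, and the exact comparison and reset rules used by the Pool are not documented here.

```go
package main

import "fmt"

// nodeHealth is a hypothetical tracker used only for illustration;
// it is not a type exported by the pool package.
type nodeHealth struct {
	threshold uint32 // number of errors after which the node is marked unhealthy
	errCount  uint32 // errors seen since the last reset
	healthy   bool
}

func newNodeHealth(threshold uint32) *nodeHealth {
	return &nodeHealth{threshold: threshold, healthy: true}
}

// recordError counts one failed request. Once the count reaches the
// threshold, the node is marked unhealthy and the counter is reset.
// Using >= here (rather than >) is an assumption.
func (n *nodeHealth) recordError() {
	n.errCount++
	if n.errCount >= n.threshold {
		n.healthy = false
		n.errCount = 0
	}
}

// markHealthy models a successful health check.
func (n *nodeHealth) markHealthy() {
	n.healthy = true
	n.errCount = 0
}

func main() {
	n := newNodeHealth(3)
	for i := 1; i <= 3; i++ {
		n.recordError()
		fmt.Printf("after %d error(s): healthy=%v\n", i, n.healthy)
	}
	n.markHealthy()
	fmt.Println("after health check: healthy =", n.healthy)
}
```

In this sketch the node stays healthy for the first two errors and is marked unhealthy on the third.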

func (*InitParameters) SetHealthcheckTimeout

func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration)

SetHealthcheckTimeout specifies the timeout for the request sent to a node to decide whether it is alive.

See also Pool.Dial.

func (*InitParameters) SetLogger

func (x *InitParameters) SetLogger(logger *zap.Logger)

SetLogger specifies logger.

func (*InitParameters) SetNodeDialTimeout

func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration)

SetNodeDialTimeout specifies the timeout for connection to be established.

func (*InitParameters) SetNodeStreamTimeout

func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration)

SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC.

func (*InitParameters) SetSessionExpirationDuration

func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64)

SetSessionExpirationDuration specifies the session token lifetime in epochs.
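A lifetime measured in epochs implies a simple calculation of the expiration epoch. The sketch below assumes the expiration is the current epoch plus the lifetime, with protection against overflow; the expirationEpoch helper is hypothetical and is not taken from the package.

```go
package main

import (
	"fmt"
	"math"
)

// expirationEpoch returns the epoch at which a session token issued at
// currentEpoch with the given lifetime (in epochs) would expire.
// The addition saturates at math.MaxUint64 instead of wrapping around.
// Hypothetical helper: the Pool's actual rule is an assumption here.
func expirationEpoch(currentEpoch, lifetime uint64) uint64 {
	if math.MaxUint64-currentEpoch < lifetime {
		return math.MaxUint64
	}
	return currentEpoch + lifetime
}

func main() {
	fmt.Println(expirationEpoch(100, 10))              // 110
	fmt.Println(expirationEpoch(math.MaxUint64-1, 10)) // 18446744073709551615
}
```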

func (*InitParameters) SetSigner

func (x *InitParameters) SetSigner(signer user.Signer)

SetSigner specifies the default signer to be used for protocol communication. It MUST be of the neofscrypto.ECDSA_DETERMINISTIC_SHA256 scheme; for example, neofsecdsa.SignerRFC6979 can be used.

func (*InitParameters) SetStatisticCallback

func (x *InitParameters) SetStatisticCallback(statisticCallback stat.OperationCallback)

SetStatisticCallback makes the Pool pass stat.OperationCallback for external statistics.

type NodeParam

type NodeParam struct {
	// contains filtered or unexported fields
}

NodeParam groups parameters of remote node.

func NewFlatNodeParams

func NewFlatNodeParams(endpoints []string) []NodeParam

NewFlatNodeParams converts endpoints to the corresponding NodeParam values. It is useful when all endpoints are equivalent.
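What "equivalent endpoints" means can be sketched as follows. The nodeParam type and flatNodeParams function are hypothetical stand-ins, and the concrete priority and weight values (1 and 1) are assumptions rather than documented behavior.

```go
package main

import "fmt"

// nodeParam is a hypothetical stand-in for pool.NodeParam,
// whose real fields are unexported.
type nodeParam struct {
	priority int
	address  string
	weight   float64
}

// flatNodeParams gives every endpoint the same priority and weight,
// so that no endpoint is preferred over another.
// The values 1 and 1 are assumptions made for this sketch.
func flatNodeParams(endpoints []string) []nodeParam {
	params := make([]nodeParam, 0, len(endpoints))
	for _, addr := range endpoints {
		params = append(params, nodeParam{priority: 1, address: addr, weight: 1})
	}
	return params
}

func main() {
	endpoints := []string{"grpc://localhost:8080", "grpcs://localhost:8081"}
	for _, p := range flatNodeParams(endpoints) {
		fmt.Printf("%+v\n", p)
	}
}
```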

func NewNodeParam

func NewNodeParam(priority int, address string, weight float64) (prm NodeParam)

NewNodeParam creates NodeParam using parameters. Address parameter MUST follow the client requirements, see sdkClient.PrmDial.SetServerURI for details.

func (*NodeParam) SetAddress

func (x *NodeParam) SetAddress(address string)

SetAddress specifies address of the node.

func (*NodeParam) SetPriority

func (x *NodeParam) SetPriority(priority int)

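SetPriority specifies the priority of the node. Negative values are allowed. Nodes with the same priority are grouped together, and the resulting groups are ordered by priority (in the NewPool example, the node with priority 1 is used before the nodes with priority 2).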

func (*NodeParam) SetWeight

func (x *NodeParam) SetWeight(weight float64)

SetWeight specifies weight of the node.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool represents a virtual connection to the NeoFS network. It communicates with multiple NeoFS servers so that callers do not have to think about switching between servers due to load-balancing proportions or server unavailability. It is designed to provide a convenient abstraction over multiple sdkClient.client types.

Pool can be created and initialized using the NewPool function. Before executing NeoFS operations using the Pool, connections to the servers MUST BE correctly established (see Dial method). Using the Pool before connections have been established can lead to a panic. After the work, the Pool SHOULD BE closed (see Close method): this frees internal and system resources that were allocated for the Pool's period of work. Calling the Dial/Close methods during communication is strongly discouraged, as it leads to undefined behavior.

Each method that produces a NeoFS API call may return an error. The status of the underlying server response is cast to a built-in error instance. Certain statuses can be checked using the `sdkClient` and standard `errors` packages.
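The error-checking pattern described above relies on the standard errors.Is function. The sketch below uses only the standard library: errObjectNotFound is a hypothetical sentinel standing in for a status error such as apistatus.ErrObjectNotFound, and headObject is a hypothetical operation.

```go
package main

import (
	"errors"
	"fmt"
)

// errObjectNotFound is a hypothetical sentinel standing in for a status
// error such as apistatus.ErrObjectNotFound.
var errObjectNotFound = errors.New("object not found")

// headObject is a hypothetical operation that wraps the sentinel with context.
func headObject(exists bool) error {
	if !exists {
		return fmt.Errorf("head object: %w", errObjectNotFound)
	}
	return nil
}

// isNotFound reports whether err is, or wraps, errObjectNotFound.
func isNotFound(err error) bool {
	return errors.Is(err, errObjectNotFound)
}

func main() {
	err := headObject(false)
	switch {
	case err == nil:
		fmt.Println("object exists")
	case isNotFound(err):
		fmt.Println("object not found")
	default:
		fmt.Println("unexpected error:", err)
	}
}
```

Because errors.Is unwraps the error chain, the check keeps working when intermediate layers add context with %w.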

See pool package overview to get some examples.

func New

func New(endpoints []NodeParam, signer user.Signer, options InitParameters) (*Pool, error)

New creates connection pool using simple set of endpoints and parameters.

See also pool.DefaultOptions and pool.NewFlatNodeParams for details.

Returned errors:

Example (AdjustingParameters)
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"

	"github.com/nspcc-dev/neofs-sdk-go/pool"
	"github.com/nspcc-dev/neofs-sdk-go/user"
)

func main() {
	// The key is generated to simplify the example, in reality it's likely to come from configuration/wallet.
	pk, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	signer := user.NewAutoIDSignerRFC6979(*pk)

	opts := pool.DefaultOptions()
	opts.SetErrorThreshold(10)

	p, _ := pool.New(
		pool.NewFlatNodeParams([]string{"grpc://localhost:8080", "grpcs://localhost:8081"}),
		signer,
		opts,
	)
	_ = p
}

Example (EasiestWay)
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"

	"github.com/nspcc-dev/neofs-sdk-go/pool"
	"github.com/nspcc-dev/neofs-sdk-go/user"
)

func main() {
	// The key is generated to simplify the example, in reality it's likely to come from configuration/wallet.
	pk, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	signer := user.NewAutoIDSignerRFC6979(*pk)

	p, _ := pool.New(
		pool.NewFlatNodeParams([]string{"grpc://localhost:8080", "grpcs://localhost:8081"}),
		signer,
		pool.DefaultOptions(),
	)
	_ = p
}

func NewPool

func NewPool(options InitParameters) (*Pool, error)

NewPool creates connection pool using parameters.

Returned errors:

Example

Create a pool instance connected to 3 nodes. These InitParameters make the pool use the 192.168.130.71 node while it is healthy. Otherwise, the pool uses 192.168.130.72 for 90% of requests and 192.168.130.73 for the remaining 10%.

package main

import (
	"github.com/nspcc-dev/neofs-sdk-go/pool"
	"github.com/nspcc-dev/neofs-sdk-go/user"
)

func main() {
	var signer user.Signer
	// ... create signer
	var prm pool.InitParameters
	prm.SetSigner(signer)
	prm.AddNode(pool.NewNodeParam(1, "192.168.130.71", 1))
	prm.AddNode(pool.NewNodeParam(2, "192.168.130.72", 9))
	prm.AddNode(pool.NewNodeParam(2, "192.168.130.73", 1))
	// ...

	p, err := pool.NewPool(prm)

	_ = p
	_ = err
}
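The 90% and 10% figures in this example follow from combining priorities with weights. A minimal sketch of that arithmetic, assuming the lowest-numbered priority group with a healthy node is used and weights are normalized within it, is shown below. The node type and the shares function are hypothetical and do not reproduce the Pool's internals.

```go
package main

import "fmt"

// node is a hypothetical description of a pool member.
type node struct {
	priority int
	address  string
	weight   float64
	healthy  bool
}

// shares returns the fraction of requests each healthy node would receive,
// considering only the lowest-numbered priority group that has a healthy node.
// Weights are assumed to be non-negative.
func shares(nodes []node) map[string]float64 {
	best, found := 0, false
	for _, n := range nodes {
		if n.healthy && (!found || n.priority < best) {
			best, found = n.priority, true
		}
	}

	result := make(map[string]float64)
	if !found {
		return result
	}

	total := 0.0
	for _, n := range nodes {
		if n.healthy && n.priority == best {
			total += n.weight
		}
	}
	if total <= 0 {
		return result
	}
	for _, n := range nodes {
		if n.healthy && n.priority == best {
			result[n.address] = n.weight / total
		}
	}
	return result
}

func main() {
	nodes := []node{
		{priority: 1, address: "192.168.130.71", weight: 1, healthy: true},
		{priority: 2, address: "192.168.130.72", weight: 9, healthy: true},
		{priority: 2, address: "192.168.130.73", weight: 1, healthy: true},
	}
	fmt.Println(shares(nodes)) // map[192.168.130.71:1]

	nodes[0].healthy = false // simulate the priority-1 node going down
	fmt.Println(shares(nodes)) // map[192.168.130.72:0.9 192.168.130.73:0.1]
}
```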

func (*Pool) BalanceGet

func (p *Pool) BalanceGet(ctx context.Context, prm client.PrmBalanceGet) (accounting.Decimal, error)

BalanceGet requests current balance of the NeoFS account.

See details in [client.Client.BalanceGet].

func (*Pool) Close

func (p *Pool) Close()

Close closes the Pool and releases all the associated resources.

func (*Pool) ContainerDelete

func (p *Pool) ContainerDelete(ctx context.Context, id cid.ID, signer neofscrypto.Signer, prm client.PrmContainerDelete) error

ContainerDelete sends request to remove the NeoFS container.

See details in [client.Client.ContainerDelete].

func (*Pool) ContainerEACL

func (p *Pool) ContainerEACL(ctx context.Context, id cid.ID, prm client.PrmContainerEACL) (eacl.Table, error)

ContainerEACL reads eACL table of the NeoFS container.

See details in [client.Client.ContainerEACL].

func (*Pool) ContainerGet

func (p *Pool) ContainerGet(ctx context.Context, id cid.ID, prm client.PrmContainerGet) (container.Container, error)

ContainerGet reads NeoFS container by ID.

See details in [client.Client.ContainerGet].

func (*Pool) ContainerList

func (p *Pool) ContainerList(ctx context.Context, ownerID user.ID, prm client.PrmContainerList) ([]cid.ID, error)

ContainerList requests identifiers of the account-owned containers.

See details in [client.Client.ContainerList].

func (*Pool) ContainerPut

func (p *Pool) ContainerPut(ctx context.Context, cont container.Container, signer neofscrypto.Signer, prm client.PrmContainerPut) (cid.ID, error)

ContainerPut sends request to save container in NeoFS.

See details in [client.Client.ContainerPut].

Example
package main

import (
	"context"

	"github.com/nspcc-dev/neofs-sdk-go/client"
	"github.com/nspcc-dev/neofs-sdk-go/container"

	neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
	"github.com/nspcc-dev/neofs-sdk-go/pool"
	"github.com/nspcc-dev/neofs-sdk-go/waiter"
)

func main() {
	var p pool.Pool
	// ... init pool

	// Connect to the NeoFS server
	_ = p.Dial(context.Background())

	var cont container.Container
	// ... fill container

	var signer neofscrypto.Signer
	// ... create signer

	var prmPut client.PrmContainerPut
	// ... fill params, if required

	// Wait until the container is created or the context is canceled.
	w := waiter.NewContainerPutWaiter(&p, waiter.DefaultPollInterval)

	containerID, err := w.ContainerPut(context.Background(), cont, signer, prmPut)

	_ = containerID
	_ = err
}

func (*Pool) ContainerSetEACL

func (p *Pool) ContainerSetEACL(ctx context.Context, table eacl.Table, signer user.Signer, prm client.PrmContainerSetEACL) error

ContainerSetEACL sends request to update eACL table of the NeoFS container.

See details in [client.Client.ContainerSetEACL].

func (*Pool) Dial

func (p *Pool) Dial(ctx context.Context) error

Dial establishes a connection to the servers from the NeoFS network. It also starts a routine that checks the health of the nodes and updates the weights of the nodes for balancing. Returns an error describing failure reason.

If failed, the Pool SHOULD NOT be used.

See also InitParameters.SetClientRebalanceInterval.
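The periodic health-check routine can be pictured as a ticker loop bound to a context. The following is a sketch of that general pattern using only the standard library, not the Pool's actual code: runHealthChecks and runDemo are hypothetical names, and the loop runs in the caller's goroutine here to keep the example deterministic.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runHealthChecks calls check once per interval until ctx is canceled.
// Hypothetical helper illustrating a rebalance loop.
func runHealthChecks(ctx context.Context, interval time.Duration, check func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// select chooses randomly when both cases are ready,
			// so re-check cancellation before doing any work.
			if ctx.Err() != nil {
				return
			}
			check()
		}
	}
}

// runDemo performs n checks (n must be positive), then cancels the context
// and returns the number of checks that actually ran.
func runDemo(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	count := 0
	runHealthChecks(ctx, time.Millisecond, func() {
		count++
		if count == n {
			cancel()
		}
	})
	return count
}

func main() {
	fmt.Println("health checks performed:", runDemo(3)) // health checks performed: 3
}
```

In a long-running program such a loop would typically be started with the go keyword and stopped by canceling the context.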

func (*Pool) NetMapSnapshot

func (p *Pool) NetMapSnapshot(ctx context.Context, prm client.PrmNetMapSnapshot) (netmap.NetMap, error)

NetMapSnapshot requests current network view of the remote server.

See details in [client.Client.NetMapSnapshot].

func (*Pool) NetworkInfo

func (p *Pool) NetworkInfo(ctx context.Context, prm client.PrmNetworkInfo) (netmap.NetworkInfo, error)

NetworkInfo requests information about the NeoFS network of which the remote server is a part.

See details in [client.Client.NetworkInfo].

func (*Pool) ObjectDelete

func (p *Pool) ObjectDelete(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectDelete) (oid.ID, error)

ObjectDelete marks an object for deletion from the container using NeoFS API protocol.

Operation is executed within a session automatically created by Pool unless parameters explicitly override session settings.

See details in [client.Client.ObjectDelete].

func (*Pool) ObjectGetInit

func (p *Pool) ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectGet) (object.Object, *client.PayloadReader, error)

ObjectGetInit initiates reading an object through a remote server using NeoFS API protocol.

Operation is executed within a session automatically created by Pool unless parameters explicitly override session settings.

See details in [client.Client.ObjectGetInit].

Example (ExplicitAutoSessionDisabling)
package main

import (
	"context"
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"

	"github.com/nspcc-dev/neofs-sdk-go/client"
	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
	"github.com/nspcc-dev/neofs-sdk-go/object"

	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
	"github.com/nspcc-dev/neofs-sdk-go/pool"
	"github.com/nspcc-dev/neofs-sdk-go/user"
)

func main() {
	// The key is generated to simplify the example, in reality it's likely to come from configuration/wallet.
	pk, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	signer := user.NewAutoIDSignerRFC6979(*pk)

	p, _ := pool.New(
		pool.NewFlatNodeParams([]string{"grpc://localhost:8080", "grpcs://localhost:8081"}),
		signer,
		pool.DefaultOptions(),
	)

	_ = p.Dial(context.Background())

	var prm client.PrmObjectGet
	// If you don't provide the session manually with prm.WithinSession, the request will be executed without session.
	prm.IgnoreSession()

	var ownerID user.ID
	var hdr object.Object
	hdr.SetContainerID(cid.ID{})
	hdr.SetOwnerID(&ownerID)

	var containerID cid.ID
	// fill containerID
	var objectID oid.ID
	// fill objectID

	// If no session was provided with prm.WithinSession, the signer must belong to the account that owns the
	// container; otherwise there will be an error.

	// If a session was provided with prm.WithinSession, the signer can be either the container owner account or a
	// third party that can use a session token signed by the container owner.
	_, _, _ = p.ObjectGetInit(context.Background(), containerID, objectID, signer, prm)

	// ...
}

func (*Pool) ObjectHash

func (p *Pool) ObjectHash(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHash) ([][]byte, error)

ObjectHash requests checksum of the range list of the object payload using NeoFS API protocol.

Operation is executed within a session automatically created by Pool unless parameters explicitly override session settings.

See details in [client.Client.ObjectHash].

func (*Pool) ObjectHead

func (p *Pool) ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error)

ObjectHead reads object header through a remote server using NeoFS API protocol.

Operation is executed within a session automatically created by Pool unless parameters explicitly override session settings.

See details in [client.Client.ObjectHead].

Example
package main

import (
	"context"
	"errors"

	"github.com/nspcc-dev/neofs-sdk-go/client"
	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"

	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"

	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
	"github.com/nspcc-dev/neofs-sdk-go/pool"
	"github.com/nspcc-dev/neofs-sdk-go/user"
)

func main() {
	var p pool.Pool
	// ... init pool

	// Connect to the NeoFS server
	_ = p.Dial(context.Background())

	var signer user.Signer
	// ... create signer

	var prmHead client.PrmObjectHead
	// ... fill params, if required

	hdr, err := p.ObjectHead(context.Background(), cid.ID{}, oid.ID{}, signer, prmHead)
	if err != nil {
		if errors.Is(err, apistatus.ErrObjectNotFound) {
			return
		}

		// ...
	}

	_ = hdr

	p.Close()
}

func (*Pool) ObjectPutInit

func (p *Pool) ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error)

ObjectPutInit initiates writing an object through a remote server using NeoFS API protocol.

Operation is executed within a session automatically created by Pool unless parameters explicitly override session settings.

See details in [client.Client.ObjectPutInit].

Example (AutoSessionDisabling)
package main

import (
	"context"
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"

	"github.com/nspcc-dev/neofs-sdk-go/client"
	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
	"github.com/nspcc-dev/neofs-sdk-go/object"
	"github.com/nspcc-dev/neofs-sdk-go/pool"
	"github.com/nspcc-dev/neofs-sdk-go/session"
	"github.com/nspcc-dev/neofs-sdk-go/user"
)

func main() {
	// The key is generated to simplify the example, in reality it's likely to come from configuration/wallet.
	pk, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	signer := user.NewAutoIDSignerRFC6979(*pk)

	p, _ := pool.New(
		pool.NewFlatNodeParams([]string{"grpc://localhost:8080", "grpcs://localhost:8081"}),
		signer,
		pool.DefaultOptions(),
	)

	_ = p.Dial(context.Background())

	// Session should be initialized with Client.SessionCreate function.
	var sess session.Object

	var prm client.PrmObjectPutInit
	// Auto-session disabled, because you provided session already.
	prm.WithinSession(sess)

	// For ObjectPutInit operation prm.IgnoreSession shouldn't be called ever, because putObject without session is not
	// acceptable, and it will be an error.
	// prm.IgnoreSession()

	var ownerID user.ID
	var hdr object.Object
	hdr.SetContainerID(cid.ID{})
	hdr.SetOwnerID(&ownerID)

	// If no session was provided with prm.WithinSession, the signer must belong to the account that owns the
	// container; otherwise there will be an error.

	// If a session was provided with prm.WithinSession, the signer can be either the container owner account or a
	// third party that can use a session token signed by the container owner.
	_, _ = p.ObjectPutInit(context.Background(), hdr, signer, prm)

	// ...
}

func (*Pool) ObjectRangeInit

func (p *Pool) ObjectRangeInit(ctx context.Context, containerID cid.ID, objectID oid.ID, offset, length uint64, signer user.Signer, prm client.PrmObjectRange) (*client.ObjectRangeReader, error)

ObjectRangeInit initiates reading an object's payload range through a remote server using NeoFS API protocol.

Operation is executed within a session automatically created by Pool unless parameters explicitly override session settings.

See details in [client.Client.ObjectRangeInit].

func (*Pool) ObjectSearchInit

func (p *Pool) ObjectSearchInit(ctx context.Context, containerID cid.ID, signer user.Signer, prm client.PrmObjectSearch) (*client.ObjectListReader, error)

ObjectSearchInit initiates object selection through a remote server using NeoFS API protocol.

Operation is executed within a session automatically created by Pool unless parameters explicitly override session settings.

See details in [client.Client.ObjectSearchInit].

func (*Pool) RawClient

func (p *Pool) RawClient() (*sdkClient.Client, error)

RawClient returns a single client instance, making it possible to work with one specific client.
