grpcclient

Version: v0.1.0
Warning

This package is not in the latest version of its module.

Published: Jul 14, 2023 License: Apache-2.0 Imports: 27 Imported by: 0

README

Conduit Connector for gRPC Client

The gRPC Client connector is a Conduit plugin. It provides a destination connector that acts as a gRPC client.

This connector should be paired with another Conduit instance or pipeline that provides a gRPC server source. The client initiates the connection with the server and starts sending records to it.

How to build?

Run make build to build the connector.

Testing

Run make test to run all the unit tests.

Destination

The gRPC client destination connector initiates a connection with a gRPC server using the URL provided as a parameter. It creates a bidirectional stream with the server, writes records to the server over that stream, and then waits for the server's acknowledgments to arrive on the same stream.

Configuration
| name | description | required | default value |
|------|-------------|----------|---------------|
| url | url to gRPC server. | true | |
| rateLimit | the bandwidth limit in bytes/second, use 0 to disable rate limiting. | false | 0 |
| reconnectDelay | delay between each gRPC request retry. | false | 5s |
| maxDowntime | max downtime accepted for the server to be off. | false | 10m |
| mtls.disabled | option to disable mTLS secure connection, set it to true for an insecure connection. | false | false |
| mtls.client.certPath | the client certificate path. | required if mtls.disabled is false | |
| mtls.client.keyPath | the client private key path. | required if mtls.disabled is false | |
| mtls.ca.certPath | the root CA certificate path. | required if mtls.disabled is false | |
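
The rules in the table can be checked before a pipeline is started. The following is a minimal sketch, not the connector's own validation: `validateConfig` is a hypothetical helper, the address `localhost:8084` is an assumed value, and treating only the literal string `true` as "disabled" is a simplification.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// validateConfig is a hypothetical helper (not part of the connector) that
// applies the rules from the configuration table to a raw settings map.
func validateConfig(cfg map[string]string) error {
	if cfg["url"] == "" {
		return errors.New(`"url" is required`)
	}
	// Durations use Go syntax such as "5s" or "10m"; empty means "use the default".
	for _, key := range []string{"reconnectDelay", "maxDowntime"} {
		if v := cfg[key]; v != "" {
			if _, err := time.ParseDuration(v); err != nil {
				return fmt.Errorf("%q is not a valid duration: %w", key, err)
			}
		}
	}
	// mTLS stays enabled unless mtls.disabled is "true" (simplified check).
	// While it is enabled, all three certificate paths must be set.
	if cfg["mtls.disabled"] != "true" {
		for _, key := range []string{"mtls.client.certPath", "mtls.client.keyPath", "mtls.ca.certPath"} {
			if cfg[key] == "" {
				return fmt.Errorf("%q is required when mtls.disabled is false", key)
			}
		}
	}
	return nil
}

func main() {
	cfg := map[string]string{
		"url":            "localhost:8084", // assumed address, for illustration only
		"reconnectDelay": "5s",
		"maxDowntime":    "10m",
		"mtls.disabled":  "true", // insecure; suitable for local testing only
	}
	fmt.Println("validation error:", validateConfig(cfg))
}
```

Leaving out `mtls.disabled` in the map above would make the sketch demand all three certificate paths, which matches the "required if mtls.disabled is false" entries.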

Mutual TLS (mTLS)

Mutual TLS is used by default to connect to the server. To disable mTLS, set the parameter mtls.disabled to true; this results in an insecure connection to the server.

This repo contains self-signed certificates for local testing, located under ./test/certs. These certificates are not meant to be used in a production environment.

To generate your own secure mTLS certificates, check this tutorial.

Planned work

  • Add a source for gRPC client.

Documentation

Constants

This section is empty.

Variables

var Connector = sdk.Connector{
	NewSpecification: Specification,
	NewSource:        nil,
	NewDestination:   NewDestination,
}

Connector combines all constructors for each plugin in one struct.

Functions

func AttachPositionIndex

func AttachPositionIndex(p sdk.Position, index uint32) sdk.Position

func NewDestination

func NewDestination() sdk.Destination

func NewDestinationWithDialer

func NewDestinationWithDialer(dialer func(ctx context.Context, _ string) (net.Conn, error)) sdk.Destination

NewDestinationWithDialer creates a Destination that uses the given dialer. It is intended for testing purposes.

func Specification

func Specification() sdk.Specification

Specification returns the connector's specification.

Types

type AckManager

type AckManager struct {
	// contains filtered or unexported fields
}

func NewAckManager

func NewAckManager(sm *StreamManager) *AckManager

func (*AckManager) Expect

func (am *AckManager) Expect(expected []sdk.Position) error

Expect lets the ack manager know which acks to expect in the next batch. If there are still open acks to be received from the previous batch, Expect returns an error. It has to be called after Run and before Wait.

func (*AckManager) Got

func (am *AckManager) Got() int

Got returns the number of deduplicated acknowledgments received so far.

func (*AckManager) Run

func (am *AckManager) Run(ctx context.Context) error

Run is a blocking method that listens for acks and validates them. It returns an error if the context is cancelled or if an unrecoverable error happens.

func (*AckManager) Wait

func (am *AckManager) Wait(ctx context.Context) (int, error)

Wait blocks until all acks are received, the connection drops while waiting for acks, or the context is cancelled. If the connection drops it returns io.EOF, if the context is cancelled it returns the context error, otherwise it returns nil.
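
The bookkeeping described by Expect and Got can be illustrated with a small in-memory tracker. This is a sketch under assumptions, not the AckManager implementation: `ackTracker` and its `Ack` method are hypothetical, positions are plain strings instead of sdk.Position, nothing is read from a gRPC stream, and silently ignoring unknown or duplicate acks is a simplification.

```go
package main

import (
	"errors"
	"fmt"
)

// ackTracker is a hypothetical, single-goroutine stand-in for the ack
// bookkeeping; the real AckManager also reads acks from the stream.
type ackTracker struct {
	pending map[string]bool // positions still waiting for an ack
	got     int             // distinct acks received since the first Expect
}

// Expect registers the positions of the next batch. It fails if acks from
// the previous batch are still outstanding.
func (a *ackTracker) Expect(positions []string) error {
	if len(a.pending) > 0 {
		return errors.New("previous batch still has open acks")
	}
	a.pending = make(map[string]bool, len(positions))
	for _, p := range positions {
		a.pending[p] = true
	}
	return nil
}

// Ack records one acknowledgment. Duplicates and unknown positions are
// ignored here (an assumption made to keep the sketch short).
func (a *ackTracker) Ack(position string) {
	if a.pending[position] {
		delete(a.pending, position)
		a.got++
	}
}

// Got returns how many distinct acks have arrived so far.
func (a *ackTracker) Got() int { return a.got }

func main() {
	tracker := &ackTracker{}
	_ = tracker.Expect([]string{"p1", "p2"})
	tracker.Ack("p1")
	tracker.Ack("p1") // duplicate, not counted twice
	fmt.Println("acks received:", tracker.Got())
	// "p2" is still open, so a new batch is rejected.
	fmt.Println("next batch rejected:", tracker.Expect([]string{"p3"}) != nil)
}
```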

type Config

type Config struct {
	// url to gRPC server
	URL string `json:"url" validate:"required"`
	// the bandwidth limit in bytes/second, use "0" to disable rate limiting.
	RateLimit int `json:"rateLimit" default:"0" validate:"gt=-1"`
	// delay between each gRPC request retry.
	ReconnectDelay time.Duration `json:"reconnectDelay" default:"5s"`
	// max downtime accepted for the server to be off.
	MaxDowntime time.Duration `json:"maxDowntime" default:"10m"`
	// mTLS configurations.
	MTLS MTLSConfig `json:"mtls"`
}

Config has the generic parameters needed for a gRPC client.

type DestConfig

type DestConfig struct {
	Config
}

func (DestConfig) Parameters

func (DestConfig) Parameters() map[string]sdk.Parameter

type Destination

type Destination struct {
	sdk.UnimplementedDestination
	// contains filtered or unexported fields
}

func (*Destination) Configure

func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error

func (*Destination) Open

func (d *Destination) Open(ctx context.Context) error

func (*Destination) Parameters

func (d *Destination) Parameters() map[string]sdk.Parameter

func (*Destination) Teardown

func (d *Destination) Teardown(ctx context.Context) error

func (*Destination) Write

func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error)

type MTLSConfig

type MTLSConfig struct {
	// the client certificate path.
	ClientCertPath string `json:"client.certPath"`
	// the client private key path.
	ClientKeyPath string `json:"client.keyPath"`
	// the root CA certificate path.
	CACertPath string `json:"ca.certPath"`
	// option to disable mTLS secure connection, set it to `true` for an insecure connection.
	Disabled bool `json:"disabled" default:"false"`
}

func (*MTLSConfig) ParseMTLSFiles

func (mc *MTLSConfig) ParseMTLSFiles() (tls.Certificate, *x509.CertPool, error)

ParseMTLSFiles parses and validates the mTLS parameter values. It returns the parsed client certificate and the CA certificate pool, or an error if parsing fails.
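
For readers unfamiliar with the standard library pieces involved, here is a rough sketch of how a function with this shape could be written. `loadMTLS` is a hypothetical helper and the file names are placeholders; the connector's actual validation steps may differ.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// loadMTLS is a hypothetical helper mirroring the return values of
// ParseMTLSFiles: it loads the client key pair and builds a CA pool.
func loadMTLS(certPath, keyPath, caPath string) (tls.Certificate, *x509.CertPool, error) {
	cert, err := tls.LoadX509KeyPair(certPath, keyPath)
	if err != nil {
		return tls.Certificate{}, nil, fmt.Errorf("load client key pair: %w", err)
	}
	caPEM, err := os.ReadFile(caPath)
	if err != nil {
		return tls.Certificate{}, nil, fmt.Errorf("read CA certificate: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return tls.Certificate{}, nil, errors.New("CA file contains no valid PEM certificate")
	}
	return cert, pool, nil
}

func main() {
	// Placeholder paths; point them at your own certificate files.
	cert, pool, err := loadMTLS("client.crt", "client.key", "ca.crt")
	if err != nil {
		fmt.Println("mTLS setup failed:", err)
		return
	}
	// The client presents cert and trusts servers signed by a CA in pool.
	cfg := &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      pool,
		MinVersion:   tls.VersionTLS12,
	}
	fmt.Println("client certificates loaded:", len(cfg.Certificates))
}
```

A `tls.Config` built this way is what a gRPC client would typically wrap in transport credentials when dialing the server.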

type Position

type Position struct {
	Index    uint32
	Original []byte
}

func ToRecordPosition

func ToRecordPosition(p sdk.Position) Position
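
AttachPositionIndex and ToRecordPosition are documented without descriptions, but their signatures suggest a round trip: an index is attached to a record position, and the combined value is later split back into Index and Original. The sketch below illustrates that idea only. The helpers `attachIndex` and `toPosition` are hypothetical, and the 4-byte big-endian prefix is an assumed encoding, not the connector's actual format.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Position mirrors the documented struct.
type Position struct {
	Index    uint32
	Original []byte
}

// attachIndex is a hypothetical stand-in for AttachPositionIndex.
// ASSUMPTION: the index is stored as a 4-byte big-endian prefix; the
// connector's real encoding may differ.
func attachIndex(original []byte, index uint32) []byte {
	out := make([]byte, 4, 4+len(original))
	binary.BigEndian.PutUint32(out, index)
	return append(out, original...)
}

// toPosition is a hypothetical stand-in for ToRecordPosition. Unlike the
// documented function it returns an error for malformed input.
func toPosition(p []byte) (Position, error) {
	if len(p) < 4 {
		return Position{}, fmt.Errorf("position too short: %d bytes", len(p))
	}
	return Position{Index: binary.BigEndian.Uint32(p[:4]), Original: p[4:]}, nil
}

func main() {
	wrapped := attachIndex([]byte("offset-42"), 7)
	pos, err := toPosition(wrapped)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println("index:", pos.Index, "original:", string(pos.Original))
}
```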

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

func NewStreamManager

func NewStreamManager(ctx context.Context, conn *grpc.ClientConn, reconnectDelay, maxDowntime time.Duration) (*StreamManager, error)

func (*StreamManager) Get

Get blocks until a stream is available. If the context gets closed it returns the context error.

func (*StreamManager) Run

func (sm *StreamManager) Run(ctx context.Context) (err error)

Run is a blocking method that monitors the status of the stream connection. If the stream gets closed, it waits for the connection to come up again and reestablishes the stream.

func (*StreamManager) StreamDone

func (sm *StreamManager) StreamDone(ctx context.Context) (chan struct{}, error)

StreamDone returns a channel that will be closed when the connection to the server is lost.

Directories

Path Synopsis
cmd
proto
v1
Package v1 is a generated GoMock package.
