pulsar

package module
v0.0.0-...-20ba43e Latest Latest
Warning

This package is not in the latest version of its module.
Published: Mar 29, 2024 License: Apache-2.0 Imports: 8 Imported by: 0

README

README.md

Conduit Connector Apache Pulsar

The Apache Pulsar connector is a Conduit plugin. It provides both a source and a destination connector for Apache Pulsar.

How to build?

Run make build to build the connector.

Testing

Run make test to run all the unit tests. Run make test-integration to run the integration tests. The integration tests require Docker to be installed and running; the command starts and stops the Docker containers for you.

The Docker Compose file at test/docker-compose.yml can be used to run the required resources locally.

Shared Configuration

The following table lists configuration options common to both source and destination connectors.

| name | description | required | default value |
|------|-------------|----------|---------------|
| url | URL of the Pulsar instance to connect to. | true | |
| topic | Topic specifies the Pulsar topic that the source or destination interacts with. | true | |
| connectionTimeout | ConnectionTimeout specifies the duration for which the client will attempt to establish a connection before timing out. | false | |
| operationTimeout | OperationTimeout is the duration after which an operation is considered to have timed out. | false | |
| maxConnectionsPerBroker | MaxConnectionsPerBroker limits the number of connections to each broker. | false | |
| memoryLimitBytes | MemoryLimitBytes sets the memory limit for the client in bytes. If the limit is exceeded, the client may start to block or fail operations. | false | |
| enableTransaction | EnableTransaction determines if the client should support transactions. | false | |
| tlsKeyFilePath | TLSKeyFilePath sets the path to the TLS key file. | false | |
| tlsCertificateFile | TLSCertificateFile sets the path to the TLS certificate file. | false | |
| tlsTrustCertsFilePath | TLSTrustCertsFilePath sets the path to the trusted TLS certificate file. | false | |
| tlsAllowInsecureConnection | TLSAllowInsecureConnection configures whether the internal Pulsar client accepts untrusted TLS certificates from the broker (default: false). | false | |
| tlsValidateHostname | TLSValidateHostname configures whether the Pulsar client verifies the validity of the host name from the broker (default: false). | false | |

Destination Configuration

The destination connector uses only the shared configuration options listed above.

Source Configuration

In addition to the shared configuration options, the source connector has the following options.

| name | description | required | default value |
|------|-------------|----------|---------------|
| subscriptionName | SubscriptionName is the name of the subscription to be used for consuming messages. If none is provided, a random UUID is generated as the name. | false | |
| subscriptionType | SubscriptionType defines the type of subscription to use. Can be "exclusive", "shared", "failover", or "key_shared". | false | exclusive |
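The defaulting rules described above can be sketched in Go as follows. This is only an illustration: resolveSubscription is a hypothetical helper, and the random name is produced from 16 hex-encoded random bytes as a stand-in for the UUID the connector generates.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// resolveSubscription is a hypothetical helper that mirrors the documented
// defaults. It is not the connector's own code.
func resolveSubscription(name, subType string) (string, string, error) {
	if subType == "" {
		subType = "exclusive" // documented default
	}
	switch subType {
	case "exclusive", "shared", "failover", "key_shared":
		// supported value, nothing to do
	default:
		return "", "", fmt.Errorf("unknown subscriptionType %q", subType)
	}

	if name == "" {
		// Stand-in for the random UUID mentioned in the README:
		// 16 random bytes, hex-encoded.
		buf := make([]byte, 16)
		if _, err := rand.Read(buf); err != nil {
			return "", "", err
		}
		name = hex.EncodeToString(buf)
	}
	return name, subType, nil
}

func main() {
	name, subType, err := resolveSubscription("", "")
	fmt.Println(name, subType, err)
}
```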

Example pipeline.yml

An example pipeline.yml file that defines two pipelines, one from a file to Apache Pulsar and one from Apache Pulsar to a file:

version: 2.0
pipelines:
  - id: file-to-pulsar
    status: running
    connectors:
      - id: file.in
        type: source
        plugin: builtin:file
        name: file-source
        settings:
          path: ./file.in
      - id: pulsar.out
        type: destination
        plugin: standalone:pulsar
        name: pulsar-destination
        settings:
          url: pulsar://localhost:6650
          topic: demo-topic
          sdk.record.format: template
          sdk.record.format.options: '{{ printf "%s" .Payload.After }}'

  - id: pulsar-to-file
    status: running
    connectors:
      - id: pulsar.in
        type: source
        plugin: standalone:pulsar
        name: pulsar-source
        settings:
          url: pulsar://localhost:6650
          topic: demo-topic
          subscriptionName: demo-topic-subscription

      - id: file.out
        type: destination
        plugin: builtin:file
        name: file-destination
        settings:
          path: ./file.out
          sdk.record.format: template
          sdk.record.format.options: '{{ printf "%s" .Payload.After }}'
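The sdk.record.format.options value in the pipelines above is a Go template. The sketch below shows what that template evaluates to, assuming it is rendered with the standard text/template package against a value exposing a Payload.After byte slice. The record and payload types here are simplified stand-ins, not the Conduit SDK types.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// payload and record are simplified stand-ins for the SDK record type.
// Only the field referenced by the template is modelled.
type payload struct {
	After []byte
}

type record struct {
	Payload payload
}

// render executes the given template text against a record and returns
// the resulting string.
func render(tmplText string, rec record) (string, error) {
	tmpl, err := template.New("record").Parse(tmplText)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := tmpl.Execute(&sb, rec); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	rec := record{Payload: payload{After: []byte("hello pulsar")}}
	out, err := render(`{{ printf "%s" .Payload.After }}`, rec)
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // prints "hello pulsar"
}
```

The printf "%s" call matters because a byte slice would otherwise be printed as a list of numbers; with it, only the raw payload text is written out.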

Documentation

Variables

var Connector = sdk.Connector{
	NewSpecification: Specification,
	NewSource:        NewSource,
	NewDestination:   NewDestination,
}

Connector combines all constructors for each plugin in one struct.

Functions

func NewDestination

func NewDestination() sdk.Destination

func NewSource

func NewSource() sdk.Source

func Specification

func Specification() sdk.Specification

Specification returns the connector's specification.

Types

type Config

type Config struct {
	// URL of the Pulsar instance to connect to.
	URL string `json:"url" validate:"required"`

	// Topic specifies the Pulsar topic used by the connector.
	Topic string `json:"topic" validate:"required"`

	// ConnectionTimeout specifies the duration for which the client will
	// attempt to establish a connection before timing out.
	ConnectionTimeout time.Duration `json:"connectionTimeout"`

	// OperationTimeout is the duration after which an operation is considered
	// to have timed out.
	OperationTimeout time.Duration `json:"operationTimeout"`

	// MaxConnectionsPerBroker limits the number of connections to each broker.
	MaxConnectionsPerBroker int `json:"maxConnectionsPerBroker"`

	// MemoryLimitBytes sets the memory limit for the client in bytes.
	// If the limit is exceeded, the client may start to block or fail operations.
	MemoryLimitBytes int64 `json:"memoryLimitBytes"`

	// EnableTransaction determines if the client should support transactions.
	EnableTransaction bool `json:"enableTransaction"`

	// TLSKeyFilePath sets the path to the TLS key file
	TLSKeyFilePath string `json:"tlsKeyFilePath"`

	// TLSCertificateFile sets the path to the TLS certificate file
	TLSCertificateFile string `json:"tlsCertificateFile"`

	// TLSTrustCertsFilePath sets the path to the trusted TLS certificate file
	TLSTrustCertsFilePath string `json:"tlsTrustCertsFilePath"`

	// TLSAllowInsecureConnection configures whether the internal Pulsar client accepts untrusted TLS certificates from the broker (default: false).
	TLSAllowInsecureConnection bool `json:"tlsAllowInsecureConnection"`

	// TLSValidateHostname configures whether the Pulsar client verifies the validity of the host name from the broker (default: false).
	TLSValidateHostname bool `json:"tlsValidateHostname"`
	// contains filtered or unexported fields
}

type Destination

type Destination struct {
	sdk.UnimplementedDestination
	// contains filtered or unexported fields
}

func (*Destination) Configure

func (d *Destination) Configure(_ context.Context, cfg map[string]string) error

func (*Destination) Open

func (d *Destination) Open(ctx context.Context) (err error)

func (*Destination) Parameters

func (d *Destination) Parameters() map[string]sdk.Parameter

func (*Destination) Teardown

func (d *Destination) Teardown(ctx context.Context) error

func (*Destination) Write

func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error)

type DestinationConfig

type DestinationConfig struct {
	Config
}

func (DestinationConfig) Parameters

func (DestinationConfig) Parameters() map[string]sdk.Parameter

type Position

type Position struct {
	MessageID        []byte `json:"messageID"`
	SubscriptionName string `json:"subscriptionName"`
}

func (Position) ToSDKPosition

func (p Position) ToSDKPosition() sdk.Position
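The body of ToSDKPosition is not shown here. The JSON struct tags suggest that a Position is serialized as JSON, so the sketch below round-trips a local copy of the struct through encoding/json under that assumption. The helper names encodePosition and decodePosition are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Position is a local copy of the exported struct shown above.
type Position struct {
	MessageID        []byte `json:"messageID"`
	SubscriptionName string `json:"subscriptionName"`
}

// encodePosition is a hypothetical helper that serializes a Position as JSON.
// encoding/json writes []byte fields as base64 strings.
func encodePosition(p Position) ([]byte, error) {
	return json.Marshal(p)
}

// decodePosition is the hypothetical inverse of encodePosition.
func decodePosition(raw []byte) (Position, error) {
	var p Position
	err := json.Unmarshal(raw, &p)
	return p, err
}

func main() {
	raw, err := encodePosition(Position{
		MessageID:        []byte{1, 2, 3},
		SubscriptionName: "demo-topic-subscription",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw)) // prints {"messageID":"AQID","subscriptionName":"demo-topic-subscription"}

	back, err := decodePosition(raw)
	fmt.Println(back.SubscriptionName, back.MessageID, err)
}
```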

type Source

type Source struct {
	sdk.UnimplementedSource
	// contains filtered or unexported fields
}

func (*Source) Ack

func (s *Source) Ack(ctx context.Context, position sdk.Position) error

func (*Source) Configure

func (s *Source) Configure(ctx context.Context, cfg map[string]string) error

func (*Source) Open

func (s *Source) Open(ctx context.Context, pos sdk.Position) (err error)

func (*Source) Parameters

func (s *Source) Parameters() map[string]sdk.Parameter

func (*Source) Read

func (s *Source) Read(ctx context.Context) (sdk.Record, error)

func (*Source) Teardown

func (s *Source) Teardown(ctx context.Context) error

type SourceConfig

type SourceConfig struct {
	Config

	// SubscriptionName is the name of the subscription to be used for
	// consuming messages.
	SubscriptionName string `json:"subscriptionName"`
}

func (SourceConfig) Parameters

func (SourceConfig) Parameters() map[string]sdk.Parameter
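SourceConfig embeds Config, so the shared fields are promoted and sit at the same level as subscriptionName. The sketch below illustrates this with trimmed local copies of both structs decoded from JSON. How the connector itself parses its settings is not shown in this documentation, so parseSourceConfig is a hypothetical helper used only to demonstrate the embedding.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Config is a trimmed, illustrative copy with only the two required fields.
type Config struct {
	URL   string `json:"url"`
	Topic string `json:"topic"`
}

// SourceConfig embeds Config, as in the type shown above.
type SourceConfig struct {
	Config

	SubscriptionName string `json:"subscriptionName"`
}

// parseSourceConfig is a hypothetical helper. encoding/json treats the
// fields of an embedded struct as if they belonged to the outer struct,
// so a flat JSON object fills both levels.
func parseSourceConfig(raw string) (SourceConfig, error) {
	var cfg SourceConfig
	err := json.Unmarshal([]byte(raw), &cfg)
	return cfg, err
}

func main() {
	cfg, err := parseSourceConfig(`{
		"url": "pulsar://localhost:6650",
		"topic": "demo-topic",
		"subscriptionName": "demo-topic-subscription"
	}`)
	if err != nil {
		panic(err)
	}
	// Promoted fields are reachable directly on the outer struct.
	fmt.Println(cfg.URL, cfg.Topic, cfg.SubscriptionName)
}
```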

Directories

Path Synopsis
cmd
