pulsarpubsub

package module
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2022 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Example (OpenSubscriptionFromURL)
package main

import (
	"context"
	"log"

	"github.com/sraphs/gdk/pubsub"
)

// main shows how to open a Pulsar-backed *pubsub.Subscription from a URL.
func main() {
	// PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On github.com/sraphs/gdk, add a blank import: _ "github.com/sraphs/gdk/pubsub/pulsarpubsub"
	// PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
	// The "pulsar" scheme is only available once the pulsarpubsub package has
	// been imported (see the blank import mentioned in the PRAGMA above).
	// The host + path are used as the Pulsar subscription name ("my-sub" here).
	// The "topic" query parameter sets the topic(s) to subscribe to.
	// NOTE(review): this comment previously said the brokers must be in the
	// KAFKA_BROKERS environment variable, which looks copied from the Kafka
	// driver. Confirm how the Pulsar service URL is supplied to the default
	// URL opener and document it here.
	subscription, err := pubsub.OpenSubscription(ctx,
		"pulsar://my-sub?topic=my-topic")
	if err != nil {
		log.Fatal(err)
	}
	// Release the subscription's resources when main returns.
	defer subscription.Shutdown(ctx)
}
Output:

Example (OpenTopicFromURL)
package main

import (
	"context"
	"log"

	"github.com/sraphs/gdk/pubsub"
)

// main shows how to open a Pulsar-backed *pubsub.Topic from a URL.
func main() {
	// PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On github.com/sraphs/gdk, add a blank import: _ "github.com/sraphs/gdk/pubsub/pulsarpubsub"
	// PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
	// The "pulsar" scheme is only available once the pulsarpubsub package has
	// been imported (see the blank import mentioned in the PRAGMA above).
	// The host + path are the topic name to send to.
	// NOTE(review): this comment previously said the brokers must be in the
	// KAFKA_BROKERS environment variable, which looks copied from the Kafka
	// driver. Confirm how the Pulsar service URL is supplied to the default
	// URL opener and document it here.
	topic, err := pubsub.OpenTopic(ctx, "pulsar://my-topic")
	if err != nil {
		log.Fatal(err)
	}
	// Release the topic's resources when main returns.
	defer topic.Shutdown(ctx)
}
Output:

Index

Examples

Constants

View Source
const Scheme = "pulsar"

Scheme is the URL scheme pulsarpubsub registers its URLOpeners under on pubsub.DefaultMux.

Variables

This section is empty.

Functions

func MinimalConfig

func MinimalConfig(url string) pulsar.ClientOptions

MinimalConfig returns a minimal pulsar.ClientOptions.

func OpenSubscription

func OpenSubscription(client pulsar.Client, opts *SubscriptionOptions) (*pubsub.Subscription, error)

OpenSubscription returns a *pubsub.Subscription backed by Pulsar, using the given client. The topic to subscribe to and the subscription name are set in opts (see the embedded pulsar.ConsumerOptions); for more info, see https://pulsar.apache.org/docs/next/concepts-topic-compaction.

Example
package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"

	"github.com/sraphs/gdk/pubsub/pulsarpubsub"
)

func main() {
	// PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line.
	ctx := context.Background()

	localPulsarURL := "pulsar://localhost:6650"
	config := pulsarpubsub.MinimalConfig(localPulsarURL)
	client, err := pulsar.NewClient(config)
	if err != nil {
		log.Fatal(err)
	}
	// Construct a *pubsub.Subscription, use the SubscriptionName "my-sub"
	// and receiving messages from "my-topic".
	subscription, err := pulsarpubsub.OpenSubscription(client, &pulsarpubsub.SubscriptionOptions{
		ConsumerOptions: pulsar.ConsumerOptions{
			Topic:            "my-topic",
			SubscriptionName: "my-sub",
		},
		KeyName: "",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}
Output:

func OpenTopic

func OpenTopic(client pulsar.Client, opts *TopicOptions) (*pubsub.Topic, error)

OpenTopic returns a *pubsub.Topic for use with Pulsar, using the given client. The Pulsar topic to publish to is set in opts (see the embedded pulsar.ProducerOptions); for more info, see https://pulsar.apache.org/docs/next/concepts-messaging.

Example
package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"

	"github.com/sraphs/gdk/pubsub/pulsarpubsub"
)

func main() {
	// PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line.
	ctx := context.Background()

	localPulsarURL := "pulsar://localhost:6650"
	config := pulsarpubsub.MinimalConfig(localPulsarURL)
	client, err := pulsar.NewClient(config)
	if err != nil {
		log.Fatal(err)
	}

	// Construct a *pubsub.Topic.
	topic, err := pulsarpubsub.OpenTopic(client, &pulsarpubsub.TopicOptions{
		ProducerOptions: pulsar.ProducerOptions{
			Topic: "my-topic",
		},
		KeyName: "",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}
Output:

Types

type SubscriptionOptions

// SubscriptionOptions sets options for constructing a *pubsub.Subscription
// backed by Pulsar.
type SubscriptionOptions struct {
	// Embedded Pulsar consumer settings, such as Topic and SubscriptionName.
	pulsar.ConsumerOptions
	// KeyName is left empty in the package examples.
	// NOTE(review): its meaning is not documented here; confirm against the
	// driver implementation and describe it.
	KeyName string
}

SubscriptionOptions sets options for constructing a *pubsub.Subscription backed by Pulsar.

type TopicOptions

// TopicOptions sets options for constructing a *pubsub.Topic backed by
// Pulsar.
type TopicOptions struct {
	// Embedded Pulsar producer settings, such as Topic.
	pulsar.ProducerOptions
	// KeyName is left empty in the package examples.
	// NOTE(review): its meaning is not documented here; confirm against the
	// driver implementation and describe it.
	KeyName string
}

TopicOptions sets options for constructing a *pubsub.Topic backed by Pulsar.

type URLOpener

// URLOpener opens Pulsar-backed topics and subscriptions from URLs, via its
// OpenTopicURL and OpenSubscriptionURL methods.
type URLOpener struct {
	// Client to use for communication with the server.
	Client pulsar.Client

	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}

URLOpener opens Pulsar URLs like "pulsar://my-topic".

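For topic URLs, the URL host+path is used as the topic name; for subscription URLs, it is used as the subscription name.

For subscription URLs, the "topic" query parameter names the topic to subscribe to (for example "pulsar://my-sub?topic=my-topic"). No other query parameters are documented.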

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL