google

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2020 License: GPL-3.0 Imports: 7 Imported by: 0

Documentation

Overview

Package google implement elMagician pubsub interfaces for GCP Pubsub provider.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrPublisherDestroyed = errors.New("publisher instance was destroyed")

Functions

func NewPubsub

func NewPubsub(ctx context.Context, config Config, opts ...option.ClientOption) (pubsub.Pubsub, error)

NewPubsub initializes a GCP implementation for pubsub.

Example (WithOption)
conf := google.Config{
	ProjectID:       "aSuperCoolProject",
	CredentialsPath: "path/to/credentials.json",
}

_, err := google.NewPubsub(
	context.Background(), conf,
	option.WithCredentialsFile("some/other/credentials.json"),
	option.WithEndpoint("dont.evil/know/where"),
)
if err != nil {
	panic(err)
}

// in this example, client uses credentials path from Config. Passing an option will not override
// credentials values.
Output:

Example (WithoutOption)
conf := google.Config{
	ProjectID:       "aSuperCoolProject",
	CredentialsPath: "path/to/credentials.json",
	Timeout:         10 * time.Second,
	Concurrency:     10,
}

_, err := google.NewPubsub(context.Background(), conf)
if err != nil {
	panic(err)
}
Output:

Types

type Config

type Config struct {
	// ProjectID in GCP
	ProjectID string `yaml:"projectId" json:"projectId"`

	// CredentialsPath to your JSON credential provided by GCP.
	CredentialsPath string `yaml:"credentialsPath" json:"credentialsPath"`

	// Concurrency is the default concurrency for listening process.
	Concurrency int `yaml:"concurrency" json:"concurrency"`

	// Timeout is the default timeout for GCP calls
	Timeout time.Duration `yaml:"timeout" json:"timeout"`
}

Config for pubsub instance

type Listener

type Listener struct{}

func (Listener) Listen

func (l Listener) Listen(ctx context.Context)

func (Listener) OnError

func (l Listener) OnError() chan error

func (Listener) OnMessage

func (l Listener) OnMessage(envelop pubsub.Envelop, newMessage func() pubsub.Message) chan pubsub.Message

func (Listener) OnUnmatched

func (l Listener) OnUnmatched() chan pubsub.Message

func (Listener) Stop

func (l Listener) Stop()

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher implements pubsub.Publisher interface for GCP.

func (*Publisher) Destroy

func (p *Publisher) Destroy()

func (*Publisher) Send

Send message to topics registered. Any new topic will be saved to the registry if Destroy was not called. If Destroy was called, all topics will have their connection stopped and known topics will be kept in registry.

Send will return a single string ID if sending to a single topic, else a list of string.

Example
var conn *grpc.ClientConn
var yourEnvelop *Envelop // we are using a mock implementation here
// This setup a test instance for Google Pubsub.
// You don't need it outside of unit testing.
{
	srv, cli := initTestClient("aSuperCoolProject")
	defer func() {
		if err := srv.Close(); err != nil {
			panic(err)
		}
	}()

	_, err := cli.CreateTopic(context.Background(), "mine")
	if err != nil {
		panic(err)
	}
	_, err = cli.CreateTopic(context.Background(), "tropical")
	if err != nil {
		panic(err)
	}
	_, err = cli.CreateTopic(context.Background(), "someTopic")
	if err != nil {
		panic(err)
	}

	conn, err = grpc.Dial(srv.Addr, grpc.WithInsecure())
	if err != nil {
		panic(err)
	}

	yourEnvelop = &Envelop{}
	yourEnvelop.message = &message{
		Message: &googlePubSub.Message{
			ID:         "test",
			Data:       []byte("someData"),
			Attributes: map[string]string{"version": "v1", "type": "test", "new": "false"},
		},
	}
}

ctx := context.Background()
conf := google.Config{
	ProjectID:       "aSuperCoolProject",
	CredentialsPath: "path/to/credentials.json",
	Timeout:         10 * time.Second,
	Concurrency:     0,
}

// Initialize pubsub instance.
ps, err := google.NewPubsub(ctx, conf, option.WithGRPCConn(conn))
if err != nil {
	// TODO manage error
	panic(err)
	return
}

// Register topics that will be used from the instance.
ps.Registry().
	MustAddTopic("topicAnna", nil). // Will not fail if topic does not exists using MustAddTopic. Use AddTopic to check existence on add.
	MustAddTopic("tropical", nil)

if err := ps.Registry().AddTopic("mine", nil); err != nil {
	// TODO manager error
	panic(err)
	return
}

// Send message to registered topics
results, err := ps.Publish().To("mine").Send(ctx, yourEnvelop)
if err != nil {
	// TODO manage error
	fmt.Println(err.Error())
	return
}

idsStr := ""
for topic, res := range results.Results(context.Background()) {
	idsStr += topic + ": " + res.ID
}

idsStr = strings.TrimSuffix(idsStr, ",")
fmt.Println("Msg:", idsStr)

// Send message to registered topics
results, err = ps.Publish().To("tropical", "mine").Send(ctx, yourEnvelop)
if err != nil {
	// TODO manage error
	fmt.Println(err.Error())
	return
}

idsStr = ""
for topic, res := range results.Results(context.Background()) {
	if res.Error != nil {
		fmt.Println("got unexpected error", res.Error)
	}
	if topic == "tropical" {
		idsStr = topic + ", " + idsStr
	} else {
		idsStr += topic
	}
}

idsStr = strings.TrimSuffix(idsStr, ",")
fmt.Println(idsStr)
// Stop topics connection to server without discarding them.s
ps.Registry().StopTopics("mine", "someTopic")

// Reset registry after stopping all topics.
ps.Registry().Clear()
Output:

Msg: mine: m0
tropical, mine

func (*Publisher) To

func (p *Publisher) To(topics ...string) pubsub.Publisher

To adds topic to sent message to.

If Destroy is called, unknown topic will not be saved in registry and https://pkg.go.dev/cloud.google.com/go/pubsub#Topic.Stop will be called.

This method apply last registered send configuration for topic. If no configuration where registered, it use default configuration

func (*Publisher) WithOption

func (p *Publisher) WithOption(opt interface{}) pubsub.Publisher

WithOption provide send settings to apply to call.

It will be applied to all topics added to sent process before the WithOption call.

type Pubsub

type Pubsub struct {
	// Client is the gcp instance used to send requests.
	// It is passed as a private parameter to all structures
	// derived from Pubsub
	Client *googlePubSub.Client

	// Config for running instance.
	// It is passed as a private parameter to all structures
	// derived from Pubsub
	Config Config
	// contains filtered or unexported fields
}

Pubsub implements pubsub.Pubsub interface for GCP.

func (Pubsub) Listen

func (p Pubsub) Listen(subscription string) pubsub.Listener

func (Pubsub) Publish

func (p Pubsub) Publish() pubsub.Publisher

Publisher set up an instance to send message to pubsub. It use client, config and registry from main Pubsub instance.

func (Pubsub) Receive

func (p Pubsub) Receive(subscription string) pubsub.Receiver

func (Pubsub) Registry

func (p Pubsub) Registry() pubsub.Registry

type Receiver

type Receiver struct{}

func (Receiver) OnError

func (r Receiver) OnError(callback func(ctx context.Context))

func (Receiver) OnMessage

func (r Receiver) OnMessage(envelop pubsub.Envelop, callback pubsub.MessageCallback)

func (Receiver) OnUnmatched

func (r Receiver) OnUnmatched(callback pubsub.MessageCallback)

func (Receiver) Receive

func (r Receiver) Receive(ctx context.Context) error

func (Receiver) Start

func (r Receiver) Start(ctx context.Context)

func (Receiver) Stop

func (r Receiver) Stop()

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

func (*Registry) AddSubscription

func (l *Registry) AddSubscription(key string, receiveSettings *googlePubSub.ReceiveSettings) error

nolint: dupl

func (*Registry) AddTopic

func (l *Registry) AddTopic(key string, publishSettings *googlePubSub.PublishSettings) error

nolint: dupl

func (*Registry) Clear

func (l *Registry) Clear()

func (*Registry) MustAddSubscription

func (l *Registry) MustAddSubscription(key string, receiveSettings *googlePubSub.ReceiveSettings) pubsub.Registry

nolint: dupl

func (*Registry) MustAddTopic

func (l *Registry) MustAddTopic(key string, publishSettings *googlePubSub.PublishSettings) pubsub.Registry

nolint: dupl

func (*Registry) StopTopic

func (l *Registry) StopTopic(key string)

func (*Registry) StopTopics

func (l *Registry) StopTopics(topics ...string)

type SendResults

type SendResults struct {
	// contains filtered or unexported fields
}

func (SendResults) OnResults

func (s SendResults) OnResults(ctx context.Context, callback func(topic string, result pubsub.Result))

func (SendResults) Results

func (s SendResults) Results(ctx context.Context) pubsub.Results

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL