pulsar

package module
v0.0.0-...-3491c95 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2018 License: MIT Imports: 11 Imported by: 0

README

go-pulsar-ws

GoDoc

A Go client for Apache Pulsar using the Websocket protocol.

Status

Currently, there is no official Go client using the binary protocol, although the Pulsar team is working on a cgo-based implementation. This library was created as a temporary solution. Once an official client is released, I will likely stop supporting this library.

Usage

Producer
// Initialize a client passing the Pulsar Websocket endpoint.
client := pulsar.New("ws://localhost:8080/ws")

// Initialize a producer given a topic with an optional set of parameters.
// Establishes a websocket connection.
producer, err := client.Producer("persistent/standalone/us-east/test", nil)
if err != nil {
  log.Fatal(err)
}
defer producer.Close()

ctx := context.Background()
res, err := producer.Send(ctx, &pulsar.PublishMsg{
  Payload: []byte("hello world!"),
})
if err != nil {
  log.Fatal(err)
}

// Print the resulting message id.
log.Print(res.MsgId)
Consumer
client := pulsar.New("ws://localhost:8080/ws")

consumer, err := client.Consumer("persistent/standalone/us-east/test", "my-sub", nil)
if err != nil {
  log.Fatal(err)
}
defer consumer.Close()

ctx := context.Background()

for {
  msg, err := consumer.Receive(ctx)
  if err != nil {
    log.Fatal(err)
  }

  // Print message.
  log.Print(string(msg.Payload))

  // Ack once processed.
  if err := consumer.Ack(ctx, msg); err != nil {
    log.Fatal(err)
  }
}
Reader
client := pulsar.New("ws://localhost:8080/ws")

reader, err := client.Reader("persistent/standalone/us-east/test", pulsar.Params{
  "messageId": "earliest",
})
if err != nil {
  log.Fatal(err)
}
defer reader.Close()

ctx := context.Background()

for {
  msg, err := reader.Receive(ctx)
  if err != nil {
    log.Fatal(err)
  }

  // Print message.
  log.Print(string(msg.Payload))

  // Ack once processed.
  if err := reader.Ack(ctx, msg); err != nil {
    log.Fatal(err)
  }
}

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultLogger   = log.Logger
	DefaultLogLevel = zerolog.InfoLevel
)

Functions

This section is empty.

Types

type Client

type Client struct {
	URL    string
	Logger zerolog.Logger
	// contains filtered or unexported fields
}

Client is a client for the Apache Pulsar Websocket API. Use New to initialize the client with the default configuration.

func New

func New(url string) *Client

New initializes a new client.

func (*Client) Consumer

func (c *Client) Consumer(topic string, name string, params Params) (Consumer, error)

Consumer initializes a new consumer.

func (*Client) Producer

func (c *Client) Producer(topic string, params Params) (Producer, error)

Producer initializes a new producer.

func (*Client) Reader

func (c *Client) Reader(topic string, params Params) (Reader, error)

Reader initializes a new reader.

type Consumer

type Consumer interface {
	Receive(context.Context) (*Msg, error)
	Ack(context.Context, *Msg) error
	Close() error
}

type Msg

type Msg struct {
	// MsgId is a base64-encoded value of the serializable MessageId proto.
	MsgId string `json:"messageId"`

	// Payload is the message payload.
	Payload []byte `json:"payload"`

	// PublishTime is the time the message was published.
	PublishTime time.Time `json:"publishTime"`

	// Properties are an arbitrary set of key-value properties.
	Properties map[string]string `json:"properties"`

	// Key is the partition key for this message.
	Key string `json:"key"`
}

type Params

type Params map[string]string

type Producer

type Producer interface {
	Send(context.Context, *PublishMsg) (*PublishResult, error)
	Close() error
}

type PublishError

type PublishError struct {
	Code    string `json:"code"`
	Msg     string `json:"msg"`
	Context string `json:"context"`
}

PublishError is the error type used if the server responds with an error.

func (*PublishError) Error

func (e *PublishError) Error() string

Error implements the error interface.

type PublishMsg

type PublishMsg struct {
	Payload             []byte            `json:"payload"`
	Properties          map[string]string `json:"properties"`
	Context             string            `json:"context"`
	Key                 string            `json:"key"`
	ReplicationClusters []string          `json:"replicationClusters"`
}

PublishMsg is the message type for publishing a new message.

type PublishResult

type PublishResult struct {
	MsgId   string `json:"messageId"`
	Context string `json:"context"`
}

PublishResult is the result of a successful publish.

type Reader

type Reader interface {
	Receive(context.Context) (*Msg, error)
	Ack(context.Context, *Msg) error
	Close() error
}

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL