amqp

package module
v0.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2022 License: Apache-2.0 Imports: 2 Imported by: 0

README

⚠️ This is a proof of concept

As this is a proof of concept, it won't be supported by the k6 team. It may also break in the future as xk6 evolves. USE AT YOUR OWN RISK! Any issues with the tool should be raised here.

xk6-amqp

A k6 extension for publishing and consuming of messages AMQP queues and exchanges.

Build

To build a k6 binary with this extension, first ensure you have the prerequisites:

Then:

  1. Download xk6:
$ go install go.k6.io/xk6/cmd/xk6@latest
  1. Build the k6 binary:
$ xk6 build --with github.com/grafana/xk6-amqp@latest

Development

To make development a little smoother, use the Makefile in the root folder. The default target will format your code, run tests, and create a k6 binary with your local code rather than from GitHub.

git clone git@github.com:grafana/xk6-amqp.git
cd xk6-amqp
make

Example

import Amqp from 'k6/x/amqp';
import Queue from 'k6/x/amqp/queue';

export default function () {
  console.log("K6 amqp extension enabled, version: " + Amqp.version)
  const url = "amqp://guest:guest@localhost:5672/"
  Amqp.start({
    connection_url: url
  })
  console.log("Connection opened: " + url)
  
  const queueName = 'K6 general'
  
  Queue.declare({
    name: queueName,
    // durable: false,
    // delete_when_unused: false,
    // exclusive: false,
    // no_wait: false,
    // args: null
  })

  console.log(queueName + " queue is ready")

  Amqp.publish({
    queue_name: queueName,
    body: "Ping from k6",
    content_type: "text/plain"
    // exchange: '',
    // mandatory: false,
    // immediate: false,
  })

  const listener = function(data) { console.log('received data: ' + data) }
  Amqp.listen({
    queue_name: queueName,
    listener: listener,
    // consumer: '',
    // auto_ack: true,
    // exclusive: false,
		// no_local: false,
		// no_wait: false,
    // args: null
  })
}

Result output:

$ ./k6 run script.js

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: ../xk6-amqp/examples/test.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)

INFO[0000] K6 amqp extension enabled, version: v0.0.1    source=console
INFO[0000] Connection opened: amqp://guest:guest@localhost:5672/  source=console
INFO[0000] K6 general queue is ready                     source=console
INFO[0000] received data: Ping from k6                   source=console

running (00m00.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs  00m00.0s/10m0s  1/1 iters, 1 per VU

     data_received........: 0 B 0 B/s
     data_sent............: 0 B 0 B/s
     iteration_duration...: avg=31.37ms min=31.37ms med=31.37ms max=31.37ms p(90)=31.37ms p(95)=31.37ms
     iterations...........: 1   30.855627/s

Inspect examples folder for more details.

Documentation

Overview

Package amqp contains AMQP API for a remote server.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AMQP

type AMQP struct {
	Version    string
	Connection *amqpDriver.Connection
	Queue      *Queue
	Exchange   *Exchange
}

AMQP type holds connection to a remote AMQP server.

func (*AMQP) Listen

func (amqp *AMQP) Listen(options ListenOptions) error

Listen binds to an AMQP queue in order to receive message(s) as they are received.

func (*AMQP) Publish

func (amqp *AMQP) Publish(options PublishOptions) error

Publish delivers the payload using options provided.

func (*AMQP) Start

func (amqp *AMQP) Start(options Options) error

Start establishes a session with an AMQP server given the provided options.

type ConsumeOptions

type ConsumeOptions struct {
	Consumer  string
	AutoAck   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqpDriver.Table
}

ConsumeOptions defines options for use when consuming a message.

type DeclareOptions

type DeclareOptions struct {
	Name             string
	Durable          bool
	DeleteWhenUnused bool
	Exclusive        bool
	NoWait           bool
	Args             amqpDriver.Table
}

DeclareOptions provides queue options when declaring (creating) a queue.

type Exchange

type Exchange struct {
	Version    string
	Connection *amqpDriver.Connection
}

Exchange defines a connection to publish/subscribe destinations.

func (*Exchange) Bind

func (exchange *Exchange) Bind(options ExchangeBindOptions) error

Bind subscribes one exchange to another.

func (*Exchange) Declare

func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error

Declare creates a new exchange given the provided options.

func (*Exchange) Delete

func (exchange *Exchange) Delete(name string) error

Delete removes an exchange from the remote server given the exchange name.

func (*Exchange) Unbind

func (exchange *Exchange) Unbind(options ExchangeUnbindOptions) error

Unbind removes a subscription from one exchange to another.

type ExchangeBindOptions

type ExchangeBindOptions struct {
	DestinationExchangeName string
	SourceExchangeName      string
	RoutingKey              string
	NoWait                  bool
	Args                    amqpDriver.Table
}

ExchangeBindOptions provides options when binding (subscribing) one exchange to another.

type ExchangeDeclareOptions

type ExchangeDeclareOptions struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqpDriver.Table
}

ExchangeDeclareOptions provides options when declaring (creating) an exchange.

type ExchangeOptions

type ExchangeOptions struct {
	ConnectionURL string
}

ExchangeOptions defines configuration settings for accessing an exchange.

type ExchangeUnbindOptions

type ExchangeUnbindOptions struct {
	DestinationExchangeName string
	SourceExchangeName      string
	RoutingKey              string
	NoWait                  bool
	Args                    amqpDriver.Table
}

ExchangeUnbindOptions provides options when unbinding (unsubscribing) one exchange from another.

type ListenOptions

type ListenOptions struct {
	Listener  ListenerType
	QueueName string
	Consumer  string
	AutoAck   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqpDriver.Table
}

ListenOptions defines options for subscribing to message(s) within a queue.

type ListenerType

type ListenerType func(string) error

ListenerType is the message handler implemented within JavaScript.

type Options

type Options struct {
	ConnectionURL string
}

Options defines configuration options for an AMQP session.

type PublishOptions

type PublishOptions struct {
	QueueName   string
	Body        string
	Exchange    string
	ContentType string
	Mandatory   bool
	Immediate   bool
	Persistent  bool
}

PublishOptions defines a message payload with delivery options.

type Queue

type Queue struct {
	Version    string
	Connection *amqpDriver.Connection
}

Queue defines a connection to a point-to-point destination.

func (*Queue) Bind

func (queue *Queue) Bind(options QueueBindOptions) error

Bind subscribes a queue to an exchange in order to receive message(s).

func (*Queue) Declare

func (queue *Queue) Declare(options DeclareOptions) (amqpDriver.Queue, error)

Declare creates a new queue given the provided options.

func (*Queue) Delete

func (queue *Queue) Delete(name string) error

Delete removes a queue from the remote server given the queue name.

func (*Queue) Inspect

func (queue *Queue) Inspect(name string) (amqpDriver.Queue, error)

Inspect provides queue metadata given queue name.

func (*Queue) Purge

func (queue *Queue) Purge(name string, noWait bool) (int, error)

Purge removes all non-consumed message(s) from the specified queue.

func (*Queue) Unbind

func (queue *Queue) Unbind(options QueueUnbindOptions) error

Unbind removes a queue subscription from an exchange to discontinue receiving message(s).

type QueueBindOptions

type QueueBindOptions struct {
	QueueName    string
	ExchangeName string
	RoutingKey   string
	NoWait       bool
	Args         amqpDriver.Table
}

QueueBindOptions provides options when binding a queue to an exchange in order to receive message(s).

type QueueOptions

type QueueOptions struct {
	ConnectionURL string
}

QueueOptions defines configuration settings for accessing a queue.

type QueueUnbindOptions

type QueueUnbindOptions struct {
	QueueName    string
	ExchangeName string
	RoutingKey   string
	Args         amqpDriver.Table
}

QueueUnbindOptions provides options when unbinding a queue from an exchange to stop receiving message(s).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL