amqp

package module
v1.0.0
Published: Apr 28, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

README

xk6-amqp-modern

A modern k6 extension for publishing and consuming messages via AMQP 0.9.1 (RabbitMQ).

This is a complete rewrite of grafana/xk6-amqp, modernized to work with the latest k6 versions using the proper Module/Instance API and event loop integration.

⚠️ This project uses AMQP 0.9.1, not AMQP 1.0.



Quick Start

# Build k6 with extension using Docker
docker compose build k6-build

# Start RabbitMQ
docker compose up -d rabbitmq

# Run a test
docker compose run --rm k6-integration

Build

For Windows

docker run --rm -v "${PWD}:/src" -w /src golang:1.25-alpine sh -c 'apk add --no-cache git && go install go.k6.io/xk6/cmd/xk6@latest && GOOS=windows GOARCH=amd64 $(go env GOPATH)/bin/xk6 build --with github.com/mario15390/xk6-amqp-modern=. --output k6.exe'

For Linux

docker run --rm -v "${PWD}:/src" -w /src golang:1.25-alpine sh -c 'apk add --no-cache git && go install go.k6.io/xk6/cmd/xk6@latest && GOOS=linux GOARCH=amd64 $(go env GOPATH)/bin/xk6 build --with github.com/mario15390/xk6-amqp-modern=. --output k6-linux'

Alternatively, build the image with Docker Compose:

docker compose build k6-build

This produces a Docker image k6-amqp:local with the custom k6 binary.

From Source

Prerequisites: Go 1.23+, Git

# Install xk6
go install go.k6.io/xk6/cmd/xk6@latest

# Build k6 binary with this extension
xk6 build --with github.com/mario15390/xk6-amqp-modern=.

JavaScript API

All functions are named exports from k6/x/amqp:

import {
  connect, close,
  publish, listen,
  declareQueue, deleteQueue, inspectQueue, bindQueue, unbindQueue, purgeQueue,
  declareExchange, deleteExchange, bindExchange, unbindExchange,
} from 'k6/x/amqp';

Connection

| Function | Description |
| --- | --- |
| connect({ url }) | Open an AMQP connection for this VU |
| close() | Close the AMQP connection |

Publishing & Consuming

| Function | Description |
| --- | --- |
| publish({ queue_name, body, content_type, ... }) | Publish a message |
| await listen({ queue_name, auto_ack, ... }) | Consume one message (async, returns Promise) |

Queue Operations

| Function | Description |
| --- | --- |
| declareQueue({ name, durable, ... }) | Declare/create a queue |
| deleteQueue(name) | Delete a queue |
| inspectQueue(name) | Get queue metadata (messages, consumers) |
| bindQueue({ queue_name, exchange_name, routing_key }) | Bind queue to exchange |
| unbindQueue({ queue_name, exchange_name, routing_key }) | Unbind queue from exchange |
| purgeQueue(name) | Remove all messages from a queue |

Exchange Operations

| Function | Description |
| --- | --- |
| declareExchange({ name, kind, durable, ... }) | Declare/create an exchange |
| deleteExchange(name) | Delete an exchange |
| bindExchange({ source_exchange_name, destination_exchange_name, routing_key }) | Bind exchanges |
| unbindExchange({ source_exchange_name, destination_exchange_name, routing_key }) | Unbind exchanges |

Example

import { connect, publish, listen, declareQueue, deleteQueue, close } from 'k6/x/amqp';
import { check } from 'k6';

export default async function () {
  connect({ url: "amqp://guest:guest@localhost:5672/" });

  const q = declareQueue({ name: "my-queue" });
  console.log(`Queue ${q.name} ready (${q.messages} messages)`);

  publish({
    queue_name: "my-queue",
    body: "Hello from k6!",
    content_type: "text/plain",
  });

  const msg = await listen({
    queue_name: "my-queue",
    auto_ack: true,
  });

  check(msg, {
    'message received': (m) => m === "Hello from k6!",
  });

  deleteQueue("my-queue");
  close();
}

Architecture & Design

Why was the rewrite needed?

The original xk6-amqp extension had three critical issues:

  1. Legacy Module API: Used modules.Register("k6/x/amqp", &struct{}) which registers a single shared instance. Modern k6 requires modules.Module + modules.Instance interfaces for per-VU isolation.

  2. Shared Mutable State: All VUs shared a global map[int]*Connection via pointers, causing race conditions with multiple VUs.

  3. Unsafe Async: The Listen() function spawned goroutines that directly called JavaScript callbacks, bypassing k6's event loop and causing panics from concurrent access to the JS runtime.

Solution Applied

┌─────────────────────────────────────────────────────────┐
│                    k6 Engine                            │
│                                                         │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐              │
│  │   VU 1   │  │   VU 2   │  │   VU N   │              │
│  │          │  │          │  │          │              │
│  │ ┌──────┐ │  │ ┌──────┐ │  │ ┌──────┐ │              │
│  │ │Module│ │  │ │Module│ │  │ │Module│ │  ◄── Each VU  │
│  │ │Inst. │ │  │ │Inst. │ │  │ │Inst. │ │     gets its  │
│  │ │      │ │  │ │      │ │  │ │      │ │     own inst. │
│  │ │ conn │ │  │ │ conn │ │  │ │ conn │ │              │
│  │ └──┬───┘ │  │ └──┬───┘ │  │ └──┬───┘ │              │
│  └────┼─────┘  └────┼─────┘  └────┼─────┘              │
│       │             │             │                     │
└───────┼─────────────┼─────────────┼─────────────────────┘
        │             │             │
        ▼             ▼             ▼
   ┌─────────────────────────────────────┐
   │          RabbitMQ Server            │
   │       (AMQP 0.9.1 Broker)          │
   └─────────────────────────────────────┘

Key Design Decisions

| Decision | Rationale |
| --- | --- |
| Per-VU connections | Eliminates shared state and race conditions. Each VU's ModuleInstance has its own *amqpDriver.Connection. |
| RootModule factory pattern | NewModuleInstance(vu) is called once per VU, providing the modules.VU reference for event loop access. |
| Promise-based listen() | Uses vu.RegisterCallback() + rt.NewPromise() to safely return async results to JavaScript via k6's event loop. |
| Named exports (no sub-modules) | Original had separate k6/x/amqp/queue and k6/x/amqp/exchange modules with shared state. Consolidated into single k6/x/amqp module. |
| js:"snake_case" struct tags | Maintains JavaScript naming convention compatibility with the original extension's API. |

Event Loop Integration (Listen)

The listen() function is the most architecturally important change:

JavaScript Thread                     Background Goroutine
─────────────────                     ────────────────────
1. Call listen()
2. Create Promise
3. RegisterCallback() ──────────────► 4. Open AMQP channel
   (tells k6: "async                  5. Start consuming
    work in progress")                 6. Wait for message...
                                       7. Message arrives!
   return Promise ◄────────────────── 8. callback(func() {
                                          resolve(message)
                                       })
9. await resolves with message
10. Continue script execution

RegisterCallback() is critical: it tells k6's event loop to keep the VU alive while the goroutine runs. The callback function is the only safe way to interact with the JS runtime from a goroutine.


Project Structure

xk6-amqp-modern/
├── amqp.go                 # Core: RootModule, ModuleInstance, Connect, Publish, Listen
├── amqp_test.go            # Unit tests for the core module
├── queues.go               # Queue operations (DeclareQueue, DeleteQueue, etc.)
├── exchanges.go            # Exchange operations (DeclareExchange, BindExchange, etc.)
├── go.mod                  # Go module definition and dependencies
├── go.sum                  # Go dependency checksums
├── Makefile                # Build, test, and Docker targets
├── Dockerfile              # Multi-stage build for k6 binary
├── Dockerfile.test         # Container for running Go tests
├── docker-compose.yml      # Full stack: RabbitMQ + build + tests
├── .golangci.yml           # Go linter configuration
├── integration/            # Integration tests
│   ├── integration_test.go # Go tests requiring RabbitMQ (build tag: integration)
│   └── scripts/            # k6 test scripts
│       ├── test_publish_listen.js   # Basic publish/listen test
│       ├── test_queue_ops.js        # Queue operations test
│       ├── test_exchange_ops.js     # Exchange operations test
│       ├── test_multi_vu.js         # Multi-VU performance test
│       └── test_publisher_only.js   # Multiple sources publisher test
├── examples/               # Usage examples
│   ├── basic.js            # Simple publish and listen
│   ├── queue-operations.js # Queue lifecycle demo
│   └── exchange-routing.js # Exchange routing demo
└── README.md               # This file

File Responsibilities

| File | What it does |
| --- | --- |
| amqp.go | Defines RootModule (factory) and ModuleInstance (per-VU state). Contains Connect(), Close(), Publish(), Listen(), and the init() registration. |
| queues.go | All queue-related methods on ModuleInstance: declare, delete, inspect, bind, unbind, purge. Each method opens a temporary AMQP channel. |
| exchanges.go | All exchange-related methods on ModuleInstance: declare, delete, bind, unbind. Same channel pattern as queues. |
| amqp_test.go | Unit tests that run WITHOUT RabbitMQ. Tests export names, error handling, option defaults. |
| integration/integration_test.go | Tests that require a running RabbitMQ. Guarded by //go:build integration build tag. |

Build k6 with Extension

Option 1: Compiling for Windows / Linux using Docker (No local Go required)

If you don't want to install Go locally, you can use Docker to cross-compile the binary.

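[!WARNING] Windows users: run these commands in PowerShell, not Git Bash. Git Bash rewrites the paths in volume mounts, which causes an invalid working directory error. The commands also rely on PowerShell's ${PWD} variable and quoting rules, so they will not run unmodified in Command Prompt.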

Compile k6.exe for Windows (x64)

docker run --rm -v "${PWD}:/src" -w /src golang:1.25-alpine sh -c 'apk add --no-cache git && go install go.k6.io/xk6/cmd/xk6@latest && GOOS=windows GOARCH=amd64 $(go env GOPATH)/bin/xk6 build --with github.com/mario15390/xk6-amqp-modern=. --output k6.exe'

Compile k6 for Linux (Ubuntu/Debian x64)

docker run --rm -v "${PWD}:/src" -w /src golang:1.25-alpine sh -c 'apk add --no-cache git && go install go.k6.io/xk6/cmd/xk6@latest && GOOS=linux GOARCH=amd64 $(go env GOPATH)/bin/xk6 build --with github.com/mario15390/xk6-amqp-modern=. --output k6-linux'

Once the command finishes, you will find the binary (k6.exe or k6-linux) in your project folder. You can then run your test scripts directly:

# Windows
.\k6.exe run examples\basic.js

# Linux
./k6-linux run examples/basic.js

Option 2: Local Compilation (Requires Go)

If you have Go installed on your local machine, you can build the custom k6 binary directly:

go install go.k6.io/xk6/cmd/xk6@latest
xk6 build --with github.com/mario15390/xk6-amqp-modern=.
./k6 run examples/basic.js

Testing

Run All Tests with Docker

# Run everything (unit + integration + k6)
make docker-test-all

# Or individually:
make docker-unit-test          # Unit tests only
make docker-integration-test   # Integration tests (starts RabbitMQ)
make docker-k6-test            # k6 script tests

Run Unit Tests Locally

go test -v -race -cover ./...

Run Integration Tests Locally

# Start RabbitMQ first
docker compose up -d rabbitmq

# Wait for it to be ready, then run integration tests
go test -v -tags=integration -timeout=120s ./integration/...

Performance Testing

The docker-compose.yml file is configured to map the local ./integration/scripts folder as a volume (/scripts) inside the container. This means you can edit the .js files locally and the container will see the changes instantly, without needing to rebuild the Docker image!

# Run the default performance test (5 VUs, 20 iterations)
make docker-performance

# Custom: run test_multi_vu.js with 50 VUs for 1 minute
docker compose run --rm k6-performance run /scripts/test_multi_vu.js --vus 50 --duration 1m

# Simulate multiple sources publishing (NO consuming)
docker compose run --rm k6-performance run /scripts/test_publisher_only.js

[!NOTE] Open integration/scripts/test_publisher_only.js to see predefined commented configurations (Light, Moderate, Heavy loads). Simply uncomment the one you want to use and run the command above.

The performance tests report custom metrics:

  • amqp_messages_published — Counter of published messages
  • amqp_messages_received — Counter of consumed messages
  • amqp_publish_duration_ms — Trend of publish latencies
  • amqp_listen_duration_ms — Trend of listen latencies
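
When a script registers these metrics, k6 thresholds can turn them into pass/fail criteria. The sketch below is a plain options object; the limit values are illustrative assumptions rather than numbers from this project, and the metric names only work if the script actually defines them.

```javascript
// Hypothetical limits: tune them to your broker and network.
const thresholdOptions = {
  thresholds: {
    // Trend metrics accept percentile expressions.
    amqp_publish_duration_ms: ['p(95)<200'],
    amqp_listen_duration_ms: ['p(95)<500'],
    // Counter metrics accept count expressions.
    amqp_messages_published: ['count>0'],
  },
};
// In a k6 script, expose this object as the exported `options` constant.
```

If any threshold fails, k6 marks the run as failed and exits with a non-zero status, which makes the check usable in CI.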

Large Scale Load Testing Strategy

When testing with massive concurrency (e.g., 5,000+ VUs), you are no longer just testing the extension's code; you are pushing the limits of the OS network stack, Docker's virtual network, and RabbitMQ's connection handlers.

If you start 5,000 VUs instantly, you will create a Connection Storm. 5,000 TCP sockets attempting to open in the exact same millisecond will likely result in read tcp ... i/o timeout errors as the host or container network queues overflow.

1. RabbitMQ Docker Tuning (Implemented)

To handle high connection density, the provided docker-compose.yml tunes RabbitMQ by:

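  • Setting ulimits: nofile: soft: 65536 / hard: 65536, which raises the open-file limit so the broker can hold up to roughly 65k concurrent TCP connections (each connection consumes a file descriptor).
  • Adding RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=+P 1048576 to raise the maximum number of Erlang processes the broker may run.

2. k6 Scenarios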

Never use options = { vus: 5000, duration: '1m' } for large scale tests! Instead, use the k6 scenarios API with the ramping-vus executor to gradually increase load. This allows RabbitMQ to process connection handshakes smoothly.
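
A gradual ramp with the ramping-vus executor might look like the following sketch. The helper name, step count, and durations are assumptions chosen for illustration; the predefined profiles in the repository may use different values.

```javascript
// Hypothetical helper: builds a k6 options object that ramps up to
// `peakVUs` in equal steps, holds at the peak, then ramps back down.
function rampingOptions(peakVUs, steps, stepDuration, holdDuration) {
  const stages = [];
  for (let i = 1; i <= steps; i++) {
    // Each stage raises the VU target by an equal share of the peak.
    stages.push({ duration: stepDuration, target: Math.round((peakVUs * i) / steps) });
  }
  stages.push({ duration: holdDuration, target: peakVUs }); // hold at peak
  stages.push({ duration: stepDuration, target: 0 });       // ramp down
  return {
    scenarios: {
      amqp_ramp: {
        executor: 'ramping-vus',
        startVUs: 0,
        stages,
        gracefulRampDown: '30s',
      },
    },
  };
}

// 5000 VUs reached in five one-minute steps, held for two minutes.
const rampOptions = rampingOptions(5000, 5, '1m', '2m');
```

In a k6 script the result would be exposed as the exported `options` constant. With these numbers roughly 1000 new connections open per minute instead of 5000 at once.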

Open integration/scripts/test_publisher_only.js to find predefined load profiles:

  • Smoke Test: Validates functionality (1 VU).
  • Load Test: Normal sustained load (500 VUs constant).
  • Stress Test: Pushes limits to find breaking points (ramping to 5000 VUs).
  • Spike Test: Sudden massive surges (baseline 100 -> jump to 4000).
  • Soak Test: Sustained load over hours to detect memory leaks.

3. Interpreting Results

When running large tests, pay attention to these metrics:

  1. amqp_publish_duration_ms: If this spikes severely, RabbitMQ is struggling to process the messages (CPU/Memory bound). Check if vm_memory_high_watermark is triggered in the RabbitMQ management UI.
  2. i/o timeout errors: If you see these during a gradual ramp-up, the network stack is saturated.
  3. CPU vs Network: Running k6 and RabbitMQ on the same machine under heavy load will cause them to compete for CPU.

[!WARNING] Docker Desktop Limits: Docker on Windows (WSL2) and Mac uses a lightweight virtual machine. The virtualized network proxy between the host and the VM becomes a severe bottleneck long before RabbitMQ or k6 actually fails. For accurate large-scale QA/Preprod testing, run k6 and RabbitMQ natively on separate Linux servers or dedicated cloud instances.


Docker

Services

| Service | Purpose |
| --- | --- |
| rabbitmq | RabbitMQ 3.x with Management Plugin |
| k6-build | Builds the custom k6 binary |
| unit-tests | Runs Go unit tests |
| integration-tests | Runs Go integration tests (needs RabbitMQ) |
| k6-integration | Runs k6 integration script |
| k6-performance | Runs k6 performance test |

RabbitMQ Management UI

After starting RabbitMQ:

docker compose up -d rabbitmq

Access the management UI at http://localhost:15672 (login: guest / guest).


Examples

See the examples/ directory for usage demos, and integration/scripts/ for test scripts.

License

See LICENSE file.

Documentation

Overview

Package amqp provides a k6 extension for AMQP 0.9.1 (RabbitMQ) messaging.

Architecture overview:

  • RootModule: Singleton registered with k6 via init(). Acts as a factory that creates one ModuleInstance per Virtual User (VU).

  • ModuleInstance: Per-VU instance holding its own AMQP connection and exposing all JavaScript-callable methods. This ensures thread safety since each VU runs in its own goroutine and has isolated state.

  • Event Loop Integration: Asynchronous operations (like Listen) use vu.RegisterCallback() to safely schedule results back onto the k6 event loop, preventing the runtime panics that plagued the original extension (see: github.com/grafana/xk6-amqp/issues/6 and #10).

JavaScript API (all functions are named exports from 'k6/x/amqp'):

connect({ url })           — Open AMQP connection for this VU
close()                    — Close the VU's AMQP connection
publish({ ... })           — Publish a message to a queue/exchange
listen({ ... })            — Consume one message (returns Promise<string>)
declareQueue({ ... })      — Declare a queue
deleteQueue(name)          — Delete a queue
inspectQueue(name)         — Inspect queue metadata
bindQueue({ ... })         — Bind a queue to an exchange
unbindQueue({ ... })       — Unbind a queue from an exchange
purgeQueue(name)           — Purge all messages from a queue
declareExchange({ ... })   — Declare an exchange
deleteExchange(name)       — Delete an exchange
bindExchange({ ... })      — Bind an exchange to another exchange
unbindExchange({ ... })    — Unbind an exchange from another exchange

exchanges.go — Exchange operations for the xk6-amqp extension.

Exchanges are the routing hubs in AMQP. Producers publish messages to exchanges, which then route them to queues based on bindings and routing keys.

Exchange types supported by RabbitMQ:

  • "direct": routes to queues whose binding key exactly matches the routing key
  • "fanout": routes to all bound queues (ignores routing key)
  • "topic": routes based on wildcard pattern matching (* and #)
  • "headers": routes based on message header attributes

All methods are on ModuleInstance (per-VU, no shared state).
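
For the "topic" exchange type, the wildcard rules are easy to get wrong: * stands for exactly one dot-separated word, while # stands for zero or more words. The function below is only an illustration of those rules for choosing routing keys in test scripts. The broker performs the real matching, and topicMatches is a hypothetical helper, not part of this extension.

```javascript
// Hypothetical helper: returns true when a topic binding pattern would
// match the given routing key under AMQP 0.9.1 topic exchange rules.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  function match(i, j) {
    if (i === p.length) return j === k.length; // pattern exhausted
    if (p[i] === '#') {
      // '#' may absorb zero or more words: try every possible length.
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    if (j === k.length) return false; // key exhausted, pattern is not
    if (p[i] === '*' || p[i] === k[j]) return match(i + 1, j + 1);
    return false;
  }

  return match(0, 0);
}
```

For example, the pattern events.* matches events.created but not events.user.created, whereas events.# matches both, as well as the bare key events.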

queues.go — Queue operations for the xk6-amqp extension.

All queue methods are defined on ModuleInstance, meaning each VU uses its own AMQP connection. There are no shared resources.

Queue operations are synchronous (they block the VU goroutine until the AMQP server responds). This is fine because:

  • Queue operations are fast (metadata only, no message payloads)
  • They are typically called in setup/init, not in the hot loop
  • k6 VUs are independent goroutines — blocking one doesn't affect others

Types

type ConnectOptions

type ConnectOptions struct {
	// URL is the AMQP connection string (e.g. "amqp://guest:guest@localhost:5672/")
	URL string `js:"url"`
}

ConnectOptions holds the parameters for establishing an AMQP connection.

type DeclareExchangeOptions

type DeclareExchangeOptions struct {
	// Name is the exchange name.
	Name string `js:"name"`

	// Kind is the exchange type: "direct", "fanout", "topic", or "headers".
	Kind string `js:"kind"`

	// Durable exchanges survive broker restarts.
	Durable bool `js:"durable"`

	// AutoDelete: if true, the exchange is deleted when the last queue
	// bound to it is unbound.
	AutoDelete bool `js:"auto_delete"`

	// Internal exchanges cannot be published to directly by clients;
	// they can only receive messages from other exchanges via bindings.
	Internal bool `js:"internal"`

	// NoWait: if true, do not wait for server confirmation.
	NoWait bool `js:"no_wait"`

	// Args are optional arguments for the exchange.
	Args amqpDriver.Table `js:"args"`
}

DeclareExchangeOptions provides parameters when declaring (creating) an exchange.

type DeclareQueueOptions

type DeclareQueueOptions struct {
	// Name is the queue name. If empty, the server generates a unique name.
	Name string `js:"name"`

	// Durable queues survive broker restarts. Non-durable queues are deleted
	// when the broker shuts down.
	Durable bool `js:"durable"`

	// DeleteWhenUnused causes the queue to be deleted when the last
	// consumer unsubscribes.
	DeleteWhenUnused bool `js:"delete_when_unused"`

	// Exclusive queues are only accessible by the connection that
	// declared them and are deleted when that connection closes.
	Exclusive bool `js:"exclusive"`

	// NoWait: if true, the server will not respond to the declare.
	// The client should not wait for a response.
	NoWait bool `js:"no_wait"`

	// Args are optional arguments for the queue (e.g., x-message-ttl).
	Args amqpDriver.Table `js:"args"`
}

DeclareQueueOptions provides parameters when declaring (creating) a queue. All fields are optional; as noted on the Name field, an empty name asks the server to generate a unique one.

type ExchangeBindOptions

type ExchangeBindOptions struct {
	// DestinationExchangeName receives routed messages.
	DestinationExchangeName string `js:"destination_exchange_name"`

	// SourceExchangeName is where messages originate.
	SourceExchangeName string `js:"source_exchange_name"`

	// RoutingKey is the routing pattern for the binding.
	RoutingKey string `js:"routing_key"`

	// NoWait: if true, do not wait for server confirmation.
	NoWait bool `js:"no_wait"`

	// Args are optional arguments for the binding.
	Args amqpDriver.Table `js:"args"`
}

ExchangeBindOptions provides parameters when binding one exchange to another. This creates a routing link: messages published to the source exchange that match the binding's routing key are also routed to the destination exchange.

type ExchangeUnbindOptions

type ExchangeUnbindOptions struct {
	// DestinationExchangeName is the exchange that was receiving messages.
	DestinationExchangeName string `js:"destination_exchange_name"`

	// SourceExchangeName is the exchange that was sending messages.
	SourceExchangeName string `js:"source_exchange_name"`

	// RoutingKey must match the routing key used when the binding was created.
	RoutingKey string `js:"routing_key"`

	// NoWait: if true, do not wait for server confirmation.
	NoWait bool `js:"no_wait"`

	// Args must match the args used when the binding was created.
	Args amqpDriver.Table `js:"args"`
}

ExchangeUnbindOptions provides parameters when removing an exchange-to-exchange binding.

type ListenOptions

type ListenOptions struct {
	QueueName string           `js:"queue_name"`
	Consumer  string           `js:"consumer"`
	AutoAck   bool             `js:"auto_ack"`
	Exclusive bool             `js:"exclusive"`
	NoLocal   bool             `js:"no_local"`
	NoWait    bool             `js:"no_wait"`
	Args      amqpDriver.Table `js:"args"`
}

ListenOptions defines parameters for consuming a single message from a queue. The listen function returns a Promise that resolves with the message body.

type ModuleInstance

type ModuleInstance struct {
	// contains filtered or unexported fields
}

ModuleInstance holds all per-VU state: the k6 VU reference, the AMQP connection, and the exported JavaScript API surface. Each VU gets its own instance via RootModule.NewModuleInstance().

func (*ModuleInstance) BindExchange

func (mi *ModuleInstance) BindExchange(options ExchangeBindOptions) error

BindExchange creates a routing link from one exchange (source) to another (destination). Messages published to the source that match the routing key will also be delivered to the destination.

JavaScript usage:

bindExchange({
    source_exchange_name: "source-exchange",
    destination_exchange_name: "dest-exchange",
    routing_key: "events.*",
});

func (*ModuleInstance) BindQueue

func (mi *ModuleInstance) BindQueue(options QueueBindOptions) error

BindQueue creates a binding between a queue and an exchange. Messages routed to the exchange with a matching routing key will be delivered to the queue.

JavaScript usage:

bindQueue({
    queue_name: "my-queue",
    exchange_name: "my-exchange",
    routing_key: "my.routing.key",
});

func (*ModuleInstance) Close

func (mi *ModuleInstance) Close() error

Close gracefully shuts down the AMQP connection for this VU. After calling Close(), the VU must call Connect() again before performing any AMQP operations.

JavaScript usage:

close();

func (*ModuleInstance) Connect

func (mi *ModuleInstance) Connect(options ConnectOptions) error

Connect establishes an AMQP connection for this VU. It should be called once in the setup or init phase of a k6 script.

JavaScript usage:

connect({ url: "amqp://guest:guest@localhost:5672/" });
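
Credentials or virtual host names that contain characters such as @ or / need percent-encoding before they go into the URL. The sketch below assumes the underlying driver follows the usual RabbitMQ URI convention; amqpUrl and its parameter names are hypothetical and not part of this package.

```javascript
// Hypothetical helper: assembles an AMQP 0.9.1 URI, percent-encoding the
// credentials and virtual host so special characters survive parsing.
function amqpUrl({ user, password, host, port = 5672, vhost }) {
  const auth = `${encodeURIComponent(user)}:${encodeURIComponent(password)}`;
  // When no vhost is given the path stays empty, producing a trailing "/".
  const path = vhost === undefined ? '' : encodeURIComponent(vhost);
  return `amqp://${auth}@${host}:${port}/${path}`;
}
```

The result can be passed directly to connect, for example connect({ url: amqpUrl({ user: "guest", password: "guest", host: "localhost" }) }).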

func (*ModuleInstance) DeclareExchange

func (mi *ModuleInstance) DeclareExchange(options DeclareExchangeOptions) error

DeclareExchange creates or verifies an exchange on the AMQP server. If the exchange already exists with identical properties, this is a no-op.

JavaScript usage:

declareExchange({
    name: "my-exchange",
    kind: "topic",
    durable: true,
});

func (*ModuleInstance) DeclareQueue

func (mi *ModuleInstance) DeclareQueue(options DeclareQueueOptions) (map[string]interface{}, error)

DeclareQueue creates or verifies a queue on the AMQP server. If the queue already exists with identical properties, this is a no-op. If it exists with different properties, the server will return an error.

Returns a map with queue metadata:

{ name: "my-queue", messages: 0, consumers: 0 }

JavaScript usage:

const q = declareQueue({ name: "my-queue", durable: true });
console.log(`Queue ${q.name} has ${q.messages} messages`);

func (*ModuleInstance) DeleteExchange

func (mi *ModuleInstance) DeleteExchange(name string) error

DeleteExchange removes an exchange from the AMQP server. Any existing bindings to the exchange are also removed.

JavaScript usage:

deleteExchange("my-exchange");

func (*ModuleInstance) DeleteQueue

func (mi *ModuleInstance) DeleteQueue(name string) error

DeleteQueue removes a queue from the AMQP server. Any pending messages in the queue are discarded.

JavaScript usage:

deleteQueue("my-queue");

func (*ModuleInstance) Exports

func (mi *ModuleInstance) Exports() modules.Exports

Exports implements modules.Instance and returns the named exports that k6 will make available when a script does:

import { connect, publish, listen, ... } from 'k6/x/amqp';

func (*ModuleInstance) InspectQueue

func (mi *ModuleInstance) InspectQueue(name string) (map[string]interface{}, error)

InspectQueue returns metadata about a queue without modifying it. This is useful for checking message counts and consumer counts.

Returns a map:

{ name: "my-queue", messages: 42, consumers: 3 }

JavaScript usage:

const info = inspectQueue("my-queue");
console.log(`${info.messages} messages waiting`);

func (*ModuleInstance) Listen

func (mi *ModuleInstance) Listen(options ListenOptions) interface{}

Listen consumes ONE message from the specified queue and returns it as a JavaScript Promise. This is the key architectural change from the original extension:

OLD (broken): spawned a goroutine that called a JS callback directly, causing concurrent access panics on the JS runtime.

NEW (safe): uses vu.RegisterCallback() to schedule the result back onto k6's event loop. The goroutine reads from the AMQP channel in the background, then uses the registered callback to safely resolve/reject the Promise on the main JS thread.

Performance note: RegisterCallback + Promise adds minimal overhead (~microseconds) compared to the AMQP network round-trip (milliseconds). The goroutine is short-lived (waits for one message, then exits).

JavaScript usage:

const message = await listen({
    queue_name: "my-queue",
    auto_ack: true,
});
console.log("Received:", message);
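
Because each listen() call resolves with a single message, reading several messages means awaiting it repeatedly. A minimal sketch, assuming a hypothetical helper consumeN that receives the listen function as a parameter so that the helper itself has no k6 dependency:

```javascript
// Hypothetical helper: awaits `listenFn` n times in sequence and collects
// the message bodies in arrival order.
async function consumeN(listenFn, options, n) {
  const bodies = [];
  for (let i = 0; i < n; i++) {
    // Each call resolves with exactly one message body.
    bodies.push(await listenFn(options));
  }
  return bodies;
}
```

In a k6 script this could be called as const bodies = await consumeN(listen, { queue_name: "my-queue", auto_ack: true }, 10). The calls run one after another, so the helper waits until all n messages have arrived.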

func (*ModuleInstance) Publish

func (mi *ModuleInstance) Publish(options PublishOptions) error

Publish sends a message to the specified queue or exchange. It opens a temporary channel, publishes the message, and closes the channel. This is safe for high-throughput scenarios as AMQP channels are lightweight.

If ContentType is "application/x-msgpack", the Body (expected to be a JSON string) is re-encoded as MessagePack before sending.

JavaScript usage:

publish({
    queue_name: "my-queue",
    body: "Hello, RabbitMQ!",
    content_type: "text/plain",
    exchange: "",          // optional, defaults to default exchange
    persistent: true,      // optional, marks message as persistent
    headers: { "x-foo": "bar" }, // optional
});
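
For the MessagePack path described above, the body still has to be a JSON string on the JavaScript side. A small sketch of building such options; msgpackPublishOptions is a hypothetical helper name, and only the three fields shown are set:

```javascript
// Hypothetical helper: serializes `payload` to JSON and marks the message
// so that the extension re-encodes the body as MessagePack before sending.
function msgpackPublishOptions(queueName, payload) {
  return {
    queue_name: queueName,
    body: JSON.stringify(payload),
    content_type: 'application/x-msgpack',
  };
}
```

The returned object can be handed straight to publish, for example publish(msgpackPublishOptions("my-queue", { id: 1, tags: ["a", "b"] })).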

func (*ModuleInstance) PurgeQueue

func (mi *ModuleInstance) PurgeQueue(name string) (int, error)

PurgeQueue removes all messages from a queue without deleting the queue itself. Returns the number of messages purged.

JavaScript usage:

const count = purgeQueue("my-queue");
console.log(`Purged ${count} messages`);

func (*ModuleInstance) UnbindExchange

func (mi *ModuleInstance) UnbindExchange(options ExchangeUnbindOptions) error

UnbindExchange removes a routing link between two exchanges.

JavaScript usage:

unbindExchange({
    source_exchange_name: "source-exchange",
    destination_exchange_name: "dest-exchange",
    routing_key: "events.*",
});

func (*ModuleInstance) UnbindQueue

func (mi *ModuleInstance) UnbindQueue(options QueueUnbindOptions) error

UnbindQueue removes a binding between a queue and an exchange. Messages will no longer be routed from the exchange to this queue for the specified routing key.

JavaScript usage:

unbindQueue({
    queue_name: "my-queue",
    exchange_name: "my-exchange",
    routing_key: "my.routing.key",
});

type PublishOptions

type PublishOptions struct {
	QueueName     string           `js:"queue_name"`
	Body          string           `js:"body"`
	Headers       amqpDriver.Table `js:"headers"`
	Exchange      string           `js:"exchange"`
	ContentType   string           `js:"content_type"`
	Mandatory     bool             `js:"mandatory"`
	Immediate     bool             `js:"immediate"`
	Persistent    bool             `js:"persistent"`
	CorrelationID string           `js:"correlation_id"`
	ReplyTo       string           `js:"reply_to"`
	Expiration    string           `js:"expiration"`
	MessageID     string           `js:"message_id"`
	Timestamp     int64            `js:"timestamp"`
	Type          string           `js:"type"`
	UserID        string           `js:"user_id"`
	AppID         string           `js:"app_id"`
}

PublishOptions defines all parameters for publishing a message. Field names use `js:"snake_case"` tags to match the JavaScript convention from the original extension, maintaining backward compatibility.

type QueueBindOptions

type QueueBindOptions struct {
	// QueueName is the name of the queue to bind.
	QueueName string `js:"queue_name"`

	// ExchangeName is the name of the exchange to bind to.
	ExchangeName string `js:"exchange_name"`

	// RoutingKey is the routing pattern for the binding.
	RoutingKey string `js:"routing_key"`

	// NoWait: if true, do not wait for server confirmation.
	NoWait bool `js:"no_wait"`

	// Args are optional arguments for the binding.
	Args amqpDriver.Table `js:"args"`
}

QueueBindOptions provides parameters when binding a queue to an exchange. Binding tells the exchange to route messages to this queue based on the routing key.

type QueueUnbindOptions

type QueueUnbindOptions struct {
	// QueueName is the name of the queue to unbind.
	QueueName string `js:"queue_name"`

	// ExchangeName is the name of the exchange to unbind from.
	ExchangeName string `js:"exchange_name"`

	// RoutingKey is the routing key of the binding to remove.
	RoutingKey string `js:"routing_key"`

	// Args must match the args used when the binding was created.
	Args amqpDriver.Table `js:"args"`
}

QueueUnbindOptions provides parameters when removing a queue binding.

type RootModule

type RootModule struct{}

RootModule implements modules.Module and serves as the global singleton registered during init(). Its only job is to create a new ModuleInstance for each VU that imports 'k6/x/amqp'.

func (*RootModule) NewModuleInstance

func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance

NewModuleInstance is called by k6 once per VU. It creates a fresh ModuleInstance with its own connection state, ensuring complete isolation between VUs — no shared mutable state.
