Documentation ¶
Overview ¶
Package amqp provides a k6 extension for AMQP 0.9.1 (RabbitMQ) messaging.
Architecture overview:
RootModule: Singleton registered with k6 via init(). Acts as a factory that creates one ModuleInstance per Virtual User (VU).
ModuleInstance: Per-VU instance holding its own AMQP connection and exposing all JavaScript-callable methods. This ensures thread safety since each VU runs in its own goroutine and has isolated state.
Event Loop Integration: Asynchronous operations (like Listen) use vu.RegisterCallback() to safely schedule results back onto the k6 event loop, preventing the runtime panics that plagued the original extension (see: github.com/grafana/xk6-amqp/issues/6 and #10).
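The factory relationship between RootModule and ModuleInstance can be sketched roughly as follows. This is a simplified illustration, not the extension's actual code: the `vu` and `conn` types are hypothetical stand-ins for k6's modules.VU and a real AMQP connection, and the method signatures are reduced so the example has no external dependencies.

```go
package main

import "fmt"

// vu is a hypothetical stand-in for k6's modules.VU; only an ID is modeled.
type vu struct{ id int }

// conn is a hypothetical stand-in for an AMQP connection.
type conn struct{ url string }

// RootModule is the stateless singleton that acts as a factory.
type RootModule struct{}

// ModuleInstance holds state owned by exactly one VU.
type ModuleInstance struct {
	vu   vu
	conn *conn // nil until Connect is called
}

// NewModuleInstance returns a fresh instance, so no state is shared between VUs.
func (*RootModule) NewModuleInstance(v vu) *ModuleInstance {
	return &ModuleInstance{vu: v}
}

// Connect stores a connection on this instance only.
func (mi *ModuleInstance) Connect(url string) { mi.conn = &conn{url: url} }

func main() {
	root := &RootModule{}
	a := root.NewModuleInstance(vu{id: 1})
	b := root.NewModuleInstance(vu{id: 2})
	a.Connect("amqp://guest:guest@localhost:5672/")
	fmt.Println(a.conn != nil, b.conn != nil) // true false
}
```

Connecting on one instance leaves the other untouched, which is the isolation property the architecture relies on.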
JavaScript API (all functions are named exports from 'k6/x/amqp'):
connect({ url }) — Open AMQP connection for this VU
close() — Close the VU's AMQP connection
publish({ ... }) — Publish a message to a queue/exchange
listen({ ... }) — Consume one message (returns Promise<string>)
declareQueue({ ... }) — Declare a queue
deleteQueue(name) — Delete a queue
inspectQueue(name) — Inspect queue metadata
bindQueue({ ... }) — Bind a queue to an exchange
unbindQueue({ ... }) — Unbind a queue from an exchange
purgeQueue(name) — Purge all messages from a queue
declareExchange({ ... }) — Declare an exchange
deleteExchange(name) — Delete an exchange
bindExchange({ ... }) — Bind an exchange to another exchange
unbindExchange({ ... }) — Unbind an exchange from another exchange
exchanges.go — Exchange operations for the xk6-amqp extension.
Exchanges are the routing hubs in AMQP. Producers publish messages to exchanges, which then route them to queues based on bindings and routing keys.
Exchange types supported by RabbitMQ:
- "direct": routes to queues whose binding key exactly matches the routing key
- "fanout": routes to all bound queues (ignores routing key)
- "topic": routes based on wildcard pattern matching (* and #)
- "headers": routes based on message header attributes
All methods are on ModuleInstance (per-VU, no shared state).
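To make the routing rules for the exchange types concrete, here is a minimal sketch of how a broker might decide whether a binding matches a routing key. The function names `routes` and `matchTopic` are hypothetical and the broker's real implementation is certainly different; "headers" matching depends on message headers and is not modeled.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether a routing key matches a topic binding pattern.
// Words are separated by '.', '*' matches exactly one word, and '#' matches
// zero or more words.
func matchTopic(pattern, key string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// '#' may absorb zero or more words: try every possible split.
		for i := 0; i <= len(k); i++ {
			if matchWords(p[1:], k[i:]) {
				return true
			}
		}
		return false
	case "*":
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

// routes reports whether an exchange of the given kind delivers a message
// with routingKey to a queue bound with bindingKey.
func routes(kind, bindingKey, routingKey string) bool {
	switch kind {
	case "direct":
		return bindingKey == routingKey
	case "fanout":
		return true // routing key is ignored
	case "topic":
		return matchTopic(bindingKey, routingKey)
	default:
		return false // "headers" is not modeled in this sketch
	}
}

func main() {
	fmt.Println(routes("topic", "events.*", "events.created"))      // true
	fmt.Println(routes("topic", "events.*", "events.user.created")) // false
	fmt.Println(routes("topic", "events.#", "events.user.created")) // true
	fmt.Println(routes("direct", "orders", "orders.eu"))            // false
}
```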
queues.go — Queue operations for the xk6-amqp extension.
All queue methods are defined on ModuleInstance, meaning each VU uses its own AMQP connection. There are no shared resources.
Queue operations are synchronous (they block the VU goroutine until the AMQP server responds). This is fine because:
- Queue operations are fast (metadata only, no message payloads)
- They are typically called in setup/init, not in the hot loop
- k6 VUs are independent goroutines — blocking one doesn't affect others
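The last point can be illustrated with a small, dependency-free sketch. Two goroutines stand in for VUs: one is blocked in a simulated synchronous call while the other completes. The function name `runVUs` and the channel-based "blocking call" are hypothetical; no AMQP traffic is involved.

```go
package main

import (
	"fmt"
	"sync"
)

// runVUs starts two goroutines standing in for VUs and returns the order in
// which they finished. The slow VU blocks until release is closed, which only
// happens after the fast VU has already completed.
func runVUs() []string {
	var (
		mu    sync.Mutex
		order []string
		wg    sync.WaitGroup
	)
	release := make(chan struct{})
	fastDone := make(chan struct{})
	record := func(name string) {
		mu.Lock()
		order = append(order, name)
		mu.Unlock()
	}

	wg.Add(2)
	go func() { // slow VU: blocked in a simulated synchronous call
		defer wg.Done()
		<-release
		record("slow")
	}()
	go func() { // fast VU: not delayed by the slow one
		defer wg.Done()
		record("fast")
		close(fastDone)
	}()

	<-fastDone     // the fast VU finished while the slow VU was still blocked
	close(release) // now let the slow VU's call return
	wg.Wait()
	return order
}

func main() {
	fmt.Println(runVUs()) // [fast slow]
}
```

If blocking one goroutine stalled the other, the program would deadlock instead of printing a result.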
Index ¶
- type ConnectOptions
- type DeclareExchangeOptions
- type DeclareQueueOptions
- type ExchangeBindOptions
- type ExchangeUnbindOptions
- type ListenOptions
- type ModuleInstance
- func (mi *ModuleInstance) BindExchange(options ExchangeBindOptions) error
- func (mi *ModuleInstance) BindQueue(options QueueBindOptions) error
- func (mi *ModuleInstance) Close() error
- func (mi *ModuleInstance) Connect(options ConnectOptions) error
- func (mi *ModuleInstance) DeclareExchange(options DeclareExchangeOptions) error
- func (mi *ModuleInstance) DeclareQueue(options DeclareQueueOptions) (map[string]interface{}, error)
- func (mi *ModuleInstance) DeleteExchange(name string) error
- func (mi *ModuleInstance) DeleteQueue(name string) error
- func (mi *ModuleInstance) Exports() modules.Exports
- func (mi *ModuleInstance) InspectQueue(name string) (map[string]interface{}, error)
- func (mi *ModuleInstance) Listen(options ListenOptions) interface{}
- func (mi *ModuleInstance) Publish(options PublishOptions) error
- func (mi *ModuleInstance) PurgeQueue(name string) (int, error)
- func (mi *ModuleInstance) UnbindExchange(options ExchangeUnbindOptions) error
- func (mi *ModuleInstance) UnbindQueue(options QueueUnbindOptions) error
- type PublishOptions
- type QueueBindOptions
- type QueueUnbindOptions
- type RootModule
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConnectOptions ¶
type ConnectOptions struct {
// URL is the AMQP connection string (e.g. "amqp://guest:guest@localhost:5672/")
URL string `js:"url"`
}
ConnectOptions holds the parameters for establishing an AMQP connection.
type DeclareExchangeOptions ¶
type DeclareExchangeOptions struct {
// Name is the exchange name.
Name string `js:"name"`
// Kind is the exchange type: "direct", "fanout", "topic", or "headers".
Kind string `js:"kind"`
// Durable exchanges survive broker restarts.
Durable bool `js:"durable"`
// AutoDelete: if true, the exchange is deleted when the last queue
// bound to it is unbound.
AutoDelete bool `js:"auto_delete"`
// Internal exchanges cannot be published to directly by clients;
// they can only receive messages from other exchanges via bindings.
Internal bool `js:"internal"`
// NoWait: if true, do not wait for server confirmation.
NoWait bool `js:"no_wait"`
// Args are optional arguments for the exchange.
Args amqpDriver.Table `js:"args"`
}
DeclareExchangeOptions provides parameters when declaring (creating) an exchange.
type DeclareQueueOptions ¶
type DeclareQueueOptions struct {
// Name is the queue name. If empty, the server generates a unique name.
Name string `js:"name"`
// Durable queues survive broker restarts. Non-durable queues are deleted
// when the broker shuts down.
Durable bool `js:"durable"`
// DeleteWhenUnused causes the queue to be deleted when the last
// consumer unsubscribes.
DeleteWhenUnused bool `js:"delete_when_unused"`
// Exclusive queues are only accessible by the connection that
// declared them and are deleted when that connection closes.
Exclusive bool `js:"exclusive"`
// NoWait: if true, the server will not respond to the declare.
// The client should not wait for a response.
NoWait bool `js:"no_wait"`
// Args are optional arguments for the queue (e.g., x-message-ttl).
Args amqpDriver.Table `js:"args"`
}
DeclareQueueOptions provides parameters when declaring (creating) a queue. All fields are optional; if Name is empty, the server generates a unique name.
type ExchangeBindOptions ¶
type ExchangeBindOptions struct {
// DestinationExchangeName receives routed messages.
DestinationExchangeName string `js:"destination_exchange_name"`
// SourceExchangeName is where messages originate.
SourceExchangeName string `js:"source_exchange_name"`
// RoutingKey is the routing pattern for the binding.
RoutingKey string `js:"routing_key"`
// NoWait: if true, do not wait for server confirmation.
NoWait bool `js:"no_wait"`
// Args are optional arguments for the binding.
Args amqpDriver.Table `js:"args"`
}
ExchangeBindOptions provides parameters when binding one exchange to another. This creates a routing link: messages published to the source exchange that match the routing key are also routed to the destination exchange.
type ExchangeUnbindOptions ¶
type ExchangeUnbindOptions struct {
// DestinationExchangeName is the exchange that was receiving messages.
DestinationExchangeName string `js:"destination_exchange_name"`
// SourceExchangeName is the exchange that was sending messages.
SourceExchangeName string `js:"source_exchange_name"`
// RoutingKey must match the routing key used when the binding was created.
RoutingKey string `js:"routing_key"`
// NoWait: if true, do not wait for server confirmation.
NoWait bool `js:"no_wait"`
// Args must match the args used when the binding was created.
Args amqpDriver.Table `js:"args"`
}
ExchangeUnbindOptions provides parameters when removing an exchange-to-exchange binding.
type ListenOptions ¶
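type ListenOptions struct {
// QueueName is the name of the queue to consume from.
QueueName string `js:"queue_name"`
// Consumer is the consumer tag that identifies this consumer.
Consumer string `js:"consumer"`
// AutoAck: if true, the server treats messages as acknowledged on delivery.
AutoAck bool `js:"auto_ack"`
// Exclusive: if true, this is the only consumer allowed on the queue.
Exclusive bool `js:"exclusive"`
// NoLocal is part of AMQP 0.9.1 but is not supported by RabbitMQ.
NoLocal bool `js:"no_local"`
// NoWait: if true, do not wait for server confirmation.
NoWait bool `js:"no_wait"`
// Args are optional arguments for the consumer.
Args amqpDriver.Table `js:"args"`
}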
ListenOptions defines parameters for consuming a single message from a queue. The listen function returns a Promise that resolves with the message body.
type ModuleInstance ¶
type ModuleInstance struct {
// contains filtered or unexported fields
}
ModuleInstance holds all per-VU state: the k6 VU reference, the AMQP connection, and the exported JavaScript API surface. Each VU gets its own instance via RootModule.NewModuleInstance().
func (*ModuleInstance) BindExchange ¶
func (mi *ModuleInstance) BindExchange(options ExchangeBindOptions) error
BindExchange creates a routing link from one exchange (source) to another (destination). Messages published to the source that match the routing key will also be delivered to the destination.
JavaScript usage:
bindExchange({
source_exchange_name: "source-exchange",
destination_exchange_name: "dest-exchange",
routing_key: "events.*",
});
func (*ModuleInstance) BindQueue ¶
func (mi *ModuleInstance) BindQueue(options QueueBindOptions) error
BindQueue creates a binding between a queue and an exchange. Messages routed to the exchange with a matching routing key will be delivered to the queue.
JavaScript usage:
bindQueue({
queue_name: "my-queue",
exchange_name: "my-exchange",
routing_key: "my.routing.key",
});
func (*ModuleInstance) Close ¶
func (mi *ModuleInstance) Close() error
Close gracefully shuts down the AMQP connection for this VU. After calling Close(), the VU must call Connect() again before performing any AMQP operations.
JavaScript usage:
close();
func (*ModuleInstance) Connect ¶
func (mi *ModuleInstance) Connect(options ConnectOptions) error
Connect establishes an AMQP connection for this VU. It should be called once in the setup or init phase of a k6 script.
JavaScript usage:
connect({ url: "amqp://guest:guest@localhost:5672/" });
func (*ModuleInstance) DeclareExchange ¶
func (mi *ModuleInstance) DeclareExchange(options DeclareExchangeOptions) error
DeclareExchange creates or verifies an exchange on the AMQP server. If the exchange already exists with identical properties, this is a no-op. If it exists with different properties, the server returns an error.
JavaScript usage:
declareExchange({
name: "my-exchange",
kind: "topic",
durable: true,
});
func (*ModuleInstance) DeclareQueue ¶
func (mi *ModuleInstance) DeclareQueue(options DeclareQueueOptions) (map[string]interface{}, error)
DeclareQueue creates or verifies a queue on the AMQP server. If the queue already exists with identical properties, this is a no-op. If it exists with different properties, the server will return an error.
Returns a map with queue metadata:
{ name: "my-queue", messages: 0, consumers: 0 }
JavaScript usage:
const q = declareQueue({ name: "my-queue", durable: true });
console.log(`Queue ${q.name} has ${q.messages} messages`);
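The declare semantics described above (create if missing, no-op on identical properties, error on a mismatch) can be modeled with a small in-memory sketch. The `fakeBroker` and `queueProps` types are hypothetical stand-ins for server-side state, and the error text only imitates the kind of precondition failure a broker reports.

```go
package main

import (
	"errors"
	"fmt"
)

// queueProps is a hypothetical subset of the properties compared on redeclare.
type queueProps struct {
	Durable, AutoDelete, Exclusive bool
}

// fakeBroker is an in-memory stand-in for the server's queue registry.
type fakeBroker struct{ queues map[string]queueProps }

// errPrecondition imitates the broker's "precondition failed" response.
var errPrecondition = errors.New("PRECONDITION_FAILED: inequivalent properties")

// declare creates the queue if it is missing, succeeds silently if it exists
// with identical properties, and fails if the properties differ.
func (b *fakeBroker) declare(name string, p queueProps) error {
	existing, ok := b.queues[name]
	if !ok {
		b.queues[name] = p
		return nil
	}
	if existing != p {
		return errPrecondition
	}
	return nil
}

func main() {
	b := &fakeBroker{queues: map[string]queueProps{}}
	fmt.Println(b.declare("my-queue", queueProps{Durable: true}))         // <nil>
	fmt.Println(b.declare("my-queue", queueProps{Durable: true}))         // <nil>
	fmt.Println(b.declare("my-queue", queueProps{Durable: false}) != nil) // true
}
```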
func (*ModuleInstance) DeleteExchange ¶
func (mi *ModuleInstance) DeleteExchange(name string) error
DeleteExchange removes an exchange from the AMQP server. Any existing bindings to the exchange are also removed.
JavaScript usage:
deleteExchange("my-exchange");
func (*ModuleInstance) DeleteQueue ¶
func (mi *ModuleInstance) DeleteQueue(name string) error
DeleteQueue removes a queue from the AMQP server. Any pending messages in the queue are discarded.
JavaScript usage:
deleteQueue("my-queue");
func (*ModuleInstance) Exports ¶
func (mi *ModuleInstance) Exports() modules.Exports
Exports implements modules.Instance and returns the named exports that k6 will make available when a script does:
import { connect, publish, listen, ... } from 'k6/x/amqp';
func (*ModuleInstance) InspectQueue ¶
func (mi *ModuleInstance) InspectQueue(name string) (map[string]interface{}, error)
InspectQueue returns metadata about a queue without modifying it. This is useful for checking message counts and consumer counts.
Returns a map:
{ name: "my-queue", messages: 42, consumers: 3 }
JavaScript usage:
const info = inspectQueue("my-queue");
console.log(`${info.messages} messages waiting`);
func (*ModuleInstance) Listen ¶
func (mi *ModuleInstance) Listen(options ListenOptions) interface{}
Listen consumes ONE message from the specified queue and returns it as a JavaScript Promise. This is the key architectural change from the original extension:
OLD (broken): spawned a goroutine that called a JS callback directly, causing concurrent access panics on the JS runtime.
NEW (safe): uses vu.RegisterCallback() to schedule the result back onto k6's event loop. The goroutine reads from the AMQP channel in the background, then uses the registered callback to safely resolve/reject the Promise on the main JS thread.
Performance note: RegisterCallback + Promise adds minimal overhead (~microseconds) compared to the AMQP network round-trip (milliseconds). The goroutine is short-lived (waits for one message, then exits).
JavaScript usage:
const message = await listen({
queue_name: "my-queue",
auto_ack: true,
});
console.log("Received:", message);
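The hand-off pattern behind Listen can be sketched without k6 itself. In the example below, `eventLoop`, `registerCallback`, and `listen` are hypothetical, simplified stand-ins: a background goroutine waits for a delivery, then enqueues a function that the loop goroutine executes, so the result is only ever touched on the loop. k6's real RegisterCallback has a different signature and more bookkeeping.

```go
package main

import "fmt"

// eventLoop is a minimal stand-in for a single-threaded event loop: every
// queued task runs on the goroutine that calls run, one at a time.
type eventLoop struct {
	tasks   chan func()
	pending int // registered callbacks not yet run; touched only by the loop goroutine
}

func newEventLoop() *eventLoop { return &eventLoop{tasks: make(chan func(), 16)} }

// registerCallback must be called on the loop goroutine. It tells the loop to
// stay alive and returns an enqueue function that is safe to call from any
// goroutine.
func (l *eventLoop) registerCallback() func(func()) {
	l.pending++
	return func(task func()) { l.tasks <- task }
}

// run executes queued tasks until no registered callbacks remain.
func (l *eventLoop) run() {
	for l.pending > 0 {
		task := <-l.tasks
		task()
		l.pending--
	}
}

// listen waits for one delivery in the background and resolves on the loop.
func listen(l *eventLoop, deliveries <-chan string, resolve func(string)) {
	enqueue := l.registerCallback()
	go func() {
		body := <-deliveries              // blocking wait happens off the loop
		enqueue(func() { resolve(body) }) // result is handed back to the loop
	}()
}

func main() {
	loop := newEventLoop()
	deliveries := make(chan string, 1)
	deliveries <- "hello"

	var received string // written only by a task running on the loop goroutine
	listen(loop, deliveries, func(body string) { received = body })
	loop.run()
	fmt.Println(received) // hello
}
```

The background goroutine never calls `resolve` directly; it only enqueues work, which is what avoids concurrent access to loop-owned state.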
func (*ModuleInstance) Publish ¶
func (mi *ModuleInstance) Publish(options PublishOptions) error
Publish sends a message to the specified queue or exchange. It opens a temporary channel, publishes the message, and closes the channel. AMQP channels are lightweight, so this keeps each call self-contained, although opening a channel costs an extra round-trip to the broker on every publish.
If ContentType is "application/x-msgpack", the Body (expected to be a JSON string) is re-encoded as MessagePack before sending.
JavaScript usage:
publish({
queue_name: "my-queue",
body: "Hello, RabbitMQ!",
content_type: "text/plain",
exchange: "", // optional, defaults to default exchange
persistent: true, // optional, marks message as persistent
headers: { "x-foo": "bar" }, // optional
});
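As a rough illustration of how the options in this example might translate to wire-level values, the sketch below maps `persistent` to the AMQP delivery mode (1 is transient, 2 is persistent) and uses the queue name as the routing key on the default exchange. The `publishArgs` type and `toPublishArgs` function are hypothetical, and treating queue_name as the routing key is an assumption of this sketch rather than something the documentation states.

```go
package main

import "fmt"

// AMQP 0.9.1 delivery modes.
const (
	deliveryTransient  uint8 = 1
	deliveryPersistent uint8 = 2
)

// publishArgs is a hypothetical stand-in for the values handed to the driver.
type publishArgs struct {
	Exchange     string
	RoutingKey   string
	DeliveryMode uint8
}

// toPublishArgs maps script-level options to wire-level values. Using the
// queue name as the routing key is an assumption of this sketch; with the
// default exchange (""), the broker routes by queue name.
func toPublishArgs(exchange, queueName string, persistent bool) publishArgs {
	args := publishArgs{
		Exchange:     exchange,
		RoutingKey:   queueName,
		DeliveryMode: deliveryTransient,
	}
	if persistent {
		args.DeliveryMode = deliveryPersistent
	}
	return args
}

func main() {
	args := toPublishArgs("", "my-queue", true)
	fmt.Println(args.Exchange == "", args.RoutingKey, args.DeliveryMode) // true my-queue 2
}
```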
func (*ModuleInstance) PurgeQueue ¶
func (mi *ModuleInstance) PurgeQueue(name string) (int, error)
PurgeQueue removes all messages from a queue without deleting the queue itself. Returns the number of messages purged.
JavaScript usage:
const count = purgeQueue("my-queue");
console.log(`Purged ${count} messages`);
func (*ModuleInstance) UnbindExchange ¶
func (mi *ModuleInstance) UnbindExchange(options ExchangeUnbindOptions) error
UnbindExchange removes a routing link between two exchanges.
JavaScript usage:
unbindExchange({
source_exchange_name: "source-exchange",
destination_exchange_name: "dest-exchange",
routing_key: "events.*",
});
func (*ModuleInstance) UnbindQueue ¶
func (mi *ModuleInstance) UnbindQueue(options QueueUnbindOptions) error
UnbindQueue removes a binding between a queue and an exchange. Messages will no longer be routed from the exchange to this queue for the specified routing key.
JavaScript usage:
unbindQueue({
queue_name: "my-queue",
exchange_name: "my-exchange",
routing_key: "my.routing.key",
});
type PublishOptions ¶
type PublishOptions struct {
QueueName string `js:"queue_name"`
Body string `js:"body"`
Headers amqpDriver.Table `js:"headers"`
Exchange string `js:"exchange"`
ContentType string `js:"content_type"`
Mandatory bool `js:"mandatory"`
Immediate bool `js:"immediate"`
Persistent bool `js:"persistent"`
CorrelationID string `js:"correlation_id"`
ReplyTo string `js:"reply_to"`
Expiration string `js:"expiration"`
MessageID string `js:"message_id"`
Timestamp int64 `js:"timestamp"`
Type string `js:"type"`
UserID string `js:"user_id"`
AppID string `js:"app_id"`
}
PublishOptions defines all parameters for publishing a message. Field names use `js:"snake_case"` tags to match the JavaScript convention from the original extension, maintaining backward compatibility.
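To show what the `js` tags express, the following sketch reads them with reflection and copies matching keys from a JavaScript-like object into a struct. In the extension the conversion is presumably performed by k6's JavaScript runtime; `fromJS` and the trimmed-down `publishOptions` type here are hypothetical and only illustrate the tag lookup.

```go
package main

import (
	"fmt"
	"reflect"
)

// publishOptions is a trimmed-down copy of a few tagged fields.
type publishOptions struct {
	QueueName   string `js:"queue_name"`
	ContentType string `js:"content_type"`
	Persistent  bool   `js:"persistent"`
}

// fromJS copies values from a JS-like object into dst using the `js` tags.
// dst must be a pointer to a struct with exported fields.
func fromJS(obj map[string]any, dst any) {
	v := reflect.ValueOf(dst).Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		key, ok := t.Field(i).Tag.Lookup("js")
		if !ok {
			continue // untagged fields are ignored
		}
		val, present := obj[key]
		if !present {
			continue // missing keys keep the Go zero value
		}
		rv := reflect.ValueOf(val)
		if rv.IsValid() && rv.Type().AssignableTo(t.Field(i).Type) {
			v.Field(i).Set(rv)
		}
	}
}

func main() {
	var opts publishOptions
	fromJS(map[string]any{"queue_name": "my-queue", "persistent": true}, &opts)
	fmt.Println(opts.QueueName, opts.Persistent) // my-queue true
}
```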
type QueueBindOptions ¶
type QueueBindOptions struct {
// QueueName is the name of the queue to bind.
QueueName string `js:"queue_name"`
// ExchangeName is the name of the exchange to bind to.
ExchangeName string `js:"exchange_name"`
// RoutingKey is the routing pattern for the binding.
RoutingKey string `js:"routing_key"`
// NoWait: if true, do not wait for server confirmation.
NoWait bool `js:"no_wait"`
// Args are optional arguments for the binding.
Args amqpDriver.Table `js:"args"`
}
QueueBindOptions provides parameters when binding a queue to an exchange. Binding tells the exchange to route messages to this queue based on the routing key.
type QueueUnbindOptions ¶
type QueueUnbindOptions struct {
// QueueName is the name of the queue to unbind.
QueueName string `js:"queue_name"`
// ExchangeName is the name of the exchange to unbind from.
ExchangeName string `js:"exchange_name"`
// RoutingKey is the routing key of the binding to remove.
RoutingKey string `js:"routing_key"`
// Args must match the args used when the binding was created.
Args amqpDriver.Table `js:"args"`
}
QueueUnbindOptions provides parameters when removing a queue binding.
type RootModule ¶
type RootModule struct{}
RootModule implements modules.Module and serves as the global singleton registered during init(). Its only job is to create a new ModuleInstance for each VU that imports 'k6/x/amqp'.
func (*RootModule) NewModuleInstance ¶
func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance
NewModuleInstance is called by k6 once per VU. It creates a fresh ModuleInstance with its own connection state, ensuring complete isolation between VUs — no shared mutable state.