Configure message queues
The message queue topology is an Adobe Commerce and Magento Open Source feature that can be added to existing modules.
Configuring the message queue topology involves creating and modifying the following configuration files in the <module>/etc directory:
communication.xml—Defines aspects of the message queue system that all communication types have in common.queue_consumer.xml—Defines the relationship between an existing queue and its consumer.queue_topology.xml—Defines the message routing rules and declares queues and exchanges.queue_publisher.xml—Defines the exchange where a topic is published.
data-variant=info
data-slots=text
Use cases
Depending on your use case, you can create and configure communication.xml along with one or more of the following files:
-
Publish messages to an existing queue created by a third-party system—Configure the
queue_publisher.xmlfile only. -
Consume messages from an existing queue—Configure the
queue_consumer.xmlfile only. -
Define a local queue and consume messages published by a third-party system—Configure both the
queue_topology.xmlandqueue_consumer.xmlfiles.
communication.xml
The <module>/etc/communication.xml file defines aspects of the message queue system that all communication types have in common. Adobe Commerce supports AMQP, STOMP, and database connections.
Example
The following sample defines two synchronous topics. The first topic is for RPC calls. The second uses a custom service interface.
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
<topic name="synchronous.rpc.test" request="string" response="string">
<handler name="processRpcRequest" type="Magento\TestModuleSynchronousAmqp\Model\RpcRequestHandler" method="process"/>
</topic>
<topic name="magento.testModuleSynchronousAmqp.api.serviceInterface.execute" schema="Magento\TestModuleSynchronousAmqp\Api\ServiceInterface::execute">
<handler name="processRemoteRequest" type="Magento\TestModuleSynchronousAmqp\Model\RpcRequestHandler" method="process"/>
</topic>
</config>
topic element
Topic configuration is flexible in that you can switch the transport layer for topics at deployment time. These values can be overwritten in the env.php file.
The name parameter is required. The topic definition must include either a request or a schema. Use schema if you want to implement a custom service interface. Otherwise, specify request. If request is specified, then also specify response if the topic is synchronous.
namecat.white.feed and dog.retriever.walk. Wildcards are not supported in communication.xml.requestresponseschema<module>\Api\<ServiceName>::<methodName>.handler element
The handler element specifies the class where the logic for handling messages exists and the method it executes.
nametypemethoddisabledfalse.See Handler processing.
queue_consumer.xml
The queue_consumer.xml file contains one or more consumer elements:
Example
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
<consumer name="basic.consumer" queue="basic.consumer.queue" handler="LoggerClass::log"/>
<consumer name="synchronous.rpc.test" queue="synchronous.rpc.test.queue" handler="LoggerClass::log"/>
<consumer name="rpc.test" queue="queue.for.rpc.test.unused.queue" consumerInstance="Magento\Framework\MessageQueue\BatchConsumer"/>
<consumer name="test.product.delete" queue="queue.for.test.product.delete" handler="Magento\Queue\Model\ProductDeleteConsumer::processMessage" maxMessages="200" maxIdleTime="180" sleep="60" onlySpawnWhenMessageAvailable="0"/>
</config>
consumer element
name (required)queue (required)handler<Vendor>\Module\<ServiceName>::<methodName>. See Handler processing.consumerInstanceMagento\Framework\MessageQueue\Consumer.connectionamqp, stomp, or db. If omitted, the connection is resolved automatically. See Connection resolution.maxMessagesmaxIdleTimenull.sleepnull which equals 1 second.onlySpawnWhenMessageAvailable1 or 0 only) that identifies whether a consumer should be spawned only if there is available message in the related queue. The default value is null.data-variant=info
data-slots=text
maxIdleTime and sleep attributes are handled only by consumers fired with a defined maxMessages parameter. The onlySpawnWhenMessageAvailable attribute is only validated by the \Magento\MessageQueue\Model\Cron\ConsumersRunner class that runs consumer processes with cron. For details, see Consumer spawning behavior.queue_topology.xml
The queue_topology.xml file defines the message routing rules and declares queues and exchanges. It contains the following elements:
exchangeexchange/binding(optional)exchange/arguments(optional)exchange/binding/arguments(optional)
Example
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
<exchange name="magento-topic-based-exchange1">
<binding id="topicBasedRouting2" topic="anotherTopic" destination="topic-queue1">
<arguments>
<!-- Optional: additional arguments are processed if specified -->
<argument name="argument1" xsi:type="string">value</argument>
</arguments>
</binding>
<arguments>
<argument name="alternate-exchange" xsi:type="string">magento-log-exchange</argument>
</arguments>
</exchange>
<exchange name="magento-topic-based-exchange2" type="topic" connection="db">
<binding id="topicBasedRouting1" topic="#" destinationType="queue" destination="topic-queue2"/>
<arguments>
<argument name="alternate-exchange" xsi:type="string">magento-log-exchange</argument>
</arguments>
</exchange>
</config>
exchange element
name (required)typetopic.connectionamqp, stomp, or db. If omitted, the connection is resolved automatically. See Connection resolution.durabletrue.autoDeletefalse.internalfalse.binding element
The binding element is a subnode of the exchange element.
id (required)topic (required)destinationTypequeue.destination (required)disabledfalse.Example topic names that include wildcards:
*.*.*mytopic.createOrder.success, mytopic.updatePrice.item1mytopic.createOrder, mytopic.createOrder.success.true#mytopic, mytopic.createOrder.success, this.is.a.long.topic.namemytopic.#mytopic and has a period afterward.mytopic.success, mytopic.createOrder.errornew.mytopic.success*.Order.#.Order. There can be any number of strings (including 0) after that.mytopic.Order, mytopic.Order.Create, newtopic.Order.delete.successmytopic.Sales.Order.Createarguments element
The arguments element is an optional element that contains one or more argument elements. These arguments define key/value pairs that are passed to the broker for processing.
Each argument definition must have the following parameters:
nametypeThe following illustrates an arguments block:
<arguments>
<argument name="warehouseId" xsi:type="int">1</argument>
<argument name="carrierName" xsi:type="string">USPS</argument>
</arguments>
queue_publisher.xml
The queue_publisher.xml file defines which connection and exchange to use to publish messages for a specific topic. It contains the following elements:
publisherpublisher/connection
Example
For RabbitMQ (AMQP):
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
<publisher topic="magento.testModuleSynchronousAmqp.api.serviceInterface.execute" disabled="true" />
<publisher topic="asynchronous.test">
<connection name="amqp" exchange="magento" disabled="false"/>
<connection name="db" exchange="exch1" disabled="true"/>
</publisher>
</config>
For ActiveMQ Artemis (STOMP):
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
<publisher topic="magento.testModuleSynchronousAmqp.api.serviceInterface.execute" disabled="true" />
<publisher topic="asynchronous.test" queue="async.test.queue">
<connection name="stomp" disabled="false"/>
<connection name="db" disabled="true"/>
</publisher>
</config>
publisher element
topic (required)queuedisabledfalse.connection element
The connection element is a subnode of the publisher element. Only one enabled connection can be defined for a publisher at any given time. If you omit the connection element, the connection is resolved automatically and magento is used as the exchange. See Connection resolution.
nameamqp, stomp, or db. If you omit the connection element, the connection is resolved automatically.exchangemagento.disabledfalse.data-variant=warning
data-slots=text
publisher for each topic.Connection resolution
Connection names are resolved dynamically based on the message queue deployment configuration in env.php. If you don't explicitly specify a connection, the system automatically selects the appropriate one.
Automatic resolution
The system checks env.php for message queue configuration:
- If AMQP (RabbitMQ) is configured, the
amqpconnection is used - If STOMP (ActiveMQ Artemis) is configured, the
stompconnection is used - Otherwise, the database (
db) connection is used
Explicit connection values
When specifying a connection explicitly, use one of these values:
amqpqueue_consumer.xml, the value must match the connection attribute in queue_topology.xml.stompdbHandler processing
A handler is a class and method that processes a message. You can define a handler in two places:
- In the
<handler>element of the module'scommunication.xmlfile - In the
handlerattribute of the module'squeue_consumer.xmlfile
The following conditions determine which handler is executed:
- If the consumer in
queue_consumer.xmldoes not have aconsumerInstancedefined, the system uses the default consumer:Magento\Framework\MessageQueue\Consumer. In this case, if the<consumer>element contains thehandlerattribute, it is used and the<handler>element incommunication.xmlis ignored. - If the consumer in
queue_consumer.xmlhas aconsumerInstancedefined, the specific consumer implementation defines how the handler is used.
The following table shows how the built-in consumers process handlers:
communication.xml executed?queue_consumer.xml executed?Magento\Framework\MessageQueue\Consumerqueue_consumer.xmlMagento\Framework\MessageQueue\BatchConsumerqueue_consumer.xmlMagento\AsynchronousOperations\Model\MassConsumerConsumer spawning behavior
Two settings control when consumers spawn and exit: onlySpawnWhenMessageAvailable and consumers-wait-for-messages. Both help reduce server resource usage, but they work differently.
onlySpawnWhenMessageAvailableconsumers-wait-for-messagesfalse, the consumer exits immediately if no messages are available. Because this option is a global option, it cannot be configured separately for each consumer.The key difference between the two settings: onlySpawnWhenMessageAvailable prevents unnecessary process creation, while consumers-wait-for-messages creates a process that immediately terminates if the queue is empty.
Configuration
Set onlySpawnWhenMessageAvailable globally in app/etc/env.php:
'queue' => [
'only_spawn_when_message_available' => 1
]
The default global value is 1. To override for a specific consumer, set the onlySpawnWhenMessageAvailable attribute in queue_consumer.xml. The per-consumer setting takes priority over the global setting.
Recommended combinations
- Infrequent consumers—Combine
onlySpawnWhenMessageAvailablewithmaxIdleTime. The consumer spawns only when needed and terminates after a period of inactivity. - Resource optimization—Combine the global
only_spawn_when_message_availablesetting withconsumers-wait-for-messagesset tofalse. Consumers run only when messages exist and exit when the queue is empty.
env.php Configuration
The message queue connection configuration is defined in the app/etc/env.php file. When connection elements are omitted from queue_consumer.xml, queue_publisher.xml, and queue_topology.xml files, the system automatically uses the connection configured in env.php.
Example configurations
For RabbitMQ (AMQP) only:
return [
// ... other configuration
'queue' => [
'amqp' => [
'host' => 'localhost',
'port' => '5672',
'user' => 'guest',
'password' => 'guest',
'virtualhost' => '/'
],
'consumers_wait_for_messages' => 1
]
];
For ActiveMQ Artemis (STOMP) only:
return [
// ... other configuration
'queue' => [
'stomp' => [
'host' => 'localhost',
'port' => '61613',
'user' => 'admin',
'password' => 'admin'
],
'consumers_wait_for_messages' => 1
]
];
For MySQL (Database) only:
return [
// ... other configuration
'queue' => [
'consumers_wait_for_messages' => 1
]
// Database connection uses existing 'db' configuration
];
When multiple connection types are configured:
If you have both AMQP and STOMP configured, you must specify default_connection to indicate which one the system should use:
return [
// ... other configuration
'queue' => [
'amqp' => [
'host' => 'localhost',
'port' => '5672',
'user' => 'guest',
'password' => 'guest',
'virtualhost' => '/'
],
'stomp' => [
'host' => 'localhost',
'port' => '61613',
'user' => 'admin',
'password' => 'admin'
],
'default_connection' => 'amqp', // Required when multiple connections exist
'consumers_wait_for_messages' => 1
]
];
The default_connection value can be db, amqp, or stomp. When only one connection type is configured in env.php, the system automatically uses that connection and default_connection is not required. If queue/default_connection is specified, that connection is used for all message queues unless a specific connection is defined in a module's queue_topology.xml, queue_publisher.xml, or queue_consumer.xml file.
ActiveMQ Artemis (STOMP) support
data-variant=info
data-slots=text
Updating queue.xml
See Migrate message queue configuration for information about upgrading from Adobe Commerce and Magento Open Source 2.0 or 2.1.