Configure message queues

The message queue topology is an Adobe Commerce and Magento Open Source feature. You can also add it to existing modules.

Configuring the message queue topology involves creating and modifying the following configuration files in the <module>/etc directory:

  • communication.xml
  • queue_consumer.xml
  • queue_topology.xml
  • queue_publisher.xml

Use cases#

Depending on your needs, you may only need to create and configure communication.xml and one or two of these files.

  • To publish only to an existing queue created by a third-party system, you need only the queue_publisher.xml file.
  • To consume only from an existing queue, you need only the queue_consumer.xml file.
  • To configure a local queue and publish to it for third-party systems to consume, you need the queue_publisher.xml and queue_topology.xml files.
  • To configure a local queue and consume messages published by a third-party system, you need the queue_topology.xml and queue_consumer.xml files.

communication.xml#

The <module>/etc/communication.xml file defines aspects of the message queue system that all communication types have in common. This release supports AMQP and database connections.

Example#

The following sample defines two synchronous topics. The first topic is for RPC calls. The second uses a custom service interface.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="synchronous.rpc.test" request="string" response="string">
        <handler name="processRpcRequest" type="Magento\TestModuleSynchronousAmqp\Model\RpcRequestHandler" method="process"/>
    </topic>
    <topic name="magento.testModuleSynchronousAmqp.api.serviceInterface.execute" schema="Magento\TestModuleSynchronousAmqp\Api\ServiceInterface::execute">
        <handler name="processRemoteRequest" type="Magento\TestModuleSynchronousAmqp\Model\RpcRequestHandler" method="process"/>
    </topic>
</config>

topic element#

Topic configuration is flexible: you can switch the transport layer for topics at deployment time. These values can be overridden in the env.php file.

The name parameter is required. The topic definition must include either a request or a schema. Use schema if you want to implement a custom service interface. Otherwise, specify request. If request is specified, then also specify response if the topic is synchronous.

| Parameter | Description |
| --- | --- |
| name | A string that uniquely identifies the topic. A topic name should be a series of strings that are separated by periods. The leftmost string should be the most general, and each string afterward should narrow the scope. For example, to describe actions for tending to pets, you might create names such as cat.white.feed and dog.retriever.walk. Wildcards are not supported in the communication.xml file. |
| request | Specifies the data type of the topic. |
| response | Specifies the format of the response. This parameter is required if you are defining a synchronous topic. Omit this parameter if you are defining an asynchronous topic. |
| schema | The interface that describes the structure of the message. The format must be <module>\Api\<ServiceName>::<methodName>. |
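
As a minimal sketch of the rules above, the following fragment declares one asynchronous topic (request only) and one synchronous topic (request and response). The topic names reuse the pet examples from the table. The handler classes under Vendor\Pets are hypothetical and are not part of the application.

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <!-- Asynchronous topic: request is specified, response is omitted -->
    <topic name="cat.white.feed" request="string">
        <!-- Vendor\Pets\Model\FeedHandler is a hypothetical class -->
        <handler name="feedWhiteCat" type="Vendor\Pets\Model\FeedHandler" method="process"/>
    </topic>
    <!-- Synchronous topic: response is specified alongside request -->
    <topic name="dog.retriever.walk" request="string" response="string">
        <!-- Vendor\Pets\Model\WalkHandler is a hypothetical class -->
        <handler name="walkRetriever" type="Vendor\Pets\Model\WalkHandler" method="process"/>
    </topic>
</config>
```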

handler element#

The handler element specifies the class where the logic for handling messages exists and the method it executes.

| Parameter | Description |
| --- | --- |
| name | A string that uniquely defines the handler. The name can be derived from the topic name if the handler is specific to the topic. If the handler provides more generic capabilities, name the handler so that it describes those capabilities. |
| type | The class or interface that defines the handler. |
| method | The method this handler executes. |
| disabled | Determines whether this handler is disabled. The default value is false. |
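
The following fragment is an illustrative sketch of a topic with two handlers, one of which is switched off through the disabled attribute. The topic name and both handler classes are hypothetical. The assumption here is that a disabled handler stays declared in the configuration but is not invoked.

```xml
<topic name="cat.white.feed" request="string">
    <!-- Active handler (disabled defaults to false) -->
    <handler name="feedWhiteCat" type="Vendor\Pets\Model\FeedHandler" method="process"/>
    <!-- Hypothetical generic handler, explicitly disabled -->
    <handler name="logPetActivity" type="Vendor\Pets\Model\ActivityLogger" method="log" disabled="true"/>
</topic>
```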

queue_consumer.xml#

The queue_consumer.xml file contains one or more consumer elements:

Example#

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="basic.consumer" queue="basic.consumer.queue" handler="LoggerClass::log"/>
    <consumer name="synchronous.rpc.test" queue="synchronous.rpc.test.queue" handler="LoggerClass::log"/>
    <consumer name="rpc.test" queue="queue.for.rpc.test.unused.queue" consumerInstance="Magento\Framework\MessageQueue\BatchConsumer" connection="amqp"/>
    <consumer name="test.product.delete" queue="queue.for.test.product.delete" connection="amqp" handler="Magento\Queue\Model\ProductDeleteConsumer::processMessage" maxMessages="200" maxIdleTime="180" sleep="60" onlySpawnWhenMessageAvailable="0"/>
</config>

consumer element#

| Attribute | Description |
| --- | --- |
| name (required) | The name of the consumer. |
| queue (required) | Defines the queue name to send the message to. |
| handler | Specifies the class and method that processes the message. The value must be specified in the format <Vendor>\Module\<ServiceName>::<methodName>. |
| consumerInstance | The class name that consumes the message. Default value: Magento\Framework\MessageQueue\Consumer. |
| connection | The connection is defined dynamically based on the message queue deployment configuration in env.php. If AMQP is configured in the deployment configuration, the AMQP connection is used. Otherwise, the db connection is used. If you still want to specify the connection type for a consumer, keep in mind that for AMQP connections, the connection name must match the connection attribute in the queue_topology.xml file. Otherwise, the connection name must be db. |
| maxMessages | Specifies the maximum number of messages to consume. |
| maxIdleTime | Defines the maximum time, in seconds, to wait for a new message from the queue. If no message is handled within this period, the consumer exits. Default value: null. |
| sleep | Specifies the time, in seconds, to sleep before checking whether a new message is available in the queue. The default value is null, which is equivalent to 1 second. |
| onlySpawnWhenMessageAvailable | Boolean value (1 or 0 only) that identifies whether a consumer should be spawned only if a message is available in the related queue. Default value: null. |

You can set onlySpawnWhenMessageAvailable globally by setting queue/only_spawn_when_message_available to 0 or 1 in app/etc/env.php. By default, the global value of only_spawn_when_message_available for all consumers is 1. The onlySpawnWhenMessageAvailable consumer attribute takes priority over the global setting, so you can override the global value for an individual consumer by setting onlySpawnWhenMessageAvailable to 0 or 1 in queue_consumer.xml.

You can combine the onlySpawnWhenMessageAvailable and maxIdleTime attributes for a consumer that needs to run infrequently. The consumer spawns only when it is needed and terminates itself after it has been inactive for the maxIdleTime period. You can also combine the global queue/only_spawn_when_message_available setting in app/etc/env.php with the queue/consumers-wait-for-messages setting. In that case, the consumer runs only when a message is available in the queue and terminates when there are no more messages to process. This combination of settings is recommended to save server resources such as CPU usage.
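
The attribute combination for an infrequently used consumer might look like the following sketch. The consumer name, queue name, and handler class are hypothetical, and the 60-second idle limit is an arbitrary illustrative value.

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <!-- Spawn only when a message is waiting, then exit after 60 seconds without a new message -->
    <consumer name="hypothetical.report.consumer"
              queue="hypothetical.report.queue"
              handler="Vendor\Reports\Model\ReportConsumer::process"
              maxIdleTime="60"
              onlySpawnWhenMessageAvailable="1"/>
</config>
```

Because the attribute is set on the consumer itself, it applies regardless of the global queue/only_spawn_when_message_available value in app/etc/env.php.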

The consumers-wait-for-messages option works similarly to onlySpawnWhenMessageAvailable. When it is set to false, the consumer processes all messages and exits when no more messages are available in the queue. The drawback is that every time the cron_consumers_runner cron job runs, it spawns a new consumer process, which checks whether messages are available and terminates itself if there are none. By contrast, the onlySpawnWhenMessageAvailable attribute checks for available messages first and spawns a new consumer process only if there are messages. As a result, it does not spawn unneeded processes that take up memory, live for a very short period, and then disappear.

handler element#

A handler is a class and method that processes a message. The application has two ways to define a handler for messages.

  • In the <handler> element of the module's communication.xml file
  • In the handler attribute of the module's queue_consumer.xml file

The following conditions determine how these handlers are processed:

  • If the consumer in queue_consumer.xml does not have a consumerInstance defined, then the system uses the default consumer: Magento\Framework\MessageQueue\Consumer. In this case, if the <consumer> element contains the handler attribute, then it will be used, and the <handler> element in communication.xml will be ignored.
  • If the consumer in queue_consumer.xml has a consumerInstance defined, then the specific consumer implementation defines how the handler is used.

The application provides these consumers out-of-the-box:

| Class name | Handler in communication.xml will be executed? | Handler in queue_consumer.xml will be executed? |
| --- | --- | --- |
| Magento\Framework\MessageQueue\Consumer | Only if not defined in queue_consumer.xml | Yes, if exists |
| Magento\Framework\MessageQueue\BatchConsumer | Only if not defined in queue_consumer.xml | Yes, if exists |
| Magento\AsynchronousOperations\Model\MassConsumer | Yes, if exists | Yes, if exists |

queue_topology.xml#

The queue_topology.xml file defines the message routing rules and declares queues and exchanges. It contains the following elements:

  • exchange
  • exchange/binding (optional)
  • exchange/arguments (optional)
  • exchange/binding/arguments (optional)

Example#

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="magento-topic-based-exchange1">
        <binding id="topicBasedRouting2" topic="anotherTopic" destination="topic-queue1">
            <arguments>
                <!--Not part of our use case, but will be processed if someone specifies them-->
                <argument name="argument1" xsi:type="string">value</argument>
            </arguments>
        </binding>
        <arguments>
            <argument name="alternate-exchange" xsi:type="string">magento-log-exchange</argument>
        </arguments>
    </exchange>
    <exchange name="magento-topic-based-exchange2" type="topic" connection="db">
        <binding id="topicBasedRouting1" topic="#" destinationType="queue" destination="topic-queue2"/>
        <arguments>
            <argument name="alternate-exchange" xsi:type="string">magento-log-exchange</argument>
        </arguments>
    </exchange>
</config>

exchange element#

| Attribute | Description |
| --- | --- |
| name (required) | A unique ID for the exchange. |
| type | Specifies the type of exchange. The default value is topic because only topic type is supported. |
| connection (required) | Connection is defined dynamically based on deployment configuration of message queue in env.php. If AMQP is configured in deployment configuration, AMQP connection is used. Otherwise, db connection is used. If you still want to specify connection, the connection name must be amqp for AMQP. For MySQL connections, the connection name must be db. |
| durable | Boolean value indicating whether the exchange is persistent. Non-durable exchanges are purged when the server restarts. The default is true. |
| autoDelete | Boolean value indicating whether the exchange is deleted when all queues have finished using it. The default is false. |
| internal | Boolean value. If set to true, the exchange may not be used directly by publishers, but only when bound to other exchanges. The default is false. |
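
For illustration, the following sketch spells out every exchange attribute from the table with its documented default value. The exchange, binding, topic, and queue names are hypothetical.

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <!-- durable, autoDelete, and internal are written out here only to show the defaults -->
    <exchange name="hypothetical-orders-exchange" type="topic" connection="amqp" durable="true" autoDelete="false" internal="false">
        <binding id="hypotheticalOrdersBinding" topic="orders.created" destination="orders-created-queue"/>
    </exchange>
</config>
```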

binding element#

The binding element is a subnode of the exchange element.

| Attribute | Description |
| --- | --- |
| id (required) | A unique ID for this binding. |
| topic (required) | The name of a topic. You can specify an asterisk (*) or pound sign (#) as wildcards. These are described below the table. |
| destinationType | The default value is queue. |
| destination (required) | Identifies the name of a queue. |
| disabled | Determines whether this binding is disabled. The default value is false. |

Example topic names that include wildcards:

| Pattern | Description | Example matching topics | Example non-matching topics |
| --- | --- | --- | --- |
| *.*.* | Matches any topic that contains exactly two periods. | mytopic.createOrder.success, mytopic.updatePrice.item1 | mytopic.createOrder, mytopic.createOrder.success.true |
| # | Matches any topic name. | mytopic, mytopic.createOrder.success, this.is.a.long.topic.name | Not applicable |
| mytopic.# | Matches any topic name that begins with mytopic and has a period afterward. | mytopic.success, mytopic.createOrder.error | new.mytopic.success |
| *.Order.# | There must be one string before .Order. There can be any number of strings (including 0) after that. | mytopic.Order, mytopic.Order.Create, newtopic.Order.delete.success | mytopic.Sales.Order.Create |
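
The wildcard patterns above could be used in bindings as in the following sketch. The exchange name, binding IDs, and queue names are hypothetical.

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="hypothetical-events-exchange" type="topic" connection="amqp">
        <!-- Exactly one string before .Order, then any number of strings (including none) -->
        <binding id="orderEvents" topic="*.Order.#" destinationType="queue" destination="order-events-queue"/>
        <!-- Every topic that begins with mytopic followed by a period -->
        <binding id="myTopicEvents" topic="mytopic.#" destinationType="queue" destination="mytopic-queue"/>
    </exchange>
</config>
```

A topic such as mytopic.Order.Create matches both patterns, so under usual topic-exchange semantics it would be expected to reach both queues.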

arguments element#

The arguments element is an optional element that contains one or more argument elements. These arguments define key/value pairs that are passed to the broker for processing.

Each argument definition must have the following parameters:

| Attribute | Description |
| --- | --- |
| name | The parameter name |
| type | The data type of the value |
The following illustrates an arguments block:

<arguments>
    <argument name="warehouseId" xsi:type="int">1</argument>
    <argument name="carrierName" xsi:type="string">USPS</argument>
</arguments>

queue_publisher.xml#

The queue_publisher.xml file defines which connection and exchange to use to publish messages for a specific topic. It contains the following elements:

  • publisher
  • publisher/connection

Example#

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="magento.testModuleSynchronousAmqp.api.serviceInterface.execute" disabled="true" />
    <publisher topic="asynchronous.test">
        <connection name="amqp" exchange="magento" disabled="false"/>
        <connection name="db" exchange="exch1" disabled="true"/>
    </publisher>
</config>

publisher element#

| Attribute | Description |
| --- | --- |
| topic (required) | The name of the topic. |
| disabled | Determines whether this queue is disabled. The default value is false. |

connection element#

The connection element is a subnode of the publisher element. A publisher must not have more than one enabled connection at a time. If you omit the connection element, the connection is defined dynamically based on the message queue deployment configuration in env.php, and the magento exchange is used. If AMQP is configured in the deployment configuration, the AMQP connection is used. Otherwise, the db connection is used.

| Attribute | Description |
| --- | --- |
| name | Connection name is defined dynamically based on deployment configuration of message queue in env.php. If you still want to specify connection type for publisher, keep in mind that for AMQP connections, the connection name must match the connection attribute in the queue_topology.xml file. Otherwise, the connection name must be db. |
| exchange | The name of the exchange to publish to. The default system exchange name is magento. |
| disabled | Determines whether this queue is disabled. The default value is false. |
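
As a sketch of the two options described above, the following fragment shows one publisher that relies on the defaults and one that names its connection and exchange explicitly. The topic names and the exchange name are hypothetical.

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <!-- No connection element: the connection comes from env.php and the magento exchange is used -->
    <publisher topic="hypothetical.topic.default"/>
    <!-- Explicit connection and exchange; only one connection is enabled -->
    <publisher topic="hypothetical.topic.explicit">
        <connection name="amqp" exchange="hypothetical-orders-exchange"/>
    </publisher>
</config>
```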

Updating queue.xml#

See Migrate message queue configuration for information about upgrading from Adobe Commerce and Magento Open Source 2.0 or 2.1.

Copyright © 2022 Adobe. All rights reserved.