Message queues

Message queues provide an asynchronous communications mechanism in which the sender and the receiver of a message do not contact each other directly. When a sender places a message onto a queue, the message is stored until the recipient receives it.

Message Queue Framework overview

The Adobe Commerce Message Queue Framework (MQF) is a fully functional system that allows a module to publish messages to queues and to create consumers that receive them asynchronously.

The MQF supports the following messaging brokers:

| Broker | Protocol | Description |
| --- | --- | --- |
| RabbitMQ | AMQP 0.9.1 | The primary messaging broker with a scalable platform for sending and receiving messages. Includes a mechanism for storing undelivered messages. |
| Apache ActiveMQ Artemis | STOMP | An alternative messaging broker using Simple Text Oriented Messaging Protocol (STOMP) for reliable and scalable messaging. |
| MySQL adapter | Database | A basic message queue system that stores messages in the database using three tables: queue, queue_message, and queue_message_status. Cron jobs ensure consumers receive messages. |

Note: The MySQL adapter is not scalable. Use an external message broker like RabbitMQ or ActiveMQ Artemis for production environments whenever possible.

See Configure message queues for information about setting up the message queue system.

Publish messages to a queue

Use the publish method defined in PublisherInterface to send a message to the queue:

$publisher->publish($topic, $message);
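
As a minimal sketch of how a class might call publish, the following runs outside Adobe Commerce by declaring a local stand-in interface with the same method shape. The InMemoryPublisher and CustomerNotifier classes and the customer.created topic name are hypothetical and exist only for illustration. In a real module the publisher would be injected by the framework and the message would have to match the data type declared for the topic.

```php
<?php
declare(strict_types=1);

// Stand-in with the same method shape as the framework's PublisherInterface,
// declared locally so the sketch runs on its own.
interface PublisherInterface
{
    public function publish($topicName, $data);
}

// Hypothetical publisher that records messages in memory instead of
// sending them to a broker.
class InMemoryPublisher implements PublisherInterface
{
    /** @var array<string, array> messages grouped by topic name */
    public $published = [];

    public function publish($topicName, $data)
    {
        $this->published[$topicName][] = $data;
    }
}

// Hypothetical service that receives the publisher through its constructor.
class CustomerNotifier
{
    /** @var PublisherInterface */
    private $publisher;

    public function __construct(PublisherInterface $publisher)
    {
        $this->publisher = $publisher;
    }

    public function notifyCreated(int $customerId): void
    {
        // 'customer.created' is an assumed topic name.
        $this->publisher->publish('customer.created', ['customer_id' => $customerId]);
    }
}

$publisher = new InMemoryPublisher();
(new CustomerNotifier($publisher))->notifyCreated(42);
```

Depending on an interface rather than a concrete publisher keeps the calling class unaware of which broker is configured for the topic.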

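When using the MySQL adapter, a message published to multiple queues creates a single record in queue_message and multiple records in queue_message_status, one for each queue that receives the message.

Retrieving these messages requires a join on the queue, queue_message, and queue_message_status tables.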
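
The relationship between the three tables can be illustrated with an in-memory model. This is only a sketch: it assumes one queue_message row per published message and one queue_message_status row per target queue, and the column names (id, name, topic_name, body, queue_id, message_id, status), the queue names, and both helper functions are assumptions made for illustration, not the adapter's actual schema or API.

```php
<?php
declare(strict_types=1);

// Simplified stand-ins for the three tables; column names are assumed.
$queue = [
    ['id' => 1, 'name' => 'queue_a'],
    ['id' => 2, 'name' => 'queue_b'],
];
$queueMessage = [];
$queueMessageStatus = [];

// Hypothetical helper: stores the message once and adds one status row
// for every queue the message is published to.
function publishToQueues(array &$queueMessage, array &$queueMessageStatus, string $topic, string $body, array $queueIds): int
{
    $messageId = count($queueMessage) + 1;
    $queueMessage[] = ['id' => $messageId, 'topic_name' => $topic, 'body' => $body];
    foreach ($queueIds as $queueId) {
        $queueMessageStatus[] = ['queue_id' => $queueId, 'message_id' => $messageId, 'status' => 'new'];
    }
    return $messageId;
}

// Hypothetical helper: emulates the three-table join for one queue name.
function messagesForQueue(array $queue, array $queueMessage, array $queueMessageStatus, string $queueName): array
{
    $result = [];
    foreach ($queue as $q) {
        if ($q['name'] !== $queueName) {
            continue;
        }
        foreach ($queueMessageStatus as $status) {
            if ($status['queue_id'] !== $q['id']) {
                continue;
            }
            foreach ($queueMessage as $message) {
                if ($message['id'] === $status['message_id']) {
                    $result[] = [
                        'queue' => $q['name'],
                        'topic' => $message['topic_name'],
                        'body' => $message['body'],
                        'status' => $status['status'],
                    ];
                }
            }
        }
    }
    return $result;
}

publishToQueues($queueMessage, $queueMessageStatus, 'customer.created', '{"customer_id":42}', [1, 2]);
$forQueueB = messagesForQueue($queue, $queueMessage, $queueMessageStatus, 'queue_b');
```

The message body is stored once, and each queue tracks its own delivery state through its status row, which is why reading the messages for a queue touches all three tables.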

Instantiate consumers

The procedure for instantiating a consumer differs depending on the message queue system.

RabbitMQ and ActiveMQ Artemis

For external brokers, consumers are defined in a queue_consumer.xml file. The consumer listens to the queue, receives messages, and invokes a callback method for each one.

The following example instantiates the customer_created_listener consumer, which calls Magento\Some\Class::processMessage($message) for each message:

$this->consumerFactory->get('customer_created_listener')
    ->process();

MySQL adapter

For the MySQL adapter, implement ConsumerInterface::process($maxNumberOfMessages) and perform the following steps:

  1. Get the queue name using ConsumerConfigurationInterface::getQueueName.
  2. Select $maxNumberOfMessages records, filtering on queue_name. Join all three tables (queue, queue_message, queue_message_status). Extract fewer records at a time to improve load distribution across consumers.
  3. Decode the message using the topic name from ConsumerConfigurationInterface.
  4. Invoke the callback returned by ConsumerConfigurationInterface::getCallback, passing it the decoded data.
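
The four steps above can be sketched as follows. This is an illustration under stated assumptions, not the framework's implementation: StubConsumerConfiguration models only getQueueName and getCallback, the joined rows are supplied as a plain array instead of being read from the database, and JSON is assumed as the message encoding. The sketch does not use the topic name to choose a schema. All class names, queue names, and row keys are hypothetical.

```php
<?php
declare(strict_types=1);

// Stand-in for the consumer configuration; only the two methods named in
// the steps are modeled.
class StubConsumerConfiguration
{
    private $queueName;
    private $callback;

    public function __construct(string $queueName, callable $callback)
    {
        $this->queueName = $queueName;
        $this->callback = $callback;
    }

    public function getQueueName(): string
    {
        return $this->queueName;
    }

    public function getCallback(): callable
    {
        return $this->callback;
    }
}

// Hypothetical consumer working on rows that are assumed to be the result
// of joining queue, queue_message, and queue_message_status.
class SketchConsumer
{
    private $configuration;
    private $rows;

    public function __construct(StubConsumerConfiguration $configuration, array $rows)
    {
        $this->configuration = $configuration;
        $this->rows = $rows;
    }

    public function process(?int $maxNumberOfMessages = null): int
    {
        // Step 1: get the queue name.
        $queueName = $this->configuration->getQueueName();

        // Step 2: keep rows for this queue, limited to the batch size.
        $matching = array_filter($this->rows, function (array $row) use ($queueName): bool {
            return $row['queue_name'] === $queueName;
        });
        $batch = array_slice(array_values($matching), 0, $maxNumberOfMessages);

        $callback = $this->configuration->getCallback();
        foreach ($batch as $row) {
            // Step 3: decode the message (JSON is an assumption).
            $data = json_decode($row['body'], true);
            // Step 4: pass the decoded data to the callback.
            $callback($data);
        }
        return count($batch);
    }
}

$received = [];
$configuration = new StubConsumerConfiguration('queue_a', function ($data) use (&$received): void {
    $received[] = $data;
});
$rows = [
    ['queue_name' => 'queue_a', 'topic_name' => 'customer.created', 'body' => '{"customer_id":1}'],
    ['queue_name' => 'queue_b', 'topic_name' => 'customer.created', 'body' => '{"customer_id":2}'],
    ['queue_name' => 'queue_a', 'topic_name' => 'customer.created', 'body' => '{"customer_id":3}'],
    ['queue_name' => 'queue_a', 'topic_name' => 'customer.created', 'body' => '{"customer_id":4}'],
];
$consumer = new SketchConsumer($configuration, $rows);
$processed = $consumer->process(2);
```

A small batch size leaves the remaining rows available to other consumers, which is the load-distribution point made in step 2.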

Switch from MySQL to an external broker

You can switch from the MySQL adapter to an external message broker by adding runtime configuration to redefine the adapter for a topic. The configuration disables the db connection and enables the external broker connection.

The following example shows how to switch a topic to an external broker. Replace the placeholder values based on your broker:

| Broker | Publisher value | Connection name |
| --- | --- | --- |
| RabbitMQ | amqp-magento | amqp |
| ActiveMQ Artemis | stomp-magento | stomp |

'queue' => [
    'topics' => [
        '<topic.name>' => [
            'publisher' => '<amqp-magento|stomp-magento>'
        ]
    ],
    'config' => [
        'publishers' => [
            '<topic.name>' => [
                'connections' => [
                    '<amqp|stomp>' => [
                        'name' => '<amqp|stomp>',
                        'exchange' => 'magento',
                        'disabled' => false
                    ],
                    'db' => [
                        'name' => 'db',
                        'exchange' => 'magento',
                        'disabled' => true
                    ]
                ]
            ]
        ]
    ],
    'consumers' => [
        '<topic.name>' => [
            'connection' => '<amqp|stomp>',
        ]
    ]
],

For example, to switch the product_action_attribute.update topic to RabbitMQ, use amqp-magento as the publisher and amqp as the connection name.
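
With those substitutions, the filled-in configuration might look like the following sketch. It only replaces the placeholders in the template with the RabbitMQ values from the table. The $queueConfig variable is a hypothetical wrapper added so the fragment is valid PHP on its own; in practice the 'queue' key would sit inside the existing runtime configuration array.

```php
<?php
declare(strict_types=1);

// Hypothetical wrapper variable; only the 'queue' entry belongs in the
// runtime configuration.
$queueConfig = [
    'queue' => [
        'topics' => [
            'product_action_attribute.update' => [
                'publisher' => 'amqp-magento'
            ]
        ],
        'config' => [
            'publishers' => [
                'product_action_attribute.update' => [
                    'connections' => [
                        // Enable the RabbitMQ (AMQP) connection.
                        'amqp' => [
                            'name' => 'amqp',
                            'exchange' => 'magento',
                            'disabled' => false
                        ],
                        // Disable the MySQL adapter connection.
                        'db' => [
                            'name' => 'db',
                            'exchange' => 'magento',
                            'disabled' => true
                        ]
                    ]
                ]
            ]
        ],
        'consumers' => [
            'product_action_attribute.update' => [
                'connection' => 'amqp',
            ]
        ]
    ],
];
```

For ActiveMQ Artemis, the same structure would use stomp-magento as the publisher and stomp as the connection name.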