Thanks to Comwrap GmbH for contributing this topic!

Requests for specific stores

The Magento_AmqpStore module provides the ability for message queues to process asynchronous requests for specific stores.

Processing messages

Adobe Commerce and Magento Open Source process each message that is sent to the Message Queue Framework, adding information about the current store. The following plugin implements this behavior:

app/code/Magento/AmqpStore/Plugin/Framework/Amqp/Bulk/Exchange.php

The plugin executes before the enqueue method in Magento\Framework\Amqp\Bulk\Exchange.

By default, each AMQP message contains a list of properties. One of these properties is application_headers, and these headers are used to transfer the current store_id into the RabbitMQ queue.

public function beforeEnqueue(SubjectExchange $subject, $topic, array $envelopes)
{
    try {
        $storeId = $this->storeManager->getStore()->getId();
    } catch (NoSuchEntityException $e) {
        $errorMessage = sprintf(
            "Can't get current storeId and inject to amqp message. Error %s.",
            $e->getMessage()
        );
        $this->logger->error($errorMessage);
        throw new \LogicException($errorMessage);
    }
    $updatedEnvelopes = [];
    foreach ($envelopes as $envelope) {
        $properties = $envelope->getProperties();
        if (!isset($properties)) {
            $properties = [];
        }
        if (isset($properties['application_headers'])) {
            $headers = $properties['application_headers'];
            if ($headers instanceof AMQPTable) {
                try {
                    $headers->set('store_id', $storeId);
                } catch (AMQPInvalidArgumentException $ea) {
                    $errorMessage = sprintf("Can't set storeId to amqp message. Error %s.", $ea->getMessage());
                    $this->logger->error($errorMessage);
                    throw new AMQPInvalidArgumentException($errorMessage);
                }
                $properties['application_headers'] = $headers;
            }
        } else {
            $properties['application_headers'] = new AMQPTable(['store_id' => $storeId]);
        }
        $updatedEnvelopes[] = $this->envelopeFactory->create(
            [
                'body' => $envelope->getBody(),
                'properties' => $properties
            ]
        );
    }
    if (!empty($updatedEnvelopes)) {
        $envelopes = $updatedEnvelopes;
    }
    return [$topic, $envelopes];
}

In this example, the plugin checks application_headers and adds the store_id parameter. If the headers do not exist, the plugin creates them. As a result, each RabbitMQ message carries information about the store that is affected by the asynchronous request.
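The header handling described above can be sketched without any Magento or php-amqplib dependencies. This is a minimal illustration, not the module's code: the function name injectStoreId and the plain-array envelope shape are hypothetical stand-ins for EnvelopeInterface and AMQPTable. One simplification to note: the real plugin leaves existing headers untouched when they are not an AMQPTable instance, while this sketch always writes store_id.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical stand-in for the plugin's header handling. Plain arrays
 * replace EnvelopeInterface and AMQPTable so the sketch runs on its own.
 */
function injectStoreId(array $envelopes, int $storeId): array
{
    $updated = [];
    foreach ($envelopes as $envelope) {
        $properties = $envelope['properties'] ?? [];
        // Reuse existing headers when present; otherwise start a new header set.
        $headers = $properties['application_headers'] ?? [];
        $headers['store_id'] = $storeId;
        $properties['application_headers'] = $headers;
        // Build a new envelope rather than mutating the original one.
        $updated[] = ['body' => $envelope['body'], 'properties' => $properties];
    }
    return $updated;
}

$envelopes = [
    ['body' => '{"sku":"A"}', 'properties' => []],
    ['body' => '{"sku":"B"}', 'properties' => ['application_headers' => ['x-trace' => 'abc']]],
];
$result = injectStoreId($envelopes, 2);
```

Existing header entries (here the made-up x-trace key) are preserved, and envelopes that had no headers receive a header set containing only store_id.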

Processing by consumer

Consumers pick up messages from the RabbitMQ queue and process them.

When a consumer reads a message, the module executes an around plugin, as shown here:

Magento\AmqpStore\Plugin\AsynchronousOperations\MassConsumerEnvelopeCallback::aroundExecute(SubjectMassConsumerEnvelopeCallback $subject, callable $proceed, EnvelopeInterface $message)
public function aroundExecute(SubjectMassConsumerEnvelopeCallback $subject, callable $proceed, EnvelopeInterface $message)
{
    $amqpProperties = $message->getProperties();
    if (isset($amqpProperties['application_headers'])) {
        $headers = $amqpProperties['application_headers'];
        if ($headers instanceof AMQPTable) {
            $headers = $headers->getNativeData();
        }
        if (isset($headers['store_id'])) {
            $storeId = $headers['store_id'];
            try {
                $currentStoreId = $this->storeManager->getStore()->getId();
            } catch (NoSuchEntityException $e) {
                $this->logger->error(
                    sprintf(
                        "Can't set currentStoreId during processing queue. Message rejected. Error %s.",
                        $e->getMessage()
                    )
                );
                $subject->getQueue()->reject($message, false, $e->getMessage());
                return;
            }
            if (isset($storeId) && $storeId !== $currentStoreId) {
                $this->storeManager->setCurrentStore($storeId);
            }
        }
    }
    $proceed($message);
    if (isset($storeId, $currentStoreId) && $storeId !== $currentStoreId) {
        $this->storeManager->setCurrentStore($currentStoreId); // restore original store value
    }
}

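The plugin checks the message headers for store_id. If the value differs from the current store, the plugin sets the current store in storeManager to the received store_id before the message is processed, and restores the original store afterward. If the current store cannot be determined, the plugin logs the error and rejects the message.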
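The switch-and-restore pattern used by the consumer plugin can be sketched in isolation. Everything here is an assumption made for illustration: FakeStoreManager is a hypothetical stand-in (not Magento's StoreManagerInterface), processInStoreScope is an invented helper, and messages are plain arrays. The sketch also uses try/finally, which is a variation on the plugin shown above, not a description of what the module does.

```php
<?php
declare(strict_types=1);

/** Hypothetical minimal store manager; not the Magento StoreManagerInterface. */
final class FakeStoreManager
{
    private int $currentStoreId;

    public function __construct(int $storeId)
    {
        $this->currentStoreId = $storeId;
    }

    public function getCurrentStoreId(): int
    {
        return $this->currentStoreId;
    }

    public function setCurrentStore(int $storeId): void
    {
        $this->currentStoreId = $storeId;
    }
}

/** Runs $handler with the store taken from the message headers, then restores the original store. */
function processInStoreScope(FakeStoreManager $stores, array $message, callable $handler): void
{
    $original = $stores->getCurrentStoreId();
    $storeId = $message['properties']['application_headers']['store_id'] ?? null;
    if ($storeId !== null && (int) $storeId !== $original) {
        $stores->setCurrentStore((int) $storeId);
    }
    try {
        $handler($message);
    } finally {
        // Variation on the plugin shown above: restore the store even if the handler throws.
        $stores->setCurrentStore($original);
    }
}

$stores = new FakeStoreManager(1);
$seen = null;
processInStoreScope(
    $stores,
    ['body' => '{}', 'properties' => ['application_headers' => ['store_id' => 3]]],
    function (array $message) use ($stores, &$seen): void {
        // Record which store is active while the message is being handled.
        $seen = $stores->getCurrentStoreId();
    }
);
```

The handler observes the store carried by the message, and the caller's store is back in place once processing ends. Restoring in a finally block is one possible design choice for keeping a long-running consumer process in its original store scope when a handler fails.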

Copyright © 2022 Adobe. All rights reserved.