Requests for specific stores
The Magento_AmqpStore module provides the ability for message queues to process asynchronous requests for specific stores.
Processing messages
Adobe Commerce and Magento Open Source process each message that is sent to the Message Queue Framework, adding information about the current store. The following plugin implements this behavior:
    app/code/Magento/AmqpStore/Plugin/Framework/Amqp/Bulk/Exchange.php
The plugin executes before the enqueue method in Magento\Framework\Amqp\Bulk\Exchange.
By default, each AMQP message contains a list of properties. One of these properties is application_headers, and these headers are used to transfer the current store_id into the RabbitMQ queue.
    public function beforeEnqueue(SubjectExchange $subject, $topic, array $envelopes)
    {
        try {
            $storeId = $this->storeManager->getStore()->getId();
        } catch (NoSuchEntityException $e) {
            $errorMessage = sprintf(
                "Can't get current storeId and inject to amqp message. Error %s.",
                $e->getMessage()
            );
            $this->logger->error($errorMessage);
            throw new \LogicException($errorMessage);
        }
        $updatedEnvelopes = [];
        foreach ($envelopes as $envelope) {
            $properties = $envelope->getProperties();
            if (!isset($properties)) {
                $properties = [];
            }
            if (isset($properties['application_headers'])) {
                $headers = $properties['application_headers'];
                if ($headers instanceof AMQPTable) {
                    try {
                        $headers->set('store_id', $storeId);
                    } catch (AMQPInvalidArgumentException $ea) {
                        $errorMessage = sprintf("Can't set storeId to amqp message. Error %s.", $ea->getMessage());
                        $this->logger->error($errorMessage);
                        throw new AMQPInvalidArgumentException($errorMessage);
                    }
                    $properties['application_headers'] = $headers;
                }
            } else {
                $properties['application_headers'] = new AMQPTable(['store_id' => $storeId]);
            }
            $updatedEnvelopes[] = $this->envelopeFactory->create(
                [
                    'body' => $envelope->getBody(),
                    'properties' => $properties
                ]
            );
        }
        if (!empty($updatedEnvelopes)) {
            $envelopes = $updatedEnvelopes;
        }
        return [$topic, $envelopes];
    }
In this example, the plugin checks application_headers and adds the store_id parameter. If the headers do not exist, the plugin creates them. As a result, each RabbitMQ message carries information about the store that is affected by the asynchronous request.
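The header logic can be tried in isolation with a minimal sketch. It is a simplification, not the module's code: plain PHP arrays stand in for AMQPTable, and the `withStoreId` helper and the `trace_id` header are hypothetical names used only for illustration. Unlike the plugin, which leaves existing headers untouched when they are not an AMQPTable instance, this sketch always writes store_id.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: returns a copy of the message properties with
 * store_id present in application_headers. A plain array is used here
 * in place of AMQPTable (an assumption made to keep the sketch standalone).
 */
function withStoreId(array $properties, int $storeId): array
{
    // Reuse existing headers if there are any, otherwise start from an empty set.
    $headers = $properties['application_headers'] ?? [];
    $headers['store_id'] = $storeId;
    $properties['application_headers'] = $headers;

    return $properties;
}

// Case 1: the message has no headers yet, so they are created.
$fresh = withStoreId(['delivery_mode' => 2], 3);

// Case 2: existing headers are kept and store_id is added alongside them.
$existing = withStoreId(['application_headers' => ['trace_id' => 'abc']], 3);

var_export($fresh);
echo PHP_EOL;
var_export($existing);
echo PHP_EOL;
```

Returning a modified copy mirrors how the plugin builds new envelopes from the original body and the updated properties instead of mutating the envelopes it receives.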
Processing by consumer
Consumers pick up messages from the RabbitMQ queue and process them.
When a consumer reads a message, the extension executes an around plugin, as shown here:
    Magento\AmqpStore\Plugin\AsynchronousOperations\MassConsumerEnvelopeCallback::aroundExecute(SubjectMassConsumerEnvelopeCallback $subject, callable $proceed, EnvelopeInterface $message)
    public function aroundExecute(SubjectMassConsumerEnvelopeCallback $subject, callable $proceed, EnvelopeInterface $message)
    {
        $amqpProperties = $message->getProperties();
        if (isset($amqpProperties['application_headers'])) {
            $headers = $amqpProperties['application_headers'];
            if ($headers instanceof AMQPTable) {
                $headers = $headers->getNativeData();
            }
            if (isset($headers['store_id'])) {
                $storeId = $headers['store_id'];
                try {
                    $currentStoreId = $this->storeManager->getStore()->getId();
                } catch (NoSuchEntityException $e) {
                    $this->logger->error(
                        sprintf(
                            "Can't set currentStoreId during processing queue. Message rejected. Error %s.",
                            $e->getMessage()
                        )
                    );
                    $subject->getQueue()->reject($message, false, $e->getMessage());
                    return;
                }
                if (isset($storeId) && $storeId !== $currentStoreId) {
                    $this->storeManager->setCurrentStore($storeId);
                }
            }
        }
        $proceed($message);
        if (isset($storeId, $currentStoreId) && $storeId !== $currentStoreId) {
            $this->storeManager->setCurrentStore($currentStoreId);//restore original store value
        }
    }
The plugin checks the message headers and sets the current store value in storeManager to the received store_id value. After the message is processed, it restores the original store value.
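The switch-and-restore pattern used by the consumer plugin can be illustrated with a minimal standalone sketch. `FakeStoreManager` and `processInStoreScope` are hypothetical names, not part of the framework, and the stub only tracks a store ID. The sketch also restores the store in a `finally` block, which is a variation on the plugin shown above (the plugin restores the value after `$proceed` returns); it assumes PHP 7.4 or later.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in for the store manager; it only tracks the current store ID. */
final class FakeStoreManager
{
    private int $currentStoreId;

    public function __construct(int $currentStoreId)
    {
        $this->currentStoreId = $currentStoreId;
    }

    public function getCurrentStoreId(): int
    {
        return $this->currentStoreId;
    }

    public function setCurrentStore(int $storeId): void
    {
        $this->currentStoreId = $storeId;
    }
}

/**
 * Hypothetical helper: runs $handler with the store taken from the message
 * headers, then switches back to the store that was active before.
 */
function processInStoreScope(FakeStoreManager $storeManager, array $headers, callable $handler)
{
    $originalStoreId = $storeManager->getCurrentStoreId();
    $storeId = isset($headers['store_id']) ? (int) $headers['store_id'] : $originalStoreId;

    if ($storeId !== $originalStoreId) {
        $storeManager->setCurrentStore($storeId);
    }

    try {
        return $handler();
    } finally {
        // Restore the original store even if the handler throws.
        $storeManager->setCurrentStore($originalStoreId);
    }
}

$storeManager = new FakeStoreManager(1);

// The handler reports which store is active while the message is processed.
$seenDuringHandling = processInStoreScope(
    $storeManager,
    ['store_id' => 2],
    fn () => $storeManager->getCurrentStoreId()
);

var_dump($seenDuringHandling);
var_dump($storeManager->getCurrentStoreId());
```

In this sketch the handler sees the store from the message header, while the consumer process ends up in the store it started with, so later messages without a store_id header are not affected by the switch.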