Thanks to Comwrap GmbH for contributing this topic!

Requests for specific stores

The Magento_AmqpStore module lets message queues process asynchronous requests for specific stores.

Processing messages

Adobe Commerce and Magento Open Source process each message that is sent to the Message Queue Framework, adding information about the current store. The following plugin implements this behavior:

app/code/Magento/AmqpStore/Plugin/Framework/Amqp/Bulk/Exchange.php

The plugin executes before the enqueue method in Magento\Framework\Amqp\Bulk\Exchange.
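
To illustrate why a before plugin returns an array of arguments, the following sketch wires a plugin to a method by hand. It is a simplified illustration, not Magento code: `DemoExchange`, `DemoStorePlugin`, and `enqueueWithPlugin()` are hypothetical names, envelopes are plain arrays, and the store ID `2` is arbitrary. The real framework generates the interception code for you.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in for the exchange; it records what it was asked to enqueue. */
final class DemoExchange
{
    public array $sent = [];

    public function enqueue(string $topic, array $envelopes): void
    {
        $this->sent[] = [$topic, $envelopes];
    }
}

/** Hypothetical before plugin: returns the arguments the original method should receive. */
final class DemoStorePlugin
{
    public function beforeEnqueue(DemoExchange $subject, string $topic, array $envelopes): array
    {
        // Add a store_id key to every envelope (assumed store ID: 2).
        $tagged = array_map(
            fn (array $envelope): array => $envelope + ['store_id' => 2],
            $envelopes
        );

        return [$topic, $tagged];
    }
}

// Rough hand-written equivalent of the interception step for a before plugin:
// call the plugin first, then pass its return values to the original method.
function enqueueWithPlugin(
    DemoExchange $exchange,
    DemoStorePlugin $plugin,
    string $topic,
    array $envelopes
): void {
    [$topic, $envelopes] = $plugin->beforeEnqueue($exchange, $topic, $envelopes);
    $exchange->enqueue($topic, $envelopes);
}

$exchange = new DemoExchange();
enqueueWithPlugin($exchange, new DemoStorePlugin(), 'demo.topic', [['body' => '{}']]);
```

The original `enqueue` method never sees the untagged envelopes, which is the property the AmqpStore plugin relies on.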

By default, each AMQP message contains a list of properties. One of these properties is application_headers, which is used to transfer the current store_id to the RabbitMQ queue.

public function beforeEnqueue(SubjectExchange $subject, $topic, array $envelopes)
{
    try {
        $storeId = $this->storeManager->getStore()->getId();
    } catch (NoSuchEntityException $e) {
        $errorMessage = sprintf(
            "Can't get current storeId and inject to amqp message. Error %s.",
            $e->getMessage()
        );
        $this->logger->error($errorMessage);
        throw new \LogicException($errorMessage);
    }
    $updatedEnvelopes = [];
    foreach ($envelopes as $envelope) {
        $properties = $envelope->getProperties();
        if (!isset($properties)) {
            $properties = [];
        }
        if (isset($properties['application_headers'])) {
            $headers = $properties['application_headers'];
            if ($headers instanceof AMQPTable) {
                try {
                    $headers->set('store_id', $storeId);
                } catch (AMQPInvalidArgumentException $ea) {
                    $errorMessage = sprintf("Can't set storeId to amqp message. Error %s.", $ea->getMessage());
                    $this->logger->error($errorMessage);
                    throw new AMQPInvalidArgumentException($errorMessage);
                }
                $properties['application_headers'] = $headers;
            }
        } else {
            $properties['application_headers'] = new AMQPTable(['store_id' => $storeId]);
        }
        $updatedEnvelopes[] = $this->envelopeFactory->create(
            [
                'body' => $envelope->getBody(),
                'properties' => $properties
            ]
        );
    }
    if (!empty($updatedEnvelopes)) {
        $envelopes = $updatedEnvelopes;
    }
    return [$topic, $envelopes];
}

In this example, the plugin checks application_headers and adds the store_id parameter. If the headers do not exist, the plugin creates them. As a result, each RabbitMQ message carries information about the store that is affected by the asynchronous request.
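
The header handling described above can be reduced to a small, dependency-free sketch. It is an illustration under simplifying assumptions, not the module's implementation: message properties and headers are plain arrays instead of an envelope object and an `AMQPTable`, the case where existing headers are not an `AMQPTable` is left out, and `withStoreId()` is a hypothetical helper name.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: returns a copy of the message properties whose
 * application_headers contain the given store ID.
 */
function withStoreId(array $properties, int $storeId): array
{
    // Start from the existing headers, or from an empty set when the
    // message has none, then add or overwrite only store_id.
    $headers = $properties['application_headers'] ?? [];
    $headers['store_id'] = $storeId;
    $properties['application_headers'] = $headers;

    return $properties;
}

// A message without headers: the headers are created.
$plain = withStoreId(['delivery_mode' => 2], 3);

// A message that already carries a header: it is kept and store_id is added.
$tagged = withStoreId(['application_headers' => ['trace' => 'abc']], 3);

var_dump($plain['application_headers'], $tagged['application_headers']);
```

Returning a new array instead of modifying the input mirrors the plugin, which builds new envelopes rather than changing the ones it receives.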

Processing by consumer

Consumers pick up messages from the RabbitMQ queue and process them.

When a consumer reads a message, the module executes an around plugin, as shown here:

Magento\AmqpStore\Plugin\AsynchronousOperations\MassConsumerEnvelopeCallback::aroundExecute(SubjectMassConsumerEnvelopeCallback $subject, callable $proceed, EnvelopeInterface $message)

The method is implemented as follows:

public function aroundExecute(SubjectMassConsumerEnvelopeCallback $subject, callable $proceed, EnvelopeInterface $message)
{
    $amqpProperties = $message->getProperties();
    if (isset($amqpProperties['application_headers'])) {
        $headers = $amqpProperties['application_headers'];
        if ($headers instanceof AMQPTable) {
            $headers = $headers->getNativeData();
        }
        if (isset($headers['store_id'])) {
            $storeId = $headers['store_id'];
            try {
                $currentStoreId = $this->storeManager->getStore()->getId();
            } catch (NoSuchEntityException $e) {
                $this->logger->error(
                    sprintf(
                        "Can't set currentStoreId during processing queue. Message rejected. Error %s.",
                        $e->getMessage()
                    )
                );
                $subject->getQueue()->reject($message, false, $e->getMessage());
                return;
            }
            if (isset($storeId) && $storeId !== $currentStoreId) {
                $this->storeManager->setCurrentStore($storeId);
            }
        }
    }
    $proceed($message);
    if (isset($storeId, $currentStoreId) && $storeId !== $currentStoreId) {
        $this->storeManager->setCurrentStore($currentStoreId); // restore original store value
    }
}

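The plugin checks the message headers and, if they contain a store_id that differs from the current store, sets the current store in storeManager to the received value. After the message is processed, the plugin restores the original store. If the current store cannot be determined, the plugin logs the error and rejects the message.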
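
The switch-and-restore behavior on the consumer side can be sketched without any framework code. Everything here is an assumption made for illustration: `StoreContext` is a hypothetical stand-in for the store manager that only tracks an integer store ID, `processInStore()` is a hypothetical function, and headers are a plain array.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in for the store manager; it only tracks the current store ID. */
final class StoreContext
{
    public function __construct(private int $currentStoreId)
    {
    }

    public function getCurrentStoreId(): int
    {
        return $this->currentStoreId;
    }

    public function setCurrentStore(int $storeId): void
    {
        $this->currentStoreId = $storeId;
    }
}

/**
 * Runs $handler in the store named by the message headers, then restores
 * the store that was current before the message arrived.
 */
function processInStore(StoreContext $stores, array $headers, callable $handler): void
{
    $originalStoreId = $stores->getCurrentStoreId();
    $storeId = $headers['store_id'] ?? null;
    $mustSwitch = $storeId !== null && (int) $storeId !== $originalStoreId;

    if ($mustSwitch) {
        $stores->setCurrentStore((int) $storeId);
    }

    $handler();

    if ($mustSwitch) {
        $stores->setCurrentStore($originalStoreId);
    }
}

$stores = new StoreContext(1);
$seen = [];
$record = function () use ($stores, &$seen): void {
    $seen[] = $stores->getCurrentStoreId();
};

processInStore($stores, ['store_id' => 5], $record); // header present: handler runs in store 5
processInStore($stores, [], $record);                // no header: the store is left unchanged
```

Like the plugin, the sketch restores the original store only after the handler returns normally. A variant that must also restore the store when the handler throws could wrap the handler call in `try`/`finally`.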
