Edit in GitHubLog an issue

Example bulk operations implementation

This document describes how bulk operations can be implemented. There are three primary tasks to accomplish this:

  • Create a publisher that sends messages to the message queue
  • Create a consumer that receives and processes messages
  • Configure the message queues

Create a publisher#

A publisher's duties include scheduling a bulk operation. It must generate a bulkUuid for each operation, send each operation to the message queue, and report on the status of each operations.

The following code sample shows how these duties can be completed.

Copied to your clipboard
1<?php
2/**
3 * Copyright © Magento, Inc. All rights reserved.
4 * See COPYING.txt for license details.
5 */
6
7use Magento\AsynchronousOperations\Api\Data\OperationInterface;
8use Magento\AsynchronousOperations\Api\Data\OperationInterfaceFactory;
9use Magento\Authorization\Model\UserContextInterface;
10use Magento\Framework\Bulk\BulkManagementInterface;
11use Magento\Framework\DataObject\IdentityGeneratorInterface;
12use Magento\Framework\Exception\LocalizedException;
13use Magento\Framework\Json\Helper\Data as JsonHelper;
14use Magento\Framework\UrlInterface;
15
16/**
17 * Class ScheduleBulk
18 */
19class ScheduleBulk
20{
21 /**
22 * @var BulkManagementInterface
23 */
24 private $bulkManagement;
25
26 /**
27 * @var OperationInterfaceFactory
28 */
29 private $operationFactory;
30
31 /**
32 * @var IdentityGeneratorInterface
33 */
34 private $identityService;
35
36 /**
37 * @var UrlInterface
38 */
39 private $urlBuilder;
40
41 /**
42 * @var UserContextInterface
43 */
44 private $userContext;
45
46 /**
47 * @var JsonHelper
48 */
49 private $jsonHelper;
50
51 /**
52 * ScheduleBulk constructor.
53 *
54 * @param BulkManagementInterface $bulkManagement
55 * @param OperationInterfaceFactory $operationFactory
56 * @param IdentityGeneratorInterface $identityService
57 * @param UserContextInterface $userContextInterface
58 * @param UrlInterface $urlBuilder
59 */
60 public function __construct(
61 BulkManagementInterface $bulkManagement,
62 OperationInterfaceFactory $operationFactory,
63 IdentityGeneratorInterface $identityService,
64 UserContextInterface $userContextInterface,
65 UrlInterface $urlBuilder,
66 JsonHelper $jsonHelper
67 ) {
68 $this->userContext = $userContextInterface;
69 $this->bulkManagement = $bulkManagement;
70 $this->operationFactory = $operationFactory;
71 $this->identityService = $identityService;
72 $this->urlBuilder = $urlBuilder;
73 $this->jsonHelper = $jsonHelper;
74
75 }
76
77 /**
78 * Schedule new bulk operation
79 *
80 * @param array $operationData
81 * @throws LocalizedException
82 * @return void
83 */
84 public function execute($operationData)
85 {
86 $operationCount = count($operationData);
87 if ($operationCount > 0) {
88 $bulkUuid = $this->identityService->generateId();
89 $bulkDescription = 'Specify here your bulk description';
90
91 $operations = [];
92 foreach ($operationData as $operation) {
93
94 $serializedData = [
95 //this data will be displayed in Failed item grid in ID column
96 'entity_id' => $operation['entity_id'],
97 //add here logic to add url for your entity(this link will be displayed in the Failed item grid)
98 'entity_link' => $this->urlBuilder->getUrl('your_url'),
99 //this data will be displayed in Failed item grid in the column "Meta Info"
100 'meta_information' => 'Specify here meta information for your entity',//this data will be displayed in Failed item grid in the column "Meta Info"
101 ];
102 $data = [
103 'data' => [
104 'bulk_uuid' => $bulkUuid,
105 //topic name must be equal to data specified in the queue configuration files
106 'topic_name' => '%your_topic name%',
107 'serialized_data' => $this->jsonHelper->jsonEncode($serializedData),
108 'status' => OperationInterface::STATUS_TYPE_OPEN,
109 ]
110 ];
111
112 /** @var OperationInterface $operation */
113 $operation = $this->operationFactory->create($data);
114 $operations[] = $operation;
115
116 }
117 $userId = $this->userContext->getUserId();
118 $result = $this->bulkManagement->scheduleBulk($bulkUuid, $operations, $bulkDescription, $userId);
119 if (!$result) {
120 throw new LocalizedException(
121 __('Something went wrong while processing the request.')
122 );
123 }
124 }
125 }
126}

Create a consumer#

A consumer class receives messages from the message queue and changes the status after it is processed. The following example defines a consumer that handles price update bulk operations.

Copied to your clipboard
1<?php
2/**
3 * Copyright © Magento, Inc. All rights reserved.
4 * See COPYING.txt for license details.
5 */
6
7namespace Magento\SharedCatalog\Model\ResourceModel\ProductItem\Price;
8
9use Magento\AsynchronousOperations\Api\Data\OperationInterface;
10use Magento\AsynchronousOperations\Api\Data\OperationInterfaceFactory;
11use Magento\Framework\Bulk\BulkManagementInterface;
12use Magento\Framework\Bulk\OperationManagementInterface;
13use Magento\Framework\DB\Adapter\ConnectionException;
14use Magento\Framework\DB\Adapter\DeadlockException;
15use Magento\Framework\DB\Adapter\LockWaitException;
16use Magento\Framework\Exception\LocalizedException;
17use Magento\Framework\Exception\NoSuchEntityException;
18use Magento\Framework\Exception\TemporaryStateExceptionInterface;
19use Magento\Framework\Json\Helper\Data as JsonHelper;
20use Psr\Log\LoggerInterface;
21
22/**
23 * Class Consumer
24 */
25class Consumer
26{
27 /**
28 * @var LoggerInterface
29 */
30 private $logger;
31
32 /**
33 * @var JsonHelper
34 */
35 private $jsonHelper;
36
37 /**
38 * @var OperationManagementInterface
39 */
40 private $operationManagement;
41
42 /**
43 * Consumer constructor.
44 *
45 * @param LoggerInterface $logger
46 * @param JsonHelper $jsonHelper
47 */
48 public function __construct(
49 LoggerInterface $logger,
50 JsonHelper $jsonHelper,
51 OperationManagementInterface $operationManagement
52 ) {
53 $this->logger = $logger;
54 $this->jsonHelper = $jsonHelper;
55 $this->operationManagement = $operationManagement;
56 }
57
58 /**
59 * Processing operation for update price
60 *
61 * @param OperationInterface $operation
62 * @return void
63 */
64 public function processOperation(OperationInterface $operation)
65 {
66 $status = OperationInterface::STATUS_TYPE_COMPLETE;
67 $errorCode = null;
68 $message = null;
69 $serializedData = $operation->getSerializedData();
70 $unserializedData = $this->jsonHelper->jsonDecode($serializedData);
71 try {
72 //add here your own logic for async operations
73 } catch (\Zend_Db_Adapter_Exception $e) {
74 //here sample how to process exceptions if they occurred
75 $this->logger->critical($e->getMessage());
76 //you can add here your own type of exception when operation can be retried
77 if (
78 $e instanceof LockWaitException
79 || $e instanceof DeadlockException
80 || $e instanceof ConnectionException
81 ) {
82 $status = OperationInterface::STATUS_TYPE_RETRIABLY_FAILED;
83 $errorCode = $e->getCode();
84 $message = __($e->getMessage());
85 } else {
86 $status = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED;
87 $errorCode = $e->getCode();
88 $message = __('Sorry, something went wrong during product prices update. Please see log for details.');
89 }
90
91 } catch (NoSuchEntityException $e) {
92 $this->logger->critical($e->getMessage());
93 $status = ($e instanceof TemporaryStateExceptionInterface) ? OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED : OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED;
94 $errorCode = $e->getCode();
95
96 $message = $e->getMessage();
97 unset($unserializedData['entity_link']);
98 $serializedData = $this->jsonHelper->jsonEncode($unserializedData);
99 } catch (LocalizedException $e) {
100 $this->logger->critical($e->getMessage());
101 $status = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED;
102 $errorCode = $e->getCode();
103 $message = $e->getMessage();
104 } catch (\Exception $e) {
105 $this->logger->critical($e->getMessage());
106 $status = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED;
107 $errorCode = $e->getCode();
108 $message = __('Sorry, something went wrong during product prices update. Please see log for details.');
109 }
110
111 //update operation status based on result performing operation(it was successfully executed or exception occurs
112 $this->operationManagement->changeOperationStatus(
113 $operation->getId(),
114 $status,
115 $errorCode,
116 $message,
117 $serializedData
118 );
119 }
120}

Configure message queues#

The message queue topology must be configured to implement bulk operations. Create or edit the following files in the module's app/code/<vendor>/<module_name>/etc directory.

  • communication.xml
  • di.xml
  • queue_consumer.xml
  • queue_publisher.xml
  • queue_topology.xml

For more information about the di.xml file, see Dependency Injection. For information the other files, see Configure message queues.

Create communication.xml#

The communication.xml file defines aspects of the message queue system that apply to all topics for the module. Create this file with the following contents:

Copied to your clipboard
1<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
2 <topic name="<your_topic_name>" request="Magento\AsynchronousOperations\Api\Data\OperationInterface">
3 <handler name="<your_handler_name>" type="<Consumer_Class>" method="<consumer_method>" />
4 </topic>
5</config>

Create di.xml#

Add the following type to the module's di.xml file.

Copied to your clipboard
1<type name="Magento\Framework\MessageQueue\MergerFactory">
2 <arguments>
3 <argument name="mergers" xsi:type="array">
4 <item name="<your_consumer_name>" xsi:type="string"><Merger_Class></item>
5 </argument>
6 </arguments>
7</type>

Create queue_consumer.xml#

The queue_consumer.xml file defines the relationship between a queue and its consumer. Create this file with the following contents:

Copied to your clipboard
1<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
2 <consumer name="<consumer_name>" queue="<queue_name>" connection="amqp" consumerInstance="Magento\Framework\MessageQueue\Consumer" handler="<Consumer_Class>::<Consumer_method>"/>
3</config>

Create queue_publisher.xml#

The queue_publisher.xml file defines the exchange where a topic is published. Create this file with the following contents:

Copied to your clipboard
1<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
2 <publisher topic="<topic_name>">
3 <connection name="amqp" exchange="<exchange>" />
4 </publisher>
5</config>

Create queue_topology.xml#

The queue_topology.xml file defines the message routing rules and declares queues and exchanges. Create this file with the following contents:

Copied to your clipboard
1<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
2 <exchange name="magento" type="topic" connection="amqp">
3 <binding id="defaultBinding" topic="" destinationType="queue" destination="<queue_name>"/>
4 </exchange>
5</config>
Was this helpful?
  • Privacy
  • Terms of Use
  • Do not sell my personal information
  • AdChoices
Copyright © 2022 Adobe. All rights reserved.