Conveyor   A
last analyzed

Complexity

Total Complexity 14

Size/Duplication

Total Lines 231
Duplicated Lines 0 %

Importance

Changes 7
Bugs 1 Features 1
Metric Value
wmc 14
eloc 67
c 7
b 1
f 1
dl 0
loc 231
rs 10

7 Methods

Rating   Name   Duplication   Size   Complexity  
A receive() 0 15 1
A ensureConnection() 0 10 3
A distribute() 0 36 3
A get() 0 28 2
A publish() 0 6 1
A __construct() 0 12 1
A send() 0 17 3
1
<?php
2
3
/*
4
 * This file is part of the Veslo project <https://github.com/symfony-doge/veslo>.
5
 *
6
 * (C) 2019 Pavel Petrov <[email protected]>.
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 *
11
 * @license https://opensource.org/licenses/GPL-3.0 GPL-3.0
12
 */
13
14
declare(strict_types=1);
15
16
namespace Veslo\AppBundle\Workflow\Vacancy\Research;
17
18
use Bunny\Channel;
19
use Bunny\Client;
20
use Bunny\Message;
21
use Exception;
22
use Psr\Log\LoggerInterface;
23
use Symfony\Component\Serializer\Serializer;
24
use Symfony\Component\Workflow\Workflow;
25
use Veslo\AppBundle\Exception\Workflow\Conveyor\ConnectionFailedException;
26
use Veslo\AppBundle\Exception\Workflow\Conveyor\DistributionFailedException;
27
use Veslo\AppBundle\Workflow\Vacancy\Research\Conveyor\Payload;
28
29
/**
30
 * Manages data exchange between workers using workflow
31
 *
32
 * @see https://symfony.com/doc/current/components/workflow.html
33
 */
34
class Conveyor
35
{
36
    /**
37
     * Logger as it is
38
     *
39
     * @var LoggerInterface
40
     */
41
    private $logger;
42
43
    /**
44
     * State machine, represent business process
45
     *
46
     * @var Workflow
47
     */
48
    private $workflow;
49
50
    /**
51
     * Converts data in the appropriate format
52
     *
53
     * @var Serializer
54
     */
55
    private $serializer;
56
57
    /**
58
     * Communicates with message broker
59
     *
60
     * @var Client
61
     */
62
    private $amqpClient;
63
64
    /**
65
     * Prefix for workflow-related queues
66
     *
67
     * @var string
68
     */
69
    private $queuePrefix;
70
71
    /**
72
     * Conveyor constructor.
73
     *
74
     * @param LoggerInterface $logger      Logger as it is
75
     * @param Workflow        $workflow    State machine, represent business process
76
     * @param Serializer      $serializer  Converts data in the appropriate format
77
     * @param Client          $amqpClient  Communicates with message broker
78
     * @param string          $queuePrefix Prefix for workflow-related queues
79
     */
80
    public function __construct(
81
        LoggerInterface $logger,
82
        Workflow $workflow,
83
        Serializer $serializer,
84
        Client $amqpClient,
85
        string $queuePrefix
86
    ) {
87
        $this->logger      = $logger;
88
        $this->workflow    = $workflow;
89
        $this->serializer  = $serializer;
90
        $this->amqpClient  = $amqpClient;
91
        $this->queuePrefix = $queuePrefix;
92
    }
93
94
    /**
95
     * Sends payload data to queues for processing according to configured workflow transitions
96
     *
97
     * @param object $dto Data to be passed through workflow
98
     *
99
     * @return void
100
     */
101
    public function send(object $dto): void
102
    {
103
        $payload     = new Payload($dto);
104
        $transitions = $this->workflow->getEnabledTransitions($payload);
105
106
        if (0 >= count($transitions)) {
107
            return;
108
        }
109
110
        $queueNames = [];
111
112
        foreach ($transitions as $transition) {
113
            $transitionName = $transition->getName();
114
            $queueNames[]   = $this->queuePrefix . $transitionName;
115
        }
116
117
        $this->distribute($payload, $queueNames);
118
    }
119
120
    /**
121
     * Returns data transfer object filled up with data from queues according to configured workflow transitions
122
     *
123
     * @param string $dtoClass Class of data transfer object
124
     *
125
     * @return object|null
126
     */
127
    public function receive(string $dtoClass): ?object
128
    {
129
        // TODO: if (!in_array($dtoName, $this->dtoNames, true))
130
131
        $dto = new $dtoClass;
132
133
        $transitions = $this->workflow->getEnabledTransitions(new Payload($dto));
134
135
        // Space for multiple queue logic here.
136
        $transition = array_shift($transitions);
137
138
        $transitionName = $transition->getName();
139
        $queueName      = $this->queuePrefix . $transitionName;
140
141
        return $this->get($dtoClass, $queueName);
142
    }
143
144
    /**
145
     * Distributes payload data among the queues for conveyor processing via workflow
146
     *
147
     * @param Payload $payload    Data to be passed through workflow
148
     * @param array   $queueNames Queue names for publishing
149
     *
150
     * @return void
151
     *
152
     * @throws DistributionFailedException
153
     */
154
    private function distribute(Payload $payload, array $queueNames): void
155
    {
156
        $this->ensureConnection();
157
158
        $channel = $this->amqpClient->channel();
159
        $channel->txSelect();
160
161
        foreach ($queueNames as $queueName) {
162
            try {
163
                $channel->queueDeclare($queueName);
164
                $this->publish($payload, $channel, $queueName);
165
            } catch (Exception $e) {
166
                $channel->txRollback();
167
168
                // closing an amqp channel only if some error has occurred, otherwise we are fine.
169
                $channel->close()->then(function () {
170
                    $this->amqpClient->disconnect();
171
                });
172
173
                $this->logger->critical(
174
                    'Payload publish failed.',
175
                    [
176
                        'message'   => $e->getMessage(),
177
                        'payload'   => $this->serializer->normalize($payload),
178
                        'queueName' => $queueName,
179
                    ]
180
                );
181
182
                throw DistributionFailedException::withQueueName($queueName);
183
            }
184
        }
185
186
        $channel->txCommit();
187
188
        $normalizedPayload = $this->serializer->normalize($payload);
189
        $this->logger->info('Payload distributed.', ['queueNames' => $queueNames, 'payload' => $normalizedPayload]);
190
    }
191
192
    /**
193
     * Publishes payload data to queue for conveyor processing via workflow
194
     *
195
     * @param Payload $payload   Data to be passed through workflow
196
     * @param Channel $channel   Channel for communication with message broker
197
     * @param string  $queueName Queue name for publishing
198
     *
199
     * @return void
200
     */
201
    private function publish(Payload $payload, Channel $channel, string $queueName): void
202
    {
203
        $data    = $payload->getData();
204
        $message = $this->serializer->serialize($data, 'json');
205
206
        $channel->publish($message, ['content_type' => 'application/json'], '', $queueName);
207
    }
208
209
    /**
210
     * Returns a dto by specified classname filled up with payload data from queue
211
     *
212
     * @param string $dtoClass  Class of data transfer object
213
     * @param string $queueName Queue name for payload data fetching
214
     *
215
     * @return object|null Dto or null if target queue is empty
216
     */
217
    // TODO: extract into gateway
218
    private function get(string $dtoClass, string $queueName): ?object
219
    {
220
        $this->ensureConnection();
221
222
        $channel = $this->amqpClient->channel();
223
        $channel->queueDeclare($queueName);
224
225
        // Immediate Ack here; we CAN afford a message loss as the trade-off
226
        // between "a single vacancy loss" and CPU & RAM cost
227
        // while trying to process unnecessary, duplicate data in whole conveyor.
228
        $message = $channel->get($queueName, true);
229
        // Также хорошая статья по теме:
230
        // https://habr.com/ru/company/yandex/blog/442762/ (Идемпотентность при внешних операциях)
231
232
        if (!$message instanceof Message) {
233
            return null;
234
        }
235
236
        $this->logger->debug(
237
            'Payload received.',
238
            [
239
                'dtoClass'       => $dtoClass,
240
                'queueName'      => $queueName,
241
                'messageContent' => $message->content,
242
            ]
243
        );
244
245
        return $this->serializer->deserialize($message->content, $dtoClass, 'json');
0 ignored issues
show
Bug Best Practice introduced by
The expression return $this->serializer...ent, $dtoClass, 'json') could return the type array which is incompatible with the type-hinted return null|object. Consider adding an additional type-check to rule them out.
Loading history...
246
    }
247
248
    /**
249
     * Ensures connection with message broker is established
250
     *
251
     * @return void
252
     *
253
     * @throws ConnectionFailedException
254
     */
255
    private function ensureConnection(): void
256
    {
257
        if ($this->amqpClient->isConnected()) {
258
            return;
259
        }
260
261
        try {
262
            $this->amqpClient->connect();
263
        } catch (Exception $e) {
264
            throw ConnectionFailedException::withPrevious($e);
265
        }
266
    }
267
}
268