These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace Oro\Bundle\EmailBundle\Sync; |
||
4 | |||
5 | use Doctrine\Common\Persistence\ManagerRegistry; |
||
6 | use Doctrine\ORM\EntityManager; |
||
7 | use Doctrine\ORM\Query; |
||
8 | use Doctrine\ORM\Query\Expr; |
||
9 | |||
10 | use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorage; |
||
11 | use Symfony\Component\Security\Core\Authentication\Token\TokenInterface; |
||
12 | |||
13 | use Psr\Log\LoggerAwareInterface; |
||
14 | use Psr\Log\LoggerAwareTrait; |
||
15 | use Psr\Log\NullLogger; |
||
16 | |||
17 | use Oro\Bundle\EmailBundle\Entity\EmailOrigin; |
||
18 | use Oro\Bundle\EmailBundle\Exception\SyncFolderTimeoutException; |
||
19 | use Oro\Bundle\OrganizationBundle\Entity\Organization; |
||
20 | use Oro\Bundle\SecurityBundle\Authentication\Token\OrganizationToken; |
||
21 | |||
22 | /** |
||
23 | * @SuppressWarnings(PHPMD.ExcessiveClassComplexity) |
||
24 | */ |
||
25 | abstract class AbstractEmailSynchronizer implements LoggerAwareInterface |
||
26 | { |
||
27 | use LoggerAwareTrait; |
||
28 | |||
29 | const SYNC_CODE_IN_PROCESS = 1; |
||
30 | const SYNC_CODE_FAILURE = 2; |
||
31 | const SYNC_CODE_SUCCESS = 3; |
||
32 | |||
33 | /** @var ManagerRegistry */ |
||
34 | protected $doctrine; |
||
35 | |||
36 | /** @var KnownEmailAddressCheckerFactory */ |
||
37 | protected $knownEmailAddressCheckerFactory; |
||
38 | |||
39 | /** @var TokenStorage */ |
||
40 | protected $tokenStorage; |
||
41 | |||
42 | /** @var KnownEmailAddressCheckerInterface */ |
||
43 | private $knownEmailAddressChecker; |
||
44 | |||
45 | /** @var TokenInterface */ |
||
46 | private $currentToken; |
||
47 | |||
48 | /** |
||
49 | * Constructor |
||
50 | * |
||
51 | * @param ManagerRegistry $doctrine |
||
52 | * @param KnownEmailAddressCheckerFactory $knownEmailAddressCheckerFactory |
||
53 | */ |
||
54 | protected function __construct( |
||
55 | ManagerRegistry $doctrine, |
||
56 | KnownEmailAddressCheckerFactory $knownEmailAddressCheckerFactory |
||
57 | ) { |
||
58 | $this->doctrine = $doctrine; |
||
59 | $this->knownEmailAddressCheckerFactory = $knownEmailAddressCheckerFactory; |
||
60 | } |
||
61 | |||
62 | /** |
||
63 | * @param TokenStorage $tokenStorage |
||
64 | */ |
||
65 | public function setTokenStorage(TokenStorage $tokenStorage) |
||
66 | { |
||
67 | $this->tokenStorage = $tokenStorage; |
||
68 | $this->currentToken = $tokenStorage->getToken(); |
||
69 | } |
||
70 | |||
71 | /** |
||
72 | * Returns TRUE if this class supports synchronization of the given origin. |
||
73 | * |
||
74 | * @param EmailOrigin $origin |
||
75 | * @return bool |
||
76 | */ |
||
77 | abstract public function supports(EmailOrigin $origin); |
||
78 | |||
79 | /** |
||
80 | * Performs a synchronization of emails for one email origin. |
||
81 | * Algorithm how an email origin is selected see in findOriginToSync method. |
||
82 | * |
||
83 | * @param int $maxConcurrentTasks The maximum number of synchronization jobs running in the same time |
||
84 | * @param int $minExecIntervalInMin The minimum time interval (in minutes) between two synchronizations |
||
85 | * of the same email origin |
||
86 | * @param int $maxExecTimeInMin The maximum execution time (in minutes) |
||
87 | * Set -1 to unlimited |
||
88 | * Defaults to -1 |
||
89 | * @param int $maxTasks The maximum number of email origins which can be synchronized |
||
90 | * Set -1 to unlimited |
||
91 | * Defaults to 1 |
||
92 | * @return int |
||
93 | * |
||
94 | * @throws \Exception |
||
95 | * |
||
96 | * @SuppressWarnings(PHPMD.NPathComplexity) |
||
97 | */ |
||
98 | public function sync($maxConcurrentTasks, $minExecIntervalInMin, $maxExecTimeInMin = -1, $maxTasks = 1) |
||
99 | { |
||
100 | if ($this->logger === null) { |
||
101 | $this->logger = new NullLogger(); |
||
102 | } |
||
103 | |||
104 | if (!$this->checkConfiguration()) { |
||
105 | $this->logger->info('Exit because synchronization was not configured or disabled.'); |
||
106 | return 0; |
||
107 | } |
||
108 | |||
109 | $startTime = $this->getCurrentUtcDateTime(); |
||
110 | $this->resetHangedOrigins(); |
||
111 | |||
112 | $maxExecTimeout = $maxExecTimeInMin > 0 |
||
113 | ? new \DateInterval('PT' . $maxExecTimeInMin . 'M') |
||
114 | : false; |
||
115 | $processedOrigins = []; |
||
116 | $failedOriginIds = []; |
||
117 | while (true) { |
||
118 | $origin = $this->findOriginToSync($maxConcurrentTasks, $minExecIntervalInMin); |
||
119 | if ($origin === null) { |
||
120 | $this->logger->info('Exit because nothing to synchronise.'); |
||
121 | break; |
||
122 | } |
||
123 | |||
124 | if (isset($processedOrigins[$origin->getId()])) { |
||
125 | $this->logger->info('Exit because all origins have been synchronised.'); |
||
126 | break; |
||
127 | } |
||
128 | |||
129 | if ($maxExecTimeout !== false) { |
||
130 | $date = $this->getCurrentUtcDateTime(); |
||
131 | if ($date->sub($maxExecTimeout) >= $startTime) { |
||
132 | $this->logger->info('Exit because allocated time frame elapsed.'); |
||
133 | break; |
||
134 | } |
||
135 | } |
||
136 | |||
137 | $processedOrigins[$origin->getId()] = true; |
||
138 | try { |
||
139 | $this->doSyncOrigin($origin); |
||
140 | } catch (SyncFolderTimeoutException $ex) { |
||
141 | break; |
||
142 | } catch (\Exception $ex) { |
||
143 | $failedOriginIds[] = $origin->getId(); |
||
144 | } |
||
145 | |||
146 | if ($maxTasks > 0 && count($processedOrigins) >= $maxTasks) { |
||
147 | $this->logger->info('Exit because the limit of tasks are reached.'); |
||
148 | break; |
||
149 | } |
||
150 | } |
||
151 | |||
152 | $this->assertSyncSuccess($failedOriginIds); |
||
153 | |||
154 | return 0; |
||
155 | } |
||
156 | |||
157 | /** |
||
158 | * Performs a synchronization of emails for the given email origins. |
||
159 | * |
||
160 | * @param int[] $originIds |
||
161 | * @throws \Exception |
||
162 | */ |
||
163 | public function syncOrigins(array $originIds) |
||
164 | { |
||
165 | if ($this->logger === null) { |
||
166 | $this->logger = new NullLogger(); |
||
167 | } |
||
168 | |||
169 | if (!$this->checkConfiguration()) { |
||
170 | $this->logger->info('Exit because synchronization was not configured.'); |
||
171 | } |
||
172 | |||
173 | $failedOriginIds = []; |
||
174 | foreach ($originIds as $originId) { |
||
175 | $origin = $this->findOrigin($originId); |
||
176 | if ($origin !== null) { |
||
177 | try { |
||
178 | $this->doSyncOrigin($origin); |
||
179 | } catch (SyncFolderTimeoutException $ex) { |
||
180 | break; |
||
181 | } catch (\Exception $ex) { |
||
182 | $failedOriginIds[] = $origin->getId(); |
||
183 | } |
||
184 | } |
||
185 | } |
||
186 | |||
187 | $this->assertSyncSuccess($failedOriginIds); |
||
188 | } |
||
189 | |||
190 | /** |
||
191 | * Checks configuration |
||
192 | * This method can be used for preliminary check if the synchronization can be launched |
||
193 | * |
||
194 | * @return bool |
||
195 | */ |
||
196 | protected function checkConfiguration() |
||
197 | { |
||
198 | return true; |
||
199 | } |
||
200 | |||
201 | /** |
||
202 | * Performs a synchronization of emails for the given email origin. |
||
203 | * |
||
204 | * @param EmailOrigin $origin |
||
205 | * @throws \Exception |
||
206 | */ |
||
207 | protected function doSyncOrigin(EmailOrigin $origin) |
||
208 | { |
||
209 | $this->impersonateOrganization($origin->getOrganization()); |
||
210 | try { |
||
211 | $processor = $this->createSynchronizationProcessor($origin); |
||
212 | if ($processor instanceof LoggerAwareInterface) { |
||
213 | $processor->setLogger($this->logger); |
||
214 | } |
||
215 | } catch (\Exception $ex) { |
||
216 | $this->logger->error(sprintf('Skip origin synchronization. Error: %s', $ex->getMessage())); |
||
217 | |||
218 | throw $ex; |
||
219 | } |
||
220 | |||
221 | try { |
||
222 | if ($this->changeOriginSyncState($origin, self::SYNC_CODE_IN_PROCESS)) { |
||
223 | $syncStartTime = $this->getCurrentUtcDateTime(); |
||
224 | $processor->process($origin, $syncStartTime); |
||
225 | $this->changeOriginSyncState($origin, self::SYNC_CODE_SUCCESS, $syncStartTime); |
||
226 | } else { |
||
227 | $this->logger->info('Skip because it is already in process.'); |
||
228 | } |
||
229 | } catch (SyncFolderTimeoutException $ex) { |
||
230 | $this->logger->info($ex->getMessage()); |
||
231 | $this->changeOriginSyncState($origin, self::SYNC_CODE_SUCCESS); |
||
232 | |||
233 | throw $ex; |
||
234 | } catch (\Exception $ex) { |
||
235 | try { |
||
236 | $this->changeOriginSyncState($origin, self::SYNC_CODE_FAILURE); |
||
237 | } catch (\Exception $innerEx) { |
||
238 | // ignore any exception here |
||
239 | $this->logger->error( |
||
240 | sprintf('Cannot set the fail state. Error: %s', $innerEx->getMessage()), |
||
241 | ['exception' => $innerEx] |
||
242 | ); |
||
243 | } |
||
244 | |||
245 | $this->logger->error( |
||
246 | sprintf('The synchronization failed. Error: %s', $ex->getMessage()), |
||
247 | ['exception' => $ex] |
||
248 | ); |
||
249 | |||
250 | throw $ex; |
||
251 | } |
||
252 | } |
||
253 | |||
254 | /** |
||
255 | * Switches the security context to the given organization |
||
256 | * @todo: Should be deleted after email sync process will be refactored |
||
257 | */ |
||
258 | protected function impersonateOrganization(Organization $organization = null) |
||
259 | { |
||
260 | if ($this->currentToken === null && $organization) { |
||
261 | $this->tokenStorage->setToken( |
||
262 | new OrganizationToken($organization) |
||
263 | ); |
||
264 | } |
||
265 | } |
||
266 | |||
267 | /** |
||
268 | * Returns default entity manager |
||
269 | * |
||
270 | * @return EntityManager |
||
271 | */ |
||
272 | View Code Duplication | protected function getEntityManager() |
|
273 | { |
||
274 | /** @var EntityManager $em */ |
||
275 | $em = $this->doctrine->getManager(); |
||
276 | if (!$em->isOpen()) { |
||
277 | $this->doctrine->resetManager(); |
||
278 | $em = $this->doctrine->getManager(); |
||
279 | } |
||
280 | |||
281 | return $em; |
||
282 | } |
||
283 | |||
284 | /** |
||
285 | * Makes sure $this->knownEmailAddressChecker initialized |
||
286 | */ |
||
287 | protected function getKnownEmailAddressChecker() |
||
288 | { |
||
289 | if (!$this->knownEmailAddressChecker) { |
||
290 | $this->knownEmailAddressChecker = $this->knownEmailAddressCheckerFactory->create(); |
||
291 | if ($this->knownEmailAddressChecker instanceof LoggerAwareInterface) { |
||
292 | $this->knownEmailAddressChecker->setLogger($this->logger); |
||
293 | } |
||
294 | } |
||
295 | |||
296 | return $this->knownEmailAddressChecker; |
||
297 | } |
||
298 | |||
299 | /** |
||
300 | * Gets entity name implementing EmailOrigin |
||
301 | * |
||
302 | * @return string |
||
303 | */ |
||
304 | abstract protected function getEmailOriginClass(); |
||
305 | |||
306 | /** |
||
307 | * Creates a processor is used to synchronize emails |
||
308 | * |
||
309 | * @param object $origin An instance of class implementing EmailOrigin entity |
||
310 | * @return AbstractEmailSynchronizationProcessor |
||
311 | */ |
||
312 | abstract protected function createSynchronizationProcessor($origin); |
||
313 | |||
314 | /** |
||
315 | * Updates a state of the given email origin |
||
316 | * |
||
317 | * @param EmailOrigin $origin |
||
318 | * @param int $syncCode Can be one of self::SYNC_CODE_* constants |
||
319 | * @param \DateTime|null $synchronizedAt |
||
320 | * @return bool true if the synchronization code was updated; false if no any changes are needed |
||
321 | */ |
||
322 | protected function changeOriginSyncState(EmailOrigin $origin, $syncCode, $synchronizedAt = null) |
||
323 | { |
||
324 | $repo = $this->getEntityManager()->getRepository($this->getEmailOriginClass()); |
||
325 | $qb = $repo->createQueryBuilder('o') |
||
326 | ->update() |
||
327 | ->set('o.syncCode', ':code') |
||
328 | ->set('o.syncCodeUpdatedAt', ':updated') |
||
329 | ->where('o.id = :id') |
||
330 | ->setParameter('code', $syncCode) |
||
331 | ->setParameter('updated', $this->getCurrentUtcDateTime()) |
||
332 | ->setParameter('id', $origin->getId()); |
||
333 | |||
334 | if ($synchronizedAt !== null) { |
||
335 | $qb |
||
336 | ->set('o.synchronizedAt', ':synchronized') |
||
337 | ->setParameter('synchronized', $synchronizedAt); |
||
338 | } |
||
339 | |||
340 | if ($syncCode === self::SYNC_CODE_IN_PROCESS) { |
||
341 | $qb->andWhere('(o.syncCode IS NULL OR o.syncCode <> :code)'); |
||
342 | } |
||
343 | |||
344 | if ($syncCode === self::SYNC_CODE_SUCCESS) { |
||
345 | $qb->set('o.syncCount', 'o.syncCount + 1'); |
||
346 | } |
||
347 | |||
348 | $affectedRows = $qb->getQuery()->execute(); |
||
349 | |||
350 | return $affectedRows > 0; |
||
351 | } |
||
352 | |||
353 | /** |
||
354 | * Finds an email origin to be synchronised |
||
355 | * |
||
356 | * @param int $maxConcurrentTasks The maximum number of synchronization jobs running in the same time |
||
357 | * @param int $minExecIntervalInMin The minimum time interval (in minutes) between two synchronizations |
||
358 | * of the same email origin |
||
359 | * @return EmailOrigin |
||
360 | */ |
||
361 | protected function findOriginToSync($maxConcurrentTasks, $minExecIntervalInMin) |
||
362 | { |
||
363 | $this->logger->info('Finding an email origin ...'); |
||
364 | |||
365 | $now = $this->getCurrentUtcDateTime(); |
||
366 | $border = clone $now; |
||
367 | if ($minExecIntervalInMin > 0) { |
||
368 | $border->sub(new \DateInterval('PT' . $minExecIntervalInMin . 'M')); |
||
369 | } |
||
370 | $min = clone $now; |
||
371 | $min->sub(new \DateInterval('P1Y')); |
||
372 | |||
373 | // rules: |
||
374 | // - items with earlier sync code modification dates have higher priority |
||
375 | // - previously failed items are shifted at 30 minutes back (it means that if sync failed |
||
376 | // the next sync is performed only after 30 minutes) |
||
377 | // - "In Process" items are moved at the end |
||
378 | $repo = $this->getEntityManager()->getRepository($this->getEmailOriginClass()); |
||
379 | $query = $repo->createQueryBuilder('o') |
||
380 | ->select( |
||
381 | 'o' |
||
382 | . ', CASE WHEN o.syncCode = :inProcess THEN 0 ELSE 1 END AS HIDDEN p1' |
||
383 | . ', (COALESCE(o.syncCode, 1000) * 30' |
||
384 | . ' + TIMESTAMPDIFF(MINUTE, COALESCE(o.syncCodeUpdatedAt, :min), :now)' |
||
385 | . ' / (CASE o.syncCode WHEN :success THEN 100 ELSE 1 END)) AS HIDDEN p2' |
||
386 | ) |
||
387 | ->where('o.isActive = :isActive AND (o.syncCodeUpdatedAt IS NULL OR o.syncCodeUpdatedAt <= :border)') |
||
388 | ->orderBy('p1, p2 DESC, o.syncCodeUpdatedAt') |
||
389 | ->setParameter('inProcess', self::SYNC_CODE_IN_PROCESS) |
||
390 | ->setParameter('success', self::SYNC_CODE_SUCCESS) |
||
391 | ->setParameter('isActive', true) |
||
392 | ->setParameter('now', $now) |
||
393 | ->setParameter('min', $min) |
||
394 | ->setParameter('border', $border) |
||
395 | ->setMaxResults($maxConcurrentTasks + 1) |
||
396 | ->getQuery(); |
||
397 | |||
398 | /** @var EmailOrigin[] $origins */ |
||
399 | $origins = $query->getResult(); |
||
400 | $result = null; |
||
401 | foreach ($origins as $origin) { |
||
402 | if ($origin->getSyncCode() !== self::SYNC_CODE_IN_PROCESS) { |
||
403 | $result = $origin; |
||
404 | break; |
||
405 | } |
||
406 | } |
||
407 | |||
408 | if ($result === null) { |
||
409 | if (!empty($origins)) { |
||
410 | $this->logger->info('The maximum number of concurrent tasks is reached.'); |
||
411 | } |
||
412 | $this->logger->info('An email origin was not found.'); |
||
413 | } else { |
||
414 | $this->logger->info(sprintf('Found "%s" email origin. Id: %d.', (string)$result, $result->getId())); |
||
415 | } |
||
416 | |||
417 | return $result; |
||
418 | } |
||
419 | |||
420 | /** |
||
421 | * Finds active email origin by its id |
||
422 | * |
||
423 | * @param int $originId |
||
424 | * @return EmailOrigin|null |
||
425 | */ |
||
426 | protected function findOrigin($originId) |
||
427 | { |
||
428 | $this->logger->info(sprintf('Finding an email origin (id: %d) ...', $originId)); |
||
429 | |||
430 | $repo = $this->getEntityManager()->getRepository($this->getEmailOriginClass()); |
||
431 | $query = $repo->createQueryBuilder('o') |
||
432 | ->where('o.isActive = :isActive AND o.id = :id') |
||
433 | ->setParameter('isActive', true) |
||
434 | ->setParameter('id', $originId) |
||
435 | ->setMaxResults(1) |
||
436 | ->getQuery(); |
||
437 | $origins = $query->getResult(); |
||
438 | |||
439 | /** @var EmailOrigin $result */ |
||
440 | $result = !empty($origins) ? $origins[0] : null; |
||
441 | |||
442 | if ($result === null) { |
||
443 | $this->logger->info('An email origin was not found.'); |
||
444 | } else { |
||
445 | $this->logger->info(sprintf('Found "%s" email origin. Id: %d.', (string)$result, $result->getId())); |
||
446 | } |
||
447 | |||
448 | return $result; |
||
449 | } |
||
450 | |||
451 | /** |
||
452 | * Marks outdated "In Process" origins as "Failure" if exist |
||
453 | */ |
||
454 | protected function resetHangedOrigins() |
||
455 | { |
||
456 | $this->logger->info('Resetting hanged email origins ...'); |
||
457 | |||
458 | $now = $this->getCurrentUtcDateTime(); |
||
459 | $border = clone $now; |
||
460 | $border->sub(new \DateInterval('P1D')); |
||
461 | |||
462 | $repo = $this->getEntityManager()->getRepository($this->getEmailOriginClass()); |
||
463 | $query = $repo->createQueryBuilder('o') |
||
464 | ->update() |
||
465 | ->set('o.syncCode', ':failure') |
||
466 | ->where('o.syncCode = :inProcess AND o.syncCodeUpdatedAt <= :border') |
||
467 | ->setParameter('inProcess', self::SYNC_CODE_IN_PROCESS) |
||
468 | ->setParameter('failure', self::SYNC_CODE_FAILURE) |
||
469 | ->setParameter('border', $border) |
||
470 | ->getQuery(); |
||
471 | |||
472 | $affectedRows = $query->execute(); |
||
473 | $this->logger->info(sprintf('Updated %d row(s).', $affectedRows)); |
||
474 | } |
||
475 | |||
476 | /** |
||
477 | * Gets a DateTime object that is set to the current date and time in UTC. |
||
478 | * |
||
479 | * @return \DateTime |
||
480 | */ |
||
481 | protected function getCurrentUtcDateTime() |
||
482 | { |
||
483 | return new \DateTime('now', new \DateTimeZone('UTC')); |
||
484 | } |
||
485 | |||
486 | /** |
||
487 | * @param $failedOriginIds |
||
488 | * @throws \Exception |
||
489 | */ |
||
490 | private function assertSyncSuccess(array $failedOriginIds) |
||
491 | { |
||
492 | if ($failedOriginIds) { |
||
0 ignored issues
–
show
|
|||
493 | throw new \Exception( |
||
494 | sprintf( |
||
495 | 'The email synchronization failed for the following origins: %s.', |
||
496 | implode(', ', $failedOriginIds) |
||
497 | ) |
||
498 | ); |
||
499 | } |
||
500 | } |
||
501 | } |
||
502 |
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using
empty(..)
or! empty(...)
instead.