Completed
Push — master ( e32eeb...714cda )
by
unknown
89:50 queued 46:29
created

EmailBundle/Sync/AbstractEmailSynchronizer.php (1 issue)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
namespace Oro\Bundle\EmailBundle\Sync;
4
5
use Doctrine\Common\Persistence\ManagerRegistry;
6
use Doctrine\ORM\EntityManager;
7
use Doctrine\ORM\Query;
8
use Doctrine\ORM\Query\Expr;
9
10
use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorage;
11
use Symfony\Component\Security\Core\Authentication\Token\TokenInterface;
12
13
use Psr\Log\LoggerAwareInterface;
14
use Psr\Log\LoggerAwareTrait;
15
use Psr\Log\NullLogger;
16
17
use Oro\Bundle\EmailBundle\Entity\EmailOrigin;
18
use Oro\Bundle\EmailBundle\Exception\SyncFolderTimeoutException;
19
use Oro\Bundle\OrganizationBundle\Entity\Organization;
20
use Oro\Bundle\SecurityBundle\Authentication\Token\OrganizationToken;
21
22
/**
23
 * @SuppressWarnings(PHPMD.ExcessiveClassComplexity)
24
 */
25
abstract class AbstractEmailSynchronizer implements LoggerAwareInterface
26
{
27
    use LoggerAwareTrait;
28
29
    const SYNC_CODE_IN_PROCESS = 1;
30
    const SYNC_CODE_FAILURE    = 2;
31
    const SYNC_CODE_SUCCESS    = 3;
32
33
    /** @var ManagerRegistry */
34
    protected $doctrine;
35
36
    /** @var KnownEmailAddressCheckerFactory */
37
    protected $knownEmailAddressCheckerFactory;
38
39
    /** @var TokenStorage */
40
    protected $tokenStorage;
41
42
    /** @var KnownEmailAddressCheckerInterface */
43
    private $knownEmailAddressChecker;
44
45
    /** @var TokenInterface */
46
    private $currentToken;
47
48
    /**
49
     * Constructor
50
     *
51
     * @param ManagerRegistry                 $doctrine
52
     * @param KnownEmailAddressCheckerFactory $knownEmailAddressCheckerFactory
53
     */
54
    protected function __construct(
        ManagerRegistry $doctrine,
        KnownEmailAddressCheckerFactory $knownEmailAddressCheckerFactory
    ) {
        // Only the factory is stored here; the checker itself is created on first use
        // by getKnownEmailAddressChecker().
        $this->knownEmailAddressCheckerFactory = $knownEmailAddressCheckerFactory;

        // Registry from which getEntityManager() obtains the entity manager.
        $this->doctrine = $doctrine;
    }
61
62
    /**
63
     * @param TokenStorage $tokenStorage
64
     */
65
    public function setTokenStorage(TokenStorage $tokenStorage)
    {
        $this->tokenStorage = $tokenStorage;

        // Remember the token that was present at injection time;
        // impersonateOrganization() only installs an organization token when this is null.
        $tokenAtInjection   = $this->tokenStorage->getToken();
        $this->currentToken = $tokenAtInjection;
    }
70
71
    /**
72
     * Returns TRUE if this class supports synchronization of the given origin.
73
     *
74
     * @param EmailOrigin $origin
75
     * @return bool
76
     */
77
    abstract public function supports(EmailOrigin $origin);
78
79
    /**
80
     * Performs a synchronization of emails for one email origin.
81
     * See the findOriginToSync method for the algorithm used to select an email origin.
82
     *
83
     * @param int $maxConcurrentTasks   The maximum number of synchronization jobs running at the same time
84
     * @param int $minExecIntervalInMin The minimum time interval (in minutes) between two synchronizations
85
     *                                  of the same email origin
86
     * @param int $maxExecTimeInMin     The maximum execution time (in minutes)
87
     *                                  Set -1 to unlimited
88
     *                                  Defaults to -1
89
     * @param int $maxTasks             The maximum number of email origins which can be synchronized
90
     *                                  Set -1 to unlimited
91
     *                                  Defaults to 1
92
     * @return int
93
     *
94
     * @throws \Exception
95
     *
96
     * @SuppressWarnings(PHPMD.NPathComplexity)
97
     */
98
    public function sync($maxConcurrentTasks, $minExecIntervalInMin, $maxExecTimeInMin = -1, $maxTasks = 1)
    {
        // Fall back to a no-op logger so that the logging calls below are always safe.
        if (null === $this->logger) {
            $this->logger = new NullLogger();
        }

        if (!$this->checkConfiguration()) {
            $this->logger->info('Exit because synchronization was not configured or disabled.');

            return 0;
        }

        $startedAt = $this->getCurrentUtcDateTime();
        $this->resetHangedOrigins();

        // A non-positive $maxExecTimeInMin means there is no time limit.
        $timeLimit = false;
        if ($maxExecTimeInMin > 0) {
            $timeLimit = new \DateInterval('PT' . $maxExecTimeInMin . 'M');
        }

        $seenOriginIds   = []; // origin id => true, for every origin picked up during this run
        $failedOriginIds = [];

        for (;;) {
            $origin = $this->findOriginToSync($maxConcurrentTasks, $minExecIntervalInMin);
            if (null === $origin) {
                $this->logger->info('Exit because nothing to synchronise.');
                break;
            }

            $originId = $origin->getId();

            // Getting the same origin again means the finder has cycled through everything.
            if (isset($seenOriginIds[$originId])) {
                $this->logger->info('Exit because all origins have been synchronised.');
                break;
            }

            if (false !== $timeLimit) {
                // "now - limit >= start" is true once the allotted time has elapsed.
                $shiftedNow = $this->getCurrentUtcDateTime();
                $shiftedNow->sub($timeLimit);
                if ($shiftedNow >= $startedAt) {
                    $this->logger->info('Exit because allocated time frame elapsed.');
                    break;
                }
            }

            $seenOriginIds[$originId] = true;
            try {
                $this->doSyncOrigin($origin);
            } catch (SyncFolderTimeoutException $timeout) {
                // A folder timeout stops the whole run without recording a failure.
                break;
            } catch (\Exception $failure) {
                // Record the failure and carry on with the next origin.
                $failedOriginIds[] = $originId;
            }

            // A non-positive $maxTasks means there is no limit on the number of origins.
            if ($maxTasks > 0 && count($seenOriginIds) >= $maxTasks) {
                $this->logger->info('Exit because the limit of tasks are reached.');
                break;
            }
        }

        // Throws if at least one origin failed.
        $this->assertSyncSuccess($failedOriginIds);

        return 0;
    }
156
157
    /**
158
     * Performs a synchronization of emails for the given email origins.
159
     *
160
     * @param int[] $originIds
161
     * @throws \Exception
162
     */
163
    public function syncOrigins(array $originIds)
    {
        // Fall back to a no-op logger so that the logging calls below are always safe.
        if ($this->logger === null) {
            $this->logger = new NullLogger();
        }

        if (!$this->checkConfiguration()) {
            $this->logger->info('Exit because synchronization was not configured.');

            // Actually exit, as the message says and as sync() does for the same check;
            // previously execution fell through and synchronized the origins anyway.
            return;
        }

        $failedOriginIds = [];
        foreach ($originIds as $originId) {
            // findOrigin() returns null for unknown or inactive origins; those are skipped.
            $origin = $this->findOrigin($originId);
            if ($origin === null) {
                continue;
            }

            try {
                $this->doSyncOrigin($origin);
            } catch (SyncFolderTimeoutException $ex) {
                // A folder timeout stops processing of the remaining origins
                // without recording a failure.
                break;
            } catch (\Exception $ex) {
                // Record the failure and carry on with the next origin.
                $failedOriginIds[] = $origin->getId();
            }
        }

        // Throws if at least one origin failed.
        $this->assertSyncSuccess($failedOriginIds);
    }
189
190
    /**
191
     * Checks configuration
192
     * This method can be used for preliminary check if the synchronization can be launched
193
     *
194
     * @return bool
195
     */
196
    protected function checkConfiguration()
    {
        // The base implementation always allows synchronization to proceed.
        return true;
    }
200
201
    /**
202
     * Performs a synchronization of emails for the given email origin.
203
     *
204
     * @param EmailOrigin $origin
205
     * @throws \Exception
206
     */
207
    protected function doSyncOrigin(EmailOrigin $origin)
    {
        $this->impersonateOrganization($origin->getOrganization());

        // Step 1: build the processor. A failure here is logged and re-thrown
        // without touching the origin's sync state.
        try {
            $processor = $this->createSynchronizationProcessor($origin);
            if ($processor instanceof LoggerAwareInterface) {
                $processor->setLogger($this->logger);
            }
        } catch (\Exception $creationError) {
            $this->logger->error(
                sprintf('Skip origin synchronization. Error: %s', $creationError->getMessage())
            );

            throw $creationError;
        }

        // Step 2: mark the origin as "in process", run the processor, then record the outcome.
        try {
            if (!$this->changeOriginSyncState($origin, self::SYNC_CODE_IN_PROCESS)) {
                // No row was updated, i.e. the origin is already flagged as being in process.
                $this->logger->info('Skip because it is already in process.');

                return;
            }

            $startedAt = $this->getCurrentUtcDateTime();
            $processor->process($origin, $startedAt);
            $this->changeOriginSyncState($origin, self::SYNC_CODE_SUCCESS, $startedAt);
        } catch (SyncFolderTimeoutException $timeout) {
            // A folder timeout is recorded as a success (without a synchronizedAt value)
            // and re-thrown so that the caller can stop its loop.
            $this->logger->info($timeout->getMessage());
            $this->changeOriginSyncState($origin, self::SYNC_CODE_SUCCESS);

            throw $timeout;
        } catch (\Exception $syncError) {
            try {
                $this->changeOriginSyncState($origin, self::SYNC_CODE_FAILURE);
            } catch (\Exception $stateError) {
                // Failing to store the failure state must not hide the original error.
                $this->logger->error(
                    sprintf('Cannot set the fail state. Error: %s', $stateError->getMessage()),
                    ['exception' => $stateError]
                );
            }

            $this->logger->error(
                sprintf('The synchronization failed. Error: %s', $syncError->getMessage()),
                ['exception' => $syncError]
            );

            throw $syncError;
        }
    }
253
254
    /**
     * Switches the security context to the given organization.
     *
     * An OrganizationToken is installed only when all of the following hold:
     *  - a token storage has been injected via setTokenStorage();
     *  - no token was present in that storage at injection time;
     *  - an organization is given.
     * Otherwise the method does nothing.
     *
     * @todo: Should be deleted after email sync process will be refactored
     *
     * @param Organization|null $organization
     */
    protected function impersonateOrganization(Organization $organization = null)
    {
        // The token storage is setter-injected and therefore optional; without this guard
        // a missing storage would end in a fatal "call to a member function on null".
        if ($this->tokenStorage === null) {
            return;
        }

        if ($this->currentToken === null && $organization !== null) {
            $this->tokenStorage->setToken(
                new OrganizationToken($organization)
            );
        }
    }
266
267
    /**
268
     * Returns default entity manager
269
     *
270
     * @return EntityManager
271
     */
272 View Code Duplication
    protected function getEntityManager()
    {
        /** @var EntityManager $manager */
        $manager = $this->doctrine->getManager();
        if ($manager->isOpen()) {
            return $manager;
        }

        // The manager has been closed: reset it in the registry and fetch it again.
        $this->doctrine->resetManager();

        return $this->doctrine->getManager();
    }
283
284
    /**
285
     * Makes sure $this->knownEmailAddressChecker initialized
286
     */
287
    protected function getKnownEmailAddressChecker()
    {
        // Already created on an earlier call: reuse it.
        if ($this->knownEmailAddressChecker) {
            return $this->knownEmailAddressChecker;
        }

        $this->knownEmailAddressChecker = $this->knownEmailAddressCheckerFactory->create();

        // Share this synchronizer's logger with the checker when it can accept one.
        $checker = $this->knownEmailAddressChecker;
        if ($checker instanceof LoggerAwareInterface) {
            $checker->setLogger($this->logger);
        }

        return $this->knownEmailAddressChecker;
    }
298
299
    /**
300
     * Gets entity name implementing EmailOrigin
301
     *
302
     * @return string
303
     */
304
    abstract protected function getEmailOriginClass();
305
306
    /**
307
     * Creates a processor that is used to synchronize emails
308
     *
309
     * @param object $origin An instance of class implementing EmailOrigin entity
310
     * @return AbstractEmailSynchronizationProcessor
311
     */
312
    abstract protected function createSynchronizationProcessor($origin);
313
314
    /**
315
     * Updates a state of the given email origin
316
     *
317
     * @param EmailOrigin $origin
318
     * @param int $syncCode Can be one of self::SYNC_CODE_* constants
319
     * @param \DateTime|null $synchronizedAt
320
     * @return bool true if the synchronization code was updated; false if no changes were needed
321
     */
322
    protected function changeOriginSyncState(EmailOrigin $origin, $syncCode, $synchronizedAt = null)
    {
        $originRepository = $this->getEntityManager()->getRepository($this->getEmailOriginClass());

        // Base statement: set the new code and its modification time for this origin only.
        $update = $originRepository->createQueryBuilder('o')->update();
        $update->set('o.syncCode', ':code');
        $update->set('o.syncCodeUpdatedAt', ':updated');
        $update->where('o.id = :id');
        $update->setParameter('code', $syncCode);
        $update->setParameter('updated', $this->getCurrentUtcDateTime());
        $update->setParameter('id', $origin->getId());

        // The synchronization time is written only when the caller supplies it.
        if (null !== $synchronizedAt) {
            $update->set('o.synchronizedAt', ':synchronized');
            $update->setParameter('synchronized', $synchronizedAt);
        }

        if (self::SYNC_CODE_IN_PROCESS === $syncCode) {
            // Update only rows that are not already "in process", so that zero
            // affected rows tells the caller the origin was already taken.
            $update->andWhere('(o.syncCode IS NULL OR o.syncCode <> :code)');
        } elseif (self::SYNC_CODE_SUCCESS === $syncCode) {
            // Every recorded success increments the origin's sync counter.
            $update->set('o.syncCount', 'o.syncCount + 1');
        }

        $updatedRowCount = $update->getQuery()->execute();

        return $updatedRowCount > 0;
    }
352
353
    /**
354
     * Finds an email origin to be synchronised
355
     *
356
     * @param int $maxConcurrentTasks   The maximum number of synchronization jobs running at the same time
357
     * @param int $minExecIntervalInMin The minimum time interval (in minutes) between two synchronizations
358
     *                                  of the same email origin
359
     * @return EmailOrigin|null
360
     */
361
    protected function findOriginToSync($maxConcurrentTasks, $minExecIntervalInMin)
    {
        $this->logger->info('Finding an email origin ...');

        $currentTime = $this->getCurrentUtcDateTime();

        // Origins whose sync code changed after this moment are not eligible yet.
        $eligibleBefore = clone $currentTime;
        if ($minExecIntervalInMin > 0) {
            $eligibleBefore->sub(new \DateInterval('PT' . $minExecIntervalInMin . 'M'));
        }

        // Stand-in "last updated" time (one year ago) for origins without syncCodeUpdatedAt.
        $fallbackUpdatedAt = clone $currentTime;
        $fallbackUpdatedAt->sub(new \DateInterval('P1Y'));

        // Hidden ordering columns:
        // - p1 is 0 for "In Process" origins and 1 for all others;
        // - p2 is the sync code (1000 when NULL) times 30, plus the minutes elapsed since the
        //   sync code was last updated (divided by 100 for successfully synchronized origins).
        // Rows are ordered by p1 ascending, then p2 descending, then syncCodeUpdatedAt.
        $selectClause = implode(
            '',
            [
                'o',
                ', CASE WHEN o.syncCode = :inProcess THEN 0 ELSE 1 END AS HIDDEN p1',
                ', (COALESCE(o.syncCode, 1000) * 30',
                ' + TIMESTAMPDIFF(MINUTE, COALESCE(o.syncCodeUpdatedAt, :min), :now)',
                ' / (CASE o.syncCode WHEN :success THEN 100 ELSE 1 END)) AS HIDDEN p2',
            ]
        );

        $originRepository = $this->getEntityManager()->getRepository($this->getEmailOriginClass());
        $builder = $originRepository->createQueryBuilder('o');
        $builder->select($selectClause);
        $builder->where(
            'o.isActive = :isActive AND (o.syncCodeUpdatedAt IS NULL OR o.syncCodeUpdatedAt <= :border)'
        );
        $builder->orderBy('p1, p2 DESC, o.syncCodeUpdatedAt');

        $parameters = [
            'inProcess' => self::SYNC_CODE_IN_PROCESS,
            'success'   => self::SYNC_CODE_SUCCESS,
            'isActive'  => true,
            'now'       => $currentTime,
            'min'       => $fallbackUpdatedAt,
            'border'    => $eligibleBefore,
        ];
        foreach ($parameters as $parameterName => $parameterValue) {
            $builder->setParameter($parameterName, $parameterValue);
        }

        // Fetch one row more than the concurrency limit: if every fetched row is
        // "In Process", the limit has been reached.
        $builder->setMaxResults($maxConcurrentTasks + 1);

        /** @var EmailOrigin[] $candidates */
        $candidates = $builder->getQuery()->getResult();

        // Pick the first candidate that is not currently being synchronized.
        foreach ($candidates as $candidate) {
            if ($candidate->getSyncCode() === self::SYNC_CODE_IN_PROCESS) {
                continue;
            }

            $this->logger->info(
                sprintf('Found "%s" email origin. Id: %d.', (string)$candidate, $candidate->getId())
            );

            return $candidate;
        }

        if (!empty($candidates)) {
            $this->logger->info('The maximum number of concurrent tasks is reached.');
        }
        $this->logger->info('An email origin was not found.');

        return null;
    }
419
420
    /**
421
     * Finds active email origin by its id
422
     *
423
     * @param int $originId
424
     * @return EmailOrigin|null
425
     */
426
    protected function findOrigin($originId)
    {
        $this->logger->info(sprintf('Finding an email origin (id: %d) ...', $originId));

        $originRepository = $this->getEntityManager()->getRepository($this->getEmailOriginClass());

        // Only active origins are considered; at most one row is requested.
        $builder = $originRepository->createQueryBuilder('o');
        $builder->where('o.isActive = :isActive AND o.id = :id');
        $builder->setParameter('isActive', true);
        $builder->setParameter('id', $originId);
        $builder->setMaxResults(1);

        $matches = $builder->getQuery()->getResult();

        if (empty($matches)) {
            $this->logger->info('An email origin was not found.');

            return null;
        }

        /** @var EmailOrigin $found */
        $found = $matches[0];

        // A null first element is reported the same way as an empty result.
        if (null === $found) {
            $this->logger->info('An email origin was not found.');

            return null;
        }

        $this->logger->info(sprintf('Found "%s" email origin. Id: %d.', (string)$found, $found->getId()));

        return $found;
    }
450
451
    /**
452
     * Marks outdated "In Process" origins as "Failure", if any exist
453
     */
454
    protected function resetHangedOrigins()
    {
        $this->logger->info('Resetting hanged email origins ...');

        // An origin counts as hanged when it has been "In Process" for one day or longer.
        $hangedSince = clone $this->getCurrentUtcDateTime();
        $hangedSince->sub(new \DateInterval('P1D'));

        $originRepository = $this->getEntityManager()->getRepository($this->getEmailOriginClass());

        $update = $originRepository->createQueryBuilder('o')->update();
        $update->set('o.syncCode', ':failure');
        $update->where('o.syncCode = :inProcess AND o.syncCodeUpdatedAt <= :border');
        $update->setParameter('inProcess', self::SYNC_CODE_IN_PROCESS);
        $update->setParameter('failure', self::SYNC_CODE_FAILURE);
        $update->setParameter('border', $hangedSince);

        $resetCount = $update->getQuery()->execute();
        $this->logger->info(sprintf('Updated %d row(s).', $resetCount));
    }
475
476
    /**
477
     * Gets a DateTime object that is set to the current date and time in UTC.
478
     *
479
     * @return \DateTime
480
     */
481
    protected function getCurrentUtcDateTime()
    {
        // Build the instant explicitly in UTC rather than in the default PHP timezone.
        $utc = new \DateTimeZone('UTC');

        return new \DateTime('now', $utc);
    }
485
486
    /**
     * Throws an exception when at least one email origin failed to synchronize.
     *
     * @param int[] $failedOriginIds Ids of the origins whose synchronization failed
     * @throws \Exception If $failedOriginIds is not empty; the message lists the ids
     */
    private function assertSyncSuccess(array $failedOriginIds)
    {
        // Explicit emptiness check: relying on the implicit array-to-boolean conversion
        // (an empty array is equal, but not identical, to false) was flagged by static
        // analysis as not making the intent apparent.
        if (empty($failedOriginIds)) {
            return;
        }

        throw new \Exception(
            sprintf(
                'The email synchronization failed for the following origins: %s.',
                implode(', ', $failedOriginIds)
            )
        );
    }
501
}
502