Issues (3627)

bundles/ChannelBundle/Model/MessageQueueModel.php (4 issues)

1
<?php
2
3
/*
4
 * @copyright   2014 Mautic Contributors. All rights reserved
5
 * @author      Mautic
6
 *
7
 * @link        http://mautic.org
8
 *
9
 * @license     GNU/GPLv3 http://www.gnu.org/licenses/gpl-3.0.html
10
 */
11
12
namespace Mautic\ChannelBundle\Model;
13
14
use Mautic\ChannelBundle\ChannelEvents;
15
use Mautic\ChannelBundle\Entity\MessageQueue;
16
use Mautic\ChannelBundle\Event\MessageQueueBatchProcessEvent;
17
use Mautic\ChannelBundle\Event\MessageQueueEvent;
18
use Mautic\ChannelBundle\Event\MessageQueueProcessEvent;
19
use Mautic\CoreBundle\Helper\CoreParametersHelper;
20
use Mautic\CoreBundle\Model\FormModel;
21
use Mautic\LeadBundle\Entity\Lead;
22
use Mautic\LeadBundle\Model\CompanyModel;
23
use Mautic\LeadBundle\Model\LeadModel;
24
use Symfony\Component\EventDispatcher\Event;
25
26
class MessageQueueModel extends FormModel
27
{
28
    /**
     * Retry delay (ISO 8601 duration: 15 minutes) used by this model when a message
     * failed or when no explicit reschedule interval was supplied.
     *
     * @var string
     */
    const DEFAULT_RESCHEDULE_INTERVAL = 'PT15M';

    /**
     * Provides the contact repository used to load contact details for queued messages.
     *
     * @var LeadModel
     */
    protected $leadModel;

    /**
     * Provides the company repository used to load the companies of queued contacts.
     *
     * @var CompanyModel
     */
    protected $companyModel;

    /**
     * Source of the per-channel "<channel>_frequency_number" / "<channel>_frequency_time" defaults.
     *
     * @var CoreParametersHelper
     */
    protected $coreParametersHelper;

    /**
     * Stores the collaborators this model depends on.
     */
    public function __construct(LeadModel $leadModel, CompanyModel $companyModel, CoreParametersHelper $coreParametersHelper)
    {
        $this->coreParametersHelper = $coreParametersHelper;
        $this->companyModel         = $companyModel;
        $this->leadModel            = $leadModel;
    }
52
53
    /**
     * Fetches the Doctrine repository registered for the MessageQueue entity.
     *
     * @return \Doctrine\ORM\EntityRepository|\Mautic\ChannelBundle\Entity\MessageQueueRepository
     */
    public function getRepository()
    {
        $repository = $this->em->getRepository('MauticChannelBundle:MessageQueue');

        return $repository;
    }
60
61
    /**
     * Holds back contacts that the frequency rule repository reports for the given channel.
     *
     * Each reported contact is removed from $leads. Its message is rescheduled when $messageQueue
     * contains an entry for the contact, otherwise a new message is queued for it.
     *
     * @param array      $leads             Contacts keyed by contact ID; modified in place
     * @param string     $channel           Channel name; also used as prefix of the "_frequency_number" and
     *                                      "_frequency_time" parameters
     * @param mixed      $channelId
     * @param mixed|null $campaignEventId   Forwarded to queue()
     * @param int        $attempts          Maximum attempts for newly queued messages
     * @param int        $priority          Priority for newly queued messages
     * @param array|null $messageQueue      Existing queue entries keyed by contact ID
     * @param string     $statTableName
     * @param string     $statContactColumn
     * @param string     $statSentColumn
     *
     * @return array IDs of the contacts that were held back (each ID is both key and value)
     */
    public function processFrequencyRules(
        array &$leads,
        $channel,
        $channelId,
        $campaignEventId = null,
        $attempts = 3,
        $priority = MessageQueue::PRIORITY_NORMAL,
        $messageQueue = null,
        $statTableName = 'email_stats',
        $statContactColumn = 'lead_id',
        $statSentColumn = 'date_sent'
    ) {
        // Build an ID => ID map of the contacts under consideration
        $contactIds = array_keys($leads);
        $contactIds = array_combine($contactIds, $contactIds);

        /** @var \Mautic\LeadBundle\Entity\FrequencyRuleRepository $ruleRepository */
        $ruleRepository = $this->em->getRepository('MauticLeadBundle:FrequencyRule');
        $throttled      = $ruleRepository->getAppliedFrequencyRules(
            $channel,
            $contactIds,
            $this->coreParametersHelper->get($channel.'_frequency_number'),
            $this->coreParametersHelper->get($channel.'_frequency_time'),
            $statTableName,
            $statContactColumn,
            $statSentColumn
        );

        $heldBack = [];
        if (empty($throttled)) {
            return $heldBack;
        }

        foreach ($throttled as $rule) {
            // Only the first letter of frequency_time is used as the unit and it is treated as a
            // date unit (no time units), hence the plain 'P' prefix
            $delay     = new \DateInterval('P1'.substr($rule['frequency_time'], 0, 1));
            $contactId = $rule['lead_id'];

            $alreadyQueued = $messageQueue && isset($messageQueue[$contactId]);
            if (!$alreadyQueued) {
                // No existing entry: queue the message so it is processed later by frequency and priority
                $this->queue(
                    [$leads[$contactId]],
                    $channel,
                    $channelId,
                    $delay,
                    $attempts,
                    $priority,
                    $campaignEventId
                );
            } else {
                $this->reschedule($messageQueue[$contactId], $delay);
            }

            $heldBack[$contactId] = $contactId;
            unset($leads[$contactId]);
        }

        return $heldBack;
    }
130
131
    /**
     * Adds messages to the queue.
     *
     * A contact is skipped when the repository's findMessage() already returns something for the
     * same channel, channel ID and contact.
     *
     * @param array         $leads             Lead entities or arrays carrying an "id" key
     * @param string        $channel
     * @param int           $channelId
     * @param \DateInterval $scheduledInterval Delay, counted from now, used as the scheduled date
     * @param int           $maxAttempts
     * @param int           $priority
     * @param int|null      $campaignEventId   Falsy values (null, 0, '') mean "no campaign event"
     * @param array         $options
     *
     * @return bool Always true
     */
    public function queue(
        $leads,
        $channel,
        $channelId,
        \DateInterval $scheduledInterval,
        $maxAttempts = 1,
        $priority = 1,
        $campaignEventId = null,
        $options = []
    ) {
        // One scheduled date, computed once, is shared by every message created in this call
        $dueDate = new \DateTime();
        $dueDate->add($scheduledInterval);

        $pending = [];

        foreach ($leads as $lead) {
            if (is_array($lead)) {
                $contactId = $lead['id'];
            } else {
                $contactId = $lead->getId();
            }

            $existing = $this->getRepository()->findMessage($channel, $channelId, $contactId);
            if (!empty($existing)) {
                continue;
            }

            $entry = new MessageQueue();
            if ($campaignEventId) {
                $entry->setEvent($this->em->getReference('MauticCampaignBundle:Event', $campaignEventId));
            }
            $entry->setChannel($channel);
            $entry->setChannelId($channelId);
            $entry->setDatePublished(new \DateTime());
            $entry->setMaxAttempts($maxAttempts);

            // Arrays only carry the ID, so attach an entity reference instead of the array
            if ($lead instanceof Lead) {
                $entry->setLead($lead);
            } else {
                $entry->setLead($this->em->getReference('MauticLeadBundle:Lead', $contactId));
            }

            $entry->setPriority($priority);
            $entry->setScheduledDate($dueDate);
            $entry->setOptions($options);

            $pending[] = $entry;
        }

        if ($pending) {
            $this->saveEntities($pending);
            $this->em->clear(MessageQueue::class);
        }

        return true;
    }
189
190
    /**
     * Adds messages to the queue.
     *
     * A contact is skipped when the repository's findMessage() already returns something for the
     * same channel, channel ID and contact.
     *
     * @deprecated to be removed in 3.0; use queue method instead
     *
     * @param array                              $leads             Lead entities or arrays carrying an "id" key
     * @param string                             $channel
     * @param int                                $channelId
     * @param \DateTime|\DateInterval|string|null $scheduledInterval Empty: schedule for now. \DateTime: used as the
     *                                                              scheduled date. \DateInterval: added to now.
     *                                                              String: duration without the leading "P",
     *                                                              e.g. "1D" or "2H", added to now
     * @param int                                $maxAttempts
     * @param int                                $priority
     * @param int|null                           $campaignEventId   Falsy values mean "no campaign event"
     * @param array                              $options
     *
     * @return bool Always true
     *
     * @throws \Exception When a string interval cannot be parsed by \DateInterval
     */
    public function addToQueue(
        $leads,
        $channel,
        $channelId,
        $scheduledInterval = null,
        $maxAttempts = 1,
        $priority = 1,
        $campaignEventId = null,
        $options = []
    ) {
        $messageQueues = [];

        if (!$scheduledInterval) {
            $scheduledDate = new \DateTime();
        } elseif ($scheduledInterval instanceof \DateTime) {
            // Previously this case left $scheduledDate undefined. Clone so the caller's object is not
            // shared with the queued entities.
            $scheduledDate = clone $scheduledInterval;
        } elseif ($scheduledInterval instanceof \DateInterval) {
            $scheduledDate = (new \DateTime())->add($scheduledInterval);
        } else {
            // Hours and seconds are only valid in the time part of an ISO 8601 duration, so they need
            // the "PT" prefix. The unit is the last character of the string ("2H", "30S"). A trailing
            // "M" stays a date unit (months), as before.
            $unit          = substr($scheduledInterval, -1);
            $prefix        = ('H' === $unit || 'S' === $unit) ? 'PT' : 'P';
            $scheduledDate = (new \DateTime())->add(new \DateInterval($prefix.$scheduledInterval));
        }

        foreach ($leads as $lead) {
            $leadId = (is_array($lead)) ? $lead['id'] : $lead->getId();
            if (!empty($this->getRepository()->findMessage($channel, $channelId, $leadId))) {
                continue;
            }

            $messageQueue = new MessageQueue();
            if ($campaignEventId) {
                $messageQueue->setEvent($this->em->getReference('MauticCampaignBundle:Event', $campaignEventId));
            }
            $messageQueue->setChannel($channel);
            $messageQueue->setChannelId($channelId);
            $messageQueue->setDatePublished(new \DateTime());
            $messageQueue->setMaxAttempts($maxAttempts);
            $messageQueue->setLead(
                ($lead instanceof Lead) ? $lead : $this->em->getReference('MauticLeadBundle:Lead', $leadId)
            );
            $messageQueue->setPriority($priority);
            $messageQueue->setScheduledDate($scheduledDate);
            $messageQueue->setOptions($options);

            $messageQueues[] = $messageQueue;
        }

        if ($messageQueues) {
            $this->saveEntities($messageQueues);
            $this->em->clear(MessageQueue::class);
        }

        return true;
    }
254
255
    /**
     * Processes queued messages in batches until the repository returns no more of them.
     *
     * @param mixed|null $channel   Forwarded to the repository's getQueuedMessages()
     * @param mixed|null $channelId Forwarded to the repository's getQueuedMessages()
     *
     * @return int Sum of the counts returned by processMessageQueue() for every batch
     */
    public function sendMessages($channel = null, $channelId = null)
    {
        $batchSize = 50;
        // The start time is fixed once and handed to every batch query
        $startedAt = new \DateTime();
        $sent      = 0;

        while (true) {
            $batch = $this->getRepository()->getQueuedMessages($batchSize, $startedAt, $channel, $channelId);
            if (!$batch) {
                break;
            }

            $sent += $this->processMessageQueue($batch);

            // Detach the processed entities so they do not pile up in memory between batches
            $this->em->clear(MessageQueue::class);
            $this->em->clear(Lead::class);
            $this->em->clear(\Mautic\CampaignBundle\Entity\Event::class);
        }

        return $sent;
    }
278
279
    /**
     * Sends the given queued messages through the batch and single-message process events.
     *
     * @param MessageQueue|MessageQueue[] $queue A single message or an array of messages. The array is
     *                                           indexed with message IDs when contact details are applied
     *
     * @return int Number of messages that report success after processing
     *
     * @throws \InvalidArgumentException When $queue is neither an array nor a MessageQueue
     */
    public function processMessageQueue($queue)
    {
        if ($queue instanceof MessageQueue) {
            $queue = [$queue->getId() => $queue];
        } elseif (!is_array($queue)) {
            throw new \InvalidArgumentException('$queue must be an instance of '.MessageQueue::class);
        }

        // Lead entities do not have their profile fields populated (custom fields), so collect the
        // contact IDs first and load contact and company details for all of them in one go
        $contactIdsByMessage = [];
        /** @var MessageQueue $message */
        foreach ($queue as $message) {
            if ($message->getLead()) {
                $contactIdsByMessage[$message->getId()] = $message->getLead()->getId();
            }
        }

        if (!empty($contactIdsByMessage)) {
            $contactData = $this->leadModel->getRepository()->getContacts($contactIdsByMessage);
            $companyData = $this->companyModel->getRepository()->getCompaniesForContacts($contactIdsByMessage);

            foreach ($contactIdsByMessage as $messageId => $contactId) {
                $companies = null;
                if (isset($companyData[$contactId])) {
                    $companies = $companyData[$contactId];
                }

                $contactData[$contactId]['companies'] = $companies;
                $queue[$messageId]->getLead()->setFields($contactData[$contactId]);
            }
        }

        // Bucket the remaining messages by channel and channel ID so listeners can batch process them,
        // e.g. send emails in batches to 3rd party transactional services via HTTP APIs
        $grouped = [];
        foreach ($queue as $key => $message) {
            // Already sent messages are dropped from the queue entirely
            if (MessageQueue::STATUS_SENT == $message->getStatus()) {
                unset($queue[$key]);
                continue;
            }

            $groupChannel = $message->getChannel();
            $groupId      = $message->getChannelId() ?: 0;

            $grouped[$groupChannel][$groupId][] = $message;
        }

        // First give listeners the chance to process each bucket as a batch
        foreach ($grouped as $groupChannel => $messagesById) {
            foreach ($messagesById as $groupId => $groupMessages) {
                $batchEvent = new MessageQueueBatchProcessEvent($groupMessages, $groupChannel, $groupId);
                // dispatchEvent() takes the entity by reference, so a variable is required here
                $unused = null;
                $this->dispatchEvent('process_batch_message_queue', $unused, false, $batchEvent);
            }
        }
        unset($grouped);

        // Whatever the batch listeners did not mark as processed goes through the single message event
        $sent = 0;
        foreach ($queue as $message) {
            if (!$message->isProcessed()) {
                $singleEvent = new MessageQueueProcessEvent($message);
                $this->dispatchEvent('process_message_queue', $message, false, $singleEvent);
            }

            if ($message->isSuccess()) {
                ++$sent;
                $message->setSuccess();
                $message->setLastAttempt(new \DateTime());
                $message->setDateSent(new \DateTime());
                $message->setStatus(MessageQueue::STATUS_SENT);

                continue;
            }

            if ($message->isFailed()) {
                // Failure such as an email delivery issue, so retry in a short time
                $this->reschedule($message, new \DateInterval(self::DEFAULT_RESCHEDULE_INTERVAL));
            }
            // Otherwise assume the listener did something such as rescheduling the message
        }

        // Save the changes made to the messages above
        $this->saveEntities($queue);

        return $sent;
    }
373
374
    /**
     * Moves a queued message's scheduled date forward by the given interval and counts the attempt.
     *
     * Does nothing when no MessageQueue is given and none can be looked up.
     *
     * @param MessageQueue|mixed $message            The message to reschedule. When it is not a MessageQueue and
     *                                               $leadId, $channel and $channelId are all given, the message
     *                                               is looked up in the repository instead
     * @param \DateInterval      $rescheduleInterval Added to the message's current scheduled date
     * @param int|null           $leadId
     * @param string|null        $channel
     * @param int|null           $channelId
     * @param bool               $persist            Save the message right away; forced to true when the
     *                                               message was looked up
     */
    public function reschedule($message, \DateInterval $rescheduleInterval, $leadId = null, $channel = null, $channelId = null, $persist = false)
    {
        if (!$message instanceof MessageQueue && $leadId && $channel && $channelId) {
            $message = $this->getRepository()->findMessage($channel, $channelId, $leadId);
            $persist = true;
        }

        // Covers both "nothing found" and a non-entity $message passed without lookup arguments,
        // which would otherwise fail on the method calls below
        if (!$message instanceof MessageQueue) {
            return;
        }

        $message->setAttempts($message->getAttempts() + 1);
        $message->setLastAttempt(new \DateTime());

        // Work on a copy so the date object currently held by the entity is not mutated.
        // Fall back to "now" when the message has no scheduled date to start from.
        $scheduledDate = $message->getScheduledDate();
        $rescheduleTo  = $scheduledDate ? clone $scheduledDate : new \DateTime();

        $rescheduleTo->add($rescheduleInterval);
        $message->setScheduledDate($rescheduleTo);
        $message->setStatus(MessageQueue::STATUS_RESCHEDULED);

        if ($persist) {
            $this->saveEntity($message);
        }

        // Mark as processed for listeners
        $message->setProcessed();
    }
403
404
    /**
     * Reschedules a message using a string interval; thin wrapper around reschedule().
     *
     * @deprecated to be removed in 3.0; use reschedule method instead
     *
     * @param MessageQueue|mixed $message
     * @param string|null        $rescheduleInterval Duration without the leading "P", e.g. "1D" or "2H".
     *                                               Empty values fall back to DEFAULT_RESCHEDULE_INTERVAL
     * @param int|null           $leadId
     * @param string|null        $channel
     * @param int|null           $channelId
     * @param bool               $persist
     *
     * @throws \Exception When the interval cannot be parsed by \DateInterval
     */
    public function rescheduleMessage($message, $rescheduleInterval = null, $leadId = null, $channel = null, $channelId = null, $persist = false)
    {
        // empty() makes the previous loose "null ==" check explicit: null, '', 0 and false all select
        // the default interval
        if (empty($rescheduleInterval)) {
            $intervalSpec = self::DEFAULT_RESCHEDULE_INTERVAL;
        } else {
            // Hours and seconds are only valid in the time part of an ISO 8601 duration, so they need
            // the "PT" prefix. A trailing "M" stays a date unit (months), as before.
            $unit         = substr($rescheduleInterval, -1);
            $prefix       = ('H' === $unit || 'S' === $unit) ? 'PT' : 'P';
            $intervalSpec = $prefix.$rescheduleInterval;
        }

        // reschedule() does not return a value, so there is nothing to pass on
        $this->reschedule($message, new \DateInterval($intervalSpec), $leadId, $channel, $channelId, $persist);
    }
420
421
    /**
     * Delegates to the repository's getQueuedChannelCount() for the given channel.
     *
     * @param mixed $channel
     * @param array $channelIds Passed through to the repository unchanged
     *
     * @return mixed Whatever the repository returns
     */
    public function getQueuedChannelCount($channel, $channelIds = [])
    {
        $repository = $this->getRepository();

        return $repository->getQueuedChannelCount($channel, $channelIds);
    }
429
430
    /**
     * {@inheritdoc}
     *
     * Maps the action to a ChannelEvents name and dispatches it when listeners are registered.
     *
     * NOTE(review): the previous docblock declared "@throws MethodNotAllowedHttpException", but nothing
     * in this method throws it - presumably carried over from the parent; confirm.
     *
     * @param string     $action One of "process_message_queue", "process_batch_message_queue" or "post_save"
     * @param mixed      $entity Entity handed to a newly created MessageQueueEvent
     * @param bool       $isNew
     * @param Event|null $event  Event to dispatch; a MessageQueueEvent is created when empty
     *
     * @return Event|null The dispatched event, or null for unknown actions or when nobody listens
     */
    protected function dispatchEvent($action, &$entity, $isNew = false, Event $event = null)
    {
        switch ($action) {
            case 'process_message_queue':
                $eventName = ChannelEvents::PROCESS_MESSAGE_QUEUE;
                break;
            case 'process_batch_message_queue':
                $eventName = ChannelEvents::PROCESS_MESSAGE_QUEUE_BATCH;
                break;
            case 'post_save':
                $eventName = ChannelEvents::MESSAGE_QUEUED;
                break;
            default:
                return null;
        }

        // Skip creating and dispatching the event when no one would receive it
        if (!$this->dispatcher->hasListeners($eventName)) {
            return null;
        }

        if (empty($event)) {
            $event = new MessageQueueEvent($entity, $isNew);
            $event->setEntityManager($this->em);
        }

        $this->dispatcher->dispatch($eventName, $event);

        return $event;
    }
468
}
469