1 | <?php |
||
2 | |||
3 | /* |
||
4 | * @copyright 2014 Mautic Contributors. All rights reserved |
||
5 | * @author Mautic |
||
6 | * |
||
7 | * @link http://mautic.org |
||
8 | * |
||
9 | * @license GNU/GPLv3 http://www.gnu.org/licenses/gpl-3.0.html |
||
10 | */ |
||
11 | |||
12 | namespace Mautic\ChannelBundle\Model; |
||
13 | |||
14 | use Mautic\ChannelBundle\ChannelEvents; |
||
15 | use Mautic\ChannelBundle\Entity\MessageQueue; |
||
16 | use Mautic\ChannelBundle\Event\MessageQueueBatchProcessEvent; |
||
17 | use Mautic\ChannelBundle\Event\MessageQueueEvent; |
||
18 | use Mautic\ChannelBundle\Event\MessageQueueProcessEvent; |
||
19 | use Mautic\CoreBundle\Helper\CoreParametersHelper; |
||
20 | use Mautic\CoreBundle\Model\FormModel; |
||
21 | use Mautic\LeadBundle\Entity\Lead; |
||
22 | use Mautic\LeadBundle\Model\CompanyModel; |
||
23 | use Mautic\LeadBundle\Model\LeadModel; |
||
24 | use Symfony\Component\EventDispatcher\Event; |
||
25 | |||
26 | class MessageQueueModel extends FormModel |
||
27 | { |
||
28 | /** @var string A default message reschedule interval */ |
||
29 | const DEFAULT_RESCHEDULE_INTERVAL = 'PT15M'; |
||
30 | |||
31 | /** |
||
32 | * @var LeadModel |
||
33 | */ |
||
34 | protected $leadModel; |
||
35 | |||
36 | /** |
||
37 | * @var CompanyModel |
||
38 | */ |
||
39 | protected $companyModel; |
||
40 | |||
41 | /** |
||
42 | * @var CoreParametersHelper |
||
43 | */ |
||
44 | protected $coreParametersHelper; |
||
45 | |||
46 | public function __construct(LeadModel $leadModel, CompanyModel $companyModel, CoreParametersHelper $coreParametersHelper) |
||
47 | { |
||
48 | $this->leadModel = $leadModel; |
||
49 | $this->companyModel = $companyModel; |
||
50 | $this->coreParametersHelper = $coreParametersHelper; |
||
51 | } |
||
52 | |||
53 | /** |
||
54 | * @return \Doctrine\ORM\EntityRepository|\Mautic\ChannelBundle\Entity\MessageQueueRepository |
||
55 | */ |
||
56 | public function getRepository() |
||
57 | { |
||
58 | return $this->em->getRepository('MauticChannelBundle:MessageQueue'); |
||
59 | } |
||
60 | |||
61 | /** |
||
62 | * @param $channel |
||
63 | * @param $channelId |
||
64 | * @param null $campaignEventId |
||
65 | * @param int $attempts |
||
66 | * @param int $priority |
||
67 | * @param null $messageQueue |
||
68 | * @param string $statTableName |
||
69 | * @param string $statContactColumn |
||
70 | * @param string $statSentColumn |
||
71 | * |
||
72 | * @return array |
||
73 | */ |
||
74 | public function processFrequencyRules( |
||
75 | array &$leads, |
||
76 | $channel, |
||
77 | $channelId, |
||
78 | $campaignEventId = null, |
||
79 | $attempts = 3, |
||
80 | $priority = MessageQueue::PRIORITY_NORMAL, |
||
81 | $messageQueue = null, |
||
82 | $statTableName = 'email_stats', |
||
83 | $statContactColumn = 'lead_id', |
||
84 | $statSentColumn = 'date_sent' |
||
85 | ) { |
||
86 | $leadIds = array_keys($leads); |
||
87 | $leadIds = array_combine($leadIds, $leadIds); |
||
88 | |||
89 | /** @var \Mautic\LeadBundle\Entity\FrequencyRuleRepository $frequencyRulesRepo */ |
||
90 | $frequencyRulesRepo = $this->em->getRepository('MauticLeadBundle:FrequencyRule'); |
||
91 | $defaultFrequencyNumber = $this->coreParametersHelper->get($channel.'_frequency_number'); |
||
92 | $defaultFrequencyTime = $this->coreParametersHelper->get($channel.'_frequency_time'); |
||
93 | |||
94 | $dontSendTo = $frequencyRulesRepo->getAppliedFrequencyRules( |
||
95 | $channel, |
||
96 | $leadIds, |
||
97 | $defaultFrequencyNumber, |
||
98 | $defaultFrequencyTime, |
||
99 | $statTableName, |
||
100 | $statContactColumn, |
||
101 | $statSentColumn |
||
102 | ); |
||
103 | |||
104 | $queuedContacts = []; |
||
105 | if (!empty($dontSendTo)) { |
||
106 | foreach ($dontSendTo as $frequencyRuleMet) { |
||
107 | // We only deal with date intervals here (no time intervals) so it's safe to use 'P' |
||
108 | $scheduleInterval = new \DateInterval('P1'.substr($frequencyRuleMet['frequency_time'], 0, 1)); |
||
109 | if ($messageQueue && isset($messageQueue[$frequencyRuleMet['lead_id']])) { |
||
110 | $this->reschedule($messageQueue[$frequencyRuleMet['lead_id']], $scheduleInterval); |
||
111 | } else { |
||
112 | // Queue this message to be processed by frequency and priority |
||
113 | $this->queue( |
||
114 | [$leads[$frequencyRuleMet['lead_id']]], |
||
115 | $channel, |
||
116 | $channelId, |
||
117 | $scheduleInterval, |
||
118 | $attempts, |
||
119 | $priority, |
||
120 | $campaignEventId |
||
121 | ); |
||
122 | } |
||
123 | $queuedContacts[$frequencyRuleMet['lead_id']] = $frequencyRuleMet['lead_id']; |
||
124 | unset($leads[$frequencyRuleMet['lead_id']]); |
||
125 | } |
||
126 | } |
||
127 | |||
128 | return $queuedContacts; |
||
129 | } |
||
130 | |||
131 | /** |
||
132 | * Adds messages to the queue. |
||
133 | * |
||
134 | * @param array $leads |
||
135 | * @param string $channel |
||
136 | * @param int $channelId |
||
137 | * @param int $maxAttempts |
||
138 | * @param int $priority |
||
139 | * @param int|null $campaignEventId |
||
140 | * @param array $options |
||
141 | * |
||
142 | * @return bool |
||
143 | */ |
||
144 | public function queue( |
||
145 | $leads, |
||
146 | $channel, |
||
147 | $channelId, |
||
148 | \DateInterval $scheduledInterval, |
||
149 | $maxAttempts = 1, |
||
150 | $priority = 1, |
||
151 | $campaignEventId = null, |
||
152 | $options = [] |
||
153 | ) { |
||
154 | $messageQueues = []; |
||
155 | |||
156 | $scheduledDate = (new \DateTime())->add($scheduledInterval); |
||
157 | |||
158 | foreach ($leads as $lead) { |
||
159 | $leadId = (is_array($lead)) ? $lead['id'] : $lead->getId(); |
||
160 | if (!empty($this->getRepository()->findMessage($channel, $channelId, $leadId))) { |
||
161 | continue; |
||
162 | } |
||
163 | |||
164 | $messageQueue = new MessageQueue(); |
||
165 | if ($campaignEventId) { |
||
166 | $messageQueue->setEvent($this->em->getReference('MauticCampaignBundle:Event', $campaignEventId)); |
||
167 | } |
||
168 | $messageQueue->setChannel($channel); |
||
169 | $messageQueue->setChannelId($channelId); |
||
170 | $messageQueue->setDatePublished(new \DateTime()); |
||
171 | $messageQueue->setMaxAttempts($maxAttempts); |
||
172 | $messageQueue->setLead( |
||
173 | ($lead instanceof Lead) ? $lead : $this->em->getReference('MauticLeadBundle:Lead', $leadId) |
||
174 | ); |
||
175 | $messageQueue->setPriority($priority); |
||
176 | $messageQueue->setScheduledDate($scheduledDate); |
||
177 | $messageQueue->setOptions($options); |
||
178 | |||
179 | $messageQueues[] = $messageQueue; |
||
180 | } |
||
181 | |||
182 | if ($messageQueues) { |
||
183 | $this->saveEntities($messageQueues); |
||
184 | $this->em->clear(MessageQueue::class); |
||
185 | } |
||
186 | |||
187 | return true; |
||
188 | } |
||
189 | |||
190 | /** |
||
191 | * @deprecated to be removed in 3.0; use queue method instead |
||
192 | * |
||
193 | * @param $leads |
||
194 | * @param $channel |
||
195 | * @param $channelId |
||
196 | * @param null $scheduledInterval |
||
197 | * @param int $maxAttempts |
||
198 | * @param int $priority |
||
199 | * @param null $campaignEventId |
||
200 | * @param array $options |
||
201 | * |
||
202 | * @return bool |
||
203 | */ |
||
204 | public function addToQueue( |
||
205 | $leads, |
||
206 | $channel, |
||
207 | $channelId, |
||
208 | $scheduledInterval = null, |
||
209 | $maxAttempts = 1, |
||
210 | $priority = 1, |
||
211 | $campaignEventId = null, |
||
212 | $options = [] |
||
213 | ) { |
||
214 | $messageQueues = []; |
||
215 | |||
216 | if (!$scheduledInterval) { |
||
217 | $scheduledDate = new \DateTime(); |
||
218 | } elseif (!$scheduledInterval instanceof \DateTime) { |
||
219 | $scheduledInterval = (('H' === $scheduledInterval) ? 'PT' : 'P').$scheduledInterval; |
||
220 | $scheduledDate = (new \DateTime())->add(new \DateInterval($scheduledInterval)); |
||
221 | } |
||
222 | |||
223 | foreach ($leads as $lead) { |
||
224 | $leadId = (is_array($lead)) ? $lead['id'] : $lead->getId(); |
||
225 | if (!empty($this->getRepository()->findMessage($channel, $channelId, $leadId))) { |
||
226 | continue; |
||
227 | } |
||
228 | |||
229 | $messageQueue = new MessageQueue(); |
||
230 | if ($campaignEventId) { |
||
231 | $messageQueue->setEvent($this->em->getReference('MauticCampaignBundle:Event', $campaignEventId)); |
||
232 | } |
||
233 | $messageQueue->setChannel($channel); |
||
234 | $messageQueue->setChannelId($channelId); |
||
235 | $messageQueue->setDatePublished(new \DateTime()); |
||
236 | $messageQueue->setMaxAttempts($maxAttempts); |
||
237 | $messageQueue->setLead( |
||
238 | ($lead instanceof Lead) ? $lead : $this->em->getReference('MauticLeadBundle:Lead', $leadId) |
||
239 | ); |
||
240 | $messageQueue->setPriority($priority); |
||
241 | $messageQueue->setScheduledDate($scheduledDate); |
||
242 | $messageQueue->setOptions($options); |
||
243 | |||
244 | $messageQueues[] = $messageQueue; |
||
245 | } |
||
246 | |||
247 | if ($messageQueues) { |
||
248 | $this->saveEntities($messageQueues); |
||
249 | $this->em->clear(MessageQueue::class); |
||
250 | } |
||
251 | |||
252 | return true; |
||
253 | } |
||
254 | |||
255 | /** |
||
256 | * @param null $channel |
||
257 | * @param null $channelId |
||
258 | * |
||
259 | * @return int |
||
260 | */ |
||
261 | public function sendMessages($channel = null, $channelId = null) |
||
262 | { |
||
263 | // Note when the process started for batch purposes |
||
264 | $processStarted = new \DateTime(); |
||
265 | $limit = 50; |
||
266 | $counter = 0; |
||
267 | while ($queue = $this->getRepository()->getQueuedMessages($limit, $processStarted, $channel, $channelId)) { |
||
268 | $counter += $this->processMessageQueue($queue); |
||
269 | |||
270 | // Remove the entities from memory |
||
271 | $this->em->clear(MessageQueue::class); |
||
272 | $this->em->clear(Lead::class); |
||
273 | $this->em->clear(\Mautic\CampaignBundle\Entity\Event::class); |
||
274 | } |
||
275 | |||
276 | return $counter; |
||
277 | } |
||
278 | |||
279 | /** |
||
280 | * @param $queue |
||
281 | * |
||
282 | * @return int |
||
283 | */ |
||
284 | public function processMessageQueue($queue) |
||
285 | { |
||
286 | if (!is_array($queue)) { |
||
287 | if (!$queue instanceof MessageQueue) { |
||
288 | throw new \InvalidArgumentException('$queue must be an instance of '.MessageQueue::class); |
||
289 | } |
||
290 | |||
291 | $queue = [$queue->getId() => $queue]; |
||
292 | } |
||
293 | |||
294 | $counter = 0; |
||
295 | $contacts = []; |
||
296 | $byChannel = []; |
||
297 | |||
298 | // Lead entities will not have profile fields populated due to the custom field use - therefore to optimize resources, |
||
299 | // get a list of leads to fetch details all at once along with company details for dynamic email content, etc |
||
300 | /** @var MessageQueue $message */ |
||
301 | foreach ($queue as $message) { |
||
302 | if ($message->getLead()) { |
||
303 | $contacts[$message->getId()] = $message->getLead()->getId(); |
||
304 | } |
||
305 | } |
||
306 | if (!empty($contacts)) { |
||
307 | $contactData = $this->leadModel->getRepository()->getContacts($contacts); |
||
308 | $companyData = $this->companyModel->getRepository()->getCompaniesForContacts($contacts); |
||
309 | foreach ($contacts as $messageId => $contactId) { |
||
310 | $contactData[$contactId]['companies'] = isset($companyData[$contactId]) ? $companyData[$contactId] : null; |
||
311 | $queue[$messageId]->getLead()->setFields($contactData[$contactId]); |
||
312 | } |
||
313 | } |
||
314 | // Group queue by channel and channel ID - this make it possible for processing listeners to batch process such as |
||
315 | // sending emails in batches to 3rd party transactional services via HTTP APIs |
||
316 | foreach ($queue as $key => $message) { |
||
317 | if (MessageQueue::STATUS_SENT == $message->getStatus()) { |
||
318 | unset($queue[$key]); |
||
319 | continue; |
||
320 | } |
||
321 | |||
322 | $messageChannel = $message->getChannel(); |
||
323 | $messageChannelId = $message->getChannelId(); |
||
324 | if (!$messageChannelId) { |
||
325 | $messageChannelId = 0; |
||
326 | } |
||
327 | |||
328 | if (!isset($byChannel[$messageChannel])) { |
||
329 | $byChannel[$messageChannel] = []; |
||
330 | } |
||
331 | if (!isset($byChannel[$messageChannel][$messageChannelId])) { |
||
332 | $byChannel[$messageChannel][$messageChannelId] = []; |
||
333 | } |
||
334 | |||
335 | $byChannel[$messageChannel][$messageChannelId][] = $message; |
||
336 | } |
||
337 | |||
338 | // First try to batch process each channel |
||
339 | foreach ($byChannel as $messageChannel => $channelMessages) { |
||
340 | foreach ($channelMessages as $messageChannelId => $messages) { |
||
341 | $event = new MessageQueueBatchProcessEvent($messages, $messageChannel, $messageChannelId); |
||
342 | $ignore = null; |
||
343 | $this->dispatchEvent('process_batch_message_queue', $ignore, false, $event); |
||
344 | } |
||
345 | } |
||
346 | unset($byChannel); |
||
347 | |||
348 | // Now check to see if the message was processed by the listener and if not |
||
349 | // send it through a single process event listener |
||
350 | foreach ($queue as $message) { |
||
351 | if (!$message->isProcessed()) { |
||
352 | $event = new MessageQueueProcessEvent($message); |
||
353 | $this->dispatchEvent('process_message_queue', $message, false, $event); |
||
354 | } |
||
355 | |||
356 | if ($message->isSuccess()) { |
||
357 | ++$counter; |
||
358 | $message->setSuccess(); |
||
359 | $message->setLastAttempt(new \DateTime()); |
||
360 | $message->setDateSent(new \DateTime()); |
||
361 | $message->setStatus(MessageQueue::STATUS_SENT); |
||
362 | } elseif ($message->isFailed()) { |
||
363 | // Failure such as email delivery issue or something so retry in a short time |
||
364 | $this->reschedule($message, new \DateInterval(self::DEFAULT_RESCHEDULE_INTERVAL)); |
||
365 | } // otherwise assume the listener did something such as rescheduling the message |
||
366 | } |
||
367 | |||
368 | //add listener |
||
369 | $this->saveEntities($queue); |
||
370 | |||
371 | return $counter; |
||
372 | } |
||
373 | |||
374 | /** |
||
375 | * @param bool $persist |
||
376 | */ |
||
377 | public function reschedule($message, \DateInterval $rescheduleInterval, $leadId = null, $channel = null, $channelId = null, $persist = false) |
||
378 | { |
||
379 | if (!$message instanceof MessageQueue && $leadId && $channel && $channelId) { |
||
380 | $message = $this->getRepository()->findMessage($channel, $channelId, $leadId); |
||
381 | $persist = true; |
||
382 | } |
||
383 | |||
384 | if (!$message) { |
||
385 | return; |
||
386 | } |
||
387 | |||
388 | $message->setAttempts($message->getAttempts() + 1); |
||
389 | $message->setLastAttempt(new \DateTime()); |
||
390 | $rescheduleTo = clone $message->getScheduledDate(); |
||
391 | |||
392 | $rescheduleTo->add($rescheduleInterval); |
||
393 | $message->setScheduledDate($rescheduleTo); |
||
394 | $message->setStatus(MessageQueue::STATUS_RESCHEDULED); |
||
395 | |||
396 | if ($persist) { |
||
397 | $this->saveEntity($message); |
||
398 | } |
||
399 | |||
400 | // Mark as processed for listeners |
||
401 | $message->setProcessed(); |
||
402 | } |
||
403 | |||
404 | /** |
||
405 | * @deprecated to be removed in 3.0; use reschedule method instead |
||
406 | * |
||
407 | * @param $message |
||
408 | * @param string $rescheduleInterval |
||
409 | * @param null $leadId |
||
410 | * @param null $channel |
||
411 | * @param null $channelId |
||
412 | * @param bool $persist |
||
413 | */ |
||
414 | public function rescheduleMessage($message, $rescheduleInterval = null, $leadId = null, $channel = null, $channelId = null, $persist = false) |
||
415 | { |
||
416 | $rescheduleInterval = null == $rescheduleInterval ? self::DEFAULT_RESCHEDULE_INTERVAL : ('P'.$rescheduleInterval); |
||
417 | |||
418 | return $this->reschedule($message, new \DateInterval($rescheduleInterval), $leadId, $channel, $channelId, $persist); |
||
0 ignored issues
–
show
|
|||
419 | } |
||
420 | |||
421 | /** |
||
422 | * @param $channel |
||
423 | * @param array $channelIds |
||
424 | */ |
||
425 | public function getQueuedChannelCount($channel, $channelIds = []) |
||
426 | { |
||
427 | return $this->getRepository()->getQueuedChannelCount($channel, $channelIds); |
||
428 | } |
||
429 | |||
430 | /** |
||
431 | * {@inheritdoc} |
||
432 | * |
||
433 | * @param $action |
||
434 | * @param $entity |
||
435 | * @param $isNew |
||
436 | * @param $event |
||
437 | * |
||
438 | * @throws \Symfony\Component\HttpKernel\Exception\MethodNotAllowedHttpException |
||
439 | */ |
||
440 | protected function dispatchEvent($action, &$entity, $isNew = false, Event $event = null) |
||
441 | { |
||
442 | switch ($action) { |
||
443 | case 'process_message_queue': |
||
444 | $name = ChannelEvents::PROCESS_MESSAGE_QUEUE; |
||
445 | break; |
||
446 | case 'process_batch_message_queue': |
||
447 | $name = ChannelEvents::PROCESS_MESSAGE_QUEUE_BATCH; |
||
448 | break; |
||
449 | case 'post_save': |
||
450 | $name = ChannelEvents::MESSAGE_QUEUED; |
||
451 | break; |
||
452 | default: |
||
453 | return null; |
||
454 | } |
||
455 | |||
456 | if ($this->dispatcher->hasListeners($name)) { |
||
457 | if (empty($event)) { |
||
458 | $event = new MessageQueueEvent($entity, $isNew); |
||
459 | $event->setEntityManager($this->em); |
||
460 | } |
||
461 | $this->dispatcher->dispatch($name, $event); |
||
462 | |||
463 | return $event; |
||
464 | } else { |
||
465 | return null; |
||
466 | } |
||
467 | } |
||
468 | } |
||
469 |
This check looks for function or method calls that always return null and whose return value is used.
The method
getObject()
can return nothing but null, so it makes no sense to use the return value.The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.