Issues (3627)

app/bundles/WebhookBundle/Model/WebhookModel.php (2 issues)

1
<?php
2
3
/*
4
 * @copyright   2014 Mautic Contributors. All rights reserved
5
 * @author      Mautic
6
 *
7
 * @link        http://mautic.org
8
 *
9
 * @license     GNU/GPLv3 http://www.gnu.org/licenses/gpl-3.0.html
10
 */
11
12
namespace Mautic\WebhookBundle\Model;
13
14
use Doctrine\Common\Collections\Criteria;
15
use JMS\Serializer\SerializationContext;
16
use JMS\Serializer\SerializerInterface;
17
use Mautic\ApiBundle\Serializer\Exclusion\PublishDetailsExclusionStrategy;
18
use Mautic\CoreBundle\Helper\CoreParametersHelper;
19
use Mautic\CoreBundle\Helper\EncryptionHelper;
20
use Mautic\CoreBundle\Model\FormModel;
21
use Mautic\WebhookBundle\Entity\Event;
22
use Mautic\WebhookBundle\Entity\EventRepository;
23
use Mautic\WebhookBundle\Entity\Log;
24
use Mautic\WebhookBundle\Entity\LogRepository;
25
use Mautic\WebhookBundle\Entity\Webhook;
26
use Mautic\WebhookBundle\Entity\WebhookQueue;
27
use Mautic\WebhookBundle\Entity\WebhookQueueRepository;
28
use Mautic\WebhookBundle\Entity\WebhookRepository;
29
use Mautic\WebhookBundle\Event as Events;
30
use Mautic\WebhookBundle\Event\WebhookEvent;
31
use Mautic\WebhookBundle\Form\Type\WebhookType;
32
use Mautic\WebhookBundle\Http\Client;
33
use Mautic\WebhookBundle\WebhookEvents;
34
use Symfony\Component\EventDispatcher\Event as SymfonyEvent;
35
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
36
use Symfony\Component\HttpKernel\Exception\MethodNotAllowedHttpException;
37
38
class WebhookModel extends FormModel
39
{
40
    /**
41
     *  2 possible types of the processing of the webhooks.
42
     */
43
    const COMMAND_PROCESS   = 'command_process';
44
    const IMMEDIATE_PROCESS = 'immediate_process';
45
46
    /**
47
     * Which queue mode is turned on.
48
     *
49
     * @var string
50
     */
51
    protected $queueMode;
52
53
    /**
54
     * How many entities to add into one queued webhook.
55
     *
56
     * @var int
57
     */
58
    protected $webhookLimit;
59
60
    /**
61
     * How many responses in 1 row can fail until the webhook disables itself.
62
     *
63
     * @var int
64
     */
65
    protected $disableLimit;
66
67
    /**
68
     * How many seconds will we wait for the response.
69
     *
70
     * @var int in seconds
71
     */
72
    protected $webhookTimeout;
73
74
    /**
75
     * The key is queue ID, the value is the WebhookQueue object.
76
     *
77
     * @var array
78
     */
79
    protected $webhookQueueIdList = [];
80
81
    /**
82
     * How many recent log records should be kept.
83
     *
84
     * @var int
85
     */
86
    protected $logMax;
87
88
    /**
89
     * @var SerializerInterface
90
     */
91
    protected $serializer;
92
93
    /**
94
     * Queued events default order by dir
95
     * Possible values: ['ASC', 'DESC'].
96
     *
97
     * @var string
98
     */
99
    protected $eventsOrderByDir;
100
101
    /**
102
     * @var Client
103
     */
104
    private $httpClient;
105
106
    /**
107
     * @var EventDispatcherInterface
108
     */
109
    private $eventDispatcher;
110
111
    /**
     * Stores the injected collaborators and loads the webhook settings
     * from the core parameters.
     */
    public function __construct(
        CoreParametersHelper $coreParametersHelper,
        SerializerInterface $serializer,
        Client $httpClient,
        EventDispatcherInterface $eventDispatcher
    ) {
        $this->eventDispatcher = $eventDispatcher;
        $this->httpClient      = $httpClient;
        $this->serializer      = $serializer;

        $this->setConfigProps($coreParametersHelper);
    }
122
123
    /**
     * Saves the webhook through the parent model, generating a secret
     * beforehand when the entity does not have one yet.
     *
     * @param Webhook $entity
     * @param mixed   $unlock forwarded unchanged to the parent implementation
     */
    public function saveEntity($entity, $unlock = true)
    {
        $hasSecret = null !== $entity->getSecret();

        if (!$hasSecret) {
            $entity->setSecret(EncryptionHelper::generateKey());
        }

        parent::saveEntity($entity, $unlock);
    }
134
135
    /**
     * Builds the webhook form for the given entity.
     *
     * The available webhook events are always passed to the form as the
     * 'events' option; a non-empty $action is passed as the 'action' option.
     *
     * @param Webhook $entity
     * @param         $formFactory
     * @param null    $action
     * @param array   $params      extra form options
     *
     * @return mixed
     *
     * @throws MethodNotAllowedHttpException when $entity is not a Webhook
     */
    public function createForm($entity, $formFactory, $action = null, $params = [])
    {
        if ($entity instanceof Webhook) {
            if (!empty($action)) {
                $params['action'] = $action;
            }

            $params['events'] = $this->getEvents();

            return $formFactory->create(WebhookType::class, $entity, $params);
        }

        throw new MethodNotAllowedHttpException(['Webhook']);
    }
158
159
    /**
     * Returns a fresh Webhook when no ID is given, otherwise delegates the
     * lookup to the parent model.
     *
     * @param mixed $id
     *
     * @return Webhook|null
     */
    public function getEntity($id = null)
    {
        return null === $id ? new Webhook() : parent::getEntity($id);
    }
170
171
    /**
     * Repository for Webhook entities, obtained from the entity manager.
     *
     * @return WebhookRepository
     */
    public function getRepository()
    {
        $webhookRepository = $this->em->getRepository(Webhook::class);

        return $webhookRepository;
    }
178
179
    /**
     * Returns the events collected by dispatching WebhookEvents::WEBHOOK_ON_BUILD.
     *
     * The result is kept in a function-level static; once it is non-empty,
     * later calls return it without dispatching again.
     *
     * @return mixed
     */
    public function getEvents()
    {
        static $events;

        if (!empty($events)) {
            return $events;
        }

        // Nothing cached yet (or the last build was empty): build the list.
        $builderEvent = new Events\WebhookBuilderEvent($this->translator);
        $this->dispatcher->dispatch(WebhookEvents::WEBHOOK_ON_BUILD, $builderEvent);
        $events = $builderEvent->getEvents();

        return $events;
    }
198
199
    /**
     * Looks up the webhook events registered for the given event type.
     *
     * @param string $type event type to match
     *
     * @return array
     */
    public function getEventWebooksByType($type)
    {
        $eventRepository = $this->getEventRepository();

        return $eventRepository->getEntitiesByEventType($type);
    }
210
211
    /**
     * Queues the payload for every webhook event matching the given type.
     *
     * @param string $type    event type used to look up the webhook events
     * @param mixed  $payload data handed to queueWebhooks()
     * @param array  $groups  serialization groups handed to queueWebhooks()
     *
     * @return mixed whatever queueWebhooks() returns
     */
    public function queueWebhooksByType($type, $payload, array $groups = [])
    {
        $webhookEvents = $this->getEventWebooksByType($type);

        return $this->queueWebhooks($webhookEvents, $payload, $groups);
    }
224
225
    /**
     * Creates a queue entry for each webhook event and either stores it for
     * later processing (command mode) or sends it right away.
     *
     * Does nothing when $webhookEvents is not an array or is empty.
     *
     * @param array|mixed $webhookEvents       list of \Mautic\WebhookBundle\Entity\Event
     * @param mixed       $payload             data to serialize into each queue entry
     * @param array       $serializationGroups serialization groups used for the payload
     */
    public function queueWebhooks($webhookEvents, $payload, array $serializationGroups = [])
    {
        // Check the type first: calling count() on a non-countable value raises
        // a warning on PHP 7.2+ and a TypeError on PHP 8.
        if (!is_array($webhookEvents) || !count($webhookEvents)) {
            return;
        }

        /** @var \Mautic\WebhookBundle\Entity\Event $event */
        foreach ($webhookEvents as $event) {
            $webhook = $event->getWebhook();
            $queue   = $this->queueWebhook($webhook, $event, $payload, $serializationGroups);

            if (self::COMMAND_PROCESS === $this->queueMode) {
                // Queue to the database to process later
                $this->getQueueRepository()->saveEntity($queue);
            } else {
                // Immediately process
                $this->processWebhook($webhook, $queue);
            }
        }
    }
249
250
    /**
     * Creates a WebhookQueue entity holding the serialized payload, stamps it
     * with the current date and returns it. The entity is not persisted here.
     *
     * Dispatches WebhookEvents::WEBHOOK_QUEUE_ON_ADD when it has listeners.
     *
     * @param Webhook $webhook             webhook the queue entry belongs to
     * @param mixed   $event               webhook event stored on the queue entry
     * @param mixed   $payload             data serialized via serializeData()
     * @param array   $serializationGroups serialization groups used for the payload
     *
     * @return WebhookQueue
     */
    public function queueWebhook(Webhook $webhook, $event, $payload, array $serializationGroups = [])
    {
        $serializedPayload = $this->serializeData($payload, $serializationGroups);

        $queue = new WebhookQueue();
        $queue->setWebhook($webhook);
        $queue->setDateAdded(new \DateTime());
        $queue->setEvent($event);
        $queue->setPayload($serializedPayload);

        // fire events for when the queues are created
        if ($this->dispatcher->hasListeners(WebhookEvents::WEBHOOK_QUEUE_ON_ADD)) {
            // Use a dedicated variable so the $event parameter is not overwritten.
            $webhookQueueEvent = new Events\WebhookQueueEvent($queue, $webhook, true);
            $this->dispatcher->dispatch(WebhookEvents::WEBHOOK_QUEUE_ON_ADD, $webhookQueueEvent);
        }

        return $queue;
    }
276
277
    /**
     * Runs processWebhook() for every webhook in the given list.
     * The individual results are not collected.
     *
     * @param array|\Doctrine\ORM\Tools\Pagination\Paginator $webhooks
     */
    public function processWebhooks($webhooks)
    {
        foreach ($webhooks as $currentWebhook) {
            $this->processWebhook($currentWebhook);
        }
    }
288
289
    /**
     * Sends the webhook payload to the webhook URL and logs the outcome.
     *
     * On a 2xx response the queue rows that made up the payload are deleted
     * and, if rows remain for this webhook, the method calls itself again.
     * Any other response, or an exception while sending, is logged and the
     * webhook is unpublished when isSick() reports it as failing (or at once
     * on a 410 response).
     *
     * @param Webhook           $webhook webhook to send
     * @param WebhookQueue|null $queue   single queue entry to send when not in command mode
     *
     * @return bool true when the request was sent and answered with a 2xx status,
     *              false when there was no payload or the delivery failed
     */
    public function processWebhook(Webhook $webhook, WebhookQueue $queue = null)
    {
        // get the webhook payload
        $payload = $this->getWebhookPayload($webhook, $queue);

        // if there wasn't a payload we can stop here
        if (empty($payload)) {
            return false;
        }

        $start = microtime(true);

        try {
            $response = $this->httpClient->post($webhook->getWebhookUrl(), $payload, $webhook->getSecret());

            // remove the queues that were just sent from the Webhook object so they won't get stored again
            foreach ($this->webhookQueueIdList as $sentQueue) {
                $webhook->removeQueue($sentQueue);
            }

            $responseBody = $response->getBody()->getContents();
            if (!$responseBody) {
                $responseBody = null; // Save null value to database
            }

            $responseStatusCode = $response->getStatusCode();

            $this->addLog($webhook, $responseStatusCode, (microtime(true) - $start), $responseBody);

            // throw an error exception if we don't get a 2xx back
            if ($responseStatusCode >= 300 || $responseStatusCode < 200) {
                // The receiver of the webhook is telling us to stop bothering him with our requests by code 410
                if (410 === $responseStatusCode) {
                    $this->killWebhook($webhook, 'mautic.webhook.stopped.reason.410');
                }

                throw new \ErrorException($webhook->getWebhookUrl().' returned '.$responseStatusCode);
            }
        } catch (\Exception $e) {
            // The delivery failed, so the collected queue rows must stay in the database.
            // Forget their IDs here; otherwise a later successful call on this model
            // instance would pass them to deleteQueuesById() without them being delivered.
            $this->webhookQueueIdList = [];

            $message = $e->getMessage();

            if ($this->isSick($webhook)) {
                $this->killWebhook($webhook);
                $message .= ' '.$this->translator->trans('mautic.webhook.killed', ['%limit%' => $this->disableLimit]);
            }

            // log any errors but allow the script to keep running
            $this->logger->addError($message);

            // log that the request failed to display it to the user
            $this->addLog($webhook, 'N/A', (microtime(true) - $start), $message);

            return false;
        }

        // Run this on command as well as immediate send because if switched from queue to immediate
        // it can have some rows in the queue which will be send in every webhook forever
        if (!empty($this->webhookQueueIdList)) {
            /** @var \Mautic\WebhookBundle\Entity\WebhookQueueRepository $webhookQueueRepo */
            $webhookQueueRepo = $this->getQueueRepository();

            // delete all the queued items we just processed
            $webhookQueueRepo->deleteQueuesById(array_keys($this->webhookQueueIdList));
            $queueCount = $webhookQueueRepo->getQueueCountByWebhookId($webhook->getId());

            // reset the array to blank so none of the IDs are repeated
            $this->webhookQueueIdList = [];

            // if there are still items in the queue after processing we re-process
            // WARNING: this is recursive
            if ($queueCount > 0) {
                $this->processWebhook($webhook);
            }
        }

        return true;
    }
371
372
    /**
     * Decides from the logged responses whether the webhook should be
     * considered failing.
     *
     * A recently modified webhook is never reported as sick, so the user gets
     * a chance to try out the changes. A webhook without any log rows (the
     * repository returns null) is treated as healthy.
     *
     * @return bool
     */
    public function isSick(Webhook $webhook)
    {
        // Do not mess with the user will! (at least not now)
        if ($webhook->wasModifiedRecently()) {
            return false;
        }

        $logRepository = $this->getLogRepository();
        $successRatio  = $logRepository->getSuccessVsErrorStatusCodeRatio($webhook->getId(), $this->disableLimit);

        // No log rows yet: consider it healthy
        if (null === $successRatio) {
            return false;
        }

        // Sick when the ratio is falsy
        return !$successRatio;
    }
394
395
    /**
     * Unpublishes the webhook, saves it and dispatches
     * WebhookEvents::WEBHOOK_KILL carrying the given reason.
     *
     * @param string $reason reason passed to the WebhookEvent
     */
    public function killWebhook(Webhook $webhook, $reason = 'mautic.webhook.stopped.reason')
    {
        $webhook->setIsPublished(false);
        $this->saveEntity($webhook);

        $this->eventDispatcher->dispatch(
            WebhookEvents::WEBHOOK_KILL,
            new WebhookEvent($webhook, false, $reason)
        );
    }
409
410
    /**
     * Attaches a new log entry describing a webhook response to the webhook.
     *
     * For a webhook that already has an ID, old log rows are trimmed first
     * (keeping at most $this->logMax) and the webhook is saved afterwards.
     * A webhook without an ID only gets the log added in memory.
     *
     * @param int|string  $statusCode HTTP status code, or a placeholder such as 'N/A'
     * @param float       $runtime    in seconds
     * @param string|null $note
     */
    public function addLog(Webhook $webhook, $statusCode, $runtime, $note = null)
    {
        $webhookId = $webhook->getId();
        $log       = new Log();

        if ($webhookId) {
            $log->setWebhook($webhook);
            $this->getLogRepository()->removeOldLogs($webhookId, $this->logMax);
        }

        $log->setStatusCode($statusCode);
        $log->setRuntime($runtime);
        $log->setNote($note);
        $log->setDateAdded(new \DateTime());

        $webhook->addLog($log);

        if ($webhookId) {
            $this->saveEntity($webhook);
        }
    }
436
437
    /**
     * Repository for WebhookQueue entities, obtained from the entity manager.
     *
     * @return WebhookQueueRepository
     */
    public function getQueueRepository()
    {
        $queueRepository = $this->em->getRepository(WebhookQueue::class);

        return $queueRepository;
    }
444
445
    /**
     * Repository for webhook Event entities, obtained from the entity manager.
     *
     * @return EventRepository
     */
    public function getEventRepository()
    {
        $eventRepository = $this->em->getRepository(Event::class);

        return $eventRepository;
    }
452
453
    /**
     * Repository for webhook Log entities, obtained from the entity manager.
     *
     * @return LogRepository
     */
    public function getLogRepository()
    {
        $logRepository = $this->em->getRepository(Log::class);

        return $logRepository;
    }
460
461
    /**
     * Builds the payload to send for the webhook.
     *
     * A payload already set on the webhook is returned as is. Otherwise the
     * payload is assembled from queue entries, grouped by event type: in
     * command mode they come from getWebhookQueues(), in any other mode only
     * the given $queue (if any) is used.
     *
     * Every entry that has an ID is remembered in $this->webhookQueueIdList
     * and detached from the entity manager.
     *
     * @param WebhookQueue|null $queue single queue entry used outside command mode
     *
     * @return array|mixed the payload set on the webhook, or an array keyed by event type
     */
    public function getWebhookPayload(Webhook $webhook, WebhookQueue $queue = null)
    {
        $presetPayload = $webhook->getPayload();
        if ($presetPayload) {
            return $presetPayload;
        }

        if (self::COMMAND_PROCESS === $this->queueMode) {
            $queueBatches = $this->getWebhookQueues($webhook);
        } else {
            // Wrap the single queue so both modes share the nested loop below.
            $queueBatches = [null !== $queue ? [$queue] : []];
        }

        $payload = [];

        foreach ($queueBatches as $batch) {
            /** @var WebhookQueue $queuedItem */
            foreach ($batch as $queuedItem) {
                /** @var \Mautic\WebhookBundle\Entity\Event $queuedEvent */
                $queuedEvent = $queuedItem->getEvent();
                $type        = $queuedEvent->getEventType();

                // The stored payload is a JSON string; decode it so the timestamp can be added.
                $decoded              = json_decode($queuedItem->getPayload(), true);
                $decoded['timestamp'] = $queuedItem->getDateAdded()->format('c');

                // Appending creates the per-type list on first use.
                $payload[$type][] = $decoded;

                // Only entries with an ID are tracked for later deletion.
                // An entry without an ID was never stored (immediate send).
                $queueId = $queuedItem->getId();
                if ($queueId) {
                    $this->webhookQueueIdList[$queueId] = $queuedItem;

                    // Clear the WebhookQueue entity from memory
                    $this->em->detach($queuedItem);
                }
            }
        }

        return $payload;
    }
513
514
    /**
     * Fetches up to $this->webhookLimit queue entries of the webhook, ordered
     * by their dateAdded column in the direction from getEventsOrderbyDir().
     *
     * @return \Doctrine\ORM\Tools\Pagination\Paginator
     */
    public function getWebhookQueues(Webhook $webhook)
    {
        /** @var WebhookQueueRepository $queueRepo */
        $queueRepo = $this->getQueueRepository();
        $alias     = $queueRepo->getTableAlias();

        // Restrict the result to rows belonging to this webhook.
        $webhookFilter = [
            'column' => 'IDENTITY('.$alias.'.webhook)',
            'expr'   => 'eq',
            'value'  => $webhook->getId(),
        ];

        $args = [
            'iterator_mode' => true,
            'start'         => 0,
            'limit'         => $this->webhookLimit,
            'orderBy'       => $alias.'.dateAdded',
            'orderByDir'    => $this->getEventsOrderbyDir($webhook),
            'filter'        => [
                'force' => [$webhookFilter],
            ],
        ];

        return $queueRepo->getEntities($args);
    }
543
544
    /**
     * Returns the order direction set on the webhook, falling back to the
     * configured default when no webhook is given or its value is empty.
     *
     * @return string
     */
    public function getEventsOrderbyDir(Webhook $webhook = null)
    {
        $orderByDir = $webhook ? $webhook->getEventsOrderbyDir() : null;

        if ($orderByDir) {
            return $orderByDir;
        }

        // Nothing usable on the webhook: use the global config value
        return $this->eventsOrderByDir;
    }
559
560
    /**
     * {@inheritdoc}
     *
     * Maps the action to the matching WebhookEvents constant and dispatches
     * it when there are listeners. A WebhookEvent is created when the caller
     * did not pass an event.
     *
     * @param $action
     * @param $entity
     * @param $isNew
     * @param $event
     *
     * @return SymfonyEvent|null the dispatched event, or null for an unknown
     *                           action or when nothing listens
     *
     * @throws \Symfony\Component\HttpKernel\Exception\MethodNotAllowedHttpException
     */
    protected function dispatchEvent($action, &$entity, $isNew = false, SymfonyEvent $event = null)
    {
        if (!$entity instanceof Webhook) {
            throw new MethodNotAllowedHttpException(['Webhook'], 'Entity must be of class Webhook()');
        }

        switch ($action) {
            case 'pre_save':
                $name = WebhookEvents::WEBHOOK_PRE_SAVE;
                break;
            case 'post_save':
                $name = WebhookEvents::WEBHOOK_POST_SAVE;
                break;
            case 'pre_delete':
                $name = WebhookEvents::WEBHOOK_PRE_DELETE;
                break;
            case 'post_delete':
                $name = WebhookEvents::WEBHOOK_POST_DELETE;
                break;
            default:
                return null;
        }

        // Nobody listens: nothing to dispatch.
        if (!$this->dispatcher->hasListeners($name)) {
            return null;
        }

        if (empty($event)) {
            $event = new WebhookEvent($entity, $isNew);
            $event->setEntityManager($this->em);
        }

        $this->dispatcher->dispatch($name, $event);

        return $event;
    }
605
606
    /**
     * Serializes the payload to JSON.
     *
     * Null values are included. PublishDetailsExclusionStrategy is always
     * applied, followed by any custom exclusion strategies.
     *
     * @param mixed $payload                   data to serialize
     * @param array $groups                    serialization groups; ignored when empty
     * @param array $customExclusionStrategies extra exclusion strategies added to the context
     *
     * @return mixed|string
     */
    public function serializeData($payload, $groups = [], array $customExclusionStrategies = [])
    {
        $serializationContext = SerializationContext::create();

        if (!empty($groups)) {
            $serializationContext->setGroups($groups);
        }

        // Only include FormEntity properties for the top level entity and not the associated entities
        $publishDetailsStrategy = new PublishDetailsExclusionStrategy();
        $serializationContext->addExclusionStrategy($publishDetailsStrategy);

        foreach ($customExclusionStrategies as $customStrategy) {
            $serializationContext->addExclusionStrategy($customStrategy);
        }

        // include null values
        $serializationContext->setSerializeNull(true);

        // serialize the data so it can be sent as a payload
        return $this->serializer->serialize($payload, 'json', $serializationContext);
    }
634
635
    /**
     * Permission base string used for webhooks.
     *
     * @return string
     */
    public function getPermissionBase()
    {
        $permissionBase = 'webhook:webhooks';

        return $permissionBase;
    }
642
643
    /**
     * Reads the webhook settings from CoreParametersHelper into the class
     * properties. Numeric settings are cast to int.
     */
    private function setConfigProps(CoreParametersHelper $coreParametersHelper)
    {
        $config = $coreParametersHelper;

        // Processing mode and default ordering of queued events
        $this->queueMode        = $config->get('queue_mode');
        $this->eventsOrderByDir = $config->get('events_orderby_dir', Criteria::ASC);

        // Numeric limits, each with the fallback given here
        $this->webhookLimit   = (int) $config->get('webhook_limit', 10);
        $this->disableLimit   = (int) $config->get('webhook_disable_limit', 100);
        $this->webhookTimeout = (int) $config->get('webhook_timeout', 15);
        $this->logMax         = (int) $config->get('webhook_log_max', 1000);
    }
655
}
656