Completed
Push — master ( 82de2a...ab45a8 )
by Grégoire
11s
created

src/Command/ConsumerHandlerCommand.php (1 issue)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
declare(strict_types=1);
4
5
/*
6
 * This file is part of the Sonata Project package.
7
 *
8
 * (c) Thomas Rabaix <thomas.rabaix@sonata-project.org>
9
 *
10
 * For the full copyright and license information, please view the LICENSE
11
 * file that was distributed with this source code.
12
 */
13
14
namespace Sonata\NotificationBundle\Command;
15
16
use Sonata\NotificationBundle\Backend\BackendInterface;
17
use Sonata\NotificationBundle\Backend\QueueDispatcherInterface;
18
use Sonata\NotificationBundle\Consumer\ConsumerInterface;
19
use Sonata\NotificationBundle\Event\IterateEvent;
20
use Sonata\NotificationBundle\Exception\HandlingException;
21
use Sonata\NotificationBundle\Model\MessageInterface;
22
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
23
use Symfony\Component\Console\Input\InputInterface;
24
use Symfony\Component\Console\Input\InputOption;
25
use Symfony\Component\Console\Output\OutputInterface;
26
27
class ConsumerHandlerCommand extends ContainerAwareCommand
0 ignored issues
show
Deprecated Code introduced by
The class Symfony\Bundle\Framework...d\ContainerAwareCommand has been deprecated with message: since Symfony 4.2, use {@see Command} instead.

This class, trait or interface has been deprecated. The supplier of the file has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the deprecated type will be removed and what to use instead.

Loading history...
28
{
29
    /**
     * Declares the command name, its description and its three options.
     *
     * Options registered here:
     *  - `iteration` (`-i`): optional value, defaults to false; read by execute()
     *    to stop the loop once the given count is reached.
     *  - `type`: optional value, defaults to null; read by execute() to select
     *    the backend.
     *  - `show-details` (`-d`): optional value, defaults to true; read by
     *    execute() to decide whether consumer return messages are printed.
     *
     * {@inheritdoc}
     */
    public function configure(): void
    {
        $this->setName('sonata:notification:start');
        $this->setDescription('Listen for incoming messages');
        $this->addOption('iteration', 'i', InputOption::VALUE_OPTIONAL, 'Only run n iterations before exiting', false);
        // Help text previously read "backed" instead of "backend".
        $this->addOption('type', null, InputOption::VALUE_OPTIONAL, 'Use a specific backend based on a message type, "all" with doctrine backend will handle all notifications no matter their type', null);
        $this->addOption('show-details', 'd', InputOption::VALUE_OPTIONAL, 'Show consumers return details', true);
    }
40
41
    /**
     * Consumes messages from the configured backend and hands each one to its consumers.
     *
     * Steps, in order:
     *  1. list every listener registered on the notification dispatcher and fail
     *     if one of them is not a ConsumerInterface;
     *  2. resolve the backend for the `type` option and initialize it;
     *  3. iterate over the backend, handling each message and printing timing
     *     and memory figures (or the failure reason);
     *  4. dispatch an IterateEvent after every handled message;
     *  5. return once the `iteration` option, when set, has been reached.
     *
     * Messages without a type are reported and skipped: no IterateEvent is
     * dispatched for them and the iteration limit is not checked.
     *
     * {@inheritdoc}
     *
     * @throws \RuntimeException when a listener is not a ConsumerInterface, when the
     *                           backend cannot be resolved, or when the iterator
     *                           yields something that is not a MessageInterface
     */
    public function execute(InputInterface $input, OutputInterface $output): void
    {
        $startDate = new \DateTime();

        $output->writeln(sprintf('[%s] <info>Checking listeners</info>', $startDate->format('r')));
        foreach ($this->getNotificationDispatcher()->getListeners() as $type => $listeners) {
            $output->writeln(sprintf(' - %s', $type));
            foreach ($listeners as $listener) {
                if (!$listener[0] instanceof ConsumerInterface) {
                    throw new \RuntimeException(sprintf(
                        'The registered service does not implement the ConsumerInterface (class=%s)',
                        \get_class($listener[0])
                    ));
                }

                $output->writeln(sprintf('   > %s', \get_class($listener[0])));
            }
        }

        $type = $input->getOption('type');
        $showDetails = $input->getOption('show-details');
        // Read once: the option cannot change while the loop is running.
        $iterationLimit = $input->getOption('iteration');

        $output->write(sprintf('[%s] <info>Retrieving backend</info> ...', $startDate->format('r')));
        $backend = $this->getBackend($type);

        $output->writeln('');
        $output->write(sprintf('[%s] <info>Initialize backend</info> ...', $startDate->format('r')));

        // initialize the backend
        $backend->initialize();

        $output->writeln(' done!');

        if (null === $type) {
            $output->writeln(sprintf(
                '[%s] <info>Starting the backend handler</info> - %s',
                $startDate->format('r'),
                \get_class($backend)
            ));
        } else {
            $output->writeln(sprintf(
                '[%s] <info>Starting the backend handler</info> - %s (type: %s)',
                $startDate->format('r'),
                \get_class($backend),
                $type
            ));
        }

        // Baseline used to report memory growth over the whole run.
        $startMemoryUsage = memory_get_usage(true);
        $i = 0;
        $iterator = $backend->getIterator();
        foreach ($iterator as $message) {
            ++$i;

            if (!$message instanceof MessageInterface) {
                throw new \RuntimeException('The iterator must return a MessageInterface instance');
            }

            if (!$message->getType()) {
                // writeln (not write) so the next log entry starts on its own line.
                $output->writeln('<error>Skipping : no type defined </error>');

                continue;
            }

            $date = new \DateTime();
            $output->write(sprintf('[%s] <info>%s</info> #%s: ', $date->format('r'), $message->getType(), $i));
            $memoryUsage = memory_get_usage(true);

            try {
                $start = microtime(true);
                $returnInfos = $backend->handle($message, $this->getNotificationDispatcher());

                $currentMemory = memory_get_usage(true);

                // Columns: handling time, seconds since the message was created,
                // memory delta for this message, current - start = total delta, growth in %.
                $output->writeln(sprintf('<comment>OK! </comment> - %0.04fs, %ss, %s, %s - %s = %s, %0.02f%%',
                    microtime(true) - $start,
                    $date->format('U') - $message->getCreatedAt()->format('U'),
                    $this->formatMemory($currentMemory - $memoryUsage),
                    $this->formatMemory($currentMemory),
                    $this->formatMemory($startMemoryUsage),
                    $this->formatMemory($currentMemory - $startMemoryUsage),
                    ($currentMemory - $startMemoryUsage) / $startMemoryUsage * 100
                ));

                if ($showDetails && null !== $returnInfos) {
                    $output->writeln($returnInfos->getReturnMessage());
                }
            } catch (HandlingException $e) {
                // getPrevious() returns null when no cause was attached; fall back to
                // the exception itself instead of calling getMessage() on null.
                $cause = $e->getPrevious() ?? $e;
                $output->writeln(sprintf('<error>KO! - %s</error>', $cause->getMessage()));
            } catch (\Exception $e) {
                $output->writeln(sprintf('<error>KO! - %s</error>', $e->getMessage()));
            }

            $this->getEventDispatcher()->dispatch(
                IterateEvent::EVENT_NAME,
                new IterateEvent($iterator, $backend, $message)
            );

            if ($iterationLimit && $i >= (int) $iterationLimit) {
                $output->writeln('End of iteration cycle');

                return;
            }
        }
    }
149
150
    /**
     * Always throws, reporting that no backend exists for the given message type.
     *
     * @param string        $type    message type that could not be resolved
     * @param object|string $backend backend instance (its class name is reported) or
     *                               an already-resolved backend name
     *
     * @throws \RuntimeException always
     */
    protected function throwTypeNotFoundException($type, $backend): void
    {
        // get_class() only accepts objects; tolerate a plain name as well.
        $backendName = \is_object($backend) ? \get_class($backend) : (string) $backend;

        throw new \RuntimeException(
            "The requested backend for the type '".$type."' does not exist. \nMake sure the backend '".
            $backendName."' \nsupports multiple queues and the routing_key is defined. (Currently rabbitmq only)"
        );
    }
163
164
    /**
     * Formats a byte count as a short human-readable string.
     *
     * The unit is chosen from the magnitude of the value, so negative deltas
     * (memory released between two measurements) are scaled the same way as
     * positive ones instead of always being printed in bytes.
     *
     * @param int|float $memory number of bytes, may be negative
     *
     * @return string the value suffixed with `b`, `Kb` or `Mb`; Kb/Mb values are
     *                rounded to two decimals
     */
    private function formatMemory($memory): string
    {
        $magnitude = abs($memory);

        if ($magnitude < 1024) {
            return $memory.'b';
        }

        if ($magnitude < 1048576) {
            return round($memory / 1024, 2).'Kb';
        }

        return round($memory / 1048576, 2).'Mb';
    }
179
180
    /**
     * Resolves the backend that will be consumed for the requested message type.
     *
     * The `sonata.notification.backend` service is fetched from the container.
     * When it is a QueueDispatcherInterface, the lookup is delegated to its own
     * getBackend($type); otherwise the service itself is returned, which is only
     * permitted when no type was requested.
     *
     * @param string|null $type message type to consume, or null for no specific type
     *
     * @throws \RuntimeException when a non-empty type has no registered listeners, or
     *                           when a type is given but the backend is not a
     *                           QueueDispatcherInterface
     *
     * @return BackendInterface
     */
    private function getBackend($type = null)
    {
        $backend = $this->getContainer()->get('sonata.notification.backend');

        if ($type) {
            $listenersByType = $this->getNotificationDispatcher()->getListeners();

            if (!array_key_exists($type, $listenersByType)) {
                throw new \RuntimeException(sprintf(
                    'The type `%s` does not exist, available types: %s',
                    $type,
                    implode(', ', array_keys($listenersByType))
                ));
            }
        }

        if ($backend instanceof QueueDispatcherInterface) {
            return $backend->getBackend($type);
        }

        // Plain backends cannot route by type.
        if (null !== $type) {
            throw new \RuntimeException(sprintf(
                'Unable to use the provided type %s with a non QueueDispatcherInterface backend',
                $type
            ));
        }

        return $backend;
    }
206
207
    /**
     * Fetches the `sonata.notification.dispatcher` service from the container.
     *
     * NOTE(review): EventDispatcherInterface is not imported in this file's
     * visible `use` block, so the short name below resolves relative to the
     * current namespace — confirm the intended fully-qualified type.
     *
     * @return EventDispatcherInterface
     */
    private function getNotificationDispatcher()
    {
        $container = $this->getContainer();

        return $container->get('sonata.notification.dispatcher');
    }
214
215
    /**
     * Fetches the `event_dispatcher` service from the container.
     *
     * NOTE(review): EventDispatcherInterface is not imported in this file's
     * visible `use` block, so the short name below resolves relative to the
     * current namespace — confirm the intended fully-qualified type.
     *
     * @return EventDispatcherInterface
     */
    private function getEventDispatcher()
    {
        $container = $this->getContainer();

        return $container->get('event_dispatcher');
    }
222
}
223