Completed
Pull Request — master (#37)
by Peter
08:32
created

Consumer::getSubscribedEvents()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 8
ccs 0
cts 4
cp 0
rs 9.4285
c 1
b 0
f 0
cc 1
eloc 5
nc 1
nop 0
crap 2
1
<?php
2
3
namespace TreeHouse\QueueBundle\Consumer;
4
5
use Symfony\Component\Console\Output\OutputInterface;
6
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
7
use TreeHouse\Queue\Consumer\ConsumerInterface;
8
use TreeHouse\Queue\Event\ConsumeEvent;
9
use TreeHouse\Queue\Event\ConsumeExceptionEvent;
10
use TreeHouse\Queue\QueueEvents;
11
use TreeHouse\QueueBundle\Consumer\Limiter\LimiterInterface;
12
use TreeHouse\QueueBundle\Consumer\Limiter\LimitReachedException;
13
14
/**
 * Console-facing wrapper around a queue consumer.
 *
 * Subscribes itself to the wrapped consumer's event dispatcher, reports
 * progress on the given output, dispatches a flush event after every batch of
 * processed messages, consults the registered limiters after each message and
 * performs a graceful shutdown when consuming ends.
 */
class Consumer implements EventSubscriberInterface
{
    /**
     * @var ConsumerInterface
     */
    private $consumer;

    /**
     * @var OutputInterface
     */
    private $output;

    /**
     * @var string|null
     */
    private $consumerTag;

    /**
     * @var LimiterInterface[]
     */
    private $limiters = [];

    /**
     * Unix timestamp at which consume() was started, null before that.
     *
     * @var int|null
     */
    private $startTime;

    /**
     * Minimum number of seconds the consumer must have run before shutdown returns.
     *
     * @var int
     */
    private $minDuration = 15;

    /**
     * Number of messages processed so far.
     *
     * @var int
     */
    private $processed = 0;

    /**
     * Number of processed messages after which a flush event is dispatched.
     *
     * @var int
     */
    private $batchSize = 25;

    /**
     * Pause after each processed message, in microseconds (passed to usleep()).
     *
     * @var int
     */
    private $coolDownTime = 0;

    /**
     * @param ConsumerInterface $consumer
     * @param OutputInterface   $output
     * @param string|null       $consumerTag
     */
    public function __construct(ConsumerInterface $consumer, OutputInterface $output, $consumerTag = null)
    {
        $this->consumer = $consumer;
        $this->output = $output;
        $this->consumerTag = $consumerTag;
    }

    /**
     * @inheritdoc
     */
    public static function getSubscribedEvents()
    {
        return [
            QueueEvents::CONSUME_MESSAGE => 'onConsumeMessage',
            QueueEvents::CONSUMED_MESSAGE => 'onMessageConsumed',
            QueueEvents::CONSUME_EXCEPTION => 'onConsumeException',
        ];
    }

    /**
     * Registers a limiter that is consulted after every processed message.
     *
     * @param LimiterInterface $limiter
     */
    public function addLimiter(LimiterInterface $limiter)
    {
        $this->limiters[] = $limiter;
    }

    /**
     * Sets the minimum run time in seconds; shutdown sleeps until it is reached.
     *
     * @param int $duration
     *
     * @return $this
     */
    public function mustRunFor($duration)
    {
        $this->minDuration = $duration;

        return $this;
    }

    /**
     * Sets the number of processed messages after which a flush is dispatched.
     *
     * A value of zero or less disables the per-batch flush; the flush performed
     * during shutdown is not affected.
     *
     * @param int $batchSize
     *
     * @return $this
     */
    public function flushAfter($batchSize)
    {
        $this->batchSize = $batchSize;

        return $this;
    }

    /**
     * Sets the pause after each processed message, in microseconds.
     *
     * @param int $coolDownTime
     *
     * @return $this
     */
    public function waitBetweenMessages($coolDownTime)
    {
        $this->coolDownTime = $coolDownTime;

        return $this;
    }

    /**
     * @return int
     */
    public function getProcessed()
    {
        return $this->processed;
    }

    /**
     * @return int|null Unix timestamp, or null when consume() has not been called yet
     */
    public function getStartTime()
    {
        return $this->startTime;
    }

    /**
     * Returns the number of seconds elapsed since consume() was started.
     *
     * @return int Elapsed seconds, 0 when consume() has not been called yet
     */
    public function getDuration()
    {
        // Without a start time the subtraction would yield the current Unix
        // timestamp, which is not a meaningful duration.
        if ($this->startTime === null) {
            return 0;
        }

        return time() - $this->startTime;
    }

    /**
     * Starts consuming and shuts down gracefully once consuming ends.
     *
     * @throws \Exception Any exception raised while consuming is rethrown after shutdown
     */
    public function consume()
    {
        $this->consumer->getEventDispatcher()->addSubscriber($this);

        $this->startTime = time();

        try {
            $this->consumer->consume($this->consumerTag);
        } catch (\Exception $e) {
            $this->output->writeln(
                sprintf('Uncaught %s thrown by consumer, shutting down gracefully', get_class($e)),
                OutputInterface::VERBOSITY_VERBOSE
            );

            $this->shutdown();

            throw $e;
        }

        // Deliberately outside the try block: an exception raised by the
        // shutdown procedure itself must not be reported as a consumer failure
        // and must not trigger a second shutdown.
        $this->shutdown();
    }

    /**
     * Reports the message that is about to be processed.
     *
     * @param ConsumeEvent $event
     */
    public function onConsumeMessage(ConsumeEvent $event)
    {
        $envelope = $event->getEnvelope();

        // The untruncated payload is only shown above "verbose" verbosity.
        $fullPayload = $this->output->getVerbosity() > OutputInterface::VERBOSITY_VERBOSE;

        $this->output->writeln(
            sprintf(
                '<comment>[%s]</comment> Processing payload <info>%s</info>',
                $envelope->getDeliveryTag(),
                $this->getPayloadOutput($envelope->getBody(), $fullPayload)
            )
        );
    }

    /**
     * Reports the result of a processed message, flushes when a batch is
     * complete, consults the limiters and finally cools down.
     *
     * @param ConsumeEvent $event
     */
    public function onMessageConsumed(ConsumeEvent $event)
    {
        $envelope = $event->getEnvelope();

        $this->output->writeln(
            sprintf(
                '<comment>[%s]</comment> processed with result: <info>%s</info>',
                $envelope->getDeliveryTag(),
                json_encode($event->getResult())
            )
        );

        ++$this->processed;

        // see if batch is completed; a non-positive batch size would make the
        // modulo operation fail, so it disables batch flushing instead
        $batchSize = (int) $this->batchSize;
        if ($batchSize > 0 && $this->processed % $batchSize === 0) {
            $this->flush();
        }

        try {
            foreach ($this->limiters as $limiter) {
                $limiter->limitReached($this);
            }
        } catch (LimitReachedException $e) {
            $this->output->writeln(
                $e->getMessage(),
                OutputInterface::VERBOSITY_VERBOSE
            );

            $event->stopConsuming();
        }

        // cool down; usleep() rejects negative values, so only sleep when
        // there is actually something to wait for
        if ($this->coolDownTime > 0) {
            usleep($this->coolDownTime);
        }
    }

    /**
     * Reports an exception that was raised while processing a message.
     *
     * @param ConsumeExceptionEvent $event
     */
    public function onConsumeException(ConsumeExceptionEvent $event)
    {
        $envelope = $event->getEnvelope();
        $exception = $event->getException();

        $this->output->writeln(
            sprintf(
                '<comment>[%s]</comment> raised <info>%s</info>: <error>"%s"</error>',
                $envelope->getDeliveryTag(),
                get_class($exception),
                $exception->getMessage()
            )
        );
    }

    /**
     * Returns the payload as it should be printed.
     *
     * Unless the full payload is requested, payloads wider than 100 columns are
     * cut to their first 90 characters followed by an ellipsis.
     *
     * @param string $payload
     * @param bool   $fullPayload
     *
     * @return string
     */
    private function getPayloadOutput($payload, $fullPayload = false)
    {
        if ($fullPayload === true) {
            return $payload;
        }

        $maxWidth = 100;
        $encoding = 'UTF-8';

        // Measure and cut with the same encoding, so the cut never depends on
        // the configured internal encoding and never splits a multi-byte character.
        if (mb_strwidth($payload, $encoding) > $maxWidth) {
            $payload = mb_substr($payload, 0, $maxWidth - 10, $encoding) . '...';
        }

        return $payload;
    }

    /**
     * Dispatches flush event.
     */
    private function flush()
    {
        $this->output->writeln(
            'Batch completed, flushing',
            OutputInterface::VERBOSITY_VERBOSE
        );

        $this->consumer->getEventDispatcher()->dispatch(QueueEvents::CONSUME_FLUSH);
    }

    /**
     * Shutdown procedure: flushes, cancels the subscription and makes sure the
     * minimum run time has passed before returning.
     */
    private function shutdown()
    {
        $this->output->writeln('Shutting down consumer');

        // flush remaining changes
        $this->flush();

        // cancel the subscription with the queue
        $this->consumer->cancel($this->consumerTag);

        // make sure the consumer doesn't quit too quickly, or supervisor will mark it as a failed restart,
        // putting the process in FATAL state.
        $duration = $this->getDuration();
        if ($duration < $this->minDuration) {
            $remaining = $this->minDuration - $duration;

            $this->output->writeln(
                sprintf('Sleeping for %d seconds so consumer has run for %d seconds', $remaining, $this->minDuration),
                OutputInterface::VERBOSITY_VERBOSE
            );

            sleep($remaining);
        }
    }
}
311