Completed
Push — master ( 306a32...5392c5 )
by Peter
08:07
created

Consumer::shutdown()   B

Complexity

Conditions 2
Paths 2

Size

Total Lines 24
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 24
ccs 0
cts 12
cp 0
rs 8.9713
cc 2
eloc 11
nc 2
nop 0
crap 6
1
<?php
2
3
namespace TreeHouse\QueueBundle\Consumer;
4
5
use Symfony\Component\Console\Output\OutputInterface;
6
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
7
use TreeHouse\Queue\Consumer\ConsumerInterface;
8
use TreeHouse\Queue\Event\ConsumeEvent;
9
use TreeHouse\Queue\Event\ConsumeExceptionEvent;
10
use TreeHouse\Queue\QueueEvents;
11
use TreeHouse\QueueBundle\Consumer\Limiter\LimiterInterface;
12
use TreeHouse\QueueBundle\Consumer\Limiter\LimitReachedException;
13
14
/**
 * Console-facing wrapper around a queue consumer.
 *
 * Subscribes itself to the wrapped consumer's event dispatcher and reports
 * progress on the given output, dispatches a flush event after every batch of
 * processed messages, asks registered limiters whether consuming should stop,
 * and runs a shutdown procedure when consuming ends or fails.
 */
class Consumer implements EventSubscriberInterface
{
    /**
     * The wrapped consumer that does the actual consuming.
     *
     * @var ConsumerInterface
     */
    private $consumer;

    /**
     * Output that progress messages are written to.
     *
     * @var OutputInterface
     */
    private $output;

    /**
     * Tag passed to the wrapped consumer's consume() and cancel() calls.
     *
     * @var string|null
     */
    private $consumerTag;

    /**
     * Limiters that are consulted after every consumed message.
     *
     * @var LimiterInterface[]
     */
    private $limiters = [];

    /**
     * Unix timestamp of the moment consume() was called, null before that.
     *
     * @var int|null
     */
    private $startTime;

    /**
     * Minimum number of seconds the consumer must have run before shutdown() returns.
     *
     * @var int
     */
    private $minDuration = 15;

    /**
     * Number of messages consumed so far.
     *
     * @var int
     */
    private $processed = 0;

    /**
     * Number of messages after which a flush event is dispatched; 0 disables
     * the intermediate flushes (shutdown() still flushes).
     *
     * @var int
     */
    private $batchSize = 25;

    /**
     * Pause after each consumed message, in microseconds (handed to usleep()).
     *
     * @var int
     */
    private $coolDownTime = 0;

    /**
     * @param ConsumerInterface $consumer
     * @param OutputInterface   $output
     * @param string|null       $consumerTag
     */
    public function __construct(ConsumerInterface $consumer, OutputInterface $output, $consumerTag = null)
    {
        $this->consumer = $consumer;
        $this->output = $output;
        $this->consumerTag = $consumerTag;

        // listen to the consume events of the wrapped consumer
        $this->consumer->getEventDispatcher()->addSubscriber($this);
    }

    /**
     * @inheritdoc
     */
    public static function getSubscribedEvents()
    {
        return [
            QueueEvents::CONSUME_MESSAGE => 'onConsumeMessage',
            QueueEvents::CONSUMED_MESSAGE => 'onMessageConsumed',
            QueueEvents::CONSUME_EXCEPTION => 'onConsumeException',
        ];
    }

    /**
     * Registers a limiter that is consulted after every consumed message.
     *
     * @param LimiterInterface $limiter
     */
    public function addLimiter(LimiterInterface $limiter)
    {
        $this->limiters[] = $limiter;
    }

    /**
     * Sets the minimum run time that shutdown() enforces.
     *
     * @param int $duration Minimum run time in seconds
     *
     * @return $this
     */
    public function mustRunFor($duration)
    {
        $this->minDuration = $duration;

        return $this;
    }

    /**
     * Sets the number of messages after which a flush event is dispatched.
     *
     * @param int $batchSize Batch size; 0 disables the intermediate flushes
     *
     * @return $this
     */
    public function flushAfter($batchSize)
    {
        $this->batchSize = $batchSize;

        return $this;
    }

    /**
     * Sets the pause that follows each consumed message.
     *
     * @param int $coolDownTime Pause in microseconds
     *
     * @return $this
     */
    public function waitBetweenMessages($coolDownTime)
    {
        $this->coolDownTime = $coolDownTime;

        return $this;
    }

    /**
     * @return int Number of messages consumed so far
     */
    public function getProcessed()
    {
        return $this->processed;
    }

    /**
     * @return int|null Unix timestamp of the consume() call, null when not started
     */
    public function getStartTime()
    {
        return $this->startTime;
    }

    /**
     * @return int Seconds elapsed since the start time
     */
    public function getDuration()
    {
        return time() - $this->startTime;
    }

    /**
     * Starts consuming and runs the shutdown procedure once consuming ends,
     * whether it returned normally or threw.
     *
     * @throws \Exception Any exception thrown by the wrapped consumer, rethrown after shutdown
     */
    public function consume()
    {
        $this->startTime = time();

        try {
            $this->consumer->consume($this->consumerTag);
        } catch (\Exception $e) {
            $this->output->writeln(
                sprintf('Uncaught %s thrown by consumer, shutting down gracefully', get_class($e)),
                OutputInterface::VERBOSITY_VERBOSE
            );

            $this->shutdown();

            throw $e;
        }

        // Deliberately outside the try block: an exception thrown by the
        // shutdown procedure itself must not be reported as a consumer failure
        // and must not trigger a second shutdown.
        $this->shutdown();
    }

    /**
     * Reports the message that is about to be processed.
     *
     * @param ConsumeEvent $event
     */
    public function onConsumeMessage(ConsumeEvent $event)
    {
        $envelope = $event->getEnvelope();

        // only print the untruncated payload above "verbose" verbosity
        $fullPayload = $this->output->getVerbosity() > OutputInterface::VERBOSITY_VERBOSE;

        $this->output->writeln(
            sprintf(
                '<comment>[%s]</comment> Processing payload <info>%s</info>',
                $envelope->getDeliveryTag(),
                $this->getPayloadOutput($envelope->getBody(), $fullPayload)
            )
        );
    }

    /**
     * Reports the result of a consumed message, flushes when a batch is
     * complete, consults the limiters and finally applies the cool down.
     *
     * @param ConsumeEvent $event
     */
    public function onMessageConsumed(ConsumeEvent $event)
    {
        $envelope = $event->getEnvelope();

        $this->output->writeln(
            sprintf(
                '<comment>[%s]</comment> processed with result: <info>%s</info>',
                $envelope->getDeliveryTag(),
                json_encode($event->getResult())
            )
        );

        ++$this->processed;

        // see if batch is completed; a batch size of 0 would be a modulo by
        // zero, so it is treated as "no intermediate flushing"
        $batchSize = (int) $this->batchSize;
        if ($batchSize !== 0 && $this->processed % $batchSize === 0) {
            $this->flush();
        }

        try {
            foreach ($this->limiters as $limiter) {
                $limiter->limitReached($this);
            }
        } catch (LimitReachedException $e) {
            // a limiter signals its limit by throwing: report why and stop consuming
            $this->output->writeln(
                $e->getMessage(),
                OutputInterface::VERBOSITY_VERBOSE
            );

            $event->stopConsuming();
        }

        // cool down; nothing to wait for when no positive pause is configured
        if ($this->coolDownTime > 0) {
            usleep($this->coolDownTime);
        }
    }

    /**
     * Reports an exception that was raised while a message was consumed.
     *
     * @param ConsumeExceptionEvent $event
     */
    public function onConsumeException(ConsumeExceptionEvent $event)
    {
        $envelope = $event->getEnvelope();
        $exception = $event->getException();

        $this->output->writeln(
            sprintf(
                '<comment>[%s]</comment> raised <info>%s</info>: <error>"%s"</error>',
                $envelope->getDeliveryTag(),
                get_class($exception),
                $exception->getMessage()
            )
        );
    }

    /**
     * Returns the payload as it should be printed: unchanged when the full
     * payload is requested, otherwise shortened when it is wider than 100 columns.
     *
     * @param string $payload
     * @param bool   $fullPayload
     *
     * @return string
     */
    private function getPayloadOutput($payload, $fullPayload = false)
    {
        if ($fullPayload === true) {
            return $payload;
        }

        $maxWidth = 100;
        $encoding = 'utf8';

        // measure and cut with the same explicit encoding, so the result does
        // not depend on the configured internal encoding
        if (mb_strwidth($payload, $encoding) > $maxWidth) {
            $payload = mb_substr($payload, 0, $maxWidth - 10, $encoding) . '...';
        }

        return $payload;
    }

    /**
     * Dispatches flush event.
     */
    private function flush()
    {
        $this->output->writeln(
            'Batch completed, flushing',
            OutputInterface::VERBOSITY_VERBOSE
        );

        $this->consumer->getEventDispatcher()->dispatch(QueueEvents::CONSUME_FLUSH);
    }

    /**
     * Shutdown procedure: flushes, cancels the queue subscription and, when
     * needed, sleeps until the minimum run time has passed.
     */
    private function shutdown()
    {
        $this->output->writeln('Shutting down consumer');

        // flush remaining changes
        $this->flush();

        // cancel the subscription with the queue
        $this->consumer->cancel($this->consumerTag);

        // make sure the consumer doesn't quit too quickly, or supervisor will mark it as a failed restart,
        // putting the process in FATAL state.
        $duration = $this->getDuration();
        if ($duration < $this->minDuration) {
            $remaining = $this->minDuration - $duration;

            $this->output->writeln(
                sprintf('Sleeping for %d seconds so consumer has run for %d seconds', $remaining, $this->minDuration),
                OutputInterface::VERBOSITY_VERBOSE
            );

            sleep($remaining);
        }
    }
}
311