Consumer::consume()   A
last analyzed

Complexity

Conditions 4
Paths 4

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 12
rs 9.8666
c 0
b 0
f 0
cc 4
nc 4
nop 2
1
<?php
2
namespace Cmp\Queues\Domain\Task;
3
4
use Cmp\Queues\Domain\Queue\Exception\TimeoutReaderException;
5
use Cmp\Queues\Domain\Queue\Exception\GracefulStopException;
6
use Cmp\Queues\Domain\Queue\QueueReader;
7
8
/**
 * Task consumer that delegates all queue access to a QueueReader.
 *
 * It drives a read loop on the reader and exposes the reader's purge operation.
 */
class Consumer
{
    /**
     * Reader every operation of this consumer is forwarded to.
     *
     * @var QueueReader
     */
    protected $queueReader;

    /**
     * Stores the reader used by consume() and purge().
     *
     * @param QueueReader $queueReader Reader the consumer will pull tasks from
     */
    public function __construct(QueueReader $queueReader)
    {
        $this->queueReader = $queueReader;
    }

    /**
     * Keeps reading from the queue until the reader signals that it should stop.
     *
     * The reader's read() is invoked over and over with the same arguments. The loop
     * finishes quietly when read() throws a TimeoutReaderException or a
     * GracefulStopException; any other exception is left to propagate to the caller.
     *
     * @param callable $callback Callable handed to the reader, to be invoked when a message is received
     * @param int      $timeout  (optional) Forwarded to every read() call. If specified, the read blocks
     *                           a max of $timeout seconds; indefinitely if 0
     */
    public function consume(callable $callback, $timeout = 0)
    {
        $keepReading = true;

        while ($keepReading) {
            try {
                $this->queueReader->read($callback, $timeout);
            } catch (TimeoutReaderException $timedOut) {
                // The reader gave up waiting: leave the loop without raising an error.
                $keepReading = false;
            } catch (GracefulStopException $stopRequested) {
                // A graceful stop was signalled: leave the loop without raising an error.
                $keepReading = false;
            }
        }
    }

    /**
     * Asks the underlying reader to purge all messages from the queue.
     */
    public function purge()
    {
        $reader = $this->queueReader;
        $reader->purge();
    }
}