Completed
Pull Request — master (#9)
by Marc
06:42
created

Consumer::purge()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
c 0
b 0
f 0
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
namespace Cmp\Queues\Domain\Task;
3
4
use Cmp\Queues\Domain\Queue\Exception\TimeoutReaderException;
5
use Cmp\Queues\Domain\Queue\QueueReader;
6
7
class Consumer
{
    /**
     * Reader that every operation of this consumer is delegated to.
     *
     * @var QueueReader
     */
    protected $queueReader;

    /**
     * Stores the reader used by consume(), consumeOnce() and purge().
     *
     * @param QueueReader $queueReader
     */
    public function __construct(QueueReader $queueReader)
    {
        $this->queueReader = $queueReader;
    }

    /**
     * Keeps consuming tasks, one consumeOnce() call after another, until the reader times out.
     *
     * A TimeoutReaderException ends the loop and is not rethrown; any other
     * exception propagates to the caller.
     *
     * @param callable $callback Callable that'll be invoked when a message is received
     * @param int      $timeout  (optional) Max seconds each read may block; blocks indefinitely if 0
     */
    public function consume(callable $callback, $timeout = 0)
    {
        try {
            for (;;) {
                $this->consumeOnce($callback, $timeout);
            }
        } catch (TimeoutReaderException $e) {
            // The timeout is the only exit from the loop above; returning
            // normally here is the intended way for consume() to finish.
        }
    }

    /**
     * Performs a single blocking read on the queue reader.
     *
     * @param callable $callback Callable that'll be invoked when a message is received
     * @param int      $timeout  Max seconds the read may block, or indefinitely if 0
     */
    public function consumeOnce(callable $callback, $timeout)
    {
        $reader = $this->queueReader;
        $reader->read($callback, $timeout);
    }

    /**
     * Asks the queue reader to purge all messages from the queue.
     */
    public function purge()
    {
        $reader = $this->queueReader;
        $reader->purge();
    }
}