Completed
Pull Request — master (#10)
by Marc
03:01
created

Consumer   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 40
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 1

Importance

Changes 0
Metric Value
wmc 5
lcom 1
cbo 1
dl 0
loc 40
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A consume() 0 10 3
A purge() 0 4 1
1
<?php
2
namespace Cmp\Queues\Domain\Task;
3
4
use Cmp\Queues\Domain\Queue\Exception\TimeoutReaderException;
5
use Cmp\Queues\Domain\Queue\QueueReader;
6
7
class Consumer
8
{
9
    /**
10
     * @var QueueReader
11
     */
12
    protected $queueReader;
13
14
    /**
15
     * Consumer constructor.
16
     * @param QueueReader $queueReader
17
     */
18
    public function __construct(QueueReader $queueReader)
19
    {
20
        $this->queueReader = $queueReader;
21
    }
22
23
    /**
24
     * Consumes tasks indefinitely in a blocking manner
25
     * @param callable $callback Callable that'll be invoked when a message is received
26
     * @param int      $timeout (optional) If specified, the process will block a max of $timeout seconds. Indefinitely if 0
27
     */
28
    public function consume(callable $callback, $timeout=0)
29
    {
30
        while(true) {
31
            try {
32
                $this->queueReader->read($callback, $timeout);
33
            } catch(TimeoutReaderException $e) {
34
                break;
35
            }
36
        }
37
    }
38
39
    /**
40
     * Purges all messages from the queue
41
     */
42
    public function purge()
43
    {
44
        $this->queueReader->purge();
45
    }
46
}