ConsumerService::run()   B
last analyzed

Complexity

Conditions 7
Paths 28

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 29
rs 8.5226
c 0
b 0
f 0
cc 7
nc 28
nop 1
1
<?php
2
3
namespace Mouf\AmqpClient;
4
5
use Mouf\AmqpClient\Objects\Queue;
6
7
/**
 * Service that consumes RabbitMq queues through a Client.
 *
 * @author Marc Teyssier
 */
class ConsumerService
{
    /**
     * Client providing the channel and, when no explicit queues are given, the queues to consume.
     *
     * @var Client
     */
    private $client;

    /**
     * Queues explicitly selected for consumption; may be empty.
     *
     * @var Queue[]
     */
    private $queues;

    /**
     * @param Client  $client Client used to obtain the channel (and the queues when none are given)
     * @param Queue[] $queues List of queues to listen to. If empty, all queues from the client are listened to
     */
    public function __construct(Client $client, array $queues = [])
    {
        $this->queues = $queues;
        $this->client = $client;
    }

    /**
     * Call this function in your script to consume the RabbitMq queue.
     *
     * Starts consumption on every targeted queue, waits on the client's channel,
     * then cancels consumption on those same queues.
     *
     * @param bool $nonBlocking When falsy (default), wait() is called repeatedly for as long as the
     *                          channel has registered callbacks. When truthy, wait(null, true) is called
     *                          at most once, and only if the channel has registered callbacks.
     */
    public function run($nonBlocking = false)
    {
        // Fall back to the client's own queues when none were passed to the constructor.
        /* @var Queue[] $targets */
        $targets = empty($this->queues) ? $this->client->getQueues() : $this->queues;

        foreach ($targets as $target) {
            $target->consume();
        }

        $channel = $this->client->getChannel();

        if (!$nonBlocking) {
            // Keep waiting until no callback remains registered on the channel.
            while (count($channel->callbacks) > 0) {
                $channel->wait();
            }
        } elseif (count($channel->callbacks) > 0) {
            // Single pass only; the second argument is presumably the non-blocking flag of wait().
            $channel->wait(null, true);
        }

        foreach ($targets as $target) {
            $target->cancelConsume();
        }
    }
}
67