Completed
Pull Request — master (#1)
by Daniel
12:19
created

Queue::getQueuebag()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
3
namespace Cmobi\RabbitmqBundle\Queue;
4
5
use Cmobi\RabbitmqBundle\Connection\CmobiAMQPChannel;
6
use Cmobi\RabbitmqBundle\Connection\CmobiAMQPConnection;
7
use Cmobi\RabbitmqBundle\Connection\ConnectionManager;
8
use Cmobi\RabbitmqBundle\Connection\Exception\InvalidAMQPChannelException;
9
use Psr\Log\LoggerInterface;
10
11
/**
 * Consumer-side queue.
 *
 * Declares the queue (and, when configured, the exchange) described by a queue bag
 * on an AMQP channel, consumes it with the registered callback, and reconnects to
 * the broker whenever waiting on the channel throws.
 */
class Queue implements QueueInterface
{
    /** @var ConnectionManager Source of broker connections. */
    private $connectionManager;

    /** @var CmobiAMQPConnection Connection currently in use. */
    private $connection;

    /** @var CmobiAMQPChannel|null Lazily opened channel; null until getChannel() is first called. */
    private $channel;

    /** @var QueueBagInterface Declaration/consume settings for this queue. */
    private $queueBag;

    /** @var QueueCallbackInterface|null Message handler; must be set before start(). */
    private $callback;

    /** @var LoggerInterface */
    private $logger;

    /**
     * Stores the collaborators and immediately obtains a connection from the manager.
     *
     * @param ConnectionManager $connectionManager
     * @param QueueBagInterface $queueBag
     * @param LoggerInterface   $logger
     */
    public function __construct(ConnectionManager $connectionManager, QueueBagInterface $queueBag, LoggerInterface $logger)
    {
        $this->connectionManager = $connectionManager;
        $this->connection = $this->getConnectionManager()->getConnection();
        $this->queueBag = $queueBag;
        $this->logger = $logger;
    }

    /**
     * Returns the cached channel, opening one on the current connection if needed.
     *
     * @return CmobiAMQPChannel
     * @throws InvalidAMQPChannelException when the connection does not yield a CmobiAMQPChannel
     */
    protected function getChannel()
    {
        if ($this->channel instanceof CmobiAMQPChannel) {
            return $this->channel;
        }
        $this->channel = $this->getConnection()->channel();

        if (! $this->channel instanceof CmobiAMQPChannel) {
            throw new InvalidAMQPChannelException('Failed get AMQPChannel');
        }

        return $this->channel;
    }

    /**
     * Applies QoS, declares the queue (binding it to the exchange when the bag
     * defines one) and registers the consumer on the channel.
     *
     * Calls toClosure() on the registered callback, so setCallback() must have
     * been invoked beforehand.
     *
     * @throws InvalidAMQPChannelException
     */
    protected function createQueue()
    {
        $queueBag = $this->getQueuebag();
        $channel = $this->getChannel();

        // Explicit 0/false instead of null: static analysis flagged null being passed
        // where a boolean is expected; these are the values null would be coerced to.
        $channel->basic_qos(0, $queueBag->getBasicQos(), false);

        if ($queueBag->getExchangeDeclare()) {
            $channel->exchangeDeclare($queueBag->getExchangeDeclare());
            // Only the first element of the declare result (the queue name) is needed.
            list($queueName) = $channel->queueDeclare($queueBag->getQueueDeclare());
            $channel->queue_bind($queueName, $queueBag->getExchange());
        } else {
            $channel->queueDeclare($queueBag->getQueueDeclare());
        }
        $channel->basicConsume($queueBag->getQueueConsume(), $this->getCallback()->toClosure());
    }

    /**
     * Declare and start queue in broker.
     *
     * Blocks while the channel has registered callbacks. Any exception raised while
     * waiting is logged and followed by a reconnect; once no callbacks remain the
     * channel and its connection are closed.
     */
    public function start()
    {
        $this->createQueue();

        while (count($this->getChannel()->callbacks)) {
            try {
                $this->getChannel()->wait();
            } catch (\Exception $e) {
                // Keep the exception in the log context so the stack trace is not lost.
                $this->logger->error($e->getMessage(), array('exception' => $e));
                $this->forceReconnect();
            }
        }

        $channel = $this->getChannel();
        $connection = $channel->getConnection();
        $channel->close();
        $connection->close();
    }

    /**
     * @return QueueBagInterface
     */
    public function getQueuebag()
    {
        return $this->queueBag;
    }

    /**
     * @param QueueCallbackInterface $callback
     */
    public function setCallback(QueueCallbackInterface $callback)
    {
        $this->callback = $callback;
    }

    /**
     * @return QueueCallbackInterface
     */
    public function getCallback()
    {
        return $this->callback;
    }

    /**
     * @return ConnectionManager
     */
    public function getConnectionManager()
    {
        return $this->connectionManager;
    }

    /**
     * @return CmobiAMQPConnection
     */
    public function getConnection()
    {
        return $this->connection;
    }

    /**
     * Retries connecting to the message broker until it succeeds.
     *
     * Each attempt fetches a connection from the manager, opens a fresh channel and
     * re-declares the queue. A failed attempt is logged and retried after 3 seconds.
     *
     * @param CmobiAMQPConnection|null $connection Not used by this method; kept so
     *                                             existing callers remain valid.
     * @return CmobiAMQPChannel the channel opened by the successful attempt
     */
    public function forceReconnect(CmobiAMQPConnection $connection = null)
    {
        do {
            $failed = false;

            try {
                $this->logger->warning('forceReconnect() - trying connect...');
                $this->connection = $this->getConnectionManager()->getConnection();
                // Discard the stale channel so getChannel() opens a new one on the
                // fresh connection and applies its type validation.
                $this->channel = null;
                $this->getChannel();
                $this->createQueue();
            } catch (\Exception $e) {
                $failed = true;
                // Log first so the failure is reported before the back-off delay.
                $this->logger->error('forceReconnect() - ' . $e->getMessage(), array('exception' => $e));
                sleep(3);
            }
        } while ($failed);
        $this->logger->warning('forceReconnect() - connected!');

        return $this->channel;
    }
}