Queue::createQueue()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 15
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 15
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 10
nc 2
nop 0
1
<?php
2
3
namespace Cmobi\RabbitmqBundle\Queue;
4
5
use Cmobi\RabbitmqBundle\Connection\CmobiAMQPChannel;
6
use Cmobi\RabbitmqBundle\Connection\CmobiAMQPConnection;
7
use Cmobi\RabbitmqBundle\Connection\ConnectionManager;
8
use Cmobi\RabbitmqBundle\Connection\Exception\InvalidAMQPChannelException;
9
use Psr\Log\LoggerInterface;
10
11
class Queue implements QueueInterface
12
{
13
    private $connectionManager;
14
    private $connection;
15
    private $connectionName;
16
    private $channel;
17
    private $queueBag;
18
    private $errOutput;
19
    private $logOutput;
20
    private $callback;
21
22
    public function __construct(
23
        ConnectionManager $connectionManager,
24
        QueueBagInterface $queueBag,
25
        $connectionName = 'default',
26
        QueueCallbackInterface $callback = null
27
    )
28
    {
29
        $this->connectionManager = $connectionManager;
30
        $this->connectionName = $connectionName;
31
        $this->connection = $this->getConnectionManager()->getConnection($connectionName);
32
        $this->queueBag = $queueBag;
33
        $this->errOutput = fopen('php://stderr', 'a+');
34
        $this->logOutput = fopen('php://stdout', 'a+');
35
        $this->callback = $callback;
36
    }
37
38
    /**
39
     * @return CmobiAMQPChannel
40
     *
41
     * @throws InvalidAMQPChannelException
42
     */
43
    protected function getChannel()
44
    {
45
        if ($this->channel instanceof CmobiAMQPChannel) {
46
            return $this->channel;
47
        }
48
        $this->channel = $this->getConnection()->channel();
49
50
        if (!$this->channel instanceof CmobiAMQPChannel) {
51
            throw new InvalidAMQPChannelException('Failed get AMQPChannel');
52
        }
53
54
        return $this->channel;
55
    }
56
57
    protected function createQueue()
58
    {
59
        $queueBag = $this->getQueuebag();
60
61
        $this->getChannel()->basic_qos(null, $queueBag->getBasicQos(), null);
0 ignored issues
show
Documentation introduced by
null is of type null, but the function expects a boolean.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
62
63
        if ($queueBag->getExchangeDeclare()) {
64
            $this->getChannel()->exchangeDeclare($queueBag->getExchangeDeclare());
65
            list($queueName) = $this->getChannel()->queueDeclare($queueBag->getQueueDeclare());
66
            $this->getChannel()->queue_bind($queueName, $queueBag->getExchange());
67
        } else {
68
            $this->getChannel()->queueDeclare($queueBag->getQueueDeclare());
69
        }
70
        $this->getChannel()->basicConsume($queueBag->getQueueConsume(), $this->getCallback()->toClosure());
71
    }
72
73
    /**
74
     * Declare and start queue in broker.
75
     */
76
    public function start()
77
    {
78
        $this->createQueue();
79
80
        while (count($this->getChannel()->callbacks)) {
81
            try {
82
                $this->getChannel()->wait();
83
            } catch (\Exception $e) {
84
                fwrite($this->errOutput, $e->getMessage());
85
                $this->forceReconnect();
86
87
                continue;
88
            }
89
        }
90
        $connection = $this->getChannel()->getConnection();
91
        $this->getChannel()->close();
92
        $connection->close();
93
    }
94
95
    /**
96
     * @return QueueBagInterface
97
     */
98
    public function getQueuebag()
99
    {
100
        return $this->queueBag;
101
    }
102
103
    /**
104
     * @param QueueCallbackInterface $callback
105
     */
106
    public function setCallback(QueueCallbackInterface $callback)
107
    {
108
        $this->callback = $callback;
109
    }
110
111
    /**
112
     * @return QueueCallbackInterface
113
     */
114
    public function getCallback()
115
    {
116
        return $this->callback;
117
    }
118
119
    /**
120
     * @return ConnectionManager
121
     */
122
    public function getConnectionManager()
123
    {
124
        return $this->connectionManager;
125
    }
126
127
    /**
128
     * @return CmobiAMQPConnection
129
     */
130
    public function getConnection()
131
    {
132
        return $this->connection;
133
    }
134
135
    /**
136
     * Retry connect to message broker until it can.
137
     */
138
    /**
139
     * @param CmobiAMQPConnection|null $connection
140
     *
141
     * @return CmobiAMQPChannel
142
     */
143
    public function forceReconnect(CmobiAMQPConnection $connection = null)
144
    {
145
        do {
146
            try {
147
                $failed = false;
148
                fwrite($this->logOutput, 'start Queue::forceReconnect() - trying connect...' . PHP_EOL);
149
                $this->connection = $this->getConnectionManager()->getConnection($this->connectionName);
150
                $this->channel = $this->getConnection()->channel();
151
                $this->createQueue();
152
            } catch (\Exception $e) {
153
                $failed = true;
154
                sleep(3);
155
                fwrite($this->errOutput, 'failed Queue::forceReconnect() - ' . $e->getMessage() . PHP_EOL);
156
            }
157
        } while ($failed);
158
        fwrite($this->logOutput, 'Queue::forceReconnect() - connected!' . PHP_EOL);
159
160
        return $this->channel;
161
    }
162
}
163