Completed
Push — 2.x-dev-kit ( 8d77e1 )
by
unknown
28:22 queued 25:50
created

AMQPBackendDispatcher::getBackend()   D

Complexity

Conditions 10
Paths 19

Size

Total Lines 32
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 32
rs 4.8196
cc 10
eloc 16
nc 19
nop 1

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
/*
4
 * This file is part of the Sonata project.
5
 *
6
 * (c) Thomas Rabaix <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Sonata\NotificationBundle\Backend;
13
14
use PhpAmqpLib\Channel\AMQPChannel;
15
use PhpAmqpLib\Connection\AMQPConnection;
16
use Sonata\NotificationBundle\Exception\BackendNotFoundException;
17
use Sonata\NotificationBundle\Model\MessageInterface;
18
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
19
use ZendDiagnostics\Result\Failure;
20
use ZendDiagnostics\Result\Success;
21
22
/**
23
 * Producer side of the rabbitmq backend.
24
 */
25
class AMQPBackendDispatcher extends QueueBackendDispatcher
26
{
27
    /**
28
     * @var array
29
     */
30
    protected $settings;
31
32
    /**
33
     * @var AMQPChannel
34
     */
35
    protected $channel;
36
37
    /**
38
     * @var AMQPConnection
39
     */
40
    protected $connection;
41
42
    /**
43
     * @param array  $settings
44
     * @param array  $queues
45
     * @param string $defaultQueue
46
     * @param array  $backends
47
     */
48
    public function __construct(array $settings, array $queues, $defaultQueue, array $backends)
49
    {
50
        parent::__construct($queues, $defaultQueue, $backends);
51
52
        $this->settings = $settings;
53
    }
54
55
    /**
56
     * @return AMQPChannelannel
57
     */
58
    public function getChannel()
59
    {
60
        if (!$this->channel) {
61
            $this->connection = new AMQPConnection(
0 ignored issues
show
Deprecated Code introduced by
The class PhpAmqpLib\Connection\AMQPConnection has been deprecated.

This class, trait or interface has been deprecated.

Loading history...
62
                $this->settings['host'],
63
                $this->settings['port'],
64
                $this->settings['user'],
65
                $this->settings['pass'],
66
                $this->settings['vhost']
67
            );
68
69
            $this->channel = $this->connection->channel();
70
71
            register_shutdown_function(array($this, 'shutdown'));
72
        }
73
74
        return $this->channel;
75
    }
76
77
    /**
78
     * {@inheritdoc}
79
     */
80
    public function getBackend($type)
81
    {
82
        $default = null;
83
84
        if (count($this->queues) === 0) {
85
            foreach ($this->backends as $backend) {
86
                if ($backend['type'] === 'default') {
87
                    return $backend['backend'];
88
                }
89
            }
90
        }
91
92
        foreach ($this->backends as $backend) {
93
            if ('all' === $type && $backend['type'] === '') {
94
                return $backend['backend'];
95
            }
96
97
            if ($backend['type'] === $type) {
98
                return $backend['backend'];
99
            }
100
101
            if ($backend['type'] === $this->defaultQueue) {
102
                $default = $backend['backend'];
103
            }
104
        }
105
106
        if ($default === null) {
107
            throw new BackendNotFoundException('Could not find a message backend for the type '.$type);
108
        }
109
110
        return $default;
111
    }
112
113
    /**
114
     * {@inheritdoc}
115
     */
116
    public function getIterator()
117
    {
118
        throw new \RuntimeException('You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.');
119
    }
120
121
    /**
122
     * {@inheritdoc}
123
     */
124
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
125
    {
126
        throw new \RuntimeException('You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.');
127
    }
128
129
    /**
130
     * {@inheritdoc}
131
     */
132
    public function getStatus()
133
    {
134
        try {
135
            $this->getChannel();
136
            $output = $this->getApiQueueStatus();
137
            $checked = 0;
138
            $missingConsumers = array();
139
140
            foreach ($this->queues as $queue) {
141
                foreach ($output as $q) {
142
                    if ($q['name'] === $queue['queue']) {
143
                        ++$checked;
144
                        if ($q['consumers'] === 0) {
145
                            $missingConsumers[] = $queue['queue'];
146
                        }
147
                    }
148
                }
149
            }
150
151
            if ($checked !== count($this->queues)) {
152
                return new Failure('Not all queues for the available notification types registered in the rabbitmq broker. Are the consumer commands running?');
153
            }
154
155
            if (count($missingConsumers) > 0) {
156
                return new Failure('There are no rabbitmq consumers running for the queues: '.implode(', ', $missingConsumers));
157
            }
158
        } catch (\Exception $e) {
159
            return new Failure($e->getMessage());
160
        }
161
162
        return new Success('Channel is running (RabbitMQ) and consumers for all queues available.');
163
    }
164
165
    /**
166
     * Calls the rabbitmq management api /api/<vhost>/queues endpoint to list the available queues.
167
     *
168
     * @see http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html
169
     *
170
     * @return array
171
     */
172
    protected function getApiQueueStatus()
173
    {
174
        if (class_exists('Guzzle\Http\Client') === false) {
175
            throw new \RuntimeException('The guzzle http client library is required to run rabbitmq health checks. Make sure to add guzzle/guzzle to your composer.json.');
176
        }
177
178
        $client = new \Guzzle\Http\Client();
179
        $client->setConfig(array('curl.options' => array(CURLOPT_CONNECTTIMEOUT_MS => 3000)));
180
        $request = $client->get(sprintf('%s/queues', $this->settings['console_url']));
181
        $request->setAuth($this->settings['user'], $this->settings['pass']);
182
183
        return json_decode($request->send()->getBody(true), true);
184
    }
185
186
    /**
187
     * {@inheritdoc}
188
     */
189
    public function cleanup()
190
    {
191
        throw new \RuntimeException('You need to use a specific rabbitmq backend supporting the selected queue to run a consumer.');
192
    }
193
194
    /**
195
     */
196
    public function shutdown()
197
    {
198
        if ($this->channel) {
199
            $this->channel->close();
200
        }
201
202
        if ($this->connection) {
203
            $this->connection->close();
204
        }
205
    }
206
207
    /**
208
     * {@inheritdoc}
209
     */
210
    public function initialize()
211
    {
212
    }
213
}
214