AmqpAdapter::deleteMessage()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
c 0
b 0
f 0
rs 10
cc 1
eloc 2
nc 1
nop 1
1
<?php
2
3
/*
4
 * This file is part of HeriJobQueueBundle.
5
 *
6
 * (c) Alexandre Mogère
7
 *
8
 * This source file is subject to the MIT license that is bundled
9
 * with this source code in the file LICENSE.
10
 */
11
12
namespace Heri\Bundle\JobQueueBundle\Adapter;
13
14
use ZendQueue\Adapter\AbstractAdapter;
15
use ZendQueue\Message;
16
use ZendQueue\Queue;
17
use PhpAmqpLib\Connection\AMQPConnection;
18
use PhpAmqpLib\Message\AMQPMessage;
19
use Heri\Bundle\JobQueueBundle\Exception\AdapterRuntimeException;
20
use Heri\Bundle\JobQueueBundle\Exception\MissingConfigurationException;
21
use Heri\Bundle\JobQueueBundle\Exception\UnsupportedMethodCallException;
22
23
/**
24
 * Amqp adapter.
25
 *
26
 * @see ZendQueue\Queue_Adapter_AdapterAbstract
27
 */
28
class AmqpAdapter extends AbstractAdapter implements AdapterInterface
29
{
30
    /**
31
     * @var LoggerInterface
32
     */
33
    public $logger;
34
35
    /**
36
     * @var AMQPConnection
37
     */
38
    protected $connection = null;
39
40
    /**
41
     * @var AMQPConnection
42
     */
43
    protected $channel = null;
44
45
    /**
46
     * @var AMQP_Queue_Exchange
47
     */
48
    protected $exchangeName = null;
49
50
    /**
51
     * @var int count of messages we got last time
52
     */
53
    private $_count;
54
55
    /**
56
     * Constructor.
57
     *
58
     * @param array|Zend_Config $options options (host, port, login, password)
59
     * @param null|Queue        $queue
60
     */
61
    public function __construct($options, Queue $queue = null)
62
    {
63
        parent::__construct($options, $queue);
64
65
        if (!class_exists('PhpAmqpLib\Message\AMQPMessage')) {
66
            throw new \Exception('Please install videlalvaro/php-amqplib dependency');
67
        }
68
69
        if (is_array($options)) {
70
            try {
71
                $host = $options['host'];
72
                $port = $options['port'];
73
                $user = $options['user'];
74
                $password = $options['password'];
75
76
                $connection = new AMQPConnection($host, $port, $user, $password);
77
                $channel = $connection->channel();
78
79
                $this->connection = $connection;
80
                $this->channel = $channel;
81
            } catch (\Exception $e) {
82
                throw new AdapterRuntimeException("Unable to connect RabbitMQ server: {$e->getMessage()}");
83
            }
84
        } else {
85
            throw new MissingConfigurationException('The options must be an associative array of host, port, login, password...');
86
        }
87
    }
88
89
    /**
90
     * Get AMQPConnection object.
91
     *
92
     * @return object
93
     */
94
    public function getConnection()
95
    {
96
        return $this->connection;
97
    }
98
99
    /**
100
     * Get AMQPChannel object.
101
     *
102
     * @return object
103
     */
104
    public function getChannel()
105
    {
106
        return $this->channel;
107
    }
108
109
    /**
110
     * create queue.
111
     *
112
     * @param string $name
113
     * @param int    $timeout
114
     *
115
     * @return bool
116
     */
117
    public function create($name, $timeout = null)
118
    {
119
        try {
120
            /*
121
                name: $queue
122
                passive: false
123
                durable: true // the queue will survive server restarts
124
                exclusive: false // the queue can be accessed in other channels
125
                auto_delete: false //the queue won't be deleted once the channel is closed.
126
            */
127
            $this->channel->queue_declare($name, false, true, false, false);
128
        } catch (\Exception $e) {
129
            return false;
130
        }
131
132
        return true;
133
    }
134
135
    /**
136
     * delete queue.
137
     *
138
     * @param $name
139
     *
140
     * @return bool
141
     */
142
    public function delete($name)
143
    {
144
        $this->channel->queue_delete($name);
145
146
        return true;
147
    }
148
149
    /**
150
     * Publish message to queue.
151
     *
152
     * @param mixed $message (array or string)
153
     * @param Queue $queue
154
     *
155
     * @return bool|null
156
     */
157
    public function send($message, Queue $queue = null)
158
    {
159
        if ($queue === null) {
160
            $queue = $this->_queue;
161
        }
162
163
        if (is_array($message)) {
164
            $message = \Zend\Json\Encoder::encode($message);
165
        }
166
167
        $this->exchangeName = 'router';
0 ignored issues
show
Documentation Bug introduced by
It seems like 'router' of type string is incompatible with the declared type object<Heri\Bundle\JobQu...er\AMQP_Queue_Exchange> of property $exchangeName.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
168
169
        /*
170
            name: $exchange
171
            type: direct
172
            passive: false
173
            durable: true // the exchange will survive server restarts
174
            auto_delete: false //the exchange won't be deleted once the channel is closed.
175
        */
176
        $this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
177
        $this->channel->queue_bind($queue->getName(), $this->exchangeName);
178
179
        $amqpMessage = new AMQPMessage($message, [
180
            'content_type' => 'text/plain',
181
            'delivery_mode' => 2,
182
        ]);
183
184
        $this->channel->basic_publish($amqpMessage, $this->exchangeName);
185
    }
186
187
    /**
188
     * Get messages in the queue.
189
     *
190
     * @param int|null             $maxMessages Maximum number of messages to return
191
     * @param int|null             $timeout     Visibility timeout for these messages
192
     * @param null|ZendQueue\Queue $queue
193
     *
194
     * @return ZendQueue\MessageIterator
195
     */
196
    public function receive($maxMessages = null, $timeout = null, Queue $queue = null)
197
    {
198
        $result = [];
199
200
        if ($queue === null) {
201
            $queue = $this->_queue;
202
        }
203
204
        $maxMessages = (int) $maxMessages ? (int) $maxMessages : 1;
205
206
        // default approach: GET
207
        for ($i = $maxMessages; $i > 0; --$i) {
208
            $amqpMessage = $this->channel->basic_get($queue->getName());
209
210
            if (isset($amqpMessage->delivery_info['delivery_tag'])) {
211
                $result[] = [
212
                    'body' => $amqpMessage->body,
213
                    'amqpMessage' => $amqpMessage,
214
                ];
215
                $this->_count = $amqpMessage->delivery_info['message_count'];
216
            }
217
        }
218
219
        $options = [
220
            'queue' => $queue,
221
            'data' => $result,
222
            'messageClass' => $queue->getMessageClass(),
223
        ];
224
225
        $classname = $queue->getMessageSetClass();
226
227
        return new $classname($options);
228
    }
229
230
    public function getCapabilities()
231
    {
232
        return [
233
            'create' => true,
234
            'delete' => true,
235
            'send' => true,
236
            'count' => true,
237
            'deleteMessage' => true,
238
        ];
239
    }
240
241
    /**
242
     * Does a queue already exist?
243
     *
244
     * Use isSupported('isExists') to determine if an adapter can test for
245
     * queue existance.
246
     *
247
     * @param string $name Queue name
248
     *
249
     * @return bool
250
     */
251
    public function isExists($name)
252
    {
253
        return isset($this->_count);
254
    }
255
256
    /**
257
     * Get an array of all available queues.
258
     *
259
     * Not all adapters support getQueues(); use isSupported('getQueues')
260
     * to determine if the adapter supports this feature.
261
     *
262
     * @return Queue[]
263
     */
264
    public function getQueues()
265
    {
266
        return [$this->_queue];
267
    }
268
269
    /**
270
     * Return the approximate number of messages in the queue.
271
     *
272
     * @param null|Queue $queue
273
     *
274
     * @return int
275
     */
276
    public function count(Queue $queue = null)
277
    {
278
        return $this->_count;
279
    }
280
281
    /**
282
     * Delete a message from the queue.
283
     *
284
     * Return true if the message is deleted, false if the deletion is
285
     * unsuccessful.
286
     *
287
     * @param Message $message
288
     *
289
     * @return bool
290
     */
291
    public function deleteMessage(Message $message)
292
    {
293
        return $this->channel->basic_ack($message->amqpMessage->delivery_info['delivery_tag']);
294
    }
295
296
    /**
297
     * {@inheritdoc}
298
     */
299
    public function setPriority()
300
    {
301
        throw new UnsupportedMethodCallException('Not implemented');
302
    }
303
304
    /**
305
     * {@inheritdoc}
306
     */
307
    public function showMessages($queueName)
308
    {
309
        throw new UnsupportedMethodCallException('Not implemented');
310
    }
311
312
    /**
313
     * {@inheritdoc}
314
     */
315
    public function flush()
316
    {
317
        throw new UnsupportedMethodCallException('Not implemented');
318
    }
319
320
    /**
321
     * {@inheritdoc}
322
     */
323
    public function logException($message, $e)
324
    {
325
        $this->logger->err($message->body);
326
        $this->logger->err($e->getMessage());
327
    }
328
}
329