Completed
Push — master ( 980655...c21cce )
by Grégoire
01:53
created

AMQPBackend::getChannel()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 8
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 4
nc 2
nop 0
1
<?php
2
3
/*
4
 * This file is part of the Sonata Project package.
5
 *
6
 * (c) Thomas Rabaix <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Sonata\NotificationBundle\Backend;
13
14
use Interop\Amqp\AmqpConsumer;
15
use Interop\Amqp\AmqpContext;
16
use Interop\Amqp\AmqpMessage;
17
use Interop\Amqp\AmqpQueue;
18
use Interop\Amqp\AmqpTopic;
19
use Interop\Amqp\Impl\AmqpBind;
20
use Sonata\NotificationBundle\Consumer\ConsumerEvent;
21
use Sonata\NotificationBundle\Exception\HandlingException;
22
use Sonata\NotificationBundle\Iterator\AMQPMessageIterator;
23
use Sonata\NotificationBundle\Model\Message;
24
use Sonata\NotificationBundle\Model\MessageInterface;
25
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
26
use ZendDiagnostics\Result\Failure;
27
use ZendDiagnostics\Result\Success;
28
29
/**
30
 * Consumer side of the rabbitMQ backend.
31
 */
32
class AMQPBackend implements BackendInterface
33
{
34
    /**
35
     * @var AMQPBackendDispatcher
36
     */
37
    private $dispatcher = null;
38
39
    /**
40
     * @var string
41
     */
42
    private $exchange;
43
44
    /**
45
     * @var string
46
     */
47
    private $queue;
48
49
    /**
50
     * @var string
51
     */
52
    private $key;
53
54
    /**
55
     * @var string
56
     */
57
    private $recover;
58
59
    /**
60
     * @var null|string
61
     */
62
    private $deadLetterExchange;
63
64
    /**
65
     * @var null|string
66
     */
67
    private $deadLetterRoutingKey;
68
69
    /**
70
     * @var null|int
71
     */
72
    private $ttl;
73
74
    /**
75
     * @var null|int
76
     */
77
    private $prefetchCount;
78
79
    /**
80
     * @var AmqpConsumer
81
     */
82
    private $consumer;
83
84
    /**
     * Stores the AMQP settings used by this backend.
     *
     * Nothing is declared or connected here; the values are only kept for
     * later use by initialize(), publish(), getIterator() and handle().
     *
     * @param string      $exchange             Name of the exchange messages are published to
     * @param string      $queue                Name of the queue that is declared and consumed
     * @param string      $recover              Passed as the requeue argument when handle() rejects a message
     * @param string      $key                  Routing key used when binding the queue and when publishing
     * @param null|string $deadLetterExchange   Value of the queue's x-dead-letter-exchange argument, if any
     * @param null|string $deadLetterRoutingKey Value of the queue's x-dead-letter-routing-key argument, if any
     * @param null|int    $ttl                  Value of the queue's x-message-ttl argument, if any
     * @param null|int    $prefetchCount        Prefetch count handed to setQos() by getIterator(), if any
     */
    public function __construct($exchange, $queue, $recover, $key, $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null, $prefetchCount = null)
    {
        // Routing: where messages are published and consumed.
        $this->exchange = $exchange;
        $this->key = $key;
        $this->queue = $queue;

        // Failure handling.
        $this->recover = $recover;
        $this->deadLetterExchange = $deadLetterExchange;
        $this->deadLetterRoutingKey = $deadLetterRoutingKey;

        // Optional queue/consumer tuning.
        $this->ttl = $ttl;
        $this->prefetchCount = $prefetchCount;
    }
104
105
    /**
     * Injects the dispatcher from which the AMQP context is obtained.
     *
     * Until this has been called, every method relying on getContext()
     * fails with a \RuntimeException.
     *
     * @param AMQPBackendDispatcher $dispatcher
     */
    public function setDispatcher(AMQPBackendDispatcher $dispatcher)
    {
        $this->dispatcher = $dispatcher;
    }
112
113
    /**
114
     * {@inheritdoc}
115
     */
116
    public function initialize()
117
    {
118
        $args = [];
119
        if (null !== $this->deadLetterExchange) {
120
            $args['x-dead-letter-exchange'] = $this->deadLetterExchange;
121
122
            if (null !== $this->deadLetterRoutingKey) {
123
                $args['x-dead-letter-routing-key'] = $this->deadLetterRoutingKey;
124
            }
125
        }
126
127
        if (null !== $this->ttl) {
128
            $args['x-message-ttl'] = $this->ttl;
129
        }
130
131
        $queue = $this->getContext()->createQueue($this->queue);
132
        $queue->addFlag(AmqpQueue::FLAG_DURABLE);
133
        $queue->setArguments($args);
134
        $this->getContext()->declareQueue($queue);
0 ignored issues
show
Compatibility introduced by
$queue of type object<Interop\Queue\PsrQueue> is not a sub-type of object<Interop\Amqp\AmqpQueue>. It seems like you assume a child interface of the interface Interop\Queue\PsrQueue to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e. an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
135
136
        $topic = $this->getContext()->createTopic($this->exchange);
137
        $topic->setType(AmqpTopic::TYPE_DIRECT);
138
        $topic->addFlag(AmqpTopic::FLAG_DURABLE);
139
        $this->getContext()->declareTopic($topic);
0 ignored issues
show
Compatibility introduced by
$topic of type object<Interop\Queue\PsrTopic> is not a sub-type of object<Interop\Amqp\AmqpTopic>. It seems like you assume a child interface of the interface Interop\Queue\PsrTopic to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e. an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
140
141
        $this->getContext()->bind(new AmqpBind($queue, $topic, $this->key));
0 ignored issues
show
Documentation introduced by
$queue is of type object<Interop\Queue\PsrQueue>, but the function expects a object<Interop\Amqp\AmqpDestination>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in, this might be fine. In other cases, however, this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
Documentation introduced by
$topic is of type object<Interop\Queue\PsrTopic>, but the function expects a object<Interop\Amqp\AmqpDestination>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in, this might be fine. In other cases, however, this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
142
143
        if (null !== $this->deadLetterExchange && null === $this->deadLetterRoutingKey) {
144
            $deadLetterTopic = $this->getContext()->createTopic($this->deadLetterExchange);
145
            $deadLetterTopic->setType(AmqpTopic::TYPE_DIRECT);
146
            $deadLetterTopic->addFlag(AmqpTopic::FLAG_DURABLE);
147
            $this->getContext()->declareTopic($deadLetterTopic);
0 ignored issues
show
Compatibility introduced by
$deadLetterTopic of type object<Interop\Queue\PsrTopic> is not a sub-type of object<Interop\Amqp\AmqpTopic>. It seems like you assume a child interface of the interface Interop\Queue\PsrTopic to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e. an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
148
149
            $this->getContext()->bind(new AmqpBind($queue, $deadLetterTopic, $this->key));
0 ignored issues
show
Documentation introduced by
$queue is of type object<Interop\Queue\PsrQueue>, but the function expects a object<Interop\Amqp\AmqpDestination>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in, this might be fine. In other cases, however, this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
Documentation introduced by
$deadLetterTopic is of type object<Interop\Queue\PsrTopic>, but the function expects a object<Interop\Amqp\AmqpDestination>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in, this might be fine. In other cases, however, this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
150
        }
151
    }
152
153
    /**
     * {@inheritdoc}
     *
     * Serializes the message's type, body, creation time and state to JSON and
     * sends it, flagged as persistent, to the configured exchange using the
     * configured routing key.
     *
     * @throws \RuntimeException When the message cannot be encoded as JSON, or
     *                           when no dispatcher has been set
     */
    public function publish(MessageInterface $message)
    {
        // Unix timestamp as a string; it stays a string inside the JSON payload
        // so the serialized format is unchanged.
        $createdAt = $message->getCreatedAt()->format('U');

        $body = json_encode([
            'type' => $message->getType(),
            'body' => $message->getBody(),
            'createdAt' => $createdAt,
            'state' => $message->getState(),
        ]);

        // json_encode() returns false on failure; fail loudly instead of
        // publishing a message whose body is not the expected JSON document.
        if (false === $body) {
            throw new \RuntimeException(sprintf('Unable to encode the message as JSON: %s', json_last_error_msg()));
        }

        // Resolve the context once and reuse it for every call below.
        $context = $this->getContext();

        $amqpMessage = $context->createMessage($body);
        $amqpMessage->setContentType('text/plain'); // application/json ?
        // setTimestamp() is documented to take an integer, not a numeric string.
        $amqpMessage->setTimestamp((int) $createdAt);
        $amqpMessage->setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT);
        $amqpMessage->setRoutingKey($this->key);

        $topic = $context->createTopic($this->exchange);

        $context->createProducer()->send($topic, $amqpMessage);
    }
175
176
    /**
     * {@inheritdoc}
     *
     * Builds a new, unpublished message of the given type carrying the given
     * body, with its state set to MessageInterface::STATE_OPEN.
     */
    public function create($type, array $body)
    {
        $notification = new Message();

        $notification->setType($type);
        $notification->setBody($body);
        // A freshly created message has not been handled yet.
        $notification->setState(MessageInterface::STATE_OPEN);

        return $notification;
    }
188
189
    /**
     * {@inheritdoc}
     *
     * Shortcut that builds a message with create() and immediately hands it
     * to publish().
     */
    public function createAndPublish($type, array $body)
    {
        $message = $this->create($type, $body);

        $this->publish($message);
    }
196
197
    /**
198
     * {@inheritdoc}
199
     */
200
    public function getIterator()
201
    {
202
        $context = $this->getContext();
203
204
        if (null !== $this->prefetchCount) {
205
            $context->setQos(null, $this->prefetchCount, false);
206
        }
207
208
        $this->consumer = $this->getContext()->createConsumer($this->getContext()->createQueue($this->queue));
209
        $this->consumer->setConsumerTag('sonata_notification_'.uniqid());
210
211
        return new AMQPMessageIterator($this->consumer);
0 ignored issues
show
Compatibility introduced by
$this->consumer of type object<Interop\Queue\PsrConsumer> is not a sub-type of object<Interop\Amqp\AmqpConsumer>. It seems like you assume a child interface of the interface Interop\Queue\PsrConsumer to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e. an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
Bug Best Practice introduced by
The return type of return new \Sonata\Notif...rator($this->consumer); (Sonata\NotificationBundl...tor\AMQPMessageIterator) is incompatible with the return type declared by the interface Sonata\NotificationBundl...dInterface::getIterator of type Sonata\NotificationBundl...essageIteratorInterface.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type, e.g. an interface or abstract method. This is more formally defined by the Liskov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangeably. This principle also belongs to the SOLID principles for object-oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
212
    }
213
214
    /**
     * {@inheritdoc}
     *
     * Dispatches the message under its type as event name and settles the
     * underlying AMQP delivery (read from the message's 'interopMessage' value):
     *  - on success the delivery is acknowledged and the message is marked done;
     *  - on a HandlingException the delivery is acknowledged as well and the
     *    message is marked in error;
     *  - on any other exception the delivery is rejected, using the configured
     *    recover setting as the requeue flag, and the message is marked in error.
     *
     * @throws \RuntimeException When no consumer exists yet, i.e. getIterator() has not been called
     * @throws HandlingException When dispatching fails; the original exception is attached as previous
     */
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
    {
        // The consumer is only created by getIterator(). Check before dispatching
        // so a message is never processed when it cannot be acknowledged or
        // rejected afterwards.
        if (null === $this->consumer) {
            throw new \RuntimeException('Unable to handle a message without consumer.');
        }

        $event = new ConsumerEvent($message);

        /** @var AmqpMessage $amqpMessage */
        $amqpMessage = $message->getValue('interopMessage');

        try {
            $dispatcher->dispatch($message->getType(), $event);

            $this->consumer->acknowledge($amqpMessage);

            $message->setCompletedAt(new \DateTime());
            $message->setState(MessageInterface::STATE_DONE);
        } catch (HandlingException $e) {
            $message->setCompletedAt(new \DateTime());
            $message->setState(MessageInterface::STATE_ERROR);

            // Acknowledged rather than rejected, so the delivery is settled
            // without the requeue flag coming into play.
            $this->consumer->acknowledge($amqpMessage);

            throw new HandlingException('Error while handling a message', 0, $e);
        } catch (\Exception $e) {
            $message->setCompletedAt(new \DateTime());
            $message->setState(MessageInterface::STATE_ERROR);

            // reject() expects a boolean requeue flag; make the conversion of
            // the configured value explicit.
            $this->consumer->reject($amqpMessage, (bool) $this->recover);

            throw new HandlingException('Error while handling a message', 0, $e);
        }
    }
247
248
    /**
     * {@inheritdoc}
     *
     * Reports a Success when the AMQP context can be retrieved, and a Failure
     * carrying the exception message when retrieving it throws.
     */
    public function getStatus()
    {
        $failureReason = null;

        try {
            $this->getContext();
        } catch (\Exception $exception) {
            $failureReason = $exception->getMessage();
        }

        if (null !== $failureReason) {
            return new Failure($failureReason);
        }

        return new Success('Channel is running (RabbitMQ)');
    }
261
262
    /**
     * {@inheritdoc}
     *
     * Not available for this backend: calling it always fails.
     *
     * @throws \RuntimeException Always
     */
    public function cleanup()
    {
        throw new \RuntimeException('Not implemented');
    }
269
270
    /**
     * Returns the AMQP context provided by the injected dispatcher.
     *
     * @throws \RuntimeException When setDispatcher() has not been called yet
     *
     * @return AmqpContext
     */
    private function getContext()
    {
        if (null !== $this->dispatcher) {
            return $this->dispatcher->getContext();
        }

        throw new \RuntimeException('Unable to retrieve AMQP context without dispatcher.');
    }
281
}
282