AMQPBackend::handle()   A
last analyzed

Complexity

Conditions 3
Paths 9

Size

Total Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 30
rs 9.44
c 0
b 0
f 0
cc 3
nc 9
nop 2
1
<?php
2
3
declare(strict_types=1);
4
5
/*
6
 * This file is part of the Sonata Project package.
7
 *
8
 * (c) Thomas Rabaix <[email protected]>
9
 *
10
 * For the full copyright and license information, please view the LICENSE
11
 * file that was distributed with this source code.
12
 */
13
14
namespace Sonata\NotificationBundle\Backend;
15
16
use Interop\Amqp\AmqpConsumer;
17
use Interop\Amqp\AmqpContext;
18
use Interop\Amqp\AmqpMessage;
19
use Interop\Amqp\AmqpQueue;
20
use Interop\Amqp\AmqpTopic;
21
use Interop\Amqp\Impl\AmqpBind;
22
use Laminas\Diagnostics\Result\Failure;
23
use Laminas\Diagnostics\Result\Success;
24
use Sonata\NotificationBundle\Consumer\ConsumerEvent;
25
use Sonata\NotificationBundle\Exception\HandlingException;
26
use Sonata\NotificationBundle\Iterator\AMQPMessageIterator;
27
use Sonata\NotificationBundle\Model\Message;
28
use Sonata\NotificationBundle\Model\MessageInterface;
29
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
30
31
/**
32
 * Consumer side of the rabbitMQ backend.
33
 */
34
class AMQPBackend implements BackendInterface
35
{
36
    /**
37
     * @var AMQPBackendDispatcher
38
     */
39
    private $dispatcher = null;
40
41
    /**
42
     * @var string
43
     */
44
    private $exchange;
45
46
    /**
47
     * @var string
48
     */
49
    private $queue;
50
51
    /**
52
     * @var string
53
     */
54
    private $key;
55
56
    /**
57
     * @var string
58
     */
59
    private $recover;
60
61
    /**
62
     * @var string|null
63
     */
64
    private $deadLetterExchange;
65
66
    /**
67
     * @var string|null
68
     */
69
    private $deadLetterRoutingKey;
70
71
    /**
72
     * @var int|null
73
     */
74
    private $ttl;
75
76
    /**
77
     * @var int|null
78
     */
79
    private $prefetchCount;
80
81
    /**
82
     * @var AmqpConsumer
83
     */
84
    private $consumer;
85
86
    /**
87
     * @param string   $exchange
88
     * @param string   $queue
89
     * @param string   $recover
90
     * @param string   $key
91
     * @param string   $deadLetterExchange
92
     * @param string   $deadLetterRoutingKey
93
     * @param int|null $ttl
94
     */
95
    public function __construct($exchange, $queue, $recover, $key, $deadLetterExchange = null, $deadLetterRoutingKey = null, $ttl = null, $prefetchCount = null)
96
    {
97
        $this->exchange = $exchange;
98
        $this->queue = $queue;
99
        $this->recover = $recover;
100
        $this->key = $key;
101
        $this->deadLetterExchange = $deadLetterExchange;
102
        $this->deadLetterRoutingKey = $deadLetterRoutingKey;
103
        $this->ttl = $ttl;
104
        $this->prefetchCount = $prefetchCount;
105
    }
106
107
    public function setDispatcher(AMQPBackendDispatcher $dispatcher): void
108
    {
109
        $this->dispatcher = $dispatcher;
110
    }
111
112
    /**
113
     * {@inheritdoc}
114
     */
115
    public function initialize(): void
116
    {
117
        $args = [];
118
        if (null !== $this->deadLetterExchange) {
119
            $args['x-dead-letter-exchange'] = $this->deadLetterExchange;
120
121
            if (null !== $this->deadLetterRoutingKey) {
122
                $args['x-dead-letter-routing-key'] = $this->deadLetterRoutingKey;
123
            }
124
        }
125
126
        if (null !== $this->ttl) {
127
            $args['x-message-ttl'] = $this->ttl;
128
        }
129
130
        $queue = $this->getContext()->createQueue($this->queue);
131
        $queue->addFlag(AmqpQueue::FLAG_DURABLE);
132
        $queue->setArguments($args);
133
        $this->getContext()->declareQueue($queue);
0 ignored issues
show
Compatibility introduced by
$queue of type object<Interop\Queue\PsrQueue> is not a sub-type of object<Interop\Amqp\AmqpQueue>. It seems like you assume a child interface of the interface Interop\Queue\PsrQueue to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
134
135
        $topic = $this->getContext()->createTopic($this->exchange);
136
        $topic->setType(AmqpTopic::TYPE_DIRECT);
137
        $topic->addFlag(AmqpTopic::FLAG_DURABLE);
138
        $this->getContext()->declareTopic($topic);
0 ignored issues
show
Compatibility introduced by
$topic of type object<Interop\Queue\PsrTopic> is not a sub-type of object<Interop\Amqp\AmqpTopic>. It seems like you assume a child interface of the interface Interop\Queue\PsrTopic to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
139
140
        $this->getContext()->bind(new AmqpBind($queue, $topic, $this->key));
0 ignored issues
show
Documentation introduced by
$queue is of type object<Interop\Queue\PsrQueue>, but the function expects a object<Interop\Amqp\AmqpDestination>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
Documentation introduced by
$topic is of type object<Interop\Queue\PsrTopic>, but the function expects a object<Interop\Amqp\AmqpDestination>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
141
142
        if (null !== $this->deadLetterExchange && null === $this->deadLetterRoutingKey) {
143
            $deadLetterTopic = $this->getContext()->createTopic($this->deadLetterExchange);
144
            $deadLetterTopic->setType(AmqpTopic::TYPE_DIRECT);
145
            $deadLetterTopic->addFlag(AmqpTopic::FLAG_DURABLE);
146
            $this->getContext()->declareTopic($deadLetterTopic);
0 ignored issues
show
Compatibility introduced by
$deadLetterTopic of type object<Interop\Queue\PsrTopic> is not a sub-type of object<Interop\Amqp\AmqpTopic>. It seems like you assume a child interface of the interface Interop\Queue\PsrTopic to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
147
148
            $this->getContext()->bind(new AmqpBind($queue, $deadLetterTopic, $this->key));
0 ignored issues
show
Documentation introduced by
$queue is of type object<Interop\Queue\PsrQueue>, but the function expects a object<Interop\Amqp\AmqpDestination>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
Documentation introduced by
$deadLetterTopic is of type object<Interop\Queue\PsrTopic>, but the function expects a object<Interop\Amqp\AmqpDestination>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
149
        }
150
    }
151
152
    /**
153
     * {@inheritdoc}
154
     */
155
    public function publish(MessageInterface $message): void
156
    {
157
        $body = json_encode([
158
            'type' => $message->getType(),
159
            'body' => $message->getBody(),
160
            'createdAt' => $message->getCreatedAt()->format('U'),
161
            'state' => $message->getState(),
162
        ]);
163
164
        $amqpMessage = $this->getContext()->createMessage($body);
165
        $amqpMessage->setContentType('text/plain'); // application/json ?
166
        $amqpMessage->setTimestamp($message->getCreatedAt()->getTimestamp());
167
        $amqpMessage->setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT);
168
        $amqpMessage->setRoutingKey($this->key);
169
170
        $topic = $this->getContext()->createTopic($this->exchange);
171
172
        $this->getContext()->createProducer()->send($topic, $amqpMessage);
173
    }
174
175
    /**
176
     * {@inheritdoc}
177
     */
178
    public function create($type, array $body)
179
    {
180
        $message = new Message();
181
        $message->setType($type);
182
        $message->setBody($body);
183
        $message->setState(MessageInterface::STATE_OPEN);
184
185
        return $message;
186
    }
187
188
    /**
189
     * {@inheritdoc}
190
     */
191
    public function createAndPublish($type, array $body): void
192
    {
193
        $this->publish($this->create($type, $body));
194
    }
195
196
    /**
197
     * {@inheritdoc}
198
     */
199
    public function getIterator()
200
    {
201
        $context = $this->getContext();
202
203
        if (null !== $this->prefetchCount) {
204
            $context->setQos(null, $this->prefetchCount, false);
205
        }
206
207
        $this->consumer = $this->getContext()->createConsumer($this->getContext()->createQueue($this->queue));
208
        $this->consumer->setConsumerTag('sonata_notification_'.uniqid());
209
210
        return new AMQPMessageIterator($this->consumer);
0 ignored issues
show
Compatibility introduced by
$this->consumer of type object<Interop\Queue\PsrConsumer> is not a sub-type of object<Interop\Amqp\AmqpConsumer>. It seems like you assume a child interface of the interface Interop\Queue\PsrConsumer to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
Bug Best Practice introduced by
The return type of return new \Sonata\Notif...rator($this->consumer); (Sonata\NotificationBundl...tor\AMQPMessageIterator) is incompatible with the return type declared by the interface Sonata\NotificationBundl...dInterface::getIterator of type Sonata\NotificationBundl...essageIteratorInterface.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type f.e. an interface, or abstract method. This is more formally defined by the Lizkov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangably. This principle also belongs to the SOLID principles for object oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
211
    }
212
213
    /**
214
     * {@inheritdoc}
215
     */
216
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher): void
217
    {
218
        $event = new ConsumerEvent($message);
219
220
        /** @var AmqpMessage $amqpMessage */
221
        $amqpMessage = $message->getValue('interopMessage');
222
223
        try {
224
            $dispatcher->dispatch($event, $message->getType());
225
226
            $this->consumer->acknowledge($amqpMessage);
227
228
            $message->setCompletedAt(new \DateTime());
229
            $message->setState(MessageInterface::STATE_DONE);
230
        } catch (HandlingException $e) {
231
            $message->setCompletedAt(new \DateTime());
232
            $message->setState(MessageInterface::STATE_ERROR);
233
234
            $this->consumer->acknowledge($amqpMessage);
235
236
            throw new HandlingException('Error while handling a message', 0, $e);
237
        } catch (\Exception $e) {
238
            $message->setCompletedAt(new \DateTime());
239
            $message->setState(MessageInterface::STATE_ERROR);
240
241
            $this->consumer->reject($amqpMessage, $this->recover);
0 ignored issues
show
Documentation introduced by
$this->recover is of type string, but the function expects a boolean.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
242
243
            throw new HandlingException('Error while handling a message', 0, $e);
244
        }
245
    }
246
247
    /**
248
     * {@inheritdoc}
249
     */
250
    public function getStatus()
251
    {
252
        try {
253
            $this->getContext();
254
        } catch (\Exception $e) {
255
            return new Failure($e->getMessage());
256
        }
257
258
        return new Success('Channel is running (RabbitMQ)');
259
    }
260
261
    /**
262
     * {@inheritdoc}
263
     */
264
    public function cleanup(): void
265
    {
266
        throw new \RuntimeException('Not implemented');
267
    }
268
269
    /**
270
     * @return AmqpContext
271
     */
272
    private function getContext()
273
    {
274
        if (null === $this->dispatcher) {
275
            throw new \RuntimeException('Unable to retrieve AMQP context without dispatcher.');
276
        }
277
278
        return $this->dispatcher->getContext();
279
    }
280
}
281