Completed
Push — 2.x-dev-kit ( 8d77e1 )
by
unknown
28:22 queued 25:50
created

AMQPBackend::initialize()   B

Complexity

Conditions 2
Paths 2

Size

Total Lines 30
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 1 Features 0
Metric Value
c 2
b 1
f 0
dl 0
loc 30
rs 8.8571
cc 2
eloc 7
nc 2
nop 0
1
<?php
2
3
/*
4
 * This file is part of the Sonata project.
5
 *
6
 * (c) Thomas Rabaix <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Sonata\NotificationBundle\Backend;
13
14
use PhpAmqpLib\Channel\AMQPChannel;
15
use PhpAmqpLib\Message\AMQPMessage;
16
use Sonata\NotificationBundle\Consumer\ConsumerEvent;
17
use Sonata\NotificationBundle\Exception\HandlingException;
18
use Sonata\NotificationBundle\Iterator\AMQPMessageIterator;
19
use Sonata\NotificationBundle\Model\Message;
20
use Sonata\NotificationBundle\Model\MessageInterface;
21
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
22
use ZendDiagnostics\Result\Failure;
23
use ZendDiagnostics\Result\Success;
24
25
/**
26
 * Consumer side of the rabbitMQ backend.
27
 */
28
class AMQPBackend implements BackendInterface
29
{
30
    /**
31
     * @var string
32
     */
33
    protected $exchange;
34
35
    /**
36
     * @var string
37
     */
38
    protected $queue;
39
40
    /**
41
     * @deprecated since version 2.4 and will be removed in 3.0.
42
     */
43
    protected $connection;
44
45
    /**
46
     * @var string
47
     */
48
    protected $key;
49
50
    /**
51
     * @var string
52
     */
53
    protected $recover;
54
55
    /**
56
     * @var null|string
57
     */
58
    protected $deadLetterExchange;
59
60
    /**
61
     * @var AMQPBackendDispatcher
62
     */
63
    protected $dispatcher = null;
64
65
    /**
66
     * @param string $exchange
67
     * @param string $queue
68
     * @param string $recover
69
     * @param string $key
70
     * @param string $deadLetterExchange
71
     */
72
    public function __construct($exchange, $queue, $recover, $key, $deadLetterExchange = null)
73
    {
74
        $this->exchange = $exchange;
75
        $this->queue    = $queue;
76
        $this->recover  = $recover;
77
        $this->key      = $key;
78
        $this->deadLetterExchange = $deadLetterExchange;
79
80
        if (!class_exists('PhpAmqpLib\Message\AMQPMessage')) {
81
            throw new \RuntimeException('Please install videlalvaro/php-amqplib dependency');
82
        }
83
    }
84
85
    /**
86
     * @param AMQPBackendDispatcher $dispatcher
87
     */
88
    public function setDispatcher(AMQPBackendDispatcher $dispatcher)
89
    {
90
        $this->dispatcher = $dispatcher;
91
    }
92
93
    /**
94
     * @return AMQPChannel
95
     */
96
    protected function getChannel()
97
    {
98
        if ($this->dispatcher === null) {
99
            throw new \RuntimeException('Unable to retrieve AMQP channel without dispatcher.');
100
        }
101
102
        return $this->dispatcher->getChannel();
103
    }
104
105
    /**
106
     * {@inheritdoc}
107
     */
108
    public function initialize()
109
    {
110
        $args = array();
111
112
        if ($this->deadLetterExchange !== null) {
113
            $args['x-dead-letter-exchange'] = array('S', $this->deadLetterExchange);
114
        }
115
116
        /*
117
         * name: $queue
118
         * passive: false
119
         * durable: true // the queue will survive server restarts
120
         * exclusive: false // the queue can be accessed in other channels
121
         * auto_delete: false //the queue won't be deleted once the channel is closed.
122
         * no_wait: false the channel will wait until queue.declare_ok is received
123
         * args: array
124
         */
125
        $this->getChannel()->queue_declare($this->queue, false, true, false, false, false, $args);
126
127
        /*
128
         * name: $exchange
129
         * type: direct
130
         * passive: false
131
         * durable: true // the exchange will survive server restarts
132
         * auto_delete: false //the exchange won't be deleted once the channel is closed.
133
         **/
134
        $this->getChannel()->exchange_declare($this->exchange, 'direct', false, true, false);
135
136
        $this->getChannel()->queue_bind($this->queue, $this->exchange, $this->key);
137
    }
138
139
    /**
140
     * {@inheritdoc}
141
     */
142
    public function publish(MessageInterface $message)
143
    {
144
        $body = json_encode(array(
145
            'type'      => $message->getType(),
146
            'body'      => $message->getBody(),
147
            'createdAt' => $message->getCreatedAt()->format('U'),
148
            'state'     => $message->getState(),
149
        ));
150
151
        $amq = new AMQPMessage($body, array(
152
            'content_type'  => 'text/plain',
153
            'delivery_mode' => 2,
154
        ));
155
156
        $this->getChannel()->basic_publish($amq, $this->exchange, $this->key);
157
    }
158
159
    /**
160
     * {@inheritdoc}
161
     */
162
    public function create($type, array $body)
163
    {
164
        $message = new Message();
165
        $message->setType($type);
166
        $message->setBody($body);
167
        $message->setState(MessageInterface::STATE_OPEN);
168
169
        return $message;
170
    }
171
172
    /**
173
     * {@inheritdoc}
174
     */
175
    public function createAndPublish($type, array $body)
176
    {
177
        $this->publish($this->create($type, $body));
178
    }
179
180
    /**
181
     * {@inheritdoc}
182
     */
183
    public function getIterator()
184
    {
185
        return new AMQPMessageIterator($this->getChannel(), $this->queue);
0 ignored issues
show
Bug Best Practice introduced by
The return type of return new \Sonata\Notif...annel(), $this->queue); (Sonata\NotificationBundl...tor\AMQPMessageIterator) is incompatible with the return type declared by the interface Sonata\NotificationBundl...dInterface::getIterator of type Sonata\NotificationBundl...essageIteratorInterface.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type f.e. an interface, or abstract method. This is more formally defined by the Lizkov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangably. This principle also belongs to the SOLID principles for object oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
186
    }
187
188
    /**
189
     * {@inheritdoc}
190
     */
191
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
192
    {
193
        $event = new ConsumerEvent($message);
194
195
        try {
196
            $dispatcher->dispatch($message->getType(), $event);
197
198
            $message->getValue('AMQMessage')->delivery_info['channel']->basic_ack($message->getValue('AMQMessage')->delivery_info['delivery_tag']);
199
200
            $message->setCompletedAt(new \DateTime());
201
            $message->setState(MessageInterface::STATE_DONE);
202
        } catch (HandlingException $e) {
203
            $message->setCompletedAt(new \DateTime());
204
            $message->setState(MessageInterface::STATE_ERROR);
205
206
            $message->getValue('AMQMessage')->delivery_info['channel']->basic_ack($message->getValue('AMQMessage')->delivery_info['delivery_tag']);
207
208
            throw new HandlingException('Error while handling a message', 0, $e);
209
        } catch (\Exception $e) {
210
            $message->setCompletedAt(new \DateTime());
211
            $message->setState(MessageInterface::STATE_ERROR);
212
213
            if ($this->recover === true) {
214
                $message->getValue('AMQMessage')->delivery_info['channel']->basic_recover($message->getValue('AMQMessage')->delivery_info['delivery_tag']);
215
            } elseif ($this->deadLetterExchange !== null) {
216
                $message->getValue('AMQMessage')->delivery_info['channel']->basic_reject($message->getValue('AMQMessage')->delivery_info['delivery_tag'], false);
217
            }
218
219
            throw new HandlingException('Error while handling a message', 0, $e);
220
        }
221
    }
222
223
    /**
224
     * {@inheritdoc}
225
     */
226
    public function getStatus()
227
    {
228
        try {
229
            $this->getChannel();
230
        } catch (\Exception $e) {
231
            return new Failure($e->getMessage());
232
        }
233
234
        return new Success('Channel is running (RabbitMQ)');
235
    }
236
237
    /**
238
     * {@inheritdoc}
239
     */
240
    public function cleanup()
241
    {
242
        throw new \RuntimeException('Not implemented');
243
    }
244
}
245