Completed
Push — 3.x-dev-kit ( cc4dbe )
by
unknown
03:15
created

AMQPBackend   A

Complexity

Total Complexity 19

Size/Duplication

Total Lines 212
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 11

Importance

Changes 7
Bugs 3 Features 0
Metric Value
wmc 19
c 7
b 3
f 0
lcom 1
cbo 11
dl 0
loc 212
rs 10

11 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 12 2
B initialize() 0 30 2
A publish() 0 16 1
A create() 0 9 1
B handle() 0 31 5
A getStatus() 0 10 2
A cleanup() 0 4 1
A getChannel() 0 8 2
A setDispatcher() 0 4 1
A createAndPublish() 0 4 1
A getIterator() 0 4 1
1
<?php
2
3
/*
4
 * This file is part of the Sonata Project package.
5
 *
6
 * (c) Thomas Rabaix <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Sonata\NotificationBundle\Backend;
13
14
use PhpAmqpLib\Channel\AMQPChannel;
15
use PhpAmqpLib\Message\AMQPMessage;
16
use Sonata\NotificationBundle\Consumer\ConsumerEvent;
17
use Sonata\NotificationBundle\Exception\HandlingException;
18
use Sonata\NotificationBundle\Iterator\AMQPMessageIterator;
19
use Sonata\NotificationBundle\Model\Message;
20
use Sonata\NotificationBundle\Model\MessageInterface;
21
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
22
use ZendDiagnostics\Result\Failure;
23
use ZendDiagnostics\Result\Success;
24
25
/**
26
 * Consumer side of the rabbitMQ backend.
27
 */
28
class AMQPBackend implements BackendInterface
29
{
30
    /**
31
     * @var string
32
     */
33
    protected $exchange;
34
35
    /**
36
     * @var string
37
     */
38
    protected $queue;
39
40
    /**
41
     * @var string
42
     */
43
    protected $key;
44
45
    /**
46
     * @var string
47
     */
48
    protected $recover;
49
50
    /**
51
     * @var null|string
52
     */
53
    protected $deadLetterExchange;
54
55
    /**
56
     * @var AMQPBackendDispatcher
57
     */
58
    protected $dispatcher = null;
59
60
    /**
     * Stores the exchange/queue configuration and verifies that php-amqplib is available.
     *
     * @param string      $exchange           exchange declared in initialize() and used by publish()
     * @param string      $queue              queue declared in initialize() and read by getIterator()
     * @param bool        $recover            handle() only calls basic_recover when this is strictly true
     * @param string      $key                routing key used for the queue binding and for publishing
     * @param string|null $deadLetterExchange optional dead letter exchange; null disables it
     *
     * @throws \RuntimeException when the PhpAmqpLib\Message\AMQPMessage class cannot be found
     */
    public function __construct($exchange, $queue, $recover, $key, $deadLetterExchange = null)
    {
        $this->deadLetterExchange = $deadLetterExchange;
        $this->recover = $recover;
        $this->key = $key;
        $this->queue = $queue;
        $this->exchange = $exchange;

        // Nothing more to do when the AMQP library is installed.
        if (class_exists('PhpAmqpLib\Message\AMQPMessage')) {
            return;
        }

        throw new \RuntimeException('Please install videlalvaro/php-amqplib dependency');
    }
79
80
    /**
     * Injects the dispatcher that getChannel() asks for the AMQP channel.
     *
     * Must be called before any method that needs the channel; otherwise
     * getChannel() throws a \RuntimeException.
     *
     * @param AMQPBackendDispatcher $dispatcher
     */
    public function setDispatcher(AMQPBackendDispatcher $dispatcher)
    {
        $this->dispatcher = $dispatcher;
    }
87
88
    /**
     * {@inheritdoc}
     *
     * Declares the queue and the exchange, then binds the queue to the
     * exchange with the configured routing key.
     */
    public function initialize()
    {
        // The dead letter exchange is only passed along when one was configured.
        $arguments = null === $this->deadLetterExchange
            ? array()
            : array('x-dead-letter-exchange' => array('S', $this->deadLetterExchange));

        /*
         * queue_declare arguments, in order:
         *   name:        $queue
         *   passive:     false
         *   durable:     true  - the queue will survive server restarts
         *   exclusive:   false - the queue can be accessed in other channels
         *   auto_delete: false - the queue won't be deleted once the channel is closed
         *   no_wait:     false - the channel will wait until queue.declare_ok is received
         *   args:        array built above
         */
        $this->getChannel()->queue_declare($this->queue, false, true, false, false, false, $arguments);

        /*
         * exchange_declare arguments, in order:
         *   name:        $exchange
         *   type:        direct
         *   passive:     false
         *   durable:     true  - the exchange will survive server restarts
         *   auto_delete: false - the exchange won't be deleted once the channel is closed
         */
        $this->getChannel()->exchange_declare($this->exchange, 'direct', false, true, false);

        // Route messages published with $key on the exchange into the queue.
        $this->getChannel()->queue_bind($this->queue, $this->exchange, $this->key);
    }
121
122
    /**
     * {@inheritdoc}
     *
     * Serializes the message type, body, creation timestamp and state as JSON
     * and publishes the result on the configured exchange with the configured
     * routing key.
     *
     * @throws \RuntimeException when the message cannot be JSON-encoded, or when
     *                           no dispatcher has been set (see getChannel())
     */
    public function publish(MessageInterface $message)
    {
        $body = json_encode(array(
            'type' => $message->getType(),
            'body' => $message->getBody(),
            'createdAt' => $message->getCreatedAt()->format('U'),
            'state' => $message->getState(),
        ));

        // json_encode() returns false on failure (e.g. invalid UTF-8 in the body);
        // fail loudly instead of publishing an unusable payload.
        if (false === $body) {
            throw new \RuntimeException(sprintf(
                'Unable to JSON-encode message of type "%s" (json error code %d)',
                $message->getType(),
                json_last_error()
            ));
        }

        $amq = new AMQPMessage($body, array(
            'content_type' => 'text/plain',
            'delivery_mode' => 2,
        ));

        $this->getChannel()->basic_publish($amq, $this->exchange, $this->key);
    }
141
142
    /**
     * {@inheritdoc}
     *
     * Builds a new Message carrying the given type and body, in the open state.
     * The message is only created here, not published.
     */
    public function create($type, array $body)
    {
        $notification = new Message();

        $notification->setType($type);
        $notification->setBody($body);
        $notification->setState(MessageInterface::STATE_OPEN);

        return $notification;
    }
154
155
    /**
     * {@inheritdoc}
     *
     * Convenience wrapper: create() followed by publish().
     */
    public function createAndPublish($type, array $body)
    {
        $notification = $this->create($type, $body);

        $this->publish($notification);
    }
162
163
    /**
164
     * {@inheritdoc}
165
     */
166
    public function getIterator()
167
    {
168
        return new AMQPMessageIterator($this->getChannel(), $this->queue);
0 ignored issues
show
Bug Best Practice introduced by
The return type of return new \Sonata\Notif...annel(), $this->queue); (Sonata\NotificationBundl...tor\AMQPMessageIterator) is incompatible with the return type declared by the interface Sonata\NotificationBundl...dInterface::getIterator of type Sonata\NotificationBundl...essageIteratorInterface.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type, e.g. an interface or abstract method. This is more formally defined by the Liskov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangeably. This principle also belongs to the SOLID principles for object-oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
169
    }
170
171
    /**
     * {@inheritdoc}
     *
     * Dispatches a ConsumerEvent named after the message type and then settles
     * the underlying AMQP delivery:
     *
     *  - success: the delivery is acknowledged and the message is marked done;
     *  - HandlingException: the message is marked in error and the delivery is
     *    still acknowledged;
     *  - any other \Exception: the message is marked in error; the channel is
     *    asked to recover when $recover is strictly true, otherwise the delivery
     *    is rejected without requeue when a dead letter exchange is configured.
     *    When neither applies, the delivery is left unsettled.
     *
     * The AMQP message is read from the 'AMQMessage' value of $message.
     *
     * @throws HandlingException wrapping whatever exception the dispatch raised
     */
    public function handle(MessageInterface $message, EventDispatcherInterface $dispatcher)
    {
        $event = new ConsumerEvent($message);

        try {
            $dispatcher->dispatch($message->getType(), $event);

            $amqMessage = $message->getValue('AMQMessage');
            $amqMessage->delivery_info['channel']->basic_ack($amqMessage->delivery_info['delivery_tag']);

            $message->setCompletedAt(new \DateTime());
            $message->setState(MessageInterface::STATE_DONE);
        } catch (HandlingException $e) {
            $message->setCompletedAt(new \DateTime());
            $message->setState(MessageInterface::STATE_ERROR);

            // Acknowledged even though handling failed.
            $amqMessage = $message->getValue('AMQMessage');
            $amqMessage->delivery_info['channel']->basic_ack($amqMessage->delivery_info['delivery_tag']);

            throw new HandlingException('Error while handling a message', 0, $e);
        } catch (\Exception $e) {
            $message->setCompletedAt(new \DateTime());
            $message->setState(MessageInterface::STATE_ERROR);

            if ($this->recover === true) {
                // basic_recover() takes a boolean "requeue" flag, not a delivery tag.
                // The delivery tag that used to be passed here was always truthy, so
                // passing true keeps the behaviour while stating the intent.
                $message->getValue('AMQMessage')->delivery_info['channel']->basic_recover(true);
            } elseif ($this->deadLetterExchange !== null) {
                // Reject without requeue (second argument false).
                $amqMessage = $message->getValue('AMQMessage');
                $amqMessage->delivery_info['channel']->basic_reject($amqMessage->delivery_info['delivery_tag'], false);
            }

            throw new HandlingException('Error while handling a message', 0, $e);
        }
    }
205
206
    /**
     * {@inheritdoc}
     *
     * Reports a Failure carrying the exception message when the channel cannot
     * be retrieved, and a Success otherwise.
     */
    public function getStatus()
    {
        $failure = null;

        try {
            $this->getChannel();
        } catch (\Exception $error) {
            $failure = new Failure($error->getMessage());
        }

        if (null !== $failure) {
            return $failure;
        }

        return new Success('Channel is running (RabbitMQ)');
    }
219
220
    /**
     * {@inheritdoc}
     *
     * Not supported by this backend.
     *
     * @throws \RuntimeException always
     */
    public function cleanup()
    {
        throw new \RuntimeException('Not implemented');
    }
227
228
    /**
     * Returns the AMQP channel provided by the injected dispatcher.
     *
     * @throws \RuntimeException when setDispatcher() has not been called yet
     *
     * @return AMQPChannel
     */
    protected function getChannel()
    {
        if (null !== $this->dispatcher) {
            return $this->dispatcher->getChannel();
        }

        throw new \RuntimeException('Unable to retrieve AMQP channel without dispatcher.');
    }
239
}
240