Issues (73)

RabbitMq/BaseAmqp.php (1 issue)

1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Event\AMQPEvent;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Connection\AbstractConnection;
8
use Psr\Log\LoggerInterface;
9
use Psr\Log\NullLogger;
10
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
11
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface as ContractsEventDispatcherInterface;
12
13
abstract class BaseAmqp
14
{
15
    protected $conn;
16
    protected $ch;
17
    protected $consumerTag;
18
    protected $exchangeDeclared = false;
19
    protected $queueDeclared = false;
20
    protected $routingKey = '';
21
    protected $autoSetupFabric = true;
22
    protected $basicProperties = array('content_type' => 'text/plain', 'delivery_mode' => 2);
23
24
    /**
25
     * @var LoggerInterface
26
     */
27
    protected $logger;
28
29
    protected $exchangeOptions = array(
30
        'passive' => false,
31
        'durable' => true,
32
        'auto_delete' => false,
33
        'internal' => false,
34
        'nowait' => false,
35
        'arguments' => null,
36
        'ticket' => null,
37
        'declare' => true,
38
    );
39
40
    protected $queueOptions = array(
41
        'name' => '',
42
        'passive' => false,
43
        'durable' => true,
44
        'exclusive' => false,
45
        'auto_delete' => false,
46
        'nowait' => false,
47
        'arguments' => null,
48
        'ticket' => null,
49
        'declare' => true,
50
    );
51
52
    /**
53
     * @var EventDispatcherInterface|null
54
     */
55
    protected $eventDispatcher = null;
56
57
    /**
58
     * @param AbstractConnection   $conn
59
     * @param AMQPChannel|null $ch
60
     * @param null             $consumerTag
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $consumerTag is correct as it would always require null to be passed?
Loading history...
61
     */
62 68
    public function __construct(AbstractConnection $conn, AMQPChannel $ch = null, $consumerTag = null)
63
    {
64 68
        $this->conn = $conn;
65 68
        $this->ch = $ch;
66
67 68
        if ($conn->connectOnConstruct()) {
68 1
            $this->getChannel();
69
        }
70
71 68
        $this->consumerTag = empty($consumerTag) ? sprintf("PHPPROCESS_%s_%s", gethostname(), getmypid()) : $consumerTag;
72
73 68
        $this->logger = new NullLogger();
74 68
    }
75
76 47
    public function __destruct()
77
    {
78 47
        $this->close();
79 47
    }
80
81 47
    public function close()
82
    {
83 47
        if ($this->ch) {
84
            try {
85 41
                $this->ch->close();
86
            } catch (\Exception $e) {
87
                // ignore on shutdown
88
            }
89
        }
90
91 47
        if ($this->conn && $this->conn->isConnected()) {
92
            try {
93
                $this->conn->close();
94
            } catch (\Exception $e) {
95
                // ignore on shutdown
96
            }
97
        }
98 47
    }
99
100
    public function reconnect()
101
    {
102
        if (!$this->conn->isConnected()) {
103
            return;
104
        }
105
106
        $this->conn->reconnect();
107
    }
108
109
    /**
110
     * @return AMQPChannel
111
     */
112 24
    public function getChannel()
113
    {
114 24
        if (empty($this->ch) || null === $this->ch->getChannelId()) {
115 1
            $this->ch = $this->conn->channel();
116
        }
117
118 24
        return $this->ch;
119
    }
120
121
    /**
122
     * @param  AMQPChannel $ch
123
     *
124
     * @return void
125
     */
126 19
    public function setChannel(AMQPChannel $ch)
127
    {
128 19
        $this->ch = $ch;
129 19
    }
130
131
    /**
132
     * @throws \InvalidArgumentException
133
     * @param  array                     $options
134
     * @return void
135
     */
136 2
    public function setExchangeOptions(array $options = array())
137
    {
138 2
        if (!isset($options['name'])) {
139
            throw new \InvalidArgumentException('You must provide an exchange name');
140
        }
141
142 2
        if (empty($options['type'])) {
143
            throw new \InvalidArgumentException('You must provide an exchange type');
144
        }
145
146 2
        $this->exchangeOptions = array_merge($this->exchangeOptions, $options);
147 2
    }
148
149
    /**
150
     * @param  array $options
151
     * @return void
152
     */
153
    public function setQueueOptions(array $options = array())
154
    {
155
        $this->queueOptions = array_merge($this->queueOptions, $options);
156
    }
157
158
    /**
159
     * @param  string $routingKey
160
     * @return void
161
     */
162 2
    public function setRoutingKey($routingKey)
163
    {
164 2
        $this->routingKey = $routingKey;
165 2
    }
166
167 2
    public function setupFabric()
168
    {
169 2
        if (!$this->exchangeDeclared) {
170 2
            $this->exchangeDeclare();
171
        }
172
173 2
        if (!$this->queueDeclared) {
174 2
            $this->queueDeclare();
175
        }
176 2
    }
177
178
    /**
179
     * disables the automatic SetupFabric when using a consumer or producer
180
     */
181 18
    public function disableAutoSetupFabric()
182
    {
183 18
        $this->autoSetupFabric = false;
184 18
    }
185
186
    /**
187
     * @param LoggerInterface $logger
188
     */
189
    public function setLogger($logger)
190
    {
191
        $this->logger = $logger;
192
    }
193
194
    /**
195
     * Declares exchange
196
     */
197 2
    protected function exchangeDeclare()
198
    {
199 2
        if ($this->exchangeOptions['declare']) {
200
            $this->getChannel()->exchange_declare(
201
                $this->exchangeOptions['name'],
202
                $this->exchangeOptions['type'],
203
                $this->exchangeOptions['passive'],
204
                $this->exchangeOptions['durable'],
205
                $this->exchangeOptions['auto_delete'],
206
                $this->exchangeOptions['internal'],
207
                $this->exchangeOptions['nowait'],
208
                $this->exchangeOptions['arguments'],
209
                $this->exchangeOptions['ticket']);
210
211
            $this->exchangeDeclared = true;
212
        }
213 2
    }
214
215
    /**
216
     * Declares queue, creates if needed
217
     */
218
    protected function queueDeclare()
219
    {
220
        if ($this->queueOptions['declare']) {
221
            list($queueName, ,) = $this->getChannel()->queue_declare($this->queueOptions['name'], $this->queueOptions['passive'],
222
                $this->queueOptions['durable'], $this->queueOptions['exclusive'],
223
                $this->queueOptions['auto_delete'], $this->queueOptions['nowait'],
224
                $this->queueOptions['arguments'], $this->queueOptions['ticket']);
225
226
            if (isset($this->queueOptions['routing_keys']) && count($this->queueOptions['routing_keys']) > 0) {
227
                foreach ($this->queueOptions['routing_keys'] as $routingKey) {
228
                    $this->queueBind($queueName, $this->exchangeOptions['name'], $routingKey, $this->queueOptions['arguments'] ?? []);
229
                }
230
            } else {
231
                $this->queueBind($queueName, $this->exchangeOptions['name'], $this->routingKey, $this->queueOptions['arguments'] ?? []);
232
            }
233
234
            $this->queueDeclared = true;
235
        }
236
    }
237
238
    /**
239
     * Binds queue to an exchange
240
     *
241
     * @param string $queue
242
     * @param string $exchange
243
     * @param string $routing_key
244
     */
245 2
    protected function queueBind($queue, $exchange, $routing_key, array $arguments = array())
246
    {
247
        // queue binding is not permitted on the default exchange
248 2
        if ('' !== $exchange) {
249 2
            $this->getChannel()->queue_bind($queue, $exchange, $routing_key, false, $arguments);
250
        }
251 2
    }
252
253
    /**
254
     * @param EventDispatcherInterface $eventDispatcher
255
     *
256
     * @return BaseAmqp
257
     */
258 20
    public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
259
    {
260 20
        $this->eventDispatcher = $eventDispatcher;
261
262 20
        return $this;
263
    }
264
265
    /**
266
     * @param string $eventName
267
     * @param AMQPEvent  $event
268
     */
269 49
    protected function dispatchEvent($eventName, AMQPEvent $event)
270
    {
271 49
        if ($this->getEventDispatcher() instanceof ContractsEventDispatcherInterface) {
272 19
            $this->getEventDispatcher()->dispatch(
273 19
                $event,
274
                $eventName
275
            );
276
        }
277 49
    }
278
279
    /**
280
     * @return EventDispatcherInterface|null
281
     */
282 48
    public function getEventDispatcher()
283
    {
284 48
        return $this->eventDispatcher;
285
    }
286
}
287