1
|
|
|
<?php |
2
|
|
|
declare(strict_types=1); |
3
|
|
|
|
4
|
|
|
namespace Tomaj\Hermes\Driver; |
5
|
|
|
|
6
|
|
|
use Closure; |
7
|
|
|
use Exception; |
8
|
|
|
use Tomaj\Hermes\Dispatcher; |
9
|
|
|
use Tomaj\Hermes\MessageInterface; |
10
|
|
|
use Tomaj\Hermes\MessageSerializer; |
11
|
|
|
use PhpAmqpLib\Message\AMQPMessage; |
12
|
|
|
use PhpAmqpLib\Channel\AMQPChannel; |
13
|
|
|
|
14
|
|
|
/** |
15
|
|
|
* Class RabbitMqDriver |
16
|
|
|
* @package Tomaj\Hermes\Driver |
17
|
|
|
* |
18
|
|
|
* @deprecated use LazyRabbitMqDriver instead |
19
|
|
|
*/ |
20
|
|
|
class RabbitMqDriver implements DriverInterface |
21
|
|
|
{ |
22
|
|
|
use SerializerAwareTrait; |
23
|
|
|
|
24
|
|
|
/** |
25
|
|
|
* @var AMQPChannel |
26
|
|
|
*/ |
27
|
|
|
private $channel; |
28
|
|
|
|
29
|
|
|
/** |
30
|
|
|
* @var string |
31
|
|
|
*/ |
32
|
|
|
private $queue; |
33
|
|
|
|
34
|
|
|
/** |
35
|
|
|
* @var array |
36
|
|
|
*/ |
37
|
3 |
|
private $amqpMessageProperties = []; |
38
|
|
|
|
39
|
3 |
|
/** |
40
|
3 |
|
* Create new RabbitMqDriver with provided channel. |
41
|
3 |
|
* |
42
|
3 |
|
* You have to create connection to rabbit, and setup queue outside of this class. |
43
|
|
|
* Handling connection to rabbit is up to you and you have to manage it. |
44
|
|
|
* |
45
|
|
|
* @see examples/rabbitmq folder |
46
|
|
|
* |
47
|
3 |
|
* @param AMQPChannel $channel |
48
|
|
|
* @param string $queue |
49
|
3 |
|
* @param array $amqpMessageProperties |
50
|
3 |
|
*/ |
51
|
3 |
|
public function __construct(AMQPChannel $channel, string $queue, array $amqpMessageProperties = []) |
52
|
|
|
{ |
53
|
|
|
$this->channel = $channel; |
54
|
|
|
$this->queue = $queue; |
55
|
|
|
$this->amqpMessageProperties = $amqpMessageProperties; |
56
|
|
|
$this->serializer = new MessageSerializer(); |
57
|
|
|
} |
58
|
|
|
|
59
|
|
|
/** |
60
|
|
|
* {@inheritdoc} |
61
|
|
|
*/ |
62
|
|
|
public function send(MessageInterface $message, int $priority = Dispatcher::PRIORITY_MEDIUM): bool |
63
|
|
|
{ |
64
|
|
|
$rabbitMessage = new AMQPMessage($this->serializer->serialize($message), $this->amqpMessageProperties); |
65
|
|
|
$this->channel->basic_publish($rabbitMessage, '', $this->queue); |
66
|
|
|
return true; |
67
|
|
|
} |
68
|
|
|
|
69
|
|
|
public function setupPriorityQueue(string $name, int $priority): void |
70
|
|
|
{ |
71
|
|
|
throw new \Exception("AmazonSQS is not supporting priority queues now"); |
72
|
|
|
} |
73
|
|
|
|
74
|
|
|
/** |
75
|
|
|
* {@inheritdoc} |
76
|
|
|
*/ |
77
|
|
|
public function wait(Closure $callback, array $priorities): void |
78
|
|
|
{ |
79
|
|
|
$this->channel->basic_consume( |
80
|
|
|
$this->queue, |
81
|
|
|
'', |
82
|
|
|
false, |
83
|
|
|
false, |
84
|
|
|
false, |
85
|
|
|
false, |
86
|
|
|
function ($rabbitMessage) use ($callback) { |
87
|
|
|
$message = $this->serializer->unserialize($rabbitMessage->body); |
88
|
|
|
$callback($message); |
89
|
|
|
$rabbitMessage->delivery_info['channel']->basic_ack($rabbitMessage->delivery_info['delivery_tag']); |
90
|
|
|
} |
91
|
|
|
); |
92
|
|
|
|
93
|
|
|
while (count($this->channel->callbacks)) { |
94
|
|
|
$this->channel->wait(); |
95
|
|
|
} |
96
|
|
|
} |
97
|
|
|
} |
98
|
|
|
|