1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Tomaj\Hermes\Driver; |
4
|
|
|
|
5
|
|
|
use Closure; |
6
|
|
|
use PhpAmqpLib\Channel\AMQPChannel; |
7
|
|
|
use PhpAmqpLib\Connection\AMQPLazyConnection; |
8
|
|
|
use PhpAmqpLib\Message\AMQPMessage; |
9
|
|
|
use Tomaj\Hermes\MessageInterface; |
10
|
|
|
use Tomaj\Hermes\MessageSerializer; |
11
|
|
|
|
12
|
|
|
/**
 * Hermes driver that publishes to and consumes from a single RabbitMQ queue.
 *
 * The AMQP channel is not opened in the constructor; it is created (and the
 * queue declared) the first time send() or wait() needs it.
 */
class LazyRabbitMqDriver implements DriverInterface
{
    // NOTE(review): presumably declares the $serializer property used below — confirm in the trait.
    use SerializerAwareTrait;

    /**
     * Connection from which the channel is obtained on first use.
     *
     * @var AMQPLazyConnection
     */
    private $connection;

    /**
     * Channel cached by getChannel(); not set until the first send() or wait().
     *
     * @var AMQPChannel
     */
    private $channel;

    /**
     * Queue name, used as the routing key when publishing and as the consume source.
     *
     * @var string
     */
    private $queue;

    /**
     * Stores the connection and queue name and installs a default MessageSerializer.
     *
     * @param AMQPLazyConnection $connection
     * @param string $queue
     */
    public function __construct(AMQPLazyConnection $connection, $queue)
    {
        $this->queue = $queue;
        $this->connection = $connection;
        $this->serializer = new MessageSerializer();
    }

    /**
     * Serializes the message and publishes it with an empty exchange name,
     * using the queue name as the routing key.
     *
     * {@inheritdoc}
     */
    public function send(MessageInterface $message)
    {
        $payload = $this->serializer->serialize($message);
        $envelope = new AMQPMessage($payload);

        $this->getChannel()->basic_publish($envelope, '', $this->queue);
    }

    /**
     * Registers a consumer on the queue and keeps waiting on the channel for
     * as long as the channel has registered callbacks. Every delivery body is
     * unserialized and handed to $callback.
     *
     * {@inheritdoc}
     */
    public function wait(Closure $callback)
    {
        $channel = $this->getChannel();

        $consumer = function ($delivery) use ($callback) {
            $callback($this->serializer->unserialize($delivery->body));
        };

        // Positional flags follow php-amqplib's basic_consume() signature:
        // consumer_tag '', no_local false, no_ack true, exclusive false, nowait false.
        $channel->basic_consume($this->queue, '', false, true, false, false, $consumer);

        while (count($channel->callbacks) > 0) {
            $channel->wait();
        }
    }

    /**
     * Returns the cached channel, opening it and declaring the queue on the first call.
     *
     * @return AMQPChannel
     */
    private function getChannel()
    {
        if (!$this->channel) {
            $this->channel = $this->connection->channel();
            // Positional flags follow php-amqplib's queue_declare() signature:
            // passive, durable, exclusive, auto_delete — all false.
            $this->channel->queue_declare($this->queue, false, false, false, false);
        }

        return $this->channel;
    }
}
78
|
|
|
|