Completed
Push — master ( b145e1...9cf3bd )
by Tomas
05:23
created

RabbitMqDriver::send()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 3
Bugs 0 Features 2
Metric Value
c 3
b 0
f 2
dl 0
loc 5
ccs 4
cts 4
cp 1
rs 9.4285
cc 1
eloc 3
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Tomaj\Hermes\Driver;
4
5
use Closure;
6
use Exception;
7
use Tomaj\Hermes\MessageInterface;
8
use Tomaj\Hermes\MessageSerializer;
9
use PhpAmqpLib\Message\AMQPMessage;
10
use PhpAmqpLib\Channel\AMQPChannel;
11
12
/**
 * Hermes driver that publishes messages to, and consumes messages from,
 * a RabbitMQ queue through a php-amqplib channel.
 *
 * The driver neither opens the connection nor declares the queue; both are
 * expected to be prepared by the caller before the driver is used.
 */
class RabbitMqDriver implements DriverInterface
{
    use SerializerAwareTrait;

    /**
     * Channel used for both publishing and consuming.
     *
     * @var AMQPChannel
     */
    private $channel;

    /**
     * Name of the queue messages are published to and consumed from.
     *
     * @var string
     */
    private $queue;

    /**
     * Create new RabbitMqDriver with provided channel.
     *
     * You have to create the connection to RabbitMQ and set up the queue
     * outside of this class. Handling the connection is up to you and you
     * have to manage it.
     *
     * @see examples/rabbitmq folder
     *
     * @param AMQPChannel   $channel
     * @param string        $queue
     */
    public function __construct(AMQPChannel $channel, $queue)
    {
        $this->channel = $channel;
        $this->queue = $queue;
        $this->serializer = new MessageSerializer();
    }

    /**
     * {@inheritdoc}
     *
     * Serializes the message and publishes it with an empty exchange name,
     * using the configured queue name as the routing key.
     */
    public function send(MessageInterface $message)
    {
        $rabbitMessage = new AMQPMessage($this->serializer->serialize($message));
        $this->channel->basic_publish($rabbitMessage, '', $this->queue);
    }

    /**
     * {@inheritdoc}
     *
     * Registers a consumer on the configured queue and then keeps waiting on
     * the channel for as long as it has registered callbacks. Each delivery is
     * unserialized, handed to $callback, and acknowledged afterwards.
     *
     * @param Closure $callback Invoked with the unserialized message.
     */
    public function wait(Closure $callback)
    {
        $this->channel->basic_consume(
            $this->queue,
            '',    // consumer tag
            false, // no_local
            // no_ack must be false: the consumer below acknowledges every
            // delivery explicitly. Acknowledging while in automatic (no_ack)
            // mode makes the broker close the channel with
            // "PRECONDITION_FAILED - unknown delivery tag".
            false, // no_ack
            false, // exclusive
            false, // nowait
            function ($rabbitMessage) use ($callback) {
                $message = $this->serializer->unserialize($rabbitMessage->body);
                $callback($message);

                // Acknowledge only after the handler has returned, so a
                // delivery whose handler threw is not confirmed to the broker.
                $rabbitMessage->delivery_info['channel']->basic_ack($rabbitMessage->delivery_info['delivery_tag']);
            }
        );

        // Keep processing deliveries until no consumer callback is registered.
        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }
}
77