Completed
Push — master ( b145e1...9cf3bd )
by Tomas
05:23
created

RabbitMqDriver   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 65
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Test Coverage

Coverage 33.33%

Importance

Changes 10
Bugs 2 Features 4
Metric Value
wmc 4
c 10
b 2
f 4
lcom 1
cbo 5
dl 0
loc 65
ccs 9
cts 27
cp 0.3333
rs 10

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
A send() 0 5 1
A wait() 0 20 2
1
<?php
2
3
namespace Tomaj\Hermes\Driver;
4
5
use Closure;
6
use Exception;
7
use Tomaj\Hermes\MessageInterface;
8
use Tomaj\Hermes\MessageSerializer;
9
use PhpAmqpLib\Message\AMQPMessage;
10
use PhpAmqpLib\Channel\AMQPChannel;
11
12
class RabbitMqDriver implements DriverInterface
13
{
14
    use SerializerAwareTrait;
15
16
    /**
17
     * @var AMQPChannel
18
     */
19
    private $channel;
20
21
    /**
22
     * @var string
23
     */
24
    private $queue;
25
    
26
    /**
27
     * Create new RabbitMqDriver with provided channel.
28
     *
29
     * You have to create connection to rabbit, and setup queue outside of this class.
30
     * Handling connection to rabbit is up to you and you have to manage it.
31
     *
32
     * @see examples/rabbitmq folder
33
     *
34
     * @param AMQPChannel   $channel
35
     * @param string        $queue
36
     */
37 3
    public function __construct(AMQPChannel $channel, $queue)
38
    {
39 3
        $this->channel = $channel;
40 3
        $this->queue = $queue;
41 3
        $this->serializer = new MessageSerializer();
42 3
    }
43
44
    /**
45
     * {@inheritdoc}
46
     */
47 3
    public function send(MessageInterface $message)
48
    {
49 3
        $rabbitMessage = new AMQPMessage($this->serializer->serialize($message));
50 3
        $this->channel->basic_publish($rabbitMessage, '', $this->queue);
51 3
    }
52
53
    /**
54
     * {@inheritdoc}
55
     */
56
    public function wait(Closure $callback)
57
    {
58
        $this->channel->basic_consume(
59
            $this->queue,
60
            '',
61
            false,
62
            true,
63
            false,
64
            false,
65
            function ($rabbitMessage) use ($callback) {
66
                $message = $this->serializer->unserialize($rabbitMessage->body);
67
                $callback($message);
68
                $rabbitMessage->delivery_info['channel']->basic_ack($rabbitMessage->delivery_info['delivery_tag']);
69
            }
70
        );
71
72
        while (count($this->channel->callbacks)) {
73
            $this->channel->wait();
74
        }
75
    }
76
}
77