Completed
Push — master ( 097436...6c4dd2 )
by Tomas
7s
created

LazyRabbitMqDriver   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 66
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 0%

Importance

Changes 1
Bugs 0 Features 1
Metric Value
wmc 6
c 1
b 0
f 1
lcom 1
cbo 6
dl 0
loc 66
ccs 0
cts 38
cp 0
rs 10

4 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
A send() 0 5 1
A wait() 0 19 2
A getChannel() 0 9 2
1
<?php
2
3
namespace Tomaj\Hermes\Driver;
4
5
use Closure;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Connection\AMQPLazyConnection;
8
use PhpAmqpLib\Message\AMQPMessage;
9
use Tomaj\Hermes\MessageInterface;
10
use Tomaj\Hermes\MessageSerializer;
11
12
class LazyRabbitMqDriver implements DriverInterface
13
{
14
    use SerializerAwareTrait;
15
16
    /** @var AMQPLazyConnection */
17
    private $connection;
18
    
19
    /** @var AMQPChannel */
20
    private $channel;
21
22
    /** @var string */
23
    private $queue;
24
    
25
    /**
26
     * @param AMQPLazyConnection $connection
27
     * @param string $queue
28
     */
29
    public function __construct(AMQPLazyConnection $connection, $queue)
30
    {
31
        $this->connection = $connection;
32
        $this->queue = $queue;
33
        $this->serializer = new MessageSerializer();
34
    }
35
36
    /**
37
     * {@inheritdoc}
38
     */
39
    public function send(MessageInterface $message)
40
    {
41
        $rabbitMessage = new AMQPMessage($this->serializer->serialize($message));
42
        $this->getChannel()->basic_publish($rabbitMessage, '', $this->queue);
43
    }
44
45
    /**
46
     * {@inheritdoc}
47
     */
48
    public function wait(Closure $callback)
49
    {
50
        $this->getChannel()->basic_consume(
51
            $this->queue,
52
            '',
53
            false,
54
            true,
55
            false,
56
            false,
57
            function ($rabbitMessage) use ($callback) {
58
                $message = $this->serializer->unserialize($rabbitMessage->body);
59
                $callback($message);
60
            }
61
        );
62
63
        while (count($this->getChannel()->callbacks)) {
64
            $this->getChannel()->wait();
65
        }
66
    }
67
    
68
    private function getChannel()
69
    {
70
        if ($this->channel) {
71
            return $this->channel;
72
        }
73
        $this->channel = $this->connection->channel();
74
        $this->channel->queue_declare($this->queue, false, false, false, false);
75
        return $this->channel;
76
    }
77
}
78