RpcClient   A
last analyzed

Complexity

Total Complexity 11

Size/Duplication

Total Lines 109
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 11
dl 0
loc 109
ccs 45
cts 45
cp 1
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A addRequest() 0 20 3
A getQueueName() 0 8 2
A getReplies() 0 14 2
A processMessage() 0 8 2
A setSerializer() 0 4 1
A getSerializer() 0 4 1
1
<?php
2
3
namespace RabbitMqModule;
4
5
use PhpAmqpLib\Message\AMQPMessage;
6
use Zend\Serializer\Adapter\AdapterInterface as SerializerInterface;
7
8
/**
9
 * Class RpcClient
10
 * @package RabbitMqModule
11
 */
12
class RpcClient extends BaseAmqp
13
{
14
    /**
15
     * @var int
16
     */
17
    protected $requests = 0;
18
    /**
19
     * @var array
20
     */
21
    protected $replies = [];
22
    /**
23
     * @var int
24
     */
25
    protected $timeout = 0;
26
    /**
27
     * @var string
28
     */
29
    protected $queueName;
30
    /**
31
     * @var SerializerInterface
32
     */
33
    protected $serializer;
34
35
    /**
36
     * @param mixed  $body
37
     * @param string $server
38
     * @param mixed  $requestId
39
     * @param string $routingKey
40
     * @param int    $expiration
41
     */
42 1
    public function addRequest($body, $server, $requestId, $routingKey = '', $expiration = 0)
43
    {
44 1
        if ($this->serializer) {
45 1
            $body = $this->serializer->serialize($body);
46 1
        }
47 1
        $msg = new AMQPMessage($body, [
48 1
            'content_type' => 'text/plain',
49 1
            'reply_to' => $this->getQueueName(),
50 1
            'delivery_mode' => 1, // non durable
51 1
            'expiration' => $expiration * 1000,
52 1
            'correlation_id' => $requestId,
53 1
        ]);
54 1
        $this->getChannel()->basic_publish($msg, $server, $routingKey);
55
56 1
        ++$this->requests;
57
58 1
        if ($expiration > $this->timeout) {
59 1
            $this->timeout = $expiration;
60 1
        }
61 1
    }
62
63
    /**
64
     * @return string
65
     */
66 1
    protected function getQueueName()
67
    {
68 1
        if (null === $this->queueName) {
69 1
            list($this->queueName) = $this->getChannel()->queue_declare('', false, false, true, false);
70 1
        }
71
72 1
        return $this->queueName;
73
    }
74
75
    /**
76
     * @return array
77
     */
78 1
    public function getReplies()
79
    {
80 1
        $this->replies = [];
81 1
        $consumer_tag = $this->getChannel()
82 1
            ->basic_consume($this->getQueueName(), '', false, true, false, false, [$this, 'processMessage']);
83 1
        while (count($this->replies) < $this->requests) {
84 1
            $this->getChannel()->wait(null, false, $this->timeout);
85 1
        }
86 1
        $this->getChannel()->basic_cancel($consumer_tag);
87 1
        $this->requests = 0;
88 1
        $this->timeout = 0;
89
90 1
        return $this->replies;
91
    }
92
93
    /**
94
     * @param AMQPMessage $message
95
     */
96 1
    public function processMessage(AMQPMessage $message)
97
    {
98 1
        $messageBody = $message->body;
99 1
        if ($this->serializer) {
100 1
            $messageBody = $this->serializer->unserialize($messageBody);
101 1
        }
102 1
        $this->replies[$message->get('correlation_id')] = $messageBody;
103 1
    }
104
105
    /**
106
     * @param SerializerInterface|null $serializer
107
     */
108 2
    public function setSerializer(SerializerInterface $serializer = null)
109
    {
110 2
        $this->serializer = $serializer;
111 2
    }
112
113
    /**
114
     * @return SerializerInterface
115
     */
116 1
    public function getSerializer()
117
    {
118 1
        return $this->serializer;
119
    }
120
}
121