Passed
Push — master ( d34844...01c7f3 )
by Mihai
03:03
created

RpcClient::getReplies()   A

Complexity

Conditions 3
Paths 10

Size

Total Lines 21
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 4.125

Importance

Changes 7
Bugs 0 Features 0
Metric Value
cc 3
eloc 13
c 7
b 0
f 0
nc 10
nop 0
dl 0
loc 21
rs 9.8333
ccs 6
cts 12
cp 0.5
crap 4.125
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use PhpAmqpLib\Message\AMQPMessage;
6
7
/**
 * RPC client that publishes requests over AMQP and collects the replies,
 * indexed by correlation id.
 *
 * Usage is batch oriented: call addRequest() one or more times, then call
 * getReplies() to wait until as many replies as requests have arrived.
 * If getReplies() fails (e.g. the wait throws), call reset() before starting
 * a new batch.
 */
class RpcClient extends BaseAmqp
{
    /** @var int Number of requests published in the current batch. */
    protected $requests = 0;

    /** @var array Replies received so far, indexed by the message's correlation_id. */
    protected $replies = [];

    /**
     * @var bool|null Whether reply bodies are run through the unserializer.
     *                Stays null (treated as "no") until initClient() is called.
     */
    protected $expectSerializedResponse;

    /** @var int|float Largest $expiration seen in the current batch; handed to wait() as its timeout. */
    protected $timeout = 0;

    /** @var callable|null Optional callback invoked with every decoded reply body. */
    protected $notifyCallback;

    /** @var string|null Name of the reply queue, declared lazily by getQueueName(). */
    private $queueName;

    /** @var callable Callable applied to reply bodies when serialized responses are expected. */
    private $unserializer = 'unserialize';

    /** @var mixed Truthy to use the 'amq.rabbitmq.reply-to' pseudo-queue instead of a declared reply queue. */
    private $directReplyTo;

    /** @var mixed Value returned by basic_consume() in direct reply-to mode; null when no batch is pending. */
    private $directConsumerTag;

    /**
     * Configures how reply bodies are decoded.
     *
     * @param bool $expectSerializedResponse When true, every reply body is passed through the unserializer.
     */
    public function initClient($expectSerializedResponse = true)
    {
        $this->expectSerializedResponse = $expectSerializedResponse;
    }

    /**
     * Publishes one RPC request and registers it in the current batch.
     *
     * The first request of a batch clears previously collected replies and, in
     * direct reply-to mode, starts the consumer on 'amq.rabbitmq.reply-to'
     * before anything is published.
     *
     * @param string     $msgBody    Request payload.
     * @param string     $server     Passed to basic_publish() as its second argument (the exchange).
     * @param mixed      $requestId  Correlation id of the request; must be non-empty in the sense of
     *                               empty(), so values such as 0 or '0' are rejected as well.
     * @param string     $routingKey Routing key used for publishing.
     * @param int|float  $expiration Multiplied by 1000 for the message 'expiration' property
     *                               (presumably seconds converted to milliseconds - confirm against
     *                               the broker documentation). The largest value in the batch also
     *                               becomes the timeout used by getReplies().
     *
     * @throws \InvalidArgumentException When $requestId is empty.
     */
    public function addRequest($msgBody, $server, $requestId = null, $routingKey = '', $expiration = 0)
    {
        if (empty($requestId)) {
            throw new \InvalidArgumentException('You must provide a $requestId');
        }

        if (0 === $this->requests) {
            // First request of a batch: drop replies left over from the previous one.
            $this->replies = [];

            if ($this->directReplyTo) {
                // Direct reply-to mode: the consumer has to exist before the request is published.
                $this->directConsumerTag = $this->getChannel()->basic_consume(
                    'amq.rabbitmq.reply-to',
                    '',
                    false,
                    true,
                    false,
                    false,
                    [$this, 'processMessage']
                );
            }
        }

        // Direct reply-to mode uses the predefined pseudo-queue; otherwise the lazily declared reply queue.
        $replyTo = $this->directReplyTo ? 'amq.rabbitmq.reply-to' : $this->getQueueName();

        $msg = new AMQPMessage($msgBody, [
            'content_type' => 'text/plain',
            'reply_to' => $replyTo,
            'delivery_mode' => 1, // non durable
            'expiration' => $expiration * 1000,
            'correlation_id' => $requestId,
        ]);

        $this->getChannel()->basic_publish($msg, $server, $routingKey);

        $this->requests++;

        // The batch waits as long as its most patient request.
        if ($expiration > $this->timeout) {
            $this->timeout = $expiration;
        }
    }

    /**
     * Waits until one reply per published request has been received and returns them.
     *
     * The consumer is always cancelled, even when waiting throws. The batch
     * counters are only cleared on success; after an exception the caller is
     * expected to call reset() before issuing new requests.
     *
     * @return array Decoded reply bodies indexed by correlation id.
     */
    public function getReplies()
    {
        if ($this->directReplyTo) {
            // The consumer was already started by the first addRequest() of the batch.
            $consumerTag = $this->directConsumerTag;
        } else {
            $consumerTag = $this->getChannel()->basic_consume(
                $this->getQueueName(),
                '',
                false,
                true,
                false,
                false,
                [$this, 'processMessage']
            );
        }

        try {
            while (count($this->replies) < $this->requests) {
                $this->getChannel()->wait(null, false, $this->timeout);
            }
        } finally {
            $this->getChannel()->basic_cancel($consumerTag);
        }

        // Batch completed: return to the idle state.
        $this->directConsumerTag = null;
        $this->requests = 0;
        $this->timeout = 0;

        return $this->replies;
    }

    /**
     * Consumer callback: decodes a reply, notifies the optional listener and stores the result.
     *
     * @param AMQPMessage $msg Reply message; its 'correlation_id' property is used as the storage key.
     */
    public function processMessage(AMQPMessage $msg)
    {
        $messageBody = $msg->body;

        if ($this->expectSerializedResponse) {
            // NOTE(security): the default unserializer is PHP's unserialize(), which is applied here to
            // data received from the broker. If replies can come from an untrusted party, configure a
            // safer decoder through setUnserializer() (for example a JSON decoder).
            $messageBody = call_user_func($this->unserializer, $messageBody);
        }

        if ($this->notifyCallback !== null) {
            call_user_func($this->notifyCallback, $messageBody);
        }

        $this->replies[$msg->get('correlation_id')] = $messageBody;
    }

    /**
     * Returns the reply queue name, declaring the queue on first use.
     *
     * The queue is declared with an empty name and the name reported back by
     * queue_declare() is cached for subsequent calls.
     *
     * @return string
     */
    protected function getQueueName()
    {
        if (null === $this->queueName) {
            list($this->queueName, ,) = $this->getChannel()->queue_declare("", false, false, true, false);
        }

        return $this->queueName;
    }

    /**
     * Replaces the callable used to decode reply bodies.
     *
     * @param callable $unserializer Invoked with the raw reply body; its return value is stored as the reply.
     */
    public function setUnserializer($unserializer)
    {
        $this->unserializer = $unserializer;
    }

    /**
     * Registers a callback that is invoked with every decoded reply body.
     *
     * @param callable $callback
     *
     * @throws \InvalidArgumentException When $callback is not callable.
     */
    public function notify($callback)
    {
        if (!is_callable($callback)) {
            throw new \InvalidArgumentException('First parameter expects to be callable');
        }

        $this->notifyCallback = $callback;
    }

    /**
     * Switches direct reply-to mode on or off.
     *
     * @param mixed $directReplyTo Truthy to reply through 'amq.rabbitmq.reply-to'.
     */
    public function setDirectReplyTo($directReplyTo)
    {
        $this->directReplyTo = $directReplyTo;
    }

    /**
     * Discards the current batch so a new one can be started.
     *
     * Clears the collected replies, the request counter and the accumulated
     * timeout. The timeout has to be cleared too: addRequest() only ever
     * raises it, so a stale value from an abandoned batch would otherwise make
     * the next batch wait longer than its own expirations ask for.
     *
     * No channel calls are made here; a consumer started in direct reply-to
     * mode is not cancelled by this method.
     */
    public function reset()
    {
        $this->replies = [];
        $this->requests = 0;
        $this->timeout = 0;
    }
}
128