Test Failed
Push — master ( f95edb...01c8bf )
by Daniel
03:03
created

RpcClient::forceReconnect()   B

Complexity

Conditions 4
Paths 7

Size

Total Lines 23
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 23
rs 8.7972
c 0
b 0
f 0
cc 4
eloc 19
nc 7
nop 3
1
<?php
2
3
namespace Cmobi\RabbitmqBundle\Transport\Rpc;
4
5
use Cmobi\RabbitmqBundle\Connection\CmobiAMQPChannel;
6
use Cmobi\RabbitmqBundle\Connection\CmobiAMQPConnection;
7
use Cmobi\RabbitmqBundle\Connection\CmobiAMQPConnectionInterface;
8
use Cmobi\RabbitmqBundle\Connection\ConnectionManager;
9
use Cmobi\RabbitmqBundle\Queue\CmobiAMQPMessage;
10
use Cmobi\RabbitmqBundle\Queue\QueueProducerInterface;
11
use Cmobi\RabbitmqBundle\Transport\Exception\QueueNotFoundException;
12
use PhpAmqpLib\Message\AMQPMessage;
13
use Ramsey\Uuid\Uuid;
14
15
/**
 * RPC-style producer: publishes a request to a named queue and blocks until a
 * reply carrying the same correlation id arrives on a per-request callback queue.
 *
 * Diagnostic messages are written to php://stdout and error messages to
 * php://stderr (both streams are opened in the constructor).
 */
class RpcClient implements QueueProducerInterface
{
    /** @var string Name passed to the connection manager when a connection is requested. */
    private $connectionName;

    /** @var ConnectionManager Source of broker connections. */
    private $connectionManager;

    /** @var string Caller name, embedded in the generated callback queue name. */
    private $fromName;

    /** @var string Name of the queue requests are published to. */
    private $queueName;

    /** @var mixed Reply body for the current request; reset to null when publish() starts. */
    private $response;

    /** @var resource|false Handle returned by fopen('php://stdout'). */
    private $logOutput;

    /** @var resource|false Handle returned by fopen('php://stderr'). */
    private $errOutput;

    /** @var string|null Correlation id of the request currently in flight. */
    private $correlationId;

    /** @var string|null Name of the callback queue declared for the current request. */
    private $callbackQueue;

    /**
     * @param string            $queueName      Queue that requests are published to.
     * @param ConnectionManager $manager        Provides the broker connections.
     * @param string            $fromName       Caller name used when naming callback queues.
     * @param string            $connectionName Connection to request from the manager.
     */
    public function __construct($queueName, ConnectionManager $manager, $fromName, $connectionName = 'default')
    {
        $this->connectionName = $connectionName;
        $this->queueName = $queueName;
        $this->fromName = $fromName;
        $this->connectionManager = $manager;
        $this->logOutput = fopen('php://stdout', 'a+');
        $this->errOutput = fopen('php://stderr', 'a+');
    }

    /**
     * Consumer callback registered on the callback queue.
     *
     * Stores the message body as the response only when the message's
     * correlation id is identical to the one of the request in flight;
     * any other message is ignored.
     *
     * @param AMQPMessage $rep
     */
    public function onResponse(AMQPMessage $rep)
    {
        if ($rep->get('correlation_id') === $this->correlationId) {
            $this->response = $rep->getBody();
        }
    }

    /**
     * Declares the callback queue for a request and starts consuming it with
     * onResponse() as the callback.
     *
     * Side effects: overwrites the current correlation id and the current
     * callback queue name held by this instance.
     *
     * @param CmobiAMQPChannel $channel       Channel used to declare and consume the queue.
     * @param int              $expire        Value sent as the queue's "x-expires" argument.
     * @param string|null      $corralationId Correlation id to use; a new one is generated when null.
     * @param string|null      $sufix         Queue-name suffix; a UUID plus microtime() is used when null.
     *
     * @return string Name of the declared callback queue, as returned by the channel.
     */
    public function createCallbackQueue(CmobiAMQPChannel $channel, $expire, $corralationId = null, $sufix = null)
    {
        if (null === $corralationId) {
            $corralationId = $this->generateCorrelationId();
        }
        $this->correlationId = $corralationId;

        if (null === $sufix) {
            $sufix = Uuid::uuid4()->toString() . microtime();
        }

        $declareName = sprintf(
            'callback_to_%s_from_%s_%s',
            $this->getQueueName(),
            $this->getFromName(),
            $sufix
        );
        $declareBag = new RpcQueueBag($declareName);
        $declareBag->setArguments([
            'x-expires' => ['I', $expire],
        ]);

        // The first element of the declare result is used as the effective queue name.
        list($callbackQueue) = $channel->queueDeclare($declareBag->getQueueDeclare());
        $this->callbackQueue = $callbackQueue;

        $consumeBag = new RpcQueueBag($callbackQueue);
        $channel->basicConsume($consumeBag->getQueueConsume(), [$this, 'onResponse']);

        return $callbackQueue;
    }

    /**
     * Publishes $data to the target queue and blocks until a reply is stored
     * by onResponse() (or setResponse()). The reply is available afterwards
     * through getResponse().
     *
     * Any exception raised while waiting (including a wait timeout) is logged
     * to the error stream and followed by forceReconnect(); the request itself
     * is not published again.
     *
     * @param mixed $data     Payload; cast to string before sending.
     * @param int   $expire   Used as the callback queue's "x-expires" value and, divided by
     *                        1000, as the wait timeout (presumably milliseconds - confirm
     *                        against QueueProducerInterface::DEFAULT_TTL).
     * @param int   $priority Value of the message's "priority" property.
     *
     * @throws QueueNotFoundException When the target queue is not declared.
     * @throws \Cmobi\RabbitmqBundle\Connection\Exception\NotFoundAMQPConnectionFactoryException
     */
    public function publish($data, $expire = self::DEFAULT_TTL, $priority = self::PRIORITY_LOW)
    {
        $this->response = null;
        $connection = $this->connectionManager->getConnection($this->connectionName);
        $channel = $connection->channel();

        if (! $this->queueHasExists($channel)) {
            throw new QueueNotFoundException(sprintf('Queue %s not declared.', $this->queueName));
        }

        $this->createCallbackQueue($channel, $expire);
        $properties = [
            'correlation_id' => $this->correlationId,
            'reply_to' => $this->callbackQueue,
            'priority' => $priority,
        ];
        $msg = new CmobiAMQPMessage((string) $data, $properties);
        $channel->basic_publish($msg, '', $this->getQueueName());

        // Compare against null rather than relying on truthiness: a legitimate
        // reply body such as "" or "0" is falsy and would otherwise never end the loop.
        while (null === $this->response) {
            try {
                // Second argument is the non-blocking flag and must be a boolean.
                $channel->wait(null, false, $expire / 1000);
            } catch (\Exception $e) {
                fwrite($this->errOutput, $e->getMessage() . PHP_EOL);
                $connection = $this->forceReconnect($connection, $expire, $this->correlationId);
                $channel = $connection->channel();
            }
        }

        $channel->close();
        $connection->close();
    }

    /**
     * Checks whether the target queue exists by issuing a passive queue
     * declaration on the given channel.
     *
     * @param CmobiAMQPChannel $channel
     *
     * @return bool False when the declaration throws, true otherwise.
     */
    public function queueHasExists(CmobiAMQPChannel $channel)
    {
        try {
            $channel->queue_declare($this->queueName, true);

            return true;
        } catch (\Exception $e) {
            return false;
        }
    }

    /**
     * @return string Name of the queue requests are published to.
     */
    public function getQueueName()
    {
        return $this->queueName;
    }

    /**
     * @return string Caller name given to the constructor.
     */
    public function getFromName()
    {
        return $this->fromName;
    }

    /**
     * Overrides the stored response.
     *
     * @todo This setter exists only so tests can leave the blocking wait loop
     *       in publish() without receiving a real reply.
     *
     * @param mixed $content
     */
    public function setResponse($content)
    {
        $this->response = $content;
    }

    /**
     * @return mixed Body of the matched reply, the value given to setResponse(),
     *               or null when no response has been stored since publish() started.
     */
    public function getResponse()
    {
        return $this->response;
    }

    /**
     * Builds a new correlation id from uniqid() prefixed with the queue name,
     * a version-4 UUID and microtime().
     *
     * @return string
     */
    public function generateCorrelationId()
    {
        $prefix = uniqid($this->getQueueName());
        $uuid = Uuid::uuid4()->toString();

        return $prefix . $uuid . microtime();
    }

    /**
     * @return string|null Correlation id set by the last createCallbackQueue() call.
     */
    public function getCurrentCorrelationId()
    {
        return $this->correlationId;
    }

    /**
     * This client does not expose an exchange; publish() sends with an empty
     * exchange name.
     *
     * NOTE(review): the static analyzer reports that QueueProducerInterface
     * declares a string return here; false is kept so existing callers see the
     * same value.
     *
     * @return false
     */
    public function getExchange()
    {
        return false;
    }

    /**
     * This client does not expose an exchange type.
     *
     * NOTE(review): the static analyzer reports that QueueProducerInterface
     * declares a string return here; false is kept so existing callers see the
     * same value.
     *
     * @return false
     */
    public function getExchangeType()
    {
        return false;
    }

    /**
     * @return ConnectionManager
     */
    public function getConnectionManager()
    {
        return $this->connectionManager;
    }

    /**
     * Closes the given connection and keeps trying, without an upper bound,
     * to obtain a new one and re-create the callback queue for the given
     * correlation id. Blocks (sleeping between attempts) until an attempt succeeds.
     *
     * @param CmobiAMQPConnectionInterface $connection    Connection to discard.
     * @param int                          $expire        Forwarded to createCallbackQueue().
     * @param string                       $corralationId Correlation id to keep for the request in flight.
     *
     * @return CmobiAMQPConnectionInterface The connection on which the callback queue was re-created.
     */
    public function forceReconnect(CmobiAMQPConnectionInterface $connection, $expire, $corralationId)
    {
        while (true) {
            // Closing is best-effort: the connection is being discarded because it
            // already failed, so an error while closing it must not prevent the
            // reconnect attempt below from running.
            try {
                $connection->close();
            } catch (\Exception $e) {
                fwrite($this->errOutput, 'failed RpcClient::forceReconnect() - ' . $e->getMessage() . PHP_EOL);
            }

            sleep(2);
            fwrite($this->logOutput, 'start RpcClient::forceReconnect() - trying connect...' . PHP_EOL);

            try {
                $connection = $this->getConnectionManager()->getConnection($this->connectionName);
                $channel = $connection->channel();
                $this->createCallbackQueue($channel, $expire, $corralationId);

                break;
            } catch (\Exception $e) {
                sleep(3);
                fwrite($this->errOutput, 'failed RpcClient::forceReconnect() - ' . $e->getMessage() . PHP_EOL);
            }
        }

        fwrite($this->logOutput, 'RpcClient::forceReconnect() - connected!' . PHP_EOL);

        return $connection;
    }
}
236