Completed
Push — master ( 1fce20...c903a9 )
by Daniel
03:12
created

RpcClient::forceReconnect()   A

Complexity

Conditions 3
Paths 6

Size

Total Lines 20
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 20
rs 9.4285
cc 3
eloc 16
nc 6
nop 3
1
<?php
2
3
namespace Cmobi\RabbitmqBundle\Transport\Rpc;
4
5
use Cmobi\RabbitmqBundle\Connection\CmobiAMQPChannel;
6
use Cmobi\RabbitmqBundle\Connection\CmobiAMQPConnection;
7
use Cmobi\RabbitmqBundle\Connection\CmobiAMQPConnectionInterface;
8
use Cmobi\RabbitmqBundle\Connection\ConnectionManager;
9
use Cmobi\RabbitmqBundle\Queue\CmobiAMQPMessage;
10
use Cmobi\RabbitmqBundle\Queue\QueueProducerInterface;
11
use Cmobi\RabbitmqBundle\Transport\Exception\QueueNotFoundException;
12
use PhpAmqpLib\Message\AMQPMessage;
13
use Ramsey\Uuid\Uuid;
14
15
class RpcClient implements QueueProducerInterface
16
{
17
    private $connectionName;
18
    private $connectionManager;
19
    private $fromName;
20
    private $queueName;
21
    private $response;
22
    private $logOutput;
23
    private $errOutput;
24
    private $correlationId;
25
    private $callbackQueue;
26
27
    public function __construct($queueName, ConnectionManager $manager, $fromName, $connectionName = 'default')
28
    {
29
        $this->connectionName = $connectionName;
30
        $this->queueName = $queueName;
31
        $this->fromName = $fromName;
32
        $this->connectionManager = $manager;
33
        $this->logOutput = fopen('php://stdout', 'a+');
34
        $this->errOutput = fopen('php://stderr', 'a+');
35
    }
36
37
    /**
38
     * @param AMQPMessage $rep
39
     */
40
    public function onResponse(AMQPMessage $rep)
41
    {
42
        if ($rep->get('correlation_id') === $this->correlationId) {
43
            $this->response = $rep->getBody();
44
        }
45
    }
46
47
    public function createCallbackQueue(CmobiAMQPChannel $channel, $expire, $corralationId = null)
48
    {
49
        $this->correlationId = is_null($corralationId) ? $this->generateCorrelationId() : $corralationId;
50
        $queueBag = new RpcQueueBag(
51
            sprintf(
52
                'callback_to_%s_from_%s_%s',
53
                $this->getQueueName(),
54
                $this->getFromName(),
55
                Uuid::uuid4()->toString()
56
                . microtime()
57
            )
58
        );
59
        $queueBag->setArguments([
60
            'x-expires' => ['I', $expire],
61
        ]);
62
        list($callbackQueue) = $channel->queueDeclare($queueBag->getQueueDeclare());
63
        $this->callbackQueue = $callbackQueue;
64
65
        $callbackQueue = $this->createCallbackQueue($channel, $expire);
66
        $consumeQueueBag = new RpcQueueBag($callbackQueue);
67
68
        $channel->basicConsume(
69
            $consumeQueueBag->getQueueConsume(),
70
            [$this, 'onResponse']
71
        );
72
73
        return $callbackQueue;
74
    }
75
76
    /**
77
     * @param $data
78
     * @param int $expire
79
     * @param int $priority
80
     * @throws QueueNotFoundException
81
     * @throws \Cmobi\RabbitmqBundle\Connection\Exception\NotFoundAMQPConnectionFactoryException
82
     */
83
    public function publish($data, $expire = self::DEFAULT_TTL, $priority = self::PRIORITY_LOW)
84
    {
85
        $this->response = null;
86
        $connection = $this->connectionManager->getConnection($this->connectionName);
87
        $channel = $connection->channel();
88
89
        if (! $this->queueHasExists($channel)) {
90
            throw new QueueNotFoundException("Queue $this->queueName not declared.");
91
        }
92
        $msg = new CmobiAMQPMessage(
93
            (string) $data,
94
            [
95
                'correlation_id' => $this->correlationId,
96
                'reply_to' => $this->callbackQueue,
97
                'priority' => $priority,
98
            ]
99
        );
100
        $channel->basic_publish($msg, '', $this->getQueueName());
101
102
        while (! $this->response) {
103
            try {
104
                $channel->wait(null, 0, ($expire / 1000));
0 ignored issues
show
Documentation introduced by
0 is of type integer, but the function expects a boolean.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
105
            } catch (\Exception $e) {
106
                fwrite($this->errOutput, $e->getMessage());
107
                $connection = $this->forceReconnect($connection, $expire, $this->correlationId);
108
                $channel = $connection->channel();
109
110
                continue;
111
            }
112
        }
113
        $channel->close();
114
        $connection->close();
115
    }
116
117
    /**
118
     * @return bool
119
     */
120
    /**
121
     * @param CmobiAMQPChannel $channel
122
     * @return bool
123
     */
124 View Code Duplication
    public function queueHasExists(CmobiAMQPChannel $channel)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
125
    {
126
        try {
127
            $channel->queue_declare($this->queueName, true);
128
        } catch (\Exception $e) {
129
            return false;
130
        }
131
132
        return true;
133
    }
134
135
    /**
136
     * @return string
137
     */
138
    public function getQueueName()
139
    {
140
        return $this->queueName;
141
    }
142
143
    /**
144
     * @return string
145
     */
146
    public function getFromName()
147
    {
148
        return $this->fromName;
149
    }
150
151
    /**
152
     * @todo unecessary method set, its only exists to run tests whitout stay jailed in infinite while waiting response.
153
     *
154
     * @param $content
155
     */
156
    public function setResponse($content)
157
    {
158
        $this->response = $content;
159
    }
160
161
    /**
162
     * @return string
163
     */
164
    public function getResponse()
165
    {
166
        return $this->response;
167
    }
168
169
    /** @return string */
170
    public function generateCorrelationId()
171
    {
172
        return uniqid($this->getQueueName()) . Uuid::uuid4()->toString() . microtime();
173
    }
174
175
    /**
176
     * @return string
177
     */
178
    public function getCurrentCorrelationId()
179
    {
180
        return $this->correlationId;
181
    }
182
183
    /**
184
     * @return string
185
     */
186
    public function getExchange()
187
    {
188
        return false;
0 ignored issues
show
Bug Best Practice introduced by
The return type of return false; (false) is incompatible with the return type declared by the interface Cmobi\RabbitmqBundle\Que...rInterface::getExchange of type string.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type f.e. an interface, or abstract method. This is more formally defined by the Lizkov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangably. This principle also belongs to the SOLID principles for object oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
189
    }
190
191
    /**
192
     * @return string
193
     */
194
    public function getExchangeType()
195
    {
196
        return false;
0 ignored issues
show
Bug Best Practice introduced by
The return type of return false; (false) is incompatible with the return type declared by the interface Cmobi\RabbitmqBundle\Que...erface::getExchangeType of type string.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type f.e. an interface, or abstract method. This is more formally defined by the Lizkov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangably. This principle also belongs to the SOLID principles for object oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
197
    }
198
199
    /**
200
     * @return ConnectionManager
201
     */
202
    public function getConnectionManager()
203
    {
204
        return $this->connectionManager;
205
    }
206
207
    /**
208
     * @param CmobiAMQPConnectionInterface $connection
209
     * @param $expire
210
     * @param $corralationId
211
     * @return CmobiAMQPConnectionInterface
212
     */
213
    public function forceReconnect(CmobiAMQPConnectionInterface $connection, $expire, $corralationId)
214
    {
215
        do {
216
            try {
217
                $connection->close();
218
                $failed = false;
219
                fwrite($this->logOutput, 'start RpcClient::forceReconnect() - trying connect...' . PHP_EOL);
220
                $connection = $this->getConnectionManager()->getConnection($this->connectionName);
221
                $channel = $connection->channel();
222
                $this->createCallbackQueue($channel, $expire, $corralationId);
223
            } catch (\Exception $e) {
224
                $failed = true;
225
                sleep(3);
226
                fwrite($this->errOutput, 'failed RpcClient::forceReconnect() - ' . $e->getMessage() . PHP_EOL);
227
            }
228
        } while ($failed);
229
        fwrite($this->logOutput, 'RpcClient::forceReconnect() - connected!' . PHP_EOL);
230
231
        return $connection;
232
    }
233
}
234