RpcClient::getCurrentCorrelationId()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
3
namespace Cmobi\RabbitmqBundle\Transport\Rpc;
4
5
use Cmobi\RabbitmqBundle\Connection\CmobiAMQPChannel;
6
use Cmobi\RabbitmqBundle\Connection\CmobiAMQPConnection;
7
use Cmobi\RabbitmqBundle\Connection\CmobiAMQPConnectionInterface;
8
use Cmobi\RabbitmqBundle\Connection\ConnectionManager;
9
use Cmobi\RabbitmqBundle\Queue\CmobiAMQPMessage;
10
use Cmobi\RabbitmqBundle\Queue\QueueProducerInterface;
11
use Cmobi\RabbitmqBundle\Transport\Exception\QueueNotFoundException;
12
use PhpAmqpLib\Exception\AMQPTimeoutException;
13
use PhpAmqpLib\Message\AMQPMessage;
14
use Ramsey\Uuid\Uuid;
15
16
class RpcClient implements QueueProducerInterface
17
{
18
    private $connectionName;
19
    private $connectionManager;
20
    private $fromName;
21
    private $queueName;
22
    private $response;
23
    private $logOutput;
24
    private $errOutput;
25
    private $correlationId;
26
    private $callbackQueue;
27
28
    public function __construct($queueName, ConnectionManager $manager, $fromName, $connectionName = 'default')
29
    {
30
        $this->connectionName = $connectionName;
31
        $this->queueName = $queueName;
32
        $this->fromName = $fromName;
33
        $this->connectionManager = $manager;
34
        $this->logOutput = fopen('php://stdout', 'a+');
35
        $this->errOutput = fopen('php://stderr', 'a+');
36
    }
37
38
    /**
39
     * @param AMQPMessage $rep
40
     */
41
    public function onResponse(AMQPMessage $rep)
42
    {
43
        if ($rep->get('correlation_id') === $this->correlationId) {
44
            $this->response = $rep->getBody();
45
        }
46
    }
47
48
    public function createCallbackQueue(CmobiAMQPChannel $channel, $expire, $sufix, $corralationId = null)
49
    {
50
        $this->correlationId = is_null($corralationId) ? $this->generateCorrelationId() : $corralationId;
51
        $queueBag = new RpcQueueBag(
52
            sprintf(
53
                'callback_to_%s_from_%s_%s',
54
                $this->getQueueName(),
55
                $this->getFromName(),
56
                $sufix
57
            )
58
        );
59
        $queueBag->setArguments([
60
            'x-expires' => ['I', $expire],
61
        ]);
62
        list($callbackQueue) = $channel->queueDeclare($queueBag->getQueueDeclare());
63
        $this->callbackQueue = $callbackQueue;
64
        $consumeQueueBag = new RpcQueueBag($callbackQueue);
65
66
        $channel->basicConsume(
67
            $consumeQueueBag->getQueueConsume(),
68
            [$this, 'onResponse']
69
        );
70
71
        return $callbackQueue;
72
    }
73
74
    /**
75
     * @param $data
76
     * @param int $expire
77
     * @param int $priority
78
     * @throws QueueNotFoundException
79
     * @throws \Cmobi\RabbitmqBundle\Connection\Exception\NotFoundAMQPConnectionFactoryException
80
     */
81
    public function publish($data, $expire = self::DEFAULT_TTL, $priority = self::PRIORITY_LOW)
82
    {
83
        $this->response = null;
84
        $connection = $this->connectionManager->getConnection($this->connectionName);
85
        $channel = $connection->channel();
86
87
        if (! $this->queueHasExists($channel)) {
88
            throw new QueueNotFoundException("Queue $this->queueName not declared.");
89
        }
90
        $sufix = Uuid::uuid4()->toString() . microtime();
91
        $this->createCallbackQueue($channel, $expire, $sufix);
92
        $msg = new CmobiAMQPMessage(
93
            (string) $data,
94
            [
95
                'correlation_id' => $this->correlationId,
96
                'reply_to' => $this->callbackQueue,
97
                'priority' => $priority,
98
            ]
99
        );
100
        $channel->basic_publish($msg, '', $this->getQueueName());
101
102
        while (! $this->response) {
103
            try {
104
                $channel->wait(null, 0, ($expire / 1000));
0 ignored issues
show
Documentation introduced by
0 is of type integer, but the function expects a boolean.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
105
            } catch (\Exception $e) {
106
                fwrite($this->errOutput, $e->getMessage());
107
                break;
108
            }
109
        }
110
        $channel->close();
111
        $connection->close();
112
    }
113
114
    /**
115
     * @return bool
116
     */
117
    /**
118
     * @param CmobiAMQPChannel $channel
119
     * @return bool
120
     */
121 View Code Duplication
    public function queueHasExists(CmobiAMQPChannel $channel)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
122
    {
123
        try {
124
            $channel->queue_declare($this->queueName, true);
125
        } catch (\Exception $e) {
126
            return false;
127
        }
128
129
        return true;
130
    }
131
132
    /**
133
     * @return string
134
     */
135
    public function getQueueName()
136
    {
137
        return $this->queueName;
138
    }
139
140
    /**
141
     * @return string
142
     */
143
    public function getFromName()
144
    {
145
        return $this->fromName;
146
    }
147
148
    /**
149
     * @todo unecessary method set, its only exists to run tests whitout stay jailed in infinite while waiting response.
150
     *
151
     * @param $content
152
     */
153
    public function setResponse($content)
154
    {
155
        $this->response = $content;
156
    }
157
158
    /**
159
     * @return string
160
     */
161
    public function getResponse()
162
    {
163
        return $this->response;
164
    }
165
166
    /** @return string */
167
    public function generateCorrelationId()
168
    {
169
        return uniqid($this->getQueueName()) . Uuid::uuid4()->toString() . microtime();
170
    }
171
172
    /**
173
     * @return string
174
     */
175
    public function getCurrentCorrelationId()
176
    {
177
        return $this->correlationId;
178
    }
179
180
    /**
181
     * @return string
182
     */
183
    public function getExchange()
184
    {
185
        return false;
0 ignored issues
show
Bug Best Practice introduced by
The return type of return false; (false) is incompatible with the return type declared by the interface Cmobi\RabbitmqBundle\Que...rInterface::getExchange of type string.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type f.e. an interface, or abstract method. This is more formally defined by the Lizkov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangably. This principle also belongs to the SOLID principles for object oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
186
    }
187
188
    /**
189
     * @return string
190
     */
191
    public function getExchangeType()
192
    {
193
        return false;
0 ignored issues
show
Bug Best Practice introduced by
The return type of return false; (false) is incompatible with the return type declared by the interface Cmobi\RabbitmqBundle\Que...erface::getExchangeType of type string.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type f.e. an interface, or abstract method. This is more formally defined by the Lizkov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangably. This principle also belongs to the SOLID principles for object oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
194
    }
195
196
    /**
197
     * @return ConnectionManager
198
     */
199
    public function getConnectionManager()
200
    {
201
        return $this->connectionManager;
202
    }
203
204
    /**
205
     * @param CmobiAMQPConnectionInterface $connection
206
     * @param $expire
207
     * @param $sufix
208
     * @param $corralationId
209
     * @return CmobiAMQPConnectionInterface
210
     */
211
    public function forceReconnect(CmobiAMQPConnectionInterface $connection, $expire, $sufix, $corralationId)
212
    {
213
        try {
214
            $connection->close();
215
            sleep(2);
216
            fwrite($this->logOutput, 'start RpcClient::forceReconnect() - trying connect...' . PHP_EOL);
217
            $connection = $this->getConnectionManager()->getConnection($this->connectionName);
218
            $channel = $connection->channel();
219
            $this->createCallbackQueue($channel, $expire, $sufix, $corralationId);
220
        } catch (\Exception $e) {
221
            fwrite($this->errOutput, 'failed RpcClient::forceReconnect() - ' . $e->getMessage() . PHP_EOL);
222
223
            return $connection;
224
        }
225
        fwrite($this->logOutput, 'RpcClient::forceReconnect() - connected!' . PHP_EOL);
226
227
        return $connection;
228
    }
229
}
230