Native::connect()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 7
nc 2
nop 1
dl 0
loc 10
ccs 7
cts 7
cp 1
crap 2
rs 9.4285
c 0
b 0
f 0
1
<?php
2
namespace FMUP\Queue\Driver;
3
4
use FMUP\Environment;
5
use FMUP\Queue\Channel;
6
use FMUP\Queue\DriverInterface;
7
use FMUP\Queue\Exception;
8
use FMUP\Queue\Message;
9
10
class Native implements DriverInterface, Environment\OptionalInterface
11
{
12
    use Environment\OptionalTrait;
13
14
    const CONFIGURATION_PERM_UID = 'msg_perm.uid';
15
    const CONFIGURATION_PERM_GID = 'msg_perm.gid';
16
    const CONFIGURATION_PERM_MODE = 'msg_perm.mode';
17
    const CONFIGURATION_SENT_TIME = 'msg_stime';
18
    const CONFIGURATION_RECEIVED_TIME = 'msg_rtime';
19
    const CONFIGURATION_UPDATE_TIME = 'msg_ctime';
20
    const CONFIGURATION_MESSAGE_NUMBER = 'msg_qnum';
21
    const CONFIGURATION_MESSAGE_SIZE = 'msg_qbytes';
22
    const CONFIGURATION_SENDER_PID = 'msg_lspid';
23
    const CONFIGURATION_RECEIVER_PID = 'msg_lrpid';
24
25
    /**
26
     * Connect to specified queue
27
     * @param Channel $channel
28
     * @return Channel
29
     * @throws Exception
30
     */
31 6
    public function connect(Channel $channel)
32
    {
33 6
        if (!$channel->hasResource()) {
34 6
            $name = $this->secureQueueName($channel->getName());
35 5
            $channel->setName($name);
36 5
            $resource = $this->msgGetQueue($name);
37 5
            $channel->setResource($resource);
38
        }
39 5
        return $channel;
40
    }
41
42
    /**
43
     * @param string $name
44
     * @return resource
45
     * @codeCoverageIgnore
46
     */
47
    protected function msgGetQueue($name)
48
    {
49
        return msg_get_queue($name);
50
    }
51
52
    /**
53
     * Check if queue exists
54
     * @param Channel $channel
55
     * @return bool
56
     * @throws Exception
57
     */
58 1
    public function exists(Channel $channel)
59
    {
60 1
        $name = (!$channel->hasResource()) ? $this->secureQueueName($channel->getName()) : $channel->getName();
61 1
        return $this->msgQueueExists($name);
62
    }
63
64
    /**
65
     * @param $name
66
     * @return bool
67
     * @codeCoverageIgnore
68
     */
69
    protected function msgQueueExists($name)
70
    {
71
        return msg_queue_exists($name);
72
    }
73
74
    /**
75
     * Get a message from queue
76
     *
77
     * @param Channel $channel
78
     * @param string $messageType
79
     * @return mixed|null
80
     * @throws Exception
81
     */
82 3
    public function pull(Channel $channel, $messageType = null)
83
    {
84 3
        if (!$channel->hasResource()) {
85 3
            $this->connect($channel);
86
        }
87 3
        $messageType = $this->secureMessageType($messageType);
88 2
        $receivedMessageType = 0;
89 2
        $message = null;
90 2
        $error = 0;
91 2
        $messageSize = $this->getMessageSize($channel);
92 2
        $success = $this->msgReceive(
93 2
            $channel->getResource(),
94 2
            $messageType,
95 2
            $receivedMessageType,
96 2
            $messageSize,
97 2
            $message,
98 2
            $channel->getSettings()->getSerialize(),
99 2
            $this->getReceiveFlags($channel),
100 2
            $error
101
        );
102 2
        $isNonBlockReceive = (MSG_IPC_NOWAIT & $this->getParamBlockReceive($channel));
103 2
        $isNonBlockingPlusNoMessage = $isNonBlockReceive && ($error === MSG_ENOMSG);
104 2
        if (!$success && !$isNonBlockingPlusNoMessage) {
105 1
            throw new Exception("Error while receiving message", $error);
106
        }
107 1
        return $message ? (new Message())->setOriginal($message)->setTranslated($message) : null;
108
    }
109
110
    /**
111
     * @param $queue
112
     * @param $desiredMsgType
113
     * @param $msgType
114
     * @param $maxsize
115
     * @param $message
116
     * @param bool|true $unSerialize
117
     * @param int $flags
118
     * @param null $errorCode
119
     * @return bool
120
     * @codeCoverageIgnore
121
     */
122
    protected function msgReceive(
123
        $queue,
124
        $desiredMsgType,
125
        &$msgType,
126
        $maxsize,
127
        &$message,
128
        $unSerialize = true,
129
        $flags = 0,
130
        &$errorCode = null
131
    ) {
132
        return msg_receive($queue, $desiredMsgType, $msgType, $maxsize, $message, $unSerialize, $flags, $errorCode);
133
    }
134
135
    /**
136
     * Retrieve message maximum size for a given queue
137
     * @param Channel $channel
138
     * @return int
139
     */
140 2
    private function getMessageSize(Channel $channel)
141
    {
142 2
        $messageSize = $channel->getSettings()->getMaxMessageSize();
143 2
        if (!$messageSize) {
144 2
            $configuration = $this->getConfiguration($channel->getResource());
145 2
            $messageSize = (int)$configuration[self::CONFIGURATION_MESSAGE_SIZE];
146 2
            $channel->getSettings()->setMaxMessageSize($messageSize);
147
        }
148 2
        return $messageSize;
149
    }
150
151
    /**
152
     * Push a message in queue
153
     * @param Channel $channel
154
     * @param mixed $message
155
     * @param string|int $messageType
156
     * @return bool
157
     * @throws Exception
158
     */
159 2
    public function push(Channel $channel, $message, $messageType = null)
160
    {
161 2
        if (!$channel->hasResource()) {
162 2
            $this->connect($channel);
163
        }
164 2
        $messageType = $this->secureMessageType($messageType);
165 2
        $error = 0;
166 2
        $blockSend = (bool)$channel->getSettings()->getBlockSend();
167 2
        $maxSendRetry = (int)$channel->getSettings()->getMaxSendRetryTime();
168 2
        $serialize = (bool)$channel->getSettings()->getSerialize();
169 2
        $retry = 0;
170 2
        $success = false;
171 2
        while ($retry < $maxSendRetry) {
172 2
            $success = $this->msgSend($channel->getResource(), $messageType, $message, $serialize, $blockSend, $error);
0 ignored issues
show
Bug introduced by
It seems like $error defined by 0 on line 165 can also be of type integer; however, FMUP\Queue\Driver\Native::msgSend() does only seem to accept null, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
173 2
            $retry = ($success || (!$success && $error != MSG_EAGAIN)) ? $maxSendRetry : $retry + 1;
174
        }
175 2
        if (!$success) {
176 1
            throw new Exception("Error while sending message", $error);
177
        }
178 1
        return $success;
179
    }
180
181
    /**
182
     * @param $queue
183
     * @param $msgType
184
     * @param $message
185
     * @param bool|true $serialize
186
     * @param bool|true $blocking
187
     * @param null $errorCode
188
     * @return bool
189
     * @codeCoverageIgnore
190
     */
191
    protected function msgSend($queue, $msgType, $message, $serialize = true, $blocking = true, &$errorCode = null)
192
    {
193
        return msg_send($queue, $msgType, $message, $serialize, $blocking, $errorCode);
194
    }
195
196
    /**
197
     * Secure queue name to be int (due to semaphore)
198
     * @param string|int $name
199
     * @return int
200
     * @throws Exception
201
     */
202 7
    private function secureQueueName($name)
203
    {
204 7
        $name = (int)$this->stringToUniqueId($name);
205 7
        if ($name === 0) {
206 1
            throw new Exception('Queue name must be in INT > 0 to use semaphores');
207
        }
208 6
        return $name;
209
    }
210
211
    /**
212
     * Secure message type
213
     * @param string|int $messageType
214
     * @return int|null
215
     * @throws Exception
216
     */
217 5
    private function secureMessageType($messageType = null)
218
    {
219 5
        if (is_null($messageType)) {
220 4
            $messageType = 1;
221
        }
222 5
        $messageType = (int)$this->stringToUniqueId($messageType);
223 5
        if ($messageType === 0) {
224 1
            throw new Exception('Message Type must be in INT > 0 to use semaphores');
225
        }
226 4
        return $messageType;
227
    }
228
229
    /**
230
     * Convert string to a unique id
231
     * @param string $string
232
     * @return int
233
     */
234 9
    private function stringToUniqueId($string)
235
    {
236 9
        if (is_numeric($string)) {
237 8
            return (int)$string;
238
        }
239 1
        if ($this->hasEnvironment()) {
240 1
            $string .= $this->getEnvironment()->get();
241
        }
242 1
        $length = strlen($string);
243 1
        $return = 0;
244 1 View Code Duplication
        for ($i = 0; $i < $length; $i++) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
245 1
            $return += ord($string{$i});
246
        }
247 1
        return (int)($length . '0' . $return);
248
    }
249
250
    /**
251
     * Get queue configuration
252
     * @param resource $queueResource
253
     * @return array
254
     * @codeCoverageIgnore
255
     */
256
    protected function getConfiguration($queueResource)
257
    {
258
        return msg_stat_queue($queueResource);
259
    }
260
261
    /**
262
     * Define queue configuration
263
     * @param Channel $channel
264
     * @param array $params
265
     * @return bool
266
     */
267 1
    public function setConfiguration(Channel $channel, $params)
268
    {
269 1
        if (isset($params[self::CONFIGURATION_MESSAGE_SIZE])) {
270 1
            $channel->getSettings()->setMaxMessageSize((int)$params[self::CONFIGURATION_MESSAGE_SIZE]);
271
        }
272 1
        return $this->msgSetQueue($channel->getResource(), (array)$params);
273
    }
274
275
    /**
276
     * @param $queue
277
     * @param array $data
278
     * @return bool
279
     * @codeCoverageIgnore
280
     */
281
    protected function msgSetQueue($queue, array $data)
282
    {
283
        return msg_set_queue($queue, $data);
284
    }
285
286
    /**
287
     * Destroy a queue
288
     * @param Channel $channel
289
     * @return bool
290
     */
291 1
    public function destroy(Channel $channel)
292
    {
293 1
        if ($channel->hasResource()) {
294 1
            if ($this->msgRemoveQueue($channel->getResource())) {
295 1
                $channel->setResource(null);
296 1
                return true;
297
            }
298 1
            return false;
299
        }
300 1
        return true;
301
    }
302
303
    /**
304
     * @param $queue
305
     * @return bool
306
     * @codeCoverageIgnore
307
     */
308
    protected function msgRemoveQueue($queue)
309
    {
310
        return msg_remove_queue($queue);
311
    }
312
313
    /**
314
     * Reception options
315
     * @param Channel $channel
316
     * @return int
317
     */
318 2
    private function getReceiveFlags(Channel $channel)
319
    {
320 2
        $modeExcept = 0;
321 2
        $forceSize = 0;
322 2
        $settings = $channel->getSettings();
323 2
        if ($settings instanceof Channel\Settings\Native) {
324 2
            $modeExcept = $settings->getReceiveModeExcept() ? MSG_EXCEPT : 0;
325 2
            $forceSize = $settings->getReceiveForceSize() ? MSG_NOERROR : 0;
326
        }
327 2
        $blockReceive = $this->getParamBlockReceive($channel);
328
329 2
        return $blockReceive | $modeExcept | $forceSize;
330
    }
331
332
    /**
333
     * Check whether block or not on reception
334
     * @param Channel $channel
335
     * @return int
336
     */
337 2
    private function getParamBlockReceive(Channel $channel)
338
    {
339 2
        return $channel->getSettings()->getBlockReceive() ? 0 : MSG_IPC_NOWAIT;
340
    }
341
342
    /**
343
     * @todo factorize this method
344
     * @param Channel $channel
345
     * @return array
346
     */
347 1
    public function getStats(Channel $channel)
348
    {
349 1
        if (!$channel->hasResource()) {
350 1
            $this->connect($channel);
351
        }
352 1
        return $this->getConfiguration($channel->getResource());
353
    }
354
355
    /**
356
     * This methods do nothing since messages are auto-acked in SystemV. Sorry :(
357
     * @param Channel $channel
358
     * @param Message $message
359
     * @return $this
360
     */
361 1
    public function ackMessage(Channel $channel, Message $message)
362
    {
363 1
        return $this;
364
    }
365
}
366