SystemVReceive::receive()   B
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 37
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 37
c 0
b 0
f 0
rs 8.8571
cc 3
eloc 23
nc 3
nop 0
1
<?php
2
3
namespace Saxulum\MessageQueue\SystemV;
4
5
use Saxulum\MessageQueue\AbstractMessageReceive;
6
use Saxulum\MessageQueue\MessageInterface;
7
use Saxulum\MessageQueue\MessageReceiveException;
8
9
final class SystemVReceive extends AbstractMessageReceive
10
{
11
    /**
12
     * @var string
13
     */
14
    private $messageClass;
15
16
    /**
17
     * @var resource
18
     */
19
    private $queue;
20
21
    /**
22
     * @var int
23
     */
24
    private $type;
25
26
    /**
27
     * @var int
28
     */
29
    private $maxBytesPerMessage;
30
31
    /**
32
     * @param string $messageClass
33
     * @param int    $key
34
     * @param int    $type
35
     * @param int    $qbytes
36
     */
37
    public function __construct(string $messageClass, int $key, int $type = 1, int $qbytes = 16384)
38
    {
39
        $this->messageClass = $messageClass;
40
        $this->queue = msg_get_queue($key);
41
        $this->type = $type;
42
        $this->maxBytesPerMessage = $qbytes / 2;
43
44
        msg_set_queue($this->queue, ['msg_qbytes' => $qbytes]);
45
    }
46
47
    /**
48
     * @return null|MessageInterface
49
     *
50
     * @throws MessageReceiveException
51
     */
52
    public function receive()
53
    {
54
        $type = null;
55
        $json = null;
56
        $errorCode = null;
57
58
        $status = msg_receive(
59
            $this->queue,
60
            $this->type,
61
            $type,
62
            $this->maxBytesPerMessage,
63
            $json,
64
            false,
65
            MSG_IPC_NOWAIT,
66
            $errorCode
67
        );
68
69
        if (false === $status) {
70
            // we do not wait for a message (prevent lock)
71
            if (MSG_ENOMSG === $errorCode) {
72
                return null;
73
            }
74
75
            throw new MessageReceiveException(
76
                sprintf(
77
                    MessageReceiveException::MESSAGE_RECEIVE_FAILED,
78
                    sprintf(' (SystemV error code %d)', $errorCode)
79
                ),
80
                MessageReceiveException::CODE_RECEIVE_FAILED
81
            );
82
        }
83
84
        /** @var MessageInterface $messageClass */
85
        $messageClass = $this->messageClass;
86
87
        return $messageClass::fromJson($json);
88
    }
89
}
90