Completed
Push — master ( ba5678...9c5ab4 )
by Dominik
06:28 queued 04:14
created

SystemVReceive::receive()   B

Complexity

Conditions 3
Paths 3

Size

Total Lines 31
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 31
rs 8.8571
c 0
b 0
f 0
cc 3
eloc 19
nc 3
nop 0
1
<?php
2
3
namespace Saxulum\MessageQueue\SystemV;
4
5
use Saxulum\MessageQueue\MessageInterface;
6
use Saxulum\MessageQueue\MessageReceiveInterface;
7
8
/**
 * Receives messages from a System V message queue without blocking and
 * turns each JSON payload into an instance of the configured message class.
 */
final class SystemVReceive implements MessageReceiveInterface
{
    /**
     * Name of the message class; receive() calls its static fromJson() method.
     *
     * @var string
     */
    private $messageClass;

    /**
     * Queue handle returned by msg_get_queue() (a SysvMessageQueue object on PHP 8+).
     *
     * @var resource|\SysvMessageQueue
     */
    private $queue;

    /**
     * Message type handed to msg_receive() as the desired type.
     *
     * @var int
     */
    private $type;

    /**
     * Value applied to the queue's msg_qbytes setting; half of it is used
     * as the maximum size of a single received message.
     *
     * @var int
     */
    private $qbytes;

    /**
     * Opens the queue identified by $key and applies the msg_qbytes setting.
     *
     * @param string $messageClass class name whose static fromJson() builds the message
     * @param int    $key          System V IPC key of the queue
     * @param int    $type         desired message type passed to msg_receive()
     * @param int    $qbytes       value written to the queue's msg_qbytes setting
     *
     * @throws \RuntimeException if the queue cannot be opened
     */
    public function __construct(string $messageClass, int $key, int $type = 1, int $qbytes = 16384)
    {
        $queue = msg_get_queue($key);

        // Fail early with a clear message instead of storing false and letting
        // every later msg_* call fail on an invalid queue handle.
        if (false === $queue) {
            throw new \RuntimeException(sprintf('Can\'t open message queue with key %d', $key));
        }

        $this->messageClass = $messageClass;
        $this->queue = $queue;
        $this->type = $type;
        $this->qbytes = $qbytes;

        // The result is intentionally not checked, so a rejected change
        // leaves the queue's existing limit in place.
        msg_set_queue($this->queue, ['msg_qbytes' => $this->qbytes]);
    }

    /**
     * Fetches the next message of the configured type, if one is available.
     *
     * @return null|MessageInterface the decoded message, or null when the queue holds no matching message
     *
     * @throws \Exception if msg_receive() fails for any reason other than an empty queue
     */
    public function receive()
    {
        // Filled by reference by msg_receive().
        $receivedType = null;
        $json = null;
        $errorCode = null;

        $status = msg_receive(
            $this->queue,
            $this->type,
            $receivedType,
            // Integer division: the size limit must be an int even when qbytes is odd.
            intdiv($this->qbytes, 2),
            $json,
            // Do not unserialize; the payload is handed to fromJson() as received.
            false,
            MSG_IPC_NOWAIT,
            $errorCode
        );

        if (false === $status) {
            // we do not wait for a message (prevent lock)
            if (MSG_ENOMSG === $errorCode) {
                return null;
            }

            $errorCode = (int) $errorCode;

            throw new \Exception(sprintf('Can\'t receive message, error code %d', $errorCode), $errorCode);
        }

        /** @var MessageInterface $messageClass */
        $messageClass = $this->messageClass;

        return $messageClass::fromJson($json);
    }
}
83