MessageQueue::receive()   A
last analyzed

Complexity

Conditions 3
Paths 4

Size

Total Lines 18
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 18
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 13
nc 4
nop 2
1
<?php
2
/**
3
 * Process Library
4
 * @author Tao <[email protected]>
5
 */
6
namespace Slince\Process\SystemV;
7
8
use Slince\Process\Exception\RuntimeException;
9
10
class MessageQueue
{
    use IpcKeyTrait;

    /**
     * The handle used to access the System V message queue
     *
     * A resource on PHP < 8.0 and a \SysvMessageQueue instance on PHP >= 8.0.
     * @var resource|\SysvMessageQueue
     */
    protected $mqId;

    /**
     * The message type, used for sent messages and as the desired type when receiving
     * @var int
     */
    protected $messageType;

    /**
     * Whether messages are serialized when sent and unserialized when received
     * @var bool
     */
    protected $unserialize = true;

    /**
     * Creates the message queue, or attaches to it if it already exists
     * @param int $messageType The type given to sent messages and requested when receiving
     * @param string $pathname The path handed to generateIpcKey() to derive the queue key
     * @param int $mode The queue permissions; only applied when the queue is newly created
     * @throws RuntimeException If the queue can neither be created nor attached to
     */
    public function __construct($messageType = 1, $pathname = __FILE__, $mode = 0666)
    {
        $this->messageType = $messageType;
        $ipcKey = $this->generateIpcKey($pathname);
        $queue = msg_get_queue($ipcKey, $mode);
        // Fail here rather than keep an unusable handle that would break
        // every later send/receive/stat call far away from the real cause.
        if ($queue === false) {
            throw new RuntimeException("Failed to create or attach to the message queue");
        }
        $this->mqId = $queue;
    }

    /**
     * Sends the message to the queue
     * @param string $message
     * @param bool $blocking Whether to wait when the queue has no room for the message
     * @return bool Always true; a failure is reported with an exception
     * @throws RuntimeException If the message cannot be sent; the code is the error
     *                          number reported by msg_send()
     */
    public function send($message, $blocking = true)
    {
        // Start from a known integer so the exception code is well defined
        // even when msg_send() fails without assigning the error number.
        $errorCode = 0;
        $sent = msg_send(
            $this->mqId,
            $this->messageType,
            $message,
            $this->unserialize,
            $blocking,
            $errorCode
        );
        if (!$sent) {
            throw new RuntimeException("Failed to send the message to the queue", $errorCode);
        }
        return true;
    }

    /**
     * Gets the message from the queue
     *
     * A failure is reported by returning false, so a message whose value is
     * itself false cannot be told apart from a failed receive.
     *
     * @param bool $blocking Whether to wait until a message of the wanted type arrives
     * @param int $maxSize The max size you want receive(Unit:bytes)
     * @return mixed|false The received message, or false if nothing could be received
     */
    public function receive($blocking = true, $maxSize = 10240)
    {
        $flags = $blocking ? 0 : MSG_IPC_NOWAIT;
        $received = msg_receive(
            $this->mqId,
            $this->messageType,
            $realMessageType,
            $maxSize,
            $message,
            $this->unserialize,
            $flags
        );
        return $received ? $message : false;
    }

    /**
     * Sets information for the queue
     * @param array $states The fields to change, keyed as in the array given by getState()
     * @return bool
     */
    public function setStates(array $states)
    {
        return msg_set_queue($this->mqId, $states);
    }

    /**
     * Sets information for the queue by specifying the key and value
     * @param string $key
     * @param string|int $value
     * @return bool
     */
    public function setState($key, $value)
    {
        return $this->setStates([
            $key => $value
        ]);
    }

    /**
     * Gets the information of the queue
     *
     * The return value is an array whose keys and values have the following meanings:
     * Array structure for msg_stat_queue
     * msg_perm.uid The uid of the owner of the queue.
     * msg_perm.gid The gid of the owner of the queue.
     * msg_perm.mode The file access mode of the queue.
     * msg_stime The time that the last message was sent to the queue.
     * msg_rtime The time that the last message was received from the queue.
     * msg_ctime The time that the queue was last changed.
     * msg_qnum The number of messages waiting to be read from the queue.
     * msg_qbytes The maximum number of bytes allowed in one message queue. On Linux, this value may be read and modified via /proc/sys/kernel/msgmnb.
     * msg_lspid The pid of the process that sent the last message to the queue.
     * msg_lrpid The pid of the process that received the last message from the queue.
     *
     * @return array|false The queue information, or false if it cannot be read
     */
    public function getState()
    {
        return msg_stat_queue($this->mqId);
    }
}
128