Completed
Push — developer ( 8100a7...c24331 )
by Никита
40:55 queued 13:43
created

Queue::unserialize()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 2
crap 1
1
<?php
2
3
    namespace NokitaKaze\Queue;
4
5
    use \NokitaKaze\Serializer\Serializer;
6
7
    class Queue extends AbstractQueueTransport {
8
        const ProducerThreadCount = 5;
9
        const DefaultDBFileCount = 10;
10
        const DefaultMessageFolderSubfolderCount = 10;
11
12
        const StorageTemporary = 0;
13
        const StoragePersistent = 1;
14
15
        /**
16
         * @var QueueTransport
17
         */
18
        protected $_general_queue = null;
19
20
        /**
21
         * @var QueueTransport[]
22
         */
23
        protected $_additional_queue = [];
24
25
        /**
26
         * @var GeneralQueueConstructionSettings|object $_settings
27
         */
28
        protected $_settings = null;
29
30
        /**
31
         * SmartQueue constructor.
32
         *
33
         * @param GeneralQueueConstructionSettings|object $settings
34
         *
35
         * @throws QueueException
36
         */
37 8
        function __construct($settings) {
38 8
            $this->_settings = $settings;
39 8
            foreach ($settings->queues as $queue_settings) {
40
                /**
41
                 * @var string|QueueTransport $queue_class_name
42
                 * @var QueueTransport        $queue
43
                 */
44 8
                if (substr($queue_settings->class_name, 0, 1) == '\\') {
45
                    $queue_class_name = $queue_settings->class_name.'QueueTransport';
46
                } else {
47 8
                    $queue_class_name = __NAMESPACE__.'\\'.$queue_settings->class_name.'QueueTransport';
48
                }
49 8
                $queue = new $queue_class_name($queue_settings);
50
51 8
                if (isset($queue_settings->is_general) and $queue_settings->is_general) {
52 8
                    if (isset($this->_general_queue)) {
53
                        throw new QueueException('Two or more general queues');
54
                    }
55 8
                    $this->_general_queue = $queue;
56 8
                    $this->_additional_queue[] = clone $queue;
57
                } else {
58 8
                    $this->_additional_queue[] = $queue;
59
                }
60
            }
61 8
            if (is_null($this->_general_queue)) {
62
                throw new QueueException('Can not get general queue');
63
            }
64
            // @todo дописать
65 8
        }
66
67
        /**
68
         * @return string
69
         */
70 174
        static function generate_rnd_postfix() {
71
            // mt_rand. **ДЁШЕВО** и сердито
72 174
            return mt_rand(1000000, 9999999).mt_rand(1000000, 9999999);
73
74
            // return RandomString::generate(6, RandomString::INCLUDE_NUMERIC | RandomString::INCLUDE_LOWER_LETTERS);
75
        }
76
77
        /**
78
         * Формируем сообщение для очереди
79
         *
80
         * @param mixed       $data
81
         * @param string|null $name Название сообщения. Если null, то можно дублировать
82
         * @param integer     $sort
83
         *
84
         * @return iMessage|object
85
         */
86 174
        static function build_message($data, $name = null, $sort = 5) {
87
            return (object) [
88 174
                'name' => is_null($name) ? null : (string) $name,
89 174
                'data' => $data,
90 174
                'time_created' => microtime(true),
91 174
                'time_rnd_postfix' => is_null($name) ? static::generate_rnd_postfix() : null,
92 174
                'time_last_update' => microtime(true),
93 174
                'sort' => min(max($sort, 0), self::DefaultDBFileCount - 1),
94
                'is_read' => false,
95
            ];
96
        }
97
98
        /**
99
         * @param iMessage|object $object
100
         *
101
         * @return iMessage|object
102
         * @throws QueueException
103
         */
104 177
        static function sanify_event_object($object) {
105 177
            $ret = clone $object;
106
            /** @noinspection PhpParamsInspection */
107 177
            if (!array_key_exists('name', $ret)) {
108 1
                $ret->name = null;
109
            }
110
            /** @noinspection PhpParamsInspection */
111 177
            if (!array_key_exists('data', $ret)) {
112 1
                throw new QueueException('Datum does not have field data', 12);
113
            }
114 177
            if (!isset($ret->sort)) {
115 1
                $ret->sort = 5;
116
            } else {
117 176
                $ret->sort = min(max($ret->sort, 0), Queue::DefaultDBFileCount - 1);
118
            }
119
120 177
            return $ret;
121
        }
122
123
        /**
124
         * Cloning sub queues
125
         */
126 79
        function __clone() {
127 79
            parent::__clone();
128 79
            $this->_general_queue = clone $this->_general_queue;
129 79
            foreach ($this->_additional_queue as &$sub_queue) {
130 79
                $sub_queue = clone $sub_queue;
131
            }
132 79
        }
133
134
        /**
135
         * implements Transport
136
         */
137
138
        /**
139
         * @param iMessage|object $message
140
         *
141
         * @return string
142
         */
143 230
        static function get_real_key_for_message($message) {
144 230
            return !is_null($message->name)
145 185
                ? $message->name
146 219
                : sprintf('_%s%s',
147 219
                    number_format($message->time_created, 7, '.', ''),
148 230
                    isset($message->time_rnd_postfix) ? '_'.$message->time_rnd_postfix : ''
149
                );
150
        }
151
152 2
        function get_queue_name() {
153 2
            return $this->_settings->name;
154
        }
155
156 1
        static function is_support_sorted_events() {
157 1
            return false;
158
        }
159
160 5
        function produce_message($data, $name = null, $sort = 5) {
161 5
            $this->set_same_time_flag(1);
162 5
            $this->_general_queue->produce_message($data, $name, $sort);
163 5
        }
164
165 62
        function save() {
166 62
            $this->set_same_time_flag(1);
167 62
            $this->_general_queue->push($this->_pushed_for_save);
168 62
            $this->_pushed_for_save = [];
169 62
            $this->_general_queue->save();
170 62
        }
171
172
        function set_exclusive_mode($mode) {
173
            $this->_general_queue->set_exclusive_mode($mode);
174
            foreach ($this->_additional_queue as $queue) {
175
                $queue->set_exclusive_mode($mode);
176
            }
177
        }
178
179
        /**
180
         * @var iMessage[]|object[]
181
         */
182
        protected $_next_messages = [];
183
184 73
        function consume_next_message($wait_time = -1) {
185 73
            $this->set_same_time_flag(2);
186 73
            if (!empty($this->_next_messages)) {
187 65
                return array_shift($this->_next_messages);
188
            }
189 73
            $ts_start = microtime(true);
190 73
            $till_time = $ts_start + $wait_time;
191
192 73
            $first_run = false;
193
            do {
194 73
                if ($first_run) {
195 15
                    usleep($this->_settings->sleep_time_while_consuming * 1000000);
196
                } else {
197 73
                    $first_run = true;
198
                }
199 73
                $messages = [];
200 73
                foreach ($this->_additional_queue as $queue) {
201 73
                    while ($message = $queue->consume_next_message(0)) {
202 73
                        $key = self::get_real_key_for_message($message);
203 73
                        if (isset($this->_consumed_keys[$key])) {
204 21
                            continue;
205
                        }
206 73
                        $message->is_read = true;
207 73
                        $messages[] = $message;
208 73
                        $this->_consumed_keys[$key] = 1;
209
                    }
210
211 73
                    if (!empty($messages)) {
212 73
                        $this->_next_messages = $messages;
213 73
                        if (!$queue->is_equal_to($this->_general_queue)) {
214 25
                            $this->_general_queue->push($messages);
215 25
                            $this->_general_queue->save();
216 25
                            $queue->delete_messages($messages);
217
                        }
218
219 73
                        return array_shift($this->_next_messages);
220
                    }
221
                }
222 68
                unset($message, $key, $queue);
223 68
            } while (($wait_time == -1) or (microtime(true) <= $till_time));
224
225 68
            return null;
226
        }
227
228
        /**
229
         * Обновляем сообщение и сразу же сохраняем всё
230
         *
231
         * Эта функция не рейзит ошибку, если сообщение не найдено
232
         *
233
         * @param iMessage|object $message
234
         * @param string|null     $key форсированно задаём ключ сообщения
235
         *
236
         * @return boolean
237
         */
238 40
        function update_message($message, $key = null) {
239 40
            $this_key = !is_null($key) ? $key : self::get_real_key_for_message($message);
240 40
            foreach ($this->_next_messages as $exists_message) {
241
                if (self::get_real_key_for_message($exists_message) == $this_key) {
242
                    $exists_message->data = $message->data;
243
                    $exists_message->time_last_update = microtime(true);
244
                    break;
245
                }
246
            }
247
248 40
            return $this->_general_queue->update_message($message, $key);
249
        }
250
251 1
        function clear_consumed_keys() {
252 1
            $this->_consumed_keys = [];
253 1
            foreach ($this->_additional_queue as $queue) {
254 1
                $queue->clear_consumed_keys();
255
            }
256 1
        }
257
258
        /**
259
         * Удалить сообщения и сразу же записать это в БД
260
         *
261
         * @param iMessage[]|object[] $messages
262
         *
263
         * @return string[]|integer[]
264
         */
265 20
        function delete_messages(array $messages) {
266 20
            $deleted_keys = [];
267 20
            foreach ($this->_additional_queue as $queue) {
268 20
                $deleted_keys = array_merge($deleted_keys, $queue->delete_messages($messages));
269
            }
270
271 20
            return array_unique($deleted_keys);
272
        }
273
274
        /**
275
         * @param Queue $queue
276
         *
277
         * @return boolean
278
         */
279 1
        function is_equal_to($queue) {
280 1
            if (spl_object_hash($this) == spl_object_hash($queue)) {
281 1
                return true;
282
            }
283 1
            if (!parent::is_equal_to($queue)) {
284
                return false;
285
            }
286 1
            if (!$this->_general_queue->is_equal_to($queue->_general_queue)) {
287
                return false;
288
            }
289 1
            foreach ($this->_additional_queue as $i => $sub_queue) {
290 1
                if (!$sub_queue->is_equal_to($queue->_additional_queue[$i])) {
291 1
                    return false;
292
                }
293
            }
294
295 1
            return true;
296
        }
297
298
        /**
299
         * @param mixed $data
300
         *
301
         * @return string
302
         */
303 186
        static function serialize($data) {
304 186
            return Serializer::serialize($data);
305
        }
306
307
        /**
308
         * @param string  $string
309
         * @param boolean $is_valid
310
         *
311
         * @return mixed
312
         */
313 186
        static function unserialize($string, &$is_valid) {
314 186
            return Serializer::unserialize($string, $is_valid);
315
        }
316
    }
317
318
?>