Queue::delete_messages()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 8
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 5
nc 2
nop 1
1
<?php
2
3
    namespace NokitaKaze\Queue;
4
5
    use \NokitaKaze\Serializer\Serializer;
6
7
    class Queue extends AbstractQueueTransport {
8
        const ProducerThreadCount = 5;
9
        const DefaultDBFileCount = 10;
10
        const DefaultMessageFolderSubfolderCount = 10;
11
12
        const StorageTemporary = 0;
13
        const StoragePersistent = 1;
14
15
        /**
16
         * @var QueueTransport
17
         */
18
        protected $_general_queue = null;
19
20
        /**
21
         * @var QueueTransport[]
22
         */
23
        protected $_additional_queue = [];
24
25
        /**
26
         * @var GeneralQueueConstructionSettings|object $_settings
27
         */
28
        protected $_settings = null;
29
30
        /**
31
         * SmartQueue constructor.
32
         *
33
         * @param GeneralQueueConstructionSettings|object $settings
34
         *
35
         * @throws QueueException
36
         */
37
        function __construct($settings) {
38
            $this->_settings = $settings;
39
            foreach ($settings->queues as $queue_settings) {
40
                /**
41
                 * @var string|QueueTransport $queue_class_name
42
                 * @var QueueTransport        $queue
43
                 */
44
                if (substr($queue_settings->class_name, 0, 1) == '\\') {
45
                    $queue_class_name = $queue_settings->class_name.'QueueTransport';
46
                } else {
47
                    $queue_class_name = __NAMESPACE__.'\\'.$queue_settings->class_name.'QueueTransport';
48
                }
49
                $queue = new $queue_class_name($queue_settings);
50
51
                if (isset($queue_settings->is_general) and $queue_settings->is_general) {
52
                    if (isset($this->_general_queue)) {
53
                        throw new QueueException('Two or more general queues');
54
                    }
55
                    $this->_general_queue = $queue;
56
                    $this->_additional_queue[] = clone $queue;
57
                } else {
58
                    $this->_additional_queue[] = $queue;
59
                }
60
            }
61
            if (is_null($this->_general_queue)) {
62
                throw new QueueException('Can not get general queue');
63
            }
64
            // @todo дописать
65
        }
66
67
        /**
68
         * @return string
69
         */
70
        static function generate_rnd_postfix() {
71
            // mt_rand. **ДЁШЕВО** и сердито
72
            return mt_rand(1000000, 9999999).mt_rand(1000000, 9999999);
73
74
            // return RandomString::generate(6, RandomString::INCLUDE_NUMERIC | RandomString::INCLUDE_LOWER_LETTERS);
75
        }
76
77
        /**
78
         * Формируем сообщение для очереди
79
         *
80
         * @param mixed       $data
81
         * @param string|null $name Название сообщения. Если null, то можно дублировать
82
         * @param integer     $sort
83
         *
84
         * @return iMessage|object
85
         */
86
        static function build_message($data, $name = null, $sort = 5) {
87
            return (object) [
88
                'name' => is_null($name) ? null : (string) $name,
89
                'data' => $data,
90
                'time_created' => microtime(true),
91
                'time_rnd_postfix' => is_null($name) ? static::generate_rnd_postfix() : null,
92
                'time_last_update' => microtime(true),
93
                'sort' => min(max($sort, 0), self::DefaultDBFileCount - 1),
94
                'is_read' => false,
95
            ];
96
        }
97
98
        /**
99
         * @param iMessage|object $object
100
         *
101
         * @return iMessage|object
102
         * @throws QueueException
103
         */
104
        static function sanify_event_object($object) {
105
            $ret = clone $object;
106
            /** @noinspection PhpParamsInspection */
107
            if (!array_key_exists('name', $ret)) {
108
                $ret->name = null;
109
            }
110
            /** @noinspection PhpParamsInspection */
111
            if (!array_key_exists('data', $ret)) {
112
                throw new QueueException('Datum does not have field data', 12);
113
            }
114
            if (!isset($ret->sort)) {
115
                $ret->sort = 5;
116
            } else {
117
                $ret->sort = min(max($ret->sort, 0), Queue::DefaultDBFileCount - 1);
118
            }
119
120
            return $ret;
121
        }
122
123
        /**
124
         * Cloning sub queues
125
         */
126
        function __clone() {
127
            parent::__clone();
128
            $this->_general_queue = clone $this->_general_queue;
129
            foreach ($this->_additional_queue as &$sub_queue) {
130
                $sub_queue = clone $sub_queue;
131
            }
132
        }
133
134
        /**
135
         * implements Transport
136
         */
137
138
        /**
139
         * @param iMessage|object $message
140
         *
141
         * @return string
142
         */
143
        static function get_real_key_for_message($message) {
144
            return !is_null($message->name)
145
                ? $message->name
146
                : sprintf('_%s%s',
147
                    number_format($message->time_created, 7, '.', ''),
148
                    isset($message->time_rnd_postfix) ? '_'.$message->time_rnd_postfix : ''
149
                );
150
        }
151
152
        function get_queue_name() {
153
            return $this->_settings->name;
154
        }
155
156
        static function is_support_sorted_events() {
157
            return false;
158
        }
159
160
        function produce_message($data, $name = null, $sort = 5) {
161
            $this->set_same_time_flag(1);
162
            $this->_general_queue->produce_message($data, $name, $sort);
163
        }
164
165
        function save() {
166
            $this->set_same_time_flag(1);
167
            $this->_general_queue->push($this->_pushed_for_save);
168
            $this->_pushed_for_save = [];
169
            $this->_general_queue->save();
170
        }
171
172
        function set_exclusive_mode($mode) {
173
            $this->_general_queue->set_exclusive_mode($mode);
174
            foreach ($this->_additional_queue as $queue) {
175
                $queue->set_exclusive_mode($mode);
176
            }
177
        }
178
179
        /**
180
         * @var iMessage[]|object[]
181
         */
182
        protected $_next_messages = [];
183
184
        function consume_next_message($wait_time = -1) {
185
            $this->set_same_time_flag(2);
186
            if (!empty($this->_next_messages)) {
187
                return array_shift($this->_next_messages);
188
            }
189
            $ts_start = microtime(true);
190
            $till_time = $ts_start + $wait_time;
191
192
            $first_run = false;
193
            do {
194
                if ($first_run) {
195
                    usleep($this->_settings->sleep_time_while_consuming * 1000000);
196
                } else {
197
                    $first_run = true;
198
                }
199
                $messages = [];
200
                foreach ($this->_additional_queue as $queue) {
201
                    while ($message = $queue->consume_next_message(0)) {
202
                        $key = self::get_real_key_for_message($message);
203
                        if (isset($this->_consumed_keys[$key])) {
204
                            continue;
205
                        }
206
                        $message->is_read = true;
207
                        $messages[] = $message;
208
                        $this->_consumed_keys[$key] = 1;
209
                    }
210
211
                    if (!empty($messages)) {
212
                        $this->_next_messages = $messages;
213
                        if (!$queue->is_equal_to($this->_general_queue)) {
214
                            $this->_general_queue->push($messages);
215
                            $this->_general_queue->save();
216
                            $queue->delete_messages($messages);
217
                        }
218
219
                        return array_shift($this->_next_messages);
220
                    }
221
                }
222
                unset($message, $key, $queue);
223
            } while (($wait_time == -1) or (microtime(true) <= $till_time));
224
225
            return null;
226
        }
227
228
        /**
229
         * Обновляем сообщение и сразу же сохраняем всё
230
         *
231
         * Эта функция не рейзит ошибку, если сообщение не найдено
232
         *
233
         * @param iMessage|object $message
234
         * @param string|null     $key форсированно задаём ключ сообщения
235
         *
236
         * @return boolean
237
         */
238
        function update_message($message, $key = null) {
239
            $this_key = !is_null($key) ? $key : self::get_real_key_for_message($message);
240
            foreach ($this->_next_messages as $exists_message) {
241
                if (self::get_real_key_for_message($exists_message) == $this_key) {
242
                    $exists_message->data = $message->data;
243
                    $exists_message->time_last_update = microtime(true);
244
                    break;
245
                }
246
            }
247
248
            return $this->_general_queue->update_message($message, $key);
249
        }
250
251
        function clear_consumed_keys() {
252
            $this->_consumed_keys = [];
253
            foreach ($this->_additional_queue as $queue) {
254
                $queue->clear_consumed_keys();
255
            }
256
        }
257
258
        /**
259
         * Удалить сообщения и сразу же записать это в БД
260
         *
261
         * @param iMessage[]|object[] $messages
262
         *
263
         * @return string[]|integer[]
264
         */
265
        function delete_messages(array $messages) {
266
            $deleted_keys = [];
267
            foreach ($this->_additional_queue as $queue) {
268
                $deleted_keys = array_merge($deleted_keys, $queue->delete_messages($messages));
269
            }
270
271
            return array_unique($deleted_keys);
272
        }
273
274
        /**
275
         * @param Queue $queue
276
         *
277
         * @return boolean
278
         */
279
        function is_equal_to($queue) {
280
            if (spl_object_hash($this) == spl_object_hash($queue)) {
281
                return true;
282
            }
283
            if (!parent::is_equal_to($queue)) {
284
                return false;
285
            }
286
            if (!$this->_general_queue->is_equal_to($queue->_general_queue)) {
287
                return false;
288
            }
289
            foreach ($this->_additional_queue as $i => $sub_queue) {
290
                if (!$sub_queue->is_equal_to($queue->_additional_queue[$i])) {
291
                    return false;
292
                }
293
            }
294
295
            return true;
296
        }
297
298
        /**
299
         * @param mixed $data
300
         *
301
         * @return string
302
         */
303
        static function serialize($data) {
304
            return Serializer::serialize($data);
305
        }
306
307
        /**
308
         * @param string  $string
309
         * @param boolean $is_valid
310
         *
311
         * @return mixed
312
         */
313
        static function unserialize($string, &$is_valid) {
314
            return Serializer::unserialize($string, $is_valid);
315
        }
316
    }
317
318
?>