FileDBQueueTransport::get_mutex_folder()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
3
    namespace NokitaKaze\Queue;
4
5
    use NokitaKaze\Mutex\FileMutex;
6
7
    class FileDBQueueTransport extends AbstractQueueTransport {
8
        const FILE_DB_INDEX_VERSION = 1;
9
        const FILE_DB_CHUNK_VERSION = 1;
10
11
        protected $_folder;
12
        protected $_mutex_folder;
13
        protected $_prefix;
14
        protected $_name;
15
        protected $_db_file_count = Queue::DefaultDBFileCount;
16
        /**
17
         * @var FileDBQueueConstructionSettings|object
18
         */
19
        protected $_construction_settings;
20
21
        /**
22
         * @var FileMutex|null
23
         */
24
        protected $_producer_mutex = null;
25
26
        /**
27
         * @var FileMutex|null
28
         */
29
        protected $_index_mutex = null;
30
31
        /**
32
         * @var integer[][] Index data: for each sort bucket, message key => producer thread id
33
         */
34
        protected $_index_data = [];
35
36
        /**
37
         * @var FileDBIndexFile
38
         */
39
        protected $_index_data_full = null;
40
41
        /**
42
         * @var boolean Exclusive mode flag
43
         */
44
        protected $_exclusive_mode = false;
45
46
        /**
         * Build a transport from a settings object.
         *
         * Applies the default storage type when none is given, resolves the data folder,
         * the file prefix and the mutex folder, and pre-fills the in-memory index with
         * one empty array per DB file.
         *
         * @param FileDBQueueConstructionSettings|object $settings Must carry `name`; may carry
         *        `storage_type`, `folder`, `prefix`, `mutex_folder` and `db_file_count`.
         *        When `storage_type` is missing it is written into this object.
         *
         * @throws QueueException If `name` is missing or the storage type is not supported
         */
        function __construct($settings) {
            if (!isset($settings->storage_type)) {
                $settings->storage_type = Queue::StorageTemporary;
            }
            if (!isset($settings->name)) {
                throw new QueueException('Settings don\'t have field name', 11);
            }
            $this->_name = $settings->name;
            $this->_construction_settings = $settings;

            // Must run before the mutex folder default, which is derived from $this->_folder
            $this->set_folder_settings($settings);

            $this->_prefix = isset($settings->prefix) ? $settings->prefix : '';
            $this->_mutex_folder = isset($settings->mutex_folder)
                ? $settings->mutex_folder
                : $this->_folder.'/mutex';

            if (isset($settings->db_file_count)) {
                $this->_db_file_count = $settings->db_file_count;
            }
            $this->_index_data = array_fill(0, $this->_db_file_count, []);
        }
77
78
        /**
         * Release the producer mutex and then the index mutex, if they were ever created.
         */
        function __destruct() {
            foreach ([$this->_producer_mutex, $this->_index_mutex] as $mutex) {
                if (!is_null($mutex)) {
                    $mutex->release_lock();
                }
            }
        }
86
87
        /**
         * Give the clone its own copies of the settings object, both mutexes and the
         * loaded index file object, so it does not share them with the original.
         */
        function __clone() {
            parent::__clone();
            $this->_construction_settings = clone $this->_construction_settings;
            if (isset($this->_producer_mutex)) {
                $this->_producer_mutex = clone $this->_producer_mutex;
            }
            if (isset($this->_index_mutex)) {
                // Clone the index mutex itself; cloning the producer mutex here would make
                // the copy guard the index with the wrong lock
                $this->_index_mutex = clone $this->_index_mutex;
            }
            if (isset($this->_index_data_full)) {
                $this->_index_data_full = clone $this->_index_data_full;
            }
        }
100
101
        /**
         * Choose the folder in which the queue files are stored.
         *
         * The storage type selects a default folder; an explicit `folder` setting,
         * when present, overrides that default.
         *
         * @param FileDBQueueConstructionSettings|object $settings
         *
         * @throws QueueException If the storage type is neither temporary nor persistent
         */
        protected function set_folder_settings($settings) {
            $storage_type = $settings->storage_type;
            if ($storage_type == Queue::StorageTemporary) {
                $this->_folder = sys_get_temp_dir();
            } elseif ($storage_type == Queue::StoragePersistent) {
                $this->_folder = FileMutex::getDirectoryString();
            } else {
                throw new QueueException(
                    'Constructor settings is malformed. Storage type can not be equal '.
                    $storage_type, 1
                );
            }

            if (isset($settings->folder)) {
                $this->_folder = $settings->folder;
            }
        }
124
125
        /**
         * Lock the index mutex.
         *
         * In exclusive mode no lock call is made and true is returned immediately.
         *
         * @param double|integer $time Passed through to the mutex's get_lock()
         *
         * @return boolean
         * @throws \NokitaKaze\Mutex\MutexException
         */
        protected function index_mutex_lock($time = -1) {
            if ($this->_exclusive_mode) {
                return true;
            }

            return $this->_index_mutex->get_lock($time);
        }
140
141
        /**
         * Release the index mutex.
         *
         * Does nothing in exclusive mode, so the lock stays with this object.
         */
        protected function index_mutex_release_lock() {
            if ($this->_exclusive_mode) {
                return;
            }
            $this->_index_mutex->release_lock();
        }
149
150
        /**
         * Turn exclusive mode on or off.
         *
         * Turning it on takes the index mutex; turning it off releases it.
         *
         * @param boolean $mode
         * @throws \NokitaKaze\Mutex\MutexException
         */
        function set_exclusive_mode($mode) {
            $this->init_producer_mutex();
            $this->_exclusive_mode = $mode;
            if (!$mode) {
                $this->_index_mutex->release_lock();

                return;
            }
            $this->_index_mutex->get_lock();
        }
165
166
        /**
         * Pick the pending messages that actually have to be written.
         *
         * Messages without a name are always kept. A named message is kept only the
         * first time its name appears in the pending list, and only if that name is
         * not already a key in any bucket of the loaded index.
         *
         * @return iMessage[]|object[]
         */
        protected function get_used_keys_and_real_need_for_save() {
            $to_save = [];
            $seen_names = [];
            foreach ($this->_pushed_for_save as $candidate) {
                $name = $candidate->name;
                if (is_null($name)) {
                    $to_save[] = $candidate;
                    continue;
                }
                if (in_array($name, $seen_names)) {
                    continue;
                }

                $already_indexed = false;
                foreach ($this->_index_data as $bucket) {
                    if (array_key_exists($name, $bucket)) {
                        $already_indexed = true;
                        break;
                    }
                }
                if ($already_indexed) {
                    continue;
                }

                $seen_names[] = $name;
                $to_save[] = $candidate;
            }

            return $to_save;
        }
191
192
        /**
         * Write all messages that were pushed for saving into the chunk file of the
         * current producer thread and record them in the index.
         *
         * Both the index mutex and the producer mutex are released even when reading
         * or writing fails, so an exception does not leave the queue locked.
         *
         * @throws QueueException
         * @throws \NokitaKaze\Mutex\MutexException
         */
        function save() {
            if (count($this->_pushed_for_save) == 0) {
                return;
            }
            $this->init_producer_mutex();

            $this->index_mutex_lock();
            try {
                $this->index_data_load();
                $real_need_save = $this->get_used_keys_and_real_need_for_save();
                if (count($real_need_save) == 0) {
                    // Nothing to write; as before, the pending list is left untouched on this path
                    return;
                }

                $this->_producer_mutex->get_lock();
                try {
                    $current_thread_id = static::get_current_producer_thread_id();
                    $data_in = $this->get_data_for_thread($current_thread_id);
                    foreach ($real_need_save as $message) {
                        if (!isset($message->sort)) {
                            $message->sort = 5;
                        }
                        // Copy the already existing message into data_in
                        $data_in[] = (object) [
                            'name' => $message->name,
                            'data' => $message->data,
                            'time_created' => $message->time_created,
                            'time_last_update' => microtime(true),
                            'time_rnd_postfix' => isset($message->time_rnd_postfix) ? $message->time_rnd_postfix : null,
                            'sort' => $message->sort,
                            'is_read' => false,
                            // @codeCoverageIgnoreStart
                        ];
                        // @codeCoverageIgnoreEnd
                        $key = self::get_real_key_for_message($message);
                        $this->_index_data[$message->sort][$key] = $current_thread_id;
                    }
                    $this->write_full_data_to_file($current_thread_id, $data_in);
                    unset($data_in);
                } finally {
                    $this->_producer_mutex->release_lock();
                }

                $this->index_data_save();
            } finally {
                $this->index_mutex_release_lock();
            }
            $this->_pushed_for_save = [];
        }
243
244
        /**
         * Read the messages stored in the chunk file of one internal queue thread.
         *
         * @param integer $thread_id
         *
         * @return iMessage[]|object[] Empty array when the chunk file does not exist
         * @throws QueueException If the file is unreadable, empty, cannot be unserialized
         *                        into an object, or has a different chunk version
         */
        protected function get_data_for_thread($thread_id) {
            $filename = $this->get_producer_filename_for_thread($thread_id);
            if (!file_exists($filename)) {
                return [];
            }
            if (!is_readable($filename)) {
                throw new QueueException('Chunk DB File "'.$filename.'" is not readable/writable');
            }
            // The second parameter of file_get_contents() is $use_include_path, not a lock
            // flag: passing LOCK_EX there took no lock and only enabled include-path lookup
            $buf = file_get_contents($filename);
            if (empty($buf)) {
                throw new QueueException('Chunk DB File "'.$filename.'" is malformed');
            }
            /**
             * @var FileDBChunkFile $object
             */
            $object = Queue::unserialize($buf, $is_valid);
            if (!is_object($object)) {
                throw new QueueException('Chunk DB File "'.$filename.'" is malformed');
            }
            if ($object->version != self::FILE_DB_CHUNK_VERSION) {
                throw new QueueException('Version mismatch ('.
                                         $object->version.' instead of '.self::FILE_DB_CHUNK_VERSION.')');
            }

            return $object->queue;
        }
278
279
        /**
         * Internal thread id of the current process: its PID modulo the number of
         * producer threads.
         *
         * @return integer
         */
        protected static function get_current_producer_thread_id() {
            $pid = posix_getpid();

            return $pid % Queue::ProducerThreadCount;
        }
287
288
        /**
         * Path of the chunk file that holds the data of one internal queue thread.
         *
         * The file name is built from the prefix, the SHA-512 hash of the queue name
         * and the thread id.
         *
         * @param integer $thread_id
         *
         * @return string
         */
        protected function get_producer_filename_for_thread($thread_id) {
            $name_hash = hash('sha512', $this->_name);

            return $this->_folder.'/smartqueue_'.$this->_prefix.'_'.$name_hash.'-'.$thread_id.'.que';
        }
298
299
        /**
         * Create the mutex for the current producer thread and the index mutex.
         *
         * Does nothing once the producer mutex exists.
         */
        protected function init_producer_mutex() {
            if (!is_null($this->_producer_mutex)) {
                return;
            }
            $thread_id = static::get_current_producer_thread_id();
            $this->_producer_mutex = $this->get_mutex_for_thread($thread_id);
            $this->_index_mutex = $this->get_index_mutex();
        }
308
309
        /**
         * Folder in which this transport keeps its mutex files.
         *
         * @return string
         */
        function get_mutex_folder() {
            return $this->_mutex_folder;
        }
315
316
        /**
         * Create the mutex that guards the chunk file of one internal thread.
         *
         * The mutex folder is created first if needed.
         *
         * @param integer $thread_id
         *
         * @return FileMutex
         * @throws \NokitaKaze\Mutex\MutexException
         */
        protected function get_mutex_for_thread($thread_id) {
            // @todo plug a mutex resolver in here
            $mutex_folder = $this->get_mutex_folder();
            FileMutex::create_folders_in_path($mutex_folder);

            /**
             * @var \NokitaKaze\Mutex\MutexSettings $mutex_settings
             */
            $mutex_settings = (object) [
                'folder' => $mutex_folder,
                'name' => 'smartqueue_'.$this->_prefix.'_'.hash('sha512', $this->_name).'-'.$thread_id,
            ];

            return new FileMutex($mutex_settings);
        }
336
337
        /**
         * Create the mutex that guards the index file of the whole current queue.
         *
         * The mutex folder is created first if needed.
         *
         * @return FileMutex
         * @throws \NokitaKaze\Mutex\MutexException
         */
        protected function get_index_mutex() {
            // @todo plug a mutex resolver in here
            $mutex_folder = $this->get_mutex_folder();
            FileMutex::create_folders_in_path($mutex_folder);

            /**
             * @var \NokitaKaze\Mutex\MutexSettings $mutex_settings
             */
            $mutex_settings = (object) [
                'folder' => $mutex_folder,
                'name' => 'smartqueue_'.$this->_prefix.'_'.hash('sha512', $this->_name).'-index',
            ];

            return new FileMutex($mutex_settings);
        }
355
356
        /**
         * Save data into the chunk file of an internal thread (the index is not touched).
         *
         * The data is written to a temporary file next to the target and then renamed
         * over it. If writing or renaming fails, the temporary file is removed before
         * the exception is thrown.
         *
         * @param integer             $thread_id
         * @param iMessage[]|object[] $data_in
         *
         * @throws QueueException If the folder is missing, not a folder or not writable,
         *                        if the existing file is not writable, or if the write
         *                        or the rename fails
         */
        protected function write_full_data_to_file($thread_id, $data_in) {
            $filename = $this->get_producer_filename_for_thread($thread_id);
            $folder = dirname($filename);
            if (!file_exists($folder)) {
                throw new QueueException('Folder "'.$folder.'" does not exist', 7);
            } elseif (!is_dir($folder)) {
                throw new QueueException('Folder "'.$folder.'" is not a folder', 14);
            } elseif (!is_writable($folder)) {
                throw new QueueException('Folder "'.$folder.'" is not writable', 8);
            } elseif (file_exists($filename) and !is_writable($filename)) {
                throw new QueueException('File "'.$filename.'" is not writable', 9);
            }
            $filename_tmp = $filename.'-'.mt_rand(10000, 99999).'.tmp';
            touch($filename_tmp);
            // @todo fileperms
            // 6 << 6 equals octal 0600
            chmod($filename_tmp, 6 << 6);

            /**
             * @var FileDBChunkFile $object
             */
            $object = (object) [];
            $object->version = self::FILE_DB_CHUNK_VERSION;
            $object->time_last_update = microtime(true);
            $object->queue_name = $this->get_queue_name();
            $object->queue = $data_in;
            if (@file_put_contents($filename_tmp, Queue::serialize($object), LOCK_EX) === false) {
                // @codeCoverageIgnoreStart
                // Read the error text before the cleanup, which could overwrite the last PHP error
                $error = FileMutex::get_last_php_error_as_string();
                if (file_exists($filename_tmp)) {
                    @unlink($filename_tmp);
                }
                throw new QueueException('Can not save queue stream: '.$error, 2);
                // @codeCoverageIgnoreEnd
            }
            if (!rename($filename_tmp, $filename)) {
                // @codeCoverageIgnoreStart
                $error = FileMutex::get_last_php_error_as_string();
                if (file_exists($filename_tmp)) {
                    @unlink($filename_tmp);
                }
                throw new QueueException('Can not rename temporary chunk database file: '.
                                         $error, 15);
                // @codeCoverageIgnoreEnd
            }
        }
400
401
        /**
         * Group the keys of the messages to delete by the thread whose chunk file holds them.
         *
         * Every matching key is also removed from the in-memory index as a side effect.
         *
         * @param iMessage[]|object[] $messages
         *
         * @return string[][]|integer[][] Thread id => list of keys stored in that thread
         */
        protected function get_keys_for_delete_group_by_thread($messages) {
            $wanted_keys = [];
            foreach ($messages as $message) {
                $wanted_keys[] = self::get_real_key_for_message($message);
            }

            $keys_by_thread = array_fill(0, Queue::ProducerThreadCount, []);
            foreach (array_keys($this->_index_data) as $sort_id) {
                if (empty($this->_index_data[$sort_id])) {
                    continue;
                }
                foreach ($this->_index_data[$sort_id] as $key => $thread_id) {
                    if (!in_array($key, $wanted_keys)) {
                        continue;
                    }
                    $keys_by_thread[$thread_id][] = $key;
                    unset($this->_index_data[$sort_id][$key]);
                }
            }

            return $keys_by_thread;
        }
430
431
        /**
         * Delete messages and write the result to the DB files right away.
         *
         * The index mutex and each per-thread mutex are released even when reading or
         * writing a chunk file throws.
         *
         * @param iMessage[]|object[] $messages
         *
         * @return string[]|integer[] Keys of the messages that were removed from chunk files
         * @throws \NokitaKaze\Mutex\MutexException
         * @throws QueueException
         */
        function delete_messages(array $messages) {
            $this->init_producer_mutex();
            $this->index_mutex_lock();

            $deleted_keys = [];
            try {
                $this->index_data_load();
                $threads_keys = $this->get_keys_for_delete_group_by_thread($messages);

                foreach ($threads_keys as $thread_id => $thread_keys) {
                    if (count($thread_keys) == 0) {
                        continue;
                    }
                    $mutex = $this->get_mutex_for_thread($thread_id);
                    $mutex->get_lock();
                    try {
                        $data_in = $this->get_data_for_thread($thread_id);
                        $changed = false;
                        foreach ($data_in as $inner_id => $inner_message) {
                            $real_key = self::get_real_key_for_message($inner_message);
                            if (in_array($real_key, $thread_keys)) {
                                unset($data_in[$inner_id]);
                                $deleted_keys[] = $real_key;
                                $changed = true;
                            }
                        }
                        if ($changed) {
                            // @hint This condition is always true. Otherwise the data is inconsistent
                            if (count($data_in) > 0) {
                                $this->write_full_data_to_file($thread_id, $data_in);
                            } else {
                                unlink($this->get_producer_filename_for_thread($thread_id));
                            }
                        }
                    } finally {
                        $mutex->release_lock();
                    }
                    $this->index_data_save();
                }
            } finally {
                $this->index_mutex_release_lock();
            }

            return $deleted_keys;
        }
480
481
        /**
         * Update a message and save everything right away.
         *
         * This function does not raise an error when the message is not found.
         * The index mutex and the per-thread mutex are released even when reading or
         * writing the chunk file throws.
         *
         * @param iMessage|object $message
         * @param string|null     $key Forces the message key instead of deriving it from the message
         *
         * @return boolean True when the message was found and replaced in its chunk file
         * @throws QueueException
         * @throws \NokitaKaze\Mutex\MutexException
         */
        function update_message($message, $key = null) {
            $this->init_producer_mutex();
            $this->index_mutex_lock();

            $exists = false;
            try {
                $this->index_data_load();

                if (is_null($key)) {
                    $key = self::get_real_key_for_message($message);
                }
                foreach ($this->_index_data as $index_datum) {
                    if (!array_key_exists($key, $index_datum)) {
                        continue;
                    }
                    $thread_id = $index_datum[$key];
                    $mutex = $this->get_mutex_for_thread($thread_id);
                    $mutex->get_lock();
                    try {
                        $data_in = $this->get_data_for_thread($thread_id);
                        // Find the same message and replace it
                        $u = $this->change_message_in_array($data_in, $message, $key);
                        if ($u) {
                            $exists = true;
                        }
                        $this->write_full_data_to_file($thread_id, $data_in);
                    } finally {
                        $mutex->release_lock();
                    }
                    $this->index_data_save();
                    break;
                }
            } finally {
                $this->index_mutex_release_lock();
            }

            return $exists;
        }
524
525
        /**
526
         * @param double|integer $wait_time
527
         *
528
         * @return iMessage|object|null
529
         * @throws \NokitaKaze\Mutex\MutexException
530
         * @throws QueueException
531
         */
532
        function consume_next_message($wait_time = -1) {
            // Returns the next message whose key has not yet been consumed by this object, or null
            // once $wait_time seconds have elapsed. The integer -1 disables the timeout.
            $this->set_same_time_flag(2);
            $start = microtime(true);
            $this->init_producer_mutex();
            // NOTE(review): the loop re-reads the index without any pause between passes
            while (true) {
                $this->index_mutex_lock();
                try {
                    $this->index_data_load();
                    for ($sort_id = 0; $sort_id < $this->_db_file_count; $sort_id++) {
                        foreach ($this->_index_data[$sort_id] as $key => $thread_id) {
                            if (isset($this->_consumed_keys[$key])) {
                                continue;
                            }
                            $mutex = $this->get_mutex_for_thread($thread_id);
                            $mutex->get_lock();
                            try {
                                $data_in = $this->get_data_for_thread($thread_id);
                                // Mark the key as consumed whether or not its message is found below
                                $this->_consumed_keys[$key] = 1;
                                foreach ($data_in as $message) {
                                    $this_key = self::get_real_key_for_message($message);
                                    if ($this_key == $key) {
                                        $message->time_consumed = microtime(true);
                                        $message->thread_consumed = $thread_id;
                                        $message->queue = $this;

                                        // Both finally blocks run before returning: thread lock
                                        // first, then the index lock
                                        return $message;
                                    }
                                }
                            } finally {
                                // Also covers the inconsistent-DB case, where the key is in the
                                // index but not in the data: release explicitly instead of
                                // relying on the mutex object being destroyed
                                $mutex->release_lock();
                            }
                            unset($mutex);
                        }
                    }
                } finally {
                    $this->index_mutex_release_lock();
                }
                if (($wait_time !== -1) and ($start + $wait_time < microtime(true))) {
                    return null;
                }
            }

            // @hint This is for the IDE
            // @codeCoverageIgnoreStart
            return null;
            // @codeCoverageIgnoreEnd
        }
578
579
        /**
580
         * @return string
581
         */
582
        function get_queue_name() {
            // The queue name is whatever is currently stored in $this->_name
            $queue_name = $this->_name;

            return $queue_name;
        }
585
586
        /**
         * Whether the queue supports sorting events on insertion
         *
         * @return boolean
         */
591
        static function is_support_sorted_events() {
            // This transport unconditionally reports support for sorted events
            $supports_sorting = true;

            return $supports_sorting;
        }
594
595
        /**
         * Indexes
         */
598
599
        /**
         * Name of the file that holds the index data
         *
         * @return string
         */
604
        protected function get_index_filename() {
            // The queue name is hashed, so the file name length does not depend on the name
            $name_hash = hash('sha512', $this->_name);
            $pieces = [$this->_folder, '/smartqueue_', $this->_prefix, '_', $name_hash, '-index.que'];

            return implode('', $pieces);
        }
607
608
        /**
609
         * @throws QueueException
610
         */
611
        protected function index_data_load() {
            // Loads the index file into $this->_index_data and its remaining metadata into
            // $this->_index_data_full. Nothing is stored on the object until the content has
            // been validated, so a failed load leaves the previous state untouched.
            // @todo Check the time/size
            $filename = $this->get_index_filename();
            if (!file_exists($filename)) {
                // No index on disk: start with one empty bucket per DB file
                $this->_index_data = array_fill(0, $this->_db_file_count, []);

                return;
            }
            if (!is_readable($filename) or !is_writable($filename)) {
                throw new QueueException('Index File "'.$filename.'" is not readable/writable');
            }
            $buf = file_get_contents($filename);
            if (empty($buf)) {
                throw new QueueException('Index File "'.$filename.'" is empty');
            }
            // NOTE(review): $is_valid looks like an out-parameter of Queue::unserialize; it is not
            // consulted here, the shape of the decoded value is checked instead
            $index_full = Queue::unserialize($buf, $is_valid);
            if (!is_object($index_full) or !isset($index_full->version, $index_full->data)) {
                // Content exists but is not a usable index object: not the same as an empty file
                throw new QueueException('Index File "'.$filename.'" is malformed');
            }
            if ($index_full->version != self::FILE_DB_INDEX_VERSION) {
                throw new QueueException('Version mismatch ('.
                                         $index_full->version.' instead of '.self::FILE_DB_INDEX_VERSION.')');
            }
            // Keep the payload and the metadata apart: the metadata object is stored without "data"
            $this->_index_data = $index_full->data;
            unset($index_full->data);
            $this->_index_data_full = $index_full;
        }
637
638
        /**
         * Save the data to the index
         *
         * @throws QueueException
         */
643
        protected function index_data_save() {
            // Serialises the index metadata plus $this->_index_data into a temporary file and then
            // renames it over the real index file. The temporary file is removed on any failure.
            $filename = $this->get_index_filename();
            $folder = dirname($filename);
            if (!file_exists($folder)) {
                throw new QueueException('Folder "'.$folder.'" does not exist', 4);
            } elseif (!is_dir($folder)) {
                throw new QueueException('Folder "'.$folder.'" is not a folder', 13);
            } elseif (!is_writable($folder)) {
                throw new QueueException('Folder "'.$folder.'" is not writable', 5);
            } elseif (file_exists($filename) and !is_writable($filename)) {
                throw new QueueException('File "'.$filename.'" is not writable', 6);
            }
            $filename_tmp = $filename.'-'.mt_rand(0, 50000).'.tmp';
            $saved = false;
            try {
                touch($filename_tmp);
                // 6 << 6 === 0600: read/write for the owner only
                chmod($filename_tmp, 6 << 6);
                // @todo fileperms

                if (is_null($this->_index_data_full)) {
                    // First save by this object without a prior load: create the metadata object
                    $this->_index_data_full = (object) [];
                    $this->_index_data_full->time_create = microtime(true);
                }
                $this->_index_data_full->version = self::FILE_DB_INDEX_VERSION;
                $this->_index_data_full->time_last_update = microtime(true);
                $this->_index_data_full->queue_name = $this->get_queue_name();
                // Attach the payload to a copy so the in-memory metadata object stays without "data"
                $temporary_data = clone $this->_index_data_full;
                $temporary_data->data = $this->_index_data;

                if (@file_put_contents($filename_tmp, Queue::serialize($temporary_data)) === false) {
                    // @codeCoverageIgnoreStart
                    throw new QueueException('Can not save index file: '.FileMutex::get_last_php_error_as_string(), 10);
                    // @codeCoverageIgnoreEnd
                }
                if (!rename($filename_tmp, $filename)) {
                    // @codeCoverageIgnoreStart
                    throw new QueueException('Can not rename temporary index file: '.
                                             FileMutex::get_last_php_error_as_string(), 16);
                    // @codeCoverageIgnoreEnd
                }
                $saved = true;
            } finally {
                // Runs after the exception message has been built, so the reported PHP error
                // cannot be overwritten by this cleanup
                if (!$saved and file_exists($filename_tmp)) {
                    unlink($filename_tmp);
                }
            }
        }
681
682
        /**
683
         * @param FileDBQueueTransport $queue
684
         *
685
         * @return boolean
686
         */
687
        function is_equal_to($queue) {
            // Same object instance: equal without comparing anything else
            if (spl_object_hash($this) == spl_object_hash($queue)) {
                return true;
            }
            // The parent's comparison must hold before the file-specific fields are looked at
            if (!parent::is_equal_to($queue)) {
                return false;
            }
            // Folder, queue name and prefix must all match
            if ($queue->_folder != $this->_folder) {
                return false;
            }
            if ($queue->get_queue_name() != $this->get_queue_name()) {
                return false;
            }

            return ($queue->_prefix == $this->_prefix);
        }
699
    }
700
701
?>