This project does not seem to handle request data directly as such no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace NokitaKaze\Queue; |
||
4 | |||
5 | use NokitaKaze\Mutex\FileMutex; |
||
6 | |||
7 | class FileDBQueueTransport extends AbstractQueueTransport { |
||
8 | const FILE_DB_INDEX_VERSION = 1; |
||
9 | const FILE_DB_CHUNK_VERSION = 1; |
||
10 | |||
11 | protected $_folder; |
||
12 | protected $_mutex_folder; |
||
13 | protected $_prefix; |
||
14 | protected $_name; |
||
15 | protected $_db_file_count = Queue::DefaultDBFileCount; |
||
16 | /** |
||
17 | * @var FileDBQueueConstructionSettings|object |
||
18 | */ |
||
19 | protected $_construction_settings; |
||
20 | |||
21 | /** |
||
22 | * @var FileMutex|null |
||
23 | */ |
||
24 | protected $_producer_mutex = null; |
||
25 | |||
26 | /** |
||
27 | * @var FileMutex|null |
||
28 | */ |
||
29 | protected $_index_mutex = null; |
||
30 | |||
31 | /** |
||
32 | * @var integer[][] Данные с индексом |
||
33 | */ |
||
34 | protected $_index_data = []; |
||
35 | |||
36 | /** |
||
37 | * @var FileDBIndexFile |
||
38 | */ |
||
39 | protected $_index_data_full = null; |
||
40 | |||
41 | /** |
||
42 | * @var boolean Эксклюзивный режим |
||
43 | */ |
||
44 | protected $_exclusive_mode = false; |
||
45 | |||
46 | /** |
||
47 | * @param FileDBQueueConstructionSettings|object $settings |
||
48 | * |
||
49 | * @throws QueueException |
||
50 | */ |
||
51 | function __construct($settings) { |
||
52 | if (!isset($settings->storage_type)) { |
||
53 | $settings->storage_type = Queue::StorageTemporary; |
||
54 | } |
||
55 | if (!isset($settings->name)) { |
||
56 | throw new QueueException('Settings don\'t have field name', 11); |
||
57 | } |
||
58 | $this->_name = $settings->name; |
||
59 | $this->_construction_settings = $settings; |
||
60 | |||
61 | $this->set_folder_settings($settings); |
||
62 | if (isset($settings->prefix)) { |
||
63 | $this->_prefix = $settings->prefix; |
||
64 | } else { |
||
65 | $this->_prefix = ''; |
||
66 | } |
||
67 | if (isset($settings->mutex_folder)) { |
||
68 | $this->_mutex_folder = $settings->mutex_folder; |
||
69 | } else { |
||
70 | $this->_mutex_folder = $this->_folder.'/mutex'; |
||
71 | } |
||
72 | if (isset($settings->db_file_count)) { |
||
73 | $this->_db_file_count = $settings->db_file_count; |
||
74 | } |
||
75 | $this->_index_data = array_fill(0, $this->_db_file_count, []); |
||
76 | } |
||
77 | |||
78 | function __destruct() { |
||
79 | if (!is_null($this->_producer_mutex)) { |
||
80 | $this->_producer_mutex->release_lock(); |
||
81 | } |
||
82 | if (!is_null($this->_index_mutex)) { |
||
83 | $this->_index_mutex->release_lock(); |
||
84 | } |
||
85 | } |
||
86 | |||
87 | function __clone() { |
||
88 | parent::__clone(); |
||
89 | $this->_construction_settings = clone $this->_construction_settings; |
||
90 | if (isset($this->_producer_mutex)) { |
||
91 | $this->_producer_mutex = clone $this->_producer_mutex; |
||
92 | } |
||
93 | if (isset($this->_index_mutex)) { |
||
94 | $this->_index_mutex = clone $this->_producer_mutex; |
||
95 | } |
||
96 | if (isset($this->_index_data_full)) { |
||
97 | $this->_index_data_full = clone $this->_index_data_full; |
||
98 | } |
||
99 | } |
||
100 | |||
101 | /** |
||
102 | * @param FileDBQueueConstructionSettings|object $settings |
||
103 | * |
||
104 | * @throws QueueException |
||
105 | */ |
||
106 | protected function set_folder_settings($settings) { |
||
107 | switch ($settings->storage_type) { |
||
108 | case Queue::StorageTemporary: |
||
109 | $this->_folder = sys_get_temp_dir(); |
||
110 | break; |
||
111 | case Queue::StoragePersistent: |
||
112 | $this->_folder = FileMutex::getDirectoryString(); |
||
113 | break; |
||
114 | default: |
||
115 | throw new QueueException( |
||
116 | 'Constructor settings is malformed. Storage type can not be equal '. |
||
117 | $settings->storage_type, 1 |
||
118 | ); |
||
119 | } |
||
120 | if (isset($settings->folder)) { |
||
121 | $this->_folder = $settings->folder; |
||
122 | } |
||
123 | } |
||
124 | |||
125 | /** |
||
126 | * Блокируем index mutex |
||
127 | * |
||
128 | * @param double|integer $time |
||
129 | * |
||
130 | * @return boolean |
||
131 | * @throws \NokitaKaze\Mutex\MutexException |
||
132 | */ |
||
133 | protected function index_mutex_lock($time = -1) { |
||
134 | if (!$this->_exclusive_mode) { |
||
135 | return $this->_index_mutex->get_lock($time); |
||
136 | } else { |
||
137 | return true; |
||
138 | } |
||
139 | } |
||
140 | |||
141 | /** |
||
142 | * Блокируем index mutex |
||
143 | */ |
||
144 | protected function index_mutex_release_lock() { |
||
145 | if (!$this->_exclusive_mode) { |
||
146 | $this->_index_mutex->release_lock(); |
||
147 | } |
||
148 | } |
||
149 | |||
150 | /** |
||
151 | * Устанавливаем или снимаем монопольный режим |
||
152 | * |
||
153 | * @param boolean $mode |
||
154 | * @throws \NokitaKaze\Mutex\MutexException |
||
155 | */ |
||
156 | function set_exclusive_mode($mode) { |
||
157 | $this->init_producer_mutex(); |
||
158 | $this->_exclusive_mode = $mode; |
||
159 | if ($mode) { |
||
160 | $this->_index_mutex->get_lock(); |
||
161 | } else { |
||
162 | $this->_index_mutex->release_lock(); |
||
163 | } |
||
164 | } |
||
165 | |||
166 | /** |
||
167 | * @return iMessage[]|object[] |
||
168 | */ |
||
169 | protected function get_used_keys_and_real_need_for_save() { |
||
170 | $real_need_save = []; |
||
171 | $used_keys = []; |
||
172 | foreach ($this->_pushed_for_save as &$message) { |
||
173 | if (is_null($message->name)) { |
||
174 | $real_need_save[] = $message; |
||
175 | continue; |
||
176 | } |
||
177 | if (in_array($message->name, $used_keys)) { |
||
178 | continue; |
||
179 | } |
||
180 | foreach ($this->_index_data as $sort_id => &$index_datum) { |
||
181 | if (array_key_exists($message->name, $index_datum)) { |
||
182 | continue 2; |
||
183 | } |
||
184 | } |
||
185 | $used_keys[] = $message->name; |
||
186 | $real_need_save[] = $message; |
||
187 | } |
||
188 | |||
189 | return $real_need_save; |
||
190 | } |
||
191 | |||
192 | /** |
||
193 | * Сохраняем все сообщения, подготовленные для сохранения, в файл сообщений |
||
194 | * |
||
195 | * @throws QueueException |
||
196 | * @throws \NokitaKaze\Mutex\MutexException |
||
197 | */ |
||
198 | function save() { |
||
199 | if (count($this->_pushed_for_save) == 0) { |
||
200 | return; |
||
201 | } |
||
202 | $this->init_producer_mutex(); |
||
203 | |||
204 | $this->index_mutex_lock(); |
||
205 | $this->index_data_load(); |
||
206 | $real_need_save = $this->get_used_keys_and_real_need_for_save(); |
||
207 | if (count($real_need_save) == 0) { |
||
208 | $this->index_mutex_release_lock(); |
||
209 | |||
210 | return; |
||
211 | } |
||
212 | |||
213 | $this->_producer_mutex->get_lock(); |
||
214 | $current_thread_id = static::get_current_producer_thread_id(); |
||
215 | $data_in = $this->get_data_for_thread($current_thread_id); |
||
216 | foreach ($real_need_save as &$message) { |
||
217 | if (!isset($message->sort)) { |
||
218 | $message->sort = 5; |
||
219 | } |
||
220 | // Копируем уже существующий message в data_in |
||
221 | $data_in[] = (object) [ |
||
222 | 'name' => $message->name, |
||
223 | 'data' => $message->data, |
||
224 | 'time_created' => $message->time_created, |
||
225 | 'time_last_update' => microtime(true), |
||
226 | 'time_rnd_postfix' => isset($message->time_rnd_postfix) ? $message->time_rnd_postfix : null, |
||
227 | 'sort' => $message->sort, |
||
228 | 'is_read' => false, |
||
229 | // @codeCoverageIgnoreStart |
||
230 | ]; |
||
231 | // @codeCoverageIgnoreEnd |
||
232 | $key = self::get_real_key_for_message($message); |
||
233 | $this->_index_data[$message->sort][$key] = $current_thread_id; |
||
234 | } |
||
235 | $this->write_full_data_to_file($current_thread_id, $data_in); |
||
236 | unset($data_in); |
||
237 | |||
238 | $this->_producer_mutex->release_lock(); |
||
239 | $this->index_data_save(); |
||
240 | $this->index_mutex_release_lock(); |
||
241 | $this->_pushed_for_save = []; |
||
242 | } |
||
243 | |||
244 | /** |
||
245 | * Берём данные для конкретного внутренного треда в очереди сообщений |
||
246 | * |
||
247 | * @param integer $thread_id |
||
248 | * |
||
249 | * @return iMessage[]|object[] |
||
250 | * @throws QueueException |
||
251 | */ |
||
252 | protected function get_data_for_thread($thread_id) { |
||
253 | $filename = $this->get_producer_filename_for_thread($thread_id); |
||
254 | if (!file_exists($filename)) { |
||
255 | return []; |
||
256 | } |
||
257 | if (!is_readable($filename)) { |
||
258 | throw new QueueException('Chunk DB File "'.$filename.'" is not readable/writable'); |
||
259 | } |
||
260 | $buf = file_get_contents($filename, LOCK_EX); |
||
261 | if (empty($buf)) { |
||
262 | throw new QueueException('Chunk DB File "'.$filename.'" is malformed'); |
||
263 | } |
||
264 | /** |
||
265 | * @var FileDBChunkFile $object |
||
266 | */ |
||
267 | $object = Queue::unserialize($buf, $is_valid); |
||
268 | if (!is_object($object)) { |
||
269 | throw new QueueException('Chunk DB File "'.$filename.'" is malformed'); |
||
270 | } |
||
271 | if ($object->version != self::FILE_DB_CHUNK_VERSION) { |
||
0 ignored issues
–
show
|
|||
272 | throw new QueueException('Version mismatch ('. |
||
273 | $object->version.' instead of '.self::FILE_DB_CHUNK_VERSION.')'); |
||
0 ignored issues
–
show
Accessing
version on the interface NokitaKaze\Queue\FileDBChunkFile suggest that you code against a concrete implementation. How about adding an instanceof check?
If you access a property on an interface, you most likely code against a concrete implementation of the interface. Available Fixes
![]() |
|||
274 | } |
||
275 | |||
276 | return $object->queue; |
||
0 ignored issues
–
show
Accessing
queue on the interface NokitaKaze\Queue\FileDBChunkFile suggest that you code against a concrete implementation. How about adding an instanceof check?
If you access a property on an interface, you most likely code against a concrete implementation of the interface. Available Fixes
![]() |
|||
277 | } |
||
278 | |||
279 | /** |
||
280 | * Внутренний id треда |
||
281 | * |
||
282 | * @return integer |
||
283 | */ |
||
284 | protected static function get_current_producer_thread_id() { |
||
285 | return posix_getpid() % Queue::ProducerThreadCount; |
||
286 | } |
||
287 | |||
288 | /** |
||
289 | * Берём название файла с данными для конкретного внутренного треда в очереди сообщений |
||
290 | * |
||
291 | * @param integer $thread_id |
||
292 | * |
||
293 | * @return string |
||
294 | */ |
||
295 | protected function get_producer_filename_for_thread($thread_id) { |
||
296 | return $this->_folder.'/smartqueue_'.$this->_prefix.'_'.hash('sha512', $this->_name).'-'.$thread_id.'.que'; |
||
297 | } |
||
298 | |||
299 | /** |
||
300 | * Инициализируем мьютекс для текущего треда |
||
301 | */ |
||
302 | protected function init_producer_mutex() { |
||
303 | if (is_null($this->_producer_mutex)) { |
||
304 | $this->_producer_mutex = $this->get_mutex_for_thread(static::get_current_producer_thread_id()); |
||
305 | $this->_index_mutex = $this->get_index_mutex(); |
||
306 | } |
||
307 | } |
||
308 | |||
309 | /** |
||
310 | * @return string |
||
311 | */ |
||
312 | function get_mutex_folder() { |
||
313 | return $this->_mutex_folder; |
||
314 | } |
||
315 | |||
316 | /** |
||
317 | * Мьютекс для конкретного треда |
||
318 | * |
||
319 | * @param integer $thread_id |
||
320 | * |
||
321 | * @return FileMutex |
||
322 | * @throws \NokitaKaze\Mutex\MutexException |
||
323 | */ |
||
324 | protected function get_mutex_for_thread($thread_id) { |
||
325 | // @todo впилить сюда mutex resolver |
||
326 | /** |
||
327 | * @var \NokitaKaze\Mutex\MutexSettings $settings |
||
328 | */ |
||
329 | $settings = (object) []; |
||
330 | $settings->folder = $this->get_mutex_folder(); |
||
0 ignored issues
–
show
Accessing
folder on the interface NokitaKaze\Mutex\MutexSettings suggest that you code against a concrete implementation. How about adding an instanceof check?
If you access a property on an interface, you most likely code against a concrete implementation of the interface. Available Fixes
![]() |
|||
331 | FileMutex::create_folders_in_path($settings->folder); |
||
0 ignored issues
–
show
Accessing
folder on the interface NokitaKaze\Mutex\MutexSettings suggest that you code against a concrete implementation. How about adding an instanceof check?
If you access a property on an interface, you most likely code against a concrete implementation of the interface. Available Fixes
![]() |
|||
332 | $settings->name = 'smartqueue_'.$this->_prefix.'_'.hash('sha512', $this->_name).'-'.$thread_id; |
||
0 ignored issues
–
show
Accessing
name on the interface NokitaKaze\Mutex\MutexSettings suggest that you code against a concrete implementation. How about adding an instanceof check?
If you access a property on an interface, you most likely code against a concrete implementation of the interface. Available Fixes
![]() |
|||
333 | |||
334 | return new FileMutex($settings); |
||
335 | } |
||
336 | |||
337 | /** |
||
338 | * Создание мьютекса для файла с индексом на всю текущую очередь сообщений |
||
339 | * |
||
340 | * @return FileMutex |
||
341 | * @throws \NokitaKaze\Mutex\MutexException |
||
342 | */ |
||
343 | protected function get_index_mutex() { |
||
344 | // @todo впилить сюда mutex resolver |
||
345 | /** |
||
346 | * @var \NokitaKaze\Mutex\MutexSettings $settings |
||
347 | */ |
||
348 | $settings = (object) []; |
||
349 | $settings->folder = $this->get_mutex_folder(); |
||
0 ignored issues
–
show
Accessing
folder on the interface NokitaKaze\Mutex\MutexSettings suggest that you code against a concrete implementation. How about adding an instanceof check?
If you access a property on an interface, you most likely code against a concrete implementation of the interface. Available Fixes
![]() |
|||
350 | FileMutex::create_folders_in_path($settings->folder); |
||
0 ignored issues
–
show
Accessing
folder on the interface NokitaKaze\Mutex\MutexSettings suggest that you code against a concrete implementation. How about adding an instanceof check?
If you access a property on an interface, you most likely code against a concrete implementation of the interface. Available Fixes
![]() |
|||
351 | $settings->name = 'smartqueue_'.$this->_prefix.'_'.hash('sha512', $this->_name).'-index'; |
||
0 ignored issues
–
show
Accessing
name on the interface NokitaKaze\Mutex\MutexSettings suggest that you code against a concrete implementation. How about adding an instanceof check?
If you access a property on an interface, you most likely code against a concrete implementation of the interface. Available Fixes
![]() |
|||
352 | |||
353 | return new FileMutex($settings); |
||
354 | } |
||
355 | |||
356 | /** |
||
357 | * Сохраняем данные в файл внутреннего треда (без индекса) |
||
358 | * |
||
359 | * @param integer $thread_id |
||
360 | * @param iMessage[]|object[] $data_in |
||
361 | * |
||
362 | * @throws QueueException |
||
363 | */ |
||
364 | protected function write_full_data_to_file($thread_id, $data_in) { |
||
365 | $filename = $this->get_producer_filename_for_thread($thread_id); |
||
366 | if (!file_exists(dirname($filename))) { |
||
367 | throw new QueueException('Folder "'.dirname($filename).'" does not exist', 7); |
||
368 | } elseif (!is_dir(dirname($filename))) { |
||
369 | throw new QueueException('Folder "'.dirname($filename).'" is not a folder', 14); |
||
370 | } elseif (!is_writable(dirname($filename))) { |
||
371 | throw new QueueException('Folder "'.dirname($filename).'" is not writable', 8); |
||
372 | } elseif (file_exists($filename) and !is_writable($filename)) { |
||
373 | throw new QueueException('File "'.$filename.'" is not writable', 9); |
||
374 | } |
||
375 | $filename_tmp = $filename.'-'.mt_rand(10000, 99999).'.tmp'; |
||
376 | touch($filename_tmp); |
||
377 | // @todo fileperms |
||
378 | chmod($filename_tmp, 6 << 6); |
||
379 | |||
380 | /** |
||
381 | * @var FileDBChunkFile $object |
||
382 | */ |
||
383 | $object = (object) []; |
||
384 | $object->version = self::FILE_DB_CHUNK_VERSION; |
||
0 ignored issues
–
show
Accessing
version on the interface NokitaKaze\Queue\FileDBChunkFile suggest that you code against a concrete implementation. How about adding an instanceof check?
If you access a property on an interface, you most likely code against a concrete implementation of the interface. Available Fixes
![]() |
|||
385 | $object->time_last_update = microtime(true); |
||
0 ignored issues
–
show
Accessing
time_last_update on the interface NokitaKaze\Queue\FileDBChunkFile suggest that you code against a concrete implementation. How about adding an instanceof check?
If you access a property on an interface, you most likely code against a concrete implementation of the interface. Available Fixes
![]() |
|||
386 | $object->queue_name = $this->get_queue_name(); |
||
0 ignored issues
–
show
Accessing
queue_name on the interface NokitaKaze\Queue\FileDBChunkFile suggest that you code against a concrete implementation. How about adding an instanceof check?
If you access a property on an interface, you most likely code against a concrete implementation of the interface. Available Fixes
![]() |
|||
387 | $object->queue = $data_in; |
||
0 ignored issues
–
show
Accessing
queue on the interface NokitaKaze\Queue\FileDBChunkFile suggest that you code against a concrete implementation. How about adding an instanceof check?
If you access a property on an interface, you most likely code against a concrete implementation of the interface. Available Fixes
![]() |
|||
388 | if (@file_put_contents($filename_tmp, Queue::serialize($object), LOCK_EX) === false) { |
||
389 | // @codeCoverageIgnoreStart |
||
390 | throw new QueueException('Can not save queue stream: '.FileMutex::get_last_php_error_as_string(), 2); |
||
391 | // @codeCoverageIgnoreEnd |
||
392 | } |
||
393 | if (!rename($filename_tmp, $filename)) { |
||
394 | // @codeCoverageIgnoreStart |
||
395 | throw new QueueException('Can not rename temporary chunk database file: '. |
||
396 | FileMutex::get_last_php_error_as_string(), 15); |
||
397 | // @codeCoverageIgnoreEnd |
||
398 | } |
||
399 | } |
||
400 | |||
401 | /** |
||
402 | * Получаем номера DB, в которых лежат сообщения, которые нам надо удалить |
||
403 | * |
||
404 | * @param iMessage[]|object[] $messages |
||
405 | * |
||
406 | * @return string[][]|integer[][] |
||
407 | */ |
||
408 | protected function get_keys_for_delete_group_by_thread($messages) { |
||
409 | $keys = []; |
||
410 | foreach ($messages as &$message) { |
||
411 | $keys[] = self::get_real_key_for_message($message); |
||
412 | } |
||
413 | unset($message); |
||
414 | |||
415 | $threads_keys = array_fill(0, Queue::ProducerThreadCount, []); |
||
416 | foreach ($this->_index_data as $sort_id => &$index_datum) { |
||
417 | if (empty($index_datum)) { |
||
418 | continue; |
||
419 | } |
||
420 | foreach ($index_datum as $key => &$thread_id) { |
||
421 | if (in_array($key, $keys)) { |
||
422 | $threads_keys[$thread_id][] = $key; |
||
423 | unset($index_datum[$key]); |
||
424 | } |
||
425 | } |
||
426 | } |
||
427 | |||
428 | return $threads_keys; |
||
429 | } |
||
430 | |||
431 | /** |
||
432 | * Удалить сообщения и сразу же записать это в БД |
||
433 | * |
||
434 | * @param iMessage[]|object[] $messages |
||
435 | * |
||
436 | * @return string[]|integer[] |
||
437 | * @throws \NokitaKaze\Mutex\MutexException |
||
438 | * @throws QueueException |
||
439 | */ |
||
440 | function delete_messages(array $messages) { |
||
441 | $this->init_producer_mutex(); |
||
442 | $this->index_mutex_lock(); |
||
443 | $this->index_data_load(); |
||
444 | |||
445 | $threads_keys = $this->get_keys_for_delete_group_by_thread($messages); |
||
446 | |||
447 | $deleted_keys = []; |
||
448 | foreach ($threads_keys as $thread_id => &$thread_keys) { |
||
449 | if (count($thread_keys) == 0) { |
||
450 | continue; |
||
451 | } |
||
452 | $mutex = $this->get_mutex_for_thread($thread_id); |
||
453 | $mutex->get_lock(); |
||
454 | $data_in = $this->get_data_for_thread($thread_id); |
||
455 | $u = false; |
||
456 | foreach ($data_in as $inner_id => &$inner_message) { |
||
457 | $real_key = self::get_real_key_for_message($inner_message); |
||
458 | if (in_array($real_key, $thread_keys)) { |
||
459 | unset($data_in[$inner_id]); |
||
460 | $deleted_keys[] = $real_key; |
||
461 | $u = true; |
||
462 | } |
||
463 | } |
||
464 | if ($u) { |
||
465 | // @hint Это always true condition. Иначе данные неконсистентны |
||
466 | if (count($data_in) > 0) { |
||
467 | $this->write_full_data_to_file($thread_id, $data_in); |
||
468 | } else { |
||
469 | unlink($this->get_producer_filename_for_thread($thread_id)); |
||
470 | } |
||
471 | } |
||
472 | |||
473 | $mutex->release_lock(); |
||
474 | $this->index_data_save(); |
||
475 | } |
||
476 | $this->index_mutex_release_lock(); |
||
477 | |||
478 | return $deleted_keys; |
||
479 | } |
||
480 | |||
481 | /** |
||
482 | * Обновляем сообщение и сразу же сохраняем всё |
||
483 | * |
||
484 | * Эта функция не рейзит ошибку, если сообщение не найдено |
||
485 | * |
||
486 | * @param iMessage|object $message |
||
487 | * @param string|null $key форсированно задаём ключ сообщения |
||
488 | * |
||
489 | * @return boolean |
||
490 | * @throws QueueException |
||
491 | * @throws \NokitaKaze\Mutex\MutexException |
||
492 | */ |
||
493 | function update_message($message, $key = null) { |
||
494 | $this->init_producer_mutex(); |
||
495 | $this->index_mutex_lock(); |
||
496 | $this->index_data_load(); |
||
497 | |||
498 | if (is_null($key)) { |
||
499 | $key = self::get_real_key_for_message($message); |
||
500 | } |
||
501 | $exists = false; |
||
502 | foreach ($this->_index_data as $index_id => &$index_datum) { |
||
503 | if (!array_key_exists($key, $index_datum)) { |
||
504 | continue; |
||
505 | } |
||
506 | $thread_id = $index_datum[$key]; |
||
507 | $mutex = $this->get_mutex_for_thread($thread_id); |
||
508 | $mutex->get_lock(); |
||
509 | $data_in = $this->get_data_for_thread($thread_id); |
||
510 | // Ищем то же сообщение и заменяем его |
||
511 | $u = $this->change_message_in_array($data_in, $message, $key); |
||
512 | if ($u) { |
||
513 | $exists = true; |
||
514 | } |
||
515 | $this->write_full_data_to_file($thread_id, $data_in); |
||
516 | $mutex->release_lock(); |
||
517 | $this->index_data_save(); |
||
518 | break; |
||
519 | } |
||
520 | $this->index_mutex_release_lock(); |
||
521 | |||
522 | return $exists; |
||
523 | } |
||
524 | |||
525 | /** |
||
526 | * @param double|integer $wait_time |
||
527 | * |
||
528 | * @return iMessage|object|null |
||
529 | * @throws \NokitaKaze\Mutex\MutexException |
||
530 | * @throws QueueException |
||
531 | */ |
||
532 | function consume_next_message($wait_time = -1) { |
||
533 | $this->set_same_time_flag(2); |
||
534 | $start = microtime(true); |
||
535 | $this->init_producer_mutex(); |
||
536 | $this->index_mutex_lock(); |
||
537 | $this->index_data_load(); |
||
538 | while (true) { |
||
539 | for ($sort_id = 0; $sort_id < $this->_db_file_count; $sort_id++) { |
||
540 | foreach ($this->_index_data[$sort_id] as $key => $thread_id) { |
||
541 | if (isset($this->_consumed_keys[$key])) { |
||
542 | continue; |
||
543 | } |
||
544 | $mutex = $this->get_mutex_for_thread($thread_id); |
||
545 | $mutex->get_lock(); |
||
546 | $data_in = $this->get_data_for_thread($thread_id); |
||
547 | $this->_consumed_keys[$key] = 1; |
||
548 | foreach ($data_in as $message) { |
||
549 | $this_key = self::get_real_key_for_message($message); |
||
550 | if ($this_key == $key) { |
||
551 | $message->time_consumed = microtime(true); |
||
552 | $message->thread_consumed = $thread_id; |
||
553 | $message->queue = $this; |
||
554 | $mutex->release_lock(); |
||
555 | $this->index_mutex_release_lock(); |
||
556 | |||
557 | return $message; |
||
558 | } |
||
559 | } |
||
560 | // @hint Сюда может передаться код только в случае неконсистентности БД, когда в |
||
561 | // index'е ключ есть, а в data его нет |
||
562 | unset($mutex); |
||
563 | } |
||
564 | } |
||
565 | $this->index_mutex_release_lock(); |
||
566 | if (($wait_time !== -1) and ($start + $wait_time < microtime(true))) { |
||
567 | return null; |
||
568 | } |
||
569 | $this->index_mutex_lock(); |
||
570 | $this->index_data_load(); |
||
571 | } |
||
572 | |||
573 | // @hint Это для IDE |
||
574 | // @codeCoverageIgnoreStart |
||
575 | return null; |
||
576 | // @codeCoverageIgnoreEnd |
||
577 | } |
||
578 | |||
579 | /** |
||
580 | * @return string |
||
581 | */ |
||
582 | function get_queue_name() { |
||
583 | return $this->_name; |
||
584 | } |
||
585 | |||
586 | /** |
||
587 | * Поддерживает ли очередь сортировку event'ов при вставке |
||
588 | * |
||
589 | * @return boolean |
||
590 | */ |
||
591 | static function is_support_sorted_events() { |
||
592 | return true; |
||
593 | } |
||
594 | |||
595 | /** |
||
596 | * Индексы |
||
597 | */ |
||
598 | |||
599 | /** |
||
600 | * Название файла для содержания даты |
||
601 | * |
||
602 | * @return string |
||
603 | */ |
||
604 | protected function get_index_filename() { |
||
605 | return $this->_folder.'/smartqueue_'.$this->_prefix.'_'.hash('sha512', $this->_name).'-index.que'; |
||
606 | } |
||
607 | |||
608 | /** |
||
609 | * @throws QueueException |
||
610 | */ |
||
611 | protected function index_data_load() { |
||
612 | // @todo Проверять время-размер |
||
613 | $filename = $this->get_index_filename(); |
||
614 | if (!file_exists($filename)) { |
||
615 | $this->_index_data = array_fill(0, $this->_db_file_count, []); |
||
616 | |||
617 | return; |
||
618 | } |
||
619 | if (!is_readable($filename) or !is_writable($filename)) { |
||
620 | throw new QueueException('Index File "'.$filename.'" is not readable/writable'); |
||
621 | } |
||
622 | $buf = file_get_contents($filename); |
||
623 | if (empty($buf)) { |
||
624 | throw new QueueException('Index File "'.$filename.'" is empty'); |
||
625 | } |
||
626 | $this->_index_data_full = Queue::unserialize($buf, $is_valid); |
||
627 | if (!is_object($this->_index_data_full)) { |
||
628 | throw new QueueException('Index File "'.$filename.'" is empty'); |
||
629 | } |
||
630 | if ($this->_index_data_full->version != self::FILE_DB_INDEX_VERSION) { |
||
631 | throw new QueueException('Version mismatch ('. |
||
632 | $this->_index_data_full->version.' instead of '.self::FILE_DB_INDEX_VERSION.')'); |
||
633 | } |
||
634 | $this->_index_data = $this->_index_data_full->data; |
||
635 | unset($this->_index_data_full->data); |
||
636 | } |
||
637 | |||
638 | /** |
||
639 | * Сохраняем данные в индекс |
||
640 | * |
||
641 | * @throws QueueException |
||
642 | */ |
||
643 | protected function index_data_save() { |
||
644 | $filename = $this->get_index_filename(); |
||
645 | if (!file_exists(dirname($filename))) { |
||
646 | throw new QueueException('Folder "'.dirname($filename).'" does not exist', 4); |
||
647 | } elseif (!is_dir(dirname($filename))) { |
||
648 | throw new QueueException('Folder "'.dirname($filename).'" is not a folder', 13); |
||
649 | } elseif (!is_writable(dirname($filename))) { |
||
650 | throw new QueueException('Folder "'.dirname($filename).'" is not writable', 5); |
||
651 | } elseif (file_exists($filename) and !is_writable($filename)) { |
||
652 | throw new QueueException('File "'.$filename.'" is not writable', 6); |
||
653 | } |
||
654 | $filename_tmp = $filename.'-'.mt_rand(0, 50000).'.tmp'; |
||
655 | touch($filename_tmp); |
||
656 | chmod($filename_tmp, 6 << 6); |
||
657 | // @todo fileperms |
||
658 | |||
659 | if (is_null($this->_index_data_full)) { |
||
660 | $this->_index_data_full = (object) []; |
||
661 | $this->_index_data_full->time_create = microtime(true); |
||
662 | } |
||
663 | $this->_index_data_full->version = self::FILE_DB_INDEX_VERSION; |
||
664 | $this->_index_data_full->time_last_update = microtime(true); |
||
665 | $this->_index_data_full->queue_name = $this->get_queue_name(); |
||
666 | $temporary_data = clone $this->_index_data_full; |
||
667 | $temporary_data->data = $this->_index_data; |
||
668 | |||
669 | if (@file_put_contents($filename_tmp, Queue::serialize($temporary_data)) === false) { |
||
670 | // @codeCoverageIgnoreStart |
||
671 | throw new QueueException('Can not save index file: '.FileMutex::get_last_php_error_as_string(), 10); |
||
672 | // @codeCoverageIgnoreEnd |
||
673 | } |
||
674 | if (!rename($filename_tmp, $filename)) { |
||
675 | // @codeCoverageIgnoreStart |
||
676 | throw new QueueException('Can not rename temporary index file: '. |
||
677 | FileMutex::get_last_php_error_as_string(), 16); |
||
678 | // @codeCoverageIgnoreEnd |
||
679 | } |
||
680 | } |
||
681 | |||
682 | /** |
||
683 | * @param FileDBQueueTransport $queue |
||
684 | * |
||
685 | * @return boolean |
||
686 | */ |
||
687 | function is_equal_to($queue) { |
||
688 | if (spl_object_hash($this) == spl_object_hash($queue)) { |
||
689 | return true; |
||
690 | } |
||
691 | if (!parent::is_equal_to($queue)) { |
||
692 | return false; |
||
693 | } |
||
694 | |||
695 | return (($queue->_folder == $this->_folder) and |
||
696 | ($queue->get_queue_name() == $this->get_queue_name()) and |
||
697 | ($queue->_prefix == $this->_prefix)); |
||
698 | } |
||
699 | } |
||
700 | |||
701 | ?> |
If you access a property on an interface, you most likely code against a concrete implementation of the interface.
Available Fixes
Adding an additional type check:
Changing the type hint: