Issues (60)

Security Analysis    no request data  

This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.

  Cross-Site Scripting
Cross-Site Scripting enables an attacker to inject code into the response to a web request that is viewed by other users. It can, for example, be used to bypass access controls or even to take over other users' accounts.
  File Exposure
File Exposure allows an attacker to gain access to local files that they should not be able to access. These files can include, for example, database credentials or other configuration files.
  File Manipulation
File Manipulation enables an attacker to write custom data to files. This potentially leads to injection of arbitrary code on the server.
  Object Injection
Object Injection enables an attacker to inject an object into PHP code, and can lead to arbitrary code execution, file exposure, or file manipulation attacks.
  Code Injection
Code Injection enables an attacker to execute arbitrary code on the server.
  Response Splitting
Response Splitting can be used to send arbitrary responses.
  File Inclusion
File Inclusion enables an attacker to inject custom files into PHP's file loading mechanism, either through a path passed explicitly to include or, for example, via PHP's auto-loading mechanism.
  Command Injection
Command Injection enables an attacker to inject a shell command that is executed with the privileges of the web server. This can be used to expose sensitive data or to gain access to your server.
  SQL Injection
SQL Injection enables an attacker to execute arbitrary SQL code on your database server, gaining access to user data or manipulating it.
  XPath Injection
XPath Injection enables an attacker to modify which parts of an XML document are read. If that XML document is used, for example, for authentication, this can lead to further vulnerabilities similar to SQL Injection.
  LDAP Injection
LDAP Injection enables an attacker to inject LDAP statements, potentially granting permission to run unauthorized queries or to modify content inside the LDAP tree.
  Header Injection
  Other Vulnerability
This category comprises other attack vectors such as manipulating the PHP runtime, loading custom extensions, freezing the runtime, or similar.
  Regex Injection
Regex Injection enables an attacker to execute arbitrary code in your PHP process.
  XML Injection
XML Injection enables an attacker to read files on your local filesystem, including configuration files, or can be abused to freeze your web server process.
  Variable Injection
Variable Injection enables an attacker to overwrite program variables with custom data, and can lead to further vulnerabilities.
Unfortunately, the security analysis is currently not available for your project. If you are a non-commercial open-source project, please contact support to gain access.

src/FileDBQueueTransport.php (13 issues)


<?php

    namespace NokitaKaze\Queue;

    use NokitaKaze\Mutex\FileMutex;

    class FileDBQueueTransport extends AbstractQueueTransport {
        const FILE_DB_INDEX_VERSION = 1;
        const FILE_DB_CHUNK_VERSION = 1;

        protected $_folder;
        protected $_mutex_folder;
        protected $_prefix;
        protected $_name;
        protected $_db_file_count = Queue::DefaultDBFileCount;
        /**
         * @var FileDBQueueConstructionSettings|object
         */
        protected $_construction_settings;

        /**
         * @var FileMutex|null
         */
        protected $_producer_mutex = null;

        /**
         * @var FileMutex|null
         */
        protected $_index_mutex = null;

        /**
         * @var integer[][] Index data
         */
        protected $_index_data = [];

        /**
         * @var FileDBIndexFile
         */
        protected $_index_data_full = null;

        /**
         * @var boolean Exclusive mode
         */
        protected $_exclusive_mode = false;

        /**
         * @param FileDBQueueConstructionSettings|object $settings
         *
         * @throws QueueException
         */
        function __construct($settings) {
            if (!isset($settings->storage_type)) {
                $settings->storage_type = Queue::StorageTemporary;
            }
            if (!isset($settings->name)) {
                throw new QueueException('Settings don\'t have field name', 11);
            }
            $this->_name = $settings->name;
            $this->_construction_settings = $settings;

            $this->set_folder_settings($settings);
            if (isset($settings->prefix)) {
                $this->_prefix = $settings->prefix;
            } else {
                $this->_prefix = '';
            }
            if (isset($settings->mutex_folder)) {
                $this->_mutex_folder = $settings->mutex_folder;
            } else {
                $this->_mutex_folder = $this->_folder.'/mutex';
            }
            if (isset($settings->db_file_count)) {
                $this->_db_file_count = $settings->db_file_count;
            }
            $this->_index_data = array_fill(0, $this->_db_file_count, []);
        }

        function __destruct() {
            if (!is_null($this->_producer_mutex)) {
                $this->_producer_mutex->release_lock();
            }
            if (!is_null($this->_index_mutex)) {
                $this->_index_mutex->release_lock();
            }
        }

        function __clone() {
            parent::__clone();
            $this->_construction_settings = clone $this->_construction_settings;
            if (isset($this->_producer_mutex)) {
                $this->_producer_mutex = clone $this->_producer_mutex;
            }
            if (isset($this->_index_mutex)) {
                // Clone the index mutex itself, not the producer mutex
                $this->_index_mutex = clone $this->_index_mutex;
            }
            if (isset($this->_index_data_full)) {
                $this->_index_data_full = clone $this->_index_data_full;
            }
        }

        /**
         * @param FileDBQueueConstructionSettings|object $settings
         *
         * @throws QueueException
         */
        protected function set_folder_settings($settings) {
            switch ($settings->storage_type) {
                case Queue::StorageTemporary:
                    $this->_folder = sys_get_temp_dir();
                    break;
                case Queue::StoragePersistent:
                    $this->_folder = FileMutex::getDirectoryString();
                    break;
                default:
                    throw new QueueException(
                        'Constructor settings is malformed. Storage type can not be equal '.
                        $settings->storage_type, 1
                    );
            }
            if (isset($settings->folder)) {
                $this->_folder = $settings->folder;
            }
        }

        /**
         * Lock the index mutex
         *
         * @param double|integer $time
         *
         * @return boolean
         * @throws \NokitaKaze\Mutex\MutexException
         */
        protected function index_mutex_lock($time = -1) {
            if (!$this->_exclusive_mode) {
                return $this->_index_mutex->get_lock($time);
            } else {
                return true;
            }
        }

        /**
         * Release the index mutex
         */
        protected function index_mutex_release_lock() {
            if (!$this->_exclusive_mode) {
                $this->_index_mutex->release_lock();
            }
        }

        /**
         * Enable or disable exclusive mode
         *
         * @param boolean $mode
         * @throws \NokitaKaze\Mutex\MutexException
         */
        function set_exclusive_mode($mode) {
            $this->init_producer_mutex();
            $this->_exclusive_mode = $mode;
            if ($mode) {
                $this->_index_mutex->get_lock();
            } else {
                $this->_index_mutex->release_lock();
            }
        }

        /**
         * @return iMessage[]|object[]
         */
        protected function get_used_keys_and_real_need_for_save() {
            $real_need_save = [];
            $used_keys = [];
            foreach ($this->_pushed_for_save as &$message) {
                if (is_null($message->name)) {
                    $real_need_save[] = $message;
                    continue;
                }
                if (in_array($message->name, $used_keys)) {
                    continue;
                }
                foreach ($this->_index_data as $sort_id => &$index_datum) {
                    if (array_key_exists($message->name, $index_datum)) {
                        continue 2;
                    }
                }
                $used_keys[] = $message->name;
                $real_need_save[] = $message;
            }

            return $real_need_save;
        }

        /**
         * Save all messages queued for saving to the message file
         *
         * @throws QueueException
         * @throws \NokitaKaze\Mutex\MutexException
         */
        function save() {
            if (count($this->_pushed_for_save) == 0) {
                return;
            }
            $this->init_producer_mutex();

            $this->index_mutex_lock();
            $this->index_data_load();
            $real_need_save = $this->get_used_keys_and_real_need_for_save();
            if (count($real_need_save) == 0) {
                $this->index_mutex_release_lock();

                return;
            }

            $this->_producer_mutex->get_lock();
            $current_thread_id = static::get_current_producer_thread_id();
            $data_in = $this->get_data_for_thread($current_thread_id);
            foreach ($real_need_save as &$message) {
                if (!isset($message->sort)) {
                    $message->sort = 5;
                }
                // Copy the existing message into data_in
                $data_in[] = (object) [
                    'name' => $message->name,
                    'data' => $message->data,
                    'time_created' => $message->time_created,
                    'time_last_update' => microtime(true),
                    'time_rnd_postfix' => isset($message->time_rnd_postfix) ? $message->time_rnd_postfix : null,
                    'sort' => $message->sort,
                    'is_read' => false,
                    // @codeCoverageIgnoreStart
                ];
                // @codeCoverageIgnoreEnd
                $key = self::get_real_key_for_message($message);
                $this->_index_data[$message->sort][$key] = $current_thread_id;
            }
            $this->write_full_data_to_file($current_thread_id, $data_in);
            unset($data_in);

            $this->_producer_mutex->release_lock();
            $this->index_data_save();
            $this->index_mutex_release_lock();
            $this->_pushed_for_save = [];
        }

        /**
         * Get the data for a specific internal thread of the message queue
         *
         * @param integer $thread_id
         *
         * @return iMessage[]|object[]
         * @throws QueueException
         */
        protected function get_data_for_thread($thread_id) {
            $filename = $this->get_producer_filename_for_thread($thread_id);
            if (!file_exists($filename)) {
                return [];
            }
            if (!is_readable($filename)) {
                throw new QueueException('Chunk DB File "'.$filename.'" is not readable/writable');
            }
            // file_get_contents() has no lock flag (its second argument is use_include_path);
            // callers hold the thread mutex while reading
            $buf = file_get_contents($filename);
            if (empty($buf)) {
                throw new QueueException('Chunk DB File "'.$filename.'" is malformed');
            }
            /**
             * @var FileDBChunkFile $object
             */
            $object = Queue::unserialize($buf, $is_valid);
            if (!is_object($object)) {
                throw new QueueException('Chunk DB File "'.$filename.'" is malformed');
            }
            if ($object->version != self::FILE_DB_CHUNK_VERSION) {
Accessing version on the interface NokitaKaze\Queue\FileDBChunkFile suggests that you are coding against a concrete implementation. How about adding an instanceof check?

If you access a property on an interface, you are most likely coding against a concrete implementation of the interface.

Available Fixes

  1. Adding an additional type check:

    interface SomeInterface { }
    class SomeClass implements SomeInterface {
        public $a;
    }

    function someFunction(SomeInterface $object) {
        if ($object instanceof SomeClass) {
            $a = $object->a;
        }
    }

  2. Changing the type hint:

    interface SomeInterface { }
    class SomeClass implements SomeInterface {
        public $a;
    }

    function someFunction(SomeClass $object) {
        $a = $object->a;
    }

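As a rough illustration of how the first fix might look for a chunk reader, the sketch below assumes a hypothetical concrete class `ChunkFile` and a hypothetical helper `read_chunk_queue()`; neither name exists in the library. Since the transport builds its chunks with `(object) []`, a guard like this would only pass if chunks were written as instances of such a class.

```php
<?php
// Hypothetical concrete chunk class; not part of the NokitaKaze\Queue library.
class ChunkFile {
    public $version = 1;
    public $queue = [];
}

/**
 * Return the queue stored in a chunk after checking its type and version.
 *
 * @param mixed   $object           value produced by unserializing a chunk file
 * @param integer $expected_version assumed chunk format version
 *
 * @return array
 * @throws UnexpectedValueException
 */
function read_chunk_queue($object, $expected_version = 1) {
    // The instanceof guard ensures the properties read below are declared on the class
    if (!($object instanceof ChunkFile)) {
        throw new UnexpectedValueException('Chunk is malformed');
    }
    if ($object->version != $expected_version) {
        throw new UnexpectedValueException(
            'Version mismatch ('.$object->version.' instead of '.$expected_version.')'
        );
    }

    return $object->queue;
}
```

With this shape, a plain `stdClass` is rejected as malformed before any property is read, so the analyzer no longer has to guess which properties exist.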
                throw new QueueException('Version mismatch ('.
                                         $object->version.' instead of '.self::FILE_DB_CHUNK_VERSION.')');
Accessing version on the interface NokitaKaze\Queue\FileDBChunkFile suggests that you are coding against a concrete implementation. How about adding an instanceof check?
            }

            return $object->queue;
Accessing queue on the interface NokitaKaze\Queue\FileDBChunkFile suggests that you are coding against a concrete implementation. How about adding an instanceof check?
        }

        /**
         * Internal thread id
         *
         * @return integer
         */
        protected static function get_current_producer_thread_id() {
            return posix_getpid() % Queue::ProducerThreadCount;
        }

        /**
         * Get the name of the data file for a specific internal thread of the message queue
         *
         * @param integer $thread_id
         *
         * @return string
         */
        protected function get_producer_filename_for_thread($thread_id) {
            return $this->_folder.'/smartqueue_'.$this->_prefix.'_'.hash('sha512', $this->_name).'-'.$thread_id.'.que';
        }

        /**
         * Initialize the mutex for the current thread
         */
        protected function init_producer_mutex() {
            if (is_null($this->_producer_mutex)) {
                $this->_producer_mutex = $this->get_mutex_for_thread(static::get_current_producer_thread_id());
                $this->_index_mutex = $this->get_index_mutex();
            }
        }

        /**
         * @return string
         */
        function get_mutex_folder() {
            return $this->_mutex_folder;
        }

        /**
         * Mutex for a specific thread
         *
         * @param integer $thread_id
         *
         * @return FileMutex
         * @throws \NokitaKaze\Mutex\MutexException
         */
        protected function get_mutex_for_thread($thread_id) {
            // @todo plug a mutex resolver in here
            /**
             * @var \NokitaKaze\Mutex\MutexSettings $settings
             */
            $settings = (object) [];
            $settings->folder = $this->get_mutex_folder();
Accessing folder on the interface NokitaKaze\Mutex\MutexSettings suggests that you are coding against a concrete implementation. How about adding an instanceof check?
            FileMutex::create_folders_in_path($settings->folder);
Accessing folder on the interface NokitaKaze\Mutex\MutexSettings suggests that you are coding against a concrete implementation. How about adding an instanceof check?
            $settings->name = 'smartqueue_'.$this->_prefix.'_'.hash('sha512', $this->_name).'-'.$thread_id;
Accessing name on the interface NokitaKaze\Mutex\MutexSettings suggests that you are coding against a concrete implementation. How about adding an instanceof check?

            return new FileMutex($settings);
        }

        /**
         * Create the mutex for the index file covering the whole current message queue
         *
         * @return FileMutex
         * @throws \NokitaKaze\Mutex\MutexException
         */
        protected function get_index_mutex() {
            // @todo plug a mutex resolver in here
            /**
             * @var \NokitaKaze\Mutex\MutexSettings $settings
             */
            $settings = (object) [];
            $settings->folder = $this->get_mutex_folder();
Accessing folder on the interface NokitaKaze\Mutex\MutexSettings suggests that you are coding against a concrete implementation. How about adding an instanceof check?
            FileMutex::create_folders_in_path($settings->folder);
Accessing folder on the interface NokitaKaze\Mutex\MutexSettings suggests that you are coding against a concrete implementation. How about adding an instanceof check?
            $settings->name = 'smartqueue_'.$this->_prefix.'_'.hash('sha512', $this->_name).'-index';
Accessing name on the interface NokitaKaze\Mutex\MutexSettings suggests that you are coding against a concrete implementation. How about adding an instanceof check?

            return new FileMutex($settings);
        }

        /**
         * Save data to the internal thread's file (without the index)
         *
         * @param integer             $thread_id
         * @param iMessage[]|object[] $data_in
         *
         * @throws QueueException
         */
        protected function write_full_data_to_file($thread_id, $data_in) {
            $filename = $this->get_producer_filename_for_thread($thread_id);
            if (!file_exists(dirname($filename))) {
                throw new QueueException('Folder "'.dirname($filename).'" does not exist', 7);
            } elseif (!is_dir(dirname($filename))) {
                throw new QueueException('Folder "'.dirname($filename).'" is not a folder', 14);
            } elseif (!is_writable(dirname($filename))) {
                throw new QueueException('Folder "'.dirname($filename).'" is not writable', 8);
            } elseif (file_exists($filename) and !is_writable($filename)) {
                throw new QueueException('File "'.$filename.'" is not writable', 9);
            }
            $filename_tmp = $filename.'-'.mt_rand(10000, 99999).'.tmp';
            touch($filename_tmp);
            // @todo fileperms
            // 0600 (same value as 6 << 6): read/write for the owner only
            chmod($filename_tmp, 0600);

            /**
             * @var FileDBChunkFile $object
             */
            $object = (object) [];
            $object->version = self::FILE_DB_CHUNK_VERSION;
Accessing version on the interface NokitaKaze\Queue\FileDBChunkFile suggests that you are coding against a concrete implementation. How about adding an instanceof check?
            $object->time_last_update = microtime(true);
Accessing time_last_update on the interface NokitaKaze\Queue\FileDBChunkFile suggests that you are coding against a concrete implementation. How about adding an instanceof check?
            $object->queue_name = $this->get_queue_name();
Accessing queue_name on the interface NokitaKaze\Queue\FileDBChunkFile suggests that you are coding against a concrete implementation. How about adding an instanceof check?
            $object->queue = $data_in;
Accessing queue on the interface NokitaKaze\Queue\FileDBChunkFile suggests that you are coding against a concrete implementation. How about adding an instanceof check?
            if (@file_put_contents($filename_tmp, Queue::serialize($object), LOCK_EX) === false) {
                // @codeCoverageIgnoreStart
                throw new QueueException('Can not save queue stream: '.FileMutex::get_last_php_error_as_string(), 2);
                // @codeCoverageIgnoreEnd
            }
            if (!rename($filename_tmp, $filename)) {
                // @codeCoverageIgnoreStart
                throw new QueueException('Can not rename temporary chunk database file: '.
                                         FileMutex::get_last_php_error_as_string(), 15);
                // @codeCoverageIgnoreEnd
            }
        }

        /**
         * Get the DB numbers that hold the messages we need to delete
         *
         * @param iMessage[]|object[] $messages
         *
         * @return string[][]|integer[][]
         */
        protected function get_keys_for_delete_group_by_thread($messages) {
            $keys = [];
            foreach ($messages as &$message) {
                $keys[] = self::get_real_key_for_message($message);
            }
            unset($message);

            $threads_keys = array_fill(0, Queue::ProducerThreadCount, []);
            foreach ($this->_index_data as $sort_id => &$index_datum) {
                if (empty($index_datum)) {
                    continue;
                }
                foreach ($index_datum as $key => &$thread_id) {
                    if (in_array($key, $keys)) {
                        $threads_keys[$thread_id][] = $key;
                        unset($index_datum[$key]);
                    }
                }
            }

            return $threads_keys;
        }

        /**
         * Delete messages and immediately write the change to the DB
         *
         * @param iMessage[]|object[] $messages
         *
         * @return string[]|integer[]
         * @throws \NokitaKaze\Mutex\MutexException
         * @throws QueueException
         */
        function delete_messages(array $messages) {
            $this->init_producer_mutex();
            $this->index_mutex_lock();
            $this->index_data_load();

            $threads_keys = $this->get_keys_for_delete_group_by_thread($messages);

            $deleted_keys = [];
            foreach ($threads_keys as $thread_id => &$thread_keys) {
                if (count($thread_keys) == 0) {
                    continue;
                }
                $mutex = $this->get_mutex_for_thread($thread_id);
                $mutex->get_lock();
                $data_in = $this->get_data_for_thread($thread_id);
                $u = false;
                foreach ($data_in as $inner_id => &$inner_message) {
                    $real_key = self::get_real_key_for_message($inner_message);
                    if (in_array($real_key, $thread_keys)) {
                        unset($data_in[$inner_id]);
                        $deleted_keys[] = $real_key;
                        $u = true;
                    }
                }
                if ($u) {
                    // @hint This condition is always true; otherwise the data is inconsistent
                    if (count($data_in) > 0) {
                        $this->write_full_data_to_file($thread_id, $data_in);
                    } else {
                        unlink($this->get_producer_filename_for_thread($thread_id));
                    }
                }

                $mutex->release_lock();
                $this->index_data_save();
            }
            $this->index_mutex_release_lock();

            return $deleted_keys;
        }

        /**
         * Update a message and immediately save everything
         *
         * This function does not raise an error if the message is not found
         *
         * @param iMessage|object $message
         * @param string|null     $key force a specific message key
         *
         * @return boolean
         * @throws QueueException
         * @throws \NokitaKaze\Mutex\MutexException
         */
        function update_message($message, $key = null) {
            $this->init_producer_mutex();
            $this->index_mutex_lock();
            $this->index_data_load();

            if (is_null($key)) {
                $key = self::get_real_key_for_message($message);
            }
            $exists = false;
            foreach ($this->_index_data as $index_id => &$index_datum) {
                if (!array_key_exists($key, $index_datum)) {
                    continue;
                }
                $thread_id = $index_datum[$key];
                $mutex = $this->get_mutex_for_thread($thread_id);
                $mutex->get_lock();
                $data_in = $this->get_data_for_thread($thread_id);
                // Find the same message and replace it
                $u = $this->change_message_in_array($data_in, $message, $key);
                if ($u) {
                    $exists = true;
                }
                $this->write_full_data_to_file($thread_id, $data_in);
                $mutex->release_lock();
                $this->index_data_save();
                break;
            }
            $this->index_mutex_release_lock();

            return $exists;
        }

        /**
         * @param double|integer $wait_time seconds to wait for a message; -1 waits indefinitely
         *
         * @return iMessage|object|null
         * @throws \NokitaKaze\Mutex\MutexException
         * @throws QueueException
         */
        function consume_next_message($wait_time = -1) {
            $this->set_same_time_flag(2);
            $start = microtime(true);
            $this->init_producer_mutex();
            $this->index_mutex_lock();
            $this->index_data_load();
            while (true) {
                for ($sort_id = 0; $sort_id < $this->_db_file_count; $sort_id++) {
                    foreach ($this->_index_data[$sort_id] as $key => $thread_id) {
                        if (isset($this->_consumed_keys[$key])) {
                            continue;
                        }
                        $mutex = $this->get_mutex_for_thread($thread_id);
                        $mutex->get_lock();
                        $data_in = $this->get_data_for_thread($thread_id);
                        $this->_consumed_keys[$key] = 1;
                        foreach ($data_in as $message) {
                            $this_key = self::get_real_key_for_message($message);
                            if ($this_key == $key) {
                                $message->time_consumed = microtime(true);
                                $message->thread_consumed = $thread_id;
                                $message->queue = $this;
                                $mutex->release_lock();
                                $this->index_mutex_release_lock();

                                return $message;
                            }
                        }
                        // @hint Control reaches this point only if the DB is inconsistent:
                        // the key is present in the index but missing from the data
                        // Release the thread lock explicitly before dropping the local handle
                        $mutex->release_lock();
                        unset($mutex);
                    }
                }
                $this->index_mutex_release_lock();
                // Loose comparison so that a float -1.0 also means "wait indefinitely"
                if (($wait_time != -1) and ($start + $wait_time < microtime(true))) {
                    return null;
                }
                $this->index_mutex_lock();
                $this->index_data_load();
            }

            // @hint This is for the IDE
            // @codeCoverageIgnoreStart
            return null;
            // @codeCoverageIgnoreEnd
        }
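
        /*
         * Usage sketch (not part of the original class): one way a consumer loop might
         * combine consume_next_message() and delete_messages(). It assumes $queue is an
         * already constructed instance of this transport (construction is not shown in
         * this section) and that handle() is a hypothetical application callback.
         *
         *     while (!is_null($message = $queue->consume_next_message(5))) {
         *         handle($message);                    // hypothetical callback
         *         $queue->delete_messages([$message]); // drops it from the index and the thread file
         *     }
         *
         * With a non-negative $wait_time the loop ends once no message arrives within
         * that many seconds; with the default -1 the call keeps polling until a message appears.
         */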

        /**
         * @return string
         */
        function get_queue_name() {
            return $this->_name;
        }

        /**
         * Whether the queue supports sorting events on insert
         *
         * @return boolean
         */
        static function is_support_sorted_events() {
            return true;
        }

        /**
         * Indexes
         */

        /**
         * Name of the file that holds the index data
         *
         * @return string
         */
        protected function get_index_filename() {
            return $this->_folder.'/smartqueue_'.$this->_prefix.'_'.hash('sha512', $this->_name).'-index.que';
        }

        /**
         * Load the index data from the index file
         *
         * @throws QueueException
         */
        protected function index_data_load() {
            // @todo Check modification time and size
            $filename = $this->get_index_filename();
            if (!file_exists($filename)) {
                $this->_index_data = array_fill(0, $this->_db_file_count, []);

                return;
            }
            if (!is_readable($filename) or !is_writable($filename)) {
                throw new QueueException('Index File "'.$filename.'" is not readable/writable');
            }
            $buf = file_get_contents($filename);
            if (empty($buf)) {
                throw new QueueException('Index File "'.$filename.'" is empty');
            }
            $this->_index_data_full = Queue::unserialize($buf, $is_valid);
            if (!is_object($this->_index_data_full)) {
                // The file has content, but it did not unserialize to an object
                throw new QueueException('Index File "'.$filename.'" is malformed');
            }
            if ($this->_index_data_full->version != self::FILE_DB_INDEX_VERSION) {
                throw new QueueException('Version mismatch ('.
                                         $this->_index_data_full->version.' instead of '.self::FILE_DB_INDEX_VERSION.')');
            }
            $this->_index_data = $this->_index_data_full->data;
            unset($this->_index_data_full->data);
        }

        /**
         * Save the data to the index
         *
         * @throws QueueException
         */
        protected function index_data_save() {
            $filename = $this->get_index_filename();
            if (!file_exists(dirname($filename))) {
                throw new QueueException('Folder "'.dirname($filename).'" does not exist', 4);
            } elseif (!is_dir(dirname($filename))) {
                throw new QueueException('Folder "'.dirname($filename).'" is not a folder', 13);
            } elseif (!is_writable(dirname($filename))) {
                throw new QueueException('Folder "'.dirname($filename).'" is not writable', 5);
            } elseif (file_exists($filename) and !is_writable($filename)) {
                throw new QueueException('File "'.$filename.'" is not writable', 6);
            }
            // Write to a temporary file first, then rename it over the index file
            $filename_tmp = $filename.'-'.mt_rand(0, 50000).'.tmp';
            touch($filename_tmp);
            // 0600 (same value as 6 << 6): read/write for the owner only
            chmod($filename_tmp, 0600);
            // @todo fileperms

            if (is_null($this->_index_data_full)) {
                $this->_index_data_full = (object) [];
                $this->_index_data_full->time_create = microtime(true);
            }
            $this->_index_data_full->version = self::FILE_DB_INDEX_VERSION;
            $this->_index_data_full->time_last_update = microtime(true);
            $this->_index_data_full->queue_name = $this->get_queue_name();
            $temporary_data = clone $this->_index_data_full;
            $temporary_data->data = $this->_index_data;

            if (@file_put_contents($filename_tmp, Queue::serialize($temporary_data)) === false) {
                // @codeCoverageIgnoreStart
                throw new QueueException('Can not save index file: '.FileMutex::get_last_php_error_as_string(), 10);
                // @codeCoverageIgnoreEnd
            }
            if (!rename($filename_tmp, $filename)) {
                // @codeCoverageIgnoreStart
                throw new QueueException('Can not rename temporary index file: '.
                                         FileMutex::get_last_php_error_as_string(), 16);
                // @codeCoverageIgnoreEnd
            }
        }

        /**
         * @param FileDBQueueTransport $queue
         *
         * @return boolean
         */
        function is_equal_to($queue) {
            if (spl_object_hash($this) == spl_object_hash($queue)) {
                return true;
            }
            if (!parent::is_equal_to($queue)) {
                return false;
            }

            return (($queue->_folder == $this->_folder) and
                    ($queue->get_queue_name() == $this->get_queue_name()) and
                    ($queue->_prefix == $this->_prefix));
        }
    }

?>