Completed
Push — master ( 579af5...b29473 )
by Oleg
07:53
created

micro/cli/Threads.php (2 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php /** MicroThreads */
2
3
namespace Micro\cli;
4
5
use Micro\base\Exception;
6
7
/**
 * Threads class file.
 *
 * Base class for a worker that runs in a forked child process and exchanges
 * data with its parent through two shmop shared-memory segments:
 *
 *  - a 10240-byte data segment holding the serialized variable array;
 *  - a 10-byte "semaphore" segment of which two bytes are used as flags:
 *      byte 0: '1' while a synchronous call is pending, '0' once the child
 *              has finished it;
 *      byte 1: '1' while a synchronous call result is waiting to be read
 *              (writes to the data segment are skipped), '0' otherwise.
 *
 * A freshly created segment is expected to read back as NUL bytes, which
 * match neither '0' nor '1'.
 *
 * @author Oleg Lunegov
 * @link https://github.com/lugnsk/micro
 * @copyright Copyright &copy; 2013 Oleg Lunegov
 * @license /LICENSE
 * @package micro
 * @subpackage cli
 * @version 1.0
 * @since 1.0
 */
abstract class Threads
{
    /** @var string $name thread name */
    private $name;
    /** @var int|null $pid child process ID, null while no child is running */
    private $pid;
    /** @var int $puid user ID the child switches to (0 = keep current) */
    private $puid;
    /** @var int $guid group ID the child switches to (0 = keep current) */
    private $guid;
    /** @var bool $isChild true inside the forked child process */
    private $isChild = false;
    /** @var array $internalIPCArray local copy of the shared variable array */
    private $internalIPCArray = [];
    /** @var mixed $internalIPCKey shmop handle of the data segment */
    private $internalIPCKey;
    /** @var mixed $internalSemaphoreKey shmop handle of the flag segment */
    private $internalSemaphoreKey;
    /** @var bool $isIPC true when both segments were opened successfully */
    private $isIPC = false;
    /** @var bool $running true in the parent after a successful start() */
    private $running = false;
    /** @var string|null $fileIPC1 key file used by ftok() for the data segment */
    private $fileIPC1;
    /** @var string|null $fileIPC2 key file used by ftok() for the flag segment */
    private $fileIPC2;

    /**
     * Constructor thread
     *
     * Stores the identity settings, optionally applies a umask and tries to
     * open both shared-memory segments. Failure to open a segment is not
     * fatal here; it is reported by start().
     *
     * @access public
     *
     * @param string $name  thread name (also used to build the key file names)
     * @param int    $puid  user ID for the child, 0 to leave unchanged
     * @param int    $guid  group ID for the child, 0 to leave unchanged
     * @param int    $umask umask to apply, -1 to leave unchanged
     *
     * @throws Exception when not running with CLI arguments or ftok() fails
     */
    public function __construct($name, $puid = 0, $guid = 0, $umask = -1)
    {
        if (empty($_SERVER['argc'])) {
            throw new Exception('Threads are permitted only for CLI');
        }

        $this->name = $name;
        $this->guid = $guid;
        $this->puid = $puid;

        if ($umask !== -1) {
            umask($umask);
        }

        // The semaphore is only attempted when the data segment was opened.
        $this->isIPC = $this->createIPCSegment() && $this->createIPCSemaphore();
    }

    /**
     * Create IPC segment
     *
     * @access protected
     * @return bool true when the 10240-byte data segment was opened
     * @throws Exception when ftok() cannot derive a key
     */
    protected function createIPCSegment()
    {
        $this->fileIPC1 = $this->createKeyFile('.shm');
        $this->internalIPCKey = $this->openSegment(
            $this->fileIPC1,
            10240,
            'Fatal exception creating SHM segment (ftok)'
        );

        return (bool)$this->internalIPCKey;
    }

    /**
     * get thread name
     *
     * @access public
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * Set thread name
     *
     * @access public
     *
     * @param string $name
     *
     * @return void
     */
    public function setName($name)
    {
        $this->name = $name;
    }

    /**
     * Create IPC semaphore
     *
     * @access protected
     * @return bool true when the 10-byte flag segment was opened
     * @throws Exception when ftok() cannot derive a key
     */
    protected function createIPCSemaphore()
    {
        $this->fileIPC2 = $this->createKeyFile('.sem');
        $this->internalSemaphoreKey = $this->openSegment(
            $this->fileIPC2,
            10,
            'Fatal exception creating semaphore (ftok)'
        );

        return (bool)$this->internalSemaphoreKey;
    }

    /**
     * Create the file whose path is handed to ftok().
     *
     * @access private
     *
     * @param string $extension file name suffix including the dot
     *
     * @return string path of the touched file
     */
    private function createKeyFile($extension)
    {
        $path = '/tmp/' . mt_rand() . md5($this->getName()) . $extension;

        touch($path);

        return $path;
    }

    /**
     * Derive a System V key from a file and open (or create) a segment for it.
     *
     * @access private
     *
     * @param string $keyFile   existing file passed to ftok()
     * @param int    $size      segment size in bytes
     * @param string $ftokError exception message used when ftok() fails
     *
     * @return mixed shmop handle, or false when the segment could not be opened
     * @throws Exception when ftok() returns -1
     */
    private function openSegment($keyFile, $size, $ftokError)
    {
        $key = ftok($keyFile, 't');
        if ($key === -1) {
            throw new Exception($ftokError);
        }

        // Failure is reported through the return value, so the warning is muted.
        $handle = @shmop_open($key, 'c', 0644, $size);

        return $handle ? $handle : false;
    }

    /**
     * Is running thread
     *
     * @access public
     * @return bool
     */
    public function isRunning()
    {
        return (bool)$this->running;
    }

    /**
     * Set alive
     *
     * Stores the current time under the '_pingTime' shared variable.
     *
     * @access public
     * @return void
     * @throws \Micro\base\Exception
     */
    public function setAlive()
    {
        $this->setVariable('_pingTime', time());
    }

    /**
     * Set variable in shared memory
     *
     * @access public
     *
     * @param string $name
     * @param mixed $value
     *
     * @return void
     * @throws \Micro\base\Exception
     */
    public function setVariable($name, $value)
    {
        $this->internalIPCArray[$name] = $value;
        $this->writeToIPCSegment();
    }

    /**
     * Write to IPC segment
     *
     * Serializes the local variable array into the data segment, unless flag
     * byte 1 is '1' (a synchronous call result has not been read yet).
     *
     * @access protected
     * @return void
     * @throws Exception when fewer bytes were written than were serialized
     */
    protected function writeToIPCSegment()
    {
        // shmop_read() returns a string, so the flag must be compared with
        // '1'; a strict comparison with the integer 1 can never be true.
        if (shmop_read($this->internalSemaphoreKey, 1, 1) === '1') {
            return;
        }

        $payload = serialize($this->internalIPCArray);
        $bytesWritten = shmop_write($this->internalIPCKey, $payload, 0);

        if ($bytesWritten !== strlen($payload)) {
            throw new Exception(
                'Fatal exception writing SHM segment (shmop_write)' . strlen($payload) .
                '-' . shmop_size($this->internalIPCKey)
            );
        }
    }

    /**
     * Get last alive
     *
     * @access public
     * @return int seconds since the last setAlive(), or 0 when never pinged
     * @throws \Micro\base\Exception
     */
    public function getLastAlive()
    {
        $timestamp = (int)$this->getVariable('_pingTime');

        return $timestamp === 0 ? 0 : time() - $timestamp;
    }

    /**
     * Get variable from shared memory
     *
     * @access public
     *
     * @param string $name
     *
     * @return mixed the stored value, or null when the variable is not set
     * @throws \Micro\base\Exception
     */
    public function getVariable($name)
    {
        $this->readFromIPCSegment();

        return array_key_exists($name, $this->internalIPCArray)
            ? $this->internalIPCArray[$name]
            : null;
    }

    /**
     * Read from IPC segment
     *
     * Replaces the local variable array with the content of the data segment.
     * When the segment does not hold a serialized array (for example before
     * the first write) the local array becomes empty.
     *
     * @access protected
     * @return void
     * @throws Exception when nothing could be read
     */
    protected function readFromIPCSegment()
    {
        $raw = shmop_read($this->internalIPCKey, 0, shmop_size($this->internalIPCKey));

        if (!$raw) {
            throw new Exception('Fatal exception reading SHM segment (shmop_read)' . "\n");
        }

        // The segment is larger than the payload; unserialize() is muted
        // because of the trailing bytes and of not-yet-written segments.
        $data = @unserialize($raw);

        $this->internalIPCArray = is_array($data) ? $data : [];
    }

    /**
     * Get thread process ID
     *
     * @access public
     * @return int|null
     */
    public function getPid()
    {
        return $this->pid;
    }

    /**
     * Register callback func into shared memory
     *
     * Publishes a method call for the child and signals it with SIGUSR1.
     * The call is synchronous when $argList is an array whose element 1 is
     * -2; in that case this method blocks until the child clears flag byte 0
     * and then returns the child's result. Every other call is asynchronous.
     *
     * @access public
     *
     * @param mixed $argList
     * @param string $methodName
     *
     * @return mixed result of a synchronous call, false for an asynchronous one
     * @throws \Micro\base\Exception
     */
    public function register_callback_func($argList, $methodName)
    {
        $isSync = is_array($argList)
            && count($argList) > 1
            && isset($argList[1])
            && $argList[1] === -2;

        $this->internalIPCArray['_call_type'] = $isSync ? -2 : -1;
        $this->internalIPCArray['_call_method'] = $methodName;
        $this->internalIPCArray['_call_input'] = $argList;

        $this->writeToIPCSegment();

        if (!$isSync) {
            $this->sendSigUsr1();

            return false;
        }

        // Raise "call pending" before waking the child.
        shmop_write($this->internalSemaphoreKey, '1', 0);

        $this->sendSigUsr1();
        $this->waitIPCSemaphore();
        $this->readFromIPCSegment();

        // Result consumed: allow writes to the data segment again.
        shmop_write($this->internalSemaphoreKey, '0', 1);

        return array_key_exists('_call_output', $this->internalIPCArray)
            ? $this->internalIPCArray['_call_output']
            : null;
    }

    /**
     * Send signal USR1
     *
     * @access protected
     * @return void
     */
    protected function sendSigUsr1()
    {
        if ($this->pid > 0) {
            posix_kill($this->pid, SIGUSR1);
        }
    }

    /**
     * Wait IPC semaphore
     *
     * Polls flag byte 0 until it reads '0'.
     *
     * @access protected
     * @return void
     */
    protected function waitIPCSemaphore()
    {
        // shmop_read() returns a string, so the flag must be compared with
        // '0'; a strict comparison with the integer 0 would loop forever.
        while (shmop_read($this->internalSemaphoreKey, 0, 1) !== '0') {
            usleep(10);
        }
    }

    /**
     * Start
     *
     * Forks the child process. The parent records the child's PID and
     * returns; the child runs run() and exits.
     *
     * @access public
     * @return void
     * @throws Exception when the IPC segments are missing or the fork fails
     */
    public function start()
    {
        if (!$this->isIPC) {
            throw new Exception('Fatal error, unable to create SHM segments for process communications');
        }

        pcntl_signal(SIGCHLD, SIG_IGN);

        $pid = pcntl_fork();
        if ($pid === -1) {
            // Without this check a failed fork would be recorded as a running child.
            throw new Exception('Fatal error, unable to fork child process');
        }

        if ($pid !== 0) {
            // Parent side.
            $this->isChild = false;
            $this->running = true;
            $this->pid = $pid;

            return;
        }

        // Child side.
        $this->isChild = true;

        // Install the handler before sleeping so that a SIGUSR1 sent during
        // the start-up delay is not handled by the default (terminating) action.
        pcntl_signal(SIGUSR1, [$this, 'sigHandler']);
        sleep(1);

        if ($this->guid !== 0) {
            posix_setgid($this->guid);
        }
        if ($this->puid !== 0) {
            posix_setuid($this->puid);
        }
        $this->run();

        exit(0);
    }

    /**
     * Running thread
     *
     * @access public
     * @return void
     */
    abstract public function run();

    /**
     * Stop thread
     *
     * Sends SIGKILL to the child, makes one non-blocking attempt to reap it
     * and releases the IPC resources.
     *
     * @access public
     * @return bool true when the kill signal was delivered, false when there
     *              is no child or the signal could not be sent
     */
    public function stop()
    {
        if (!($this->pid > 0)) {
            return false;
        }

        $success = posix_kill($this->pid, SIGKILL);

        // The status argument is taken by reference and therefore has to be
        // a real variable.
        $status = 0;
        pcntl_waitpid($this->pid, $status, WNOHANG);

        $this->cleanThreadContext();

        return $success;
    }

    /**
     * Clean thread context
     *
     * Deletes both shared-memory segments and their key files and resets the
     * running state. Resources that were never created are skipped.
     *
     * @access protected
     * @return void
     */
    protected function cleanThreadContext()
    {
        foreach ([$this->internalIPCKey, $this->internalSemaphoreKey] as $handle) {
            if ($handle) {
                @shmop_delete($handle);
                @shmop_close($handle);
            }
        }

        foreach ([$this->fileIPC1, $this->fileIPC2] as $keyFile) {
            if ($keyFile !== null && is_file($keyFile)) {
                unlink($keyFile);
            }
        }

        $this->internalIPCKey = null;
        $this->internalSemaphoreKey = null;
        $this->fileIPC1 = null;
        $this->fileIPC2 = null;
        // The segments are gone, so start() must not fork again on this object.
        $this->isIPC = false;

        $this->running = false;
        // Reset instead of unset() so the declared property keeps existing.
        $this->pid = null;
    }

    /**
     * Signal handler
     *
     * @access protected
     *
     * @param int $sigNo signal number
     *
     * @return void
     * @throws \Micro\base\Exception
     */
    protected function sigHandler($sigNo)
    {
        switch ($sigNo) {
            case SIGTERM:
                exit;

            case SIGUSR1:
                $this->handleRemoteCall();
                break;

            default:
                // SIGHUP and all other signals need no action
        }
    }

    /**
     * Execute the call published by register_callback_func().
     *
     * @access private
     * @return void
     * @throws Exception when the data segment cannot be read or written
     */
    private function handleRemoteCall()
    {
        $this->readFromIPCSegment();

        $call = $this->internalIPCArray;
        if (!isset($call['_call_method'], $call['_call_type'])) {
            return;
        }

        $type = $call['_call_type'];
        if ($type !== -1 && $type !== -2) {
            return;
        }

        $method = $call['_call_method'];
        $params = isset($call['_call_input']) ? $call['_call_input'] : null;

        $output = null;
        if (is_string($method) && is_callable([$this, $method])) {
            $output = $this->$method($params);
        } else {
            trigger_error('Threads: requested callback is not callable', E_USER_WARNING);
        }

        if ($type === -1) {
            return;
        }

        $this->internalIPCArray['_call_output'] = $output;
        $this->writeToIPCSegment();

        // Mark the result as unread before releasing the caller; the caller
        // clears byte 1 after reading, so setting it last could leave it
        // stuck at '1'.
        shmop_write($this->internalSemaphoreKey, '1', 1);
        shmop_write($this->internalSemaphoreKey, '0', 0);
    }
}
506