Completed
Push — master ( 579af5...b29473 )
by Oleg
07:53
created

micro/cli/Threads.php (2 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php /** MicroThreads */
2
3
namespace Micro\Cli;
4
5
use Micro\Base\Exception;
6
7
/**
8
 * Threads class file.
9
 *
10
 * @author Oleg Lunegov <[email protected]>
11
 * @link https://github.com/lugnsk/micro
12
 * @copyright Copyright &copy; 2013 Oleg Lunegov
13
 * @license /LICENSE
14
 * @package Micro
15
 * @subpackage Cli
16
 * @version 1.0
17
 * @since 1.0
18
 */
19
abstract class Threads
20
{
21
    /**
     * @var string $name thread name; also hashed into the IPC file names
     */
    private $name;

    /**
     * @var int|null $pid value returned by pcntl_fork() in the parent (child process ID); null before start()
     */
    private $pid;

    /**
     * @var int $puid user ID passed to posix_setuid() in the child (0 = do not switch)
     */
    private $puid;

    /**
     * @var int $guid group ID passed to posix_setgid() in the child (0 = do not switch)
     */
    private $guid;

    /**
     * @var bool $isChild true only inside the forked child process
     */
    private $isChild = false;

    /**
     * @var array $internalIPCArray local copy of the variables serialized into the shared-memory segment
     */
    private $internalIPCArray = [];

    /**
     * @var mixed $internalIPCKey handle returned by shmop_open() for the 10240-byte data segment
     */
    private $internalIPCKey;

    /**
     * @var mixed $internalSemaphoreKey handle returned by shmop_open() for the 10-byte flag segment
     */
    private $internalSemaphoreKey;

    /**
     * @var bool $isIPC true when both shared-memory segments were created in the constructor
     */
    private $isIPC = false;

    /**
     * @var bool $running set to true in the parent by start(), reset by cleanThreadContext()
     */
    private $running = false;

    /**
     * @var string $fileIPC1 path of the temporary ".shm" file used to derive the data segment key
     */
    private $fileIPC1;

    /**
     * @var string $fileIPC2 path of the temporary ".sem" file used to derive the flag segment key
     */
    private $fileIPC2;
45
46
47
    /**
     * Prepare the thread object and its IPC resources.
     *
     * Stores the identity settings, optionally applies a umask and then tries
     * to create the shared-memory data segment and the flag segment. IPC is
     * marked as available only when both segments were created.
     *
     * @access public
     *
     * @param string $name  thread name
     * @param int    $puid  user ID for the child process (0 leaves it unchanged)
     * @param int    $guid  group ID for the child process (0 leaves it unchanged)
     * @param int    $umask file mode creation mask; -1 keeps the current mask
     *
     * @throws Exception when $_SERVER['argc'] is empty (not a CLI run) or an IPC key cannot be generated
     */
    public function __construct($name, $puid = 0, $guid = 0, $umask = -1)
    {
        if (empty($_SERVER['argc'])) {
            throw new Exception('Threads are permitted only for CLI');
        }

        $this->name = $name;
        $this->guid = $guid;
        $this->puid = $puid;

        if (-1 !== $umask) {
            umask($umask);
        }

        $this->isChild = false;
        $this->internalIPCArray = [];

        // The flag segment is only attempted once the data segment exists.
        $this->isIPC = $this->createIPCSegment() && $this->createIPCSemaphore();
    }
81
82
    /**
     * Create the shared-memory segment that carries the serialized variables.
     *
     * A uniquely named file is touched under /tmp so that ftok() can derive a
     * System V key from it; the segment itself is 10240 bytes, mode 0644.
     *
     * @access protected
     *
     * @return bool true when the segment was opened, false otherwise
     * @throws Exception when ftok() cannot produce a key
     */
    protected function createIPCSegment()
    {
        $path = '/tmp/' . mt_rand() . md5($this->getName()) . '.shm';
        $this->fileIPC1 = $path;

        touch($path);

        $key = ftok($path, 't');
        if (-1 === $key) {
            throw new Exception('Fatal exception creating SHM segment (ftok)');
        }

        // Warnings are suppressed here; failure is reported through the return value.
        $this->internalIPCKey = @shmop_open($key, 'c', 0644, 10240);

        return (bool)$this->internalIPCKey;
    }
108
109
    /**
     * Return the name given to this thread.
     *
     * @access public
     *
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }
119
120
    /**
     * Replace the name of this thread.
     *
     * Only the stored name changes; IPC files that were already created keep
     * the paths computed from the previous name.
     *
     * @access public
     *
     * @param string $name new thread name
     *
     * @return void
     */
    public function setName($name)
    {
        $this->name = $name;
    }
133
134
    /**
     * Create the small shared-memory segment used as a set of flag bytes.
     *
     * A uniquely named file is touched under /tmp so that ftok() can derive a
     * System V key from it; the segment itself is 10 bytes, mode 0644.
     *
     * @access protected
     *
     * @return bool true when the segment was opened, false otherwise
     * @throws Exception when ftok() cannot produce a key
     */
    protected function createIPCSemaphore()
    {
        $path = '/tmp/' . mt_rand() . md5($this->getName()) . '.sem';
        $this->fileIPC2 = $path;

        touch($path);

        $key = ftok($path, 't');
        if (-1 === $key) {
            throw new Exception('Fatal exception creating semaphore (ftok)');
        }

        $this->internalSemaphoreKey = shmop_open($key, 'c', 0644, 10);

        return (bool)$this->internalSemaphoreKey;
    }
160
161
    /**
     * Tell whether the thread has been started and not yet cleaned up.
     *
     * The flag is maintained by the parent process only: start() raises it
     * and cleanThreadContext() lowers it.
     *
     * @access public
     *
     * @return bool
     */
    public function isRunning()
    {
        return (bool)$this->running;
    }
171
172
    /**
     * Record the current time as the last "ping" of this thread.
     *
     * The timestamp is stored under the "_pingTime" shared variable.
     *
     * @access public
     *
     * @return void
     * @throws \Micro\Base\Exception when the shared-memory write fails
     */
    public function setAlive()
    {
        $now = time();

        $this->setVariable('_pingTime', $now);
    }
184
185
    /**
     * Store a value under the given name and publish it to shared memory.
     *
     * The value is placed in the local array first; the whole array is then
     * serialized into the data segment.
     *
     * @access public
     *
     * @param string $name  variable name
     * @param mixed  $value value to share (must be serializable)
     *
     * @return void
     * @throws \Micro\Base\Exception when the shared-memory write fails
     */
    public function setVariable($name, $value)
    {
        $this->internalIPCArray[$name] = $value;

        $this->writeToIPCSegment();
    }
201
202
    /**
     * Serialize the local variable array into the shared-memory data segment.
     *
     * The write is skipped while byte 1 of the flag segment holds '1', so
     * that data waiting to be read is not overwritten.
     *
     * @access protected
     *
     * @return void
     * @throws Exception when fewer bytes than expected were written
     */
    protected function writeToIPCSegment()
    {
        // shmop_read() returns a string, so the flag has to be compared with
        // the string '1'; a strict comparison with the integer 1 never matches.
        if (shmop_read($this->internalSemaphoreKey, 1, 1) === '1') {
            return;
        }

        $payload = serialize($this->internalIPCArray);
        $written = shmop_write($this->internalIPCKey, $payload, 0);

        // A short write means the payload did not fit into the segment.
        if ($written !== strlen($payload)) {
            throw new Exception(
                'Fatal exception writing SHM segment (shmop_write)' . strlen($payload) .
                '-' . shmop_size($this->internalIPCKey)
            );
        }
    }
225
226
    /**
     * Return how many seconds have passed since the last setAlive() call.
     *
     * @access public
     *
     * @return int elapsed seconds, or 0 when no ping time is stored
     * @throws \Micro\Base\Exception when the shared-memory read fails
     */
    public function getLastAlive()
    {
        $pingedAt = (int)$this->getVariable('_pingTime');

        return 0 === $pingedAt ? 0 : (time() - $pingedAt);
    }
242
243
    /**
     * Fetch a variable from shared memory.
     *
     * The local array is refreshed from the data segment before the lookup.
     *
     * @access public
     *
     * @param string $name variable name
     *
     * @return mixed the stored value, or null when no such variable exists
     * @throws \Micro\Base\Exception when the shared-memory read fails
     */
    public function getVariable($name)
    {
        $this->readFromIPCSegment();

        // A variable that was never stored is a normal case (getLastAlive()
        // asks for "_pingTime" before any ping), so return null explicitly
        // instead of triggering an undefined-index notice.
        if (is_array($this->internalIPCArray) && array_key_exists($name, $this->internalIPCArray)) {
            return $this->internalIPCArray[$name];
        }

        return null;
    }
259
260
    /**
     * Reload the local variable array from the shared-memory data segment.
     *
     * @access protected
     *
     * @return void
     * @throws Exception when nothing could be read from the segment
     */
    protected function readFromIPCSegment()
    {
        $raw = shmop_read($this->internalIPCKey, 0, shmop_size($this->internalIPCKey));

        if (!$raw) {
            throw new Exception('Fatal exception reading SHM segment (shmop_read)' . "\n");
        }

        // The segment is read in full, so the payload is followed by padding;
        // warnings from unserialize() are suppressed for that reason.
        $data = @unserialize($raw);

        // A segment that has never been written does not unserialize; keep the
        // property an array so later reads and writes by key stay valid.
        $this->internalIPCArray = is_array($data) ? $data : [];
    }
279
280
    /**
     * Return the process ID recorded for this thread.
     *
     * The value is the one stored by start() in the parent process.
     *
     * @access public
     *
     * @return int
     */
    public function getPid()
    {
        return $this->pid;
    }
290
291
    /**
     * Ask the child process to run one of its methods.
     *
     * The request (method name, arguments, call type) is written to shared
     * memory and the child is notified with SIGUSR1. When the second element
     * of $argList is -2 the call is synchronous: the parent waits for the
     * child to finish and returns the method result. Any other input produces
     * a fire-and-forget call.
     *
     * @access public
     *
     * @param mixed  $argList    arguments handed to the method as a single parameter
     * @param string $methodName name of the method to invoke in the child
     *
     * @return mixed result of the method for synchronous calls, false otherwise
     * @throws \Micro\Base\Exception when shared memory cannot be read or written
     */
    public function register_callback_func($argList, $methodName)
    {
        // isset() avoids a notice for arrays that have no element at index 1.
        $isSync = is_array($argList) && count($argList) > 1
            && isset($argList[1]) && $argList[1] === -2;

        $this->internalIPCArray['_call_type'] = $isSync ? -2 : -1;
        $this->internalIPCArray['_call_method'] = $methodName;
        $this->internalIPCArray['_call_input'] = $argList;

        $this->writeToIPCSegment();

        if (!$isSync) {
            $this->sendSigUsr1();

            return false;
        }

        // Flag bytes are exchanged as one-character strings, which is the
        // data type shmop_write() expects.
        shmop_write($this->internalSemaphoreKey, '1', 0); // byte 0: call pending

        $this->sendSigUsr1();
        $this->waitIPCSemaphore();
        $this->readFromIPCSegment();

        shmop_write($this->internalSemaphoreKey, '0', 1); // byte 1: result consumed

        return isset($this->internalIPCArray['_call_output'])
            ? $this->internalIPCArray['_call_output']
            : null;
    }
339
340
    /**
     * Notify the child process with SIGUSR1.
     *
     * Nothing is sent when no positive process ID is recorded.
     *
     * @access protected
     *
     * @return void
     */
    protected function sendSigUsr1()
    {
        if ($this->pid <= 0) {
            return;
        }

        posix_kill($this->pid, SIGUSR1);
    }
352
353
    /**
     * Block until byte 0 of the flag segment reads '0'.
     *
     * The segment is polled every 10 microseconds.
     *
     * @access protected
     *
     * @return void
     */
    protected function waitIPCSemaphore()
    {
        // shmop_read() returns a string, so the flag has to be compared with
        // the string '0'; a strict comparison with the integer 0 never matches
        // and would make this loop endless.
        while (shmop_read($this->internalSemaphoreKey, 0, 1) !== '0') {
            usleep(10);
        }
    }
371
372
    /**
     * Fork the child process and run the thread body in it.
     *
     * In the child: SIGUSR1 is bound to sigHandler(), the group and user IDs
     * are switched when non-zero values were configured, run() is executed
     * and the process exits with status 0. In the parent: the child PID is
     * recorded and the thread is marked as running.
     *
     * @access public
     *
     * @return void
     * @throws Exception when the IPC segments are missing or the fork fails
     */
    public function start()
    {
        if (!$this->isIPC) {
            throw new Exception('Fatal error, unable to create SHM segments for process communications');
        }

        // Ignore SIGCHLD in the parent.
        pcntl_signal(SIGCHLD, SIG_IGN);

        $pid = pcntl_fork();

        // pcntl_fork() returns -1 when no child could be created; without
        // this check the parent would record -1 as a PID and report a
        // running thread.
        if ($pid === -1) {
            throw new Exception('Fatal error, unable to fork the thread process');
        }

        if ($pid === 0) {
            $this->isChild = true;
            sleep(1);

            pcntl_signal(SIGUSR1, [$this, 'sigHandler']);

            if ($this->guid !== 0) {
                posix_setgid($this->guid);
            }
            if ($this->puid !== 0) {
                posix_setuid($this->puid);
            }
            $this->run();

            exit(0);
        }

        $this->isChild = false;
        $this->running = true;
        $this->pid = $pid;
    }
410
411
    /**
     * Body of the thread.
     *
     * Called by start() inside the forked child process; the child exits
     * with status 0 as soon as this method returns.
     *
     * @access public
     *
     * @return void
     */
    abstract public function run();
418
419
    /**
     * Kill the child process and release the IPC resources.
     *
     * @access public
     *
     * @return bool true when the child was killed or no longer existed,
     *              false when no child is recorded or the signal could not be sent
     */
    public function stop()
    {
        if (!($this->pid > 0)) {
            return false;
        }

        $killed = posix_kill($this->pid, SIGKILL);

        // A child that has already exited counts as stopped.
        $success = $killed || posix_get_last_error() === PCNTL_ESRCH;

        // pcntl_waitpid() receives the status by reference, so it needs a real
        // variable; an inline assignment expression would be discarded.
        $status = 0;
        pcntl_waitpid($this->pid, $status, WNOHANG);

        $this->cleanThreadContext();

        return $success;
    }
440
441
    /**
     * Release everything that belongs to a stopped thread.
     *
     * Deletes and closes both shared-memory segments, removes the temporary
     * key files and resets the running flag and the recorded PID.
     *
     * @access protected
     *
     * @return void
     */
    protected function cleanThreadContext()
    {
        @shmop_delete($this->internalIPCKey);
        @shmop_delete($this->internalSemaphoreKey);

        @shmop_close($this->internalIPCKey);
        @shmop_close($this->internalSemaphoreKey);

        // Remove only files that are still present, so a repeated clean-up
        // does not emit warnings.
        foreach ([$this->fileIPC1, $this->fileIPC2] as $file) {
            if ($file !== null && is_file($file)) {
                unlink($file);
            }
        }

        $this->running = false;

        // Reset instead of unset(): unsetting removes the declared property
        // and every later read of it would raise an undefined-property notice.
        $this->pid = null;
    }
461
462
    /**
     * Handle signals delivered to the child process.
     *
     * SIGTERM ends the process, SIGHUP is ignored. SIGUSR1 executes the call
     * request found in shared memory: for call type -1 the method is simply
     * invoked; for call type -2 its result is written back and the flag bytes
     * are updated so that the waiting caller can continue.
     *
     * @access protected
     *
     * @param int $sigNo number of the received signal
     *
     * @return void
     * @throws \Micro\Base\Exception when shared memory cannot be read or written
     */
    protected function sigHandler($sigNo)
    {
        switch ($sigNo) {
            case SIGTERM:
                exit;

            case SIGHUP:
                break;

            case SIGUSR1:
                $this->readFromIPCSegment();

                // SIGUSR1 may arrive without a registered request; there is
                // nothing to call in that case.
                if (!isset($this->internalIPCArray['_call_method'], $this->internalIPCArray['_call_type'])) {
                    break;
                }

                $method = $this->internalIPCArray['_call_method'];
                $params = isset($this->internalIPCArray['_call_input'])
                    ? $this->internalIPCArray['_call_input']
                    : null;

                switch ($this->internalIPCArray['_call_type']) {
                    case -1:
                        $this->$method($params);
                        break;

                    case -2:
                        $this->internalIPCArray['_call_output'] = $this->$method($params);

                        $this->writeToIPCSegment();

                        // Mark the result as unread (byte 1) before releasing
                        // the caller (byte 0); the caller resets byte 1 after
                        // reading, so it must not be able to do that first.
                        shmop_write($this->internalSemaphoreKey, '1', 1);
                        shmop_write($this->internalSemaphoreKey, '0', 0);

                        break;
                }
                break;

            default:
                // all other signals are ignored
        }
    }
509
}
510