Completed
Push — master ( e5b054...7b4a69 )
by Akihito
02:22
created

Snidel::run()   A

Complexity

Conditions 2
Paths 3

Size

Total Lines 12
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 12
rs 9.4285
cc 2
eloc 8
nc 3
nop 1
1
<?php
2
declare(ticks = 1);
3
4
namespace Ackintosh;
5
6
use Ackintosh\Snidel\ForkContainer;
7
use Ackintosh\Snidel\Result;
8
use Ackintosh\Snidel\Token;
9
use Ackintosh\Snidel\Log;
10
use Ackintosh\Snidel\Pcntl;
11
use Ackintosh\Snidel\DataRepository;
12
use Ackintosh\Snidel\MapContainer;
13
use Ackintosh\Snidel\Task;
14
use Ackintosh\Snidel\TaskQueue;
15
use Ackintosh\Snidel\ResultQueue;
16
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
17
18
class Snidel
19
{
20
    /**
     * Version string of the library.
     *
     * @var string
     */
    const VERSION = '0.6.0';

    /**
     * Pid of the master process; stays null until forkMaster() has run.
     *
     * @var int|null
     */
    private $masterProcessId;

    /**
     * Pids of the children created by prefork() that have not been reaped yet.
     *
     * @var array
     */
    private $childPids = array();

    /**
     * @var \Ackintosh\Snidel\ForkContainer
     */
    private $forkContainer;

    /**
     * @var \Ackintosh\Snidel\Pcntl
     */
    private $pcntl;

    /**
     * Value given to the constructor; handed to the Token and to every MapContainer.
     *
     * @var int
     */
    private $concurrency;

    /**
     * @var \Ackintosh\Snidel\Token
     */
    private $token;

    /**
     * @var \Ackintosh\Snidel\Log
     */
    private $log;

    /**
     * @var \Ackintosh\Snidel\DataRepository
     */
    private $dataRepository;

    /**
     * Set to true by wait() and by waitsAndConnectsProcess() once they have finished.
     *
     * @var bool
     */
    private $joined = false;

    /**
     * Pid of the process that constructed this instance.
     *
     * @var int
     */
    private $ownerPid;

    /**
     * Signals for which the constructor installs a handler.
     *
     * @var array
     */
    private $signals = array(
        SIGTERM,
        SIGINT,
    );

    /**
     * Number of the signal recorded by setReceivedSignal(); null while none was recorded.
     *
     * @var int|null
     */
    private $receivedSignal;

    /**
     * Token used by the most recent prefork() call.
     *
     * @var \Ackintosh\Snidel\Token
     */
    private $processToken;

    /**
     * Set to true when run() has caught a \RuntimeException.
     *
     * @var bool
     */
    private $exceptionHasOccured = false;
66
67
    /**
     * Creates the collaborators and installs a handler for every signal
     * listed in $this->signals.
     *
     * @param   int     $concurrency    handed to the Token and kept for Snidel::map()
     */
    public function __construct($concurrency = 5)
    {
        $pid = getmypid();

        $this->ownerPid         = $pid;
        $this->childPids        = array();
        $this->concurrency      = $concurrency;
        $this->token            = new Token($pid, $concurrency);
        $this->log              = new Log($pid);
        $this->pcntl            = new Pcntl();
        $this->dataRepository   = new DataRepository();
        $this->forkContainer    = new ForkContainer($pid);

        /**
         * in php5.3, we can not use $this in anonymous functions
         */
        $logger     = $this->log;
        $token      = $this->token;
        $snidel     = $this;

        $handler = function ($signo) use ($logger, $token, $snidel) {
            $logger->info('received signal. signo: ' . $signo);
            $snidel->setReceivedSignal($signo);

            $logger->info('--> sending a signal to children.');
            $snidel->sendSignalToChildren($signo);

            $logger->info('--> deleting token.');
            // NOTE: this only drops the closure's own reference to the token;
            // the instance is still referenced by the Snidel object.
            unset($token);

            $logger->info('<-- signal handling has been completed successfully.');
            // The static analyser flags this exit expression: the handler
            // terminates the whole process once the signal has been handled.
            exit;
        };

        foreach ($this->signals as $signal) {
            $this->pcntl->signal($signal, $handler, false);
        }

        $this->log->info('parent pid: ' . $this->ownerPid);
    }
103
104
    /**
     * Passes the given resource to the logger as its destination.
     *
     * @param   resource    $resource
     * @return  void
     * @codeCoverageIgnore
     */
    public function setLoggingDestination($resource)
    {
        $logger = $this->log;
        $logger->setDestination($resource);
    }
115
116
    /**
     * Queues a task to be executed by a worker (master / worker model).
     *
     * The master process is forked on the first call, i.e. while no master
     * pid has been recorded yet.
     *
     * @param   callable    $callable
     * @param   mixed       $args
     * @param   string      $tag
     * @return  void
     * @throws  \RuntimeException
     */
    public function fork($callable, $args = array(), $tag = null)
    {
        if (null === $this->masterProcessId) {
            $this->forkMaster();
        }

        // a \RuntimeException raised here reaches the caller unchanged
        $task = new Task($callable, $args, $tag);
        $this->forkContainer->enqueue($task);

        $queuedCount = $this->forkContainer->queuedCount();
        $this->log->info('queued task #' . $queuedCount);
    }
139
140
    /**
     * Forks the master process.
     *
     * The owner process records the pid of the master and returns. The master
     * process dequeues tasks, forks a worker for each task once the token has
     * been accepted, and exits inside this method when dequeue() returns a
     * falsy value.
     *
     * @return  void
     * @throws  \RuntimeException   when the fork fails
     */
    private function forkMaster()
    {
        $pid = $this->pcntl->fork();

        // The failure value has to be tested first: -1 is truthy, so it would
        // otherwise be handled as "owner" and stored as the master pid, and
        // the destructor would later pass it to posix_kill().
        // NOTE(review): assumes Pcntl::fork() returns -1 on failure like
        // pcntl_fork() does - confirm against the Pcntl wrapper.
        if ($pid === -1) {
            $message = 'could not fork the master process.';
            $this->log->error($message);
            throw new \RuntimeException($message);
        }

        $this->masterProcessId = ($pid === 0) ? getmypid() : $pid;
        $this->log->setMasterProcessId($this->masterProcessId);

        if ($pid) {
            // owner
            $this->log->info('pid: ' . getmypid());
            return;
        }

        // master
        $taskQueue = new TaskQueue($this->ownerPid);
        $this->log->info('pid: ' . $this->masterProcessId);

        // the handlers installed by the constructor are replaced by the defaults
        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, SIG_DFL, true);
        }

        while ($task = $taskQueue->dequeue()) {
            $this->log->info('dequeued task #' . $taskQueue->dequeuedCount());
            if ($this->token->accept()) {
                $this->forkWorker($task);
            }
        }
        $this->_exit();
    }
174
175
    /**
     * Forks a worker process for the given task.
     *
     * The master process only logs the pid of the worker. The worker process
     * executes the task, enqueues the fork into the result queue, gives the
     * token back and exits inside this method.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException
     */
    private function forkWorker($task)
    {
        try {
            $fork = $this->forkContainer->fork($task);
        } catch (\RuntimeException $exception) {
            $this->log->error($exception->getMessage());
            throw $exception;
        }

        $fork->setTask($task);

        if ($this->masterProcessId === getmypid()) {
            // master
            $this->log->info('forked worker. pid: ' . $fork->getPid());
            return;
        }

        // worker
        $this->log->info('has forked. pid: ' . getmypid());
        // @codeCoverageIgnoreStart

        foreach ($this->signals as $signal) {
            $this->pcntl->signal($signal, SIG_DFL, true);
        }

        $resultQueue = new ResultQueue($this->ownerPid);

        // enqueues a failure result at shutdown unless the fork already has a
        // result and has been marked as queued
        register_shutdown_function(function () use ($fork, $resultQueue) {
            if (!$fork->hasNoResult() && $fork->isQueued()) {
                return;
            }

            $failure = new Result();
            $failure->setFailure();
            $fork->setResult($failure);
            $resultQueue->enqueue($fork);
        });

        $this->log->info('----> started the function.');
        $fork->executeTask();
        $this->log->info('<---- completed the function.');

        $resultQueue->enqueue($fork);
        $fork->setQueued();
        $this->log->info('queued the result.');

        $this->token->back();
        $this->log->info('return the token and exit.');
        $this->_exit();
        // @codeCoverageIgnoreEnd
    }
229
230
    /**
     * Forks a child process that waits for a token before running the task.
     *
     * The owner process records the pid of the child and returns it. The
     * child process executes the task once the token has been accepted and
     * exits inside this method.
     *
     * @param   callable                    $callable
     * @param   mixed                       $args
     * @param   string                      $tag
     * @param   \Ackintosh\Snidel\Token     $token      falls back to the token of this instance
     * @return  int     pid of the child, as reported by the fork (owner process only)
     * @throws  \RuntimeException
     */
    private function prefork($callable, $args = array(), $tag = null, Token $token = null)
    {
        $this->processToken = $token ?: $this->token;
        $task = new Task($callable, $args, $tag);

        try {
            $fork = $this->forkContainer->fork($task);
        } catch (\RuntimeException $exception) {
            $this->log->error($exception->getMessage());
            throw $exception;
        }

        $fork->setTask($task);

        if ($this->ownerPid === getmypid()) {
            // parent
            $childPid = $fork->getPid();
            $this->log->info('created child process. pid: ' . $childPid);
            $this->childPids[] = $childPid;

            return $fork->getPid();
        }

        // @codeCoverageIgnoreStart
        // child
        foreach ($this->signals as $signal) {
            $this->pcntl->signal($signal, SIG_DFL, true);
        }

        /**
         * in php5.3, we can not use $this in anonymous functions
         */
        $log                = $this->log;
        $processToken       = $this->processToken;
        $dataRepository     = $this->dataRepository;

        // at shutdown the fork is written to the data of this pid and the
        // token is given back; a SharedMemoryControlException is not handled here
        register_shutdown_function(function () use ($fork, $dataRepository, $log, $processToken) {
            $dataRepository->load(getmypid())->write($fork);
            $log->info('<-- return token.');
            $processToken->back();
        });

        $log->info('--> waiting for the token come around.');
        if ($processToken->accept()) {
            $log->info('----> started the function.');
            $fork->executeTask();
            $log->info('<---- completed the function.');
        }

        $this->_exit();
        // @codeCoverageIgnoreEnd
    }
296
297
    /**
     * Delegates to the fork container's wait() and then marks this instance
     * as joined. Intended for the tasks queued through Snidel::fork().
     *
     * @return  void
     */
    public function wait()
    {
        $container = $this->forkContainer;
        $container->wait();

        $this->joined = true;
    }
307
308
    /**
     * Returns what the fork container reports from hasError().
     *
     * @return  bool
     */
    public function hasError()
    {
        $hasError = $this->forkContainer->hasError();

        return $hasError;
    }
315
316
    /**
     * Returns what the fork container reports from getError().
     *
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        $error = $this->forkContainer->getError();

        return $error;
    }
323
324
    /**
     * Returns the collection of forks for the given tag.
     *
     * Snidel::wait() is called first when this instance has not been joined yet.
     *
     * @param   string  $tag
     * @return  \Ackintosh\Snidel\ForkCollection
     * @throws  \InvalidArgumentException   when the fork container does not know the tag
     */
    public function get($tag = null)
    {
        if (!$this->joined) {
            $this->wait();
        }

        $container = $this->forkContainer;

        // The static analyser reports hasTag() as boolean|null; this is a
        // truthiness check, so null and false both mean "unknown tag".
        if ($container->hasTag($tag)) {
            return $container->getCollection($tag);
        }

        throw new \InvalidArgumentException('unknown tag: ' . $tag);
    }
343
344
    /**
     * Sends the given signal to every pid recorded in $this->childPids.
     *
     * @param   int     $sig
     * @return  void
     */
    public function sendSignalToChildren($sig)
    {
        $logger = $this->log;

        foreach ($this->childPids as $childPid) {
            $logger->info('----> sending a signal to child. pid: ' . $childPid);
            posix_kill($childPid, $sig);
        }
    }
357
358
    /**
     * Records the number of a received signal.
     *
     * Public because the signal handler registered by the constructor calls
     * it from inside a closure.
     *
     * @param   int     $sig
     * @return  void
     */
    public function setReceivedSignal($sig)
    {
        $this->receivedSignal = $sig;
    }
362
363
    /**
     * Deletes the data of every pid recorded in $this->childPids, if it exists.
     *
     * @return  void
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
     */
    private function deleteAllData()
    {
        $repository = $this->dataRepository;

        foreach ($this->childPids as $childPid) {
            // a SharedMemoryControlException reaches the caller unchanged
            $repository->load($childPid)->deleteIfExists();
        }
    }
380
381
    /**
     * Creates a map container from the arguments, the callable and the
     * concurrency of this instance.
     *
     * @param   array       $args
     * @param   callable    $callable
     * @return  \Ackintosh\Snidel\MapContainer
     */
    public function map(array $args, $callable)
    {
        $mapContainer = new MapContainer($args, $callable, $this->concurrency);

        return $mapContainer;
    }
392
393
    /**
     * Runs the given map container and returns its results.
     *
     * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
     * @return  array
     * @throws  \RuntimeException
     */
    public function run(MapContainer $mapContainer)
    {
        try {
            $this->forkTheFirstProcessing($mapContainer);
            $this->waitsAndConnectsProcess($mapContainer);
        } catch (\RuntimeException $exception) {
            // the destructor reads this flag to decide whether to delete the data
            $this->exceptionHasOccured = true;
            throw $exception;
        }

        $results = $this->getResultsOf($mapContainer);

        return $results;
    }
412
413
    /**
     * Preforks one child per argument set for the first map of the container
     * and registers each child with that map.
     *
     * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
     * @return  void
     * @throws  \RuntimeException
     */
    private function forkTheFirstProcessing(MapContainer $mapContainer)
    {
        foreach ($mapContainer->getFirstArgs() as $firstArgs) {
            // a \RuntimeException raised by prefork() reaches the caller unchanged
            $callable = $mapContainer->getFirstMap()->getCallable();
            $pid      = $this->prefork($callable, $firstArgs);

            $mapContainer->getFirstMap()->countTheForked();
            $mapContainer->getFirstMap()->addChildPid($pid);
        }
    }
432
433
    /**
     * Waits for the children of the map container and connects each finished
     * child to the next map, until the container is no longer processing.
     *
     * Returns immediately when this instance has already been joined.
     *
     * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
     * @return  void
     * @throws  \RuntimeException   when a child has not finished successfully
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
     */
    private function waitsAndConnectsProcess(MapContainer $mapContainer)
    {
        if ($this->joined) {
            return;
        }

        while ($mapContainer->isProcessing()) {
            // a SharedMemoryControlException reaches the caller unchanged
            $fork = $this->forkContainer->waitSimply();

            $childPid = $fork->getPid();
            if ($fork->hasNotFinishedSuccessfully()) {
                $message = 'an error has occurred in child process. pid: ' . $childPid;
                $this->log->error($message);
                throw new \RuntimeException($message);
            }

            // array_search() returns false when the pid is not recorded; used
            // as an array key, false is cast to 0 and would remove an
            // unrelated pid, so the entry is only removed when it was found.
            $index = array_search($childPid, $this->childPids);
            if ($index !== false) {
                unset($this->childPids[$index]);
            }

            if ($nextMap = $mapContainer->nextMap($childPid)) {
                // the finished fork is handed to the next callable as its
                // argument; a \RuntimeException reaches the caller unchanged
                $nextMapPid = $this->prefork(
                    $nextMap->getCallable(),
                    $fork,
                    null,
                    $nextMap->getToken()
                );

                $message = sprintf('processing is connected from [%d] to [%d]', $childPid, $nextMapPid);
                $this->log->info($message);
                $nextMap->countTheForked();
                $nextMap->addChildPid($nextMapPid);
            }
            $mapContainer->countTheCompleted($childPid);
        }

        $this->joined = true;
    }
482
483
    /**
     * Collects the return value of every pid of the last map of the container.
     *
     * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
     * @return  array
     */
    private function getResultsOf(MapContainer $mapContainer)
    {
        $returnValues = array();

        foreach ($mapContainer->getLastMapPids() as $lastMapPid) {
            $fork           = $this->forkContainer->get($lastMapPid);
            $returnValues[] = $fork->getResult()->getReturn();
        }

        return $returnValues;
    }
498
499
    /**
     * Terminates the current process with the given status.
     *
     * The static analyser flags the exit expression; within this class the
     * method is called by the forked master, worker and child processes once
     * their work is done.
     *
     * @param   int     $status
     * @return  void
     */
    private function _exit($status = 0)
    {
        exit($status);
    }
503
504
    /**
     * Cleans up when the instance is destroyed.
     *
     * In the owner process the master process is sent SIGTERM and the fork
     * container is released. When run() has caught an exception, the data of
     * the children is deleted. When the owner is destroyed without having
     * been joined and without a recorded signal, the children are sent
     * SIGTERM, their data and the token are released, and a \LogicException
     * is thrown.
     *
     * @throws  \LogicException when the owner is destroyed before Snidel::wait()
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
     */
    public function __destruct()
    {
        $isOwner = ($this->ownerPid === getmypid());

        if ($this->masterProcessId !== null && $isOwner) {
            // Only a positive pid addresses a single process. forkMaster()
            // can store -1 after a failed fork, and kill(2) treats -1 as
            // "every process the caller may signal".
            if ($this->masterProcessId > 0) {
                $this->log->info('shutdown master process.');
                posix_kill($this->masterProcessId, SIGTERM);
            }

            unset($this->forkContainer);
        }

        if ($this->exceptionHasOccured) {
            $this->log->info('destruct processes are started.(exception has occured)');
            $this->log->info('--> deleting all shared memory.');
            $this->deleteAllData();
        } elseif ($isOwner && !$this->joined && $this->receivedSignal === null) {
            $message = 'snidel will have to wait for the child process is completed. please use Snidel::wait()';
            $this->log->error($message);
            $this->log->info('destruct processes are started.');

            $this->log->info('--> sending a signal to children.');
            $this->sendSignalToChildren(SIGTERM);

            $this->log->info('--> deleting all shared memory.');
            $this->deleteAllData();

            $this->log->info('--> deleting token.');
            unset($this->token);

            $this->log->info('--> destruct processes are finished successfully.');
            throw new \LogicException($message);
        }
    }
535
}
536