Completed
Push — master ( 2a3e1c...68a3b4 )
by Akihito
02:23
created

ForkContainer::waitForChild()   B

Complexity

Conditions 5
Paths 3

Size

Total Lines 20
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 20
rs 8.8571
cc 5
eloc 14
nc 3
nop 0
1
<?php
2
namespace Ackintosh\Snidel;
3
4
use Ackintosh\Snidel\Fork;
5
use Ackintosh\Snidel\Pcntl;
6
use Ackintosh\Snidel\DataRepository;
7
use Ackintosh\Snidel\Task\Queue as TaskQueue;
8
use Ackintosh\Snidel\Result\Result;
9
use Ackintosh\Snidel\Result\Queue as ResultQueue;
10
use Ackintosh\Snidel\Result\Collection;
11
use Ackintosh\Snidel\Error;
12
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
13
14
class ForkContainer
15
{
16
    /** @var int */
17
    private $ownerPid;
18
19
    /** @var int */
20
    private $masterPid;
21
22
    /** @var \Ackintosh\Snidel\Fork[] */
23
    private $forks = array();
24
25
    /** @var \Ackintosh\Snidel\Result\Result[] */
26
    private $results = array();
27
28
    /** @var \Ackintosh\Snidel\Pcntl */
29
    private $pcntl;
30
31
    /** @var \Ackintosh\Snidel\DataRepository */
32
    private $dataRepository;
33
34
    /** @var \Ackintosh\Snidel\Error */
35
    private $error;
36
37
    /** @var \Ackintosh\Snidel\Task\Queue */
38
    private $taskQueue;
39
40
    /** @var \Ackintosh\Snidel\Result\Queue */
41
    private $resultQueue;
42
43
    /** @var \Ackintosh\Snidel\Log */
44
    private $log;
45
46
    /** @var array */
47
    private $signals = array(
48
        SIGTERM,
49
        SIGINT,
50
    );
51
52
    /** @var int */
53
    private $concurrency;
54
55
    /**
56
     * @param   int     $ownerPid
57
     */
58
    public function __construct($ownerPid, $log, $concurrency = 5)
59
    {
60
        $this->ownerPid         = $ownerPid;
61
        $this->log              = $log;
62
        $this->concurrency      = $concurrency;
63
        $this->pcntl            = new Pcntl();
64
        $this->dataRepository   = new DataRepository();
65
        $this->taskQueue        = new TaskQueue($this->ownerPid);
66
        $this->resultQueue      = new ResultQueue($this->ownerPid);
67
        $this->error            = new Error();
68
    }
69
70
    /**
71
     * @param   \Ackintosh\Snidel\Task
72
     * @return  void
73
     * @throws  \RuntimeException
74
     */
75
    public function enqueue($task)
76
    {
77
        try {
78
            $this->taskQueue->enqueue($task);
79
        } catch (\RuntimeException $e) {
80
            throw $e;
81
        }
82
    }
83
84
    /**
85
     * @return  int
86
     */
87
    public function queuedCount()
88
    {
89
        return $this->taskQueue->queuedCount();
90
    }
91
92
    /**
93
     * @return  \Ackintosh\Snidel\Fork
94
     */
95
    private function dequeue()
96
    {
97
        return $this->resultQueue->dequeue();
98
    }
99
100
    /**
101
     * @return  int
102
     */
103
    public function dequeuedCount()
104
    {
105
        return $this->resultQueue->dequeuedCount();
106
    }
107
108
    /**
109
     * fork process
110
     *
111
     * @return  \Ackintosh\Snidel\Fork
112
     * @throws  \RuntimeException
113
     */
114
    public function fork()
115
    {
116
        $pid = $this->pcntl->fork();
117
        if ($pid === -1) {
118
            throw new \RuntimeException('could not fork a new process');
119
        }
120
121
        $pid = ($pid === 0) ? getmypid() : $pid;
122
123
        $fork = new Fork($pid);
124
        $this->forks[$pid] = $fork;
125
126
        return $fork;
127
    }
128
129
    /**
130
     * fork master process
131
     *
132
     * @return  int     $masterPid
133
     */
134
    public function forkMaster()
135
    {
136
        $pid = $this->pcntl->fork();
137
        $this->masterPid = ($pid === 0) ? getmypid() : $pid;
138
        $this->log->setMasterPid($this->masterPid);
139
140
        if ($pid) {
141
            // owner
142
            $this->log->info('pid: ' . getmypid());
143
144
            return $this->masterPid;
145
        } elseif ($pid === -1) {
0 ignored issues
show
Unused Code introduced by
This elseif statement is empty, and could be removed.

This check looks for the bodies of elseif statements that have no statements or where all statements have been commented out. This may be the result of changes for debugging or the code may simply be obsolete.

These elseif bodies can be removed. If you have an empty elseif but statements in the else branch, consider inverting the condition.

Loading history...
146
            // error
147
        } else {
148
            // master
149
            $taskQueue = new TaskQueue($this->ownerPid);
150
            $this->log->info('pid: ' . $this->masterPid);
151
152
            foreach ($this->signals as $sig) {
153
                $this->pcntl->signal($sig, SIG_DFL, true);
154
            }
155
            $workerCount = 0;
156
157
            while ($task = $taskQueue->dequeue()) {
158
                $this->log->info('dequeued task #' . $taskQueue->dequeuedCount());
159
                if ($workerCount >= $this->concurrency) {
160
                    $status = null;
161
                    $this->pcntl->waitpid(-1, $status);
162
                    $workerCount--;
163
                }
164
                $this->forkWorker($task);
165
                $workerCount++;
166
            }
167
            exit;
0 ignored issues
show
Coding Style Compatibility introduced by
The method forkMaster() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
168
        }
169
    }
170
171
    /**
172
     * fork worker process
173
     *
174
     * @param   \Ackintosh\Snidel\Task
175
     * @return  void
176
     * @throws  \RuntimeException
177
     */
178
    private function forkWorker($task)
179
    {
180
        try {
181
            $fork = $this->fork();
182
        } catch (\RuntimeException $e) {
183
            $this->log->error($e->getMessage());
184
            throw $e;
185
        }
186
187
        if (getmypid() === $this->masterPid) {
188
            // master
189
            $this->log->info('forked worker. pid: ' . $fork->getPid());
190
        } else {
191
            // worker
192
            $this->log->info('has forked. pid: ' . getmypid());
193
            // @codeCoverageIgnoreStart
194
195
            foreach ($this->signals as $sig) {
196
                $this->pcntl->signal($sig, SIG_DFL, true);
197
            }
198
199
            $resultQueue = new ResultQueue($this->ownerPid);
200
            $resultHasQueued = false;
201
            register_shutdown_function(function () use (&$resultHasQueued, $fork, $task, $resultQueue) {
202
                if (!$resultHasQueued) {
203
                    $result = new Result();
204
                    $result->setFailure();
205
                    $result->setTask($task);
206
                    $result->setFork($fork);
207
                    $resultQueue->enqueue($result);
208
                }
209
            });
210
211
            $this->log->info('----> started the function.');
212
            $result = $task->execute();
213
            $result->setFork($fork);
214
            $this->log->info('<---- completed the function.');
215
216
            try {
217
                $resultQueue->enqueue($result);
218
            } catch (\RuntimeException $e) {
219
                $this->log->error($e->getMessage());
220
                $result->setFailure();
221
                $resultQueue->enqueue($result);
222
            }
223
            $resultHasQueued = true;
0 ignored issues
show
Unused Code introduced by
$resultHasQueued is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
224
            $this->log->info('queued the result and exit.');
225
            exit;
0 ignored issues
show
Coding Style Compatibility introduced by
The method forkWorker() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
226
            // @codeCoverageIgnoreEnd
227
        }
228
    }
229
230
    /**
231
     * @return  bool
232
     */
233
    public function existsMaster()
234
    {
235
        return $this->masterPid !== null;
236
    }
237
238
    /**
239
     * kill master process
240
     *
241
     * @return  void
242
     */
243
    public function killMaster()
244
    {
245
        posix_kill($this->masterPid, SIGTERM);
246
    }
247
248
    /**
249
     *
250
     * @param   string  $tag
251
     * @return  bool
252
     */
253
    public function hasTag($tag)
254
    {
255
        foreach ($this->results as $result) {
256
            if ($result->getTask()->getTag() === $tag) {
257
                return true;
258
            }
259
        }
260
261
        return false;
262
    }
263
264
    /**
265
     * @return void
266
     */
267
    public function wait()
268
    {
269
        for (; $this->queuedCount() > $this->dequeuedCount();) {
270
            $result = $this->dequeue();
271
            $pid = $result->getFork()->getPid();
0 ignored issues
show
Bug introduced by
The method getFork() does not seem to exist on object<Ackintosh\Snidel\Fork>.

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
272
            $this->results[$pid] = $result;
273
274
            if ($result->isFailure()) {
0 ignored issues
show
Bug introduced by
The method isFailure() does not seem to exist on object<Ackintosh\Snidel\Fork>.

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
275
                $this->error[$pid] = $result;
276
            }
277
        }
278
    }
279
280
    /**
281
     * wait child
282
     *
283
     * @return \Ackintosh\Snidel\Result\Result
284
     */
285
    public function waitForChild()
286
    {
287
        $status = null;
288
        $childPid = $this->pcntl->waitpid(-1, $status);
289
        try {
290
            $result = $this->dataRepository->load($childPid)->readAndDelete();
291
        } catch (SharedMemoryControlException $e) {
292
            throw $e;
293
        }
294
        $fork = $result->getFork();
295
        $fork->setStatus($status);
296
        $result->setFork($fork);
297
298
        if ($result->isFailure() || !$this->pcntl->wifexited($status) || $this->pcntl->wexitstatus($status) !== 0) {
299
            $this->error[$childPid] = $fork;
300
        }
301
        $this->results[$childPid] = $result;
302
303
        return $result;
304
    }
305
306
    /**
307
     * @return  array
308
     */
309
    public function getChildPids()
310
    {
311
        return array_keys($this->forks);
312
    }
313
314
    /**
315
     * return fork
316
     *
317
     * @param   int     $pid
318
     * @return  \Ackintosh\Snidel\Fork
319
     */
320
    public function get($pid)
321
    {
322
        return $this->results[$pid];
323
    }
324
325
    public function getCollection($tag = null)
326
    {
327
        if ($tag === null) {
328
            $collection = new Collection($this->results);
329
            $this->results = array();
330
331
            return $collection;
332
        }
333
334
        return $this->getCollectionWithTag($tag);
335
    }
336
337
    /**
338
     * return results
339
     *
340
     * @param   string  $tag
341
     * @return  \Ackintosh\Snidel\Result\Collection
342
     */
343
    private function getCollectionWithTag($tag)
344
    {
345
        $results = array();
346
        foreach ($this->results as $r) {
347
            if ($r->getTask()->getTag() !== $tag) {
348
                continue;
349
            }
350
351
            $results[] = $r;
352
            unset($this->results[$r->getFork()->getPid()]);
353
        }
354
355
        return new Collection($results);
356
    }
357
358
    /**
359
     * @return  bool
360
     */
361
    public function hasError()
362
    {
363
        return $this->error->exists();
364
    }
365
366
    /**
367
     * @return  \Ackintosh\Sniden\Error
368
     */
369
    public function getError()
370
    {
371
        return $this->error;
372
    }
373
374
    public function __destruct()
375
    {
376
        unset($this->taskQueue);
377
        unset($this->resultQueue);
378
    }
379
}
380