Completed
Push — master ( cfbc00...1384b1 )
by Akihito
02:24
created

ForkContainer::forkWorker()   B

Complexity

Conditions 6
Paths 6

Size

Total Lines 49
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 5
Bugs 1 Features 1
Metric Value
c 5
b 1
f 1
dl 0
loc 49
rs 8.5906
cc 6
eloc 32
nc 6
nop 1
1
<?php
2
namespace Ackintosh\Snidel;
3
4
use Ackintosh\Snidel\Fork;
5
use Ackintosh\Snidel\ForkCollection;
6
use Ackintosh\Snidel\Pcntl;
7
use Ackintosh\Snidel\DataRepository;
8
use Ackintosh\Snidel\Task\Queue as TaskQueue;
9
use Ackintosh\Snidel\ResultQueue;
10
use Ackintosh\Snidel\ResultCollection;
11
use Ackintosh\Snidel\Error;
12
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
13
14
class ForkContainer
15
{
16
    /** @var int */
17
    private $ownerPid;
18
19
    /** @var int */
20
    private $masterPid;
21
22
    /** @var \Ackintosh\Snidel\Fork[] */
23
    private $forks = array();
24
25
    /** @var \Ackintosh\Snidel\Result[] */
26
    private $results = array();
27
28
    /** @var \Ackintosh\Snidel\Pcntl */
29
    private $pcntl;
30
31
    /** @var \Ackintosh\Snidel\DataRepository */
32
    private $dataRepository;
33
34
    /** @var \Ackintosh\Snidel\Error */
35
    private $error;
36
37
    /** @var \Ackintosh\Snidel\Task\Queue */
38
    private $taskQueue;
39
40
    /** @var \Ackintosh\Snidel\ResultQueue */
41
    private $resultQueue;
42
43
    /** @var \Ackintosh\Snidel\Log */
44
    private $log;
45
46
    /** @var array */
47
    private $signals = array(
48
        SIGTERM,
49
        SIGINT,
50
    );
51
52
    /** @var int */
53
    private $concurrency;
54
55
    /**
     * Stores the configuration and creates the collaborators this container delegates to.
     *
     * @param   int                     $ownerPid       PID both the task queue and the result queue are built from
     * @param   \Ackintosh\Snidel\Log   $log            logger used for info/error messages and for recording the master PID
     * @param   int                     $concurrency    number of workers the master runs before it waits for a child
     */
    public function __construct($ownerPid, $log, $concurrency = 5)
    {
        $this->ownerPid = $ownerPid;
        $this->log = $log;
        $this->concurrency = $concurrency;

        // Collaborators are instantiated in a fixed order; both queues receive the owner PID.
        $this->pcntl = new Pcntl();
        $this->dataRepository = new DataRepository();
        $this->taskQueue = new TaskQueue($ownerPid);
        $this->resultQueue = new ResultQueue($ownerPid);
        $this->error = new Error();
    }
69
70
    /**
     * Adds a task to the task queue.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException       propagated unchanged from the task queue
     */
    public function enqueue($task)
    {
        // No local handling: a \RuntimeException raised by the queue reaches the caller as-is.
        $this->taskQueue->enqueue($task);
    }
83
84
    /**
     * Reports the task queue's queued count.
     *
     * @return  int
     */
    public function queuedCount()
    {
        $queued = $this->taskQueue->queuedCount();

        return $queued;
    }
91
92
    /**
     * Takes the next entry off the result queue.
     *
     * Workers put \Ackintosh\Snidel\Result objects on this queue and wait()
     * calls getFork() / isFailure() on the returned value, so the entry is a
     * Result (not a Fork).
     *
     * @return  \Ackintosh\Snidel\Result
     */
    private function dequeue()
    {
        return $this->resultQueue->dequeue();
    }
99
100
    /**
     * Reports the result queue's dequeued count.
     *
     * @return  int
     */
    public function dequeuedCount()
    {
        $dequeued = $this->resultQueue->dequeuedCount();

        return $dequeued;
    }
107
108
    /**
     * Forks the current process and registers a Fork for the given task.
     *
     * The Fork is stored in $this->forks under its PID and returned. Execution
     * continues after this call on both sides of the fork.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  \Ackintosh\Snidel\Fork
     * @throws  \RuntimeException       when the fork call reports -1
     */
    public function fork($task)
    {
        $forkedPid = $this->pcntl->fork();

        if ($forkedPid === -1) {
            throw new \RuntimeException('could not fork a new process');
        }

        if ($forkedPid === 0) {
            // A result of 0 is replaced by this process's own PID
            // (presumably the child side, following the pcntl_fork convention).
            $forkedPid = getmypid();
        }

        $registered = new Fork($forkedPid, $task);
        $this->forks[$forkedPid] = $registered;

        return $registered;
    }
129
130
    /**
     * fork master process
     *
     * The owner process gets the master's PID back. The forked master never
     * returns from this method: it dequeues tasks, forks one worker per task
     * and exits once the task queue's dequeue() yields a falsy value.
     *
     * @return  int     $masterPid
     * @throws  \RuntimeException   when the master process could not be forked
     */
    public function forkMaster()
    {
        $pid = $this->pcntl->fork();

        if ($pid === -1) {
            // error
            // This must be tested before the truthiness check below: -1 is truthy,
            // so a failed fork would otherwise take the owner branch and -1 would be
            // stored and returned as the master PID.
            $message = 'could not fork a new process';
            $this->log->error($message);

            throw new \RuntimeException($message);
        }

        $this->masterPid = ($pid === 0) ? getmypid() : $pid;
        $this->log->setMasterPid($this->masterPid);

        if ($pid) {
            // owner
            $this->log->info('pid: ' . getmypid());

            return $this->masterPid;
        }

        // master
        $taskQueue = new TaskQueue($this->ownerPid);
        $this->log->info('pid: ' . $this->masterPid);

        // Install SIG_DFL for every signal listed in $this->signals.
        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, SIG_DFL, true);
        }
        $workerCount = 0;

        while ($task = $taskQueue->dequeue()) {
            $this->log->info('dequeued task #' . $taskQueue->dequeuedCount());
            if ($workerCount >= $this->concurrency) {
                // At the concurrency limit: wait on a child (pid -1) before forking another.
                $status = null;
                $this->pcntl->waitpid(-1, $status);
                $workerCount--;
            }
            $this->forkWorker($task);
            $workerCount++;
        }

        // The master must not fall through into the owner's code path.
        exit;
    }
171
172
    /**
     * fork worker process
     *
     * In the master this only logs the new worker's PID and returns. In the
     * worker it executes the task, puts the result on the result queue and
     * exits, so the worker never returns from this method.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException       when forking fails (logged, then rethrown)
     */
    private function forkWorker($task)
    {
        try {
            $fork = $this->fork($task);
        } catch (\RuntimeException $forkFailure) {
            $this->log->error($forkFailure->getMessage());
            throw $forkFailure;
        }

        if (getmypid() === $this->masterPid) {
            // master
            $this->log->info('forked worker. pid: ' . $fork->getPid());

            return;
        }

        // worker
        $this->log->info('has forked. pid: ' . getmypid());
        // @codeCoverageIgnoreStart

        foreach ($this->signals as $signo) {
            $this->pcntl->signal($signo, SIG_DFL, true);
        }

        $resultQueue = new ResultQueue($this->ownerPid);

        // Captured by reference so the shutdown handler sees the value set further down.
        $resultHasQueued = false;
        register_shutdown_function(function () use (&$resultHasQueued, $fork, $resultQueue) {
            if ($resultHasQueued) {
                return;
            }

            // The worker is ending without having queued a result: report a failure instead.
            $failure = new Result();
            $failure->setFailure();
            $failure->setFork($fork);
            $resultQueue->enqueue($failure);
        });

        $this->log->info('----> started the function.');
        $result = $fork->executeTask();
        $this->log->info('<---- completed the function.');

        try {
            $resultQueue->enqueue($result);
        } catch (\RuntimeException $enqueueFailure) {
            // Queuing failed: mark the result as a failure and try once more.
            $this->log->error($enqueueFailure->getMessage());
            $result->setFailure();
            $resultQueue->enqueue($result);
        }

        // Read by the shutdown handler above through the by-reference capture.
        $resultHasQueued = true;
        $this->log->info('queued the result and exit.');
        exit;
        // @codeCoverageIgnoreEnd
    }
228
229
    /**
     * Tells whether a master PID has been recorded.
     *
     * @return  bool    true when $this->masterPid is not null
     */
    public function existsMaster()
    {
        return !is_null($this->masterPid);
    }
236
237
    /**
     * kill master process
     *
     * Sends SIGTERM to the recorded master PID. Does nothing when no usable
     * master PID has been recorded.
     *
     * @return  void
     */
    public function killMaster()
    {
        // kill(2) semantics: pid 0 targets the caller's whole process group and
        // -1 targets every process the caller may signal. An unset master PID
        // (null, which would be coerced to 0) or any non-positive value must
        // therefore never reach posix_kill().
        if ($this->masterPid === null || $this->masterPid <= 0) {
            return;
        }

        posix_kill($this->masterPid, SIGTERM);
    }
246
247
    /**
     * Checks whether any stored result belongs to a fork carrying the given tag.
     *
     * @param   string  $tag
     * @return  bool    true on the first strict match, false when none matches
     */
    public function hasTag($tag)
    {
        foreach ($this->results as $candidate) {
            $candidateTag = $candidate->getFork()->getTag();
            if ($candidateTag !== $tag) {
                continue;
            }

            return true;
        }

        return false;
    }
262
263
    /**
     * Dequeues results until the dequeued count has caught up with the queued count.
     *
     * Each result is stored in $this->results under the PID of its fork;
     * results that report a failure are also recorded in $this->error.
     *
     * @return void
     */
    public function wait()
    {
        while ($this->queuedCount() > $this->dequeuedCount()) {
            $received = $this->dequeue();
            $workerPid = $received->getFork()->getPid();
            $this->results[$workerPid] = $received;

            if (!$received->isFailure()) {
                continue;
            }

            $this->error[$workerPid] = $received;
        }
    }
278
279
    /**
     * wait child
     *
     * Waits on a child (pid -1), loads that child's result from the data
     * repository, attaches the wait status to the result's fork, and stores
     * the result under the child's PID.
     *
     * @return \Ackintosh\Snidel\Result
     * @throws \Ackintosh\Snidel\Exception\SharedMemoryControlException    propagated unchanged if raised while loading the result
     */
    public function waitForChild()
    {
        $status = null;
        $childPid = $this->pcntl->waitpid(-1, $status);

        // Read the child's stored result; the entry is removed as part of the read.
        $stored = $this->dataRepository->load($childPid);
        $result = $stored->readAndDelete();

        $fork = $result->getFork();
        $fork->setStatus($status);
        $result->setFork($fork);

        if ($fork->hasNotFinishedSuccessfully()) {
            // Note: the fork itself (not the result) is what gets recorded as the error here.
            $this->error[$childPid] = $fork;
        }
        $this->results[$childPid] = $result;

        return $result;
    }
304
305
    /**
     * Lists the PIDs under which forks have been registered.
     *
     * @return  array   keys of $this->forks
     */
    public function getChildPids()
    {
        $pids = array_keys($this->forks);

        return $pids;
    }
312
313
    /**
     * return the result stored for a process
     *
     * Looks the PID up in $this->results, which holds Result objects
     * (not Fork objects) keyed by PID.
     *
     * @param   int     $pid
     * @return  \Ackintosh\Snidel\Result
     */
    public function get($pid)
    {
        return $this->results[$pid];
    }
323
324
    /**
     * Returns stored results as a collection and removes them from the container.
     *
     * @param   string|null $tag    null takes every stored result and empties the store;
     *                              any other value delegates to getCollectionWithTag()
     * @return  \Ackintosh\Snidel\ResultCollection
     */
    public function getCollection($tag = null)
    {
        if ($tag !== null) {
            return $this->getCollectionWithTag($tag);
        }

        $everything = new ResultCollection($this->results);
        $this->results = array();

        return $everything;
    }
335
336
    /**
     * Collects the stored results whose fork carries the given tag.
     *
     * Every matching result is removed from $this->results (keyed by the
     * fork's PID); non-matching results stay in place.
     *
     * @param   string  $tag
     * @return  \Ackintosh\Snidel\ResultCollection
     */
    private function getCollectionWithTag($tag)
    {
        $matched = array();

        foreach ($this->results as $result) {
            $fork = $result->getFork();

            if ($fork->getTag() === $tag) {
                $matched[] = $result;
                unset($this->results[$fork->getPid()]);
            }
        }

        return new ResultCollection($matched);
    }
356
357
    /**
     * Asks the error holder whether any error exists.
     *
     * @return  bool
     */
    public function hasError()
    {
        $exists = $this->error->exists();

        return $exists;
    }
364
365
    /**
     * Returns the error holder created in the constructor.
     *
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        return $this->error;
    }
372
373
    /**
     * Drops this container's references to the task queue and the result queue.
     */
    public function __destruct()
    {
        // Unset in the same order as before: task queue first, then result queue.
        unset($this->taskQueue, $this->resultQueue);
    }
378
}
379