Completed
Push — master ( 75acb7...b42713 )
by Akihito
03:09
created

Container::getChildren()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
namespace Ackintosh\Snidel\Fork;
3
4
use Ackintosh\Snidel\Fork\Fork;
5
use Ackintosh\Snidel\Pcntl;
6
use Ackintosh\Snidel\DataRepository;
7
use Ackintosh\Snidel\Task\Queue as TaskQueue;
8
use Ackintosh\Snidel\Result\Result;
9
use Ackintosh\Snidel\Result\Queue as ResultQueue;
10
use Ackintosh\Snidel\Result\Collection;
11
use Ackintosh\Snidel\Error;
12
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
13
use Ackintosh\Snidel\Worker;
14
use Ackintosh\Snidel\ActiveWorkerSet;
15
16
class Container
17
{
18
    /** @var int */
19
    private $ownerPid;
20
21
    /** @var int */
22
    private $masterPid;
23
24
    /** @var int[] */
25
    private $workerPids = array();
0 ignored issues
show
Unused Code introduced by
The property $workerPids is not used and could be removed.

This check marks private properties in classes that are never used. Those properties can be removed.

Loading history...
26
27
    /** @var \Ackintosh\Snidel\Fork\Fork[] */
28
    private $children = array();
29
30
    /** @var \Ackintosh\Snidel\Result\Result[] */
31
    private $results = array();
32
33
    /** @var \Ackintosh\Snidel\Pcntl */
34
    private $pcntl;
35
36
    /** @var \Ackintosh\Snidel\DataRepository */
37
    private $dataRepository;
38
39
    /** @var \Ackintosh\Snidel\Error */
40
    private $error;
41
42
    /** @var \Ackintosh\Snidel\Task\Queue */
43
    private $taskQueue;
44
45
    /** @var \Ackintosh\Snidel\Result\Queue */
46
    private $resultQueue;
47
48
    /** @var \Ackintosh\Snidel\Log */
49
    private $log;
50
51
    /** @var array */
52
    private $signals = array(
53
        SIGTERM,
54
        SIGINT,
55
    );
56
57
    /** @var int */
58
    private $concurrency;
59
60
    /**
     * Set up the container and the collaborators it owns.
     *
     * Besides storing the arguments, this creates the Pcntl wrapper, the data
     * repository, the task queue and the result queue (both built from the
     * owner pid) and the error holder.
     *
     * @param   int                     $ownerPid       pid of the owner process
     * @param   \Ackintosh\Snidel\Log   $log            logger used throughout the container
     * @param   int                     $concurrency    limit compared against the active worker count in forkMaster()
     */
    public function __construct($ownerPid, $log, $concurrency = 5)
    {
        // plain values handed in by the caller
        $this->ownerPid     = $ownerPid;
        $this->log          = $log;
        $this->concurrency  = $concurrency;

        // collaborators owned by this container
        $this->pcntl            = new Pcntl();
        $this->dataRepository   = new DataRepository();
        $this->taskQueue        = new TaskQueue($ownerPid);
        $this->resultQueue      = new ResultQueue($ownerPid);
        $this->error            = new Error();
    }
74
75
    /**
     * Put a task on the task queue.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException   propagated unchanged from the task queue
     */
    public function enqueue($task)
    {
        // a \RuntimeException raised by the queue reaches the caller as-is
        $this->taskQueue->enqueue($task);
    }
88
89
    /**
     * Queued count as reported by the task queue.
     *
     * @return  int
     */
    public function queuedCount()
    {
        $queue = $this->taskQueue;

        return $queue->queuedCount();
    }
96
97
    /**
     * Take the next entry off the result queue.
     *
     * The returned object is used by wait() as a result: wait() calls
     * getFork() and isFailure() on it and stores it in $this->results.
     *
     * @return  \Ackintosh\Snidel\Result\Result
     */
    private function dequeue()
    {
        return $this->resultQueue->dequeue();
    }
104
105
    /**
     * Dequeued count as reported by the result queue.
     *
     * @return  int
     */
    public function dequeuedCount()
    {
        $queue = $this->resultQueue;

        return $queue->dequeuedCount();
    }
112
113
    /**
     * fork process
     *
     * Runs in both processes after a successful fork; either way the returned
     * Fork carries the pid of the newly created process.
     *
     * @return  \Ackintosh\Snidel\Fork\Fork
     * @throws  \RuntimeException   when the fork call reports -1
     */
    private function fork()
    {
        $pid = $this->pcntl->fork();

        if ($pid === -1) {
            throw new \RuntimeException('could not fork a new process');
        }

        if ($pid === 0) {
            // fork returned 0: this is the new process, so look up its own pid
            $pid = getmypid();
        }

        return new Fork($pid);
    }
130
131
    /**
     * fork child process
     *
     * The resulting Fork is appended to the children list before it is returned.
     *
     * @return  \Ackintosh\Snidel\Fork\Fork
     * @throws  \RuntimeException   propagated unchanged from fork()
     */
    public function forkChild()
    {
        // on failure fork() throws and nothing is recorded
        $child = $this->fork();
        $this->children[] = $child;

        return $child;
    }
149
150
    /**
     * fork master process
     *
     * Owner process: stores the master pid, logs and returns it.
     *
     * Master process: never returns. It installs a handler for every signal in
     * $this->signals, then keeps taking tasks from a task queue and forks one
     * worker per task. The loop ends when dequeue() yields a falsy value, after
     * which the master exits.
     *
     * @return  int     $masterPid
     * @throws  \RuntimeException   propagated unchanged from fork()
     */
    public function forkMaster()
    {
        $master = $this->fork();

        $this->masterPid = $master->getPid();
        $this->log->setMasterPid($this->masterPid);

        if (getmypid() === $this->ownerPid) {
            // owner
            $this->log->info('pid: ' . getmypid());

            return $this->masterPid;
        }

        // master
        $taskQueue          = new TaskQueue($this->ownerPid);
        $activeWorkerSet    = new ActiveWorkerSet();
        $this->log->info('pid: ' . $this->masterPid);

        // the logger is handed to the closure through use() rather than $this
        $log = $this->log;
        $onSignal = function ($sig) use ($log, $activeWorkerSet) {
            $log->info('received signal: ' . $sig);

            if ($activeWorkerSet->count() !== 0) {
                // pass the signal on to the workers before the master goes away
                $log->info('------> sending signal to workers. signal: ' . $sig);
                $activeWorkerSet->terminate($sig);
                $log->info('<------ sent signal');
            } else {
                $log->info('no worker is active.');
            }

            // the master stops on any handled signal
            exit;
        };

        foreach ($this->signals as $signal) {
            $this->pcntl->signal($signal, $onSignal);
        }

        while ($task = $taskQueue->dequeue()) {
            $this->log->info('dequeued task #' . $taskQueue->dequeuedCount());

            if ($activeWorkerSet->count() >= $this->concurrency) {
                // limit reached: wait on pid -1 and drop the returned pid from the active set
                $status = null;
                $finishedPid = $this->pcntl->waitpid(-1, $status);
                $activeWorkerSet->delete($finishedPid);
            }

            $worker = $this->forkWorker($task);
            $activeWorkerSet->add($worker);
        }

        // must not return: the caller's code belongs to the owner process
        exit;
    }
207
208
    /**
     * fork worker process
     *
     * Master process: logs the new worker's pid and returns the Worker.
     *
     * Worker process: never returns. It resets the handlers for the signals in
     * $this->signals to SIG_DFL, attaches a result queue to the worker, runs it
     * and exits. A shutdown function calls $worker->error() whenever the process
     * ends before the run has completed.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  \Ackintosh\Snidel\Worker
     * @throws  \RuntimeException   when the fork fails (logged, then rethrown)
     */
    private function forkWorker($task)
    {
        try {
            $forked = $this->fork();
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            throw $e;
        }

        $worker = new Worker($forked, $task);

        if (getmypid() === $this->masterPid) {
            // master
            $this->log->info('forked worker. pid: ' . $worker->getPid());
            return $worker;
        }

        // worker
        $this->log->info('has forked. pid: ' . getmypid());
        // @codeCoverageIgnoreStart

        foreach ($this->signals as $signal) {
            $this->pcntl->signal($signal, SIG_DFL, true);
        }

        $worker->setResultQueue(new ResultQueue($this->ownerPid));

        // captured by reference so the shutdown function sees the final value
        $resultHasQueued = false;
        register_shutdown_function(function () use (&$resultHasQueued, $worker) {
            if ($resultHasQueued) {
                return;
            }

            $worker->error();
        });

        $this->log->info('----> started the function.');
        try {
            $worker->run();
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            // the flag is still false here, so the shutdown function calls $worker->error()
            exit;
        }
        $this->log->info('<---- completed the function.');

        // read by the shutdown function above, not by any later statement
        $resultHasQueued = true;
        $this->log->info('queued the result and exit.');

        // returning here would hand a Worker back to the master's loop inside the worker process
        exit;
        // @codeCoverageIgnoreEnd
    }
263
264
    /**
     * Tell whether a master pid is currently recorded.
     *
     * The pid is set by forkMaster() and reset to null by sendSignalToMaster().
     *
     * @return  bool
     */
    public function existsMaster()
    {
        return !is_null($this->masterPid);
    }
271
272
    /**
     * send signal to master process
     *
     * Sends the signal, waits for the master process and then forgets its pid.
     * Does nothing when no master pid is recorded.
     *
     * @param   int     $sig    signal number, SIGTERM by default
     * @return  void
     */
    public function sendSignalToMaster($sig = SIGTERM)
    {
        if ($this->masterPid === null) {
            // A null pid would be passed on as pid 0, which addresses the whole
            // process group (this process included) instead of the master.
            return;
        }

        $this->log->info('----> sending signal to master. signal: ' . $sig);
        posix_kill($this->masterPid, $sig);
        $this->log->info('<---- sent signal.');

        $status = null;
        $this->pcntl->waitpid($this->masterPid, $status);
        $this->log->info('. status: ' . $status);

        // existsMaster() reports false from here on
        $this->masterPid = null;
    }
288
289
    /**
     * Tell whether any stored result belongs to a task carrying the given tag.
     *
     * @param   string  $tag
     * @return  bool
     */
    public function hasTag($tag)
    {
        foreach ($this->results as $stored) {
            $storedTag = $stored->getTask()->getTag();

            // strict comparison: the tag has to match in type as well
            if ($storedTag === $tag) {
                return true;
            }
        }

        return false;
    }
304
305
    /**
     * Collect results until as many have been dequeued as tasks were queued.
     *
     * Every result is stored under the pid of its fork; a result reporting
     * failure is additionally recorded in the error holder under the same pid.
     *
     * @return void
     */
    public function wait()
    {
        while ($this->queuedCount() > $this->dequeuedCount()) {
            $result = $this->dequeue();
            $pid    = $result->getFork()->getPid();

            $this->results[$pid] = $result;

            if ($result->isFailure()) {
                $this->error[$pid] = $result;
            }
        }
    }
320
321
    /**
     * wait child
     *
     * Waits on pid -1, loads and deletes the data stored for the returned pid,
     * and attaches the wait status to the result's fork. The fork is recorded
     * in the error holder when the result reports failure, when the status is
     * not a normal exit, or when the exit status is non-zero.
     *
     * @return \Ackintosh\Snidel\Result\Result
     * @throws \Ackintosh\Snidel\Exception\SharedMemoryControlException   propagated unchanged while loading the result
     */
    public function waitForChild()
    {
        $status   = null;
        $childPid = $this->pcntl->waitpid(-1, $status);

        $result = $this->dataRepository->load($childPid)->readAndDelete();

        $fork = $result->getFork();
        $fork->setStatus($status);
        $result->setFork($fork);

        // evaluated left to right; later checks are skipped once one is true
        $failed = $result->isFailure()
            || !$this->pcntl->wifexited($status)
            || $this->pcntl->wexitstatus($status) !== 0;

        if ($failed) {
            $this->error[$childPid] = $fork;
        }

        $this->results[$childPid] = $result;

        return $result;
    }
346
347
    /**
     * Forks recorded by forkChild(), in the order they were added.
     *
     * @return  \Ackintosh\Snidel\Fork\Fork[]
     */
    public function getChildren()
    {
        $children = $this->children;

        return $children;
    }
354
355
    /**
     * return the result stored for a process
     *
     * Results are stored under the pid of their fork by wait() and
     * waitForChild().
     *
     * @param   int     $pid
     * @return  \Ackintosh\Snidel\Result\Result|null    null when nothing is stored for $pid
     */
    public function get($pid)
    {
        // explicit null instead of reading an undefined index
        return isset($this->results[$pid]) ? $this->results[$pid] : null;
    }
365
366
    /**
     * Hand out stored results as a collection and drop them from the container.
     *
     * Without a tag every stored result is returned and the store is emptied;
     * with a tag only the results of tasks carrying that tag are taken out.
     *
     * @param   string|null $tag
     * @return  \Ackintosh\Snidel\Result\Collection
     */
    public function getCollection($tag = null)
    {
        if ($tag !== null) {
            return $this->getCollectionWithTag($tag);
        }

        $all = $this->results;
        $this->results = array();

        return new Collection($all);
    }
377
378
    /**
     * return results
     *
     * Picks the stored results whose task carries the given tag, removes each
     * of them from the store (by the pid of its fork) and wraps them in a
     * collection.
     *
     * @param   string  $tag
     * @return  \Ackintosh\Snidel\Result\Collection
     */
    private function getCollectionWithTag($tag)
    {
        $matched = array();

        foreach ($this->results as $result) {
            if ($result->getTask()->getTag() === $tag) {
                $matched[] = $result;
                unset($this->results[$result->getFork()->getPid()]);
            }
        }

        return new Collection($matched);
    }
398
399
    /**
     * Ask the error holder whether anything has been recorded.
     *
     * @return  bool
     */
    public function hasError()
    {
        $error = $this->error;

        return $error->exists();
    }
406
407
    /**
     * Error holder filled by wait() and waitForChild().
     *
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        return $this->error;
    }
414
415
    /**
     * Drop this container's references to the task queue and the result queue.
     */
    public function __destruct()
    {
        unset($this->taskQueue, $this->resultQueue);
    }
420
}
421