Completed
Push — master ( 3f0397...4c7c4d )
by Akihito
03:07
created

Container::killMaster()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
namespace Ackintosh\Snidel\Fork;
3
4
use Ackintosh\Snidel\Fork\Fork;
5
use Ackintosh\Snidel\Pcntl;
6
use Ackintosh\Snidel\DataRepository;
7
use Ackintosh\Snidel\Task\Queue as TaskQueue;
8
use Ackintosh\Snidel\Result\Result;
9
use Ackintosh\Snidel\Result\Queue as ResultQueue;
10
use Ackintosh\Snidel\Result\Collection;
11
use Ackintosh\Snidel\Error;
12
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
13
14
class Container
15
{
16
    /** @var int pid given to the constructor; also handed to the task and result queues */
    private $ownerPid;

    /** @var int|null pid of the master process; assigned by forkMaster(), null until then */
    private $masterPid;

    /** @var \Ackintosh\Snidel\Fork\Fork[] forks created by fork(), keyed by pid */
    private $forks = array();

    /** @var \Ackintosh\Snidel\Result\Result[] results collected by wait() / waitForChild(), keyed by pid */
    private $results = array();

    /** @var \Ackintosh\Snidel\Pcntl */
    private $pcntl;

    /** @var \Ackintosh\Snidel\DataRepository */
    private $dataRepository;

    /** @var \Ackintosh\Snidel\Error */
    private $error;

    /** @var \Ackintosh\Snidel\Task\Queue */
    private $taskQueue;

    /** @var \Ackintosh\Snidel\Result\Queue */
    private $resultQueue;

    /** @var \Ackintosh\Snidel\Log */
    private $log;

    /** @var int[] signals handled by the master process and reset to SIG_DFL in workers */
    private $signals = array(SIGTERM, SIGINT);

    /** @var int worker count at which the master waits for a child before forking another */
    private $concurrency;
54
55
    /**
     * Stores the given settings and creates the collaborators
     * (Pcntl, DataRepository, task queue, result queue, Error).
     *
     * @param   int                     $ownerPid       pid passed to both queues
     * @param   \Ackintosh\Snidel\Log   $log
     * @param   int                     $concurrency    defaults to 5
     */
    public function __construct($ownerPid, $log, $concurrency = 5)
    {
        $this->ownerPid = $ownerPid;
        $this->log = $log;
        $this->concurrency = $concurrency;

        $this->pcntl = new Pcntl();
        $this->dataRepository = new DataRepository();

        // Both queues are created with the same owner pid.
        $this->taskQueue = new TaskQueue($ownerPid);
        $this->resultQueue = new ResultQueue($ownerPid);

        $this->error = new Error();
    }
69
70
    /**
     * Puts a task on the task queue.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException   propagated unchanged from the task queue
     */
    public function enqueue($task)
    {
        // Any \RuntimeException raised by the queue reaches the caller as-is.
        $this->taskQueue->enqueue($task);
    }
83
84
    /**
     * Returns the queued count reported by the task queue.
     *
     * @return  int
     */
    public function queuedCount()
    {
        $queued = $this->taskQueue->queuedCount();

        return $queued;
    }
91
92
    /**
     * Takes the next entry from the result queue.
     *
     * wait() treats the returned value as a Result (it calls getFork() and
     * isFailure() on it).
     *
     * @return  \Ackintosh\Snidel\Result\Result
     */
    private function dequeue()
    {
        $dequeued = $this->resultQueue->dequeue();

        return $dequeued;
    }
99
100
    /**
     * Returns the dequeued count reported by the result queue.
     *
     * @return  int
     */
    public function dequeuedCount()
    {
        $dequeued = $this->resultQueue->dequeuedCount();

        return $dequeued;
    }
107
108
    /**
     * fork process
     *
     * Runs in both the parent and the child after a successful fork; each side
     * registers a Fork under the child's pid in $this->forks and returns it.
     *
     * @return  \Ackintosh\Snidel\Fork\Fork
     * @throws  \RuntimeException   when the fork fails
     */
    public function fork()
    {
        $forkedPid = $this->pcntl->fork();

        if ($forkedPid === -1) {
            throw new \RuntimeException('could not fork a new process');
        }

        if ($forkedPid === 0) {
            // In the child the fork call yields 0, so use our own pid instead.
            $forkedPid = getmypid();
        }

        $fork = new Fork($forkedPid);
        $this->forks[$forkedPid] = $fork;

        return $fork;
    }
128
129
    /**
     * fork master process
     *
     * In the owner process this returns the pid of the forked master.
     * The master process never returns from this method: it dequeues tasks,
     * forks a worker for each one (waiting for a child first once
     * $this->concurrency workers have been started), and exits when the dequeue
     * loop ends or when one of $this->signals is received.
     *
     * @return  int     $masterPid
     * @throws  \RuntimeException   when the master process could not be forked
     */
    public function forkMaster()
    {
        $pid = $this->pcntl->fork();

        if ($pid === -1) {
            // error
            // This must be checked before the owner branch: -1 is truthy, so it
            // would otherwise be stored as masterPid and later handed to
            // posix_kill() by sendSignalToMaster().
            $message = 'could not fork a new process';
            $this->log->error($message);

            throw new \RuntimeException($message);
        }

        $this->masterPid = ($pid === 0) ? getmypid() : $pid;
        $this->log->setMasterPid($this->masterPid);

        if ($pid) {
            // owner
            $this->log->info('pid: ' . getmypid());

            return $this->masterPid;
        }

        // master
        $taskQueue = new TaskQueue($this->ownerPid);
        $this->log->info('pid: ' . $this->masterPid);

        $log = $this->log;
        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, function ($sig) use ($log) {
                $log->info('received signal: ' . $sig);
                // Only the forked master process runs this handler, so exit
                // ends that process, not the owner.
                exit;
            });
        }
        $workerCount = 0;

        while ($task = $taskQueue->dequeue()) {
            $this->log->info('dequeued task #' . $taskQueue->dequeuedCount());
            if ($workerCount >= $this->concurrency) {
                // Wait for any child to finish before starting another worker.
                $status = null;
                $this->pcntl->waitpid(-1, $status);
                $workerCount--;
            }
            $this->forkWorker($task);
            $workerCount++;
        }

        // The master must not fall through into the owner's code path.
        exit;
    }
174
175
    /**
     * fork worker process
     *
     * The master process only logs the worker's pid and returns. The worker
     * process executes the task, enqueues its result on the result queue and
     * exits, so it never returns to the caller.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException   when the fork fails (logged, then rethrown)
     */
    private function forkWorker($task)
    {
        try {
            $fork = $this->fork();
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            throw $e;
        }

        if (getmypid() === $this->masterPid) {
            // master
            $this->log->info('forked worker. pid: ' . $fork->getPid());

            return;
        }

        // worker
        $this->log->info('has forked. pid: ' . getmypid());
        // @codeCoverageIgnoreStart

        // Replace the handlers installed by the master with the default ones.
        foreach ($this->signals as $signal) {
            $this->pcntl->signal($signal, SIG_DFL, true);
        }

        $queue = new ResultQueue($this->ownerPid);

        // Captured by reference below, so the shutdown function sees the value
        // this flag has when the process ends, not the value at registration.
        $queued = false;
        register_shutdown_function(function () use (&$queued, $fork, $task, $queue) {
            if ($queued) {
                return;
            }

            // No result was queued before shutdown: enqueue one carrying the
            // last error instead.
            $failure = new Result();
            $failure->setError(error_get_last());
            $failure->setTask($task);
            $failure->setFork($fork);
            $queue->enqueue($failure);
        });

        $this->log->info('----> started the function.');
        $result = $task->execute();
        $result->setFork($fork);
        $this->log->info('<---- completed the function.');

        try {
            $queue->enqueue($result);
        } catch (\RuntimeException $e) {
            // Retry once with the last error attached to the result.
            $this->log->error($e->getMessage());
            $result->setError(error_get_last());
            $queue->enqueue($result);
        }

        $queued = true;
        $this->log->info('queued the result and exit.');
        // The worker is a forked child; it must stop here rather than continue
        // the master's dequeue loop.
        exit;
        // @codeCoverageIgnoreEnd
    }
233
234
    /**
     * Tells whether a master pid is currently recorded.
     *
     * @return  bool    true when $this->masterPid is not null
     */
    public function existsMaster()
    {
        return !is_null($this->masterPid);
    }
241
242
    /**
     * send signal to master process
     *
     * Sends $sig to the recorded master pid, waits for that process and then
     * forgets the pid. When no valid master pid is recorded nothing is
     * signalled and the recorded pid is cleared.
     *
     * @param   int     $sig    signal number handed to posix_kill(), SIGTERM by default
     * @return  void
     */
    public function sendSignalToMaster($sig = SIGTERM)
    {
        if ($this->masterPid === null || $this->masterPid <= 0) {
            // posix_kill() with a pid of 0 or below does not address a single
            // process (kill(2) treats it as a process group / all processes),
            // so never pass such a value on.
            $this->masterPid = null;

            return;
        }

        $this->log->info('----> sending signal to master. signal: ' . $sig);
        posix_kill($this->masterPid, $sig);
        $this->log->info('<---- sent signal.');

        $status = null;
        $this->pcntl->waitpid($this->masterPid, $status);
        $this->log->info('. status: ' . $status);
        $this->masterPid = null;
    }
258
259
    /**
     * Tells whether any collected result belongs to a task with the given tag.
     *
     * @param   string  $tag
     * @return  bool
     */
    public function hasTag($tag)
    {
        $found = false;

        foreach ($this->results as $candidate) {
            if ($candidate->getTask()->getTag() === $tag) {
                $found = true;
                break;
            }
        }

        return $found;
    }
274
275
    /**
     * Dequeues results until the dequeued count has caught up with the queued
     * count, storing each result (and each failure) under its fork's pid.
     *
     * @return void
     */
    public function wait()
    {
        while ($this->queuedCount() > $this->dequeuedCount()) {
            $dequeued = $this->dequeue();
            $workerPid = $dequeued->getFork()->getPid();
            $this->results[$workerPid] = $dequeued;

            if (!$dequeued->isFailure()) {
                continue;
            }

            $this->error[$workerPid] = $dequeued;
        }
    }
290
291
    /**
     * wait child
     *
     * Waits for any child process, loads (and deletes) the data stored for its
     * pid, attaches the wait status to the result's fork and records the result.
     *
     * @return \Ackintosh\Snidel\Result\Result
     * @throws \Ackintosh\Snidel\Exception\SharedMemoryControlException propagated unchanged from the data repository
     */
    public function waitForChild()
    {
        $status = null;
        $childPid = $this->pcntl->waitpid(-1, $status);

        $result = $this->dataRepository->load($childPid)->readAndDelete();

        $fork = $result->getFork();
        $fork->setStatus($status);
        $result->setFork($fork);

        // Success means: the result is not a failure, the child exited
        // normally, and its exit status is 0.
        $succeeded = !$result->isFailure()
            && $this->pcntl->wifexited($status)
            && $this->pcntl->wexitstatus($status) === 0;

        if (!$succeeded) {
            // NOTE(review): wait() stores the Result in $this->error, while this
            // method stores the Fork — confirm which one Error expects.
            $this->error[$childPid] = $fork;
        }

        $this->results[$childPid] = $result;

        return $result;
    }
316
317
    /**
     * Returns the keys of $this->forks, i.e. the pids registered by fork().
     *
     * @return  array
     */
    public function getChildPids()
    {
        $pids = array_keys($this->forks);

        return $pids;
    }
324
325
    /**
     * Returns the entry stored in $this->results for the given pid.
     *
     * No existence check is made before the lookup.
     *
     * @param   int     $pid
     * @return  \Ackintosh\Snidel\Result\Result
     */
    public function get($pid)
    {
        $stored = $this->results[$pid];

        return $stored;
    }
335
336
    /**
     * Returns collected results as a Collection and removes them from this
     * container: all of them when no tag is given, otherwise only those whose
     * task carries the tag.
     *
     * @param   string|null $tag
     * @return  \Ackintosh\Snidel\Result\Collection
     */
    public function getCollection($tag = null)
    {
        if ($tag !== null) {
            return $this->getCollectionWithTag($tag);
        }

        // Build the collection first, then clear the stored results.
        $everything = new Collection($this->results);
        $this->results = array();

        return $everything;
    }
347
348
    /**
     * return results
     *
     * Collects every stored result whose task has the given tag and removes it
     * from $this->results (keyed by the pid of the result's fork).
     *
     * @param   string  $tag
     * @return  \Ackintosh\Snidel\Result\Collection
     */
    private function getCollectionWithTag($tag)
    {
        $matched = array();

        foreach ($this->results as $result) {
            if ($result->getTask()->getTag() === $tag) {
                $matched[] = $result;
                unset($this->results[$result->getFork()->getPid()]);
            }
        }

        return new Collection($matched);
    }
368
369
    /**
     * Returns whatever the Error object reports from exists().
     *
     * @return  bool
     */
    public function hasError()
    {
        $exists = $this->error->exists();

        return $exists;
    }
376
377
    /**
     * Returns the Error object held by this container.
     *
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        $error = $this->error;

        return $error;
    }
384
385
    /**
     * Drops this container's references to the task queue and the result queue.
     */
    public function __destruct()
    {
        unset($this->taskQueue, $this->resultQueue);
    }
390
}
391