Completed
Push — master ( 024d05...2ca7c9 )
by Akihito
02:21
created

ForkContainer::killMaster()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
namespace Ackintosh\Snidel;
3
4
use Ackintosh\Snidel\Fork;
5
use Ackintosh\Snidel\ForkCollection;
6
use Ackintosh\Snidel\Token;
7
use Ackintosh\Snidel\Pcntl;
8
use Ackintosh\Snidel\DataRepository;
9
use Ackintosh\Snidel\TaskQueue;
10
use Ackintosh\Snidel\ResultQueue;
11
use Ackintosh\Snidel\Error;
12
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
13
14
class ForkContainer
{
    /** @var int pid given to the constructor; also handed to Token, TaskQueue and ResultQueue */
    private $ownerPid;

    /** @var int|null pid of the master process; stays null until forkMaster() succeeds */
    private $masterPid;

    /** @var \Ackintosh\Snidel\Fork[] forks indexed by pid */
    private $forks = array();

    /** @var \Ackintosh\Snidel\Token */
    private $token;

    /** @var \Ackintosh\Snidel\Pcntl */
    private $pcntl;

    /** @var \Ackintosh\Snidel\DataRepository */
    private $dataRepository;

    /** @var \Ackintosh\Snidel\Error */
    private $error;

    /** @var \Ackintosh\Snidel\TaskQueue */
    private $taskQueue;

    /** @var \Ackintosh\Snidel\ResultQueue */
    private $resultQueue;

    /** @var \Ackintosh\Snidel\Log */
    private $log;

    /** @var array signals whose handler is set to SIG_DFL in the master and worker processes */
    private $signals = array(
        SIGTERM,
        SIGINT,
    );

    /**
     * @param   int                     $ownerPid
     * @param   \Ackintosh\Snidel\Log   $log
     * @param   int                     $concurrency    passed to Token as its second argument
     */
    public function __construct($ownerPid, $log, $concurrency = 5)
    {
        $this->ownerPid         = $ownerPid;
        $this->log              = $log;
        $this->token            = new Token($this->ownerPid, $concurrency);
        $this->pcntl            = new Pcntl();
        $this->dataRepository   = new DataRepository();
        $this->taskQueue        = new TaskQueue($this->ownerPid);
        $this->resultQueue      = new ResultQueue($this->ownerPid);
        $this->error            = new Error();
    }

    /**
     * put a task on the task queue
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     */
    public function enqueue($task)
    {
        $this->taskQueue->enqueue($task);
    }

    /**
     * number of tasks queued so far, as reported by the task queue
     *
     * @return  int
     */
    public function queuedCount()
    {
        return $this->taskQueue->queuedCount();
    }

    /**
     * take the next entry from the result queue
     *
     * @return  \Ackintosh\Snidel\Fork
     */
    private function dequeue()
    {
        return $this->resultQueue->dequeue();
    }

    /**
     * number of results dequeued so far, as reported by the result queue
     *
     * @return  int
     */
    public function dequeuedCount()
    {
        return $this->resultQueue->dequeuedCount();
    }

    /**
     * fork process
     *
     * Returns in both processes: the Fork carries the child's pid in the
     * parent and the process's own pid (getmypid()) in the child.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  \Ackintosh\Snidel\Fork
     * @throws  \RuntimeException       when the fork fails
     */
    public function fork($task)
    {
        $pid = $this->pcntl->fork();
        if ($pid === -1) {
            throw new \RuntimeException('could not fork a new process');
        }

        if ($pid === 0) {
            // child: fork() reported 0, so look up our real pid
            $pid = getmypid();
        }

        $fork = new Fork($pid, $task);
        $this->forks[$pid] = $fork;

        return $fork;
    }

    /**
     * fork master process
     *
     * The owner gets the master's pid back. The master never returns: it
     * dequeues tasks, forks a worker for each task the token accepts, and
     * exits once dequeue() yields a falsy value.
     *
     * @return  int     $masterPid
     * @throws  \RuntimeException   when the fork fails
     */
    public function forkMaster()
    {
        $pid = $this->pcntl->fork();

        // Must be checked before the truthiness test below: -1 is truthy, so
        // a failed fork would otherwise be treated as the owner branch and
        // -1 would be stored as the master pid.
        if ($pid === -1) {
            $message = 'could not fork a new process';
            $this->log->error($message);
            throw new \RuntimeException($message);
        }

        $this->masterPid = ($pid === 0) ? getmypid() : $pid;
        $this->log->setMasterProcessId($this->masterPid);

        if ($pid) {
            // owner
            $this->log->info('pid: ' . getmypid());

            return $this->masterPid;
        }

        // master
        $taskQueue = new TaskQueue($this->ownerPid);
        $this->log->info('pid: ' . $this->masterPid);

        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, SIG_DFL, true);
        }

        while ($task = $taskQueue->dequeue()) {
            $this->log->info('dequeued task #' . $taskQueue->dequeuedCount());
            if ($this->token->accept()) {
                $this->forkWorker($task);
            }
        }

        // Deliberate: this is the forked child, which ends here rather than
        // returning into the code that called forkMaster().
        exit;
    }

    /**
     * fork worker process
     *
     * The master only logs the new worker's pid and returns. The worker
     * executes the task, enqueues the fork on the result queue, calls
     * back() on the token and exits.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException       when the fork fails
     */
    private function forkWorker($task)
    {
        try {
            $fork = $this->fork($task);
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            throw $e;
        }

        if (getmypid() === $this->masterPid) {
            // master
            $this->log->info('forked worker. pid: ' . $fork->getPid());

            return;
        }

        // worker
        $this->log->info('has forked. pid: ' . getmypid());
        // @codeCoverageIgnoreStart

        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, SIG_DFL, true);
        }

        $resultQueue = new ResultQueue($this->ownerPid);

        // If the process shuts down before a result was produced or queued,
        // enqueue a failure result instead.
        register_shutdown_function(function () use ($fork, $resultQueue) {
            if ($fork->hasNoResult() || !$fork->isQueued()) {
                $result = new Result();
                $result->setFailure();
                $fork->setResult($result);
                $resultQueue->enqueue($fork);
            }
        });

        $this->log->info('----> started the function.');
        $fork->executeTask();
        $this->log->info('<---- completed the function.');

        $resultQueue->enqueue($fork);
        $fork->setQueued();
        $this->log->info('queued the result.');

        $this->token->back();
        $this->log->info('return the token and exit.');

        // Deliberate: the worker is a forked child and ends here rather than
        // returning into the master's task loop.
        exit;
        // @codeCoverageIgnoreEnd
    }

    /**
     * @return  bool    true once a master pid has been recorded
     */
    public function existsMaster()
    {
        return $this->masterPid !== null;
    }

    /**
     * kill master process
     *
     * Does nothing unless a positive master pid has been recorded.
     * posix_kill() gives pid 0 and negative pids special meanings (the
     * caller's process group, or every process the caller may signal), so
     * an unset pid must never reach it.
     *
     * @return  void
     */
    public function killMaster()
    {
        if ($this->masterPid === null || $this->masterPid <= 0) {
            return;
        }

        posix_kill($this->masterPid, SIGTERM);
    }

    /**
     * tell whether any known fork carries the given tag
     *
     * @param   string  $tag
     * @return  bool
     */
    public function hasTag($tag)
    {
        foreach ($this->forks as $fork) {
            if ($fork->getTag() === $tag) {
                return true;
            }
        }

        return false;
    }

    /**
     * dequeue results until as many have been dequeued as tasks were queued
     *
     * Each dequeued fork is stored by pid; forks whose result is a failure
     * are also recorded in the error container.
     *
     * @return void
     */
    public function wait()
    {
        while ($this->queuedCount() > $this->dequeuedCount()) {
            $fork = $this->dequeue();
            $this->forks[$fork->getPid()] = $fork;

            if ($fork->getResult()->isFailure()) {
                $this->error[$fork->getPid()] = $fork;
            }
        }
    }

    /**
     * wait child
     *
     * Waits for any child (waitpid(-1)), loads and deletes that child's
     * stored fork from the data repository, and attaches the wait status.
     *
     * @return \Ackintosh\Snidel\Fork
     * @throws \Ackintosh\Snidel\Exception\SharedMemoryControlException  propagated unchanged from the data repository
     */
    public function waitSimply()
    {
        $status = null;
        $childPid = $this->pcntl->waitpid(-1, $status);

        $fork = $this->dataRepository->load($childPid)->readAndDelete();
        $fork->setStatus($status);

        if ($fork->hasNotFinishedSuccessfully()) {
            $this->error[$childPid] = $fork;
        }

        $this->forks[$childPid] = $fork;

        return $fork;
    }

    /**
     * @return  array   pids of all known forks
     */
    public function getChildPids()
    {
        return array_keys($this->forks);
    }

    /**
     * return fork
     *
     * The pid is not checked for existence; it is used directly as an index
     * into the fork list.
     *
     * @param   int     $pid
     * @return  \Ackintosh\Snidel\Fork
     */
    public function get($pid)
    {
        return $this->forks[$pid];
    }

    /**
     * return all forks, or only those carrying the given tag
     *
     * @param   string|null $tag    null selects every fork
     * @return  \Ackintosh\Snidel\ForkCollection
     */
    public function getCollection($tag = null)
    {
        if ($tag === null) {
            return new ForkCollection($this->forks);
        }

        return $this->getCollectionWithTag($tag);
    }

    /**
     * return forks
     *
     * @param   string  $tag
     * @return  \Ackintosh\Snidel\ForkCollection    forks whose tag is identical to $tag
     */
    private function getCollectionWithTag($tag)
    {
        $collection = array_filter($this->forks, function ($fork) use ($tag) {
            return $fork->getTag() ===  $tag;
        });

        return new ForkCollection($collection);
    }

    /**
     * @return  bool
     */
    public function hasError()
    {
        return $this->error->exists();
    }

    /**
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        return $this->error;
    }

    /**
     * drop this object's references to both queues
     */
    public function __destruct()
    {
        unset($this->taskQueue);
        unset($this->resultQueue);
    }
}
347