Completed
Push — master ( 8a1f66...f0441f )
by Akihito
02:26
created

ForkContainer::killMaster()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
namespace Ackintosh\Snidel;
3
4
use Ackintosh\Snidel\Fork;
5
use Ackintosh\Snidel\ForkCollection;
6
use Ackintosh\Snidel\Pcntl;
7
use Ackintosh\Snidel\DataRepository;
8
use Ackintosh\Snidel\TaskQueue;
9
use Ackintosh\Snidel\ResultQueue;
10
use Ackintosh\Snidel\Error;
11
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
12
13
class ForkContainer
14
{
15
    /** @var int PID of the owning process, as handed to the constructor */
    private $ownerPid;

    /** @var int|null PID of the master process; stays null until forkMaster() assigns it */
    private $masterPid;

    /** @var \Ackintosh\Snidel\Fork[] forks known to this container, indexed by PID */
    private $forks = array();

    /** @var \Ackintosh\Snidel\Pcntl object used for the fork / signal / waitpid calls */
    private $pcntl;

    /** @var \Ackintosh\Snidel\DataRepository source from which waitForChild() loads a finished fork */
    private $dataRepository;

    /** @var \Ackintosh\Snidel\Error forks that were recorded as failed, indexed by PID */
    private $error;

    /** @var \Ackintosh\Snidel\TaskQueue queue that enqueue() pushes tasks onto */
    private $taskQueue;

    /** @var \Ackintosh\Snidel\ResultQueue queue that wait() takes finished forks from */
    private $resultQueue;

    /** @var \Ackintosh\Snidel\Log logger handed to the constructor */
    private $log;

    /** @var int[] signals for which the master and the workers install SIG_DFL */
    private $signals = array(SIGTERM, SIGINT);

    /** @var int number of counted workers at which the master waits for a child before forking again */
    private $concurrency;
50
51
    /**
     * Stores the injected values and creates the collaborators of this container.
     *
     * Both the task queue and the result queue are created for the owner PID.
     *
     * @param   int                     $ownerPid       PID of the owning process
     * @param   \Ackintosh\Snidel\Log   $log            logger
     * @param   int                     $concurrency    worker limit used by the master loop (default 5)
     */
    public function __construct($ownerPid, $log, $concurrency = 5)
    {
        // injected values
        $this->ownerPid     = $ownerPid;
        $this->log          = $log;
        $this->concurrency  = $concurrency;

        // collaborators
        $this->pcntl            = new Pcntl();
        $this->dataRepository   = new DataRepository();
        $this->taskQueue        = new TaskQueue($ownerPid);
        $this->resultQueue      = new ResultQueue($ownerPid);
        $this->error            = new Error();
    }
65
66
    /**
     * Pushes a task onto the task queue.
     *
     * A \RuntimeException raised by the queue is not handled here; it reaches
     * the caller unchanged.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException
     */
    public function enqueue($task)
    {
        $this->taskQueue->enqueue($task);
    }
79
80
    /**
     * Reports the queued count of the task queue.
     *
     * @return  int
     */
    public function queuedCount()
    {
        $queued = $this->taskQueue->queuedCount();

        return $queued;
    }
87
88
    /**
     * Takes the next entry from the result queue.
     *
     * @return  \Ackintosh\Snidel\Fork
     */
    private function dequeue()
    {
        $next = $this->resultQueue->dequeue();

        return $next;
    }
95
96
    /**
     * Reports the dequeued count of the result queue.
     *
     * @return  int
     */
    public function dequeuedCount()
    {
        $dequeued = $this->resultQueue->dequeuedCount();

        return $dequeued;
    }
103
104
    /**
     * Forks the current process and registers a Fork object for the child.
     *
     * Everything after the fork call runs in both resulting processes: each one
     * builds a Fork for the child's PID and stores it in $this->forks.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  \Ackintosh\Snidel\Fork
     * @throws  \RuntimeException   when the fork call returns -1
     */
    public function fork($task)
    {
        $forked = $this->pcntl->fork();

        if ($forked === -1) {
            throw new \RuntimeException('could not fork a new process');
        }

        // a return value of 0 is treated as "running in the child", whose PID is our own
        if ($forked === 0) {
            $childPid = getmypid();
        } else {
            $childPid = $forked;
        }

        $child = new Fork($childPid, $task);
        $this->forks[$childPid] = $child;

        return $child;
    }
125
126
    /**
     * fork master process
     *
     * The owner process records the master's PID and returns it.
     *
     * The master process never returns from this method: it installs SIG_DFL for
     * the signals listed in $this->signals, forks a worker for every task it
     * dequeues (waiting for one child first whenever the number of counted
     * workers has reached $this->concurrency) and exits as soon as dequeue()
     * yields a falsy value.
     *
     * @return  int     $masterPid
     * @throws  \RuntimeException   when the master process could not be forked
     */
    public function forkMaster()
    {
        $pid = $this->pcntl->fork();

        // Failure has to be handled before anything else: -1 is truthy, so it
        // would otherwise be taken for the owner path, and it must never be
        // stored as the master PID because killMaster() hands that PID to
        // posix_kill().
        if ($pid === -1) {
            $message = 'could not fork the master process';
            $this->log->error($message);

            throw new \RuntimeException($message);
        }

        $this->masterPid = ($pid === 0) ? getmypid() : $pid;
        $this->log->setMasterPid($this->masterPid);

        if ($pid) {
            // owner
            $this->log->info('pid: ' . getmypid());

            return $this->masterPid;
        }

        // master
        $taskQueue = new TaskQueue($this->ownerPid);
        $this->log->info('pid: ' . $this->masterPid);

        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, SIG_DFL, true);
        }
        $workerCount = 0;

        while ($task = $taskQueue->dequeue()) {
            $this->log->info('dequeued task #' . $taskQueue->dequeuedCount());
            if ($workerCount >= $this->concurrency) {
                // limit reached: wait until any child has finished before forking again
                $status = null;
                $this->pcntl->waitpid(-1, $status);
                $workerCount--;
            }
            $this->forkWorker($task);
            $workerCount++;
        }

        // the master must not fall back into the owner's code path
        exit;
    }
167
168
    /**
     * fork worker process
     *
     * In the master this only logs the PID of the new worker and returns.
     * In the worker it installs SIG_DFL for the signals in $this->signals,
     * executes the task, puts the fork onto the result queue and exits, so the
     * worker never returns from this method.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException   when forking fails (the message is logged first)
     */
    private function forkWorker($task)
    {
        try {
            $worker = $this->fork($task);
        } catch (\RuntimeException $forkFailure) {
            $this->log->error($forkFailure->getMessage());
            throw $forkFailure;
        }

        if (getmypid() === $this->masterPid) {
            // master
            $this->log->info('forked worker. pid: ' . $worker->getPid());

            return;
        }

        // worker
        $this->log->info('has forked. pid: ' . getmypid());
        // @codeCoverageIgnoreStart

        foreach ($this->signals as $signal) {
            $this->pcntl->signal($signal, SIG_DFL, true);
        }

        $results = new ResultQueue($this->ownerPid);

        // On shutdown, queue a failure result unless the fork already has a
        // result and has been marked as queued.
        register_shutdown_function(function () use ($worker, $results) {
            if (!$worker->hasNoResult() && $worker->isQueued()) {
                return;
            }

            $failure = new Result();
            $failure->setFailure();
            $worker->setResult($failure);
            $results->enqueue($worker);
        });

        $this->log->info('----> started the function.');
        $worker->executeTask();
        $this->log->info('<---- completed the function.');

        try {
            $results->enqueue($worker);
        } catch (\RuntimeException $enqueueFailure) {
            // the result could not be queued: queue a failure result in its place
            $this->log->error($enqueueFailure->getMessage());
            $failure = new Result();
            $failure->setFailure();
            $worker->setResult($failure);
            $results->enqueue($worker);
        }

        $worker->setQueued();
        $this->log->info('queued the result and exit.');
        exit;
        // @codeCoverageIgnoreEnd
    }
225
226
    /**
     * Tells whether a master PID has been recorded.
     *
     * @return  bool
     */
    public function existsMaster()
    {
        return !is_null($this->masterPid);
    }
233
234
    /**
     * kill master process
     *
     * Sends SIGTERM to the recorded master PID. Nothing is sent while no master
     * PID has been recorded: a null PID would reach posix_kill() as 0, which
     * addresses the caller's whole process group instead of a single process.
     *
     * @return  void
     */
    public function killMaster()
    {
        if ($this->masterPid === null) {
            return;
        }

        posix_kill($this->masterPid, SIGTERM);
    }
243
244
    /**
     * Tells whether any fork held by this container carries the given tag.
     *
     * @param   string  $tag
     * @return  bool
     */
    public function hasTag($tag)
    {
        $found = false;

        foreach ($this->forks as $candidate) {
            if ($candidate->getTag() === $tag) {
                $found = true;
                break;
            }
        }

        return $found;
    }
259
260
    /**
     * Dequeues results until the dequeued count has caught up with the queued count.
     *
     * Every dequeued fork is stored in $this->forks under its PID; forks whose
     * result is a failure are recorded in $this->error as well.
     *
     * @return void
     */
    public function wait()
    {
        while ($this->queuedCount() > $this->dequeuedCount()) {
            $finished = $this->dequeue();
            $this->forks[$finished->getPid()] = $finished;

            if (!$finished->getResult()->isFailure()) {
                continue;
            }

            $this->error[$finished->getPid()] = $finished;
        }
    }
274
275
    /**
     * wait child
     *
     * Waits for any child, loads (and deletes) the fork stored for its PID in the
     * data repository, sets the wait status on it and registers it in
     * $this->forks. A fork that has not finished successfully is recorded in
     * $this->error too.
     *
     * @return \Ackintosh\Snidel\Fork
     * @throws \Ackintosh\Snidel\Exception\SharedMemoryControlException   passed through from the data repository
     */
    public function waitForChild()
    {
        $exitStatus = null;
        $finishedPid = $this->pcntl->waitpid(-1, $exitStatus);

        $stored = $this->dataRepository->load($finishedPid);
        $finished = $stored->readAndDelete();
        $finished->setStatus($exitStatus);

        if ($finished->hasNotFinishedSuccessfully()) {
            $this->error[$finishedPid] = $finished;
        }

        $this->forks[$finishedPid] = $finished;

        return $finished;
    }
298
299
    /**
     * Lists the PIDs under which forks are currently registered.
     *
     * @return  array
     */
    public function getChildPids()
    {
        $pids = array_keys($this->forks);

        return $pids;
    }
306
307
    /**
     * return fork
     *
     * Looks up the fork registered under the given PID. An unknown PID yields
     * null without raising an undefined-index notice.
     *
     * @param   int     $pid
     * @return  \Ackintosh\Snidel\Fork|null
     */
    public function get($pid)
    {
        return isset($this->forks[$pid]) ? $this->forks[$pid] : null;
    }
317
318
    /**
     * Hands over registered forks as a ForkCollection.
     *
     * Without a tag, all forks are returned and the container's list is emptied.
     * With a tag, the call is delegated to getCollectionWithTag().
     *
     * @param   string|null $tag
     * @return  \Ackintosh\Snidel\ForkCollection
     */
    public function getCollection($tag = null)
    {
        if ($tag !== null) {
            return $this->getCollectionWithTag($tag);
        }

        $everything = new ForkCollection($this->forks);
        $this->forks = array();

        return $everything;
    }
329
330
    /**
     * return forks
     *
     * Collects the forks carrying the given tag into a ForkCollection and
     * removes each of them from $this->forks.
     *
     * @param   string  $tag
     * @return  \Ackintosh\Snidel\ForkCollection
     */
    private function getCollectionWithTag($tag)
    {
        $matched = array();

        foreach ($this->forks as $candidate) {
            if ($candidate->getTag() === $tag) {
                $matched[] = $candidate;
                unset($this->forks[$candidate->getPid()]);
            }
        }

        return new ForkCollection($matched);
    }
350
351
    /**
     * Reports whether the error holder says that an error exists.
     *
     * @return  bool
     */
    public function hasError()
    {
        $exists = $this->error->exists();

        return $exists;
    }
358
359
    /**
     * Gives access to the error holder of this container.
     *
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        $error = $this->error;

        return $error;
    }
366
367
    /**
     * Drops this container's references to the task queue and the result queue.
     */
    public function __destruct()
    {
        unset($this->taskQueue, $this->resultQueue);
    }
372
}
373