Completed
Push — master ( 52239f...2586be )
by Akihito
02:50
created

Container::hasError()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
namespace Ackintosh\Snidel\Fork;
3
4
use Ackintosh\Snidel\Fork\Fork;
5
use Ackintosh\Snidel\Pcntl;
6
use Ackintosh\Snidel\DataRepository;
7
use Ackintosh\Snidel\Task\Queue as TaskQueue;
8
use Ackintosh\Snidel\Result\Result;
9
use Ackintosh\Snidel\Result\Queue as ResultQueue;
10
use Ackintosh\Snidel\Result\Collection;
11
use Ackintosh\Snidel\Error;
12
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
13
use Ackintosh\Snidel\Worker;
14
use Ackintosh\Snidel\ActiveWorkerSet;
15
16
class Container
17
{
18
    /** @var int */
19
    private $ownerPid;
20
21
    /** @var int */
22
    private $masterPid;
23
24
    /** @var \Ackintosh\Snidel\Result\Result[] */
25
    private $results = array();
26
27
    /** @var \Ackintosh\Snidel\Pcntl */
28
    private $pcntl;
29
30
    /** @var \Ackintosh\Snidel\DataRepository */
31
    private $dataRepository;
32
33
    /** @var \Ackintosh\Snidel\Error */
34
    private $error;
35
36
    /** @var \Ackintosh\Snidel\Task\Queue */
37
    private $taskQueue;
38
39
    /** @var \Ackintosh\Snidel\Result\Queue */
40
    private $resultQueue;
41
42
    /** @var \Ackintosh\Snidel\Log */
43
    private $log;
44
45
    /** @var array */
46
    private $signals = array(
47
        SIGTERM,
48
        SIGINT,
49
    );
50
51
    /** @var int */
52
    private $concurrency;
53
54
    /**
55
     * @param   int     $ownerPid
56
     */
57
    public function __construct($ownerPid, $log, $concurrency = 5)
58
    {
59
        $this->ownerPid         = $ownerPid;
60
        $this->log              = $log;
61
        $this->concurrency      = $concurrency;
62
        $this->pcntl            = new Pcntl();
63
        $this->dataRepository   = new DataRepository();
64
        $this->taskQueue        = new TaskQueue($this->ownerPid);
65
        $this->resultQueue      = new ResultQueue($this->ownerPid);
66
        $this->error            = new Error();
67
    }
68
69
    /**
70
     * @param   \Ackintosh\Snidel\Task
71
     * @return  void
72
     * @throws  \RuntimeException
73
     */
74
    public function enqueue($task)
75
    {
76
        try {
77
            $this->taskQueue->enqueue($task);
78
        } catch (\RuntimeException $e) {
79
            throw $e;
80
        }
81
    }
82
83
    /**
84
     * @return  int
85
     */
86
    public function queuedCount()
87
    {
88
        return $this->taskQueue->queuedCount();
89
    }
90
91
    /**
92
     * @return  \Ackintosh\Snidel\Fork\Fork
93
     */
94
    private function dequeue()
95
    {
96
        return $this->resultQueue->dequeue();
97
    }
98
99
    /**
100
     * @return  int
101
     */
102
    public function dequeuedCount()
103
    {
104
        return $this->resultQueue->dequeuedCount();
105
    }
106
107
    /**
108
     * fork process
109
     *
110
     * @return  \Ackintosh\Snidel\Fork\Fork
111
     * @throws  \RuntimeException
112
     */
113
    private function fork()
114
    {
115
        $pid = $this->pcntl->fork();
116
        if ($pid === -1) {
117
            throw new \RuntimeException('could not fork a new process');
118
        }
119
120
        $pid = ($pid === 0) ? getmypid() : $pid;
121
122
        return new Fork($pid);
123
    }
124
125
    /**
126
     * fork master process
127
     *
128
     * @return  int     $masterPid
129
     */
130
    public function forkMaster()
131
    {
132
        try {
133
            $fork = $this->fork();
134
        } catch (\RuntimeException $e) {
135
            throw $e;
136
        }
137
138
        $this->masterPid = $fork->getPid();
139
        $this->log->setMasterPid($this->masterPid);
140
141
        if (getmypid() === $this->ownerPid) {
142
            // owner
143
            $this->log->info('pid: ' . getmypid());
144
145
            return $this->masterPid;
146
        } else {
147
            // master
148
            $taskQueue          = new TaskQueue($this->ownerPid);
149
            $activeWorkerSet    = new ActiveWorkerSet();
150
            $this->log->info('pid: ' . $this->masterPid);
151
152
            $log = $this->log;
153
            foreach ($this->signals as $sig) {
154
                $this->pcntl->signal($sig, function ($sig) use ($log, $activeWorkerSet) {
155
                    $log->info('received signal: ' . $sig);
156
157
                    if ($activeWorkerSet->count() === 0) {
158
                        $log->info('no worker is active.');
159
                    } else {
160
                        $log->info('------> sending signal to workers. signal: ' . $sig);
161
                        $activeWorkerSet->terminate($sig);
162
                        $log->info('<------ sent signal');
163
                    }
164
                    exit;
0 ignored issues
show
Coding Style Compatibility introduced by
The method forkMaster() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
165
                });
166
            }
167
168
            while ($task = $taskQueue->dequeue()) {
169
                $this->log->info('dequeued task #' . $taskQueue->dequeuedCount());
170
                if ($activeWorkerSet->count() >= $this->concurrency) {
171
                    $status = null;
172
                    $workerPid = $this->pcntl->waitpid(-1, $status);
173
                    $activeWorkerSet->delete($workerPid);
174
                }
175
                $activeWorkerSet->add(
176
                    $this->forkWorker($task)
177
                );
178
            }
179
            exit;
0 ignored issues
show
Coding Style Compatibility introduced by
The method forkMaster() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
180
        }
181
    }
182
183
    /**
184
     * fork worker process
185
     *
186
     * @param   \Ackintosh\Snidel\Task
187
     * @return  \Ackintosh\Snidel\Worker
188
     * @throws  \RuntimeException
189
     */
190
    private function forkWorker($task)
191
    {
192
        try {
193
            $fork = $this->fork();
194
        } catch (\RuntimeException $e) {
195
            $this->log->error($e->getMessage());
196
            throw $e;
197
        }
198
199
        $worker = new Worker($fork, $task);
200
201
        if (getmypid() === $this->masterPid) {
202
            // master
203
            $this->log->info('forked worker. pid: ' . $worker->getPid());
204
            return $worker;
205
        } else {
206
            // worker
207
            $this->log->info('has forked. pid: ' . getmypid());
208
            // @codeCoverageIgnoreStart
209
210
            foreach ($this->signals as $sig) {
211
                $this->pcntl->signal($sig, SIG_DFL, true);
212
            }
213
214
            $worker->setResultQueue(new ResultQueue($this->ownerPid));
215
216
            $resultHasQueued = false;
217
            register_shutdown_function(function () use (&$resultHasQueued, $worker) {
218
                if (!$resultHasQueued) {
219
                    $worker->error();
220
                }
221
            });
222
223
            $this->log->info('----> started the function.');
224
            try {
225
                $worker->run();
226
            } catch (\RuntimeException $e) {
227
                $this->log->error($e->getMessage());
228
                exit;
0 ignored issues
show
Coding Style Compatibility introduced by
The method forkWorker() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
229
            }
230
            $this->log->info('<---- completed the function.');
231
232
            $resultHasQueued = true;
0 ignored issues
show
Unused Code introduced by
$resultHasQueued is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
233
            $this->log->info('queued the result and exit.');
234
            exit;
0 ignored issues
show
Coding Style Compatibility introduced by
The method forkWorker() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
235
            // @codeCoverageIgnoreEnd
236
        }
237
    }
238
239
    /**
240
     * @return  bool
241
     */
242
    public function existsMaster()
243
    {
244
        return $this->masterPid !== null;
245
    }
246
247
    /**
248
     * send signal to master process
249
     *
250
     * @return  void
251
     */
252
    public function sendSignalToMaster($sig = SIGTERM)
253
    {
254
        $this->log->info('----> sending signal to master. signal: ' . $sig);
255
        posix_kill($this->masterPid, $sig);
256
        $this->log->info('<---- sent signal.');
257
258
        $status = null;
259
        $this->pcntl->waitpid($this->masterPid, $status);
260
        $this->log->info('. status: ' . $status);
261
        $this->masterPid = null;
262
    }
263
264
    /**
265
     *
266
     * @param   string  $tag
267
     * @return  bool
268
     */
269
    public function hasTag($tag)
270
    {
271
        foreach ($this->results as $result) {
272
            if ($result->getTask()->getTag() === $tag) {
273
                return true;
274
            }
275
        }
276
277
        return false;
278
    }
279
280
    /**
281
     * @return void
282
     */
283
    public function wait()
284
    {
285
        for (; $this->queuedCount() > $this->dequeuedCount();) {
286
            $result = $this->dequeue();
287
            $pid = $result->getFork()->getPid();
288
            $this->results[$pid] = $result;
289
290
            if ($result->isFailure()) {
291
                $this->error[$pid] = $result;
292
            }
293
        }
294
    }
295
296
    public function getCollection($tag = null)
297
    {
298
        if ($tag === null) {
299
            $collection = new Collection($this->results);
300
            $this->results = array();
301
302
            return $collection;
303
        }
304
305
        return $this->getCollectionWithTag($tag);
306
    }
307
308
    /**
309
     * return results
310
     *
311
     * @param   string  $tag
312
     * @return  \Ackintosh\Snidel\Result\Collection
313
     */
314
    private function getCollectionWithTag($tag)
315
    {
316
        $results = array();
317
        foreach ($this->results as $r) {
318
            if ($r->getTask()->getTag() !== $tag) {
319
                continue;
320
            }
321
322
            $results[] = $r;
323
            unset($this->results[$r->getFork()->getPid()]);
324
        }
325
326
        return new Collection($results);
327
    }
328
329
    /**
330
     * @return  bool
331
     */
332
    public function hasError()
333
    {
334
        return $this->error->exists();
335
    }
336
337
    /**
338
     * @return  \Ackintosh\Sniden\Error
339
     */
340
    public function getError()
341
    {
342
        return $this->error;
343
    }
344
345
    public function __destruct()
346
    {
347
        unset($this->taskQueue);
348
        unset($this->resultQueue);
349
    }
350
}
351