Completed
Push — master ( 1fa11a...2abe5b )
by Akihito
02:24
created

Container   B

Complexity

Total Complexity 45

Size/Duplication

Total Lines 389
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 13

Importance

Changes 9
Bugs 2 Features 1
Metric Value
wmc 45
c 9
b 2
f 1
lcom 1
cbo 13
dl 0
loc 389
rs 8.3673

20 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 11 1
A enqueue() 0 8 2
A queuedCount() 0 4 1
A dequeue() 0 4 1
A dequeuedCount() 0 4 1
A fork() 0 14 3
B forkMaster() 0 52 7
B forkWorker() 0 48 6
A existsMaster() 0 4 1
A sendSignalToMaster() 0 11 1
A hasTag() 0 10 3
A wait() 0 12 3
B waitForChild() 0 20 5
A getChildPids() 0 4 1
A get() 0 4 1
A getCollection() 0 11 2
A getCollectionWithTag() 0 14 3
A hasError() 0 4 1
A getError() 0 4 1
A __destruct() 0 5 1

How to fix   Complexity   

Complex Class

Complex classes like Container often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes. You can also have a look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Container, and based on these observations, apply Extract Interface, too.

1
<?php
2
namespace Ackintosh\Snidel\Fork;
3
4
use Ackintosh\Snidel\Fork\Fork;
5
use Ackintosh\Snidel\Pcntl;
6
use Ackintosh\Snidel\DataRepository;
7
use Ackintosh\Snidel\Task\Queue as TaskQueue;
8
use Ackintosh\Snidel\Result\Result;
9
use Ackintosh\Snidel\Result\Queue as ResultQueue;
10
use Ackintosh\Snidel\Result\Collection;
11
use Ackintosh\Snidel\Error;
12
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
13
use Ackintosh\Snidel\Worker;
14
use Ackintosh\Snidel\ActiveWorkerSet;
15
16
/**
 * Coordinates the owner, master and worker processes.
 *
 * The owner process enqueues tasks and collects results; forkMaster() forks a
 * master process that dequeues tasks and forks one worker per task, keeping at
 * most $concurrency workers active at a time.
 */
class Container
{
    /** @var int pid of the owner process (passed to the constructor) */
    private $ownerPid;

    /** @var int|null pid of the master process; set by forkMaster(), reset to null by sendSignalToMaster() */
    private $masterPid;

    /** @var \Ackintosh\Snidel\Fork\Fork[] forks created through fork(), keyed by pid */
    private $forks = array();

    /** @var \Ackintosh\Snidel\Result\Result[] collected results, keyed by the pid of their fork */
    private $results = array();

    /** @var \Ackintosh\Snidel\Pcntl helper used for the fork / waitpid / signal calls */
    private $pcntl;

    /** @var \Ackintosh\Snidel\DataRepository used by waitForChild() to load a child's result */
    private $dataRepository;

    /** @var \Ackintosh\Snidel\Error */
    private $error;

    /** @var \Ackintosh\Snidel\Task\Queue */
    private $taskQueue;

    /** @var \Ackintosh\Snidel\Result\Queue */
    private $resultQueue;

    /** @var \Ackintosh\Snidel\Log */
    private $log;

    /** @var array signals that the master handles (and forwards to workers) and that workers reset to SIG_DFL */
    private $signals = array(
        SIGTERM,
        SIGINT,
    );

    /** @var int number of active workers at which the master waits for one to exit before forking another */
    private $concurrency;

    /**
     * @param   int                     $ownerPid
     * @param   \Ackintosh\Snidel\Log   $log
     * @param   int                     $concurrency
     */
    public function __construct($ownerPid, $log, $concurrency = 5)
    {
        $this->ownerPid         = $ownerPid;
        $this->log              = $log;
        $this->concurrency      = $concurrency;
        $this->pcntl            = new Pcntl();
        $this->dataRepository   = new DataRepository();
        $this->taskQueue        = new TaskQueue($this->ownerPid);
        $this->resultQueue      = new ResultQueue($this->ownerPid);
        $this->error            = new Error();
    }

    /**
     * Add a task to the task queue.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException   propagated unchanged from the task queue
     */
    public function enqueue($task)
    {
        $this->taskQueue->enqueue($task);
    }

    /**
     * @return  int     count reported by the task queue
     */
    public function queuedCount()
    {
        return $this->taskQueue->queuedCount();
    }

    /**
     * Take the next result from the result queue.
     *
     * @return  \Ackintosh\Snidel\Result\Result
     */
    private function dequeue()
    {
        return $this->resultQueue->dequeue();
    }

    /**
     * @return  int     count reported by the result queue
     */
    public function dequeuedCount()
    {
        return $this->resultQueue->dequeuedCount();
    }

    /**
     * fork process
     *
     * Runs in both the parent and the child after the fork: each side records a
     * Fork carrying the child's pid (the child substitutes its own pid for the 0
     * it receives from fork).
     *
     * @return  \Ackintosh\Snidel\Fork\Fork
     * @throws  \RuntimeException   when the process could not be forked
     */
    public function fork()
    {
        $pid = $this->pcntl->fork();
        if ($pid === -1) {
            throw new \RuntimeException('could not fork a new process');
        }

        $pid = ($pid === 0) ? getmypid() : $pid;

        $fork = new Fork($pid);
        $this->forks[$pid] = $fork;

        return $fork;
    }

    /**
     * fork master process
     *
     * In the owner process this returns the master's pid. In the master process
     * it never returns: the master dequeues tasks, forks a worker for each one and
     * exits when the dequeue loop ends or a handled signal arrives.
     *
     * @return  int     $masterPid
     * @throws  \RuntimeException   when the master process could not be forked
     */
    public function forkMaster()
    {
        $fork = $this->fork();

        $this->masterPid = $fork->getPid();
        $this->log->setMasterPid($this->masterPid);

        if (getmypid() === $this->ownerPid) {
            // owner
            $this->log->info('pid: ' . getmypid());

            return $this->masterPid;
        }

        // master
        $taskQueue          = new TaskQueue($this->ownerPid);
        $activeWorkerSet    = new ActiveWorkerSet();
        $this->log->info('pid: ' . $this->masterPid);

        $this->registerMasterSignalHandlers($activeWorkerSet);

        while ($task = $taskQueue->dequeue()) {
            $this->log->info('dequeued task #' . $taskQueue->dequeuedCount());
            if ($activeWorkerSet->count() >= $this->concurrency) {
                // At the concurrency limit: block until any child exits, then free its slot.
                $status = null;
                $workerPid = $this->pcntl->waitpid(-1, $status);
                $activeWorkerSet->delete($workerPid);
            }
            $activeWorkerSet->add(
                $this->forkWorker($task)
            );
        }

        // Intentional: the forked master must end here and never return into the owner's code path.
        exit;
    }

    /**
     * Install handlers in the master process for every signal in $this->signals.
     *
     * Each handler forwards the received signal to the active workers (if any)
     * and then terminates the master.
     *
     * @param   \Ackintosh\Snidel\ActiveWorkerSet   $activeWorkerSet
     * @return  void
     */
    private function registerMasterSignalHandlers($activeWorkerSet)
    {
        // The closure receives the logger through use() instead of relying on $this.
        $log = $this->log;
        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, function ($sig) use ($log, $activeWorkerSet) {
                $log->info('received signal: ' . $sig);

                if ($activeWorkerSet->count() === 0) {
                    $log->info('no worker is active.');
                } else {
                    $log->info('------> sending signal to workers. signal: ' . $sig);
                    $activeWorkerSet->terminate($sig);
                    $log->info('<------ sent signal');
                }
                // Intentional: the master stops once the signal has been handled.
                exit;
            });
        }
    }

    /**
     * fork worker process
     *
     * In the master process this returns the Worker describing the new child.
     * In the worker process it runs the task and exits; it never returns.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  \Ackintosh\Snidel\Worker
     * @throws  \RuntimeException   when the worker process could not be forked
     */
    private function forkWorker($task)
    {
        try {
            $fork = $this->fork();
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            throw $e;
        }

        $worker = new Worker($fork, $task);

        if (getmypid() === $this->masterPid) {
            // master
            $this->log->info('forked worker. pid: ' . $worker->getPid());
            return $worker;
        }

        // worker
        $this->log->info('has forked. pid: ' . getmypid());
        // @codeCoverageIgnoreStart

        // Replace the handlers inherited from the master with the default disposition.
        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, SIG_DFL, true);
        }

        $worker->setResultQueue(new ResultQueue($this->ownerPid));

        // Captured by reference so the shutdown function sees the value at exit
        // time: if the worker ends before the flag is set, it reports an error.
        $resultHasQueued = false;
        register_shutdown_function(function () use (&$resultHasQueued, $worker) {
            if (!$resultHasQueued) {
                $worker->error();
            }
        });

        $this->log->info('----> started the function.');
        try {
            $worker->run();
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            // The flag is still false here, so the shutdown function calls $worker->error().
            exit;
        }
        $this->log->info('<---- completed the function.');

        // Not dead code: read through the reference held by the shutdown function.
        $resultHasQueued = true;
        $this->log->info('queued the result and exit.');
        // Intentional: the forked worker must end here and never return into the master's loop.
        exit;
        // @codeCoverageIgnoreEnd
    }

    /**
     * @return  bool    true while a master pid is recorded
     */
    public function existsMaster()
    {
        return $this->masterPid !== null;
    }

    /**
     * send signal to master process
     *
     * Sends the signal, waits for the master to exit and forgets its pid.
     * Does nothing when no master pid is recorded.
     *
     * @param   int     $sig
     * @return  void
     */
    public function sendSignalToMaster($sig = SIGTERM)
    {
        if (!$this->existsMaster()) {
            // A null pid would be coerced to 0, and kill(0, ...) signals the whole
            // process group of the caller - including the caller itself.
            $this->log->info('no master process to signal.');
            return;
        }

        $this->log->info('----> sending signal to master. signal: ' . $sig);
        posix_kill($this->masterPid, $sig);
        $this->log->info('<---- sent signal.');

        $status = null;
        $this->pcntl->waitpid($this->masterPid, $status);
        $this->log->info('. status: ' . $status);
        $this->masterPid = null;
    }

    /**
     * Tell whether any collected result belongs to a task with the given tag.
     *
     * @param   string  $tag
     * @return  bool
     */
    public function hasTag($tag)
    {
        foreach ($this->results as $result) {
            if ($result->getTask()->getTag() === $tag) {
                return true;
            }
        }

        return false;
    }

    /**
     * Dequeue results until as many results have been dequeued as tasks have
     * been queued, storing each one (and each failure) by the pid of its fork.
     *
     * @return void
     */
    public function wait()
    {
        while ($this->queuedCount() > $this->dequeuedCount()) {
            $result = $this->dequeue();
            $pid = $result->getFork()->getPid();
            $this->results[$pid] = $result;

            if ($result->isFailure()) {
                // NOTE(review): a Result is stored here while waitForChild() stores a Fork
                // in the same error set - confirm which type Error expects.
                $this->error[$pid] = $result;
            }
        }
    }

    /**
     * wait child
     *
     * Waits for any child process to exit, loads the result stored for its pid
     * and records the exit status on the result's fork.
     *
     * @return \Ackintosh\Snidel\Result\Result
     * @throws \Ackintosh\Snidel\Exception\SharedMemoryControlException propagated unchanged from the data repository
     */
    public function waitForChild()
    {
        $status = null;
        $childPid = $this->pcntl->waitpid(-1, $status);
        $result = $this->dataRepository->load($childPid)->readAndDelete();

        $fork = $result->getFork();
        $fork->setStatus($status);
        $result->setFork($fork);

        // An error is recorded for a failed result, an abnormal termination or a non-zero exit code.
        if ($result->isFailure() || !$this->pcntl->wifexited($status) || $this->pcntl->wexitstatus($status) !== 0) {
            $this->error[$childPid] = $fork;
        }
        $this->results[$childPid] = $result;

        return $result;
    }

    /**
     * @return  array   pids of every fork recorded by fork()
     */
    public function getChildPids()
    {
        return array_keys($this->forks);
    }

    /**
     * return the result collected for the given pid
     *
     * @param   int     $pid
     * @return  \Ackintosh\Snidel\Result\Result
     */
    public function get($pid)
    {
        return $this->results[$pid];
    }

    /**
     * Return the collected results and remove them from the container.
     *
     * Without a tag every collected result is returned and the internal list is
     * emptied; with a tag only the results of tasks carrying that tag are
     * returned and removed.
     *
     * @param   string|null $tag
     * @return  \Ackintosh\Snidel\Result\Collection
     */
    public function getCollection($tag = null)
    {
        if ($tag === null) {
            $collection = new Collection($this->results);
            $this->results = array();

            return $collection;
        }

        return $this->getCollectionWithTag($tag);
    }

    /**
     * return results
     *
     * Collects the results whose task has the given tag and removes them from
     * the container.
     *
     * @param   string  $tag
     * @return  \Ackintosh\Snidel\Result\Collection
     */
    private function getCollectionWithTag($tag)
    {
        $results = array();
        foreach ($this->results as $r) {
            if ($r->getTask()->getTag() !== $tag) {
                continue;
            }

            $results[] = $r;
            unset($this->results[$r->getFork()->getPid()]);
        }

        return new Collection($results);
    }

    /**
     * @return  bool
     */
    public function hasError()
    {
        return $this->error->exists();
    }

    /**
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        return $this->error;
    }

    /**
     * Drop the references to both queues when the container is destroyed.
     */
    public function __destruct()
    {
        unset($this->taskQueue);
        unset($this->resultQueue);
    }
}
405