Completed
Pull Request — master (#16)
by Akihito
03:28
created

Container::getCollection()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 11
rs 9.4285
cc 2
eloc 6
nc 2
nop 1

1 Method

Rating   Name   Duplication   Size   Complexity  
A Container::__destruct() 0 5 1
1
<?php
2
declare(ticks = 1);
3
namespace Ackintosh\Snidel\Fork;
4
5
use Ackintosh\Snidel\ActiveWorkerSet;
6
use Ackintosh\Snidel\Config;
7
use Ackintosh\Snidel\Error;
8
use Ackintosh\Snidel\Pcntl;
9
use Ackintosh\Snidel\QueueFactory;
10
use Ackintosh\Snidel\Worker;
11
12
class Container
13
{
14
    /** @var int|null PID of the master process; null before forkMaster() and after sendSignalToMaster() */
    private $masterPid;

    /** @var \Ackintosh\Snidel\Pcntl wrapper used for the fork / signal / waitpid calls */
    private $pcntl;

    /** @var \Ackintosh\Snidel\Error failed results, stored under the PID of the fork that produced them */
    private $error;

    /** @var \Ackintosh\Snidel\Task\QueueInterface|null created in the owner process by forkMaster() */
    private $taskQueue;

    /** @var \Ackintosh\Snidel\Result\QueueInterface|null created in the owner process by forkMaster() */
    private $resultQueue;

    /** @var \Ackintosh\Snidel\Log */
    private $log;

    /** @var int[] signals for which the master and worker processes install handlers */
    private $signals = [SIGTERM, SIGINT];

    /** @var \Ackintosh\Snidel\Config */
    private $config;

    /** @var \Ackintosh\Snidel\QueueFactory builds the task and result queues */
    private $queueFactory;

    /** @var int|null number of the signal a handler received; null while none has arrived */
    private $receivedSignal;
46
47
    /**
     * Stores the injected config and logger and creates the helper objects
     * (Pcntl wrapper, error collection, queue factory) used by this container.
     *
     * @param   \Ackintosh\Snidel\Config    $config
     * @param   \Ackintosh\Snidel\Log       $log
     */
    public function __construct(Config $config, $log)
    {
        $this->log              = $log;
        $this->config           = $config;
        $this->pcntl            = new Pcntl();
        $this->error            = new Error();
        $this->queueFactory     = new QueueFactory($config);
    }
58
59
    /**
     * Hands a task over to the task queue.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException       passed through unchanged from the task queue
     */
    public function enqueue($task)
    {
        // Any \RuntimeException raised by the queue simply bubbles up to the caller.
        $this->taskQueue->enqueue($task);
    }
72
73
    /**
     * Number of tasks queued so far; 0 while no task queue exists yet.
     *
     * @return  int
     */
    public function queuedCount()
    {
        return ($this->taskQueue === null) ? 0 : $this->taskQueue->queuedCount();
    }
84
85
    /**
     * Takes the next result off the result queue.
     *
     * @return  \Ackintosh\Snidel\Result\Result
     */
    private function dequeue()
    {
        $result = $this->resultQueue->dequeue();

        return $result;
    }
92
93
    /**
     * Number of results dequeued so far; 0 while no result queue exists yet.
     *
     * @return  int
     */
    public function dequeuedCount()
    {
        return ($this->resultQueue === null) ? 0 : $this->resultQueue->dequeuedCount();
    }
104
105
    /**
     * Forks the current process and wraps the resulting PID in a Fork object.
     *
     * @return  \Ackintosh\Snidel\Fork\Fork
     * @throws  \RuntimeException   when the fork call reports -1
     */
    private function fork()
    {
        $forkedPid = $this->pcntl->fork();

        if ($forkedPid === -1) {
            throw new \RuntimeException('could not fork a new process');
        }

        // A result of 0 is replaced by this process's own PID
        // (pcntl_fork() returns 0 inside the child process).
        if ($forkedPid === 0) {
            $forkedPid = getmypid();
        }

        return new Fork($forkedPid);
    }
122
123
    /**
     * fork master process
     *
     * In the owner process (the one whose PID equals the configured 'ownerPid')
     * the task and result queues are created and the master PID is returned.
     * Any other process takes the master branch: it installs the signal
     * handlers, forks the configured number of workers, replaces each worker
     * that exits, and finally calls exit, so it never returns to the caller.
     *
     * @return  int     $masterPid
     * @throws  \RuntimeException   when the master process could not be forked
     */
    public function forkMaster()
    {
        // A \RuntimeException thrown by fork() propagates to the caller as-is.
        $fork = $this->fork();

        $this->masterPid = $fork->getPid();
        $this->log->setMasterPid($this->masterPid);

        if (getmypid() === $this->config->get('ownerPid')) {
            // owner
            $this->log->info('pid: ' . getmypid());
            $this->taskQueue    = $this->queueFactory->createTaskQueue();
            $this->resultQueue  = $this->queueFactory->createResultQueue();

            return $this->masterPid;
        }

        // master
        $activeWorkerSet = new ActiveWorkerSet();
        $this->log->info('pid: ' . $this->masterPid);

        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, function ($sig) use ($activeWorkerSet) {
                // Record the signal on the object itself. The previous code assigned to a
                // closure variable imported by value, so the property was never updated.
                $this->receivedSignal = $sig;
                $this->log->info('received signal: ' . $sig);

                if ($activeWorkerSet->count() === 0) {
                    $this->log->info('no worker is active.');
                } else {
                    $this->log->info('------> sending signal to workers. signal: ' . $sig);
                    $activeWorkerSet->terminate($sig);
                    $this->log->info('<------ sent signal');
                }
                exit;
            });
        }

        $concurrency = (int)$this->config->get('concurrency');
        for ($i = 0; $i < $concurrency; $i++) {
            $activeWorkerSet->add($this->forkWorker());
        }

        $status = null;
        // Poll without blocking (WNOHANG) until waitpid reports -1.
        while (($workerPid = $this->pcntl->waitpid(-1, $status, WNOHANG)) !== -1) {
            if ($workerPid === true || $workerPid === 0) {
                // No worker has exited yet; pause briefly before polling again.
                usleep(100000);
                continue;
            }

            // A worker exited: drop it from the set and fork a replacement.
            $activeWorkerSet->delete($workerPid);
            $activeWorkerSet->add($this->forkWorker());
            $status = null;
        }
        exit;
    }
185
186
    /**
     * fork worker process
     *
     * The master process gets the new Worker back. The forked worker process
     * installs its signal handlers, attaches fresh queues, runs the worker and
     * then calls exit, so it never returns to the caller.
     *
     * @return  \Ackintosh\Snidel\Worker
     * @throws  \RuntimeException   when forking fails (the message is logged first)
     */
    private function forkWorker()
    {
        try {
            $fork = $this->fork();
        } catch (\RuntimeException $exception) {
            $this->log->error($exception->getMessage());
            throw $exception;
        }

        $worker = new Worker($fork);

        if (getmypid() === $this->masterPid) {
            // master
            $this->log->info('forked worker. pid: ' . $worker->getPid());

            return $worker;
        }

        // worker
        // @codeCoverageIgnoreStart
        $this->log->info('has forked. pid: ' . getmypid());

        foreach ($this->signals as $signal) {
            $this->pcntl->signal($signal, function ($received) {
                // Remember the signal so the shutdown function below can tell
                // a signalled exit apart from a failure.
                $this->receivedSignal = $received;
                exit;
            }, false);
        }

        $worker->setTaskQueue($this->queueFactory->createTaskQueue());
        $worker->setResultQueue($this->queueFactory->createResultQueue());

        register_shutdown_function(function () use ($worker) {
            // Report an error only when the result could not be enqueued
            // and no signal was received.
            if ($worker->isFailedToEnqueueResult() && $this->receivedSignal === null) {
                $worker->error();
            }
        });

        $this->log->info('----> started the function.');
        try {
            $worker->run();
        } catch (\RuntimeException $exception) {
            $this->log->error($exception->getMessage());
            exit;
        }
        $this->log->info('<---- completed the function.');

        $this->log->info('queued the result and exit.');
        exit;
        // @codeCoverageIgnoreEnd
    }
242
243
    /**
     * Tells whether a master PID is currently recorded.
     *
     * @return  bool
     */
    public function existsMaster()
    {
        return !is_null($this->masterPid);
    }
250
251
    /**
     * send signal to master process
     *
     * Sends the signal, waits for the master process to exit and then clears
     * the recorded master PID.
     *
     * @param   int     $sig    signal number to send, SIGTERM by default
     * @return  void
     */
    public function sendSignalToMaster($sig = SIGTERM)
    {
        $this->log->info('----> sending signal to master. signal: ' . $sig);
        posix_kill($this->masterPid, $sig);
        $this->log->info('<---- sent signal.');

        $this->log->info('----> waiting for master shutdown.');

        // waitpid is called without options here and receives $status as an out argument.
        $status = null;
        $this->pcntl->waitpid($this->masterPid, $status);

        $this->log->info('<---- master shutdown. status: ' . $status);

        // From now on existsMaster() reports false.
        $this->masterPid = null;
    }
268
269
    /**
     * Dequeues results until as many have been dequeued as tasks were queued,
     * recording every failed result under the PID of its fork.
     *
     * @return void
     */
    public function wait()
    {
        while ($this->queuedCount() > $this->dequeuedCount()) {
            $result = $this->dequeue();

            if (!$result->isFailure()) {
                continue;
            }

            $this->error[$result->getFork()->getPid()] = $result;
        }
    }
281
282
    /**
     * Yields each successful result while results are still outstanding;
     * failed results are recorded under the PID of their fork instead.
     *
     * @return \Generator
     */
    public function results()
    {
        while ($this->queuedCount() > $this->dequeuedCount()) {
            $result = $this->dequeue();

            if (!$result->isFailure()) {
                yield $result;
                continue;
            }

            $this->error[$result->getFork()->getPid()] = $result;
        }
    }
298
299
    /**
     * Reports whether the error collection says that an error exists.
     *
     * @return  bool
     */
    public function hasError()
    {
        $errors = $this->error;

        return $errors->exists();
    }
306
307
    /**
     * Gives access to the error collection filled by wait() and results().
     *
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        $errors = $this->error;

        return $errors;
    }
314
315
    public function __destruct()
    {
        // Drop both queue properties when the container is destroyed.
        unset($this->taskQueue, $this->resultQueue);
    }
320
}
321