Completed
Pull Request — master (#16)
by Akihito
03:10
created

Container::existsMaster()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
declare(ticks = 1);
3
namespace Ackintosh\Snidel\Fork;
4
5
use Ackintosh\Snidel\ActiveWorkerSet;
6
use Ackintosh\Snidel\Config;
7
use Ackintosh\Snidel\Error;
8
use Ackintosh\Snidel\Pcntl;
9
use Ackintosh\Snidel\QueueFactory;
10
use Ackintosh\Snidel\Result\Collection;
11
use Ackintosh\Snidel\Worker;
12
13
class Container
14
{
15
    /** @var int */
16
    private $ownerPid;
17
18
    /** @var int */
19
    private $masterPid;
20
21
    /** @var \Ackintosh\Snidel\Result\Result[] */
22
    private $results = [];
23
24
    /** @var \Ackintosh\Snidel\Pcntl */
25
    private $pcntl;
26
27
    /** @var \Ackintosh\Snidel\Error */
28
    private $error;
29
30
    /** @var \Ackintosh\Snidel\Task\QueueInterface */
31
    private $taskQueue;
32
33
    /** @var \Ackintosh\Snidel\Result\QueueInterface */
34
    private $resultQueue;
35
36
    /** @var \Ackintosh\Snidel\Log */
37
    private $log;
38
39
    /** @var array */
40
    private $signals = [
41
        SIGTERM,
42
        SIGINT,
43
    ];
44
45
    /** @var \Ackintosh\Snidel\Config */
46
    private $config;
47
48
    /** @var \Ackintosh\Snidel\QueueFactory  */
49
    private $queueFactory;
50
51
    /** @var  int */
52
    private $receivedSignal;
53
54
    /**
55
     * @param   int     $ownerPid
56
     */
57
    public function __construct($ownerPid, $log, Config $config)
58
    {
59
        $this->ownerPid         = $ownerPid;
60
        $this->log              = $log;
61
        $this->config           = $config;
62
        $this->pcntl            = new Pcntl();
63
        $this->error            = new Error();
64
        $this->queueFactory     = new QueueFactory($config);
65
    }
66
67
    /**
68
     * @param   \Ackintosh\Snidel\Task
69
     * @return  void
70
     * @throws  \RuntimeException
71
     */
72
    public function enqueue($task)
73
    {
74
        try {
75
            $this->taskQueue->enqueue($task);
76
        } catch (\RuntimeException $e) {
77
            throw $e;
78
        }
79
    }
80
81
    /**
82
     * @return  int
83
     */
84
    public function queuedCount()
85
    {
86
        if (is_null($this->taskQueue)) {
87
            return 0;
88
        }
89
90
        return $this->taskQueue->queuedCount();
91
    }
92
93
    /**
94
     * @return  \Ackintosh\Snidel\Result\Result
95
     */
96
    private function dequeue()
97
    {
98
        return $this->resultQueue->dequeue();
99
    }
100
101
    /**
102
     * @return  int
103
     */
104
    public function dequeuedCount()
105
    {
106
        if (is_null($this->resultQueue)) {
107
            return 0;
108
        }
109
110
        return $this->resultQueue->dequeuedCount();
111
    }
112
113
    /**
114
     * fork process
115
     *
116
     * @return  \Ackintosh\Snidel\Fork\Fork
117
     * @throws  \RuntimeException
118
     */
119
    private function fork()
120
    {
121
        $pid = $this->pcntl->fork();
122
        if ($pid === -1) {
123
            throw new \RuntimeException('could not fork a new process');
124
        }
125
126
        $pid = ($pid === 0) ? getmypid() : $pid;
127
128
        return new Fork($pid);
129
    }
130
131
    /**
132
     * fork master process
133
     *
134
     * @return  int     $masterPid
135
     */
136
    public function forkMaster()
137
    {
138
        try {
139
            $fork = $this->fork();
140
        } catch (\RuntimeException $e) {
141
            throw $e;
142
        }
143
144
        $this->masterPid = $fork->getPid();
145
        $this->log->setMasterPid($this->masterPid);
146
147
        if (getmypid() === $this->ownerPid) {
148
            // owner
149
            $this->log->info('pid: ' . getmypid());
150
            $this->taskQueue    = $this->queueFactory->createTaskQueue();
151
            $this->resultQueue  = $this->queueFactory->createResultQueue();
152
153
            return $this->masterPid;
154
        } else {
155
            // master
156
            $activeWorkerSet = new ActiveWorkerSet();
157
            $this->log->info('pid: ' . $this->masterPid);
158
159
            $receivedSignal = &$this->receivedSignal;
160
            foreach ($this->signals as $sig) {
161
                $this->pcntl->signal($sig, function ($sig) use ($activeWorkerSet, $receivedSignal) {
162
                    $receivedSignal = $sig;
0 ignored issues
show
Bug introduced by
Consider using a different name than the imported variable $receivedSignal, or did you forget to import by reference?

It seems like you are assigning to a variable which was imported through a use statement which was not imported by reference.

For clarity, we suggest to use a different name or import by reference depending on whether you would like to have the change visibile in outer-scope.

Change not visible in outer-scope

$x = 1;
$callable = function() use ($x) {
    $x = 2; // Not visible in outer scope. If you would like this, how
            // about using a different variable name than $x?
};

$callable();
var_dump($x); // integer(1)

Change visible in outer-scope

$x = 1;
$callable = function() use (&$x) {
    $x = 2;
};

$callable();
var_dump($x); // integer(2)
Loading history...
Unused Code introduced by
$receivedSignal is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
163
                    $this->log->info('received signal: ' . $sig);
164
165
                    if ($activeWorkerSet->count() === 0) {
166
                        $this->log->info('no worker is active.');
167
                    } else {
168
                        $this->log->info('------> sending signal to workers. signal: ' . $sig);
169
                        $activeWorkerSet->terminate($sig);
170
                        $this->log->info('<------ sent signal');
171
                    }
172
                    exit;
0 ignored issues
show
Coding Style Compatibility introduced by
The method forkMaster() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
173
                });
174
            }
175
176
            $concurrency = (int)$this->config->get('concurrency');
177
            for ($i = 0; $i < $concurrency; $i++) {
178
                $activeWorkerSet->add($this->forkWorker());
179
            }
180
            $status = null;
181
            while (($workerPid = $this->pcntl->waitpid(-1, $status, WNOHANG)) !== -1) {
182
                if ($workerPid === true || $workerPid === 0) {
183
                    usleep(100000);
184
                    continue;
185
                }
186
                $activeWorkerSet->delete($workerPid);
187
                $activeWorkerSet->add($this->forkWorker());
188
                $status = null;
189
            }
190
            exit;
0 ignored issues
show
Coding Style Compatibility introduced by
The method forkMaster() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
191
        }
192
    }
193
194
    /**
195
     * fork worker process
196
     *
197
     * @return  \Ackintosh\Snidel\Worker
198
     * @throws  \RuntimeException
199
     */
200
    private function forkWorker()
201
    {
202
        try {
203
            $fork = $this->fork();
204
        } catch (\RuntimeException $e) {
205
            $this->log->error($e->getMessage());
206
            throw $e;
207
        }
208
209
        $worker = new Worker($fork);
210
211
        if (getmypid() === $this->masterPid) {
212
            // master
213
            $this->log->info('forked worker. pid: ' . $worker->getPid());
214
            return $worker;
215
        } else {
216
            // worker
217
            // @codeCoverageIgnoreStart
218
            $this->log->info('has forked. pid: ' . getmypid());
219
220
            // for php5.3
221
            $receivedSignal = &$this->receivedSignal;
222
            foreach ($this->signals as $sig) {
223
                $this->pcntl->signal($sig, function ($sig) use ($receivedSignal) {
224
                    $receivedSignal = $sig;
0 ignored issues
show
Bug introduced by
Consider using a different name than the imported variable $receivedSignal, or did you forget to import by reference?

It seems like you are assigning to a variable which was imported through a use statement which was not imported by reference.

For clarity, we suggest to use a different name or import by reference depending on whether you would like to have the change visibile in outer-scope.

Change not visible in outer-scope

$x = 1;
$callable = function() use ($x) {
    $x = 2; // Not visible in outer scope. If you would like this, how
            // about using a different variable name than $x?
};

$callable();
var_dump($x); // integer(1)

Change visible in outer-scope

$x = 1;
$callable = function() use (&$x) {
    $x = 2;
};

$callable();
var_dump($x); // integer(2)
Loading history...
Unused Code introduced by
$receivedSignal is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
225
                    exit;
0 ignored issues
show
Coding Style Compatibility introduced by
The method forkWorker() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
226
                }, false);
227
            }
228
229
            $worker->setTaskQueue($this->queueFactory->createTaskQueue());
230
            $worker->setResultQueue($this->queueFactory->createResultQueue());
231
232
            register_shutdown_function(function () use ($worker, $receivedSignal) {
233
                if ($worker->isFailedToEnqueueResult() && $receivedSignal === null) {
234
                    $worker->error();
235
                }
236
            });
237
238
            $this->log->info('----> started the function.');
239
            try {
240
                $worker->run();
241
            } catch (\RuntimeException $e) {
242
                $this->log->error($e->getMessage());
243
                exit;
0 ignored issues
show
Coding Style Compatibility introduced by
The method forkWorker() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
244
            }
245
            $this->log->info('<---- completed the function.');
246
247
            $this->log->info('queued the result and exit.');
248
            exit;
0 ignored issues
show
Coding Style Compatibility introduced by
The method forkWorker() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
249
            // @codeCoverageIgnoreEnd
250
        }
251
    }
252
253
    /**
254
     * @return  bool
255
     */
256
    public function existsMaster()
257
    {
258
        return $this->masterPid !== null;
259
    }
260
261
    /**
262
     * send signal to master process
263
     *
264
     * @return  void
265
     */
266
    public function sendSignalToMaster($sig = SIGTERM)
267
    {
268
        $this->log->info('----> sending signal to master. signal: ' . $sig);
269
        posix_kill($this->masterPid, $sig);
270
        $this->log->info('<---- sent signal.');
271
272
        $this->log->info('----> waiting for master shutdown.');
273
        $status = null;
274
        $this->pcntl->waitpid($this->masterPid, $status);
275
        $this->log->info('<---- master shutdown. status: ' . $status);
276
        $this->masterPid = null;
277
    }
278
279
    /**
280
     *
281
     * @param   string  $tag
282
     * @return  bool
283
     */
284
    public function hasTag($tag)
285
    {
286
        foreach ($this->results as $result) {
287
            if ($result->getTask()->getTag() === $tag) {
288
                return true;
289
            }
290
        }
291
292
        return false;
293
    }
294
295
    /**
296
     * @return void
297
     */
298
    public function wait()
299
    {
300
        for (; $this->queuedCount() > $this->dequeuedCount();) {
301
            $result = $this->dequeue();
302
            $pid = $result->getFork()->getPid();
303
            $this->results[$pid] = $result;
304
305
            if ($result->isFailure()) {
306
                $this->error[$pid] = $result;
307
            }
308
        }
309
    }
310
311
    public function getCollection($tag = null)
312
    {
313
        if ($tag === null) {
314
            $collection = new Collection($this->results);
315
            $this->results = [];
316
317
            return $collection;
318
        }
319
320
        return $this->getCollectionWithTag($tag);
321
    }
322
323
    /**
324
     * return results
325
     *
326
     * @param   string  $tag
327
     * @return  \Ackintosh\Snidel\Result\Collection
328
     */
329
    private function getCollectionWithTag($tag)
330
    {
331
        $results = [];
332
        foreach ($this->results as $r) {
333
            if ($r->getTask()->getTag() !== $tag) {
334
                continue;
335
            }
336
337
            $results[] = $r;
338
            unset($this->results[$r->getFork()->getPid()]);
339
        }
340
341
        return new Collection($results);
342
    }
343
344
    /**
345
     * @return  bool
346
     */
347
    public function hasError()
348
    {
349
        return $this->error->exists();
350
    }
351
352
    /**
353
     * @return  \Ackintosh\Snidel\Error
354
     */
355
    public function getError()
356
    {
357
        return $this->error;
358
    }
359
360
    public function __destruct()
361
    {
362
        unset($this->taskQueue);
363
        unset($this->resultQueue);
364
    }
365
}
366