Completed
Pull Request — master (#19)
by Akihito
01:39
created

Container::wait()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
rs 10
cc 2
eloc 2
nc 2
nop 0
1
<?php
2
declare(ticks = 1);
3
namespace Ackintosh\Snidel\Fork;
4
5
use Ackintosh\Snidel\ActiveWorkerSet;
6
use Ackintosh\Snidel\Config;
7
use Ackintosh\Snidel\Error;
8
use Ackintosh\Snidel\Pcntl;
9
use Ackintosh\Snidel\Result\Formatter as ResultFormatter;
10
use Ackintosh\Snidel\Task\Formatter as TaskFormatter;
11
use Ackintosh\Snidel\Worker;
12
use Bernard\Consumer;
13
use Bernard\Message\PlainMessage;
14
use Bernard\Producer;
15
use Bernard\QueueFactory\PersistentFactory;
16
use Bernard\Router\SimpleRouter;
17
use Bernard\Serializer;
18
use Symfony\Component\EventDispatcher\EventDispatcher;
19
20
class Container
21
{
22
    /** @var Process */
23
    private $master;
24
25
    /** @var \Ackintosh\Snidel\Pcntl */
26
    private $pcntl;
27
28
    /** @var \Ackintosh\Snidel\Error */
29
    private $error;
30
31
    /** @var \Ackintosh\Snidel\Task\QueueInterface */
32
    private $taskQueue;
0 ignored issues
show
Unused Code introduced by
The property $taskQueue is not used and could be removed.

This check marks private properties in classes that are never used. Those properties can be removed.

Loading history...
33
34
35
    /** @var \Ackintosh\Snidel\Log */
36
    private $log;
37
38
    /** @var array */
39
    private $signals = [
40
        SIGTERM,
41
        SIGINT,
42
    ];
43
44
    /** @var \Ackintosh\Snidel\Config */
45
    private $config;
46
47
    /** @var  int */
48
    private $receivedSignal;
49
50
    private $queuedCount = 0;
51
    private $dequeuedCount = 0;
52
53
    private $factory;
54
    private $producer;
55
    private $consumer;
56
    private $resultQueue;
57
58
    /**
59
     * @param   int     $ownerPid
0 ignored issues
show
Bug introduced by
There is no parameter named $ownerPid. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

Loading history...
60
     */
61
    public function __construct(Config $config, $log)
62
    {
63
        $this->log              = $log;
64
        $this->config           = $config;
65
        $this->pcntl            = new Pcntl();
66
        $this->error            = new Error();
67
68
        $this->factory = new PersistentFactory($this->config->get('driver'), new Serializer());
69
        $this->producer = new Producer($this->factory, new EventDispatcher());
70
71
        $router = new SimpleRouter();
72
        $router->add('Result', $this);
73
        $this->consumer = new Consumer($router, new EventDispatcher());
74
    }
75
76
    /**
77
     * @param   \Ackintosh\Snidel\Task
78
     * @return  void
79
     * @throws  \RuntimeException
80
     */
81
    public function enqueue($task)
82
    {
83
        try {
84
            $this->producer->produce(
85
                new PlainMessage(
86
                    'Task',
87
                    [
88
                        'task' => TaskFormatter::serialize($task),
89
                    ]
90
                )
91
            );
92
            $this->queuedCount++;
93
94
        } catch (\RuntimeException $e) {
95
            throw $e;
96
        }
97
    }
98
99
    /**
100
     * @return  int
101
     */
102
    public function queuedCount()
103
    {
104
        return $this->queuedCount;
105
    }
106
107
    /**
108
     * @return  int
109
     */
110
    public function dequeuedCount()
111
    {
112
        return $this->dequeuedCount;
113
    }
114
115
    /**
116
     * fork master process
117
     *
118
     * @return Process $master
119
     * @throws \RuntimeException
120
     */
121
    public function forkMaster()
122
    {
123
        try {
124
            $this->master = $this->pcntl->fork();
125
        } catch (\RuntimeException $e) {
126
            $message = 'failed to fork master: ' . $e->getMessage();
127
            $this->log->error($message);
128
            throw new \RuntimeException($message);
129
        }
130
131
        $this->log->setMasterPid($this->master->getPid());
132
133
        if (getmypid() === $this->config->get('ownerPid')) {
134
            // owner
135
            $this->log->info('pid: ' . getmypid());
136
            $this->resultQueue  = $this->factory->create('result');
137
138
            return $this->master;
139
        } else {
140
            // master
141
            $activeWorkerSet = new ActiveWorkerSet();
142
            $this->log->info('pid: ' . $this->master->getPid());
143
144
            foreach ($this->signals as $sig) {
145
                $this->pcntl->signal($sig, function ($sig) use ($activeWorkerSet) {
146
                    $this->receivedSignal = $sig;
147
                    $this->log->info('received signal: ' . $sig);
148
149
                    if ($activeWorkerSet->count() === 0) {
150
                        $this->log->info('no worker is active.');
151
                    } else {
152
                        $this->log->info('------> sending signal to workers. signal: ' . $sig);
153
                        $activeWorkerSet->terminate($sig);
154
                        $this->log->info('<------ sent signal');
155
                    }
156
                    exit;
157
                });
158
            }
159
160
            $concurrency = (int)$this->config->get('concurrency');
161
            for ($i = 0; $i < $concurrency; $i++) {
162
                $activeWorkerSet->add($this->forkWorker());
163
            }
164
            $status = null;
165
            while (($workerPid = $this->pcntl->waitpid(-1, $status, WNOHANG)) !== -1) {
166
                if ($workerPid === true || $workerPid === 0) {
167
                    usleep(100000);
168
                    continue;
169
                }
170
                $activeWorkerSet->delete($workerPid);
171
                $activeWorkerSet->add($this->forkWorker());
172
                $status = null;
173
            }
174
            exit;
175
        }
176
    }
177
178
    /**
179
     * fork worker process
180
     *
181
     * @return  \Ackintosh\Snidel\Worker
182
     * @throws  \RuntimeException
183
     */
184
    private function forkWorker()
185
    {
186
        try {
187
            $process = $this->pcntl->fork();
188
        } catch (\RuntimeException $e) {
189
            $message = 'failed to fork worker: ' . $e->getMessage();
190
            $this->log->error($message);
191
            throw new \RuntimeException($message);
192
        }
193
194
        $worker = new Worker($process, $this->config->get('driver'));
195
196
        if (getmypid() === $this->master->getPid()) {
197
            // master
198
            $this->log->info('forked worker. pid: ' . $worker->getPid());
199
            return $worker;
200
        } else {
201
            // worker
202
            // @codeCoverageIgnoreStart
203
            $this->log->info('has forked. pid: ' . getmypid());
204
205
            foreach ($this->signals as $sig) {
206
                $this->pcntl->signal($sig, function ($sig) {
207
                    $this->receivedSignal = $sig;
208
                    exit;
209
                }, false);
210
            }
211
212
            register_shutdown_function(function () use ($worker) {
213
                if ($this->receivedSignal === null && $worker->isInProgress()) {
214
                    $worker->error();
215
                }
216
            });
217
218
            $this->log->info('----> started the function.');
219
            try {
220
                $worker->run();
221
            } catch (\RuntimeException $e) {
222
                $this->log->error($e->getMessage());
223
                exit;
224
            }
225
            $this->log->info('<---- completed the function.');
226
227
            $this->log->info('queued the result and exit.');
228
            exit;
229
            // @codeCoverageIgnoreEnd
230
        }
231
    }
232
233
    /**
234
     * @return  bool
235
     */
236
    public function existsMaster()
237
    {
238
        return $this->master !== null;
239
    }
240
241
    /**
242
     * send signal to master process
243
     *
244
     * @return  void
245
     */
246
    public function sendSignalToMaster($sig = SIGTERM)
247
    {
248
        $this->log->info('----> sending signal to master. signal: ' . $sig);
249
        posix_kill($this->master->getPid(), $sig);
250
        $this->log->info('<---- sent signal.');
251
252
        $this->log->info('----> waiting for master shutdown.');
253
        $status = null;
254
        $this->pcntl->waitpid($this->master->getPid(), $status);
255
        $this->log->info('<---- master shutdown. status: ' . $status);
256
        $this->master = null;
257
    }
258
259
    /**
260
     * @return void
261
     */
262
    public function wait()
263
    {
264
        foreach ($this->results() as $_) {}
0 ignored issues
show
Unused Code introduced by
This foreach statement is empty and can be removed.

This check looks for foreach loops that have no statements or where all statements have been commented out. This may be the result of changes for debugging or the code may simply be obsolete.

Consider removing the loop.

Loading history...
265
    }
266
267
    /**
268
     * @return \Generator
269
     */
270
    public function results()
271
    {
272
        for (; $this->queuedCount() > $this->dequeuedCount();) {
273
            for (;;) {
274
                if ($r = $this->resultQueue->dequeue()) {
275
                    $this->dequeuedCount++;
276
                    break;
277
                }
278
            }
279
            $result = ResultFormatter::unserialize(
280
                $r->getMessage()['result']
0 ignored issues
show
Bug introduced by
The variable $r does not seem to be defined for all execution paths leading up to this point.

If you define a variable conditionally, it can happen that it is not defined for all execution paths.

Let’s take a look at an example:

function myFunction($a) {
    switch ($a) {
        case 'foo':
            $x = 1;
            break;

        case 'bar':
            $x = 2;
            break;
    }

    // $x is potentially undefined here.
    echo $x;
}

In the above example, the variable $x is defined if you pass “foo” or “bar” as argument for $a. However, since the switch statement has no default case statement, if you pass any other value, the variable $x would be undefined.

Available Fixes

  1. Check for existence of the variable explicitly:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        if (isset($x)) { // Make sure it's always set.
            echo $x;
        }
    }
    
  2. Define a default value for the variable:

    function myFunction($a) {
        $x = ''; // Set a default which gets overridden for certain paths.
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        echo $x;
    }
    
  3. Add a value for the missing path:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
    
            // We add support for the missing case.
            default:
                $x = '';
                break;
        }
    
        echo $x;
    }
    
Loading history...
281
            );
282
283
            if ($result->isFailure()) {
284
                $pid = $result->getProcess()->getPid();
285
                $this->error[$pid] = $result;
286
            } else {
287
                yield $result;
288
            }
289
        }
290
    }
291
292
    /**
293
     * @return  bool
294
     */
295
    public function hasError()
296
    {
297
        return $this->error->exists();
298
    }
299
300
    /**
301
     * @return  \Ackintosh\Snidel\Error
302
     */
303
    public function getError()
304
    {
305
        return $this->error;
306
    }
307
308
    public function __destruct()
309
    {
310
        unset($this->resultQueue);
311
    }
312
}
313