Completed
Pull Request — master (#19)
by Akihito
01:28
created

Container::wait()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
rs 10
cc 2
eloc 2
nc 2
nop 0
1
<?php
2
declare(ticks = 1);
3
namespace Ackintosh\Snidel\Fork;
4
5
use Ackintosh\Snidel\ActiveWorkerSet;
6
use Ackintosh\Snidel\Config;
7
use Ackintosh\Snidel\Error;
8
use Ackintosh\Snidel\Pcntl;
9
use Ackintosh\Snidel\Result\Formatter as ResultFormatter;
10
use Ackintosh\Snidel\Task\Formatter as TaskFormatter;
11
use Ackintosh\Snidel\Worker;
12
use Bernard\Consumer;
13
use Bernard\Message\PlainMessage;
14
use Bernard\Producer;
15
use Bernard\QueueFactory\PersistentFactory;
16
use Bernard\Router\SimpleRouter;
17
use Bernard\Serializer;
18
use Symfony\Component\EventDispatcher\EventDispatcher;
19
20
class Container
21
{
22
    /**
     * Process object returned by the fork in forkMaster(); reset to null by
     * sendSignalToMaster() once the master has been waited for.
     *
     * @var Process|null
     */
    private $master;

    /**
     * Wrapper used for fork(), signal() and waitpid() calls.
     *
     * @var \Ackintosh\Snidel\Pcntl
     */
    private $pcntl;

    /**
     * Collects failed results; results() stores them indexed by the pid of
     * the process that produced them.
     *
     * @var \Ackintosh\Snidel\Error
     */
    private $error;

    /**
     * Logger handed to the constructor.
     *
     * @var \Ackintosh\Snidel\Log
     */
    private $log;

    /**
     * Signals for which the master and the workers install handlers.
     *
     * @var array
     */
    private $signals = [SIGTERM, SIGINT];

    /**
     * Configuration; this class reads the 'driver', 'ownerPid' and
     * 'concurrency' entries from it.
     *
     * @var \Ackintosh\Snidel\Config
     */
    private $config;

    /**
     * Signal number recorded by the installed signal handlers; stays null
     * until a handler has run.
     *
     * @var int|null
     */
    private $receivedSignal;

    /**
     * Number of tasks successfully handed to the producer by enqueue().
     *
     * @var int
     */
    private $queuedCount = 0;

    /**
     * Number of result envelopes taken off the result queue by results().
     *
     * @var int
     */
    private $dequeuedCount = 0;

    /**
     * Queue factory built on the configured driver.
     *
     * @var \Bernard\QueueFactory\PersistentFactory
     */
    private $factory;

    /**
     * Producer used by enqueue() to publish 'Task' messages.
     *
     * @var \Bernard\Producer
     */
    private $producer;

    /**
     * Consumer created in the constructor with a router for 'Result' messages.
     *
     * @var \Bernard\Consumer
     */
    private $consumer;

    /**
     * Queue obtained from $factory->create('result'); only assigned in the
     * owner process by forkMaster().
     */
    private $resultQueue;
53
54
    /**
     * Stores the injected collaborators and builds the queue producer and
     * consumer on top of the configured driver.
     *
     * @param   \Ackintosh\Snidel\Config    $config     read here for its 'driver' entry
     * @param   \Ackintosh\Snidel\Log       $log        logger kept for later use
     */
    public function __construct(Config $config, $log)
    {
        $this->log      = $log;
        $this->config   = $config;
        $this->pcntl    = new Pcntl();
        $this->error    = new Error();

        // Producer side: queues are created through a factory bound to the
        // configured driver.
        $driver         = $config->get('driver');
        $this->factory  = new PersistentFactory($driver, new Serializer());
        $this->producer = new Producer($this->factory, new EventDispatcher());

        // Consumer side: messages named 'Result' are routed to this container.
        $resultRouter = new SimpleRouter();
        $resultRouter->add('Result', $this);
        $this->consumer = new Consumer($resultRouter, new EventDispatcher());
    }
71
72
    /**
     * Publishes the given task as a 'Task' message and counts it as queued.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException   propagated unchanged when serializing or producing fails
     */
    public function enqueue($task)
    {
        $message = new PlainMessage(
            'Task',
            [
                'task' => TaskFormatter::serialize($task),
            ]
        );
        $this->producer->produce($message);

        // Only reached when nothing above threw, so the counter never runs
        // ahead of what was actually handed to the producer.
        $this->queuedCount++;
    }
94
95
    /**
     * Reports how many tasks enqueue() has successfully queued so far.
     *
     * @return  int
     */
    public function queuedCount()
    {
        $queued = $this->queuedCount;

        return $queued;
    }
102
103
    /**
     * Reports how many results have been taken off the result queue so far.
     *
     * @return  int
     */
    public function dequeuedCount()
    {
        $dequeued = $this->dequeuedCount;

        return $dequeued;
    }
110
111
    /**
     * fork master process
     *
     * After the fork, the process whose pid equals the configured 'ownerPid'
     * creates the 'result' queue and returns. The other process acts as the
     * master: it installs signal handlers, forks 'concurrency' workers,
     * replaces every worker that exits, and terminates via exit instead of
     * returning.
     *
     * @return Process $master  returned in the owner process only
     * @throws \RuntimeException    when the fork fails; the original exception is chained as "previous"
     */
    public function forkMaster()
    {
        try {
            $this->master = $this->pcntl->fork();
        } catch (\RuntimeException $e) {
            $message = 'failed to fork master: ' . $e->getMessage();
            $this->log->error($message);
            // Keep the causing exception attached so its trace is not lost.
            throw new \RuntimeException($message, 0, $e);
        }

        $this->log->setMasterPid($this->master->getPid());

        if (getmypid() === $this->config->get('ownerPid')) {
            // owner
            $this->log->info('pid: ' . getmypid());
            $this->resultQueue  = $this->factory->create('result');

            return $this->master;
        }

        // @codeCoverageIgnoreStart
        // covered by SnidelTest via master process
        // master
        $activeWorkerSet = new ActiveWorkerSet();
        $this->log->info('pid: ' . $this->master->getPid());

        // On SIGTERM/SIGINT: remember the signal, forward it to the workers
        // that are still tracked, then leave the master process.
        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, function ($sig) use ($activeWorkerSet) {
                $this->receivedSignal = $sig;
                $this->log->info('received signal: ' . $sig);

                if ($activeWorkerSet->count() === 0) {
                    $this->log->info('no worker is active.');
                } else {
                    $this->log->info('------> sending signal to workers. signal: ' . $sig);
                    $activeWorkerSet->terminate($sig);
                    $this->log->info('<------ sent signal');
                }
                exit;
            });
        }

        $concurrency = (int)$this->config->get('concurrency');
        for ($i = 0; $i < $concurrency; $i++) {
            $activeWorkerSet->add($this->forkWorker());
        }

        // Supervise the workers: poll without blocking (WNOHANG) until
        // waitpid() reports -1.
        $status = null;
        while (($workerPid = $this->pcntl->waitpid(-1, $status, WNOHANG)) !== -1) {
            if ($workerPid === true || $workerPid === 0) {
                // NOTE(review): presumably "no child has exited yet" - back
                // off for 100 ms before polling again.
                usleep(100000);
                continue;
            }

            // A worker exited: drop it from the set and fork a replacement.
            $activeWorkerSet->delete($workerPid);
            $activeWorkerSet->add($this->forkWorker());
            $status = null;
        }
        exit;
        // @codeCoverageIgnoreEnd
    }
176
177
    /**
     * fork worker process
     *
     * In the master process (pid equals the master's pid) the new Worker is
     * logged and returned. In the forked worker process the method installs
     * signal handlers and a shutdown function, runs the worker, and then
     * terminates via exit instead of returning.
     *
     * @return  \Ackintosh\Snidel\Worker    returned in the master process only
     * @throws  \RuntimeException           when the fork fails; the original exception is chained as "previous"
     */
    private function forkWorker()
    {
        try {
            $process = $this->pcntl->fork();
        } catch (\RuntimeException $e) {
            $message = 'failed to fork worker: ' . $e->getMessage();
            $this->log->error($message);
            // Keep the causing exception attached so its trace is not lost.
            throw new \RuntimeException($message, 0, $e);
        }

        $worker = new Worker($process, $this->config->get('driver'));

        if (getmypid() === $this->master->getPid()) {
            // master
            $this->log->info('forked worker. pid: ' . $worker->getPid());

            return $worker;
        }

        // @codeCoverageIgnoreStart
        // covered by SnidelTest via worker process
        // worker
        $this->log->info('has forked. pid: ' . getmypid());

        // On SIGTERM/SIGINT: remember the signal and leave the worker process.
        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, function ($sig) {
                $this->receivedSignal = $sig;
                exit;
            }, false);
        }

        // If the worker goes down while still in progress and no signal was
        // recorded, let it report an error at shutdown.
        register_shutdown_function(function () use ($worker) {
            if ($this->receivedSignal === null && $worker->isInProgress()) {
                $worker->error();
            }
        });

        $this->log->info('----> started the function.');
        try {
            $worker->run();
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            exit;
        }
        $this->log->info('<---- completed the function.');

        $this->log->info('queued the result and exit.');
        exit;
        // @codeCoverageIgnoreEnd
    }
232
233
    /**
     * Tells whether a master process handle is currently held.
     *
     * @return  bool    false before forkMaster() and after sendSignalToMaster()
     */
    public function existsMaster()
    {
        return !is_null($this->master);
    }
240
241
    /**
     * send signal to master process
     *
     * Sends the signal, waits for the master process to terminate, and then
     * clears the stored master handle.
     *
     * @param   int     $sig    signal number to send; SIGTERM by default
     * @return  void
     */
    public function sendSignalToMaster($sig = SIGTERM)
    {
        $masterPid = $this->master->getPid();

        $this->log->info('----> sending signal to master. signal: ' . $sig);
        posix_kill($masterPid, $sig);
        $this->log->info('<---- sent signal.');

        $this->log->info('----> waiting for master shutdown.');
        $exitStatus = null;
        $this->pcntl->waitpid($masterPid, $exitStatus);
        $this->log->info('<---- master shutdown. status: ' . $exitStatus);

        // From here on existsMaster() reports false.
        $this->master = null;
    }
258
259
    /**
     * Blocks until every queued task has a dequeued result.
     *
     * results() is a generator, so nothing is dequeued unless it is iterated.
     * This method drives it to completion and discards the yielded values;
     * failed results are still recorded by results() itself.
     *
     * @return void
     */
    public function wait()
    {
        $pending = $this->results();

        // Not dead code: advancing the generator is what drains the queue.
        while ($pending->valid()) {
            $pending->next();
        }
    }
266
267
    /**
     * Lazily yields the successful results of the queued tasks.
     *
     * Runs until as many results have been dequeued as tasks were queued.
     * Failed results are not yielded; they are stored in the error
     * collection under the pid of the process that produced them.
     *
     * @return \Generator
     */
    public function results()
    {
        while ($this->dequeuedCount() < $this->queuedCount()) {
            // Keep asking the result queue until it hands back an envelope.
            do {
                $envelope = $this->resultQueue->dequeue();
            } while (!$envelope);
            $this->dequeuedCount++;

            $payload = $envelope->getMessage()['result'];
            $result  = ResultFormatter::unserialize($payload);

            if (!$result->isFailure()) {
                yield $result;
                continue;
            }

            $failedPid = $result->getProcess()->getPid();
            $this->error[$failedPid] = $result;
        }
    }
291
292
    /**
     * Tells whether the error collection reports any recorded failure.
     *
     * @return  bool
     */
    public function hasError()
    {
        $errors = $this->error;

        return $errors->exists();
    }
299
300
    /**
     * Gives access to the error collection filled by results().
     *
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        $errors = $this->error;

        return $errors;
    }
307
308
    /**
     * Drops this container's reference to the result queue on destruction.
     */
    public function __destruct()
    {
        unset($this->resultQueue);
    }
312
}
313