Completed
Pull Request — master (#19)
by Akihito
01:32
created

Container::dequeue()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
declare(ticks = 1);
3
namespace Ackintosh\Snidel\Fork;
4
5
use Ackintosh\Snidel\ActiveWorkerSet;
6
use Ackintosh\Snidel\Config;
7
use Ackintosh\Snidel\Error;
8
use Ackintosh\Snidel\Pcntl;
9
use Ackintosh\Snidel\Result\Formatter as ResultFormatter;
10
use Ackintosh\Snidel\Task\Formatter as TaskFormatter;
11
use Ackintosh\Snidel\Worker;
12
use Bernard\Consumer;
13
use Bernard\Driver\FlatFileDriver;
14
use Bernard\Message\PlainMessage;
15
use Bernard\Producer;
16
use Bernard\QueueFactory\PersistentFactory;
17
use Bernard\Router\SimpleRouter;
18
use Bernard\Serializer;
19
use Symfony\Component\EventDispatcher\EventDispatcher;
20
21
class Container
22
{
23
    /** @var Process|null Master process handle returned by Pcntl::fork(); set back to null by sendSignalToMaster(). */
    private $master;

    /** @var \Ackintosh\Snidel\Pcntl Wrapper used for the fork(), signal() and waitpid() calls. */
    private $pcntl;

    /** @var \Ackintosh\Snidel\Error Collects failed results, keyed by the pid of the failing process. */
    private $error;

    /** @var \Ackintosh\Snidel\Log */
    private $log;

    /** @var int[] Signals for which the master and the workers install handlers. */
    private $signals = [
        SIGTERM,
        SIGINT,
    ];

    /** @var \Ackintosh\Snidel\Config */
    private $config;

    /** @var int|null Signal number stored by the handlers installed in this class; null until one arrives. */
    private $receivedSignal;

    /** @var int Number of tasks handed to the producer by enqueue(). */
    private $queuedCount = 0;

    /** @var int Number of messages taken from the result queue by results(). */
    private $dequeuedCount = 0;

    /** @var \Bernard\QueueFactory\PersistentFactory Queue factory built in the constructor. */
    private $factory;

    /** @var \Bernard\Producer Used by enqueue() to publish 'Task' messages. */
    private $producer;

    /** @var \Bernard\Consumer Built in the constructor; not referenced by any other method of this class. */
    private $consumer;

    /** @var mixed Queue obtained from $factory->create('result'); only assigned in the owner process by forkMaster(). */
    private $resultQueue;
59
    /**
     * Stores the injected configuration and logger and builds the
     * collaborators this container owns: process control, error collection,
     * and the Bernard producer / consumer pair backed by a flat-file driver.
     *
     * @param   \Ackintosh\Snidel\Config    $config
     * @param   \Ackintosh\Snidel\Log       $log
     */
    public function __construct(Config $config, $log)
    {
        $this->config   = $config;
        $this->log      = $log;
        $this->pcntl    = new Pcntl();
        $this->error    = new Error();

        // Producing side: flat-file driver -> persistent queue factory -> producer.
        $flatFileDriver = new FlatFileDriver('/tmp/hoge');
        $queueFactory   = new PersistentFactory($flatFileDriver, new Serializer());
        $this->factory  = $queueFactory;
        $this->producer = new Producer($queueFactory, new EventDispatcher());

        // Consuming side: this object is registered as the receiver for 'Result'.
        $resultRouter = new SimpleRouter();
        $resultRouter->add('Result', $this);
        $this->consumer = new Consumer($resultRouter, new EventDispatcher());
    }
77
78
    /**
     * Serializes a task and publishes it as a 'Task' message.
     *
     * The queued counter is incremented only after produce() has returned,
     * so a publish that throws is not counted.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException   propagated unchanged to the caller
     */
    public function enqueue($task)
    {
        $message = new PlainMessage(
            'Task',
            [
                'task' => TaskFormatter::serialize($task),
            ]
        );

        // No try/catch here: the former handler only re-threw the same
        // exception, so letting it propagate is equivalent.
        $this->producer->produce($message);
        $this->queuedCount++;
    }
100
101
    /**
     * Number of tasks that enqueue() has handed to the producer so far.
     *
     * @return  int
     */
    public function queuedCount()
    {
        return $this->queuedCount;
    }
108
109
    /**
     * Number of result messages that results() has taken off the result queue so far.
     *
     * @return  int
     */
    public function dequeuedCount()
    {
        return $this->dequeuedCount;
    }
116
117
    /**
     * Forks the master process and, inside the master, supervises the workers.
     *
     * The call returns only in the process whose pid equals the configured
     * 'ownerPid'; that process also creates the 'result' queue. Any other
     * process (the forked master) installs signal handlers, forks
     * 'concurrency' workers, forks a replacement each time a worker is
     * reaped, and finally calls exit instead of returning.
     *
     * @return  Process             the master process handle (owner process only)
     * @throws  \RuntimeException   when forking fails; the original exception is attached as "previous"
     */
    public function forkMaster()
    {
        try {
            $this->master = $this->pcntl->fork();
        } catch (\RuntimeException $e) {
            $message = 'failed to fork master: ' . $e->getMessage();
            $this->log->error($message);
            // Chain the caught exception so its type and trace stay available.
            throw new \RuntimeException($message, 0, $e);
        }

        $this->log->setMasterPid($this->master->getPid());

        if (getmypid() === $this->config->get('ownerPid')) {
            // owner
            $this->log->info('pid: ' . getmypid());
            $this->resultQueue = $this->factory->create('result');

            return $this->master;
        }

        // master
        $activeWorkerSet = new ActiveWorkerSet();
        $this->log->info('pid: ' . $this->master->getPid());

        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, function ($sig) use ($activeWorkerSet) {
                $this->receivedSignal = $sig;
                $this->log->info('received signal: ' . $sig);

                if ($activeWorkerSet->count() === 0) {
                    $this->log->info('no worker is active.');
                } else {
                    // Forward the signal to every tracked worker before the master exits.
                    $this->log->info('------> sending signal to workers. signal: ' . $sig);
                    $activeWorkerSet->terminate($sig);
                    $this->log->info('<------ sent signal');
                }
                exit;
            });
        }

        $concurrency = (int)$this->config->get('concurrency');
        for ($i = 0; $i < $concurrency; $i++) {
            $activeWorkerSet->add($this->forkWorker());
        }

        // Supervise until waitpid() reports -1.
        $status = null;
        while (($workerPid = $this->pcntl->waitpid(-1, $status, WNOHANG)) !== -1) {
            if ($workerPid === true || $workerPid === 0) {
                // Nothing was reaped on this poll; back off for 100 ms.
                usleep(100000);
                continue;
            }

            // A worker ended: drop it from the set and fork a replacement.
            $activeWorkerSet->delete($workerPid);
            $activeWorkerSet->add($this->forkWorker());
            $status = null;
        }
        exit;
    }
179
180
    /**
     * Forks a worker process.
     *
     * In the master process (pid equal to the stored master pid) the new
     * Worker is returned. In the forked child the worker installs its signal
     * handlers, registers a shutdown hook, runs, and then calls exit instead
     * of returning.
     *
     * @return  \Ackintosh\Snidel\Worker    (master process only)
     * @throws  \RuntimeException           when forking fails; the original exception is attached as "previous"
     */
    private function forkWorker()
    {
        try {
            $process = $this->pcntl->fork();
        } catch (\RuntimeException $e) {
            $message = 'failed to fork worker: ' . $e->getMessage();
            $this->log->error($message);
            // Chain the caught exception so its type and trace stay available.
            throw new \RuntimeException($message, 0, $e);
        }

        $worker = new Worker($process);

        if (getmypid() === $this->master->getPid()) {
            // master
            $this->log->info('forked worker. pid: ' . $worker->getPid());
            return $worker;
        }

        // worker
        // @codeCoverageIgnoreStart
        $this->log->info('has forked. pid: ' . getmypid());

        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, function ($sig) {
                // Remember the signal so the shutdown hook below can tell a
                // signalled exit apart from an unexpected one.
                $this->receivedSignal = $sig;
                exit;
            }, false);
        }

        register_shutdown_function(function () use ($worker) {
            // Exiting mid-task without having received a signal: report an error.
            if ($this->receivedSignal === null && $worker->isInProgress()) {
                $worker->error();
            }
        });

        $this->log->info('----> started the function.');
        try {
            $worker->run();
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            exit;
        }
        $this->log->info('<---- completed the function.');

        $this->log->info('queued the result and exit.');
        exit;
        // @codeCoverageIgnoreEnd
    }
234
235
    /**
     * Tells whether a master process handle is currently held.
     *
     * @return  bool    true while $master is not null
     */
    public function existsMaster()
    {
        return null !== $this->master;
    }
242
243
    /**
     * Sends a signal to the master process, waits for it, and forgets it.
     *
     * A failed posix_kill() is logged as an error (with the POSIX error
     * string) instead of being reported as sent. The method then proceeds to
     * waitpid() either way and finally clears the stored master handle.
     *
     * @param   int     $sig    signal number to send; defaults to SIGTERM
     * @return  void
     */
    public function sendSignalToMaster($sig = SIGTERM)
    {
        $this->log->info('----> sending signal to master. signal: ' . $sig);
        if (posix_kill($this->master->getPid(), $sig)) {
            $this->log->info('<---- sent signal.');
        } else {
            // The return value used to be ignored, so a failed delivery was
            // logged as a success.
            $this->log->error(
                'failed to send signal to master: ' . posix_strerror(posix_get_last_error())
            );
        }

        $this->log->info('----> waiting for master shutdown.');
        $status = null;
        $this->pcntl->waitpid($this->master->getPid(), $status);
        $this->log->info('<---- master shutdown. status: ' . $status);

        $this->master = null;
    }
260
261
    /**
     * Consumes every pending result and discards the yielded values.
     *
     * results() is a generator, so its body (dequeueing, counting, error
     * collection) only runs while it is being iterated. The drain loop below
     * is therefore required even though it does nothing with the values.
     *
     * @return void
     */
    public function wait()
    {
        $pending = $this->results();
        while ($pending->valid()) {
            $pending->next();
        }
    }
268
269
    /**
     * Lazily yields the successful results of the queued tasks.
     *
     * Runs for as long as more tasks have been queued than results dequeued.
     * For each outstanding result it polls the result queue until dequeue()
     * returns a truthy value, unserializes its 'result' entry, and then either
     * yields it or, for a failure, stores it in the error collection under
     * the pid of the failing process.
     *
     * @return \Generator
     */
    public function results()
    {
        while ($this->queuedCount() > $this->dequeuedCount()) {
            // Keep polling until the queue hands something back.
            do {
                $dequeued = $this->resultQueue->dequeue();
            } while (!$dequeued);
            $this->dequeuedCount++;

            $payload = $dequeued->getMessage()['result'];
            $result  = ResultFormatter::unserialize($payload);

            if (!$result->isFailure()) {
                yield $result;
                continue;
            }

            // Failures are recorded instead of being yielded.
            $pid = $result->getProcess()->getPid();
            $this->error[$pid] = $result;
        }
    }
293
294
    /**
     * Reports whether the error collection holds anything, as answered by Error::exists().
     *
     * @return  bool
     */
    public function hasError()
    {
        $errors = $this->error;

        return $errors->exists();
    }
301
302
    /**
     * Gives access to the error collection that results() fills with failed results.
     *
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        return $this->error;
    }
309
310
    /**
     * Drops this object's reference to the result queue when the container is destroyed.
     */
    public function __destruct()
    {
        unset($this->resultQueue);
    }
314
}
315