Completed
Pull Request — master (#33)
by Akihito
01:30
created

Coordinator::existsMaster()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
1
<?php
2
declare(ticks=1);
3
namespace Ackintosh\Snidel\Fork;
4
5
use Ackintosh\Snidel\WorkerPool;
6
use Ackintosh\Snidel\Config;
7
use Ackintosh\Snidel\Error;
8
use Ackintosh\Snidel\Pcntl;
9
use Ackintosh\Snidel\Traits\Queueing;
10
use Ackintosh\Snidel\Worker;
11
use Bernard\Router\SimpleRouter;
12
13
class Coordinator
14
{
15
    use Queueing;
16
17
    /** @var Process */
18
    private $master;
19
20
    /** @var \Ackintosh\Snidel\Pcntl */
21
    private $pcntl;
22
23
    /** @var \Ackintosh\Snidel\Error */
24
    private $error;
25
26
    /** @var \Ackintosh\Snidel\Log */
27
    private $log;
28
29
    /** @var array */
30
    private $signals = [
31
        SIGTERM,
32
        SIGINT,
33
    ];
34
35
    /** @var \Ackintosh\Snidel\Config */
36
    private $config;
37
38
    /** @var  int */
39
    private $receivedSignal;
40
41
    /** @var int */
42
    private $queuedCount = 0;
43
    /** @var int */
44
    private $dequeuedCount = 0;
45
46
    /** @var \Bernard\QueueFactory\PersistentFactory */
47
    private $factory;
48
49
    /** @var \Bernard\Producer */
50
    private $producer;
51
52
    /** @var \Bernard\Consumer */
53
    private $consumer;
54
55
    /** @var \Bernard\Queue  */
56
    private $resultQueue;
57
58
    /**
59
     * @param   int     $ownerPid
0 ignored issues
show
Bug introduced by
There is no parameter named $ownerPid. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

Loading history...
60
     */
61
    public function __construct(Config $config, $log)
62
    {
63
        $this->log = $log;
64
        $this->config = $config;
65
        $this->pcntl = new Pcntl();
66
        $this->error = new Error();
67
68
        $this->factory = $this->createFactory($this->config->get('driver'));
69
        $router = new SimpleRouter();
70
        $router->add('Result', $this);
71
        $this->consumer = $this->createConsumer($router);
72
        $this->producer = $this->createProducer($this->factory);
73
    }
74
75
    /**
76
     * @param   \Ackintosh\Snidel\Task
77
     * @return  void
78
     * @throws  \RuntimeException
79
     */
80
    public function enqueue($task)
81
    {
82
        try {
83
            $this->producer->produce($task);
84
            $this->queuedCount++;
85
86
        } catch (\RuntimeException $e) {
87
            throw $e;
88
        }
89
    }
90
91
    /**
92
     * @return  int
93
     */
94
    public function queuedCount()
95
    {
96
        return $this->queuedCount;
97
    }
98
99
    /**
100
     * @return  int
101
     */
102
    public function dequeuedCount()
103
    {
104
        return $this->dequeuedCount;
105
    }
106
107
    /**
108
     * fork master process
109
     *
110
     * @return Process $master
111
     * @throws \RuntimeException
112
     */
113
    public function forkMaster()
114
    {
115
        try {
116
            $this->master = $this->pcntl->fork();
117
        } catch (\RuntimeException $e) {
118
            $message = 'failed to fork master: ' . $e->getMessage();
119
            $this->log->error($message);
120
            throw new \RuntimeException($message);
121
        }
122
123
        $this->log->setMasterPid($this->master->getPid());
124
125
        if (getmypid() === $this->config->get('ownerPid')) {
126
            // owner
127
            $this->log->info('pid: ' . getmypid());
128
            $this->resultQueue  = $this->factory->create('result');
129
130
            return $this->master;
131
        } else {
132
            // @codeCoverageIgnoreStart
133
            // covered by SnidelTest via master process
134
            // master
135
            $workerPool = new WorkerPool();
136
            $this->log->info('pid: ' . $this->master->getPid());
137
138
            foreach ($this->signals as $sig) {
139
                $this->pcntl->signal($sig, function ($sig) use ($workerPool) {
140
                    $this->receivedSignal = $sig;
141
                    $this->log->info('received signal: ' . $sig);
142
143
                    if ($workerPool->count() === 0) {
144
                        $this->log->info('no worker is active.');
145
                    } else {
146
                        $this->log->info('------> sending signal to workers. signal: ' . $sig);
147
                        $workerPool->terminate($sig);
148
                        $this->log->info('<------ sent signal');
149
                    }
150
                    exit;
151
                });
152
            }
153
154
            $concurrency = (int)$this->config->get('concurrency');
155
            for ($i = 0; $i < $concurrency; $i++) {
156
                $workerPool->add($this->forkWorker());
157
            }
158
            $status = null;
159
            while (($workerPid = $this->pcntl->waitpid(-1, $status, WNOHANG)) !== -1) {
160
                if ($workerPid === true || $workerPid === 0) {
161
                    usleep(100000);
162
                    continue;
163
                }
164
                $workerPool->delete($workerPid);
165
                $workerPool->add($this->forkWorker());
166
                $status = null;
167
            }
168
            exit;
169
            // @codeCoverageIgnoreEnd
170
        }
171
    }
172
173
    /**
174
     * fork worker process
175
     *
176
     * @return  \Ackintosh\Snidel\Worker
177
     * @throws  \RuntimeException
178
     */
179
    private function forkWorker()
180
    {
181
        try {
182
            $process = $this->pcntl->fork();
183
        } catch (\RuntimeException $e) {
184
            $message = 'failed to fork worker: ' . $e->getMessage();
185
            $this->log->error($message);
186
            throw new \RuntimeException($message);
187
        }
188
189
        $worker = new Worker($process, $this->config->get('driver'), $this->config->get('pollingDuration'));
190
191
        if (getmypid() === $this->master->getPid()) {
192
            // master
193
            $this->log->info('forked worker. pid: ' . $worker->getPid());
194
            return $worker;
195
        } else {
196
            // @codeCoverageIgnoreStart
197
            // covered by SnidelTest via worker process
198
            // worker
199
            $this->log->info('has forked. pid: ' . getmypid());
200
201
            foreach ($this->signals as $sig) {
202
                $this->pcntl->signal($sig, function ($sig) {
203
                    $this->receivedSignal = $sig;
204
                    exit;
205
                }, false);
206
            }
207
208
            register_shutdown_function(function () use ($worker) {
209
                if ($this->receivedSignal === null && $worker->isInProgress()) {
210
                    $worker->error();
211
                }
212
            });
213
214
            $this->log->info('----> started the function.');
215
            try {
216
                $worker->run();
217
            } catch (\RuntimeException $e) {
218
                $this->log->error($e->getMessage());
219
                exit;
220
            }
221
            $this->log->info('<---- completed the function.');
222
223
            $this->log->info('queued the result and exit.');
224
            exit;
225
            // @codeCoverageIgnoreEnd
226
        }
227
    }
228
229
    /**
230
     * @return  bool
231
     */
232
    public function existsMaster()
233
    {
234
        return $this->master !== null;
235
    }
236
237
    /**
238
     * send signal to master process
239
     *
240
     * @return  void
241
     */
242
    public function sendSignalToMaster($sig = SIGTERM)
243
    {
244
        $this->log->info('----> sending signal to master. signal: ' . $sig);
245
        posix_kill($this->master->getPid(), $sig);
246
        $this->log->info('<---- sent signal.');
247
248
        $this->log->info('----> waiting for master shutdown.');
249
        $status = null;
250
        $this->pcntl->waitpid($this->master->getPid(), $status);
251
        $this->log->info('<---- master shutdown. status: ' . $status);
252
        $this->master = null;
253
    }
254
255
    /**
256
     * @return void
257
     */
258
    public function wait()
259
    {
260
        foreach ($this->results() as $_) {}
261
    }
262
263
    /**
264
     * @return \Generator
265
     */
266
    public function results()
267
    {
268
        for (; $this->queuedCount() > $this->dequeuedCount();) {
269
            for (;;) {
270
                if ($envelope = $this->resultQueue->dequeue($this->config->get('pollingDuration'))) {
0 ignored issues
show
Unused Code introduced by
The call to Queue::dequeue() has too many arguments starting with $this->config->get('pollingDuration').

This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress.

In this case you can add the @ignore PhpDoc annotation to the duplicate definition and it will be ignored.

Loading history...
271
                    $this->dequeuedCount++;
272
                    break;
273
                }
274
            }
275
            $result = $envelope->getMessage();
0 ignored issues
show
Bug introduced by
The variable $envelope does not seem to be defined for all execution paths leading up to this point.

If you define a variable conditionally, it can happen that it is not defined for all execution paths.

Let’s take a look at an example:

function myFunction($a) {
    switch ($a) {
        case 'foo':
            $x = 1;
            break;

        case 'bar':
            $x = 2;
            break;
    }

    // $x is potentially undefined here.
    echo $x;
}

In the above example, the variable $x is defined if you pass “foo” or “bar” as argument for $a. However, since the switch statement has no default case statement, if you pass any other value, the variable $x would be undefined.

Available Fixes

  1. Check for existence of the variable explicitly:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        if (isset($x)) { // Make sure it's always set.
            echo $x;
        }
    }
    
  2. Define a default value for the variable:

    function myFunction($a) {
        $x = ''; // Set a default which gets overridden for certain paths.
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        echo $x;
    }
    
  3. Add a value for the missing path:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
    
            // We add support for the missing case.
            default:
                $x = '';
                break;
        }
    
        echo $x;
    }
    
Loading history...
276
            if ($result->isFailure()) {
277
                $pid = $result->getProcess()->getPid();
278
                $this->error[$pid] = $result;
279
            } else {
280
                yield $result;
281
            }
282
        }
283
    }
284
285
    /**
286
     * @return  bool
287
     */
288
    public function hasError()
289
    {
290
        return $this->error->exists();
291
    }
292
293
    /**
294
     * @return  \Ackintosh\Snidel\Error
295
     */
296
    public function getError()
297
    {
298
        return $this->error;
299
    }
300
301
    public function __destruct()
302
    {
303
        unset($this->resultQueue);
304
    }
305
}
306