Completed
Pull Request — master (#16)
by Akihito
03:03
created

Container::getCollection()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 11
rs 9.4285
cc 2
eloc 6
nc 2
nop 1
1
<?php
2
declare(ticks = 1);
3
namespace Ackintosh\Snidel\Fork;
4
5
use Ackintosh\Snidel\ActiveWorkerSet;
6
use Ackintosh\Snidel\Config;
7
use Ackintosh\Snidel\Error;
8
use Ackintosh\Snidel\Pcntl;
9
use Ackintosh\Snidel\QueueFactory;
10
use Ackintosh\Snidel\Worker;
11
12
/**
 * Manages the owner / master / worker process hierarchy.
 *
 * The owner process forks a single master; the master forks the configured
 * number of workers and re-forks a replacement whenever one of them exits.
 * Tasks and results travel between the owner and the workers through the
 * queues produced by the QueueFactory.
 */
class Container
{
    /** @var Process|null handle of the master process; null before forkMaster() and after sendSignalToMaster() */
    private $master;

    /** @var \Ackintosh\Snidel\Pcntl */
    private $pcntl;

    /** @var \Ackintosh\Snidel\Error failed results, keyed by the pid of the process that produced them */
    private $error;

    /** @var \Ackintosh\Snidel\Task\QueueInterface|null created in the owner by forkMaster() */
    private $taskQueue;

    /** @var \Ackintosh\Snidel\Result\QueueInterface|null created in the owner by forkMaster() */
    private $resultQueue;

    /** @var \Ackintosh\Snidel\Log */
    private $log;

    /** @var int[] signals for which the master and the workers install handlers */
    private $signals = [
        SIGTERM,
        SIGINT,
    ];

    /** @var \Ackintosh\Snidel\Config */
    private $config;

    /** @var \Ackintosh\Snidel\QueueFactory */
    private $queueFactory;

    /** @var int|null number of the signal caught by this process's handler, if any */
    private $receivedSignal;

    /**
     * @param   \Ackintosh\Snidel\Config    $config
     * @param   \Ackintosh\Snidel\Log       $log
     */
    public function __construct(Config $config, $log)
    {
        $this->log              = $log;
        $this->config           = $config;
        $this->pcntl            = new Pcntl();
        $this->error            = new Error();
        $this->queueFactory     = new QueueFactory($config);
    }

    /**
     * Puts a task on the task queue.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException   propagated unchanged from the task queue
     */
    public function enqueue($task)
    {
        $this->taskQueue->enqueue($task);
    }

    /**
     * @return  int     number reported by the task queue, or 0 when no task queue exists yet
     */
    public function queuedCount()
    {
        // isset() is false for both an unset and a null property.
        if (!isset($this->taskQueue)) {
            return 0;
        }

        return $this->taskQueue->queuedCount();
    }

    /**
     * @return  \Ackintosh\Snidel\Result\Result
     */
    private function dequeue()
    {
        return $this->resultQueue->dequeue();
    }

    /**
     * @return  int     number reported by the result queue, or 0 when no result queue exists yet
     */
    public function dequeuedCount()
    {
        if (!isset($this->resultQueue)) {
            return 0;
        }

        return $this->resultQueue->dequeuedCount();
    }

    /**
     * fork master process
     *
     * In the owner process this creates the task/result queues and returns
     * the master's process handle. In the forked master process this method
     * never returns: it installs signal handlers, forks the workers, keeps
     * replacing workers that exit, and finally calls exit.
     *
     * @return Process $master
     * @throws \RuntimeException    when the fork fails
     */
    public function forkMaster()
    {
        try {
            $this->master = $this->pcntl->fork();
        } catch (\RuntimeException $e) {
            $message = 'failed to fork master: ' . $e->getMessage();
            $this->log->error($message);
            // Chain the original exception so its stack trace is not lost.
            throw new \RuntimeException($message, 0, $e);
        }

        $this->log->setMasterPid($this->master->getPid());

        if (getmypid() === $this->config->get('ownerPid')) {
            // owner
            $this->log->info('pid: ' . getmypid());
            $this->taskQueue    = $this->queueFactory->createTaskQueue();
            $this->resultQueue  = $this->queueFactory->createResultQueue();

            return $this->master;
        }

        // master
        $activeWorkerSet = new ActiveWorkerSet();
        $this->log->info('pid: ' . $this->master->getPid());

        foreach ($this->signals as $sig) {
            // On a handled signal: forward it to the active workers (if any), then exit.
            $this->pcntl->signal($sig, function ($sig) use ($activeWorkerSet) {
                $this->receivedSignal = $sig;
                $this->log->info('received signal: ' . $sig);

                if ($activeWorkerSet->count() === 0) {
                    $this->log->info('no worker is active.');
                } else {
                    $this->log->info('------> sending signal to workers. signal: ' . $sig);
                    $activeWorkerSet->terminate($sig);
                    $this->log->info('<------ sent signal');
                }
                exit;
            });
        }

        $concurrency = (int)$this->config->get('concurrency');
        for ($i = 0; $i < $concurrency; $i++) {
            $activeWorkerSet->add($this->forkWorker());
        }

        // Poll without blocking (WNOHANG) until waitpid() reports -1.
        $status = null;
        while (($workerPid = $this->pcntl->waitpid(-1, $status, WNOHANG)) !== -1) {
            // NOTE(review): 0 / true are treated as "no child has exited yet";
            // confirm against the return contract of Pcntl::waitpid().
            if ($workerPid === true || $workerPid === 0) {
                usleep(100000);
                continue;
            }

            // A worker exited: replace it with a freshly forked one.
            $activeWorkerSet->delete($workerPid);
            $activeWorkerSet->add($this->forkWorker());
            $status = null;
        }
        exit;
    }

    /**
     * fork worker process
     *
     * In the master process this returns the Worker wrapping the forked
     * process. In the forked worker process this method never returns: it
     * runs the worker and then calls exit.
     *
     * @return  \Ackintosh\Snidel\Worker
     * @throws  \RuntimeException   when the fork fails
     */
    private function forkWorker()
    {
        try {
            $process = $this->pcntl->fork();
        } catch (\RuntimeException $e) {
            $message = 'failed to fork worker: ' . $e->getMessage();
            $this->log->error($message);
            // Chain the original exception so its stack trace is not lost.
            throw new \RuntimeException($message, 0, $e);
        }

        $worker = new Worker($process);

        if (getmypid() === $this->master->getPid()) {
            // master
            $this->log->info('forked worker. pid: ' . $worker->getPid());
            return $worker;
        }

        // worker
        // @codeCoverageIgnoreStart
        $this->log->info('has forked. pid: ' . getmypid());

        foreach ($this->signals as $sig) {
            // Remember which signal arrived so the shutdown function below
            // can tell a signalled exit from an abnormal one.
            // NOTE(review): the trailing `false` presumably maps to
            // pcntl_signal()'s restart_syscalls flag - confirm in Pcntl::signal().
            $this->pcntl->signal($sig, function ($sig) {
                $this->receivedSignal = $sig;
                exit;
            }, false);
        }

        $worker->setTaskQueue($this->queueFactory->createTaskQueue());
        $worker->setResultQueue($this->queueFactory->createResultQueue());

        register_shutdown_function(function () use ($worker) {
            if (!$worker->hasTask()) {
                return;
            }

            // Report an error only when the task is unfinished and the
            // process is not exiting because of a handled signal.
            if (!$worker->done() && $this->receivedSignal === null) {
                $worker->error();
            }
        });

        $this->log->info('----> started the function.');
        try {
            $worker->run();
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            exit;
        }
        $this->log->info('<---- completed the function.');

        $this->log->info('queued the result and exit.');
        exit;
        // @codeCoverageIgnoreEnd
    }

    /**
     * @return  bool
     */
    public function existsMaster()
    {
        return $this->master !== null;
    }

    /**
     * send signal to master process
     *
     * Sends the signal, waits for the master to terminate, then clears the
     * master handle. Does nothing when there is no master process.
     *
     * @param   int     $sig    signal number to send
     * @return  void
     */
    public function sendSignalToMaster($sig = SIGTERM)
    {
        // Without a master (never forked, or already shut down) there is
        // nothing to signal; dereferencing the null handle would be fatal.
        if ($this->master === null) {
            return;
        }

        $this->log->info('----> sending signal to master. signal: ' . $sig);
        posix_kill($this->master->getPid(), $sig);
        $this->log->info('<---- sent signal.');

        $this->log->info('----> waiting for master shutdown.');
        $status = null;
        $this->pcntl->waitpid($this->master->getPid(), $status);
        $this->log->info('<---- master shutdown. status: ' . $status);
        $this->master = null;
    }

    /**
     * Dequeues results until as many results have been dequeued as tasks
     * were queued, recording the failed ones in the error collection.
     *
     * @return void
     */
    public function wait()
    {
        while ($this->queuedCount() > $this->dequeuedCount()) {
            $result = $this->dequeue();
            if ($result->isFailure()) {
                $this->recordFailure($result);
            }
        }
    }

    /**
     * Yields each successful result as it is dequeued; failed results are
     * recorded in the error collection instead of being yielded.
     *
     * @return \Generator
     */
    public function results()
    {
        while ($this->queuedCount() > $this->dequeuedCount()) {
            $result = $this->dequeue();

            if ($result->isFailure()) {
                $this->recordFailure($result);
            } else {
                yield $result;
            }
        }
    }

    /**
     * Stores a failed result in the error collection under the pid of the
     * process that produced it.
     *
     * @param   \Ackintosh\Snidel\Result\Result $result
     * @return  void
     */
    private function recordFailure($result)
    {
        $pid = $result->getProcess()->getPid();
        $this->error[$pid] = $result;
    }

    /**
     * @return  bool
     */
    public function hasError()
    {
        return $this->error->exists();
    }

    /**
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        return $this->error;
    }

    /**
     * Drops this object's references to the task and result queues.
     */
    public function __destruct()
    {
        unset($this->taskQueue);
        unset($this->resultQueue);
    }
}
309