Completed
Pull Request — master (#16)
by Akihito
02:30
created

Container::getCollection()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 11
rs 9.4285
cc 2
eloc 6
nc 2
nop 1

1 Method

Rating   Name   Duplication   Size   Complexity  
A Container::__destruct() 0 5 1
1
<?php
2
declare(ticks = 1);
3
namespace Ackintosh\Snidel\Fork;
4
5
use Ackintosh\Snidel\ActiveWorkerSet;
6
use Ackintosh\Snidel\Config;
7
use Ackintosh\Snidel\Error;
8
use Ackintosh\Snidel\Pcntl;
9
use Ackintosh\Snidel\QueueFactory;
10
use Ackintosh\Snidel\Worker;
11
12
class Container
13
{
14
    /** @var int */
15
    private $masterPid;
16
17
    /** @var \Ackintosh\Snidel\Pcntl */
18
    private $pcntl;
19
20
    /** @var \Ackintosh\Snidel\Error */
21
    private $error;
22
23
    /** @var \Ackintosh\Snidel\Task\QueueInterface */
24
    private $taskQueue;
25
26
    /** @var \Ackintosh\Snidel\Result\QueueInterface */
27
    private $resultQueue;
28
29
    /** @var \Ackintosh\Snidel\Log */
30
    private $log;
31
32
    /** @var array */
33
    private $signals = [
34
        SIGTERM,
35
        SIGINT,
36
    ];
37
38
    /** @var \Ackintosh\Snidel\Config */
39
    private $config;
40
41
    /** @var \Ackintosh\Snidel\QueueFactory  */
42
    private $queueFactory;
43
44
    /** @var  int */
45
    private $receivedSignal;
46
47
    /**
48
     * @param   int     $ownerPid
0 ignored issues
show
Bug introduced by
There is no parameter named $ownerPid. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

Loading history...
49
     */
50
    public function __construct(Config $config, $log)
51
    {
52
        $this->log              = $log;
53
        $this->config           = $config;
54
        $this->pcntl            = new Pcntl();
55
        $this->error            = new Error();
56
        $this->queueFactory     = new QueueFactory($config);
57
    }
58
59
    /**
60
     * @param   \Ackintosh\Snidel\Task
61
     * @return  void
62
     * @throws  \RuntimeException
63
     */
64
    public function enqueue($task)
65
    {
66
        try {
67
            $this->taskQueue->enqueue($task);
68
        } catch (\RuntimeException $e) {
69
            throw $e;
70
        }
71
    }
72
73
    /**
74
     * @return  int
75
     */
76
    public function queuedCount()
77
    {
78
        if (is_null($this->taskQueue)) {
79
            return 0;
80
        }
81
82
        return $this->taskQueue->queuedCount();
83
    }
84
85
    /**
86
     * @return  \Ackintosh\Snidel\Result\Result
87
     */
88
    private function dequeue()
89
    {
90
        return $this->resultQueue->dequeue();
91
    }
92
93
    /**
94
     * @return  int
95
     */
96
    public function dequeuedCount()
97
    {
98
        if (is_null($this->resultQueue)) {
99
            return 0;
100
        }
101
102
        return $this->resultQueue->dequeuedCount();
103
    }
104
105
    /**
106
     * fork process
107
     *
108
     * @return  \Ackintosh\Snidel\Fork\Fork
109
     * @throws  \RuntimeException
110
     */
111
    private function fork()
112
    {
113
        $pid = $this->pcntl->fork();
114
        if ($pid === -1) {
115
            throw new \RuntimeException('could not fork a new process');
116
        }
117
118
        $pid = ($pid === 0) ? getmypid() : $pid;
119
120
        return new Fork($pid);
121
    }
122
123
    /**
124
     * fork master process
125
     *
126
     * @return  int     $masterPid
127
     */
128
    public function forkMaster()
129
    {
130
        try {
131
            $fork = $this->fork();
132
        } catch (\RuntimeException $e) {
133
            throw $e;
134
        }
135
136
        $this->masterPid = $fork->getPid();
137
        $this->log->setMasterPid($this->masterPid);
138
139
        if (getmypid() === $this->config->get('ownerPid')) {
140
            // owner
141
            $this->log->info('pid: ' . getmypid());
142
            $this->taskQueue    = $this->queueFactory->createTaskQueue();
143
            $this->resultQueue  = $this->queueFactory->createResultQueue();
144
145
            return $this->masterPid;
146
        } else {
147
            // master
148
            $activeWorkerSet = new ActiveWorkerSet();
149
            $this->log->info('pid: ' . $this->masterPid);
150
151
            foreach ($this->signals as $sig) {
152
                $this->pcntl->signal($sig, function ($sig) use ($activeWorkerSet) {
153
                    $this->receivedSignal = $sig;
154
                    $this->log->info('received signal: ' . $sig);
155
156
                    if ($activeWorkerSet->count() === 0) {
157
                        $this->log->info('no worker is active.');
158
                    } else {
159
                        $this->log->info('------> sending signal to workers. signal: ' . $sig);
160
                        $activeWorkerSet->terminate($sig);
161
                        $this->log->info('<------ sent signal');
162
                    }
163
                    exit;
164
                });
165
            }
166
167
            $concurrency = (int)$this->config->get('concurrency');
168
            for ($i = 0; $i < $concurrency; $i++) {
169
                $activeWorkerSet->add($this->forkWorker());
170
            }
171
            $status = null;
172
            while (($workerPid = $this->pcntl->waitpid(-1, $status, WNOHANG)) !== -1) {
173
                if ($workerPid === true || $workerPid === 0) {
174
                    usleep(100000);
175
                    continue;
176
                }
177
                $activeWorkerSet->delete($workerPid);
178
                $activeWorkerSet->add($this->forkWorker());
179
                $status = null;
180
            }
181
            exit;
182
        }
183
    }
184
185
    /**
186
     * fork worker process
187
     *
188
     * @return  \Ackintosh\Snidel\Worker
189
     * @throws  \RuntimeException
190
     */
191
    private function forkWorker()
192
    {
193
        try {
194
            $fork = $this->fork();
195
        } catch (\RuntimeException $e) {
196
            $this->log->error($e->getMessage());
197
            throw $e;
198
        }
199
200
        $worker = new Worker($fork);
201
202
        if (getmypid() === $this->masterPid) {
203
            // master
204
            $this->log->info('forked worker. pid: ' . $worker->getPid());
205
            return $worker;
206
        } else {
207
            // worker
208
            // @codeCoverageIgnoreStart
209
            $this->log->info('has forked. pid: ' . getmypid());
210
211
            foreach ($this->signals as $sig) {
212
                $this->pcntl->signal($sig, function ($sig) {
213
                    $this->receivedSignal = $sig;
214
                    exit;
215
                }, false);
216
            }
217
218
            $worker->setTaskQueue($this->queueFactory->createTaskQueue());
219
            $worker->setResultQueue($this->queueFactory->createResultQueue());
220
221
            register_shutdown_function(function () use ($worker) {
222
                if ($worker->isFailedToEnqueueResult() && $this->receivedSignal === null) {
223
                    $worker->error();
224
                }
225
            });
226
227
            $this->log->info('----> started the function.');
228
            try {
229
                $worker->run();
230
            } catch (\RuntimeException $e) {
231
                $this->log->error($e->getMessage());
232
                exit;
233
            }
234
            $this->log->info('<---- completed the function.');
235
236
            $this->log->info('queued the result and exit.');
237
            exit;
238
            // @codeCoverageIgnoreEnd
239
        }
240
    }
241
242
    /**
243
     * @return  bool
244
     */
245
    public function existsMaster()
246
    {
247
        return $this->masterPid !== null;
248
    }
249
250
    /**
251
     * send signal to master process
252
     *
253
     * @return  void
254
     */
255
    public function sendSignalToMaster($sig = SIGTERM)
256
    {
257
        $this->log->info('----> sending signal to master. signal: ' . $sig);
258
        posix_kill($this->masterPid, $sig);
259
        $this->log->info('<---- sent signal.');
260
261
        $this->log->info('----> waiting for master shutdown.');
262
        $status = null;
263
        $this->pcntl->waitpid($this->masterPid, $status);
264
        $this->log->info('<---- master shutdown. status: ' . $status);
265
        $this->masterPid = null;
266
    }
267
268
    /**
269
     * @return void
270
     */
271
    public function wait()
272
    {
273
        for (; $this->queuedCount() > $this->dequeuedCount();) {
274
            $result = $this->dequeue();
275
            if ($result->isFailure()) {
276
                $this->error[$result->getFork()->getPid()] = $result;
277
            }
278
        }
279
    }
280
281
    /**
282
     * @return \Generator
283
     */
284
    public function results()
285
    {
286
        for (; $this->queuedCount() > $this->dequeuedCount();) {
287
            $result = $this->dequeue();
288
289
            if ($result->isFailure()) {
290
                $pid = $result->getFork()->getPid();
291
                $this->error[$pid] = $result;
292
            } else {
293
                yield $result;
294
            }
295
        }
296
    }
297
298
    /**
299
     * @return  bool
300
     */
301
    public function hasError()
302
    {
303
        return $this->error->exists();
304
    }
305
306
    /**
307
     * @return  \Ackintosh\Snidel\Error
308
     */
309
    public function getError()
310
    {
311
        return $this->error;
312
    }
313
314
    public function __destruct()
315
    {
316
        unset($this->taskQueue);
317
        unset($this->resultQueue);
318
    }
319
}
320