Completed
Pull Request — master (#19)
by Akihito
04:09
created

Container::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 15
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 15
rs 9.4285
cc 1
eloc 11
nc 1
nop 2
1
<?php
2
declare(ticks = 1);
3
namespace Ackintosh\Snidel\Fork;
4
5
use Ackintosh\Snidel\ActiveWorkerSet;
6
use Ackintosh\Snidel\Config;
7
use Ackintosh\Snidel\Error;
8
use Ackintosh\Snidel\Pcntl;
9
use Ackintosh\Snidel\Result\Formatter as ResultFormatter;
10
use Ackintosh\Snidel\Task\Formatter as TaskFormatter;
11
use Ackintosh\Snidel\Worker;
12
use Bernard\Consumer;
13
use Bernard\Driver\FlatFileDriver;
14
use Bernard\Message\PlainMessage;
15
use Bernard\Producer;
16
use Bernard\QueueFactory\PersistentFactory;
17
use Bernard\Router\SimpleRouter;
18
use Bernard\Serializer;
19
use Symfony\Component\EventDispatcher\EventDispatcher;
20
21
class Container
22
{
23
    /** @var Process handle returned by Pcntl::fork() in forkMaster(); null before that and after sendSignalToMaster() */
    private $master;

    /** @var \Ackintosh\Snidel\Pcntl */
    private $pcntl;

    /** @var \Ackintosh\Snidel\Error */
    private $error;

    /** @var \Ackintosh\Snidel\Log */
    private $log;

    /** @var int[] signals for which the master and the worker processes install handlers */
    private $signals = [
        SIGTERM,
        SIGINT,
    ];

    /** @var \Ackintosh\Snidel\Config */
    private $config;

    /** @var int|null number of the signal most recently handled by this process, null if none */
    private $receivedSignal;

    /** @var int number of tasks handed to the producer by enqueue() */
    private $queuedCount = 0;

    /** @var int number of results taken off the result queue by results() */
    private $dequeuedCount = 0;

    /** @var \Bernard\QueueFactory\PersistentFactory */
    private $factory;

    /** @var \Bernard\Producer */
    private $producer;

    /** @var \Bernard\Consumer */
    private $consumer;

    /** @var mixed queue object obtained from $factory->create('result'); only set in the owner process */
    private $resultQueue;
58
59
    /**
     * Stores the configuration and logger and builds the process/queue collaborators.
     *
     * @param   \Ackintosh\Snidel\Config    $config
     * @param   \Ackintosh\Snidel\Log       $log
     */
    public function __construct(Config $config, $log)
    {
        $this->log              = $log;
        $this->config           = $config;
        $this->pcntl            = new Pcntl();
        $this->error            = new Error();

        // NOTE(review): the queue directory is hard-coded; presumably the worker side
        // reads the same path, so it is left unchanged here - confirm before making it configurable.
        $driver = new FlatFileDriver('/tmp/hoge');
        $this->factory = new PersistentFactory($driver, new Serializer());
        $this->producer = new Producer($this->factory, new EventDispatcher());

        // This container is registered as the receiver for messages named 'Result'.
        $router = new SimpleRouter();
        $router->add('Result', $this);
        $this->consumer = new Consumer($router, new EventDispatcher());
    }
77
78
    /**
     * Serializes the task and publishes it as a 'Task' message.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException   propagated unchanged when raised while serializing or producing
     */
    public function enqueue($task)
    {
        $message = new PlainMessage(
            'Task',
            [
                'task' => TaskFormatter::serialize($task),
            ]
        );

        $this->producer->produce($message);

        // Only reached when produce() did not throw, so the counter
        // never runs ahead of what was actually queued.
        $this->queuedCount++;
    }
100
101
    /**
     * Number of tasks that enqueue() has counted so far.
     *
     * @return  int
     */
    public function queuedCount()
    {
        $queued = $this->queuedCount;

        return $queued;
    }
108
109
    /**
     * Number of results that results() has taken off the result queue so far.
     *
     * @return  int
     */
    public function dequeuedCount()
    {
        $dequeued = $this->dequeuedCount;

        return $dequeued;
    }
116
117
    /**
     * fork master process
     *
     * In the owner process (the one whose pid equals the 'ownerPid' config value)
     * this creates the 'result' queue and returns the master process handle.
     * In the forked master process this method never returns: it installs the
     * signal handlers, forks 'concurrency' workers, forks a replacement each
     * time waitpid() reports a worker pid, and finally exits.
     *
     * @return Process $master
     * @throws \RuntimeException when the fork fails; the original exception is attached as previous
     */
    public function forkMaster()
    {
        try {
            $this->master = $this->pcntl->fork();
        } catch (\RuntimeException $e) {
            $message = 'failed to fork master: ' . $e->getMessage();
            $this->log->error($message);
            // Chain the cause so the original stack trace is not lost.
            throw new \RuntimeException($message, 0, $e);
        }

        $this->log->setMasterPid($this->master->getPid());

        if (getmypid() === $this->config->get('ownerPid')) {
            // owner
            $this->log->info('pid: ' . getmypid());
            $this->resultQueue  = $this->factory->create('result');

            return $this->master;
        }

        // master
        $activeWorkerSet = new ActiveWorkerSet();
        $this->log->info('pid: ' . $this->master->getPid());

        foreach ($this->signals as $sig) {
            // On SIGTERM/SIGINT: remember the signal, forward it to the active workers, then exit.
            $this->pcntl->signal($sig, function ($sig) use ($activeWorkerSet) {
                $this->receivedSignal = $sig;
                $this->log->info('received signal: ' . $sig);

                if ($activeWorkerSet->count() === 0) {
                    $this->log->info('no worker is active.');
                } else {
                    $this->log->info('------> sending signal to workers. signal: ' . $sig);
                    $activeWorkerSet->terminate($sig);
                    $this->log->info('<------ sent signal');
                }
                exit;
            });
        }

        $concurrency = (int)$this->config->get('concurrency');
        for ($i = 0; $i < $concurrency; $i++) {
            $activeWorkerSet->add($this->forkWorker());
        }

        // Supervise: poll without blocking until waitpid() reports -1,
        // replacing every worker whose pid is returned.
        $status = null;
        while (($workerPid = $this->pcntl->waitpid(-1, $status, WNOHANG)) !== -1) {
            if ($workerPid === true || $workerPid === 0) {
                // Nothing reported this round; back off for 100 ms before polling again.
                usleep(100000);
                continue;
            }
            $activeWorkerSet->delete($workerPid);
            $activeWorkerSet->add($this->forkWorker());
            $status = null;
        }
        exit;
    }
179
180
    /**
     * fork worker process
     *
     * In the master process this returns the Worker wrapping the forked process.
     * In the forked worker process this method never returns: it installs the
     * signal handlers and a shutdown function, runs the worker, and exits.
     *
     * @return  \Ackintosh\Snidel\Worker
     * @throws  \RuntimeException when the fork fails; the original exception is attached as previous
     */
    private function forkWorker()
    {
        try {
            $process = $this->pcntl->fork();
        } catch (\RuntimeException $e) {
            $message = 'failed to fork worker: ' . $e->getMessage();
            $this->log->error($message);
            // Chain the cause so the original stack trace is not lost.
            throw new \RuntimeException($message, 0, $e);
        }

        $worker = new Worker($process);

        if (getmypid() === $this->master->getPid()) {
            // master
            $this->log->info('forked worker. pid: ' . $worker->getPid());
            return $worker;
        }

        // worker
        // @codeCoverageIgnoreStart
        $this->log->info('has forked. pid: ' . getmypid());

        foreach ($this->signals as $sig) {
            // On SIGTERM/SIGINT: remember the signal and exit; the shutdown function below runs afterwards.
            $this->pcntl->signal($sig, function ($sig) {
                $this->receivedSignal = $sig;
                exit;
            }, false);
        }

        register_shutdown_function(function () use ($worker) {
            if (!$worker->hasTask()) {
                return;
            }

            // Report an error only for a task that did not finish
            // and was not interrupted by a handled signal.
            if (!$worker->done() && $this->receivedSignal === null) {
                $worker->error();
            }
        });

        $this->log->info('----> started the function.');
        try {
            $worker->run();
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            exit;
        }
        $this->log->info('<---- completed the function.');

        $this->log->info('queued the result and exit.');
        exit;
        // @codeCoverageIgnoreEnd
    }
238
239
    /**
     * Tells whether a master process handle is currently held.
     *
     * @return  bool    true while $master is not null
     */
    public function existsMaster()
    {
        return !is_null($this->master);
    }
246
247
    /**
     * send signal to master process
     *
     * Sends the signal, waits for the master process to terminate and then
     * forgets the master handle. Does nothing when no master handle is held.
     *
     * @param   int     $sig    signal number to send, SIGTERM by default
     * @return  void
     */
    public function sendSignalToMaster($sig = SIGTERM)
    {
        if ($this->master === null) {
            // No master was forked, or it was already shut down by an earlier call.
            $this->log->info('no master process to signal.');
            return;
        }

        $masterPid = $this->master->getPid();

        $this->log->info('----> sending signal to master. signal: ' . $sig);
        posix_kill($masterPid, $sig);
        $this->log->info('<---- sent signal.');

        $this->log->info('----> waiting for master shutdown.');
        $status = null;
        $this->pcntl->waitpid($masterPid, $status);
        $this->log->info('<---- master shutdown. status: ' . $status);
        $this->master = null;
    }
264
265
    /**
     * Consumes every pending result and discards the yielded values.
     *
     * @return void
     */
    public function wait()
    {
        // The loop body is intentionally empty of real work: advancing the
        // generator is what makes results() dequeue and record each result.
        $pending = $this->results();
        while ($pending->valid()) {
            $pending->next();
        }
    }
272
273
    /**
     * Yields the successful results until as many results have been dequeued
     * as tasks were queued. Failed results are not yielded; they are stored in
     * the error collection under the pid of the process that produced them.
     *
     * @return \Generator
     */
    public function results()
    {
        while ($this->queuedCount() > $this->dequeuedCount()) {
            $envelope = $this->resultQueue->dequeue();
            if ($envelope === null) {
                // dequeue() returned without a message (no result was available
                // yet); poll again instead of calling a method on null.
                continue;
            }

            $result = ResultFormatter::unserialize($envelope->getMessage()['result']);
            $this->dequeuedCount++;

            if ($result->isFailure()) {
                $pid = $result->getProcess()->getPid();
                $this->error[$pid] = $result;
            } else {
                yield $result;
            }
        }
    }
292
293
    /**
     * Delegates to the error collection's exists() check.
     *
     * @return  bool
     */
    public function hasError()
    {
        $errors = $this->error;

        return $errors->exists();
    }
300
301
    /**
     * Gives access to the error collection filled by results().
     *
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        $errors = $this->error;

        return $errors;
    }
308
309
    public function __destruct()
    {
        // Drop this object's reference to the result queue when the container is destroyed.
        unset($this->resultQueue);
    }
313
}
314