Completed
Push — master ( e481c7...bcdb1b )
by Marco
35:35 queued 20:00
created

Manager::cycle()   C

Complexity

Conditions 9
Paths 12

Size

Total Lines 49
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 20
nc 12
nop 0
dl 0
loc 49
rs 5.7446
c 0
b 0
f 0
1
<?php namespace Comodojo\Extender\Task;
2
3
use \Comodojo\Foundation\Base\Configuration;
4
use \Comodojo\Foundation\Events\Manager as EventsManager;
5
use \Comodojo\Foundation\Logging\LoggerTrait;
6
use \Comodojo\Foundation\Events\EventsTrait;
7
use \Comodojo\Daemon\Utils\ProcessTools;
8
use \Comodojo\Foundation\Base\ConfigurationTrait;
9
use \Comodojo\Extender\Traits\TasksTableTrait;
10
use \Comodojo\Extender\Traits\TaskErrorHandlerTrait;
11
use \Comodojo\Extender\Utils\Validator as ExtenderCommonValidations;
12
use \Comodojo\Extender\Components\Ipc;
13
use \Comodojo\Extender\Task\Table as TasksTable;
14
use \Comodojo\Extender\Components\Database;
15
use \Psr\Log\LoggerInterface;
16
use \DateTime;
17
use \Exception;
18
19
/**
20
* @package     Comodojo Extender
21
* @author      Marco Giovinazzi <[email protected]>
22
* @license     MIT
23
*
24
* LICENSE:
25
*
26
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
29
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
30
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
31
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
32
* THE SOFTWARE.
33
 */
34
35
class Manager {
36
37
    use ConfigurationTrait;
38
    use LoggerTrait;
39
    use EventsTrait;
40
    use TasksTableTrait;
41
    use TaskErrorHandlerTrait;
42
43
    /**
44
     * @var int
45
     */
46
    protected $lagger_timeout;
47
48
    /**
49
     * @var bool
50
     */
51
    protected $multithread;
52
53
    /**
54
     * @var int
55
     */
56
    protected $max_runtime;
57
58
    /**
59
     * @var int
60
     */
61
    protected $max_childs;
62
63
    /**
64
     * @var Ipc
65
     */
66
    protected $ipc;
67
68
    /**
69
     * @var Locker
70
     */
71
    protected $locker;
72
73
    /**
74
     * @var Tracker
75
     */
76
    protected $tracker;
77
78
    /**
79
     * Class constructor
80
     *
81
     * @param string $manager_name
82
     * @param Configuration $configuration
83
     * @param LoggerInterface $logger
84
     * @param TasksTable $tasks
85
     * @param EventsManager $events
86
     * @param EntityManager $em
0 ignored issues
show
Bug introduced by
The type Comodojo\Extender\Task\EntityManager was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
87
     */
88
    public function __construct(
89
        Locker $locker,
90
        Configuration $configuration,
91
        LoggerInterface $logger,
92
        TasksTable $tasks,
93
        EventsManager $events
94
    ) {
95
96
        $this->setConfiguration($configuration);
97
        $this->setLogger($logger);
98
        $this->setTasksTable($tasks);
99
        $this->setEvents($events);
100
101
        $this->locker = $locker;
102
        $this->tracker = new Tracker($configuration, $logger);
103
        $this->ipc = new Ipc($configuration);
104
105
        // retrieve parameters
106
        $this->lagger_timeout = ExtenderCommonValidations::laggerTimeout($this->configuration->get('child-lagger-timeout'));
107
        $this->multithread = ExtenderCommonValidations::multithread($this->configuration->get('multithread'));
108
        $this->max_runtime = ExtenderCommonValidations::maxChildRuntime($this->configuration->get('child-max-runtime'));
109
        $this->max_childs = ExtenderCommonValidations::forkLimit($this->configuration->get('fork-limit'));
110
111
        // $logger->debug("Tasks Manager online", array(
112
        //     'lagger_timeout' => $this->lagger_timeout,
113
        //     'multithread' => $this->multithread,
114
        //     'max_runtime' => $this->max_runtime,
115
        //     'max_childs' => $this->max_childs,
116
        //     'tasks_count' => count($this->table)
117
        // ));
118
119
    }
120
121
    public function add(Request $request) {
122
123
        $this->tracker->setQueued($request);
124
125
        return $this;
126
127
    }
128
129
    public function addBulk(array $requests) {
130
131
        foreach ($requests as $id => $request) {
132
133
            if ($request instanceof \Comodojo\Extender\Task\Request) {
134
                $this->add($request);
135
            } else {
136
                $this->logger->error("Skipping invalid request with local id $id: class mismatch");
137
            }
138
139
        }
140
141
        return $this;
142
143
    }
144
145
    public function run() {
146
147
        $this->updateTrackerSetQueued();
148
149
        while ( $this->tracker->countQueued() > 0 ) {
150
151
            // Start to cycle queued tasks
152
            $this->cycle();
153
154
        }
155
156
        $this->ipc->free();
157
158
        return $this->tracker->getCompleted();
159
160
    }
161
162
    protected function cycle() {
163
164
        $this->installErrorHandler();
165
166
        foreach ($this->tracker->getQueued() as $uid => $request) {
167
168
            if ( $this->multithread === false ) {
169
170
                $this->runSingleThread($uid, $request);
171
172
            } else {
173
174
                try {
175
176
                    $pid = $this->forker($request);
177
178
                } catch (Exception $e) {
179
180
                    $result = self::generateSyntheticResult($uid, $e->getMessage(), $request->getJid(), false);
181
182
                    $this->updateTrackerSetAborted($uid, $result);
183
184
                    if ( $request->isChain() ) $this->evalChain($request, $result);
185
186
                    continue;
187
188
                }
189
190
                $this->updateTrackerSetRunning($uid, $pid);
191
192
                if ( $this->max_childs > 0 && $this->tracker->countRunning() >= $this->max_childs ) {
193
194
                    while( $this->tracker->countRunning() >= $this->max_childs ) {
195
196
                        $this->catcher();
197
198
                    }
199
200
                }
201
202
            }
203
204
        }
205
206
        // spawn the loop if multithread
207
        if ( $this->multithread === true ) {
208
            $this->catcher_loop();
209
        } else {
210
            $this->restoreErrorHandler();
211
        }
212
213
    }
214
215
    protected function runSingleThread($uid, Request $request) {
216
217
        $pid = ProcessTools::getPid();
218
219
        $this->updateTrackerSetRunning($uid, $pid);
220
221
        $result = Runner::fastStart(
222
            $request,
223
            $this->getConfiguration(),
224
            $this->getLogger(),
225
            $this->getTasksTable(),
226
            $this->getEvents()
227
        );
228
229
        if ( $request->isChain() ) $this->evalChain($request, $result);
230
231
        $this->updateTrackerSetCompleted($uid, $result);
232
233
        $success = $result->success === false ? "error" : "success";
0 ignored issues
show
Bug Best Practice introduced by
The property success does not exist on Comodojo\Extender\Task\Result. Since you implemented __get, consider adding a @property annotation.
Loading history...
234
        $this->logger->notice("Task ".$request->getName()."(uid: ".$request->getUid().") ends in $success");
235
236
    }
237
238
    private function forker(Request $request) {
239
240
        $uid = $request->getUid();
241
242
        try {
243
244
            $this->ipc->init($uid);
245
246
        } catch (Exception $e) {
247
248
            $this->logger->error("Aborting task ".$request->getName().": ".$e->getMessage());
249
250
            $this->ipc->hang($uid);
251
252
            throw $e;
253
254
        }
255
256
        $pid = pcntl_fork();
257
258
        if ( $pid == -1 ) {
259
260
            throw new Exception("Unable to fork job, aborting");
261
262
        } elseif ( $pid ) {
263
264
            $niceness = $request->getNiceness();
265
266
            if ( $niceness !== null ) ProcessTools::setNiceness($niceness, $pid);
0 ignored issues
show
introduced by
The condition $niceness !== null is always true.
Loading history...
267
268
        } else {
269
270
            $this->ipc->close($uid, Ipc::READER);
271
272
            $result = Runner::fastStart(
273
                $request,
274
                $this->getConfiguration(),
275
                $this->getLogger(),
276
                $this->getTasksTable(),
277
                $this->getEvents()
278
            );
279
280
            $this->ipc->write($uid, serialize($result));
281
282
            $this->ipc->close($uid, Ipc::WRITER);
283
284
            exit(!$result->success);
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
Bug Best Practice introduced by
The property success does not exist on Comodojo\Extender\Task\Result. Since you implemented __get, consider adding a @property annotation.
Loading history...
285
286
        }
287
288
        return $pid;
289
290
    }
291
292
    private function catcher_loop() {
293
294
        while ( !empty($this->tracker->getRunning()) ) {
295
296
            $this->catcher();
297
298
        }
299
300
        $this->restoreErrorHandler();
301
302
    }
303
304
    /**
305
     * Catch results from completed jobs
306
     *
307
     */
308
    private function catcher() {
309
310
        foreach ( $this->tracker->getRunning() as $uid => $request ) {
311
312
            if ( ProcessTools::isRunning($request->getPid()) === false ) {
313
314
                $this->ipc->close($uid, Ipc::WRITER);
315
316
                try {
317
318
                    $raw_output = $this->ipc->read($uid);
319
320
                    $result = unserialize(rtrim($raw_output));
321
322
                    $this->ipc->close($uid, Ipc::READER);
323
324
                } catch (Exception $e) {
325
326
                    $result = self::generateSyntheticResult($uid, $e->getMessage(), $request->getJid(), false);
0 ignored issues
show
Bug Best Practice introduced by
The method Comodojo\Extender\Task\M...nerateSyntheticResult() is not static, but was called statically. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

326
                    /** @scrutinizer ignore-call */ 
327
                    $result = self::generateSyntheticResult($uid, $e->getMessage(), $request->getJid(), false);
Loading history...
327
328
                }
329
330
                if ( $request->isChain() ) $this->evalChain($request, $result);
331
332
                $this->updateTrackerSetCompleted($uid, $result);
333
334
                $success = $result->success === false ? "error" : "success";
0 ignored issues
show
Bug Best Practice introduced by
The property success does not exist on Comodojo\Extender\Task\Result. Since you implemented __get, consider adding a @property annotation.
Loading history...
335
                $this->logger->notice("Task ".$request->getName()."(uid: ".$request->getUid().") ends in $success");
336
337
            } else {
338
339
                $current_time = microtime(true);
340
341
                $request_max_time = $request->getMaxtime();
342
                $maxtime = $request_max_time === null ? $this->max_runtime : $request_max_time;
343
344
                if ( $current_time > $request->getStartTimestamp() + $maxtime ) {
345
346
                    $pid = $request->getPid();
347
348
                    $this->logger->warning("Killing pid $pid due to maximum exec time reached", [
349
                        "START_TIME"    => $request->getStartTimestamp(),
350
                        "CURRENT_TIME"  => $current_time,
351
                        "MAX_RUNTIME"   => $maxtime
352
                    ]);
353
354
                    $kill = ProcessTools::term($pid, $this->lagger_timeout);
355
356
                    if ( $kill ) {
357
                        $this->logger->warning("Pid $pid killed");
358
                    } else {
359
                        $this->logger->warning("Pid $pid could not be killed");
360
                    }
361
362
                    $this->ipc->hang($uid);
363
364
                    $result = self::generateSyntheticResult($uid, "Job killed due to max runtime reached", $request->getJid(), false);
365
366
                    if ( $request->isChain() ) $this->evalChain($request, $result);
367
368
                    $this->updateTrackerSetCompleted($uid, $result);
369
370
                    $this->logger->notice("Task ".$request->getName()."(uid: $uid) ends in error");
371
372
                }
373
374
            }
375
376
        }
377
378
    }
379
380
    private function evalChain(Request $request, Result $result) {
381
382
        if ( $result->success && $request->hasOnDone() ) {
0 ignored issues
show
Bug Best Practice introduced by
The property success does not exist on Comodojo\Extender\Task\Result. Since you implemented __get, consider adding a @property annotation.
Loading history...
383
            $chain_done = $request->getOnDone();
384
            $chain_done->getParameters()->set('parent', $result);
385
            $chain_done->setParentUid($result->uid);
0 ignored issues
show
Bug Best Practice introduced by
The property uid does not exist on Comodojo\Extender\Task\Result. Since you implemented __get, consider adding a @property annotation.
Loading history...
386
            $this->add($chain_done);
387
        }
388
389
        if ( $result->success === false && $request->hasOnFail() ) {
390
            $chain_fail = $request->getOnFail();
391
            $chain_fail->getParameters()->set('parent', $result);
392
            $chain_fail->setParentUid($result->uid);
393
            $this->add($chain_fail);
394
        }
395
396
        if ( $request->hasPipe() ) {
397
            $chain_pipe = $request->getPipe();
398
            $chain_pipe->getParameters()->set('parent', $result);
399
            $chain_pipe->setParentUid($result->uid);
400
            $this->add($chain_pipe);
401
        }
402
403
    }
404
405
    private function generateSyntheticResult($uid, $message, $jid = null, $success = true) {
406
407
        return new Result([
408
            $uid,
409
            null,
410
            $jid,
411
            null,
412
            $success,
413
            new DateTime(),
414
            null,
415
            $message,
416
            null
417
        ]);
418
419
    }
420
421
    private function updateTrackerSetQueued() {
422
423
        $this->locker->lock([
424
            'QUEUED' => $this->tracker->countQueued()
425
        ]);
426
427
    }
428
429
    private function updateTrackerSetRunning($uid, $pid) {
430
431
        $this->tracker->setRunning($uid, $pid);
432
        $this->locker->lock([
433
            'QUEUED' => $this->tracker->countQueued(),
434
            'RUNNING' => $this->tracker->countRunning()
435
        ]);
436
437
    }
438
439
    private function updateTrackerSetCompleted($uid, $result) {
440
441
        $this->tracker->setCompleted($uid, $result);
442
443
        $lock_data = [
444
            'RUNNING' => $this->tracker->countRunning(),
445
            'COMPLETED' => 1
446
        ];
447
448
        if ( $result->success ) {
449
            $lock_data['SUCCEEDED'] = 1;
450
        } else {
451
            $lock_data['FAILED'] = 1;
452
        }
453
454
        $this->locker->lock($lock_data);
455
456
    }
457
458
    private function updateTrackerSetAborted($uid, $result) {
459
460
        $this->tracker->setAborted($uid, $result);
461
        $lock_data = [
0 ignored issues
show
Unused Code introduced by
The assignment to $lock_data is dead and can be removed.
Loading history...
462
            'RUNNING' => $this->tracker->countRunning(),
463
            'ABORTED' => 1
464
        ];
465
466
    }
467
468
}
469