Passed
Branch master (931c2d)
by Arul
29:18 queued 14:21
created

Async::throw()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
eloc 4
c 1
b 0
f 0
nc 2
nop 1
dl 0
loc 6
rs 10
1
<?php
2
3
namespace LogicalSteps\Async;
4
5
6
use Closure;
7
use Generator;
8
use Psr\Log\LoggerInterface;
9
use React\Promise\Promise;
10
use React\Promise\PromiseInterface;
11
use ReflectionException;
12
use ReflectionFunction;
13
use ReflectionFunctionAbstract;
14
use ReflectionGenerator;
15
use ReflectionMethod;
16
use ReflectionObject;
17
use Throwable;
18
use TypeError;
19
use function GuzzleHttp\Promise\all as guzzleAll;
20
use function React\Promise\all;
21
22
/**
23
 * @method static PromiseInterface await($process) await for the completion of an asynchronous process
24
 * @method PromiseInterface await($process) await for the completion of an asynchronous process
25
 *
26
 * @method static void await($process, callable $callback) await for the completion of an asynchronous process
27
 * @method void await($process, callable $callback) await for the completion of an asynchronous process
28
 *
29
 * @method static PromiseInterface awaitAll(array $processes) concurrently await for multiple processes
30
 * @method PromiseInterface awaitAll(array $processes) concurrently await for multiple processes
31
 *
32
 * @method static void awaitAll(array $processes, callable $callback) concurrently await for multiple processes
33
 * @method void awaitAll(array $processes, callable $callback) concurrently await for multiple processes
34
 *
35
 * @method static setLogger(LoggerInterface $param)
36
 * @method setLogger(LoggerInterface $param)
37
 */
38
class Async
39
{
40
    const PROMISE_REACT = 'React\Promise\PromiseInterface';
41
    const PROMISE_AMP = 'Amp\Promise';
42
    const PROMISE_GUZZLE = 'GuzzleHttp\Promise\PromiseInterface';
43
    const PROMISE_HTTP = 'Http\Promise\Promise';
44
45
    const promise = 'promise';
46
    const parallel = 'parallel';
47
    const all = 'all';
48
    const await = 'await';
49
    const later = 'later';
50
51
    const ACTIONS = [self::await, self::parallel, self::all, self::promise, self::later];
52
53
    public static $knownPromises = [
54
        self::PROMISE_REACT,
55
        self::PROMISE_AMP,
56
        self::PROMISE_GUZZLE,
57
        self::PROMISE_HTTP,
58
    ];
59
60
    /**
61
     * @var LoggerInterface
62
     */
63
    protected $logger;
64
    /**
65
     * @var bool
66
     */
67
    public $waitForGuzzleAndHttplug = true;
68
69
    /**
70
     * @var bool
71
     */
72
    protected $parallelGuzzleLoading = false;
73
74
    protected $guzzlePromises = [];
75
76
    public function __construct(LoggerInterface $logger = null)
77
    {
78
        if ($logger) {
79
            $this->logger = $logger;
80
        }
81
    }
82
83
    public function __call($name, $arguments)
84
    {
85
        $value = null;
86
        switch ($name) {
87
            case 'await':
88
            case 'awaitAll':
89
                if ($this->logger) {
90
                    $this->logger->info('start');
91
                }
92
                if (1 == count($arguments)) {
93
                    //return promise
94
                    list($value, $resolver, $rejector) = $this->makePromise();
95
                    $callback = function ($error = null, $result = null) use ($resolver, $rejector) {
96
                        if ($this->logger) {
97
                            $this->logger->info('end');
98
                        }
99
                        if ($error) {
100
                            return $rejector($error);
101
                        }
102
                        $resolver($result);
103
                    };
104
                    $arguments[1] = $callback;
105
                } elseif ($this->logger) {
106
                    $c = $arguments[1];
107
                    $callback = function ($error, $result = null) use ($c) {
108
                        $this->logger->info('end');
109
                        $c($error, $result);
110
                    };
111
                    $arguments[1] = $callback;
112
                }
113
            case 'setLogger':
114
                $name = "_$name";
115
                break;
116
            default:
117
                return null;
118
        }
119
        call_user_func_array([$this, $name], $arguments);
120
        return $value;
121
    }
122
123
    public static function __callStatic($name, $arguments)
124
    {
125
        static $instance;
126
        if (!$instance) {
127
            $instance = new static();
128
        }
129
        return $instance->__call($name, $arguments);
130
    }
131
132
    /**
133
     * Throws specified or subclasses of specified exception inside the generator class so that it can be handled.
134
     *
135
     * @param string $throwable
136
     * @return string command
137
     *
138
     * @throws TypeError when given value is not a valid exception
139
     */
140
    public static function throw(string $throwable): string
141
    {
142
        if (is_a($throwable, Throwable::class, true)) {
143
            return __FUNCTION__ . ':' . $throwable;
144
        }
145
        throw new TypeError('Invalid value for throwable, it must extend Throwable class');
146
    }
147
148
    /**
149
     * Run this side by side with the remainder of the process
150
     *
151
     * @return string
152
     */
153
    public static function parallel(): string
154
    {
155
        return __FUNCTION__;
156
    }
157
158
    /**
159
     * Await for all parallel processes previously to finish
160
     *
161
     * @return string
162
     */
163
    public static function all(): string
164
    {
165
        return __FUNCTION__;
166
    }
167
168
    /**
169
     * Return a promise instead of awaiting the response of the process
170
     *
171
     * @return string
172
     */
173
    public static function promise(): string
174
    {
175
        return __FUNCTION__;
176
    }
177
178
    protected function _awaitAll(array $processes, callable $callback): void
179
    {
180
        $this->parallelGuzzleLoading = true;
181
        $results = [];
182
        $failed = false;
183
        foreach ($processes as $key => $process) {
184
            if ($failed)
185
                break;
186
            $c = function ($error = null, $result = null) use ($key, &$results, $processes, $callback, &$failed) {
187
                if ($failed)
188
                    return;
189
                if ($error) {
190
                    $failed = true;
191
                    $callback($error);
192
                    return;
193
                }
194
                $results[$key] = $result;
195
                if (count($results) == count($processes)) {
196
                    $callback(null, $results);
197
                }
198
            };
199
            $this->_await($process, $c);
200
        }
201
        if (!empty($this->guzzlePromises)) {
202
            guzzleAll($this->guzzlePromises)->wait(false);
203
            $this->guzzlePromises = [];
204
            $this->parallelGuzzleLoading = false;
205
        }
206
    }
207
208
    public function _await($process, callable $callback): void
209
    {
210
        $this->_handle($process, $callback, -1);
211
    }
212
213
    /**
214
     * Sets a logger instance on the object.
215
     *
216
     * @param LoggerInterface $logger
217
     *
218
     * @return void
219
     */
220
    protected function _setLogger(LoggerInterface $logger)
221
    {
222
        $this->logger = $logger;
223
    }
224
225
    private function makePromise()
226
    {
227
        $resolver = $rejector = null;
228
        $promise = new Promise(function ($resolve, $reject, $notify) use (&$resolver, &$rejector) {
229
            $resolver = $resolve;
230
            $rejector = $reject;
231
        });
232
        return [$promise, $resolver, $rejector];
233
    }
234
235
    protected function _handle($process, callable $callback, int $depth = 0): void
236
    {
237
        $arguments = [];
238
        $func = [];
239
        if (is_array($process) && count($process) > 1) {
240
            $copy = $process;
241
            $func[] = array_shift($copy);
242
            if (is_callable($func[0])) {
243
                $func = $func[0];
244
            } else {
245
                $func[] = array_shift($copy);
246
            }
247
            $arguments = $copy;
248
        } else {
249
            $func = $process;
250
        }
251
        if (is_callable($func)) {
252
            $this->_handleCallback($func, $arguments, $callback, $depth);
253
        } elseif ($process instanceof Generator) {
254
            $this->_handleGenerator($process, $callback, 1 + $depth);
255
        } elseif (is_object($process) && $implements = array_intersect(class_implements($process),
256
                Async::$knownPromises)) {
257
            $this->_handlePromise($process, array_shift($implements), $callback, $depth);
258
        } else {
259
            $callback(null, $process);
260
        }
261
    }
262
263
264
    protected function _handleCallback(callable $callable, array $parameters, callable $callback, int $depth = 0)
265
    {
266
        $this->logCallback($callable, $parameters, $depth);
267
        try {
268
            if (is_array($callable)) {
269
                $rf = new ReflectionMethod($callable[0], $callable[1]);
270
            } elseif (is_string($callable)) {
271
                $rf = new ReflectionFunction($callable);
272
            } elseif (is_a($callable, 'Closure') || is_callable($callable, '__invoke')) {
0 ignored issues
show
Bug introduced by
'__invoke' of type string is incompatible with the type boolean expected by parameter $syntax_only of is_callable(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

272
            } elseif (is_a($callable, 'Closure') || is_callable($callable, /** @scrutinizer ignore-type */ '__invoke')) {
Loading history...
Bug introduced by
$callable of type callable is incompatible with the type object|string expected by parameter $object of is_a(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

272
            } elseif (is_a(/** @scrutinizer ignore-type */ $callable, 'Closure') || is_callable($callable, '__invoke')) {
Loading history...
273
                $ro = new ReflectionObject($callable);
0 ignored issues
show
Bug introduced by
$callable of type callable is incompatible with the type object expected by parameter $argument of ReflectionObject::__construct(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

273
                $ro = new ReflectionObject(/** @scrutinizer ignore-type */ $callable);
Loading history...
274
                $rf = $ro->getMethod('__invoke');
275
            }
276
            $current = count($parameters);
277
            $total = $rf->getNumberOfParameters();
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $rf does not seem to be defined for all execution paths leading up to this point.
Loading history...
278
            $ps = $rf->getParameters();
279
            if ($current + 1 < $total) {
280
                for ($i = $current; $i < $total - 1; $i++) {
281
                    $parameters[$i] = $ps[$i]->isDefaultValueAvailable() ? $ps[$i]->getDefaultValue() : null;
282
                }
283
            }
284
        } catch (ReflectionException $e) {
285
            //ignore
286
        }
287
        $parameters[] = $callback;
288
        call_user_func_array($callable, $parameters);
289
    }
290
291
    protected function _handleGenerator(Generator $flow, callable $callback, int $depth = 0)
292
    {
293
        $this->logGenerator($flow, $depth - 1);
294
        try {
295
            if (!$flow->valid()) {
296
                $callback(null, $flow->getReturn());
297
                if (!empty($flow->later)) {
298
                    $this->_awaitAll($flow->later, function ($error = null, $results = null) {
0 ignored issues
show
Bug introduced by
The property later does not seem to exist on Generator.
Loading history...
299
                    });
300
                    unset($flow->later);
301
                }
302
                return;
303
            }
304
            $value = $flow->current();
305
            $actions = $this->parse($flow->key() ?: Async::await);
306
            $next = function ($error = null, $result = null) use ($flow, $actions, $callback, $depth) {
307
                $value = $error ?: $result;
308
                if ($value instanceof Throwable) {
309
                    if (isset($actions['throw']) && is_a($value, $actions['throw'])) {
310
                        $flow->throw($value);
0 ignored issues
show
Bug introduced by
The method throw() does not exist on Generator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

310
                        $flow->/** @scrutinizer ignore-call */ 
311
                               throw($value);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
311
                        $this->_handleGenerator($flow, $callback, $depth);
312
                        return;
313
                    }
314
                    $callback($value, null);
315
                    return;
316
                }
317
                $flow->send($value);
318
                $this->_handleGenerator($flow, $callback, $depth);
319
            };
320
            if (key_exists(self::later, $actions)) {
321
                if (!isset($flow->later)) {
322
                    $flow->later = [];
323
                }
324
                if ($this->logger) {
325
                    $this->logger->info('later task scheduled', compact('depth'));
326
                }
327
                $flow->later[] = $value;
328
                return $next(null, $value);
329
            }
330
            if (key_exists(self::parallel, $actions)) {
331
                if (!isset($flow->parallel)) {
332
                    $flow->parallel = [];
0 ignored issues
show
Bug introduced by
The property parallel does not seem to exist on Generator.
Loading history...
333
                }
334
                $flow->parallel[] = $value;
335
                if (!isset($this->action)) {
336
                    $this->action = [];
0 ignored issues
show
Bug Best Practice introduced by
The property action does not exist. Although not strictly required by PHP, it is generally a best practice to declare properties explicitly.
Loading history...
337
                }
338
                $this->action[] = self::parallel;
339
                return $next(null, $value);
340
            }
341
            if (key_exists(self::all, $actions)) {
342
                $tasks = Async::parallel === $value && isset($flow->parallel) ? $flow->parallel : $value;
343
                unset($flow->parallel);
344
                if (is_array($tasks) && count($tasks)) {
345
                    if ($this->logger) {
346
                        $this->logger->info(
347
                            sprintf("all {%d} tasks awaited.", count($tasks)),
348
                            compact('depth')
349
                        );
350
                    }
351
                    return $this->_awaitAll($tasks, $next);
0 ignored issues
show
Bug introduced by
Are you sure the usage of $this->_awaitAll($tasks, $next) targeting LogicalSteps\Async\Async::_awaitAll() seems to always return null.

This check looks for function or method calls that always return null and whose return value is used.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
if ($a->getObject()) {

The method getObject() can return nothing but null, so it makes no sense to use the return value.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
352
                }
353
                return $next(null, []);
354
            }
355
            $this->_handle($value, $next, $depth);
356
        } catch (Throwable $throwable) {
357
            $callback($throwable, null);
358
        }
359
    }
360
361
    /**
362
     * Handle known promise interfaces
363
     *
364
     * @param \React\Promise\PromiseInterface|\GuzzleHttp\Promise\PromiseInterface|\Amp\Promise|\Http\Promise\Promise $knownPromise
365
     * @param string $interface
366
     * @param callable $callback
367
     * @param int $depth
368
     * @return void
369
     * @throws \Exception
370
     */
371
    protected function _handlePromise($knownPromise, string $interface, callable $callback, int $depth = 0)
372
    {
373
        $this->logPromise($knownPromise, $interface, $depth);
374
        $resolver = function ($result) use ($callback) {
375
            $callback(null, $result);
376
        };
377
        $rejector = function ($error) use ($callback) {
378
            $callback($error, null);
379
        };
380
        switch ($interface) {
381
            case static::PROMISE_REACT:
382
                $knownPromise->then($resolver, $rejector);
0 ignored issues
show
Bug introduced by
The method then() does not exist on Amp\Promise. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

382
                $knownPromise->/** @scrutinizer ignore-call */ 
383
                               then($resolver, $rejector);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
383
                break;
384
            case static::PROMISE_GUZZLE:
385
                $knownPromise->then($resolver, $rejector);
386
                if ($this->waitForGuzzleAndHttplug) {
387
                    if ($this->parallelGuzzleLoading) {
388
                        $this->guzzlePromises[] = $knownPromise;
389
                    } else {
390
                        $knownPromise->wait(false);
0 ignored issues
show
Bug introduced by
The method wait() does not exist on Amp\Promise. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

390
                        $knownPromise->/** @scrutinizer ignore-call */ 
391
                                       wait(false);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
Bug introduced by
The method wait() does not exist on React\Promise\PromiseInterface. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

390
                        $knownPromise->/** @scrutinizer ignore-call */ 
391
                                       wait(false);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
391
                    }
392
                }
393
                break;
394
            case static::PROMISE_HTTP:
395
                $knownPromise->then($resolver, $rejector);
396
                if ($this->waitForGuzzleAndHttplug) {
397
                    $knownPromise->wait(false);
398
                }
399
                break;
400
            case static::PROMISE_AMP:
401
                $knownPromise->onResolve(
0 ignored issues
show
Bug introduced by
The method onResolve() does not exist on GuzzleHttp\Promise\PromiseInterface. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

401
                $knownPromise->/** @scrutinizer ignore-call */ 
402
                               onResolve(

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
Bug introduced by
The method onResolve() does not exist on Http\Promise\Promise. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

401
                $knownPromise->/** @scrutinizer ignore-call */ 
402
                               onResolve(

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
Bug introduced by
The method onResolve() does not exist on React\Promise\PromiseInterface. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

401
                $knownPromise->/** @scrutinizer ignore-call */ 
402
                               onResolve(

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
402
                    function ($error = null, $result = null) use ($resolver, $rejector) {
403
                        $error ? $rejector($error) : $resolver($result);
404
                    });
405
                break;
406
        }
407
    }
408
409
    private function handleCommands(Generator $flow, &$value, callable $callback, int $depth): bool
410
    {
411
        $commands = $this->parse($flow->key());
412
        if ($value instanceof Throwable) {
413
            if (isset($commands['throw']) && is_a($value, $commands['throw'])) {
414
                $flow->throw($value);
415
                $this->_handleGenerator($flow, $callback, $depth);
416
                return true; //stop
417
            }
418
            $callback($value, null);
419
            return true; //stop
420
        }
421
        if (isset($commands[self::parallel])) {
422
            if (!isset($flow->parallel)) {
423
                $flow->parallel = [];
0 ignored issues
show
Bug introduced by
The property parallel does not seem to exist on Generator.
Loading history...
424
            }
425
            $flow->parallel [] = $value;
426
            return false; //continue
427
        }
428
429
        if (isset($commands[self::all])) {
430
            if (!isset($flow->parallel)) {
431
                $callback(null, []);
432
                return true; //stop
433
            }
434
            $this->_awaitAll(
435
                $flow->parallel,
436
                function ($error = null, $all = null) use ($flow, $callback, $depth) {
437
                    if ($error) {
438
                        $callback($error, false);
439
                        return;
440
                    }
441
                    $flow->send($all);
442
                    $this->_handleGenerator($flow, $callback, $depth);
443
                }
444
            );
445
            return true; //stop
446
        }
447
448
        return false; //continue
449
    }
450
451
    private function parse(string $command): array
452
    {
453
        $arr = [];
454
        if (strlen($command)) {
455
            parse_str(str_replace(['|', ':'], ['&', '='], $command), $arr);
456
        }
457
        return $arr;
458
    }
459
460
    private function action()
461
    {
462
        if (!empty($this->action)) {
463
            return array_shift($this->action);
464
        }
465
        return self::await;
466
    }
467
468
    private function logCallback(callable $callable, array $parameters, int $depth = 0)
469
    {
470
        if ($depth < 0 || !$this->logger) {
471
            return;
472
        }
473
        if (is_array($callable)) {
474
            $name = $callable[0];
475
            if (is_object($name)) {
476
                $name = '$' . lcfirst(get_class($name)) . '->' . $callable[1];
477
            } else {
478
                $name .= '::' . $callable[1];
479
            }
480
481
        } else {
482
            if (is_string($callable)) {
483
                $name = $callable;
484
            } elseif ($callable instanceof Closure) {
485
                $name = '$closure';
486
            } else {
487
                $name = '$callable';
488
            }
489
        }
490
        $this->logger->info(
491
            sprintf("%s %s%s", $this->action(), $name, $this->format($parameters)),
492
            compact('depth')
493
        );
494
    }
495
496
    private function logPromise($promise, string $interface, int $depth)
497
    {
498
        if ($depth < 0 || !$this->logger) {
499
            return;
500
        }
501
        switch ($interface) {
502
            case static::PROMISE_REACT:
503
                $type = 'react';
504
                break;
505
            case static::PROMISE_GUZZLE:
506
                $type = 'guzzle';
507
                break;
508
            case static::PROMISE_HTTP:
509
                $type = 'httplug';
510
                break;
511
            case static::PROMISE_AMP:
512
                $type = 'amp';
513
                break;
514
        }
515
        $this->logger->info(
516
            sprintf("%s \$%sPromise;", $this->action(), $type),
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $type does not seem to be defined for all execution paths leading up to this point.
Loading history...
517
            compact('depth')
518
        );
519
    }
520
521
    private function logGenerator(Generator $generator, int $depth = 0)
522
    {
523
        if ($depth < 0 || !$generator->valid() || !$this->logger) {
524
            return;
525
        }
526
        $info = new ReflectionGenerator($generator);
527
        $this->logReflectionFunction($info->getFunction(), $depth);
528
    }
529
530
    private function format($parameters)
531
    {
532
        return '(' . substr(json_encode($parameters), 1, -1) . ');';
533
    }
534
535
    private function logReflectionFunction(ReflectionFunctionAbstract $function, int $depth = 0)
536
    {
537
        if ($function instanceof ReflectionMethod) {
538
            $name = $function->getDeclaringClass()->getShortName();
539
            if ($function->isStatic()) {
540
                $name .= '::' . $function->name;
541
            } else {
542
                $name = '$' . lcfirst($name) . '->' . $function->name;
543
            }
544
        } elseif ($function->isClosure()) {
545
            $name = '$closure';
546
        } else {
547
            $name = $function->name;
548
        }
549
        $args = [];
550
        foreach ($function->getParameters() as $parameter) {
551
            $args[] = '$' . $parameter->name;
552
        }
553
        $this->logger->info(
554
            sprintf("%s %s(%s);", $this->action(), $name, implode(', ', $args)),
555
            compact('depth')
556
        );
557
    }
558
}
559