Completed
Push — master ( 6950c0...f81fef )
by Arul
39:28 queued 24:29
created

Async::awaitAll()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
1
<?php
2
3
namespace LogicalSteps\Async;
4
5
use Closure;
6
use Generator;
7
use Psr\Log\LoggerAwareInterface;
8
use Psr\Log\LoggerInterface;
9
use React\EventLoop\LoopInterface;
10
use function React\Promise\all;
11
use React\Promise\Deferred;
12
use React\Promise\Promise;
13
use ReflectionFunctionAbstract;
14
use ReflectionGenerator;
15
use ReflectionMethod;
16
use Throwable;
17
18
class Async implements LoggerAwareInterface
19
{
20
    /** Fully-qualified name of the ReactPHP promise interface recognised by _run(). */
    const PROMISE_REACT = 'React\Promise\PromiseInterface';
    /** Fully-qualified name of the Guzzle promise interface recognised by _run(). */
    const PROMISE_GUZZLE = 'GuzzleHttp\Promise\PromiseInterface';
    /** Fully-qualified name of the HTTPlug promise interface recognised by _run(). */
    const PROMISE_HTTP = 'Http\Promise\Promise';
    /** Fully-qualified name of the Amp promise interface recognised by _run(). */
    const PROMISE_AMP = 'Amp\Promise';

    /**
     * @var LoopInterface|null React event loop; stays unset until setLoop() is called
     */
    protected $loop;

    /**
     * @var LoggerInterface|null logger; stays unset unless passed to the constructor or setLogger()
     */
    protected $logger;

    /**
     * @var callable executor used to schedule every step of a flow; maps to
     *               _execute, _executeReactLoop or _executeAmpLoop
     */
    protected $exec;
37
38
    /**
     * @param LoggerInterface|null $logger optional logger; one can also be supplied later
     *                                     through setLogger()
     */
    public function __construct(LoggerInterface $logger = null)
    {
        // Start with the synchronous executor; setLoop() / useAmpLoop() replace it.
        $this->exec = [$this, '_execute'];
        if (!$logger) {
            return;
        }
        $this->logger = $logger;
    }
45
46
    /**
     * Run a generator-based flow and expose its outcome as a promise.
     *
     * The flow is handed to the currently selected executor ($this->exec). The returned
     * promise is resolved with the value passed to the completion callback, or rejected
     * with the error passed to it.
     *
     * @param Generator $flow the flow to drive
     *
     * @return Promise
     */
    public function await(Generator $flow): Promise
    {
        if ($this->logger) {
            $this->logger->info('start');
        }
        $deferred = new Deferred();
        // $result must be optional: the error paths invoke this callback with the error
        // only, and a required second parameter raises ArgumentCountError on PHP >= 7.1,
        // which would leave the promise unsettled.
        ($this->exec)($flow, function ($error, $result = null) use ($deferred) {
            if ($this->logger) {
                $this->logger->info('end');
            }
            if ($error) {
                $deferred->reject($error);

                return;
            }
            $deferred->resolve($result);
        });

        return $deferred->promise();
    }
64
65
    /**
     * Await several flows and combine their promises with React's all().
     *
     * @param Generator ...$flows flows to start, each through await()
     *
     * @return Promise
     */
    public function awaitAll(Generator ...$flows): Promise
    {
        $promises = [];
        foreach ($flows as $index => $flow) {
            $promises[$index] = $this->await($flow);
        }

        return all($promises);
    }
69
70
    /**
     * Synchronous executor: performs the next step of $flow immediately.
     *
     * @param Generator $flow
     * @param callable|null $callback completion callback forwarded to _run()
     * @param int $depth nesting depth forwarded to _run()
     */
    private function _execute(Generator $flow, callable $callback = null, int $depth = 0)
    {
        $this->_run(
            $flow,
            $callback,
            $depth
        );
    }
74
75
    /**
     * React executor: schedules the next step of $flow with futureTick() on $this->loop.
     *
     * @param Generator $flow
     * @param callable|null $callback completion callback forwarded to _run()
     * @param int $depth nesting depth forwarded to _run()
     */
    private function _executeReactLoop(Generator $flow, callable $callback = null, int $depth = 0)
    {
        $step = function () use ($flow, $callback, $depth) {
            $this->_run($flow, $callback, $depth);
        };
        $this->loop->futureTick($step);
    }
81
82
    /**
     * Amp executor: schedules the next step of $flow through \Amp\Loop::defer.
     *
     * The loop class is referenced by string, so this file has no hard reference to Amp.
     *
     * @param Generator $flow
     * @param callable|null $callback completion callback forwarded to _run()
     * @param int $depth nesting depth forwarded to _run()
     */
    private function _executeAmpLoop(Generator $flow, callable $callback = null, int $depth = 0)
    {
        $defer = '\Amp\Loop::defer';
        $step = function () use ($flow, $callback, $depth) {
            $this->_run($flow, $callback, $depth);
        };
        $defer($step);
    }
89
90
91
    /**
     * Advance $flow by one step and arrange for the following step.
     *
     * A finished flow reports its return value to $callback; when that return value is
     * itself a generator, execution continues with it one level deeper instead.
     * Otherwise the currently yielded value decides how the flow is resumed:
     *  - a callable, or an array starting with a callable or a [target, method] pair
     *    followed by arguments: invoked with a trailing callback ($error, $result);
     *  - a Generator: run as a nested flow at $depth + 1;
     *  - a React, Guzzle, HTTPlug or Amp promise: resumed once the promise settles;
     *  - anything else: sent straight back into the flow unchanged.
     *
     * A Throwable raised while doing so is thrown into $flow. If the flow handles it, the
     * flow continues with the same $callback and $depth; if it does not, the exception
     * leaves this method.
     *
     * @param Generator $flow the flow to advance
     * @param callable|null $callback completion callback ($error, $result)
     * @param int $depth nesting depth, used as logging context
     */
    private function _run(Generator $flow, callable $callback = null, int $depth = 0)
    {
        try {
            if (!$flow->valid()) {
                $value = $flow->getReturn();
                if ($value instanceof Generator) {
                    $this->logGenerator($value, $depth);
                    ($this->exec)($value, $callback, $depth + 1);
                } elseif (is_callable($callback)) {
                    $callback(null, $value);
                }

                return;
            }
            $value = $flow->current();
            $args = [];
            $func = $value;
            if (is_array($value) && count($value) > 1) {
                // Split a copy, so $value still holds exactly what the flow yielded in
                // case the array turns out not to describe a callable.
                $args = $value;
                $func = [array_shift($args)];
                if (is_callable($func[0])) {
                    $func = $func[0];
                } else {
                    $func[] = array_shift($args);
                }
            }
            if (is_callable($func)) {
                $this->logCallable($func, $args, $depth);
                $args[] = $this->continuation($flow, $callback, $depth);
                call_user_func_array($func, $args);
            } elseif ($value instanceof Generator) {
                $this->logGenerator($value, $depth);
                ($this->exec)($value, $this->continuation($flow, $callback, $depth), $depth + 1);
            } elseif (is_a($value, static::PROMISE_REACT)) {
                $this->handlePromise($flow, $callback, $depth, $value, 'react');
            } elseif (is_a($value, static::PROMISE_GUZZLE)) {
                $this->handlePromise($flow, $callback, $depth, $value, 'guzzle');
                $value->wait(false);
            } elseif (is_a($value, static::PROMISE_HTTP)) {
                $this->handlePromise($flow, $callback, $depth, $value, 'httplug');
                $value->wait(false);
            } elseif (is_a($value, static::PROMISE_AMP)) {
                if ($this->logger) {
                    $this->logger->info('await $ampPromise;');
                }
                $value->onResolve($this->continuation($flow, $callback, $depth));
            } else {
                $flow->send($value);
                ($this->exec)($flow, $callback, $depth);
            }
        } catch (Throwable $t) {
            $flow->throw($t);
            // Keep the callback and depth: without them a flow that recovers from the
            // exception would finish without ever reporting its result.
            ($this->exec)($flow, $callback, $depth);
        }
    }

    /**
     * Build the callback ($error, $result) that resumes $flow after an awaited step.
     *
     * With an error, the error is logged and forwarded to $callback. Otherwise $result is
     * sent into $flow and the flow is rescheduled through the active executor.
     *
     * @param Generator $flow the flow to resume
     * @param callable|null $callback completion callback of $flow
     * @param int $depth nesting depth, used as logging context
     *
     * @return Closure
     */
    private function continuation(Generator $flow, callable $callback = null, int $depth = 0): Closure
    {
        // $result is optional because error paths call back with the error only.
        return function ($error, $result = null) use ($flow, $callback, $depth) {
            if ($error) {
                if ($this->logger) {
                    $this->logger->error((string)$error, compact('depth'));
                }
                if (is_callable($callback)) {
                    $callback($error, null);
                }

                return;
            }
            $flow->send($result);
            ($this->exec)($flow, $callback, $depth);
        };
    }
191
192
    /**
     * Await a promise that exposes then($onFulfilled, $onRejected).
     *
     * On fulfilment the value is sent into $flow and the flow is rescheduled through the
     * active executor. On rejection the reason is logged and forwarded to $callback.
     *
     * $callback carries no type declaration because _run() may forward null; the
     * is_callable() guard below covers that case.
     *
     * @param Generator $flow flow suspended on the promise
     * @param callable|null $callback completion callback ($error, $result)
     * @param int $depth nesting depth, used as logging context
     * @param \React\Promise\PromiseInterface|\GuzzleHttp\Promise\PromiseInterface|\Http\Promise\Promise $value
     * @param string $type short label used in the log message, e.g. 'react', 'guzzle', 'httplug'
     */
    private function handlePromise(Generator $flow, $callback, int $depth, $value, string $type)
    {
        if ($this->logger) {
            $this->logger->info('await $' . $type . 'Promise;');
        }
        $value->then(
            function ($result) use ($flow, $callback, $depth) {
                $flow->send($result);
                ($this->exec)($flow, $callback, $depth);
            },
            function ($error) use ($callback, $depth) {
                if ($this->logger) {
                    $this->logger->error((string)$error, compact('depth'));
                }
                if (is_callable($callback)) {
                    // Pass both arguments: callbacks in this class are declared as
                    // ($error, $result) and a missing one is an error on PHP >= 7.1.
                    $callback($error, null);
                }
            }
        );
    }
221
222
    /**
     * Log an "await name(args);" line for a callable that is about to be invoked.
     *
     * Does nothing when no logger is set.
     *
     * @param callable $callable the callable being awaited
     * @param array $arguments arguments it will receive, rendered through format()
     * @param int $depth nesting depth, passed as logging context
     */
    private function logCallable(callable $callable, array $arguments, int $depth = 0)
    {
        if (!$this->logger) {
            return;
        }
        if (is_array($callable)) {
            list($target, $method) = $callable;
            $label = is_object($target)
                ? '$' . lcfirst(get_class($target)) . '->' . $method
                : $target . '::' . $method;
        } elseif (is_string($callable)) {
            $label = $callable;
        } elseif ($callable instanceof Closure) {
            $label = '$closure';
        } else {
            $label = '$callable';
        }
        $context = ['depth' => $depth];
        $this->logger->info('await ' . $label . $this->format($arguments), $context);
    }
247
248
    /**
     * Log an "await name($param, ...);" line describing a reflected function.
     *
     * Methods are rendered as Class::method (static) or $class->method (instance),
     * closures as $closure, and plain functions by their name.
     *
     * @param ReflectionFunctionAbstract $function the function to describe
     * @param int $depth nesting depth, passed as logging context
     */
    private function logReflectionFunction(ReflectionFunctionAbstract $function, int $depth = 0)
    {
        if ($function instanceof ReflectionMethod) {
            $class = $function->getDeclaringClass()->getShortName();
            $label = $function->isStatic()
                ? $class . '::' . $function->name
                : '$' . lcfirst($class) . '->' . $function->name;
        } elseif ($function->isClosure()) {
            $label = '$closure';
        } else {
            $label = $function->name;
        }
        $names = array_map(
            function ($parameter) {
                return '$' . $parameter->name;
            },
            $function->getParameters()
        );
        $this->logger->info('await ' . $label . '(' . implode(', ', $names) . ');', ['depth' => $depth]);
    }
268
269
    /**
     * Log the function behind a generator that is about to be awaited.
     *
     * Nothing is logged for a generator that is no longer valid or when no logger is set.
     *
     * @param Generator $generator the generator being awaited
     * @param int $depth nesting depth, passed as logging context
     */
    private function logGenerator(Generator $generator, int $depth = 0)
    {
        // valid() is deliberately evaluated first, as in the combined condition it replaces.
        if (!$generator->valid()) {
            return;
        }
        if (!$this->logger) {
            return;
        }
        $function = (new ReflectionGenerator($generator))->getFunction();
        $this->logReflectionFunction($function, $depth);
    }
277
278
    /**
     * Render an argument list as "(arg1,arg2);" for log output.
     *
     * The arguments are JSON-encoded and the outer brackets are stripped. Values that
     * cannot be encoded (for example strings with invalid UTF-8) are substituted with
     * null instead of making the whole list disappear from the log line.
     *
     * @param array $parameters arguments to render
     *
     * @return string
     */
    private function format($parameters)
    {
        $json = json_encode($parameters, JSON_PARTIAL_OUTPUT_ON_ERROR);
        if (!is_string($json) || strlen($json) < 2) {
            // Encoding failed outright; fall back to an empty argument list.
            return '();';
        }

        return '(' . substr($json, 1, -1) . ');';
    }
282
283
    /**
     * Autoload probe: calling this successfully shows the class could be loaded.
     *
     * @return bool always true
     *
     * @see \LogicalSteps\Async\Test\DummyTest
     */
    public function autoloaded()
    {
        $loaded = true;

        return $loaded;
    }
294
295
    /**
     * Replace the logger used for the "start" / "end" / "await ..." messages.
     *
     * @param LoggerInterface $logger logger to use from now on
     *
     * @return void
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }
306
307
    /**
     * Attach a React event loop and switch to the executor that schedules every
     * step through that loop (_executeReactLoop).
     *
     * @param LoopInterface $loop loop used for scheduling
     */
    public function setLoop(LoopInterface $loop)
    {
        $this->exec = [$this, '_executeReactLoop'];
        $this->loop = $loop;
    }
315
316
    /**
     * Switch to the executor that schedules every step through Amp's loop
     * (_executeAmpLoop).
     */
    public function useAmpLoop()
    {
        $executor = [$this, '_executeAmpLoop'];
        $this->exec = $executor;
    }
320
}
321