MiddlewarePipeline::process()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 3
c 1
b 0
f 0
nc 1
nop 2
dl 0
loc 5
ccs 0
cts 4
cp 0
crap 2
rs 10
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Antidot\React;
6
7
use Psr\Http\Message\ResponseInterface;
8
use Psr\Http\Message\ServerRequestInterface;
9
use Psr\Http\Server\MiddlewareInterface;
10
use Psr\Http\Server\RequestHandlerInterface;
11
use React\EventLoop\LoopInterface;
12
use React\Promise\Promise;
13
use React\Promise\PromiseInterface;
14
use Recoil\React\ReactKernel;
15
use Throwable;
16
use function React\Promise\resolve;
17
18
final class MiddlewarePipeline implements CallablePipeline
19
{
20
    private array $middleware;
21
    private ReactKernel $kernel;
22
23
    public function __construct(LoopInterface $loop)
24
    {
25
        $this->kernel = ReactKernel::create($loop);
26
        $this->middleware = [];
27
    }
28
29
    public function pipe(MiddlewareInterface $middleware): void
30
    {
31
        $func = static function ($middleware) {
32
            return $middleware;
33
        };
34
35
        $this->middleware[] = $func($middleware);
36
    }
37
38
    public function __invoke(ServerRequestInterface $request, callable $next): PromiseInterface
39
    {
40
        $stack = $this->createStack($next);
41
        $kernel = $this->kernel;
42
43
        return new Promise(static function ($resolve, $reject) use ($kernel, $request, $next, $stack) {
44
            $kernel->execute(static function () use ($resolve, $reject, $request, $next, $stack) {
45
                try {
46
                    $response = $stack($request, $next);
47
                    if ($response instanceof ResponseInterface) {
48
                        $response = resolve($response);
49
                    }
50
                    $response = (yield $response);
51
                    $resolve($response);
52
                } catch (Throwable $throwable) {
53
                    $reject($throwable);
54
                }
55
            });
56
        });
57
    }
58
59
    private function createStack($next): callable
60
    {
61
        $stack = static function (ServerRequestInterface $request) use ($next) {
62
            $response = $next($request);
63
            if ($response instanceof ResponseInterface) {
64
                $response = resolve($response);
65
            }
66
            return (yield $response);
67
        };
68
69
        $middleware = $this->middleware;
70
        $middleware = array_reverse($middleware);
71
        foreach ($middleware as $mw) {
72
            $mwh = clone $mw;
73
            $stack = static function (ServerRequestInterface $request) use ($stack, $mwh) {
74
                $response = $mwh->process($request, new PassThroughRequestHandler($stack));
75
                if ($response instanceof ResponseInterface) {
76
                    $response = resolve($response);
77
                }
78
                return (yield $response);
79
            };
80
        }
81
82
        return $stack;
83
    }
84
85
    public function handle(ServerRequestInterface $request): ResponseInterface
86
    {
87
        return new PromiseResponse(resolve(clone $this)->then(fn($next) => $this->__invoke($request, $next)));
88
    }
89
90
    public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
91
    {
92
        return new PromiseResponse(resolve(static function ($handler) {
93
            return $handler;
94
        })->then(fn($func) => $this->__invoke($request, $func($handler))));
95
    }
96
}
97