Passed
Push — fix_bc_break ( 0db1fe )
by Koldo
03:23
created

MiddlewarePipeline::process()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 24
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 3

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 15
c 1
b 0
f 1
dl 0
loc 24
ccs 15
cts 15
cp 1
rs 9.7666
cc 3
nc 2
nop 2
crap 3
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Antidot\React;
6
7
use Antidot\Application\Http\Handler\NextHandler;
8
use Antidot\Application\Http\Middleware\MiddlewareQueue;
9
use Antidot\Application\Http\Middleware\Pipeline;
10
use Antidot\Application\Http\Middleware\SyncMiddlewareQueue;
11
use Psr\Http\Message\ResponseInterface;
12
use Psr\Http\Message\ServerRequestInterface;
13
use Psr\Http\Server\MiddlewareInterface;
14
use Psr\Http\Server\RequestHandlerInterface;
15
use Ramsey\Uuid\Uuid;
16
use SplQueue;
17
use Throwable;
18
use function React\Promise\reject;
19
use function React\Promise\resolve;
20
21
class MiddlewarePipeline implements Pipeline
22
{
23
    /** @var array<MiddlewareQueue> */
24
    public array $concurrentPipelines;
25
    /** @var array<MiddlewareInterface> */
26
    private array $middlewareCollection;
27
28
    /**
29
     * @param array<MiddlewareInterface> $middlewareCollection
30
     * @param array<MiddlewareQueue> $concurrentPipelines
31
     */
32 5
    public function __construct(
33
        array $middlewareCollection = [],
34
        array $concurrentPipelines = []
35
    ) {
36 5
        $this->concurrentPipelines = $concurrentPipelines;
37 5
        $this->middlewareCollection = $middlewareCollection;
38 5
    }
39
40 4
    public function pipe(MiddlewareInterface $middleware): void
41
    {
42 4
        $this->middlewareCollection[] = $middleware;
43 4
    }
44
45 2
    public function handle(ServerRequestInterface $request): ResponseInterface
46
    {
47
        /** @var string $requestId */
48 2
        $requestId = $request->getAttribute('request_id');
49 2
        $this->setCurrentPipeline($requestId);
50
51 2
        return new PromiseResponse(resolve($request)->then(
52 2
            function (ServerRequestInterface $request) {
53
                /** @var string $requestId */
54 2
                $requestId = $request->getAttribute('request_id');
55
                try {
56
                    /** @var MiddlewareInterface $middleware */
57 2
                    $middleware = $this->concurrentPipelines[$requestId]->dequeue();
58
59 2
                    $response = $middleware->process($request, $this);
60 1
                    unset($this->concurrentPipelines[$requestId]);
61
62 1
                    return resolve($response);
63 1
                } catch (Throwable $exception) {
64 1
                    unset($this->concurrentPipelines[$requestId]);
65
66 1
                    return reject($exception);
67
                }
68 2
            }
69
        ));
70
    }
71
72 2
    public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
73
    {
74
        /** @var ?string $requestId */
75 2
        $requestId = $request->getAttribute('request_id');
76 2
        if (!$requestId) {
77 1
            $requestId = Uuid::uuid4()->toString();
78 1
            $request = $request->withAttribute('request_id', $requestId);
79
        }
80 2
        $this->setCurrentPipeline($requestId);
81
82 2
        return new PromiseResponse(resolve($request)
83 2
            ->then(function (ServerRequestInterface $request) use ($handler) {
84
                /** @var string $requestId */
85 2
                $requestId = $request->getAttribute('request_id');
86
                try {
87
                    /** @var MiddlewareQueue $queue */
88 2
                    $queue = $this->concurrentPipelines[$requestId];
89 2
                    $next = new NextHandler($queue, $handler);
90
91 2
                    return resolve($next->handle($request));
92 1
                } catch (Throwable $exception) {
93 1
                    unset($this->concurrentPipelines[$requestId]);
94
95 1
                    return reject($exception);
96
                }
97 2
            }));
98
    }
99
100 4
    private function setCurrentPipeline(string $requestId): void
101
    {
102 4
        if (empty($this->concurrentPipelines[$requestId])) {
103 4
            $queue = new SyncMiddlewareQueue();
104 4
            foreach ($this->middlewareCollection as $middlewareName) {
105 4
                $queue->enqueue($middlewareName);
106
            }
107 4
            $this->concurrentPipelines[$requestId] = $queue;
108
        }
109 4
    }
110
}
111