MiddlewarePipeline   A
last analyzed

Complexity

Total Complexity 10

Size/Duplication

Total Lines 87
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 1
Bugs 0 Features 1
Metric Value
wmc 10
eloc 38
c 1
b 0
f 1
dl 0
loc 87
ccs 43
cts 43
cp 1
rs 10

5 Methods

Rating   Name   Duplication   Size   Complexity  
A process() 0 24 3
A pipe() 0 3 1
A setCurrentPipeline() 0 8 3
A __construct() 0 6 1
A handle() 0 22 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Antidot\React;
6
7
use Antidot\Application\Http\Handler\NextHandler;
8
use Antidot\Application\Http\Middleware\Pipeline;
9
use Psr\Http\Message\ResponseInterface;
10
use Psr\Http\Message\ServerRequestInterface;
11
use Psr\Http\Server\MiddlewareInterface;
12
use Psr\Http\Server\RequestHandlerInterface;
13
use Ramsey\Uuid\Uuid;
14
use SplQueue;
15
use Throwable;
16
use function React\Promise\reject;
17
use function React\Promise\resolve;
18
19
/**
 * Middleware pipeline that keeps a separate middleware queue per request.
 *
 * Queues live in {@see MiddlewarePipeline::$concurrentPipelines}, keyed by the
 * request's 'request_id' attribute, so several requests can be in flight on
 * the same pipeline instance without sharing queue state. A queue entry is
 * created when a request enters handle() or process() and removed again once
 * that call's callback has finished, whether it succeeded or threw.
 */
class MiddlewarePipeline implements Pipeline
{
    /**
     * Per-request middleware queues, keyed by request id.
     *
     * @var array<SplQueue>
     */
    public array $concurrentPipelines;

    /**
     * Middleware registered on this pipeline, in registration order.
     *
     * @var array<MiddlewareInterface>
     */
    private array $middlewareCollection;

    /**
     * @param array<MiddlewareInterface> $middlewareCollection Middleware to start with.
     * @param array<SplQueue> $concurrentPipelines Pre-existing per-request queues, keyed by request id.
     */
    public function __construct(
        array $middlewareCollection = [],
        array $concurrentPipelines = []
    ) {
        $this->middlewareCollection = $middlewareCollection;
        $this->concurrentPipelines = $concurrentPipelines;
    }

    /**
     * Appends a middleware to the end of the collection.
     *
     * Only queues built after this call contain the new middleware; queues
     * already stored in $concurrentPipelines are not touched.
     */
    public function pipe(MiddlewareInterface $middleware): void
    {
        $this->middlewareCollection[] = $middleware;
    }

    /**
     * Runs the next queued middleware for the request, passing this pipeline
     * as the handler, and wraps the outcome in a PromiseResponse.
     *
     * Any Throwable raised while dequeuing or processing (including dequeuing
     * from an exhausted queue) becomes a rejected promise instead of
     * propagating. The request's queue entry is removed afterwards on both
     * the success and the failure path.
     *
     * NOTE(review): unlike process(), no id is generated here when the
     * 'request_id' attribute is missing; a non-string value is rejected by
     * setCurrentPipeline()'s string parameter under strict types. Confirm
     * that callers always set the attribute before calling handle().
     */
    public function handle(ServerRequestInterface $request): ResponseInterface
    {
        /** @var string $requestId */
        $requestId = $request->getAttribute('request_id');
        $this->setCurrentPipeline($requestId);

        $promise = resolve($request)->then(
            function (ServerRequestInterface $request) use ($requestId) {
                try {
                    /** @var MiddlewareInterface $middleware */
                    $middleware = $this->concurrentPipelines[$requestId]->dequeue();

                    return resolve($middleware->process($request, $this));
                } catch (Throwable $exception) {
                    return reject($exception);
                } finally {
                    // Single release point for both outcomes.
                    unset($this->concurrentPipelines[$requestId]);
                }
            }
        );

        return new PromiseResponse($promise);
    }

    /**
     * Runs this pipeline as a middleware: the request's queue and $handler are
     * handed to a NextHandler, and the outcome is wrapped in a PromiseResponse.
     *
     * When the request has no (or an empty) 'request_id' attribute, a UUID v4
     * is generated and attached to the request before it is passed on.
     * Any Throwable raised by the NextHandler becomes a rejected promise.
     *
     * NOTE(review): NextHandler is presumably responsible for running the
     * queued middleware and then delegating to $handler; verify against its
     * implementation.
     */
    public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
    {
        /** @var ?string $requestId */
        $requestId = $request->getAttribute('request_id');
        if (!$requestId) {
            $requestId = Uuid::uuid4()->toString();
            $request = $request->withAttribute('request_id', $requestId);
        }
        $this->setCurrentPipeline($requestId);

        $promise = resolve($request)->then(
            function (ServerRequestInterface $request) use ($handler, $requestId) {
                try {
                    /** @var SplQueue<MiddlewareInterface> $queue */
                    $queue = $this->concurrentPipelines[$requestId];
                    $next = new NextHandler($queue, $handler);

                    return resolve($next->handle($request));
                } catch (Throwable $exception) {
                    return reject($exception);
                } finally {
                    // Release the entry on success too, not only on failure;
                    // otherwise every processed request leaves its queue behind
                    // in $concurrentPipelines. $next keeps its own reference to
                    // the queue object, so dropping the array entry does not
                    // take the queue away from it.
                    unset($this->concurrentPipelines[$requestId]);
                }
            }
        );

        return new PromiseResponse($promise);
    }

    /**
     * Makes sure a middleware queue exists for the given request id.
     *
     * An existing SplQueue is left as it is. Anything else (no entry, null,
     * or a non-queue value placed in the public array) is replaced by a fresh
     * queue filled with the registered middleware in registration order.
     */
    private function setCurrentPipeline(string $requestId): void
    {
        if (($this->concurrentPipelines[$requestId] ?? null) instanceof SplQueue) {
            return;
        }

        $queue = new SplQueue();
        foreach ($this->middlewareCollection as $middleware) {
            $queue->enqueue($middleware);
        }

        $this->concurrentPipelines[$requestId] = $queue;
    }
}
109