Passed
Push — master ( a32f43...4dbc55 )
by Herberto
05:27
created

LockingMiddleware::enqueue()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 2
dl 0
loc 4
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Werkspot\MessageBus\Bus\DeliveryChain\Middleware;
6
7
use Exception;
8
use Werkspot\MessageBus\Bus\DeliveryChain\MiddlewareInterface;
9
use Werkspot\MessageBus\Message\MessageInterface;
10
11
/**
12
 * This middleware isolates the execution of subsequent commands
13
 * so that they don't interfere with the execution of the
14
 * current command
15
 */
16
final class LockingMiddleware implements MiddlewareInterface
{
    /**
     * True while a deliver() call is draining the queue.
     *
     * @var bool
     */
    private $executing = false;

    /**
     * Pending deliveries, in the order they were received.
     *
     * @var callable[]
     */
    private $queue = [];

    /**
     * Queues the delivery of the message and, unless the queue is already
     * being drained, drains it one delivery at a time.
     *
     * If this method is called again while the queue is being drained
     * (for instance from within $next), the new message is only queued;
     * the call that is already draining the queue will deliver it after
     * the current one has finished.
     *
     * @param MessageInterface $message the message to hand to $next
     * @param callable         $next    invoked with $message as its only argument
     *
     * @throws \Throwable whatever a queued delivery throws; the deliveries
     *                    still pending at that point are discarded
     */
    public function deliver(MessageInterface $message, callable $next): void
    {
        $this->enqueue($message, $next);

        if ($this->executing) {
            // The call that is already draining the queue will pick this one up.
            return;
        }

        $this->executing = true;

        try {
            $this->processQueue();
        } finally {
            // On success the queue is already empty. On failure - Exception
            // and Error alike - drop whatever is left so that stale deliveries
            // never run as part of a later, unrelated deliver() call.
            $this->queue = [];
            $this->executing = false;
        }
    }

    /**
     * Queues the execution of the next message instead of executing it
     * when it's added to the message bus
     */
    private function enqueue(MessageInterface $message, callable $next): void
    {
        // The closure does not use $this, so keep it static to avoid binding it.
        $this->queue[] = static function () use ($message, $next): void {
            $next($message);
        };
    }

    /**
     * Process one message at a time so we can isolate their execution
     */
    private function processQueue(): void
    {
        // array_shift() returns null once the queue is empty. Deliveries queued
        // while one is running are appended and picked up by this same loop.
        while (($resumeMessage = array_shift($this->queue)) !== null) {
            $resumeMessage();
        }
    }
}
68