LockingMiddleware::runQueuedJobs()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 3
nc 2
nop 0
1
<?php
2
3
/**
4
 * This file is part of the Cubiche/Bus package.
5
 *
6
 * Copyright (c) Cubiche
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
namespace Cubiche\Core\Bus\Middlewares\Locking;
12
13
use Cubiche\Core\Delegate\Delegate;
14
use Cubiche\Core\Bus\Middlewares\MiddlewareInterface;
15
16
/**
17
 * LockingMiddleware class.
18
 *
19
 * @author Ivannis Suárez Jerez <[email protected]>
20
 */
21
class LockingMiddleware implements MiddlewareInterface
{
    /**
     * Whether a handle() call on this instance is currently draining the queue.
     *
     * @var bool
     */
    private $isRunning = false;

    /**
     * Jobs waiting to be executed, in arrival order.
     *
     * @var Delegate[]
     */
    private $queue = [];

    /**
     * Execute the given message... after other running messages are complete.
     *
     * The message is always appended to the queue first. If the queue is
     * already being drained (i.e. this method was re-entered from inside a
     * running job), the call returns immediately and the message is executed
     * later by the outer call. Otherwise this call drains the queue itself.
     *
     * Nothing is returned: the result of $next is discarded.
     *
     * If any job throws, the lock is released, every job still pending in the
     * queue is dropped, and the original exception/error is propagated.
     *
     * @param mixed    $message
     * @param callable $next
     *
     * @throws \Exception
     * @throws \Throwable
     */
    public function handle($message, callable $next)
    {
        $this->queue[] = Delegate::fromClosure(function () use ($message, $next) {
            return $next($message);
        });

        if ($this->isRunning) {
            return;
        }

        $this->isRunning = true;
        $completed = false;

        try {
            $this->runQueuedJobs();
            $completed = true;
        } finally {
            // Cleanup lives in "finally" rather than "catch (\Exception)" so
            // that an \Error thrown by a job also releases the lock. Otherwise
            // the middleware would stay locked and silently queue every later
            // message without ever running it.
            $this->isRunning = false;

            if (!$completed) {
                // The remaining jobs were queued during the failed run; drop them.
                $this->queue = [];
            }
        }
    }

    /**
     * Process any pending message in the queue.
     *
     * Jobs are removed from the front of the queue one at a time, so jobs
     * appended while this loop is running are picked up by the same loop.
     */
    protected function runQueuedJobs()
    {
        // array_shift() returns null once the queue is empty.
        while (($job = array_shift($this->queue)) !== null) {
            $job();
        }
    }
}
74