QueueWorker   A
last analyzed

Complexity

Total Complexity 12

Size/Duplication

Total Lines 53
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 7

Test Coverage

Coverage 80%

Importance

Changes 7
Bugs 0 Features 1
Metric Value
wmc 12
c 7
b 0
f 1
lcom 1
cbo 7
dl 0
loc 53
ccs 28
cts 35
cp 0.8
rs 10

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
B work() 0 20 9
A iterate() 0 21 2
1
<?php
2
3
namespace MGDigital\BusQue;
4
5
use MGDigital\BusQue\Exception\ConcurrencyException;
6
use MGDigital\BusQue\Exception\DriverException;
7
use MGDigital\BusQue\Exception\TimeoutException;
8
9
/**
 * Pulls commands off a named queue and hands them to the command bus.
 *
 * Every collaborator used here (queue driver, command serializer, command bus
 * adapter and logger) is obtained from the injected Implementation.
 */
class QueueWorker
{

    /**
     * @var Implementation Provides the queue driver, serializer, bus adapter and logger.
     */
    private $implementation;

    /**
     * @param Implementation $implementation Source of all collaborators used by the worker.
     */
    public function __construct(Implementation $implementation)
    {
        $this->implementation = $implementation;
    }

    /**
     * Processes commands from the given queue until a stop condition is met.
     *
     * The loop ends when any of the following happens:
     *  - $n iterations have completed without throwing (when $n is not null);
     *  - at least $time seconds have elapsed since the call started (when $time is not null);
     *  - a TimeoutException is thrown while iterating.
     *
     * A DriverException is logged as an error and the loop continues; a
     * ConcurrencyException is ignored and the loop continues. Neither of them
     * counts towards $n. Any other exception escaping iterate() propagates to
     * the caller.
     *
     * @param string   $queueName Name of the queue to consume.
     * @param int|null $n         Maximum number of iterations to complete, or null for no limit.
     * @param int|null $time      Overall time budget in seconds (measured with time()), or null
     *                            for no limit. It is also forwarded as the wait limit of the
     *                            queue driver's awaitCommand(); presumably the driver treats it
     *                            as seconds too - confirm against the driver.
     */
    public function work(string $queueName, int $n = null, int $time = null)
    {
        $stopwatchStart = time();
        // The first wait gets the caller's value untouched; subsequent waits only
        // get what is left of the budget so the worker does not overrun $time by
        // (almost) a whole extra wait period.
        $remaining = $time;
        while ($n === null || $n > 0) {
            try {
                $this->iterate($queueName, $remaining);
                if ($n !== null) {
                    $n--;
                }
            } catch (DriverException $exception) {
                $this->implementation->getLogger()
                    ->error($exception->getMessage(), compact('exception'));
            } catch (ConcurrencyException $exception) {
                // do nothing
            } catch (TimeoutException $exception) {
                break;
            }
            if ($time !== null) {
                $elapsed = time() - $stopwatchStart;
                if ($elapsed >= $time) {
                    break;
                }
                // $elapsed < $time here, so the remaining budget is at least 1.
                // min() keeps it from exceeding $time should the wall clock step backwards.
                $remaining = min($time, $time - $elapsed);
            }
        }
    }

    /**
     * Awaits one command on the queue, handles it and marks it as completed.
     *
     * Any Throwable raised while handling the command is logged as
     * 'Command failed' and not rethrown; the command is marked completed on the
     * queue driver whether handling succeeded or failed.
     *
     * Exceptions thrown by awaitCommand() or by unserialize() are not caught
     * here and propagate to the caller.
     *
     * NOTE(review): unserialize() runs before the try/finally, so if it throws,
     * completeCommand() is never called for the received command - confirm
     * whether that is intended.
     *
     * @param string   $queueName Name of the queue to await a command on.
     * @param int|null $time      Wait limit forwarded to the queue driver's awaitCommand().
     */
    private function iterate(string $queueName, int $time = null)
    {
        $received = $this->implementation->getQueueDriver()
            ->awaitCommand($queueName, $time);
        $command = $this->implementation->getCommandSerializer()
            ->unserialize($received->getSerialized());
        $this->implementation->getLogger()
            ->debug('Command received', compact('command'));
        try {
            $this->implementation->getCommandBusAdapter()
                ->handle($command, true);
            $this->implementation->getLogger()
                ->info('Command handled', compact('command'));
        } catch (\Throwable $exception) {
            $this->implementation->getLogger()
                ->error('Command failed', compact('command', 'exception'));
        } finally {
            $this->implementation->getQueueDriver()
                ->completeCommand($received->getQueueName(), $received->getId());
        }
    }

}
62