Completed
Push — master ( 987339...3717b7 )
by Mike
02:45
created

QueueWorker   A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 45
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 89.29%

Importance

Changes 5
Bugs 0 Features 1
Metric Value
wmc 10
lcom 1
cbo 6
dl 0
loc 45
ccs 25
cts 28
cp 0.8929
rs 10
c 5
b 0
f 1

3 Methods

Rating   Name   Duplication   Size   Complexity  
B work() 0 11 6
A __construct() 0 4 1
A iterate() 0 22 3
1
<?php
2
3
namespace MGDigital\BusQue;
4
5
use MGDigital\BusQue\Exception\TimeoutException;
6
use Psr\Log\LogLevel;
7
8
/**
 * Pulls commands off a named queue and dispatches them to the command bus.
 *
 * Every collaborator (queue driver, command serializer, logger, command bus
 * adapter) is obtained from the injected Implementation each time it is needed.
 */
class QueueWorker
{

    /**
     * @var Implementation Source of the queue driver, serializer, logger and bus adapter.
     */
    private $implementation;

    /**
     * @param Implementation $implementation Provides the collaborators used while working a queue.
     */
    public function __construct(Implementation $implementation)
    {
        $this->implementation = $implementation;
    }

    /**
     * Runs the receive/handle loop for one queue until a limit is reached.
     *
     * The loop stops when $n iterations have run or when $time seconds have
     * elapsed since the call started, whichever comes first. With both limits
     * null it runs indefinitely. An iteration that ends in a receive timeout
     * still counts towards $n.
     *
     * @param string   $queueName Queue to receive commands from.
     * @param int|null $n         Maximum number of iterations, or null for no limit.
     * @param int|null $time      Time budget in seconds, or null for no limit.
     */
    public function work(string $queueName, int $n = null, int $time = null)
    {
        $deadline = $time === null ? null : time() + $time;

        // The first iteration receives the caller's $time unchanged; later
        // iterations receive only what is left of the budget, so a blocking
        // receive cannot push the worker far past its deadline.
        // NOTE(review): assumes awaitCommand() treats its second argument as a
        // wait duration in seconds - confirm against the queue driver.
        $wait = $time;

        while ($n === null || $n > 0) {
            $this->iterate($queueName, $wait);

            if ($n !== null) {
                $n--;
            }

            if ($deadline !== null) {
                $wait = $deadline - time();
                if ($wait <= 0) {
                    // Budget exhausted. Breaking here also guarantees that any
                    // further iteration is given a wait of at least one second.
                    break;
                }
            }
        }
    }

    /**
     * Receives at most one command from the queue and handles it.
     *
     * Returns silently when the driver throws a TimeoutException. Any
     * Throwable raised by the command bus adapter is logged rather than
     * rethrown, and the received command is reported to the driver as
     * completed whether or not handling succeeded.
     *
     * @param string   $queueName Queue to receive from.
     * @param int|null $time      Passed through to the driver's awaitCommand() as its second argument.
     */
    private function iterate(string $queueName, int $time = null)
    {
        try {
            $received = $this->implementation->getQueueDriver()
                ->awaitCommand($queueName, $time);
        } catch (TimeoutException $e) {
            // Nothing arrived in time; the caller decides whether to try again.
            return;
        }

        $command = $this->implementation->getCommandSerializer()
            ->unserialize($received->getSerialized());
        $this->implementation->getLogger()->debug('Command received', ['command' => $command]);

        try {
            $this->implementation->getCommandBusAdapter()
                ->handle($command, true);
            $this->implementation->getLogger()->info('Command handled', ['command' => $command]);
        } catch (\Throwable $exception) {
            // A failing command must not stop the worker loop, so log and carry on.
            $this->implementation->getLogger()->error(
                'Command failed',
                ['command' => $command, 'exception' => $exception]
            );
        } finally {
            // Runs on both success and failure so the driver can release the command.
            $this->implementation->getQueueDriver()
                ->completeCommand($received->getQueueName(), $received->getId());
        }
    }
}
53