Passed
Push — master ( 6f5612...3ce552 )
by Mr
02:56
created

RabbitMq3Worker   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 58
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
dl 0
loc 58
rs 10
c 0
b 0
f 0
wmc 5

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 10 1
A run() 0 15 2
A execute() 0 18 2
1
<?php
2
3
namespace Daikon\RabbitMq3\Worker;
4
5
use Assert\Assertion;
6
use Daikon\AsyncJob\Job\JobMap;
7
use Daikon\AsyncJob\Worker\WorkerInterface;
8
use Daikon\MessageBus\Envelope;
9
use Daikon\MessageBus\MessageBusInterface;
10
use Daikon\RabbitMq3\Connector\RabbitMq3Connector;
11
use PhpAmqpLib\Message\AMQPMessage;
12
13
final class RabbitMq3Worker implements WorkerInterface
14
{
15
    private $connector;
16
17
    private $messageBus;
18
19
    private $jobMap;
20
21
    private $settings;
22
23
    public function __construct(
24
        RabbitMq3Connector $connector,
25
        MessageBusInterface $messageBus,
26
        JobMap $jobMap,
27
        array $settings = []
28
    ) {
29
        $this->connector = $connector;
30
        $this->messageBus = $messageBus;
31
        $this->jobMap = $jobMap;
32
        $this->settings = $settings;
33
    }
34
35
    public function run(): void
36
    {
37
        $queue = $this->settings['queue'];
38
        Assertion::notBlank($queue);
39
40
        $messageHandler = function (AMQPMessage $message) {
41
            $this->execute($message);
42
        };
43
44
        $channel = $this->connector->getConnection()->channel();
45
        $channel->basic_qos(null, 1, null);
46
        $channel->basic_consume($queue, false, true, false, false, false, $messageHandler);
0 ignored issues
show
Bug introduced by
false of type false is incompatible with the type string expected by parameter $consumer_tag of PhpAmqpLib\Channel\AMQPChannel::basic_consume(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

46
        $channel->basic_consume($queue, /** @scrutinizer ignore-type */ false, true, false, false, false, $messageHandler);
Loading history...
47
48
        while (count($channel->callbacks)) {
49
            $channel->wait();
50
        }
51
    }
52
53
    private function execute(AMQPMessage $message): void
54
    {
55
        $deliveryInfo = $message->delivery_info;
56
        $channel = $deliveryInfo['channel'];
57
        $deliveryTag = $deliveryInfo['delivery_tag'];
58
        $envelope = Envelope::fromArray(json_decode($message->body, true));
59
        $metadata = $envelope->getMetadata();
60
        $job = $this->jobMap->get($metadata->get('job'));
0 ignored issues
show
Unused Code introduced by
The assignment to $job is dead and can be removed.
Loading history...
61
62
        //@todo possibly move execution to the job
63
        try {
64
            $this->messageBus->receive($envelope);
65
        } catch (\Exception $error) {
66
            //@todo manage job retry and failure
67
            throw $error;
68
        }
69
70
        $channel->basic_ack($deliveryTag);
71
    }
72
}