Passed
Push — master ( 3ce552...8bffb5 )
by Mr
02:02
created

RabbitMq3Worker   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 77
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
dl 0
loc 77
rs 10
c 0
b 0
f 0
wmc 8

5 Methods

Rating   Name   Duplication   Size   Complexity  
A fail() 0 4 1
A run() 0 15 2
B execute() 0 24 3
A retry() 0 2 1
A __construct() 0 10 1
1
<?php
2
3
namespace Daikon\RabbitMq3\Job;
4
5
use Assert\Assertion;
6
use Daikon\AsyncJob\Event\JobFailed;
7
use Daikon\AsyncJob\Job\JobMap;
8
use Daikon\AsyncJob\Worker\WorkerInterface;
9
use Daikon\MessageBus\Envelope;
10
use Daikon\MessageBus\EnvelopeInterface;
11
use Daikon\MessageBus\MessageBusInterface;
12
use Daikon\MessageBus\Metadata\Metadata;
13
use Daikon\RabbitMq3\Connector\RabbitMq3Connector;
14
use PhpAmqpLib\Message\AMQPMessage;
15
16
final class RabbitMq3Worker implements WorkerInterface
17
{
18
    private $connector;
19
20
    private $messageBus;
21
22
    private $jobMap;
23
24
    private $settings;
25
26
    public function __construct(
27
        RabbitMq3Connector $connector,
28
        MessageBusInterface $messageBus,
29
        JobMap $jobMap,
30
        array $settings = []
31
    ) {
32
        $this->connector = $connector;
33
        $this->messageBus = $messageBus;
34
        $this->jobMap = $jobMap;
35
        $this->settings = $settings;
36
    }
37
38
    public function run(array $parameters = []): void
39
    {
40
        $queue = $parameters['queue'];
41
        Assertion::notBlank($queue);
42
43
        $messageHandler = function (AMQPMessage $message) {
44
            $this->execute($message);
45
        };
46
47
        $channel = $this->connector->getConnection()->channel();
48
        $channel->basic_qos(null, 1, null);
49
        $channel->basic_consume($queue, false, true, false, false, false, $messageHandler);
0 ignored issues
show
Bug introduced by
false of type false is incompatible with the type string expected by parameter $consumer_tag of PhpAmqpLib\Channel\AMQPChannel::basic_consume(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

49
        $channel->basic_consume($queue, /** @scrutinizer ignore-type */ false, true, false, false, false, $messageHandler);
Loading history...
50
51
        while (count($channel->callbacks)) {
52
            $channel->wait();
53
        }
54
    }
55
56
    private function execute(AMQPMessage $message): void
57
    {
58
        $deliveryInfo = $message->delivery_info;
59
        $channel = $deliveryInfo['channel'];
60
        $deliveryTag = $deliveryInfo['delivery_tag'];
61
62
        $envelope = Envelope::fromArray(json_decode($message->body, true));
63
        $metadata = $envelope->getMetadata();
64
        $job = $this->jobMap->get($metadata->get('job'));
65
66
        try {
67
            $this->messageBus->receive($envelope);
68
        } catch (\Exception $error) {
69
            if ($job->getStrategy()->canRetry()) {
70
                $this->retry($envelope);
71
            } else {
72
                $this->fail(
73
                    $envelope,
74
                    $metadata->with('_errorMessage', $error->getMessage())
75
                );
76
            }
77
        }
78
79
        $channel->basic_ack($deliveryTag);
80
    }
81
82
    private function retry(EnvelopeInterface $envelope): void
0 ignored issues
show
Unused Code introduced by
The parameter $envelope is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

82
    private function retry(/** @scrutinizer ignore-unused */ EnvelopeInterface $envelope): void

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
83
    {
84
        //work out retry mechanism
85
        //republish to message bus with expiration?
86
        //defer in transport?
87
    }
88
89
    private function fail(EnvelopeInterface $envelope, Metadata $metadata): void
90
    {
91
        $jobFailed = JobFailed::fromArray(['failed_message' => $envelope->getMessage()]);
92
        $this->messageBus->publish($jobFailed, 'logging', $metadata);
93
    }
94
}