RabbitMq3Worker   A
last analyzed

Complexity

Total Complexity 7

Size/Duplication

Total Lines 73
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 5
Bugs 0 Features 0
Metric Value
eloc 39
c 5
b 0
f 0
dl 0
loc 73
ccs 0
cts 33
cp 0
rs 10
wmc 7

3 Methods

Rating   Name   Duplication   Size   Complexity  
A run() 0 16 2
A __construct() 0 12 1
A execute() 0 28 4
1
<?php declare(strict_types=1);
2
/**
3
 * This file is part of the daikon-cqrs/rabbitmq3-adapter project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace Daikon\RabbitMq3\Job;
10
11
use Daikon\AsyncJob\Job\JobDefinitionInterface;
12
use Daikon\AsyncJob\Job\JobDefinitionMap;
13
use Daikon\AsyncJob\Metadata\JobMetadataEnricher;
14
use Daikon\AsyncJob\Worker\WorkerInterface;
15
use Daikon\Interop\Assertion;
16
use Daikon\Interop\RuntimeException;
17
use Daikon\MessageBus\Channel\ChannelInterface;
18
use Daikon\MessageBus\Envelope;
19
use Daikon\MessageBus\MessageBusInterface;
20
use Daikon\RabbitMq3\Connector\RabbitMq3Connector;
21
use PhpAmqpLib\Channel\AMQPChannel;
22
use PhpAmqpLib\Message\AMQPMessage;
23
use Psr\Log\LoggerInterface;
24
25
/**
 * Worker that consumes job messages from a RabbitMQ queue and hands them to the message bus.
 *
 * Each delivery is decoded into an Envelope, checked against the job definition map and then
 * passed to MessageBusInterface::receive(). When receiving fails with a RuntimeException the
 * job's strategy decides whether the message is published again or the failure is logged.
 */
final class RabbitMq3Worker implements WorkerInterface
{
    private RabbitMq3Connector $connector;

    private MessageBusInterface $messageBus;

    private JobDefinitionMap $jobDefinitionMap;

    private LoggerInterface $logger;

    /** Stored as given; not read anywhere within this class. */
    private array $settings;

    /**
     * @param RabbitMq3Connector  $connector        Provides the AMQP connection used to open the consuming channel.
     * @param MessageBusInterface $messageBus       Receives decoded envelopes and republishes retried messages.
     * @param JobDefinitionMap    $jobDefinitionMap Known job definitions, looked up by the job key in the metadata.
     * @param LoggerInterface     $logger           Receives an error record when a failed job cannot be retried.
     * @param array               $settings         Optional worker settings.
     */
    public function __construct(
        RabbitMq3Connector $connector,
        MessageBusInterface $messageBus,
        JobDefinitionMap $jobDefinitionMap,
        LoggerInterface $logger,
        array $settings = []
    ) {
        $this->connector = $connector;
        $this->messageBus = $messageBus;
        $this->jobDefinitionMap = $jobDefinitionMap;
        $this->logger = $logger;
        $this->settings = $settings;
    }

    /**
     * Start consuming from the queue named in $parameters['queue'] and block while the
     * channel still has registered consumer callbacks.
     *
     * @param array $parameters Must contain a non-blank 'queue' entry.
     */
    public function run(array $parameters = []): void
    {
        // Fall back to null so a missing key fails the assertion below with its clear
        // message instead of first raising an "undefined index" notice.
        $queue = $parameters['queue'] ?? null;
        Assertion::notBlank($queue, 'Queue name must not be blank.');

        $messageHandler = function (AMQPMessage $amqpMessage): void {
            $this->execute($amqpMessage);
        };

        /** @var AMQPChannel $channel */
        $channel = $this->connector->getConnection()->channel();
        // php-amqplib positional arguments: prefetch_size, prefetch_count, global.
        $channel->basic_qos(0, 1, false);
        // php-amqplib positional arguments: queue, consumer_tag, no_local, no_ack, exclusive, nowait, callback.
        $channel->basic_consume($queue, '', true, false, false, false, $messageHandler);

        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

    /**
     * Handle a single delivery: decode it, validate its job key, pass it to the message bus
     * and acknowledge it, or publish it again / log the error when receiving fails.
     *
     * Only Daikon\Interop\RuntimeException is handled here; decoding and validation happen
     * before the try block, so their failures propagate to the caller without ack or nack.
     *
     * @throws RuntimeException When the metadata names a job that is not in the job definition map.
     */
    private function execute(AMQPMessage $amqpMessage): void
    {
        $envelope = Envelope::fromNative(json_decode($amqpMessage->body, true));
        $metadata = $envelope->getMetadata();

        $jobKey = (string)$metadata->get(JobMetadataEnricher::JOB);
        Assertion::notBlank($jobKey, 'Job key must not be blank.');
        if (!$this->jobDefinitionMap->has($jobKey)) {
            throw new RuntimeException("Job definition '$jobKey' not found.");
        }

        try {
            $this->messageBus->receive($envelope);
            $amqpMessage->ack();
        } catch (RuntimeException $error) {
            /** @var JobDefinitionInterface $job */
            $job = $this->jobDefinitionMap->get($jobKey);
            if ($job->getStrategy()->canRetry($envelope)) {
                // Retry by publishing the message again on the channel recorded in its metadata.
                $this->messageBus->publish(
                    $envelope->getMessage(),
                    (string)$metadata->get(ChannelInterface::METADATA_KEY),
                    $metadata
                );
            } else {
                //@todo add message/metadata to error context
                // PSR-3 reserves the 'exception' context key for the Throwable itself, so pass
                // the object rather than only its trace array.
                $this->logger->error($error->getMessage(), ['exception' => $error]);
            }
            // The original delivery is rejected in both branches, using the library's default nack() arguments.
            $amqpMessage->nack();
        }
    }
}
101