Passed
Push — master ( 22ac3b...c825ad )
by Mr
07:18
created

RabbitMq3Worker::retry()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
/**
3
 * This file is part of the daikon-cqrs/rabbitmq3-adapter project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace Daikon\RabbitMq3\Job;
10
11
use Assert\Assertion;
12
use Daikon\AsyncJob\Job\JobDefinitionInterface;
13
use Daikon\AsyncJob\Job\JobDefinitionMap;
14
use Daikon\AsyncJob\Worker\WorkerInterface;
15
use Daikon\Interop\RuntimeException;
16
use Daikon\MessageBus\Envelope;
17
use Daikon\MessageBus\MessageBusInterface;
18
use Daikon\Metadata\MetadataInterface;
19
use Daikon\RabbitMq3\Connector\RabbitMq3Connector;
20
use PhpAmqpLib\Channel\AMQPChannel;
21
use PhpAmqpLib\Message\AMQPMessage;
22
use Psr\Log\LoggerInterface;
23
24
/**
 * Worker that consumes job messages from a RabbitMQ queue and feeds them to the message bus.
 *
 * Every delivery is decoded into an Envelope, matched against the job definition map by the
 * 'job' metadata entry and handed to MessageBusInterface::receive(). When handling fails with a
 * RuntimeException, the job's strategy decides whether the message is republished for another
 * attempt or the failure is logged.
 */
final class RabbitMq3Worker implements WorkerInterface
{
    private RabbitMq3Connector $connector;

    private MessageBusInterface $messageBus;

    private JobDefinitionMap $jobDefinitionMap;

    private LoggerInterface $logger;

    /** Stored as given to the constructor; nothing in this class reads it. */
    private array $settings;

    public function __construct(
        RabbitMq3Connector $connector,
        MessageBusInterface $messageBus,
        JobDefinitionMap $jobDefinitionMap,
        LoggerInterface $logger,
        array $settings = []
    ) {
        $this->connector = $connector;
        $this->messageBus = $messageBus;
        $this->jobDefinitionMap = $jobDefinitionMap;
        $this->logger = $logger;
        $this->settings = $settings;
    }

    /**
     * Consumes the queue named by $parameters['queue'] for as long as the channel has
     * consumer callbacks registered.
     *
     * @param array $parameters Must contain a non-blank 'queue' entry.
     */
    public function run(array $parameters = []): void
    {
        // A missing key is normalised to null so the assertion below reports the problem,
        // rather than PHP raising an undefined-index notice/warning first.
        $queue = $parameters['queue'] ?? null;
        Assertion::notBlank($queue);

        $messageHandler = function (AMQPMessage $amqpMessage): void {
            $this->execute($amqpMessage);
        };

        /** @var AMQPChannel $channel */
        $channel = $this->connector->getConnection()->channel();
        // Arguments: prefetch size, prefetch count, global.
        $channel->basic_qos(0, 1, false);
        // Arguments: queue, consumer tag, no_local, no_ack, exclusive, nowait, callback.
        // no_ack is false because execute() acknowledges each delivery itself.
        $channel->basic_consume($queue, '', true, false, false, false, $messageHandler);

        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

    /**
     * Handles a single delivery and acknowledges it afterwards.
     *
     * The acknowledgement is sent after successful handling, after a retry has been published
     * and after a non-retryable failure has been logged. Anything thrown outside of the caught
     * RuntimeException (invalid JSON, failed assertions, other exception types) propagates to
     * the caller before the delivery is acknowledged.
     *
     * @throws \JsonException When the message body is not valid JSON.
     */
    private function execute(AMQPMessage $amqpMessage): void
    {
        $deliveryInfo = $amqpMessage->delivery_info;
        $channel = $deliveryInfo['channel'];
        $deliveryTag = $deliveryInfo['delivery_tag'];

        // Without JSON_THROW_ON_ERROR a malformed body silently decodes to null and only fails
        // later, inside Envelope::fromNative(), with an error that hides the real cause.
        $payload = json_decode($amqpMessage->body, true, 512, JSON_THROW_ON_ERROR);
        $envelope = Envelope::fromNative($payload);
        $metadata = $envelope->getMetadata();
        $jobName = (string)$metadata->get('job');

        Assertion::notBlank($jobName, 'Worker job name must not be blank.');
        /** @var JobDefinitionInterface $job */
        $job = $this->jobDefinitionMap->get($jobName);
        Assertion::isInstanceOf($job, JobDefinitionInterface::class, "Job definition '$jobName' not found");

        try {
            $this->messageBus->receive($envelope);
        } catch (RuntimeException $error) {
            $strategy = $job->getStrategy();
            if ($strategy->canRetry($envelope)) {
                // Republish the same message on the channel named in its metadata, with the
                // retry counter bumped and the strategy's retry interval set as '_expiration'.
                $retries = $metadata->get('_retries', 0);
                /** @var MetadataInterface $metadata */
                $metadata = $metadata
                    ->with('_retries', ++$retries)
                    ->with('_expiration', $strategy->getRetryInterval($envelope));
                $this->messageBus->publish(
                    $envelope->getMessage(),
                    (string)$metadata->get('_channel'),
                    $metadata
                );
            } else {
                //@todo add message/metadata to error context
                $this->logger->error("Failed handling job '$jobName'", ['exception' => $error]);
            }
        }

        $channel->basic_ack($deliveryTag);
    }
}
104