Completed
Push — master ( 00a920...bb9783 )
by Tomasz
06:01 queued 03:05
created

RabbitMqWorker::translateCommand()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 16
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 4.128

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 16
ccs 8
cts 10
cp 0.8
rs 9.2
cc 4
eloc 10
nc 4
nop 1
crap 4.128
1
<?php
2
3
namespace Gendoria\CommandQueueRabbitMqDriverBundle\Worker;
4
5
use Exception;
6
use Gendoria\CommandQueue\Command\CommandInterface;
7
use Gendoria\CommandQueue\CommandProcessor\CommandProcessorInterface;
8
use Gendoria\CommandQueue\ProcessorFactoryInterface;
9
use Gendoria\CommandQueue\ProcessorNotFoundException;
10
use Gendoria\CommandQueue\Worker\Exception\ProcessorErrorException;
11
use Gendoria\CommandQueue\Worker\Exception\TranslateErrorException;
12
use Gendoria\CommandQueueBundle\Worker\BaseSymfonyWorker;
13
use InvalidArgumentException;
14
use JMS\Serializer\Serializer;
15
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
16
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
17
use PhpAmqpLib\Message\AMQPMessage;
18
use Psr\Log\LoggerInterface;
19
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
20
21
/**
 * Command queue worker listening on commands from RabbitMQ channel.
 *
 * Each received message is handed to the inherited process() routine. When
 * processing fails with one of the known command queue exceptions, the message
 * is republished through the reschedule producer (up to a fixed number of
 * tries) and the original delivery is rejected.
 *
 * @author Tomasz Struczyński <[email protected]>
 */
class RabbitMqWorker extends BaseSymfonyWorker implements ConsumerInterface
{

    /**
     * Rabbit MQ worker subsystem name.
     *
     * @var string
     */
    const SUBSYSTEM_NAME = "RabbitMqworker";

    /**
     * Serializer instance, used to turn the JSON message body into a command object.
     *
     * @var Serializer
     */
    private $serializer;

    /**
     * Reschedule producer instance, used to republish messages which failed processing.
     *
     * @var ProducerInterface
     */
    private $rescheduleProducer;

    /**
     * Class constructor.
     *
     * @param EventDispatcherInterface  $eventDispatcher    Symfony event dispatcher.
     * @param ProcessorFactoryInterface $processorFactory   Processor factory, passed to the parent worker.
     * @param Serializer                $serializer         Serializer used to deserialize command bodies.
     * @param ProducerInterface         $rescheduleProducer Producer receiving messages to be retried.
     * @param LoggerInterface           $logger             Logger instance.
     */
    public function __construct(EventDispatcherInterface $eventDispatcher, ProcessorFactoryInterface $processorFactory, Serializer $serializer, ProducerInterface $rescheduleProducer, LoggerInterface $logger = null)
    {
        // Note: the parent constructor takes the factory first and the dispatcher second.
        parent::__construct($processorFactory, $eventDispatcher, $logger);

        $this->serializer = $serializer;
        $this->rescheduleProducer = $rescheduleProducer;
    }

    /**
     * Process single message received from RabbitMq server.
     *
     * On known errors the message is (possibly) sent to the reschedule producer
     * and the current delivery is rejected. Any other exception is not caught here.
     *
     * @param AMQPMessage $msg
     *
     * @return integer Return code, dictating further message status
     *                 (self::MSG_ACK on success, self::MSG_REJECT on a known error).
     */
    public function execute(AMQPMessage $msg)
    {
        //We try to process message. On known errors we try to reschedule. On unknown - we simply reject.
        try {
            $this->process($msg);
        } catch (ProcessorErrorException $e) {
            // Only this exception type carries the processor, which is added to the log context.
            $this->maybeReschedule($msg, $e, $e->getProcessor());
            return self::MSG_REJECT;
        } catch (ProcessorNotFoundException $e) {
            $this->maybeReschedule($msg, $e);
            return self::MSG_REJECT;
        } catch (TranslateErrorException $e) {
            $this->maybeReschedule($msg, $e);
            return self::MSG_REJECT;
        }
        return self::MSG_ACK;
    }

    /**
     * Translate received AMQP message into a command object.
     *
     * The command class is read from the 'x-class-name' application header and the
     * message body is deserialized as JSON into that class.
     *
     * @param AMQPMessage $commandData Received message.
     *
     * @return CommandInterface
     *
     * @throws InvalidArgumentException When the class name header is missing or empty (including
     *                                  messages without any application headers), or when the
     *                                  deserialized value is not an object implementing CommandInterface.
     */
    protected function translateCommand($commandData)
    {
        /* @var $commandData AMQPMessage */
        // A message without the header table is reported the same way as a missing class name,
        // instead of leaking an OutOfBoundsException from the AMQP library.
        $headers = $this->getApplicationHeaders($commandData);
        if (empty($headers['x-class-name'])) {
            throw new InvalidArgumentException("Class name header 'x-class-name' not found");
        }
        $command = $this->serializer->deserialize($commandData->body, $headers['x-class-name'], 'json');
        if (!is_object($command)) {
            throw new InvalidArgumentException("Translated command should be an object.");
        }
        if (!$command instanceof CommandInterface) {
            throw new InvalidArgumentException("Translated command should implement interface CommandInterface.");
        }
        return $command;
    }

    /**
     * Get subsystem name of this worker.
     *
     * @return string Value of self::SUBSYSTEM_NAME.
     */
    public function getSubsystemName()
    {
        return self::SUBSYSTEM_NAME;
    }

    /**
     * Send message for rescheduler, if maximum number of tries has not been exceeded.
     *
     * The failure is always logged. The delay before the next try grows linearly
     * with the number of tries already made (10s, 15s, 20s, ...).
     *
     * @param AMQPMessage               $msg
     * @param Exception                 $e
     * @param CommandProcessorInterface $processor
     */
    private function maybeReschedule(AMQPMessage $msg, Exception $e, CommandProcessorInterface $processor = null)
    {
        $triesNum = 10;
        // Must not throw for messages without headers: this method runs while handling an error.
        $headers = $this->getApplicationHeaders($msg);
        $retryCount = $this->getRetryCount($headers);
        // $retryCount counts previous failures, so the current try is number $retryCount + 1.
        $retry = ($retryCount < $triesNum - 1);
        $rescheduleInSeconds = (5 * $retryCount + 10);

        $this->logger->error(
            sprintf(
                'Error while executing processor (retry count: %d - %s): %s', $retryCount + 1, $retry ? 'retry in ' . $rescheduleInSeconds . 's' : 'reject', $e->getMessage()
            ), array($e->getTraceAsString(), $this, $processor)
        );

        if ($retry) {
            // Republish with the original properties; 'expiration' is given in milliseconds.
            $this->rescheduleProducer->publish(
                $msg->body, (string)$msg->delivery_info['routing_key'], array_merge(
                    $msg->get_properties(), array('expiration' => $rescheduleInSeconds * 1000)
                )
            );
        }
    }

    /**
     * Get retry count.
     *
     * Uses the 'count' field of the first 'x-death' entry when present and non-empty,
     * otherwise the number of 'x-death' entries. Returns 0 when there is no usable
     * 'x-death' header.
     *
     * @param array $headers Application headers of the message.
     *
     * @return integer
     */
    private function getRetryCount($headers)
    {
        if (empty($headers['x-death']) || !is_array($headers['x-death'])) {
            return 0;
        }

        $deaths = $headers['x-death'];
        if (!empty($deaths[0]['count'])) {
            return (int) $deaths[0]['count'];
        }

        return count($deaths);
    }

    /**
     * Get application headers of the message as a plain array.
     *
     * @param AMQPMessage $msg
     *
     * @return array Headers, or an empty array when the message carries none.
     */
    private function getApplicationHeaders(AMQPMessage $msg)
    {
        try {
            $headers = $msg->get('application_headers');
        } catch (\OutOfBoundsException $e) {
            // The property was never set on this message.
            return array();
        }

        if (is_array($headers)) {
            return $headers;
        }
        if (is_object($headers) && method_exists($headers, 'getNativeData')) {
            return (array) $headers->getNativeData();
        }

        return array();
    }

}
165