Completed
Push — master ( 012a4c...477c84 )
by Tomasz
06:52
created

RabbitMqWorker   A

Complexity

Total Complexity 14

Size/Duplication

Total Lines 128
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 8

Test Coverage

Coverage 100%

Importance

Changes 4
Bugs 0 Features 0
Metric Value
wmc 14
c 4
b 0
f 0
lcom 1
cbo 8
dl 0
loc 128
ccs 52
cts 52
cp 1
rs 10

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 7 1
A execute() 0 17 4
A getSerializedCommandData() 0 9 2
A getSubsystemName() 0 4 1
A maybeReschedule() 0 22 3
A getRetryCount() 0 13 3
1
<?php
2
3
namespace Gendoria\CommandQueueRabbitMqDriverBundle\Worker;
4
5
use Exception;
6
use Gendoria\CommandQueue\CommandProcessor\CommandProcessorInterface;
7
use Gendoria\CommandQueue\ProcessorFactoryInterface;
8
use Gendoria\CommandQueue\ProcessorNotFoundException;
9
use Gendoria\CommandQueue\Serializer\SerializedCommandData;
10
use Gendoria\CommandQueue\Worker\Exception\ProcessorErrorException;
11
use Gendoria\CommandQueue\Worker\Exception\TranslateErrorException;
12
use Gendoria\CommandQueueBundle\Serializer\JmsSerializer;
13
use Gendoria\CommandQueueBundle\Worker\BaseSymfonyWorker;
14
use JMS\Serializer\Serializer;
15
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
16
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
17
use PhpAmqpLib\Message\AMQPMessage;
18
use Psr\Log\LoggerInterface;
19
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
20
21
/**
22
 * Command queue worker listening on commands form RabbitMQ channel.
23
 *
24
 * @author Tomasz Struczyński <[email protected]>
25
 */
26
class RabbitMqWorker extends BaseSymfonyWorker implements ConsumerInterface
27
{
28
29
    /**
30
     * Rabbit MQ worker subsystem name.
31
     * 
32
     * @var string
33
     */
34
    const SUBSYSTEM_NAME = "RabbitMqworker";
35
36
    /**
37
     * Reschedule producer instance.
38
     *
39
     * @var ProducerInterface
40
     */
41
    private $rescheduleProducer;
42
43
    /**
44
     * Class constructor.
45
     *
46
     * @param EventDispatcherInterface $eventDispatcher    Symfony event dispatcher.
47
     * @param ProcessorFactoryInterface         $processorFactory
48
     * @param Serializer               $serializer
49
     * @param ProducerInterface                 $rescheduleProducer
50
     * @param LoggerInterface          $logger             Logger instance.
51
     */
52 8
    public function __construct(EventDispatcherInterface $eventDispatcher, ProcessorFactoryInterface $processorFactory, Serializer $serializer, ProducerInterface $rescheduleProducer, LoggerInterface $logger = null)
53
    {
54 8
        $jmsSerializer = new JmsSerializer($serializer);
55 8
        parent::__construct($processorFactory, $jmsSerializer, $eventDispatcher, $logger);
56
57 8
        $this->rescheduleProducer = $rescheduleProducer;
58 8
    }
59
60
    /**
61
     * Process single message received from RabbitMq server.
62
     *
63
     * @param AMQPMessage $msg
64
     *
65
     * @return null|integer Return code, dictating further message status.
66
     */
67 8
    public function execute(AMQPMessage $msg)
68
    {
69
        //We try to process message. On known errors we try to reschedule. On unknown - we simply reject.
70
        try {
71 8
            $this->process($msg);
72 8
        } catch (ProcessorErrorException $e) {
73 1
            $this->maybeReschedule($msg, $e, $e->getProcessor());
74 1
            return self::MSG_REJECT;
75 6
        } catch (ProcessorNotFoundException $e) {
76 1
            $this->maybeReschedule($msg, $e);
77 1
            return self::MSG_REJECT;
78 5
        } catch (TranslateErrorException $e) {
79 5
            $this->maybeReschedule($msg, $e);
80 6
            return self::MSG_REJECT;
81
        }
82 1
        return self::MSG_ACK;
83
    }
84
85
    /**
86
     * @param AMQPMessage $commandData
87
     * {@inheritdoc}
88
     */
89 8
    protected function getSerializedCommandData($commandData)
90
    {
91
        /* @var $commandData AMQPMessage */
92 8
        $headers = $commandData->get('application_headers')->getNativeData();
93 8
        if (empty($headers['x-class-name'])) {
94 5
            throw new TranslateErrorException($commandData, "Class name header 'x-class-name' not found");
95
        }
96 3
        return new SerializedCommandData($commandData->body, $headers['x-class-name']);
97
    }    
98
99 8
    public function getSubsystemName()
100
    {
101 8
        return self::SUBSYSTEM_NAME;
102
    }
103
104
    /**
105
     * Send message for rescheduler, if maximum number of tries has not been exceeded.
106
     *
107
     * @param AMQPMessage               $msg
108
     * @param Exception                 $e
109
     * @param CommandProcessorInterface $processor
110
     */
111 7
    private function maybeReschedule(AMQPMessage $msg, Exception $e, CommandProcessorInterface $processor = null)
112
    {
113 7
        $triesNum = 10;
114 7
        $headers = $msg->get('application_headers')->getNativeData();
115 7
        $retryCount = $this->getRetryCount($headers);
116 7
        $retry = ($retryCount < $triesNum - 1);
117 7
        $resheduleInS = (5 * $retryCount + 10);
118
119 7
        $this->logger->error(
120 7
            sprintf(
121 7
                'Error while executing processor (retry count: %d - %s): %s', $retryCount + 1, $retry ? 'retry in ' . $resheduleInS . 's' : 'reject', $e->getMessage()
122 7
            ), array($e->getTraceAsString(), $this, $processor)
123 7
        );
124
125 7
        if ($retry) {
126 5
            $this->rescheduleProducer->publish(
127 5
                $msg->body, (string)$msg->delivery_info['routing_key'], array_merge(
128 5
                    $msg->get_properties(), array('expiration' => $resheduleInS * 1000)
129 5
                )
130 5
            );
131 5
        }
132 7
    }
133
134
    /**
135
     * Get retry count.
136
     * 
137
     * @param array $headers
138
     * @return integer
139
     */
140 7
    private function getRetryCount($headers)
141
    {
142 7
        if (!empty($headers['x-death'])) {
143 2
            if (!empty($headers['x-death'][0]['count'])) {
144 1
                $retryCount = $headers['x-death'][0]['count'];
145 1
            } else {
146 1
                $retryCount = count($headers['x-death']);
147
            }
148 2
        } else {
149 5
            $retryCount = 0;
150
        }
151 7
        return $retryCount;
152
    }
153
}
154