RabbitMqWorker::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 6
ccs 4
cts 4
cp 1
rs 9.4285
c 1
b 0
f 0
cc 1
eloc 3
nc 1
nop 5
crap 1
1
<?php
2
3
namespace Gendoria\CommandQueueRabbitMqDriverBundle\Worker;
4
5
use Exception;
6
use Gendoria\CommandQueue\CommandProcessor\CommandProcessorInterface;
7
use Gendoria\CommandQueue\ProcessorFactory\Exception\ProcessorNotFoundException;
8
use Gendoria\CommandQueue\ProcessorFactory\ProcessorFactoryInterface;
9
use Gendoria\CommandQueue\Serializer\Exception\UnserializeErrorException;
10
use Gendoria\CommandQueue\Serializer\SerializedCommandData;
11
use Gendoria\CommandQueue\Serializer\SerializerInterface;
12
use Gendoria\CommandQueue\Worker\Exception\ProcessorErrorException;
13
use Gendoria\CommandQueueBundle\Worker\BaseSymfonyWorker;
14
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
15
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
16
use PhpAmqpLib\Message\AMQPMessage;
17
use Psr\Log\LoggerInterface;
18
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
19
20
/**
 * Command queue worker listening for commands from a RabbitMQ channel.
 *
 * Each received message is handed to the base worker for processing. When
 * processing fails with one of the known exception types, the message is
 * republished through the reschedule producer (up to a fixed number of tries)
 * and the original delivery is rejected.
 *
 * @author Tomasz Struczyński <[email protected]>
 */
class RabbitMqWorker extends BaseSymfonyWorker implements ConsumerInterface
{

    /**
     * Rabbit MQ worker subsystem name.
     *
     * @var string
     */
    const SUBSYSTEM_NAME = "RabbitMqWorker";

    /**
     * Reschedule producer instance.
     *
     * Used to republish a failed message so that it can be retried later.
     *
     * @var ProducerInterface
     */
    private $rescheduleProducer;

    /**
     * Class constructor.
     *
     * @param EventDispatcherInterface  $eventDispatcher    Symfony event dispatcher.
     * @param ProcessorFactoryInterface $processorFactory   Factory passed on to the base worker.
     * @param SerializerInterface       $serializer         Serializer passed on to the base worker.
     * @param ProducerInterface         $rescheduleProducer Producer used to republish failed messages.
     * @param LoggerInterface           $logger             Logger instance.
     */
    public function __construct(EventDispatcherInterface $eventDispatcher, ProcessorFactoryInterface $processorFactory, SerializerInterface $serializer, ProducerInterface $rescheduleProducer, LoggerInterface $logger = null)
    {
        // Note the argument order expected by the parent differs from this constructor's order.
        parent::__construct($processorFactory, $serializer, $eventDispatcher, $logger);

        $this->rescheduleProducer = $rescheduleProducer;
    }

    /**
     * Process single message received from RabbitMq server.
     *
     * Known processing errors trigger a reschedule attempt and a reject of the
     * current delivery. Any other exception is not handled here and propagates
     * to the caller.
     *
     * @param AMQPMessage $msg
     *
     * @return integer Return code, dictating further message status (self::MSG_ACK or self::MSG_REJECT).
     */
    public function execute(AMQPMessage $msg)
    {
        try {
            $this->process($msg);
        } catch (ProcessorErrorException $e) {
            // Processor was found but failed - pass it along so it shows up in the log context.
            $this->maybeReschedule($msg, $e, $e->getProcessor());

            return self::MSG_REJECT;
        } catch (ProcessorNotFoundException $e) {
            $this->maybeReschedule($msg, $e);

            return self::MSG_REJECT;
        } catch (UnserializeErrorException $e) {
            $this->maybeReschedule($msg, $e);

            return self::MSG_REJECT;
        }

        return self::MSG_ACK;
    }

    /**
     * Build serialized command data from a received AMQP message.
     *
     * The command class name is read from the 'x-class-name' application
     * header, the serialized payload from the message body.
     *
     * @param AMQPMessage $commandData
     *
     * @return SerializedCommandData
     *
     * @throws UnserializeErrorException When the message carries no (or an empty) 'x-class-name' header.
     */
    protected function getSerializedCommandData($commandData)
    {
        /* @var $commandData AMQPMessage */
        // A message without any application headers is treated the same way
        // as one without the class name header, instead of failing inside get().
        $headers = $this->getMessageHeaders($commandData);
        if (empty($headers['x-class-name'])) {
            throw new UnserializeErrorException($commandData, "Class name header 'x-class-name' not found");
        }

        return new SerializedCommandData($commandData->body, $headers['x-class-name']);
    }

    /**
     * Get the name of the subsystem this worker belongs to.
     *
     * @return string
     */
    public function getSubsystemName()
    {
        return self::SUBSYSTEM_NAME;
    }

    /**
     * Send message for rescheduler, if maximum number of tries has not been exceeded.
     *
     * The error is always logged. The message is republished only while the
     * retry limit has not been reached; the delay before the next try grows
     * linearly with the number of previous tries.
     *
     * @param AMQPMessage               $msg
     * @param Exception                 $e
     * @param CommandProcessorInterface $processor
     */
    private function maybeReschedule(AMQPMessage $msg, Exception $e, CommandProcessorInterface $processor = null)
    {
        $triesNum = 10;
        // Headers may be missing entirely (e.g. that is exactly why processing failed),
        // so they are read defensively; no headers means no previous retries.
        $retryCount = $this->getRetryCount($this->getMessageHeaders($msg));
        $retry = ($retryCount < $triesNum - 1);
        $rescheduleInS = (5 * $retryCount + 10);

        $this->logger->error(
            sprintf(
                'Error while executing processor (retry count: %d - %s): %s',
                $retryCount + 1,
                $retry ? 'retry in ' . $rescheduleInS . 's' : 'reject',
                $e->getMessage()
            ),
            array($e->getTraceAsString(), $this, $processor)
        );

        if (!$retry) {
            return;
        }

        // Republish with the original properties; 'expiration' is given in milliseconds.
        $this->rescheduleProducer->publish(
            $msg->body,
            (string)$msg->delivery_info['routing_key'],
            array_merge(
                $msg->get_properties(),
                array('expiration' => $rescheduleInS * 1000)
            )
        );
    }

    /**
     * Get application headers of a message as a plain array.
     *
     * @param AMQPMessage $msg
     *
     * @return array Native header data, or an empty array when the message has no 'application_headers' property.
     */
    private function getMessageHeaders(AMQPMessage $msg)
    {
        if (!$msg->has('application_headers')) {
            return array();
        }

        return $msg->get('application_headers')->getNativeData();
    }

    /**
     * Get retry count.
     *
     * The count is taken from the 'x-death' header: from the 'count' field of
     * its first entry when that field is present and non-empty, otherwise from
     * the number of 'x-death' entries. Without an 'x-death' header it is 0.
     *
     * @param array $headers
     * @return integer
     */
    private function getRetryCount($headers)
    {
        if (empty($headers['x-death'])) {
            return 0;
        }

        if (!empty($headers['x-death'][0]['count'])) {
            return $headers['x-death'][0]['count'];
        }

        return count($headers['x-death']);
    }
}
152