Completed
Push — master ( 59049f...fab486 )
by Tomasz
12:46 queued 02:08
created

RabbitMqWorker::translateCommand()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 11
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 6
nc 2
nop 1
1
<?php
2
3
namespace Gendoria\CommandQueueRabbitMqDriverBundle\Worker;
4
5
use Exception;
6
use Gendoria\CommandQueue\CommandProcessor\CommandProcessorInterface;
7
use Gendoria\CommandQueue\ProcessorFactoryInterface;
8
use Gendoria\CommandQueue\ProcessorNotFoundException;
9
use Gendoria\CommandQueue\Worker\Exception\ProcessorErrorException;
10
use Gendoria\CommandQueue\Worker\Exception\TranslateErrorException;
11
use Gendoria\CommandQueueBundle\Worker\BaseSymfonyWorker;
12
use InvalidArgumentException;
13
use JMS\Serializer\Serializer;
14
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
15
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
16
use PhpAmqpLib\Message\AMQPMessage;
17
use Psr\Log\LoggerInterface;
18
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
19
20
/**
21
 * Command queue worker listening on commands form RabbitMQ channel.
22
 *
23
 * @author Tomasz Struczyński <[email protected]>
24
 */
25
class RabbitMqWorker extends BaseSymfonyWorker implements ConsumerInterface
26
{
27
28
    /**
29
     * Rabbit MQ worker subsystem name.
30
     * 
31
     * @var string
32
     */
33
    const SUBSYSTEM_NAME = "RabbitMqworker";
34
35
    /**
36
     * Serializer instance.
37
     *
38
     * @var Serializer
39
     */
40
    private $serializer;
41
42
    /**
43
     * Reschedule producer instance.
44
     *
45
     * @var ProducerInterface
46
     */
47
    private $rescheduleProducer;
48
49
    /**
50
     * Class constructor.
51
     *
52
     * @param EventDispatcherInterface $eventDispatcher    Symfony event dispatcher.
53
     * @param ProcessorFactoryInterface         $processorFactory
54
     * @param Serializer               $serializer
55
     * @param ProducerInterface                 $rescheduleProducer
56
     * @param LoggerInterface          $logger             Logger instance.
57
     */
58
    public function __construct(EventDispatcherInterface $eventDispatcher, ProcessorFactoryInterface $processorFactory, Serializer $serializer, ProducerInterface $rescheduleProducer, LoggerInterface $logger = null)
59
    {
60
        parent::__construct($processorFactory, $eventDispatcher, $logger);
61
62
        $this->serializer = $serializer;
63
        $this->rescheduleProducer = $rescheduleProducer;
64
    }
65
66
    /**
67
     * Process single message received from RabbitMq server.
68
     *
69
     * @param AMQPMessage $msg
70
     *
71
     * @return null|integer Return code, dictating further message status.
72
     */
73
    public function execute(AMQPMessage $msg)
74
    {
75
        //We try to process message. On known errors we try to reschedule. On unknown - we simply reject.
76
        try {
77
            $this->process($msg);
78
        } catch (ProcessorErrorException $e) {
79
            $this->maybeReschedule($msg, $e, $e->getProcessor());
80
            return self::MSG_REJECT;
81
        } catch (ProcessorNotFoundException $e) {
82
            $this->maybeReschedule($msg, $e);
83
            return self::MSG_REJECT;
84
        } catch (TranslateErrorException $e) {
85
            $this->maybeReschedule($msg, $e);
86
            return self::MSG_REJECT;
87
        } catch (Exception $e) {
88
            return self::MSG_REJECT;
89
        }
90
    }
91
92
    protected function translateCommand($commandData)
93
    {
94
        /* @var $commandData AMQPMessage */
95
        $headers = $commandData->get('application_headers')->getNativeData();
96
        if (empty($headers['x-class-name'])) {
97
            throw new InvalidArgumentException("Class name header 'x-class-name' not found");
98
        }
99
        return $this->serializer->deserialize(
0 ignored issues
show
Bug Best Practice introduced by
The return type of return $this->serializer...-class-name'], 'json'); (object|array|integer|double|string|boolean) is incompatible with the return type declared by the abstract method Gendoria\CommandQueue\Wo...orker::translateCommand of type Gendoria\CommandQueue\Command\CommandInterface.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type f.e. an interface, or abstract method. This is more formally defined by the Lizkov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangably. This principle also belongs to the SOLID principles for object oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
100
                $commandData->body, $headers['x-class-name'], 'json'
101
        );
102
    }
103
104
    public function getSubsystemName()
105
    {
106
        return self::SUBSYSTEM_NAME;
107
    }
108
109
    /**
110
     * Send message for rescheduler, if maximum number of tries has not been exceeded.
111
     *
112
     * @param AMQPMessage               $msg
113
     * @param Exception                 $e
114
     * @param CommandProcessorInterface $processor
115
     */
116
    private function maybeReschedule(AMQPMessage $msg, Exception $e, CommandProcessorInterface $processor = null)
117
    {
118
        $triesNum = 10;
119
        $headers = $msg->get('application_headers')->getNativeData();
120
        $retryCount = $this->getRetryCount($headers);
121
        $retry = ($retryCount < $triesNum - 1);
122
        $resheduleInS = (5 * $retryCount + 10);
123
124
        $this->logger->error(
125
            sprintf(
126
                'Error while executing processor (retry count: %d - %s): %s', $retryCount + 1, $retry ? 'retry in ' . $resheduleInS . 's' : 'reject', $e->getMessage()
127
            ), array($e->getTraceAsString(), $this, $processor)
128
        );
129
130
        if ($retry) {
131
            $this->rescheduleProducer->publish(
132
                $msg->body, $msg->delivery_info['routing_key'], array_merge(
0 ignored issues
show
Documentation introduced by
$msg->delivery_info['routing_key'] is of type object<PhpAmqpLib\Channel\AMQPChannel>, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
133
                    $msg->get_properties(), array('expiration' => $resheduleInS * 1000)
134
                )
135
            );
136
        }
137
    }
138
139
    /**
140
     * Get retry count.
141
     * 
142
     * @param array $headers
143
     * @return integer
144
     */
145
    private function getRetryCount($headers)
146
    {
147
        if (!empty($headers['x-death'])) {
148
            if (!empty($headers['x-death'][0]['count'])) {
149
                $retryCount = $headers['x-death'][0]['count'];
150
            } else {
151
                $retryCount = count($headers['x-death']);
152
            }
153
        } else {
154
            $retryCount = 0;
155
        }
156
        return $retryCount;
157
    }
158
159
}
160