1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Gendoria\CommandQueueRabbitMqDriverBundle\Worker; |
4
|
|
|
|
5
|
|
|
use Exception; |
6
|
|
|
use Gendoria\CommandQueue\CommandProcessor\CommandProcessorInterface; |
7
|
|
|
use Gendoria\CommandQueue\ProcessorFactoryInterface; |
8
|
|
|
use Gendoria\CommandQueue\ProcessorNotFoundException; |
9
|
|
|
use Gendoria\CommandQueue\Worker\Exception\ProcessorErrorException; |
10
|
|
|
use Gendoria\CommandQueue\Worker\Exception\TranslateErrorException; |
11
|
|
|
use Gendoria\CommandQueueBundle\Worker\BaseSymfonyWorker; |
12
|
|
|
use InvalidArgumentException; |
13
|
|
|
use JMS\Serializer\Serializer; |
14
|
|
|
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; |
15
|
|
|
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface; |
16
|
|
|
use PhpAmqpLib\Message\AMQPMessage; |
17
|
|
|
use Psr\Log\LoggerInterface; |
18
|
|
|
use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
19
|
|
|
|
20
|
|
|
/**
 * Command queue worker listening on commands from RabbitMQ channel.
 *
 * @author Tomasz Struczyński <[email protected]>
 */
class RabbitMqWorker extends BaseSymfonyWorker implements ConsumerInterface
{

    /**
     * Rabbit MQ worker subsystem name.
     *
     * @var string
     */
    const SUBSYSTEM_NAME = "RabbitMqworker";

    /**
     * Serializer instance.
     *
     * @var Serializer
     */
    private $serializer;

    /**
     * Reschedule producer instance.
     *
     * @var ProducerInterface
     */
    private $rescheduleProducer;

    /**
     * Class constructor.
     *
     * @param EventDispatcherInterface  $eventDispatcher    Symfony event dispatcher.
     * @param ProcessorFactoryInterface $processorFactory   Factory passed on to the base worker.
     * @param Serializer                $serializer         Serializer used to deserialize message bodies into commands.
     * @param ProducerInterface         $rescheduleProducer Producer used to publish messages that should be retried.
     * @param LoggerInterface           $logger             Logger instance.
     */
    public function __construct(EventDispatcherInterface $eventDispatcher, ProcessorFactoryInterface $processorFactory, Serializer $serializer, ProducerInterface $rescheduleProducer, LoggerInterface $logger = null)
    {
        // Note: the parent constructor takes the factory first and the dispatcher second.
        parent::__construct($processorFactory, $eventDispatcher, $logger);

        $this->serializer = $serializer;
        $this->rescheduleProducer = $rescheduleProducer;
    }

    /**
     * Process single message received from RabbitMq server.
     *
     * Known processing errors are handed to the rescheduler before the message
     * is rejected; any other exception is logged and the message is rejected
     * without a retry.
     *
     * @param AMQPMessage $msg
     *
     * @return null|integer Return code, dictating further message status.
     *                      Null is returned when processing finished without an exception.
     */
    public function execute(AMQPMessage $msg)
    {
        try {
            $this->process($msg);
        } catch (ProcessorErrorException $e) {
            // The processor is passed along only so it shows up in the log context.
            $this->maybeReschedule($msg, $e, $e->getProcessor());
            return self::MSG_REJECT;
        } catch (ProcessorNotFoundException $e) {
            $this->maybeReschedule($msg, $e);
            return self::MSG_REJECT;
        } catch (TranslateErrorException $e) {
            $this->maybeReschedule($msg, $e);
            return self::MSG_REJECT;
        } catch (Exception $e) {
            // Unknown errors are not retried, but they must not vanish silently either.
            $this->logger->error(
                sprintf('Unexpected error while processing message, rejecting: %s', $e->getMessage()),
                array('exception' => $e)
            );
            return self::MSG_REJECT;
        }

        return null;
    }

    /**
     * Translate a raw RabbitMQ message into a command object.
     *
     * The target class is read from the 'x-class-name' application header and
     * the message body is deserialized as JSON into that class.
     *
     * @param AMQPMessage $commandData Message received from the queue.
     *
     * @return mixed Result of the serializer's deserialize() call.
     *
     * @throws InvalidArgumentException When the 'x-class-name' header is missing or empty.
     */
    protected function translateCommand($commandData)
    {
        /* @var $commandData AMQPMessage */
        $headers = $commandData->get('application_headers')->getNativeData();
        if (empty($headers['x-class-name'])) {
            throw new InvalidArgumentException("Class name header 'x-class-name' not found");
        }

        return $this->serializer->deserialize(
            $commandData->body,
            $headers['x-class-name'],
            'json'
        );
    }

    /**
     * Get the name of this worker subsystem.
     *
     * @return string Value of self::SUBSYSTEM_NAME.
     */
    public function getSubsystemName()
    {
        return self::SUBSYSTEM_NAME;
    }

    /**
     * Send message for rescheduler, if maximum number of tries has not been exceeded.
     *
     * The error is always logged. When a retry is still allowed, the message is
     * republished through the reschedule producer with an 'expiration' property
     * that grows with the number of previous attempts.
     *
     * @param AMQPMessage               $msg       Message that failed.
     * @param Exception                 $e         Exception describing the failure.
     * @param CommandProcessorInterface $processor Processor that failed, if known (used for log context only).
     */
    private function maybeReschedule(AMQPMessage $msg, Exception $e, CommandProcessorInterface $processor = null)
    {
        $triesNum = 10;
        // A message without application headers is treated as a first attempt
        // instead of raising from inside the caller's exception handler.
        $retryCount = $this->getRetryCount($this->getApplicationHeaders($msg));
        $retry = ($retryCount < $triesNum - 1);
        $rescheduleInS = (5 * $retryCount + 10);

        $this->logger->error(
            sprintf(
                'Error while executing processor (retry count: %d - %s): %s',
                $retryCount + 1,
                $retry ? 'retry in ' . $rescheduleInS . 's' : 'reject',
                $e->getMessage()
            ),
            array($e->getTraceAsString(), $this, $processor)
        );

        if (!$retry) {
            return;
        }

        // Keep the original properties; only the expiration (in milliseconds) is overridden.
        $properties = array_merge(
            $msg->get_properties(),
            array('expiration' => $rescheduleInS * 1000)
        );
        $this->rescheduleProducer->publish($msg->body, $msg->delivery_info['routing_key'], $properties);
    }

    /**
     * Read application headers of a message as a native array.
     *
     * @param AMQPMessage $msg
     *
     * @return array Native header data, or an empty array when the message
     *               carries no 'application_headers' property.
     */
    private function getApplicationHeaders(AMQPMessage $msg)
    {
        try {
            return $msg->get('application_headers')->getNativeData();
        } catch (\OutOfBoundsException $e) {
            // php-amqplib signals a missing property with this exception.
            return array();
        }
    }

    /**
     * Get retry count.
     *
     * The count is derived from the 'x-death' header: the 'count' field of its
     * first entry when that field is non-empty, otherwise the number of entries.
     * Without an 'x-death' header the count is 0.
     *
     * @param array $headers Native application headers of the message.
     *
     * @return integer
     */
    private function getRetryCount($headers)
    {
        if (empty($headers['x-death'])) {
            return 0;
        }

        if (!empty($headers['x-death'][0]['count'])) {
            return $headers['x-death'][0]['count'];
        }

        return count($headers['x-death']);
    }

}
160
|
|
|
|
If you return a value from a function or method, it should be a sub-type of the type declared by the parent type, e.g. an interface or abstract method. This is more formally defined by the Liskov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangeably. This principle also belongs to the SOLID principles for object-oriented design.
Let’s take a look at an example:
Our function `my_function` expects a `Post` object, and outputs the author of the post. The base class `Post` returns a simple string, and outputting a simple string will work just fine. However, the child class `BlogPost`, which is a sub-type of `Post`, instead decided to return an `object`, and is therefore violating the SOLID principles. If a `BlogPost` were passed to `my_function`, PHP would not complain, but would ultimately fail when executing the `strtoupper` call in its body.