|
1
|
|
|
<?php |
|
2
|
|
|
|
|
3
|
|
|
namespace Gendoria\CommandQueueRabbitMqDriverBundle\Worker; |
|
4
|
|
|
|
|
5
|
|
|
use Exception; |
|
6
|
|
|
use Gendoria\CommandQueue\CommandProcessor\CommandProcessorInterface; |
|
7
|
|
|
use Gendoria\CommandQueue\ProcessorFactoryInterface; |
|
8
|
|
|
use Gendoria\CommandQueue\ProcessorNotFoundException; |
|
9
|
|
|
use Gendoria\CommandQueue\Worker\Exception\ProcessorErrorException; |
|
10
|
|
|
use Gendoria\CommandQueue\Worker\Exception\TranslateErrorException; |
|
11
|
|
|
use Gendoria\CommandQueueBundle\Worker\BaseSymfonyWorker; |
|
12
|
|
|
use InvalidArgumentException; |
|
13
|
|
|
use JMS\Serializer\Serializer; |
|
14
|
|
|
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; |
|
15
|
|
|
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface; |
|
16
|
|
|
use PhpAmqpLib\Message\AMQPMessage; |
|
17
|
|
|
use Psr\Log\LoggerInterface; |
|
18
|
|
|
use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
|
19
|
|
|
|
|
20
|
|
|
/**
 * Command queue worker listening on commands from RabbitMQ channel.
 *
 * Each received message is handed to the base worker's process() routine.
 * When processing fails with a known error, the message is re-published
 * through the reschedule producer (up to a fixed number of tries) and the
 * original delivery is rejected.
 *
 * @author Tomasz Struczyński <[email protected]>
 */
class RabbitMqWorker extends BaseSymfonyWorker implements ConsumerInterface
{
    /**
     * Rabbit MQ worker subsystem name.
     *
     * @var string
     */
    const SUBSYSTEM_NAME = "RabbitMqworker";

    /**
     * Serializer instance, used to deserialize the message body into a command object.
     *
     * @var Serializer
     */
    private $serializer;

    /**
     * Reschedule producer instance, used to re-publish messages that should be retried.
     *
     * @var ProducerInterface
     */
    private $rescheduleProducer;

    /**
     * Class constructor.
     *
     * @param EventDispatcherInterface  $eventDispatcher    Symfony event dispatcher.
     * @param ProcessorFactoryInterface $processorFactory   Processor factory, passed to the base worker.
     * @param Serializer                $serializer         Serializer used to decode message bodies.
     * @param ProducerInterface         $rescheduleProducer Producer used to re-publish failed messages.
     * @param LoggerInterface           $logger             Logger instance.
     */
    public function __construct(EventDispatcherInterface $eventDispatcher, ProcessorFactoryInterface $processorFactory, Serializer $serializer, ProducerInterface $rescheduleProducer, LoggerInterface $logger = null)
    {
        parent::__construct($processorFactory, $eventDispatcher, $logger);

        $this->serializer = $serializer;
        $this->rescheduleProducer = $rescheduleProducer;
    }

    /**
     * Process single message received from RabbitMq server.
     *
     * Known processing errors trigger a reschedule attempt before the message
     * is rejected; any other exception is logged and the message is rejected.
     *
     * @param AMQPMessage $msg
     *
     * @return null|integer Return code, dictating further message status.
     *                      Null is returned when processing succeeded.
     */
    public function execute(AMQPMessage $msg)
    {
        //We try to process message. On known errors we try to reschedule. On unknown - we simply reject.
        try {
            $this->process($msg);
        } catch (ProcessorErrorException $e) {
            $this->maybeReschedule($msg, $e, $e->getProcessor());

            return self::MSG_REJECT;
        } catch (ProcessorNotFoundException $e) {
            $this->maybeReschedule($msg, $e);

            return self::MSG_REJECT;
        } catch (TranslateErrorException $e) {
            $this->maybeReschedule($msg, $e);

            return self::MSG_REJECT;
        } catch (Exception $e) {
            // Unknown failure: do not reschedule, but leave a trace instead of dropping the message silently.
            $this->logger->error(
                sprintf('Unexpected error while processing message (reject): %s', $e->getMessage()),
                array($e->getTraceAsString(), $this)
            );

            return self::MSG_REJECT;
        }
    }

    /**
     * Translate a received AMQP message into a command object.
     *
     * The target class is read from the 'x-class-name' application header and
     * the message body is deserialized from JSON into that class.
     *
     * @param AMQPMessage $commandData Received message.
     *
     * @return mixed Deserialized command.
     *
     * @throws InvalidArgumentException When the 'x-class-name' header is missing or empty.
     */
    protected function translateCommand($commandData)
    {
        /* @var $commandData AMQPMessage */
        $headers = $this->getApplicationHeaders($commandData);
        if (empty($headers['x-class-name'])) {
            throw new InvalidArgumentException("Class name header 'x-class-name' not found");
        }

        return $this->serializer->deserialize(
            $commandData->body,
            $headers['x-class-name'],
            'json'
        );
    }

    /**
     * Get the subsystem name of this worker.
     *
     * @return string
     */
    public function getSubsystemName()
    {
        return self::SUBSYSTEM_NAME;
    }

    /**
     * Send message for rescheduler, if maximum number of tries has not been exceeded.
     *
     * The error is always logged. The delay grows linearly with the retry
     * count (10s, 15s, 20s, ...) and is passed as the 'expiration' property
     * of the re-published message.
     *
     * @param AMQPMessage               $msg       Message which failed to process.
     * @param Exception                 $e         Error which caused the failure.
     * @param CommandProcessorInterface $processor Processor which failed, if known.
     */
    private function maybeReschedule(AMQPMessage $msg, Exception $e, CommandProcessorInterface $processor = null)
    {
        $triesNum = 10;
        // Headers may be absent altogether; this runs inside a catch handler, so it must not throw.
        $headers = $this->getApplicationHeaders($msg);
        $retryCount = $this->getRetryCount($headers);
        $retry = ($retryCount < $triesNum - 1);
        $resheduleInS = (5 * $retryCount + 10);

        $this->logger->error(
            sprintf(
                'Error while executing processor (retry count: %d - %s): %s',
                $retryCount + 1,
                $retry ? 'retry in ' . $resheduleInS . 's' : 'reject',
                $e->getMessage()
            ),
            array($e->getTraceAsString(), $this, $processor)
        );

        if ($retry) {
            // Re-publish with the original properties; only the expiration is overridden (seconds * 1000).
            $this->rescheduleProducer->publish(
                $msg->body,
                $msg->delivery_info['routing_key'],
                array_merge(
                    $msg->get_properties(),
                    array('expiration' => $resheduleInS * 1000)
                )
            );
        }
    }

    /**
     * Get retry count.
     *
     * Reads the 'x-death' header: the 'count' field of its first entry is
     * used when present, otherwise the number of 'x-death' entries.
     * Returns 0 when the header is missing or empty.
     *
     * @param array $headers Application headers of the message.
     *
     * @return integer
     */
    private function getRetryCount($headers)
    {
        if (empty($headers['x-death'])) {
            return 0;
        }

        if (!empty($headers['x-death'][0]['count'])) {
            return $headers['x-death'][0]['count'];
        }

        return count($headers['x-death']);
    }

    /**
     * Get application headers of a message as a native array.
     *
     * Returns an empty array when the message carries no 'application_headers'
     * property, instead of letting the property lookup throw.
     *
     * @param AMQPMessage $msg
     *
     * @return array
     */
    private function getApplicationHeaders(AMQPMessage $msg)
    {
        if (!$msg->has('application_headers')) {
            return array();
        }

        return $msg->get('application_headers')->getNativeData();
    }
}
|
160
|
|
|
|
If you return a value from a function or method, it should be a sub-type of the type given by the parent type, e.g. an interface or abstract method. This is more formally defined by the Liskov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangeably. This principle also belongs to the SOLID principles for object-oriented design.
Let’s take a look at an example:
Our function `my_function` expects a `Post` object and outputs the author of the post. The base class `Post` returns a simple string, and outputting a simple string will work just fine. However, the child class `BlogPost`, which is a sub-type of `Post`, instead decided to return an object, and is therefore violating the SOLID principles. If a `BlogPost` were passed to `my_function`, PHP would not complain, but it would ultimately fail when executing the `strtoupper` call in its body.