1
|
|
|
<?php |
2
|
|
|
namespace Ajir\RabbitMqSqlBundle\AMQP; |
3
|
|
|
|
4
|
|
|
use Ajir\RabbitMqSqlBundle\DataStructure\Message\AMQPMessageInterface; |
5
|
|
|
use Ajir\RabbitMqSqlBundle\DataTransformer\DataTransformerInterface; |
6
|
|
|
use Ajir\RabbitMqSqlBundle\Persister\PersisterInterface; |
7
|
|
|
use Exception; |
8
|
|
|
use InvalidArgumentException; |
9
|
|
|
use JMS\Serializer\SerializerInterface; |
10
|
|
|
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; |
11
|
|
|
use PhpAmqpLib\Message\AMQPMessage; |
12
|
|
|
use Psr\Log\LoggerInterface; |
13
|
|
|
|
14
|
|
|
/** |
15
|
|
|
* Class SqlConsumer |
16
|
|
|
* |
17
|
|
|
* @author Florian Ajir <[email protected]> |
18
|
|
|
*/ |
19
|
|
|
class SqlConsumer implements ConsumerInterface |
20
|
|
|
{ |
21
|
|
|
const DEFAULT_MESSAGE_CLASS = 'Ajir\RabbitMqSqlBundle\DataStructure\Message\AMQPMessage'; |
22
|
|
|
const JSON_FORMAT = 'json'; |
23
|
|
|
|
24
|
|
|
/** |
25
|
|
|
* @var LoggerInterface |
26
|
|
|
*/ |
27
|
|
|
protected $logger; |
28
|
|
|
|
29
|
|
|
/** |
30
|
|
|
* @var PersisterInterface |
31
|
|
|
*/ |
32
|
|
|
private $persister; |
33
|
|
|
|
34
|
|
|
/** |
35
|
|
|
* @var DataTransformerInterface |
36
|
|
|
*/ |
37
|
|
|
private $transformer; |
38
|
|
|
|
39
|
|
|
/** |
40
|
|
|
* @var SerializerInterface |
41
|
|
|
*/ |
42
|
|
|
private $serializer; |
43
|
|
|
|
44
|
|
|
/** |
45
|
|
|
* @var array |
46
|
|
|
*/ |
47
|
|
|
private $ignoredTypes; |
48
|
|
|
|
49
|
|
|
/** |
50
|
|
|
* @var string |
51
|
|
|
*/ |
52
|
|
|
private $msgClass; |
53
|
|
|
|
54
|
|
|
/** |
55
|
|
|
* @var string |
56
|
|
|
*/ |
57
|
|
|
private $format; |
58
|
|
|
|
59
|
|
|
/** |
60
|
|
|
* @param DataTransformerInterface $transformer |
61
|
|
|
* @param PersisterInterface $persister |
62
|
|
|
* @param SerializerInterface $serializer |
63
|
|
|
* @param string[] $ignoredTypes |
64
|
|
|
* @param string $msgClass |
65
|
|
|
* @param string $format |
66
|
|
|
*/ |
67
|
4 |
|
public function __construct( |
68
|
|
|
DataTransformerInterface $transformer, |
69
|
|
|
PersisterInterface $persister, |
70
|
|
|
SerializerInterface $serializer, |
71
|
|
|
array $ignoredTypes = array(), |
72
|
|
|
$msgClass = self::DEFAULT_MESSAGE_CLASS, |
73
|
|
|
$format = self::JSON_FORMAT |
74
|
|
|
) |
75
|
|
|
{ |
76
|
4 |
|
$this->transformer = $transformer; |
77
|
4 |
|
$this->persister = $persister; |
78
|
4 |
|
$this->serializer = $serializer; |
79
|
4 |
|
$this->msgClass = $msgClass; |
80
|
4 |
|
$this->format = $format; |
81
|
4 |
|
$this->ignoredTypes = $ignoredTypes; |
82
|
4 |
|
} |
83
|
|
|
|
84
|
|
|
/** |
85
|
|
|
* @param LoggerInterface $logger |
86
|
|
|
*/ |
87
|
4 |
|
public function setLogger(LoggerInterface $logger) |
88
|
|
|
{ |
89
|
4 |
|
$this->logger = $logger; |
90
|
4 |
|
} |
91
|
|
|
|
92
|
|
|
/** |
93
|
|
|
* @param AMQPMessage $message |
94
|
|
|
* |
95
|
|
|
* @return bool Execution status (true if everything's of, false if message should be re-queued) |
96
|
|
|
* @throws Exception on applicative error |
97
|
|
|
*/ |
98
|
4 |
|
public function execute(AMQPMessage $message) |
99
|
|
|
{ |
100
|
4 |
|
$message = $this->deserializeMessage($message); |
101
|
4 |
|
$this->logReceivedMessage($message); |
102
|
4 |
|
$consume = true; |
103
|
4 |
|
$type = $message->getType(); |
104
|
4 |
|
$data = $message->getData(); |
105
|
4 |
|
if (false === $this->isTypeIgnored($type)) { |
106
|
|
|
try { |
107
|
3 |
|
$assoc = json_decode($data, true); |
108
|
3 |
|
$data = $this->transformer->prepare($type, $assoc); |
109
|
1 |
|
$this->persister->persist($data); |
110
|
2 |
|
} catch (InvalidArgumentException $exception) { |
111
|
1 |
|
$this->logMessageInvalid($message, $exception); |
112
|
1 |
|
$consume = false; |
113
|
1 |
|
} catch (Exception $exception) { |
114
|
1 |
|
$this->logMessageError($message, $exception); |
115
|
1 |
|
throw $exception; |
116
|
|
|
} |
117
|
|
|
} |
118
|
|
|
|
119
|
3 |
|
return $consume; |
120
|
|
|
} |
121
|
|
|
|
122
|
|
|
/** |
123
|
|
|
* @param string $type |
124
|
|
|
* @return bool |
125
|
|
|
*/ |
126
|
4 |
|
private function isTypeIgnored($type) |
127
|
|
|
{ |
128
|
4 |
|
return in_array(strtolower($type), $this->ignoredTypes); |
129
|
|
|
} |
130
|
|
|
|
131
|
|
|
/** |
132
|
|
|
* @param AMQPMessage $message |
133
|
|
|
* @return AMQPMessageInterface |
134
|
|
|
*/ |
135
|
4 |
|
private function deserializeMessage(AMQPMessage $message) |
136
|
|
|
{ |
137
|
|
|
return $this |
|
|
|
|
138
|
4 |
|
->serializer |
139
|
4 |
|
->deserialize( |
140
|
4 |
|
$message->getBody(), |
141
|
4 |
|
$this->msgClass, |
142
|
4 |
|
$this->format |
143
|
|
|
); |
144
|
|
|
} |
145
|
|
|
|
146
|
|
|
/** |
147
|
|
|
* @param AMQPMessageInterface $message |
148
|
|
|
*/ |
149
|
4 |
|
private function logReceivedMessage(AMQPMessageInterface $message) |
150
|
|
|
{ |
151
|
4 |
|
if (null !== $this->logger) { |
152
|
|
|
// log message |
153
|
4 |
|
$this->logger->info( |
154
|
4 |
|
'Message received from SQL Consumer', |
155
|
|
|
array( |
156
|
4 |
|
'type' => $message->getType(), |
157
|
4 |
|
'data' => $message->getData() |
158
|
|
|
) |
159
|
|
|
); |
160
|
|
|
} |
161
|
4 |
|
} |
162
|
|
|
|
163
|
|
|
/** |
164
|
|
|
* @param AMQPMessageInterface $message |
165
|
|
|
* @param InvalidArgumentException $exception |
166
|
|
|
*/ |
167
|
1 |
View Code Duplication |
private function logMessageInvalid(AMQPMessageInterface $message, InvalidArgumentException $exception) |
|
|
|
|
168
|
|
|
{ |
169
|
1 |
|
if (null !== $this->logger) { |
170
|
1 |
|
$this->logger->warning( |
171
|
1 |
|
'Message invalid', |
172
|
|
|
array( |
173
|
1 |
|
'type' => $message->getType(), |
174
|
1 |
|
'data' => $message->getData(), |
175
|
1 |
|
'exception' => $exception |
176
|
|
|
) |
177
|
|
|
); |
178
|
|
|
} |
179
|
1 |
|
} |
180
|
|
|
|
181
|
|
|
/** |
182
|
|
|
* @param AMQPMessageInterface $message |
183
|
|
|
* @param Exception $exception |
184
|
|
|
*/ |
185
|
1 |
View Code Duplication |
private function logMessageError(AMQPMessageInterface $message, Exception $exception) |
|
|
|
|
186
|
|
|
{ |
187
|
1 |
|
if (null !== $this->logger) { |
188
|
1 |
|
$this->logger->error( |
189
|
1 |
|
'Consumer SQL Exception', |
190
|
|
|
array( |
191
|
1 |
|
'type' => $message->getType(), |
192
|
1 |
|
'data' => $message->getData(), |
193
|
1 |
|
'exception' => $exception |
194
|
|
|
) |
195
|
|
|
); |
196
|
|
|
} |
197
|
1 |
|
} |
198
|
|
|
} |
199
|
|
|
|
If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type f.e. an interface, or abstract method. This is more formally defined by the Lizkov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangably. This principle also belongs to the SOLID principles for object oriented design.
Let’s take a look at an example:
Our function
my_function
expects aPost
object, and outputs the author of the post. The base classPost
returns a simple string and outputting a simple string will work just fine. However, the child classBlogPost
which is a sub-type ofPost
instead decided to return anobject
, and is therefore violating the SOLID principles. If aBlogPost
were passed tomy_function
, PHP would not complain, but ultimately fail when executing thestrtoupper
call in its body.