1 | <?php |
||||
2 | |||||
3 | declare(strict_types=1); |
||||
4 | |||||
5 | namespace Umbrellio\TableSync\Rabbit; |
||||
6 | |||||
7 | use PhpAmqpLib\Message\AMQPMessage; |
||||
8 | use PhpAmqpLib\Wire\AMQPArray; |
||||
9 | use PhpAmqpLib\Wire\AMQPTable; |
||||
10 | use function Safe\json_decode; |
||||
11 | use Umbrellio\TableSync\Messages\PublishMessage; |
||||
12 | use Umbrellio\TableSync\Messages\ReceivedMessage; |
||||
13 | |||||
14 | class MessageBuilder |
||||
15 | { |
||||
16 | public const EVENT_NAME = 'table_sync'; |
||||
17 | private $publishMessageConfig; |
||||
18 | |||||
19 | 11 | public function __construct(Config\PublishMessage $publishMessageConfig) |
|||
20 | { |
||||
21 | 11 | $this->publishMessageConfig = $publishMessageConfig; |
|||
22 | } |
||||
23 | |||||
24 | public function buildReceivedMessage(AMQPMessage $message): ReceivedMessage |
||||
25 | { |
||||
26 | [ |
||||
27 | 'event' => $event, |
||||
28 | 'model' => $model, |
||||
29 | 'attributes' => $attributes, |
||||
30 | 'version' => $version, |
||||
31 | 'metadata' => $metadata, |
||||
32 | ] = json_decode($message->body, true); |
||||
33 | |||||
34 | $appId = $message->get('app_id'); |
||||
35 | $headers = $message->has('application_headers') |
||||
36 | ? $this->headersToArray($message->get('application_headers')) |
||||
0 ignored issues
–
show
Bug
introduced
by
![]() |
|||||
37 | : []; |
||||
38 | |||||
39 | return new ReceivedMessage($event, $model, $attributes, $version, $metadata, $appId, $headers); |
||||
0 ignored issues
–
show
It seems like
$appId can also be of type PhpAmqpLib\Channel\AMQPChannel ; however, parameter $appId of Umbrellio\TableSync\Mess...dMessage::__construct() does only seem to accept string , maybe add an additional type check?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||
40 | } |
||||
41 | |||||
42 | 11 | public function buildForPublishing(PublishMessage $message): AMQPMessage |
|||
43 | { |
||||
44 | 11 | return new AMQPMessage( |
|||
45 | 11 | $this->buildBody($message), |
|||
46 | 11 | [ |
|||
47 | 11 | 'content_type' => 'application/json', |
|||
48 | 11 | 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, |
|||
49 | 11 | 'app_id' => $this->publishMessageConfig->appId(), |
|||
50 | 11 | 'type' => self::EVENT_NAME, |
|||
51 | 11 | 'application_headers' => $this->publishMessageConfig->headers(), |
|||
52 | 11 | ] |
|||
53 | 11 | ); |
|||
54 | } |
||||
55 | |||||
56 | private function headersToArray(AMQPTable $table): array |
||||
57 | { |
||||
58 | $headers = []; |
||||
59 | foreach ($table as $key => $value) { |
||||
60 | if ($value[1] instanceof AMQPArray) { |
||||
61 | $headers[$key] = $value[1]->getNativeData(); |
||||
62 | } else { |
||||
63 | $headers[$key] = $value[1]; |
||||
64 | } |
||||
65 | } |
||||
66 | |||||
67 | return $headers; |
||||
68 | } |
||||
69 | |||||
70 | 11 | private function buildBody(PublishMessage $message): string |
|||
71 | { |
||||
72 | 11 | $event = $message->isDestroyed() ? 'destroy' : 'update'; |
|||
73 | |||||
74 | 11 | $data = [ |
|||
75 | 11 | 'event' => $event, |
|||
76 | 11 | 'model' => $message->className(), |
|||
77 | 11 | 'attributes' => $message->attributes(), |
|||
78 | 11 | 'version' => microtime(true), |
|||
79 | 11 | 'metadata' => [ |
|||
80 | 11 | 'created' => $message->isCreated(), |
|||
81 | 11 | ], |
|||
82 | 11 | ]; |
|||
83 | 11 | return json_encode($data); |
|||
84 | } |
||||
85 | } |
||||
86 |