umbrellio /
php-table-sync
| 1 | <?php |
||||
| 2 | |||||
| 3 | declare(strict_types=1); |
||||
| 4 | |||||
| 5 | namespace Umbrellio\TableSync\Rabbit; |
||||
| 6 | |||||
| 7 | use PhpAmqpLib\Message\AMQPMessage; |
||||
| 8 | use PhpAmqpLib\Wire\AMQPArray; |
||||
| 9 | use PhpAmqpLib\Wire\AMQPTable; |
||||
| 10 | use function Safe\json_decode; |
||||
| 11 | use Umbrellio\TableSync\Messages\PublishMessage; |
||||
| 12 | use Umbrellio\TableSync\Messages\ReceivedMessage; |
||||
| 13 | |||||
/**
 * Translates between table-sync message objects and AMQP messages.
 *
 * Incoming AMQP messages are decoded into ReceivedMessage instances, and
 * outgoing PublishMessage instances are encoded into persistent JSON AMQP messages.
 */
class MessageBuilder
{
    /** Value written to the AMQP "type" property of every published message. */
    public const EVENT_NAME = 'table_sync';

    /**
     * Supplies the app id and application headers attached to published messages.
     *
     * @var Config\PublishMessage
     */
    private $publishMessageConfig;

    /**
     * @param Config\PublishMessage $publishMessageConfig publishing settings (app id, headers)
     */
    public function __construct(Config\PublishMessage $publishMessageConfig)
    {
        $this->publishMessageConfig = $publishMessageConfig;
    }

    /**
     * Builds a ReceivedMessage from a consumed AMQP message.
     *
     * The message body is decoded as a JSON object from which the "event", "model",
     * "attributes", "version" and "metadata" keys are read.
     *
     * @param AMQPMessage $message consumed message with a JSON body
     *
     * @return ReceivedMessage
     */
    public function buildReceivedMessage(AMQPMessage $message): ReceivedMessage
    {
        // json_decode here is the Safe\json_decode variant imported at file level.
        [
            'event' => $event,
            'model' => $model,
            'attributes' => $attributes,
            'version' => $version,
            'metadata' => $metadata,
        ] = json_decode($message->body, true);

        // NOTE(review): static analysis flags that get() is not guaranteed to return a
        // string here, while ReceivedMessage appears to expect a string app id - confirm.
        $appId = $message->get('app_id');

        // Application headers are optional; fall back to an empty list when absent.
        $headers = $message->has('application_headers')
            ? $this->headersToArray($message->get('application_headers'))
            : [];

        return new ReceivedMessage($event, $model, $attributes, $version, $metadata, $appId, $headers);
    }

    /**
     * Builds the AMQP message to publish for the given table-sync message.
     *
     * @param PublishMessage $message message describing the model change
     *
     * @return AMQPMessage persistent JSON message tagged with the configured app id and headers
     *
     * @throws \RuntimeException when the message body cannot be encoded as JSON
     */
    public function buildForPublishing(PublishMessage $message): AMQPMessage
    {
        return new AMQPMessage(
            $this->buildBody($message),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'app_id' => $this->publishMessageConfig->appId(),
                'type' => self::EVENT_NAME,
                'application_headers' => $this->publishMessageConfig->headers(),
            ]
        );
    }

    /**
     * Flattens an AMQP headers table into a plain PHP array keyed by header name.
     *
     * @param AMQPTable $table application headers of a consumed message
     *
     * @return array header values; AMQPArray values are replaced by their native data
     */
    private function headersToArray(AMQPTable $table): array
    {
        $headers = [];
        foreach ($table as $key => $value) {
            // Presumably each entry is a [type, value] pair - confirm against php-amqplib.
            $payload = $value[1];
            $headers[$key] = $payload instanceof AMQPArray ? $payload->getNativeData() : $payload;
        }

        return $headers;
    }

    /**
     * Serializes the message into the JSON body sent over the wire.
     *
     * @param PublishMessage $message message describing the model change
     *
     * @return string JSON document with event, model, attributes, version and metadata
     *
     * @throws \RuntimeException when json_encode() fails (e.g. malformed UTF-8 in attributes)
     */
    private function buildBody(PublishMessage $message): string
    {
        $data = [
            'event' => $message->isDestroyed() ? 'destroy' : 'update',
            'model' => $message->className(),
            'attributes' => $message->attributes(),
            // The current timestamp (with microseconds) is used as the version.
            'version' => microtime(true),
            'metadata' => [
                'created' => $message->isCreated(),
            ],
        ];

        $json = json_encode($data);
        if ($json === false) {
            // Without this check a failed encode would surface as an opaque TypeError
            // from the string return type, hiding the actual JSON error.
            throw new \RuntimeException(
                'Unable to encode table sync message body: ' . json_last_error_msg()
            );
        }

        return $json;
    }
}
||||
| 86 |