Issues (8)

src/Message.php (1 issue)

Severity
1
<?php
2
3
/**
4
 * This file is part of amqp
5
 *
6
 * For the full copyright and license information, please view the LICENSE
7
 * file that was distributed with this source code.
8
 */
9
10
declare(strict_types=1);
11
12
namespace Slick\Amqp;
13
14
use JsonSerializable;
15
use OutOfBoundsException;
16
use PhpAmqpLib\Channel\AMQPChannel;
17
use PhpAmqpLib\Exception\AMQPEmptyDeliveryTagException;
18
use PhpAmqpLib\Message\AMQPMessage;
19
use ReflectionClass;
20
use ReflectionException;
21
use Stringable;
22
23
/**
 * Message
 *
 * Thin wrapper around a php-amqplib AMQPMessage. It keeps the raw body, a
 * parsed representation of that body and the application headers, and
 * delegates property and delivery-information lookups to the wrapped message.
 *
 * @package Slick\Amqp
 */
class Message
{

    use MessageHeadersMethods;

    // AMQP property names accepted by get(), set() and has().
    public const DELIVERY_MODE    = 'delivery_mode';
    public const TYPE             = 'type';
    public const HEADERS          = 'application_headers';
    public const CONTENT_TYPE     = 'content_type';
    public const CONTENT_ENCODING = 'content_encoding';
    public const MESSAGE_ID       = 'message_id';
    public const CORRELATION_ID   = 'correlation_id';
    public const REPLY_TO         = 'reply_to';
    public const EXPIRATION       = 'expiration';
    public const TIMESTAMP        = 'timestamp';
    public const USER_ID          = 'user_id';
    public const APP_ID           = 'app_id';

    // Aliases of the delivery mode values defined by AMQPMessage.
    public const DELIVERY_MODE_PERSISTENT = AMQPMessage::DELIVERY_MODE_PERSISTENT;
    public const DELIVERY_MODE_TRANSIENT  = AMQPMessage::DELIVERY_MODE_NON_PERSISTENT;

    /**
     * @var mixed Body handed to (or read from) the wrapped AMQP message
     */
    private mixed $body;

    /**
     * @var AMQPMessage The wrapped php-amqplib message
     */
    private AMQPMessage $message;

    /**
     * @var mixed Parsed representation of the body, see parsedBody()
     */
    private mixed $parsedContent = null;

    /**
     * Header values keyed by header name; filled by fromAMQPMessage().
     * NOTE(review): presumably also managed by MessageHeadersMethods - confirm.
     *
     * @var array<string, mixed>
     */
    protected array $headers = [];

    /**
     * Creates a Message
     *
     * JsonSerializable bodies are encoded to JSON and Stringable bodies are
     * cast to string; in both cases a matching content type is detected.
     * Entries in $properties take precedence over the detected content type.
     *
     * @param mixed $body
     * @param array<string, mixed> $properties
     */
    public function __construct(mixed $body, array $properties = [])
    {
        $this->body = $body;

        // May replace $this->body with its encoded form as a side effect.
        $contentType = $this->detectContentType($body);
        $detected = $contentType ? [self::CONTENT_TYPE => $contentType] : [];

        // array_merge() instead of array_merge_recursive(): a caller supplied
        // content type must replace the detected one, not be combined with it
        // into an array value.
        $this->message = new AMQPMessage($this->body, array_merge($detected, $properties));
    }

    /**
     * Creates a message from an AMQP message
     *
     * The instance is built without running the constructor, so the given
     * AMQP message is wrapped as is and its body is parsed according to its
     * content type. Application headers, when present, are copied to the
     * headers list.
     *
     * @param AMQPMessage $amqpMessage
     * @return Message
     * @throws ReflectionException
     */
    public static function fromAMQPMessage(AMQPMessage $amqpMessage): Message
    {
        $reflection = new ReflectionClass(static::class);
        /** @var Message $message */
        $message = $reflection->newInstanceWithoutConstructor();
        $message->message = $amqpMessage;
        $message->body = $amqpMessage->getBody();
        $message->parseContent();

        if (!$amqpMessage->has(self::HEADERS)) {
            return $message;
        }

        foreach ($amqpMessage->get(self::HEADERS) as $name => $entry) {
            // Each entry is read as a pair whose second element is the value.
            [, $headerValue] = $entry;
            $message->headers[$name] = $headerValue;
        }

        return $message;
    }

    /**
     * Message payload
     *
     * @return mixed
     */
    public function body(): mixed
    {
        return $this->body;
    }

    /**
     * AMQP source Message
     *
     * @return AMQPMessage
     */
    public function sourceMessage(): AMQPMessage
    {
        return $this->message;
    }

    /**
     * Look for additional properties in the 'properties' dictionary,
     * and if present - the 'delivery_info' dictionary.
     *
     * Returns null when the wrapped message reports the property as missing.
     *
     * @param string $property
     * @return mixed|AMQPChannel
     */
    public function get(string $property): mixed
    {
        try {
            return $this->message->get($property);
        } catch (OutOfBoundsException) {
            return null;
        }
    }

    /**
     * Sets a property value
     *
     * @param string $name The property name (one of the property definition)
     * @param mixed $value The property value
     * @return Message
     */
    public function set(string $name, mixed $value): self
    {
        $this->message->set($name, $value);
        return $this;
    }

    /**
     * Check whether a property exists in the 'properties' dictionary
     * or if present - in the 'delivery_info' dictionary.
     *
     * @param string $name
     * @return bool
     */
    public function has(string $name): bool
    {
        return $this->message->has($name);
    }

    /**
     * Used channel
     *
     * @return AMQPChannel|null
     */
    public function channel(): ?AMQPChannel
    {
        return $this->message->getChannel();
    }

    /**
     * Check if message is redelivered
     *
     * @return bool
     */
    public function isRedelivered(): bool
    {
        return (bool) $this->message->isRedelivered();
    }

    /**
     * The exchange name that routed this message
     *
     * @return string|null
     */
    public function exchange(): ?string
    {
        return $this->message->getExchange();
    }

    /**
     * Routing key string used on topic exchanges
     *
     * @return string|null
     */
    public function routingKey(): ?string
    {
        return $this->message->getRoutingKey();
    }

    /**
     * Consumer tag of the wrapped message
     *
     * @return string|null
     */
    public function consumerTag(): ?string
    {
        return $this->message->getConsumerTag();
    }

    /**
     * Sets message with consumer tag
     *
     * @param string $consumerTag
     * @return $this
     */
    public function withConsumerTag(string $consumerTag): self
    {
        $this->message->setConsumerTag($consumerTag);
        return $this;
    }

    /**
     * Parsed representation of the body
     *
     * For messages created with the constructor this is the decoded JSON of
     * a JsonSerializable body, the string form of a Stringable body, or null
     * for any other body. For messages created with fromAMQPMessage() it is
     * the decoded JSON when the content type mentions JSON, and the raw body
     * otherwise.
     *
     * @return mixed|object|string
     */
    public function parsedBody(): mixed
    {
        return $this->parsedContent;
    }

    /**
     * Delivery tag of the wrapped message
     *
     * @return int the delivery tag, or -1 when the message has none
     */
    public function deliveryTag(): int
    {
        try {
            return $this->message->getDeliveryTag();
        } catch (AMQPEmptyDeliveryTagException) {
            return -1;
        }
    }

    /**
     * Detects content type
     *
     * As a side effect, a recognised body is converted: the body property
     * receives the encoded form and the parsed content is set accordingly.
     *
     * @param mixed $content
     * @return string|null 'application/json', 'text/plain' or null when the
     *                     content is neither JsonSerializable nor Stringable
     */
    private function detectContentType(mixed $content): ?string
    {
        if ($content instanceof JsonSerializable) {
            // Encode once and reuse the result for both body and parsed
            // content. json_encode() returns false on failure; in that case
            // the parsed content stays null.
            $encoded = json_encode($content);
            $this->body = $encoded;
            $this->parsedContent = is_string($encoded) ? json_decode($encoded) : null;
            return 'application/json';
        }

        if ($content instanceof Stringable) {
            $text = (string) $content;
            $this->parsedContent = $text;
            $this->body = $text;
            return 'text/plain';
        }

        return null;
    }

    /**
     * Parses the body of the wrapped message into the parsed content
     *
     * A content type containing "json" (case-insensitive) makes the body be
     * decoded as JSON. In every other case, including a missing content
     * type, the raw body is used so that parsedBody() does not lose non-JSON
     * payloads.
     */
    protected function parseContent(): void
    {
        $contentType = $this->has(self::CONTENT_TYPE)
            ? $this->get(self::CONTENT_TYPE)
            : null;

        if (is_string($contentType) && str_contains(strtolower($contentType), 'json')) {
            $this->parsedContent = json_decode($this->message->getBody());
            return;
        }

        $this->parsedContent = $this->body;
    }
}
292