1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
declare(strict_types=1); |
4
|
|
|
|
5
|
|
|
namespace Simple\Queue\Transport; |
6
|
|
|
|
7
|
|
|
use Throwable; |
8
|
|
|
use Ramsey\Uuid\Uuid; |
9
|
|
|
use DateTimeImmutable; |
10
|
|
|
use Simple\Queue\Status; |
11
|
|
|
use Simple\Queue\Message; |
12
|
|
|
use Doctrine\DBAL\Connection; |
13
|
|
|
use Doctrine\DBAL\Types\Types; |
14
|
|
|
use Simple\Queue\MessageHydrator; |
15
|
|
|
|
16
|
|
|
/**
 * Class DoctrineDbalTransport
 *
 * Queue transport backed by a database table accessed through Doctrine DBAL.
 * The table name is always taken from DoctrineDbalTableCreator::getTableName().
 *
 * @package Simple\Queue\Transport
 */
class DoctrineDbalTransport implements TransportInterface
{
    /** Format used for date values that are written to / compared with string columns. */
    private const DATE_FORMAT = 'Y-m-d H:i:s';

    /** How long (in seconds) fetchMessage() keeps re-running its query before giving up. */
    private const FETCH_WINDOW_SECONDS = 0.2;

    /** @var Connection */
    private Connection $connection;

    /**
     * DoctrineDbalTransport constructor.
     *
     * @param Connection $connection DBAL connection used for every query issued by this transport
     */
    public function __construct(Connection $connection)
    {
        $this->connection = $connection;
    }

    /**
     * @inheritDoc
     *
     * Delegates to DoctrineDbalTableCreator::createDataBaseTable() using this transport's connection.
     */
    public function init(): void
    {
        (new DoctrineDbalTableCreator($this->connection))->createDataBaseTable();
    }

    /**
     * @inheritDoc
     *
     * Selects a single row whose status is NEW or REDELIVERED, whose redelivered_at is
     * NULL or not later than the current date-time, and whose exact_time is not later
     * than the current Unix timestamp. Rows are ordered by priority, then created_at
     * (both ascending). The query is repeated for up to 200ms until a row is found.
     *
     * @param array $queues when non-empty, only rows whose "queue" column is in this list are considered
     *
     * @return Message|null the hydrated message, or null when no row was found within the time window
     *
     * @throws TransportException when executing the query or hydrating the row fails
     */
    public function fetchMessage(array $queues = []): ?Message
    {
        $nowTime = time();
        $endAt = microtime(true) + self::FETCH_WINDOW_SECONDS; // add 200ms

        $select = $this->connection->createQueryBuilder()
            ->select('*')
            ->from(DoctrineDbalTableCreator::getTableName())
            ->andWhere('status IN (:statuses)')
            ->andWhere('redelivered_at IS NULL OR redelivered_at <= :redeliveredAt')
            ->andWhere('exact_time <= :nowTime')
            ->addOrderBy('priority', 'asc')
            ->addOrderBy('created_at', 'asc')
            ->setParameter('redeliveredAt', (new DateTimeImmutable('now'))->format(self::DATE_FORMAT), Types::STRING)
            ->setParameter('statuses', [Status::NEW, Status::REDELIVERED], Connection::PARAM_STR_ARRAY)
            ->setParameter('nowTime', $nowTime, Types::INTEGER)
            ->setMaxResults(1);

        if ($queues !== []) {
            // Must be andWhere(): QueryBuilder::where() replaces all previously added
            // restrictions, which would silently drop the status / redelivered_at /
            // exact_time filters whenever a queue list is supplied.
            $select
                ->andWhere('queue IN (:queues)')
                ->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY);
        }

        // NOTE(review): the loop re-runs the query back-to-back with no pause until the
        // window elapses; consider a short sleep between attempts if database load matters.
        while (microtime(true) < $endAt) {
            try {
                // NOTE(review): QueryBuilder::execute() is deprecated in newer DBAL releases;
                // switch to executeQuery() once the minimum supported DBAL version provides it.
                $deliveredMessage = $select->execute()->fetchAssociative();

                // fetchAssociative() yields false when there is no row: try again.
                if (empty($deliveredMessage)) {
                    continue;
                }

                return MessageHydrator::createMessage($deliveredMessage);
            } catch (Throwable $e) {
                throw new TransportException(sprintf('Error reading queue in consumer: "%s".', $e->getMessage()), 0, $e);
            }
        }

        return null;
    }

    /**
     * @inheritDoc
     *
     * Inserts the message as a new row. The row id is a freshly generated UUID v4;
     * all other columns are copied from the message getters.
     *
     * @param Message $message message to enqueue
     *
     * @throws TransportException when the insert fails or does not report exactly one affected row
     */
    public function send(Message $message): void
    {
        $redeliveredAt = $message->getRedeliveredAt();

        $dataMessage = [
            'id' => Uuid::uuid4()->toString(),
            'status' => $message->getStatus(),
            'created_at' => $message->getCreatedAt()->format(self::DATE_FORMAT),
            'redelivered_at' => $redeliveredAt ? $redeliveredAt->format(self::DATE_FORMAT) : null,
            'attempts' => $message->getAttempts(),
            'queue' => $message->getQueue(),
            'event' => $message->getEvent(),
            'is_job' => $message->isJob(),
            'body' => $message->getBody(),
            'priority' => $message->getPriority(),
            'error' => $message->getError(),
            'exact_time' => $message->getExactTime(),
        ];

        $columnTypes = [
            'id' => Types::GUID,
            'status' => Types::STRING,
            'created_at' => Types::STRING,
            'redelivered_at' => Types::STRING,
            'attempts' => Types::SMALLINT,
            'queue' => Types::STRING,
            'event' => Types::STRING,
            'is_job' => Types::BOOLEAN,
            'body' => Types::TEXT,
            'priority' => Types::SMALLINT,
            'error' => Types::TEXT,
            'exact_time' => Types::BIGINT,
        ];

        try {
            $rowsAffected = $this->connection->insert(
                DoctrineDbalTableCreator::getTableName(),
                $dataMessage,
                $columnTypes
            );

            // This exception is thrown inside the try block, so it is caught below and
            // re-thrown wrapped in the generic "fails to send" TransportException.
            if ($rowsAffected !== 1) {
                throw new TransportException('The message was not enqueued. Dbal did not confirm that the record is inserted.');
            }
        } catch (Throwable $e) {
            throw new TransportException(sprintf('The transport fails to send the message: %s', $e->getMessage()), 0, $e);
        }
    }

    /**
     * @inheritDoc
     *
     * Writes the string form of $status to the "status" column of the row whose id
     * equals $message->getId(), then hands the new status to
     * MessageHydrator::changeProperty() for the given message object.
     *
     * @param Message $message message whose stored status is updated
     * @param Status  $status  new status
     *
     * @throws TransportException declared by the contract; note that this implementation
     *                            does not itself wrap errors raised by the DBAL update
     */
    public function changeMessageStatus(Message $message, Status $status): void
    {
        $this->connection->update(
            DoctrineDbalTableCreator::getTableName(),
            ['status' => (string)$status],
            ['id' => $message->getId()],
            // Same column types as used by send() and deleteMessage().
            ['status' => Types::STRING, 'id' => Types::GUID]
        );

        MessageHydrator::changeProperty($message, 'status', $status);
    }

    /**
     * @inheritDoc
     *
     * Deletes the row whose id equals $message->getId().
     *
     * @param Message $message message to remove from the table
     *
     * @throws TransportException declared by the contract; note that this implementation
     *                            does not itself wrap errors raised by the DBAL delete
     */
    public function deleteMessage(Message $message): void
    {
        $this->connection->delete(
            DoctrineDbalTableCreator::getTableName(),
            ['id' => $message->getId()],
            ['id' => Types::GUID]
        );
    }
}
158
|
|
|
|
This function has been deprecated. The supplier of the function has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the function will be removed and what other function to use instead.