DoctrineDbalTransport::deleteMessage()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 4
nc 1
nop 1
dl 0
loc 6
ccs 5
cts 5
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Simple\Queue\Transport;
6
7
use Throwable;
8
use Ramsey\Uuid\Uuid;
9
use DateTimeImmutable;
10
use Simple\Queue\Status;
11
use Simple\Queue\Message;
12
use Doctrine\DBAL\Connection;
13
use Doctrine\DBAL\Types\Types;
14
use Simple\Queue\MessageHydrator;
15
16
/**
17
 * Class DoctrineDbalTransport
18
 * @package Simple\Queue\Transport
19
 */
20
class DoctrineDbalTransport implements TransportInterface
21
{
22
    /** @var Connection */
23
    private Connection $connection;
24
25
    /**
26
     * DoctrineDbalTransport constructor.
27
     * @param Connection $connection
28
     */
29 26
    public function __construct(Connection $connection)
30
    {
31 26
        $this->connection = $connection;
32 26
    }
33
34
    /**
35
     * @inheritDoc
36
     */
37 1
    public function init(): void
38
    {
39 1
        (new DoctrineDbalTableCreator($this->connection))->createDataBaseTable();
40 1
    }
41
42
    /**
43
     * @inheritDoc
44
     * @throws TransportException
45
     */
46 3
    public function fetchMessage(array $queues = []): ?Message
47
    {
48 3
        $nowTime = time();
49 3
        $endAt = microtime(true) + 0.2; // add 200ms
50
51 3
        $select = $this->connection->createQueryBuilder()
52 3
            ->select('*')
53 3
            ->from(DoctrineDbalTableCreator::getTableName())
54 3
            ->andWhere('status IN (:statuses)')
55 3
            ->andWhere('redelivered_at IS NULL OR redelivered_at <= :redeliveredAt')
56 3
            ->andWhere('exact_time <= :nowTime')
57 3
            ->addOrderBy('priority', 'asc')
58 3
            ->addOrderBy('created_at', 'asc')
59 3
            ->setParameter('redeliveredAt', (new DateTimeImmutable('now'))->format('Y-m-d H:i:s'), Types::STRING)
60 3
            ->setParameter('statuses', [Status::NEW, Status::REDELIVERED], Connection::PARAM_STR_ARRAY)
61 3
            ->setParameter('nowTime', $nowTime, Types::INTEGER)
62 3
            ->setMaxResults(1);
63
64 3
        if (count($queues)) {
65
            $select
66 3
                ->where('queue IN (:queues)')
67 3
                ->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY);
68
        }
69
70 3
        while (microtime(true) < $endAt) {
71
            try {
72 3
                $deliveredMessage = $select->execute()->fetchAssociative();
0 ignored issues
show
Deprecated Code introduced by
The function Doctrine\DBAL\Query\QueryBuilder::execute() has been deprecated: Use {@link executeQuery()} or {@link executeStatement()} instead. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

72
                $deliveredMessage = /** @scrutinizer ignore-deprecated */ $select->execute()->fetchAssociative();

This function has been deprecated. The supplier of the function has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the function will be removed and what other function to use instead.

Loading history...
73
74 3
                if (empty($deliveredMessage)) {
75 1
                    continue;
76
                }
77
78 2
                return MessageHydrator::createMessage($deliveredMessage);
79 1
            } catch (Throwable $e) {
80 1
                throw new TransportException(sprintf('Error reading queue in consumer: "%s".', $e->getMessage()), 0, $e);
81
            }
82
        }
83
84 1
        return null;
85
    }
86
87
    /**
88
     * @inheritDoc
89
     * @throws TransportException
90
     */
91 6
    public function send(Message $message): void
92
    {
93
        $dataMessage = [
94 6
            'id' => Uuid::uuid4()->toString(),
95 6
            'status' => $message->getStatus(),
96 6
            'created_at' => $message->getCreatedAt()->format('Y-m-d H:i:s'),
97 6
            'redelivered_at' => $message->getRedeliveredAt() ? $message->getRedeliveredAt()->format('Y-m-d H:i:s') : null,
98 6
            'attempts' => $message->getAttempts(),
99 6
            'queue' => $message->getQueue(),
100 6
            'event' => $message->getEvent(),
101 6
            'is_job' => $message->isJob(),
102 6
            'body' => $message->getBody(),
103 6
            'priority' => $message->getPriority(),
104 6
            'error' => $message->getError(),
105 6
            'exact_time' => $message->getExactTime(),
106
        ];
107
        try {
108 6
            $rowsAffected = $this->connection->insert(DoctrineDbalTableCreator::getTableName(), $dataMessage, [
109 6
                'id' => Types::GUID,
110 6
                'status' => Types::STRING,
111 6
                'created_at' => Types::STRING,
112 6
                'redelivered_at' => Types::STRING,
113 6
                'attempts' => Types::SMALLINT,
114 6
                'queue' => Types::STRING,
115 6
                'event' => Types::STRING,
116 6
                'is_job' => Types::BOOLEAN,
117 6
                'body' => Types::TEXT,
118 6
                'priority' => Types::SMALLINT,
119 6
                'error' => Types::TEXT,
120 6
                'exact_time' => Types::BIGINT,
121
            ]);
122 6
            if ($rowsAffected !== 1) {
123 6
                throw new TransportException('The message was not enqueued. Dbal did not confirm that the record is inserted.');
124
            }
125 1
        } catch (Throwable $e) {
126 1
            throw new TransportException(sprintf('The transport fails to send the message: %s', $e->getMessage()), 0, $e);
127
        }
128 5
    }
129
130
    /**
131
     * @inheritDoc
132
     * @throws TransportException
133
     */
134 2
    public function changeMessageStatus(Message $message, Status $status): void
135
    {
136 2
        $this->connection->update(
137 2
            DoctrineDbalTableCreator::getTableName(),
138 2
            ['status' => (string)$status],
139 2
            ['id' => $message->getId()]
140
        );
141
142 1
        MessageHydrator::changeProperty($message, 'status', $status);
143 1
    }
144
145
    /**
146
     * @inheritDoc
147
     * @throws TransportException
148
     */
149 2
    public function deleteMessage(Message $message): void
150
    {
151 2
        $this->connection->delete(
152 2
            DoctrineDbalTableCreator::getTableName(),
153 2
            ['id' => $message->getId()],
154 1
            ['id' => Types::GUID]
155
        );
156 1
    }
157
}
158