Passed
Push — master ( 5bd05b...9a2fe3 )
by
unknown
48s queued 11s
created

DoctrineDbalStore::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 2
Bugs 1 Features 0
Metric Value
cc 1
eloc 1
nc 1
nop 1
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
c 2
b 1
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Simple\Queue\Store;
6
7
use Throwable;
8
use Ramsey\Uuid\Uuid;
9
use DateTimeImmutable;
10
use Simple\Queue\Status;
11
use Simple\Queue\Message;
12
use Doctrine\DBAL\Connection;
13
use Doctrine\DBAL\Types\Types;
14
use Simple\Queue\MessageHydrator;
15
16
/**
17
 * Class DoctrineDbalStore
18
 * @package Simple\Queue\Store
19
 */
20
class DoctrineDbalStore implements StoreInterface
21
{
22
    /** @var Connection */
23
    private Connection $connection;
24
25
    /**
26
     * DoctrineDbalStore constructor.
27
     * @param Connection $connection
28
     */
29 20
    public function __construct(Connection $connection)
30
    {
31 20
        $this->connection = $connection;
32 20
    }
33
34
    /**
35
     * @inheritDoc
36
     */
37 1
    public function init(): void
38
    {
39 1
        (new DoctrineDbalTableCreator($this->connection))->createDataBaseTable();
40 1
    }
41
42
    /**
43
     * @inheritDoc
44
     * @throws StoreException
45
     */
46
    public function fetchMessage(array $queues = []): ?Message
47
    {
48
        $nowTime = time();
49
        $endAt = microtime(true) + 0.2; // add 200ms
50
51
        $select = $this->connection->createQueryBuilder()
52
            ->select('*')
53
            ->from(DoctrineDbalTableCreator::getTableName())
54
            ->andWhere('status IN (:statuses)')
55
            ->andWhere('redelivered_at IS NULL OR redelivered_at <= :redeliveredAt')
56
            ->andWhere('exact_time <= :nowTime')
57
            ->addOrderBy('priority', 'asc')
58
            ->addOrderBy('created_at', 'asc')
59
            ->setParameter('redeliveredAt', new DateTimeImmutable('now'), Types::DATETIME_IMMUTABLE)
60
            ->setParameter('statuses', [Status::NEW, Status::REDELIVERED], Connection::PARAM_STR_ARRAY)
61
            ->setParameter('nowTime', $nowTime, Types::INTEGER)
62
            ->setMaxResults(1);
63
64
        if (count($queues)) {
65
            $select
66
                ->where('queue IN (:queues)')
67
                ->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY);
68
        }
69
70
        while (microtime(true) < $endAt) {
71
            try {
72
                $deliveredMessage = $select->execute()->fetchAssociative();
73
74
                if (empty($deliveredMessage)) {
75
                    continue;
76
                }
77
78
                return MessageHydrator::createMessage($deliveredMessage);
79
            } catch (Throwable $e) {
80
                throw new StoreException(sprintf('Error reading queue in consumer: "%s".', $e));
81
            }
82
        }
83
84
        return null;
85
    }
86
87
    /**
88
     * @inheritDoc
89
     * @throws StoreException
90
     */
91 4
    public function send(Message $message): void
92
    {
93
        $dataMessage = [
94 4
            'id' => Uuid::uuid4()->toString(),
95 4
            'status' => $message->getStatus(),
96 4
            'created_at' => $message->getCreatedAt(),
97 4
            'redelivered_at' => $message->getRedeliveredAt(),
98 4
            'attempts' => $message->getAttempts(),
99 4
            'queue' => $message->getQueue(),
100 4
            'event' => $message->getEvent(),
101 4
            'is_job' => $message->isJob(),
102 4
            'body' => $message->getBody(),
103 4
            'priority' => $message->getPriority(),
104 4
            'error' => $message->getError(),
105 4
            'exact_time' => $message->getExactTime(),
106
        ];
107
        try {
108 4
            $rowsAffected = $this->connection->insert(DoctrineDbalTableCreator::getTableName(), $dataMessage, [
109 4
                'id' => Types::GUID,
110 4
                'status' => Types::STRING,
111 4
                'created_at' => Types::DATETIME_IMMUTABLE,
112 4
                'redelivered_at' => Types::DATETIME_IMMUTABLE,
113 4
                'attempts' => Types::SMALLINT,
114 4
                'queue' => Types::STRING,
115 4
                'event' => Types::STRING,
116 4
                'is_job' => Types::BOOLEAN,
117 4
                'body' => Types::TEXT,
118 4
                'priority' => Types::SMALLINT,
119 4
                'error' => Types::TEXT,
120 4
                'exact_time' => Types::BIGINT,
121
            ]);
122 4
            if ($rowsAffected !== 1) {
123 4
                throw new StoreException('The message was not enqueued. Dbal did not confirm that the record is inserted.');
124
            }
125 1
        } catch (Throwable $e) {
126 1
            throw new StoreException('The transport fails to send the message due to some internal error.', 0, $e);
127
        }
128 3
    }
129
130
    /**
131
     * @inheritDoc
132
     * @throws StoreException
133
     */
134 2
    public function changeMessageStatus(Message $message, Status $status): void
135
    {
136 2
        $this->connection->update(
137 2
            DoctrineDbalTableCreator::getTableName(),
138 2
            ['status' => (string)$status],
139 2
            ['id' => $message->getId()]
140
        );
141
142 1
        MessageHydrator::changeProperty($message, 'status', $status);
143 1
    }
144
145
    /**
146
     * @inheritDoc
147
     * @throws StoreException
148
     */
149 2
    public function deleteMessage(Message $message): void
150
    {
151 2
        $this->connection->delete(
152 2
            DoctrineDbalTableCreator::getTableName(),
153 2
            ['id' => $message->getId()],
154 1
            ['id' => Types::GUID]
155
        );
156 1
    }
157
}
158