Completed
Push — master ( 51c8d9...38822b )
by Lexey
02:42
created

QueueService::commit()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Lamoda\QueueBundle\Service;
6
7
use Exception;
8
use Lamoda\QueueBundle\ConstantMessage;
9
use Lamoda\QueueBundle\Entity\QueueEntityInterface;
10
use Lamoda\QueueBundle\Entity\QueueEntityMappedSuperclass;
11
use Lamoda\QueueBundle\Entity\QueueRepository;
12
use Lamoda\QueueBundle\Event\QueueAttemptsReachedEvent;
13
use Lamoda\QueueBundle\Exception\AttemptsReachedException;
14
use Lamoda\QueueBundle\Exception\UnexpectedValueException;
15
use Lamoda\QueueBundle\Factory\EntityFactoryInterface;
16
use Lamoda\QueueBundle\QueueInterface;
17
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
18
19
/**
 * Facade over the queue repository: looks up queue entities, moves an entity
 * into processing (enforcing the configured attempt limit), creates new queue
 * entities through the entity factory and exposes the repository's
 * persistence / transaction primitives.
 */
class QueueService
{
    /** @var QueueRepository Storage backend every method of this service delegates to. */
    protected $repository;

    /** @var EntityFactoryInterface Builds queue entities in {@see QueueService::createQueue()}. */
    protected $entityFactory;

    /** @var EventDispatcherInterface Receives the QueueAttemptsReachedEvent raised by getToProcess(). */
    protected $eventDispatcher;

    /** @var int Attempt limit passed to the repository and to the entity's attempts check. */
    protected $maxAttempts;

    /** @var array Statuses an entity must be in for getToProcess() to accept it. */
    protected $queueSuitableStatuses = [
        QueueEntityMappedSuperclass::STATUS_NEW,
        QueueEntityMappedSuperclass::STATUS_IN_PROGRESS,
    ];

    /**
     * @param QueueRepository          $repository
     * @param EntityFactoryInterface   $entityFactory
     * @param EventDispatcherInterface $eventDispatcher
     * @param int                      $maxAttempts
     */
    public function __construct(
        QueueRepository $repository,
        EntityFactoryInterface $entityFactory,
        EventDispatcherInterface $eventDispatcher,
        int $maxAttempts
    ) {
        $this->repository = $repository;
        $this->entityFactory = $entityFactory;
        $this->eventDispatcher = $eventDispatcher;
        $this->maxAttempts = $maxAttempts;
    }

    /**
     * Returns the repository's "to restore" selection, passing the configured
     * attempt limit along with the paging arguments.
     *
     * @param int      $limit
     * @param int|null $offset
     *
     * @throws \Doctrine\ORM\TransactionRequiredException
     *
     * @return array|QueueEntityInterface[]
     */
    public function getToRestore(int $limit, ?int $offset = null): array
    {
        return $this->repository->getToRestore($this->maxAttempts, $limit, $offset);
    }

    /**
     * Loads the queue entity with the given id and marks it for processing.
     *
     * The entity is switched to "in progress" and returned, unless its attempt
     * limit has been reached: in that case it is flagged as attempts-reached,
     * saved, a QueueAttemptsReachedEvent is dispatched and an
     * AttemptsReachedException is thrown. The entity is saved in both cases.
     *
     * @param int $id
     *
     * @throws UnexpectedValueException if no queue entity is found for $id, or
     *                                  its status is not one of $queueSuitableStatuses
     * @throws AttemptsReachedException if the entity has reached the attempt limit
     * @throws Exception                as originally documented; presumably propagated
     *                                  from the repository — TODO confirm
     *
     * @return QueueEntityInterface
     */
    public function getToProcess(int $id): QueueEntityInterface
    {
        $queueEntity = $this->repository->find($id);

        if (!($queueEntity instanceof QueueEntityInterface)) {
            throw new UnexpectedValueException(sprintf(ConstantMessage::QUEUE_ENTITY_NOT_FOUND, $id));
        }

        if (!in_array($queueEntity->getStatus(), $this->queueSuitableStatuses, true)) {
            throw new UnexpectedValueException(sprintf(
                ConstantMessage::QUEUE_ENTITY_NOT_FOUND_IN_SUITABLE_STATUS,
                $queueEntity->getName(),
                $queueEntity->getJobName(),
                $queueEntity->getStatusAsString()
            ));
        }

        $attemptsReached = $queueEntity->isMaxAttemptsReached($this->maxAttempts);
        if ($attemptsReached) {
            $queueEntity->setAttemptsReached();
        } else {
            $queueEntity->setInProgress();
        }

        // Save before dispatching/throwing so the new status is handed to the
        // repository even when the call ends with an exception.
        $this->repository->save($queueEntity);

        if ($attemptsReached) {
            // NOTE(review): this is the (eventName, event) argument order; newer
            // Symfony EventDispatcher versions expect (event, eventName) — confirm
            // against the Symfony version this bundle targets.
            $this->eventDispatcher->dispatch(QueueAttemptsReachedEvent::NAME, new QueueAttemptsReachedEvent($queueEntity));

            throw new AttemptsReachedException(sprintf(ConstantMessage::QUEUE_ATTEMPTS_REACHED, $queueEntity->getName()));
        }

        return $queueEntity;
    }

    /**
     * Returns the repository's "to republish" selection for the given paging arguments.
     *
     * @param int      $limit
     * @param int|null $offset
     *
     * @throws \Doctrine\ORM\TransactionRequiredException
     *
     * @return array|QueueEntityInterface[]
     */
    public function getToRepublish(int $limit, ?int $offset = null): array
    {
        return $this->repository->getToRepublish($limit, $offset);
    }

    /**
     * Builds a queue entity for $queueable via the entity factory and saves it.
     *
     * @param QueueInterface $queueable
     *
     * @throws UnexpectedValueException
     *
     * @return QueueEntityInterface the saved entity
     */
    public function createQueue(QueueInterface $queueable): QueueEntityInterface
    {
        $queue = $this->entityFactory->createQueue($queueable);

        return $this->save($queue);
    }

    /**
     * Forwards a flush request to the repository.
     *
     * @param QueueEntityInterface|null $entity passed through unchanged; null when omitted
     */
    public function flush(?QueueEntityInterface $entity = null): void
    {
        $this->repository->flush($entity);
    }

    /**
     * Saves the entity through the repository and returns the same instance.
     *
     * @param QueueEntityInterface $entity
     *
     * @return QueueEntityInterface
     */
    public function save(QueueEntityInterface $entity): QueueEntityInterface
    {
        $this->repository->save($entity);

        return $entity;
    }

    /**
     * @return bool the repository's answer to isTransactionActive()
     */
    public function isTransactionActive(): bool
    {
        return $this->repository->isTransactionActive();
    }

    /**
     * Delegates to the repository's beginTransaction().
     */
    public function beginTransaction(): void
    {
        $this->repository->beginTransaction();
    }

    /**
     * Delegates to the repository's rollback().
     */
    public function rollback(): void
    {
        $this->repository->rollback();
    }

    /**
     * Delegates to the repository's commit().
     */
    public function commit(): void
    {
        $this->repository->commit();
    }
}
165