JobRepository::findIncomingDependencies()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 9
nc 2
nop 1
dl 0
loc 13
rs 9.9666
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Setono\SyliusSchedulerPlugin\Doctrine\ORM;
6
7
use Doctrine\DBAL\Connection;
8
use Doctrine\DBAL\Types\Type;
9
use Doctrine\ORM\QueryBuilder;
10
use Setono\SyliusSchedulerPlugin\Model\JobInterface;
11
use Sylius\Bundle\ResourceBundle\Doctrine\ORM\EntityRepository;
12
use Sylius\Component\Resource\Repository\RepositoryInterface;
13
14
class JobRepository extends EntityRepository implements JobRepositoryInterface
15
{
16
    /**
     * Looks up a single job by its own id, restricted to the given schedule id.
     *
     * {@inheritdoc}
     */
    public function findOneByIdAndScheduleId(string $id, string $scheduleId): ?JobInterface
    {
        $queryBuilder = $this->createQueryBuilder('j');
        $queryBuilder
            ->leftJoin('j.schedule', 'schedule')
            ->andWhere('j.id = :id')
            ->andWhere('schedule.id = :scheduleId')
            ->setParameter('id', $id)
            ->setParameter('scheduleId', $scheduleId)
            ->setMaxResults(1)
        ;

        return $queryBuilder->getQuery()->getOneOrNullResult();
    }
31
32
    /**
     * Builds a query for the jobs of one schedule that have no original job
     * (i.e. `originalJob IS NULL`).
     *
     * {@inheritdoc}
     */
    public function createQueryBuilderByScheduleId(string $scheduleId): QueryBuilder
    {
        $queryBuilder = $this->createQueryBuilder('j');
        $queryBuilder->innerJoin('j.schedule', 'schedule');
        $queryBuilder->andWhere('j.originalJob IS NULL');
        $queryBuilder->andWhere('schedule.id = :scheduleId');
        $queryBuilder->setParameter('scheduleId', $scheduleId);

        return $queryBuilder;
    }
44
45
    /**
     * Builds a query for the jobs whose original job is the given one.
     *
     * The inner join on the schedule is kept: it restricts the result to
     * jobs that have a schedule.
     *
     * {@inheritdoc}
     */
    public function createQueryBuilderByOriginalJobId(string $originalJobId): QueryBuilder
    {
        $queryBuilder = $this->createQueryBuilder('j');
        $queryBuilder->innerJoin('j.schedule', 'schedule');
        $queryBuilder->andWhere('j.originalJob = :originalJobId');
        $queryBuilder->setParameter('originalJobId', $originalJobId);

        return $queryBuilder;
    }
56
57
    /**
     * Finds one job with exactly the given command and argument list, or
     * null when there is none.
     *
     * {@inheritdoc}
     */
    public function findOneByCommand(string $command, array $args = []): ?JobInterface
    {
        $queryBuilder = $this->createQueryBuilder('j');
        $queryBuilder
            ->andWhere('j.command = :command')
            ->andWhere('j.args = :args')
            ->setParameter('command', $command)
            // The arguments are bound with the JSON array type so they are
            // compared against the stored column in serialized form.
            ->setParameter('args', $args, Type::JSON_ARRAY)
            ->setMaxResults(1)
        ;

        return $queryBuilder->getQuery()->getOneOrNullResult();
    }
71
72
    /**
     * Fetches the job with the lowest id that has exactly the given command
     * and argument list.
     *
     * NOTE(review): the query is executed with getSingleResult(), which in
     * Doctrine ORM throws a NoResultException when nothing matches, so null
     * is not returned on a miss despite the nullable return type. Confirm
     * which contract the interface intends before changing this.
     *
     * {@inheritdoc}
     */
    public function findFirstOneByCommand(string $command, array $args = []): ?JobInterface
    {
        $queryBuilder = $this->createQueryBuilder('j');
        $queryBuilder
            ->andWhere('j.command = :command')
            ->andWhere('j.args = :args')
            ->orderBy('j.id', RepositoryInterface::ORDER_ASCENDING)
            ->setParameter('command', $command)
            ->setParameter('args', $args, 'json_array')
            ->setMaxResults(1)
        ;

        return $queryBuilder->getQuery()->getSingleResult();
    }
87
88
    /**
     * Finds the next pending job that no worker has claimed yet.
     *
     * A job qualifies when its worker name is null, its `executeAfter` lies
     * in the past and its state is pending. Candidates are ordered by
     * ascending priority, then ascending id, and only the first is returned.
     *
     * {@inheritdoc}
     *
     * @param array $excludedIds      job ids to skip (ignored when empty)
     * @param array $excludedQueues   queue names to skip (ignored when empty)
     * @param array $restrictedQueues when non-empty, only these queues are considered
     */
    public function findOnePending(array $excludedIds = [], array $excludedQueues = [], array $restrictedQueues = []): ?JobInterface
    {
        $qb = $this->createQueryBuilder('j')
            ->orderBy('j.priority', 'ASC')
            ->addOrderBy('j.id', 'ASC')
        ;
        $expr = $qb->expr();

        $conditions = [
            $expr->isNull('j.workerName'),
            $expr->lt('j.executeAfter', ':now'),
            $expr->eq('j.state', ':state'),
        ];
        $qb
            ->setParameter('now', new \DateTime(), 'datetime')
            ->setParameter('state', JobInterface::STATE_PENDING)
        ;

        // List filters are only added for non-empty lists: an empty list
        // bound to IN / NOT IN would not express "no restriction".
        if ([] !== $excludedIds) {
            $conditions[] = $expr->notIn('j.id', ':excludedIds');
            $qb->setParameter('excludedIds', $excludedIds, Connection::PARAM_INT_ARRAY);
        }

        if ([] !== $excludedQueues) {
            $conditions[] = $expr->notIn('j.queue', ':excludedQueues');
            $qb->setParameter('excludedQueues', $excludedQueues, Connection::PARAM_STR_ARRAY);
        }

        if ([] !== $restrictedQueues) {
            $conditions[] = $expr->in('j.queue', ':restrictedQueues');
            $qb->setParameter('restrictedQueues', $restrictedQueues, Connection::PARAM_STR_ARRAY);
        }

        $qb->where($expr->andX(...$conditions));

        return $qb->getQuery()->setMaxResults(1)->getOneOrNullResult();
    }
127
128
    /**
     * Finds jobs in the finished state that were closed before the given
     * retention time and have no original job.
     *
     * {@inheritdoc}
     *
     * @param \DateTime $retentionTime only jobs with `closedAt` earlier than this are returned
     * @param array     $excludedIds   job ids to skip; an empty list excludes nothing
     * @param int       $limit         maximum number of jobs to return
     */
    public function findSucceededBefore(\DateTime $retentionTime, array $excludedIds = [], $limit = 100): array
    {
        $qb = $this->createQueryBuilder('j')
            ->andWhere('j.closedAt < :maxRetentionTime')
            ->setParameter('maxRetentionTime', $retentionTime)
            ->andWhere('j.originalJob IS NULL')
            ->andWhere('j.state = :succeeded')
            ->setParameter('succeeded', JobInterface::STATE_FINISHED)
            ->setMaxResults($limit)
        ;

        // An empty list parameter is expanded by Doctrine DBAL to NULL, and
        // `NOT IN (NULL)` matches no row at all, so the exclusion is only
        // applied when there is something to exclude.
        if ([] !== $excludedIds) {
            $qb
                ->andWhere('j.id NOT IN (:excludedIds)')
                ->setParameter('excludedIds', $excludedIds)
            ;
        }

        return $qb->getQuery()->getResult();
    }
145
146
    /**
     * Finds jobs, regardless of state, that were closed before the given
     * retention time and have no original job.
     *
     * {@inheritdoc}
     *
     * @param \DateTime $retentionTime only jobs with `closedAt` earlier than this are returned
     * @param array     $excludedIds   job ids to skip; an empty list excludes nothing
     * @param int       $limit         maximum number of jobs to return
     */
    public function findFinishedBefore(\DateTime $retentionTime, array $excludedIds = [], $limit = 100): array
    {
        $qb = $this->createQueryBuilder('j')
            ->andWhere('j.closedAt < :maxRetentionTime')
            ->setParameter('maxRetentionTime', $retentionTime)
            ->andWhere('j.originalJob IS NULL')
            ->setMaxResults($limit)
        ;

        // An empty list parameter is expanded by Doctrine DBAL to NULL, and
        // `NOT IN (NULL)` matches no row at all, so the exclusion is only
        // applied when there is something to exclude.
        if ([] !== $excludedIds) {
            $qb
                ->andWhere('j.id NOT IN (:excludedIds)')
                ->setParameter('excludedIds', $excludedIds)
            ;
        }

        return $qb->getQuery()->getResult();
    }
161
162
    /**
     * Finds canceled jobs that were created before the given retention time
     * and have no original job.
     *
     * {@inheritdoc}
     *
     * @param \DateTime $retentionTime only jobs with `createdAt` earlier than this are returned
     * @param array     $excludedIds   job ids to skip; an empty list excludes nothing
     * @param int       $limit         maximum number of jobs to return
     */
    public function findCancelledBefore(\DateTime $retentionTime, array $excludedIds = [], $limit = 100): array
    {
        $qb = $this->createQueryBuilder('j')
            ->andWhere('j.state = :canceled')
            ->setParameter('canceled', JobInterface::STATE_CANCELED)
            ->andWhere('j.createdAt < :maxRetentionTime')
            ->setParameter('maxRetentionTime', $retentionTime)
            ->andWhere('j.originalJob IS NULL')
            ->setMaxResults($limit)
        ;

        // An empty list parameter is expanded by Doctrine DBAL to NULL, and
        // `NOT IN (NULL)` matches no row at all, so the exclusion is only
        // applied when there is something to exclude.
        if ([] !== $excludedIds) {
            $qb
                ->andWhere('j.id NOT IN (:excludedIds)')
                ->setParameter('excludedIds', $excludedIds)
            ;
        }

        return $qb->getQuery()->getResult();
    }
179
180
    /**
     * Claims the next pending job that is startable for the given worker.
     *
     * Pending jobs are fetched one at a time. A job that is not startable,
     * or whose lock could not be acquired, has its id appended to
     * $excludedIds (passed by reference, so the caller sees the additions)
     * and the search continues with the next candidate.
     *
     * {@inheritdoc}
     *
     * @return JobInterface|null the locked job, or null once no pending job is left
     */
    public function findOneStartableAndAquireLock(string $workerName, array &$excludedIds = [], $excludedQueues = [], $restrictedQueues = []): ?JobInterface
    {
        while (null !== $job = $this->findOnePending($excludedIds, $excludedQueues, $restrictedQueues)) {
            if ($job->isStartable() && $this->acquireLock($workerName, $job)) {
                return $job;
            }

            $excludedIds[] = $job->getId();

            // We do not want to have non-startable jobs floating around in
            // cache as they might be changed by another process. So, better
            // re-fetch them when they are not excluded anymore.
            $this->_em->detach($job);
        }

        return null;
    }
205
206
    /**
     * Tries to claim the job for a worker with a conditional UPDATE.
     *
     * The statement only touches the row while `worker_name` is still NULL,
     * so the affected-row count tells whether this call obtained the job.
     * On success the worker name is also set on the in-memory job object.
     *
     * @param string $workerName
     * @param JobInterface $job
     *
     * @return bool true when the row was updated, false otherwise
     */
    private function acquireLock(string $workerName, JobInterface $job): bool
    {
        $tableName = $this->_em->getClassMetadata($this->getClassName())->getTableName();
        $sql = sprintf(
            'UPDATE %s SET worker_name = :worker WHERE id = :id AND worker_name IS NULL',
            $tableName
        );

        $affectedRows = $this->_em->getConnection()->executeUpdate($sql, [
            'worker' => $workerName,
            'id' => $job->getId(),
        ]);

        if ($affectedRows <= 0) {
            return false;
        }

        $job->setWorkerName($workerName);

        return true;
    }
233
234
    /**
     * Loads the jobs listed as source jobs for the given job in the
     * dependency table.
     *
     * The dependencies of each returned job are selected in the same query
     * (fetch join), so reading them afterwards does not trigger one extra
     * query per job.
     *
     * {@inheritdoc}
     *
     * @return array the matching jobs, or an empty array when there are none
     */
    public function findIncomingDependencies(JobInterface $job): array
    {
        $jobIds = $this->getJobIdsOfIncomingDependencies($job);
        if ([] === $jobIds) {
            // Avoid running an IN () query with an empty id list.
            return [];
        }

        return $this->createQueryBuilder('j')
            ->leftJoin('j.dependencies', 'd')
            ->addSelect('d')
            ->andWhere('j.id IN (:ids)')
            ->setParameter('ids', $jobIds)
            ->getQuery()
            ->getResult()
        ;
    }
252
253
    /**
     * Reads, straight from the dependency join table, the `source_job_id`
     * values of all rows whose `destination_job_id` is the given job.
     *
     * @param JobInterface $job
     *
     * @return array flat list of source job ids
     */
    private function getJobIdsOfIncomingDependencies(JobInterface $job): array
    {
        $connection = $this->_em->getConnection();
        $statement = $connection->executeQuery(
            'SELECT source_job_id FROM setono_sylius_scheduler_job_dependencies WHERE destination_job_id = :id',
            ['id' => $job->getId()]
        );

        return $statement->fetchAll(\PDO::FETCH_COLUMN);
    }
264
265
    /**
     * Finds one running job that has a worker assigned but whose `checkedAt`
     * is older than the allowed age.
     *
     * {@inheritdoc}
     *
     * @param array          $excludedIds job ids to skip; an empty list excludes nothing
     * @param \DateTime|null $maxAge      cut-off for `checkedAt`; defaults to five minutes ago
     */
    public function findOneStale(array $excludedIds = [], ?\DateTime $maxAge = null): ?JobInterface
    {
        $qb = $this->createQueryBuilder('j')
            ->andWhere('j.state = :running')
            ->setParameter('running', JobInterface::STATE_RUNNING)
            ->andWhere('j.workerName IS NOT NULL')
            ->andWhere('j.checkedAt < :maxAge')
            ->setParameter('maxAge', $maxAge ?? new \DateTime('-5 minutes'), 'datetime')
            ->setMaxResults(1)
        ;

        // An empty list parameter is expanded by Doctrine DBAL to NULL, and
        // `NOT IN (NULL)` matches no row at all, so the exclusion is only
        // applied when there is something to exclude.
        if ([] !== $excludedIds) {
            $qb
                ->andWhere('j.id NOT IN (:excludedIds)')
                ->setParameter('excludedIds', $excludedIds)
            ;
        }

        return $qb->getQuery()->getOneOrNullResult();
    }
282
283
    /**
     * Returns all running jobs that are either assigned to the given worker
     * or have no worker name at all.
     *
     * {@inheritdoc}
     */
    public function findStale(string $workerName): array
    {
        $queryBuilder = $this->createQueryBuilder('j');
        $queryBuilder
            ->andWhere('j.state = :running')
            ->andWhere('(j.workerName = :worker OR j.workerName IS NULL)')
            ->setParameter('running', JobInterface::STATE_RUNNING)
            ->setParameter('worker', $workerName)
        ;

        return $queryBuilder->getQuery()->getResult();
    }
296
297
    /**
     * Returns the most recently closed jobs that ended in the terminated or
     * failed state and have no original job, newest first.
     *
     * {@inheritdoc}
     *
     * @param int $limit maximum number of jobs to return
     */
    public function findLastJobsWithError($limit = 10): array
    {
        $errorStates = [
            JobInterface::STATE_TERMINATED,
            JobInterface::STATE_FAILED,
        ];

        $queryBuilder = $this->createQueryBuilder('j');
        $queryBuilder
            ->andWhere('j.state IN (:errorStates)')
            ->setParameter('errorStates', $errorStates)
            ->andWhere('j.originalJob IS NULL')
            ->orderBy('j.closedAt', RepositoryInterface::ORDER_DESCENDING)
            ->setMaxResults($limit)
        ;

        return $queryBuilder->getQuery()->getResult();
    }
314
}
315