QueueRepository   F
last analyzed

Complexity

Total Complexity 60

Size/Duplication

Total Lines 703
Duplicated Lines 0 %

Test Coverage

Coverage 94.44%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 60
eloc 361
c 2
b 0
f 0
dl 0
loc 703
ccs 340
cts 360
cp 0.9444
rs 3.6

32 Methods

Rating   Name   Duplication   Size   Complexity  
A getLastProcessedEntriesTimestamps() 0 16 2
A getSetIdWithUnprocessedEntries() 0 19 2
A getTotalQueueEntriesByConfiguration() 0 22 3
A countAllPendingItems() 0 14 1
A getPerformanceData() 0 21 2
A countAllUnassignedPendingItems() 0 14 1
A fetchRecordsToBeCrawled() 0 23 1
A findByQueueId() 0 12 2
A countPendingItemsGroupedByConfigurationKey() 0 15 1
A countAllAssignedPendingItems() 0 14 1
A cleanupQueue() 0 18 3
A cleanUpOldQueueEntries() 0 20 2
A isPageInQueueTimed() 0 3 1
A unsetProcessScheduledAndProcessIdForQueueEntries() 0 12 1
A getLastProcessedEntries() 0 16 2
A getAvailableSets() 0 17 2
B isPageInQueue() 0 40 6
A updateProcessIdAndSchedulerForQueueIds() 0 11 1
A countNonExecutedItemsByProcess() 0 13 1
A unsetQueueProcessId() 0 10 1
A getUnprocessedItems() 0 11 1
A findOldestEntryForProcess() 0 3 1
A countUnprocessedItems() 0 7 1
A findYoungestEntryForProcess() 0 3 1
A countExecutedItemsByProcess() 0 13 1
A __construct() 0 6 1
A getQueueEntriesForPageId() 0 33 4
A countAllByProcessId() 0 16 1
A getDuplicateQueueItemsIfExists() 0 47 5
A noUnprocessedQueueEntriesForPageWithConfigurationHashExist() 0 21 2
A flushQueue() 0 20 4
A getFirstOrLastObjectByProcess() 0 15 2

How to fix   Complexity   

Complex Class

Complex classes like QueueRepository often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use QueueRepository, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
declare(strict_types=1);
4
5
namespace AOE\Crawler\Domain\Repository;
6
7
/***************************************************************
8
 *  Copyright notice
9
 *
10
 *  (c) 2020 AOE GmbH <[email protected]>
11
 *
12
 *  All rights reserved
13
 *
14
 *  This script is part of the TYPO3 project. The TYPO3 project is
15
 *  free software; you can redistribute it and/or modify
16
 *  it under the terms of the GNU General Public License as published by
17
 *  the Free Software Foundation; either version 3 of the License, or
18
 *  (at your option) any later version.
19
 *
20
 *  The GNU General Public License can be found at
21
 *  http://www.gnu.org/copyleft/gpl.html.
22
 *
23
 *  This script is distributed in the hope that it will be useful,
24
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
25
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26
 *  GNU General Public License for more details.
27
 *
28
 *  This copyright notice MUST APPEAR in all copies of the script!
29
 ***************************************************************/
30
31
use AOE\Crawler\Configuration\ExtensionConfigurationProvider;
32
use AOE\Crawler\Domain\Model\Process;
33
use AOE\Crawler\Value\QueueFilter;
34
use PDO;
35
use Psr\Log\LoggerAwareInterface;
36
use Psr\Log\LoggerAwareTrait;
37
use TYPO3\CMS\Core\Database\Connection;
38
use TYPO3\CMS\Core\Database\ConnectionPool;
39
use TYPO3\CMS\Core\Utility\GeneralUtility;
40
use TYPO3\CMS\Extbase\Object\ObjectManager;
41
use TYPO3\CMS\Extbase\Persistence\Repository;
42
43
/**
44
 * @internal since v9.2.5
45
 */
46
class QueueRepository extends Repository implements LoggerAwareInterface
47
{
48
    use LoggerAwareTrait;
49
50
    public const TABLE_NAME = 'tx_crawler_queue';
51
52
    /**
53
     * @var array
54
     */
55
    protected $extensionSettings;
56
57 114
    /**
     * Loads the crawler extension configuration and initialises the Extbase repository
     * with an ObjectManager instance.
     */
    public function __construct()
    {
        $extbaseObjectManager = GeneralUtility::makeInstance(ObjectManager::class);

        $configurationProvider = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class);
        $this->extensionSettings = $configurationProvider->getExtensionConfiguration();

        parent::__construct($extbaseObjectManager);
    }
64
65
    // TODO: Should be a property on the QueueObject
    /**
     * Resets process_id to an empty string on every queue row that currently
     * carries the given process id.
     */
    public function unsetQueueProcessId(string $processId): void
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $update = $connectionPool->getQueryBuilderForTable(self::TABLE_NAME);

        $belongsToProcess = $update->expr()->eq('process_id', $update->createNamedParameter($processId));

        $update->update(self::TABLE_NAME);
        $update->where($belongsToProcess);
        $update->set('process_id', '');
        $update->execute();
    }
77
78
    /**
     * Returns the executed queue row of the given process that sorts first
     * when ordering by exec_time ascending.
     *
     * NOTE(review): ascending order yields the row with the smallest exec_time, which
     * reads like the earliest-executed entry rather than the "youngest" one — confirm
     * the intended naming against the callers before relying on it.
     *
     * @return array the matching row, or an empty array if nothing matched
     */
    public function findYoungestEntryForProcess(Process $process): array
    {
        $entry = $this->getFirstOrLastObjectByProcess($process, 'exec_time', 'ASC');

        return $entry;
    }
85
86
    /**
     * Returns the executed queue row of the given process that sorts first
     * when ordering by exec_time descending.
     *
     * NOTE(review): descending order yields the row with the largest exec_time, which
     * reads like the most recently executed entry rather than the "oldest" one — confirm
     * the intended naming against the callers before relying on it.
     *
     * @return array the matching row, or an empty array if nothing matched
     */
    public function findOldestEntryForProcess(Process $process): array
    {
        $sortingDirection = 'DESC';
        $entry = $this->getFirstOrLastObjectByProcess($process, 'exec_time', $sortingDirection);

        return $entry;
    }
93
94
    /**
     * Counts all executed items of a process.
     *
     * A row is counted when its process_id_completed equals the process id and
     * its exec_time is greater than 0.
     *
     * @param Process $process
     */
    public function countExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);

        $count = $queryBuilder
            ->count('*')
            ->from(self::TABLE_NAME)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        // Depending on the driver the count arrives as a numeric string (or false);
        // under strict_types that would violate the declared int return type.
        return (int) $count;
    }
113
114
    /**
     * Counts items of a process which have not yet been processed/executed.
     *
     * A row is counted when its process_id equals the process id and its
     * exec_time is 0.
     *
     * @param Process $process
     */
    public function countNonExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);

        $count = $queryBuilder
            ->count('*')
            ->from(self::TABLE_NAME)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        // Depending on the driver the count arrives as a numeric string (or false);
        // under strict_types that would violate the declared int return type.
        return (int) $count;
    }
133
134
    /**
     * Fetches every queue row whose exec_time is still 0, i.e. which has not
     * been processed yet.
     *
     * @return array all matching rows with all columns
     */
    public function getUnprocessedItems(): array
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $query = $connectionPool->getQueryBuilderForTable(self::TABLE_NAME);

        $notExecutedYet = $query->expr()->eq('exec_time', 0);

        $query
            ->select('*')
            ->from(self::TABLE_NAME)
            ->where($notExecutedYet);

        return $query->execute()->fetchAll();
    }
149
150
    /**
     * Returns the number of queue items which have not been processed yet.
     *
     * Triggers an E_USER_DEPRECATED error on every call.
     *
     * @deprecated Using QueueRepository->countUnprocessedItems() is deprecated since 9.1.5 and will be removed in v11.x, please use count(QueueRepository->getUnprocessedItems()) instead
     */
    public function countUnprocessedItems(): int
    {
        $deprecationMessage = 'Using QueueRepository->countUnprocessedItems() is deprecated since 9.1.5 and will be removed in v11.x, please use count(QueueRepository->getUnprocessedItems()) instead';
        trigger_error($deprecationMessage, E_USER_DEPRECATED);

        $unprocessedItems = $this->getUnprocessedItems();

        return count($unprocessedItems);
    }
162
163
    /**
     * Counts all queue entries which are scheduled for now or an earlier date,
     * have not been executed (exec_time = 0) and are not yet scheduled for a
     * process (process_scheduled = 0).
     */
    public function countAllPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);

        $count = $queryBuilder
            ->count('*')
            ->from(self::TABLE_NAME)
            ->where(
                $queryBuilder->expr()->eq('process_scheduled', 0),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        // Depending on the driver the count arrives as a numeric string (or false);
        // under strict_types that would violate the declared int return type.
        return (int) $count;
    }
182
183
    /**
     * Counts all queue entries which are scheduled for now or an earlier date,
     * have not been executed (exec_time = 0) and are assigned to a process
     * (process_id is not the empty string).
     */
    public function countAllAssignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);

        $count = $queryBuilder
            ->count('*')
            ->from(self::TABLE_NAME)
            ->where(
                // Bind the empty string as a parameter: a raw "" literal is only a string in
                // MySQL's default SQL mode and is parsed as an identifier by ANSI-quoting databases.
                $queryBuilder->expr()->neq('process_id', $queryBuilder->createNamedParameter('', \PDO::PARAM_STR)),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        // Depending on the driver the count arrives as a numeric string (or false);
        // under strict_types that would violate the declared int return type.
        return (int) $count;
    }
202
203
    /**
     * Counts all queue entries which are scheduled for now or an earlier date,
     * have not been executed (exec_time = 0) and are not assigned to a process
     * (process_id is the empty string).
     */
    public function countAllUnassignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);

        $count = $queryBuilder
            ->count('*')
            ->from(self::TABLE_NAME)
            ->where(
                // Bind the empty string as a parameter: a raw "" literal is only a string in
                // MySQL's default SQL mode and is parsed as an identifier by ANSI-quoting databases.
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter('', \PDO::PARAM_STR)),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        // Depending on the driver the count arrives as a numeric string (or false);
        // under strict_types that would violate the declared int return type.
        return (int) $count;
    }
222
223
    /**
     * Counts pending queue entries per configuration key.
     *
     * Only rows with exec_time = 0 and a scheduled time before now are considered.
     *
     * @return array one row per configuration with the keys "unprocessed",
     *               "assignedButUnprocessed" and "configuration"
     */
    public function countPendingItemsGroupedByConfigurationKey(): array
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $query = $connectionPool->getQueryBuilderForTable(self::TABLE_NAME);

        $notExecutedYet = $query->expr()->eq('exec_time', 0);
        $scheduledInThePast = $query->expr()->lt('scheduled', time());

        $query
            ->from(self::TABLE_NAME)
            ->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed')
            ->addSelect('configuration')
            ->where($notExecutedYet, $scheduledInThePast)
            ->groupBy('configuration');

        return $query->execute()->fetchAll();
    }
242
243
    /**
     * Collects the set ids that still have unprocessed entries (exec_time = 0)
     * with a scheduled time before now.
     *
     * @return array list of set ids, each cast to int
     */
    public function getSetIdWithUnprocessedEntries(): array
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $query = $connectionPool->getQueryBuilderForTable(self::TABLE_NAME);

        $query
            ->select('set_id')
            ->from(self::TABLE_NAME)
            ->where(
                $query->expr()->lt('scheduled', time()),
                $query->expr()->eq('exec_time', 0)
            )
            ->addGroupBy('set_id');

        $collectedSetIds = [];
        foreach ($query->execute()->fetchAll() as $record) {
            $collectedSetIds[] = (int) $record['set_id'];
        }

        return $collectedSetIds;
    }
268
269
    /**
     * Get total queue entries by configuration.
     *
     * Counts, per configuration, the rows that belong to one of the given sets
     * and whose scheduled time lies before now.
     *
     * @param array $setIds set ids to restrict the count to; an empty list yields an empty result
     *
     * @return array totals by configuration (keys)
     */
    public function getTotalQueueEntriesByConfiguration(array $setIds): array
    {
        // Without set ids there is nothing to count (and "IN ()" would be invalid SQL).
        if (empty($setIds)) {
            return [];
        }

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
        $statement = $queryBuilder
            ->from(self::TABLE_NAME)
            ->selectLiteral('count(*) as c')
            ->addSelect('configuration')
            ->where(
                // Bind the ids as an integer list instead of concatenating them into the SQL string.
                $queryBuilder->expr()->in(
                    'set_id',
                    $queryBuilder->createNamedParameter($setIds, Connection::PARAM_INT_ARRAY)
                ),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute();

        $totals = [];
        while ($row = $statement->fetch()) {
            $totals[$row['configuration']] = $row['c'];
        }

        return $totals;
    }
297
298
    /**
     * Returns the exec_time values of the queue rows, highest exec_time first.
     *
     * @param int $limit maximum number of rows to read
     *
     * @return array list of exec_time values
     */
    public function getLastProcessedEntriesTimestamps($limit = 100): array
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $query = $connectionPool->getQueryBuilderForTable(self::TABLE_NAME);

        $query
            ->select('exec_time')
            ->from(self::TABLE_NAME)
            ->addOrderBy('exec_time', 'desc')
            ->setMaxResults($limit);

        $timestamps = [];
        foreach ($query->execute()->fetchAll() as $record) {
            $timestamps[] = $record['exec_time'];
        }

        return $timestamps;
    }
320
321
    /**
     * Returns complete queue rows, highest exec_time first.
     *
     * @param int $limit maximum number of rows to read
     *
     * @return array list of rows with all columns
     */
    public function getLastProcessedEntries($limit = 100): array
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $query = $connectionPool->getQueryBuilderForTable(self::TABLE_NAME);

        $query
            ->from(self::TABLE_NAME)
            ->select('*')
            ->orderBy('exec_time', 'desc')
            ->setMaxResults($limit);

        $result = $query->execute();

        $entries = [];
        for ($record = $result->fetch(); $record !== false; $record = $result->fetch()) {
            $entries[] = $record;
        }

        return $entries;
    }
343
344
    /**
     * Aggregates performance statistics per completed process.
     *
     * For every process_id_completed the query yields the minimum and maximum
     * exec_time and the number of rows, restricted to rows whose exec_time is
     * non-zero and lies within [$start, $end].
     *
     * @param int $start timestamp
     * @param int $end timestamp
     *
     * @return array performance data rows, keyed by process_id_completed
     */
    public function getPerformanceData($start, $end): array
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $query = $connectionPool->getQueryBuilderForTable(self::TABLE_NAME);

        $query
            ->from(self::TABLE_NAME)
            ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
            ->addSelect('process_id_completed')
            ->where(
                $query->expr()->neq('exec_time', 0),
                $query->expr()->gte('exec_time', $query->createNamedParameter($start, \PDO::PARAM_INT)),
                $query->expr()->lte('exec_time', $query->createNamedParameter($end, \PDO::PARAM_INT))
            )
            ->groupBy('process_id_completed');

        $performanceData = [];
        foreach ($query->execute()->fetchAll() as $record) {
            $performanceData[$record['process_id_completed']] = $record;
        }

        return $performanceData;
    }
374
375
    /**
     * Determines if a page is queued.
     *
     * @param int $uid page id to look for
     * @param bool $unprocessed_only restrict the check to rows with exec_time = 0
     * @param bool $timed_only restrict the check to rows with scheduled != 0
     * @param int $timestamp when non-zero, restrict the check to rows scheduled exactly at this time
     */
    public function isPageInQueue(int $uid, bool $unprocessed_only = true, bool $timed_only = false, int $timestamp = 0): bool
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $query = $connectionPool->getQueryBuilderForTable(self::TABLE_NAME);

        $query
            ->from(self::TABLE_NAME)
            ->count('*')
            ->where(
                $query->expr()->eq('page_id', $query->createNamedParameter($uid, \PDO::PARAM_INT))
            );

        if ($unprocessed_only) {
            $query->andWhere($query->expr()->eq('exec_time', 0));
        }

        if ($timed_only) {
            $query->andWhere($query->expr()->neq('scheduled', 0));
        }

        if ($timestamp !== 0) {
            $query->andWhere(
                $query->expr()->eq('scheduled', $query->createNamedParameter($timestamp, \PDO::PARAM_INT))
            );
        }

        // TODO: Currently it's not working if page doesn't exists. See tests
        $matches = $query->execute()->fetchColumn(0);

        return $matches !== false && $matches > 0;
    }
419
420
    /**
     * Method to check if a page is in the queue which is timed for a
     * date when it should be crawled.
     *
     * Delegates to isPageInQueue() with the timed-only restriction switched on,
     * so only entries with a non-zero scheduled time are considered.
     *
     * @param int $uid page id to look for
     * @param bool $show_unprocessed restrict the check to entries that have not been executed yet
     */
    public function isPageInQueueTimed(int $uid, bool $show_unprocessed = true): bool
    {
        // The third argument ($timed_only) was previously omitted, which made this
        // method indistinguishable from a plain isPageInQueue() call.
        return $this->isPageInQueue($uid, $show_unprocessed, true);
    }
428
429 1
    /**
     * Lists the distinct (set_id, scheduled) combinations in the queue together
     * with the number of rows in each, ordered by scheduled descending.
     *
     * @return array rows with the keys "count_value", "set_id" and "scheduled"
     */
    public function getAvailableSets(): array
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $query = $connectionPool->getQueryBuilderForTable(self::TABLE_NAME);

        $query
            ->selectLiteral('count(*) as count_value')
            ->addSelect('set_id', 'scheduled')
            ->from(self::TABLE_NAME)
            ->orderBy('scheduled', 'desc')
            ->groupBy('set_id', 'scheduled');

        $availableSets = [];
        foreach ($query->execute()->fetchAll() as $record) {
            $availableSets[] = $record;
        }

        return $availableSets;
    }
447
448 1
    /**
     * Looks up a single queue row by its qid.
     *
     * @return array|null the row, or null when the fetch did not yield an array
     */
    public function findByQueueId(string $queueId): ?array
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $query = $connectionPool->getQueryBuilderForTable(self::TABLE_NAME);

        $query
            ->select('*')
            ->from(self::TABLE_NAME)
            ->where(
                $query->expr()->eq('qid', $query->createNamedParameter($queueId))
            );

        $record = $query->execute()->fetch();
        if (! is_array($record)) {
            return null;
        }

        return $record;
    }
461
462 3
    /**
     * Deletes processed queue entries that are older than the configured number of days.
     *
     * Reads "purgeQueueDays" from the extension configuration; nothing is deleted
     * unless that value is greater than 0. Only rows with a non-zero exec_time
     * that lies before the cut-off are removed.
     */
    public function cleanupQueue(): void
    {
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
        $purgeDays = (int) ($extensionSettings['purgeQueueDays'] ?? 0);

        if ($purgeDays <= 0) {
            return;
        }

        $purgeDate = time() - 24 * 60 * 60 * $purgeDays;

        $queryBuilderDelete = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
        $del = $queryBuilderDelete
            ->delete(self::TABLE_NAME)
            ->where(
                $queryBuilderDelete->expr()->neq('exec_time', 0),
                $queryBuilderDelete->expr()->lt(
                    'exec_time',
                    $queryBuilderDelete->createNamedParameter($purgeDate, \PDO::PARAM_INT)
                )
            )
            ->execute();

        // The logger is injected via LoggerAwareTrait and may not have been set.
        if ($del === false && $this->logger !== null) {
            $this->logger->info(
                'Records could not be deleted.'
            );
        }
    }
484
485
    /**
     * Cleans up entries that stayed for too long in the queue.
     *
     * Removes
     * - processed entries (exec_time != 0) older than "cleanUpProcessedAge" days, and
     * - entries whose scheduled time is at least "cleanUpScheduledAge" days in the past.
     *
     * Both ages are read from the extension configuration and may be fractional.
     */
    public function cleanUpOldQueueEntries(): void
    {
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();

        // 24*60*60 Seconds in 24 hours.
        // Configuration values may arrive as strings; cast before doing arithmetic and
        // end up with integer thresholds so no float ever reaches the SQL statement.
        $processedAgeInSeconds = (int) round((float) $extensionSettings['cleanUpProcessedAge'] * 86400);
        $scheduledAgeInSeconds = (int) round((float) $extensionSettings['cleanUpScheduledAge'] * 86400);

        $now = time();
        $processedBefore = $now - $processedAgeInSeconds;
        $scheduledUntil = $now - $scheduledAgeInSeconds;

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
        $expr = $queryBuilder->expr();

        // (exec_time <> 0 AND exec_time < processedBefore) OR scheduled <= scheduledUntil
        $condition = $expr->orX(
            $expr->andX(
                $expr->neq('exec_time', 0),
                $expr->lt('exec_time', $processedBefore)
            ),
            $expr->lte('scheduled', $scheduledUntil)
        );

        $del = $queryBuilder
            ->delete(self::TABLE_NAME)
            ->where($condition)
            ->execute();

        // The logger is injected via LoggerAwareTrait and may not have been set.
        if ($del === false && $this->logger !== null) {
            $this->logger->info(
                'Records could not be deleted.'
            );
        }
    }
513
514 4
    /**
     * Fetches the next queue rows that are due for crawling.
     *
     * Selects rows with exec_time = 0, process_scheduled = 0 and a scheduled time
     * of now or earlier, left-joined with the pages table on the page id. Rows are
     * ordered by sitemap_priority descending, then by scheduled and qid.
     *
     * @param int $countInARun maximum number of rows to return
     *
     * @return array rows with the columns qid, scheduled, page_id and sitemap_priority
     */
    public function fetchRecordsToBeCrawled(int $countInARun): array
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $query = $connectionPool->getQueryBuilderForTable(self::TABLE_NAME);

        $pageJoinCondition = $query->expr()->eq(
            'p.uid',
            $query->quoteIdentifier(self::TABLE_NAME . '.page_id')
        );

        $query
            ->select('qid', 'scheduled', 'page_id', 'sitemap_priority')
            ->from(self::TABLE_NAME)
            ->leftJoin(self::TABLE_NAME, 'pages', 'p', $pageJoinCondition)
            ->where(
                $query->expr()->eq('exec_time', 0),
                $query->expr()->eq('process_scheduled', 0),
                $query->expr()->lte('scheduled', time())
            )
            ->orderBy('sitemap_priority', 'DESC')
            ->addOrderBy('scheduled')
            ->addOrderBy('qid')
            ->setMaxResults($countInARun);

        return $query->execute()->fetchAll();
    }
538
539 3
    /**
     * Assigns the given queue entries to a process.
     *
     * Sets process_scheduled to the current time and process_id to the given
     * process id on every row whose qid is contained in the list.
     *
     * @param array $quidList list of numeric queue ids (qid) to update
     * @param string $processId id of the process the entries get assigned to
     *
     * @return int|mixed result of the update statement; 0 when the list is empty
     */
    public function updateProcessIdAndSchedulerForQueueIds(array $quidList, string $processId)
    {
        // An empty list would produce an invalid "IN ()" clause; there is nothing to update anyway.
        if ($quidList === []) {
            return 0;
        }

        $queryBuilderUpdate = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);

        return $queryBuilderUpdate
            ->update(self::TABLE_NAME)
            ->where(
                // Bind the ids as an integer list instead of concatenating them into the SQL string.
                $queryBuilderUpdate->expr()->in(
                    'qid',
                    $queryBuilderUpdate->createNamedParameter($quidList, Connection::PARAM_INT_ARRAY)
                )
            )
            ->set('process_scheduled', time())
            ->set('process_id', $processId)
            ->execute();
    }
551
552 3
    /**
     * Releases unexecuted queue entries from the given processes.
     *
     * For every row with exec_time = 0 whose process_id is in the list,
     * process_scheduled is reset to 0 and process_id to an empty string.
     *
     * @param array $processIds process ids whose entries should be released
     */
    public function unsetProcessScheduledAndProcessIdForQueueEntries(array $processIds): void
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $update = $connectionPool->getQueryBuilderForTable(self::TABLE_NAME);

        $notExecutedYet = $update->expr()->eq('exec_time', 0);
        $ownedByGivenProcesses = $update->expr()->in(
            'process_id',
            $update->createNamedParameter($processIds, Connection::PARAM_STR_ARRAY)
        );

        $update->update(self::TABLE_NAME);
        $update->where($notExecutedYet, $ownedByGivenProcesses);
        $update->set('process_scheduled', 0);
        $update->set('process_id', '');
        $update->execute();
    }
565
566
    /**
     * Tells whether there are NO unprocessed queue entries (exec_time = 0) for the
     * given page id and configuration hash.
     * If there are none, an inner detail check can be skipped.
     *
     * @return bool true when the count query yields nothing truthy, false otherwise
     */
    public function noUnprocessedQueueEntriesForPageWithConfigurationHashExist(int $uid, string $configurationHash): bool
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $query = $connectionPool->getQueryBuilderForTable(self::TABLE_NAME);

        $query
            ->count('*')
            ->from(self::TABLE_NAME)
            ->where(
                $query->expr()->eq('page_id', $uid),
                $query->expr()->eq('configuration_hash', $query->createNamedParameter($configurationHash)),
                $query->expr()->eq('exec_time', 0)
            );

        $unprocessedCount = $query->execute()->fetchColumn();

        return $unprocessedCount ? false : true;
    }
593
594
    /**
     * Removes queue entries according to the given filter.
     *
     * - "all": every entry is deleted
     * - "pending": only entries with exec_time = 0 are deleted
     * - "finished" or anything else: only entries with exec_time > 0 are deleted
     */
    public function flushQueue(QueueFilter $queueFilter): void
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $deletion = $connectionPool->getQueryBuilderForTable(self::TABLE_NAME);

        // Loose comparison on purpose: the filter object is compared against plain strings.
        if ($queueFilter == 'all') {
            // No where clause needed, delete everything
        } elseif ($queueFilter == 'pending') {
            $deletion->andWhere($deletion->expr()->eq('exec_time', 0));
        } else {
            $deletion->andWhere($deletion->expr()->gt('exec_time', 0));
        }

        $deletion->delete(self::TABLE_NAME);
        $deletion->execute();
    }
618
619
    /**
     * Counts all queue rows that carry the given process id.
     *
     * Triggers an E_USER_DEPRECATED error on every call.
     *
     * @param string $processId
     *
     * @return bool|string
     * @deprecated Using QueueRepository->countAllByProcessId() is deprecated since 9.1.5 and will be removed in v11.x, please use QueueRepository->findByProcessId()->count() instead
     */
    public function countAllByProcessId($processId)
    {
        $deprecationMessage = 'Using QueueRepository->countAllByProcessId() is deprecated since 9.1.5 and will be removed in v11.x, please use QueueRepository->findByProcessId()->count() instead';
        trigger_error($deprecationMessage, E_USER_DEPRECATED);

        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $query = $connectionPool->getQueryBuilderForTable(self::TABLE_NAME);

        $query
            ->count('*')
            ->from(self::TABLE_NAME)
            ->where(
                $query->expr()->eq('process_id', $query->createNamedParameter($processId, \PDO::PARAM_STR))
            );

        return $query->execute()->fetchColumn(0);
    }
642
643 12
    /**
     * Finds queue ids of entries that duplicate the described one.
     *
     * A duplicate has the same page id and parameters hash, has not been executed
     * (exec_time = 0) and is not assigned to a process (process_id is empty).
     * The schedule restriction depends on the timestamp:
     * - timestamp in the future: scheduled must equal the timestamp
     * - timestamp now or in the past, timeslot enabled: scheduled within +/- 100 seconds
     *   of the current time, or at/before the current time
     * - timestamp now or in the past, timeslot disabled: scheduled at/before the current time
     *
     * @return array list of matching qid values
     */
    public function getDuplicateQueueItemsIfExists(bool $enableTimeslot, int $timestamp, int $currentTime, int $pageId, string $parametersHash): array
    {
        $rows = [];

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
        $queryBuilder
            ->select('qid')
            ->from(self::TABLE_NAME);

        if ($timestamp > $currentTime) {
            // entries with a timestamp in the future need to have the same schedule time
            $queryBuilder
                ->where(
                    $queryBuilder->expr()->eq('scheduled', $timestamp)
                );
        } elseif ($enableTimeslot) {
            // this entry is scheduled with "now" and timeslots are enabled
            $timeBegin = $currentTime - 100;
            $timeEnd = $currentTime + 100;
            $queryBuilder
                ->where(
                    'scheduled BETWEEN ' . $timeBegin . ' AND ' . $timeEnd . ''
                )
                ->orWhere(
                    $queryBuilder->expr()->lte('scheduled', $currentTime)
                );
        } else {
            // this entry is scheduled with "now"
            $queryBuilder
                ->where(
                    $queryBuilder->expr()->lte('scheduled', $currentTime)
                );
        }

        $queryBuilder
            ->andWhere($queryBuilder->expr()->eq('exec_time', 0))
            // Compare against the empty string explicitly: "NOT process_id" relies on the
            // database casting the string to a number, so ids starting with a non-zero digit
            // were treated differently from all others.
            ->andWhere($queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter('', \PDO::PARAM_STR)))
            ->andWhere($queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($pageId, \PDO::PARAM_INT)))
            ->andWhere($queryBuilder->expr()->eq('parameters_hash', $queryBuilder->createNamedParameter($parametersHash, \PDO::PARAM_STR)));

        $statement = $queryBuilder->execute();

        while ($row = $statement->fetch()) {
            $rows[] = $row['qid'];
        }

        return $rows;
    }
691
692 4
    /**
     * Fetches the queue rows of a page, newest schedule first.
     *
     * @param int $id page id whose queue entries are requested
     * @param int $itemsPerPage maximum number of rows; values of 0 or less mean no limit
     * @param QueueFilter $queueFilter "pending" restricts to exec_time = 0, "finished" to
     *                                 exec_time > 0; any other value applies no restriction
     *
     * @return array rows with all columns
     */
    public function getQueueEntriesForPageId(int $id, int $itemsPerPage, QueueFilter $queueFilter): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
        $queryBuilder
            ->select('*')
            ->from(self::TABLE_NAME)
            ->where(
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($id, PDO::PARAM_INT))
            )
            ->orderBy('scheduled', 'DESC');

        // andWhere() already combines the restriction with the page_id condition using AND.
        switch ($queueFilter) {
            case 'pending':
                $queryBuilder->andWhere($queryBuilder->expr()->eq('exec_time', 0));
                break;
            case 'finished':
                $queryBuilder->andWhere($queryBuilder->expr()->gt('exec_time', 0));
                break;
        }

        if ($itemsPerPage > 0) {
            $queryBuilder
                ->setMaxResults($itemsPerPage);
        }

        return $queryBuilder->execute()->fetchAll();
    }
726
727
    /**
     * Internal helper that fetches a single executed queue row of a process.
     *
     * Considers rows whose process_id_completed equals the process id and whose
     * exec_time is greater than 0, and returns the first one in the requested order.
     *
     * @param Process $process
     * @param string $orderByField field to order by; the first matching row is returned
     * @param string $orderBySorting sorting direction
     *
     * @return array the matching row, or an empty array if nothing matched
     */
    protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC'): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
        $first = $queryBuilder
            ->select('*')
            ->from(self::TABLE_NAME)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->setMaxResults(1)
            ->addOrderBy($orderByField, $orderBySorting)
            ->execute()
            // Use the connection's default fetch mode; 0 is not a valid fetch mode constant.
            ->fetch();

        return is_array($first) ? $first : [];
    }
750
}
751