Passed
Push — main ( fcc530...d42fc3 )
by Tomas Norre
28:03 queued 21:49
created

QueueRepository::cleanUpOldQueueEntries()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 20
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 2.0116

Importance

Changes 0
Metric Value
eloc 14
nc 2
nop 0
dl 0
loc 20
c 0
b 0
f 0
cc 2
ccs 12
cts 14
cp 0.8571
crap 2.0116
rs 9.7998
1
<?php
2
3
declare(strict_types=1);
4
5
namespace AOE\Crawler\Domain\Repository;
6
7
/***************************************************************
8
 *  Copyright notice
9
 *
10
 *  (c) 2020 AOE GmbH <[email protected]>
11
 *
12
 *  All rights reserved
13
 *
14
 *  This script is part of the TYPO3 project. The TYPO3 project is
15
 *  free software; you can redistribute it and/or modify
16
 *  it under the terms of the GNU General Public License as published by
17
 *  the Free Software Foundation; either version 3 of the License, or
18
 *  (at your option) any later version.
19
 *
20
 *  The GNU General Public License can be found at
21
 *  http://www.gnu.org/copyleft/gpl.html.
22
 *
23
 *  This script is distributed in the hope that it will be useful,
24
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
25
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26
 *  GNU General Public License for more details.
27
 *
28
 *  This copyright notice MUST APPEAR in all copies of the script!
29
 ***************************************************************/
30
31
use AOE\Crawler\Configuration\ExtensionConfigurationProvider;
32
use AOE\Crawler\Domain\Model\Process;
33
use AOE\Crawler\Value\QueueFilter;
34
use PDO;
35
use Psr\Log\LoggerAwareInterface;
36
use Psr\Log\LoggerAwareTrait;
37
use TYPO3\CMS\Core\Database\Connection;
38
use TYPO3\CMS\Core\Database\ConnectionPool;
39
use TYPO3\CMS\Core\Utility\GeneralUtility;
40
use TYPO3\CMS\Extbase\Object\ObjectManager;
41
use TYPO3\CMS\Extbase\Persistence\Repository;
42
43
/**
44
 * @internal since v9.2.5
45
 */
46
class QueueRepository extends Repository implements LoggerAwareInterface
47
{
48
    use LoggerAwareTrait;
49
50
    public const TABLE_NAME = 'tx_crawler_queue';
51
52
    /**
53
     * @var string
54
     * @deprecated Since v9.2.5 - This will be remove in v10
55
     */
56
    protected $tableName = 'tx_crawler_queue';
57
58
    /**
59
     * @var array
60
     */
61
    protected $extensionSettings;
62
63 112
    public function __construct()
64
    {
65 112
        $objectManager = GeneralUtility::makeInstance(ObjectManager::class);
66 112
        $this->extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
67
68 112
        parent::__construct($objectManager);
69 112
    }
70
71
    // TODO: Should be a property on the QueueObject
72 3
    public function unsetQueueProcessId(string $processId): void
73
    {
74 3
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
75
        $queryBuilder
76 3
            ->update(self::TABLE_NAME)
77 3
            ->where(
78 3
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId))
79
            )
80 3
            ->set('process_id', '')
81 3
            ->execute();
82 3
    }
83
84
    /**
85
     * This method is used to find the youngest entry for a given process.
86
     */
87 1
    public function findYoungestEntryForProcess(Process $process): array
88
    {
89 1
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time');
90
    }
91
92
    /**
93
     * This method is used to find the oldest entry for a given process.
94
     */
95 1
    public function findOldestEntryForProcess(Process $process): array
96
    {
97 1
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time', 'DESC');
98
    }
99
100
    /**
101
     * Counts all executed items of a process.
102
     *
103
     * @param Process $process
104
     */
105 1
    public function countExecutedItemsByProcess($process): int
106
    {
107 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
108
109
        return $queryBuilder
110 1
            ->count('*')
111 1
            ->from(self::TABLE_NAME)
112 1
            ->where(
113 1
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
114 1
                $queryBuilder->expr()->gt('exec_time', 0)
115
            )
116 1
            ->execute()
117 1
            ->fetchColumn(0);
118
    }
119
120
    /**
121
     * Counts items of a process which yet have not been processed/executed
122
     *
123
     * @param Process $process
124
     */
125 1
    public function countNonExecutedItemsByProcess($process): int
126
    {
127 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
128
129
        return $queryBuilder
130 1
            ->count('*')
131 1
            ->from(self::TABLE_NAME)
132 1
            ->where(
133 1
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($process->getProcessId())),
134 1
                $queryBuilder->expr()->eq('exec_time', 0)
135
            )
136 1
            ->execute()
137 1
            ->fetchColumn(0);
138
    }
139
140
    /**
141
     * get items which have not been processed yet
142
     */
143 10
    public function getUnprocessedItems(): array
144
    {
145 10
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
146
147
        return $queryBuilder
148 10
            ->select('*')
149 10
            ->from(self::TABLE_NAME)
150 10
            ->where(
151 10
                $queryBuilder->expr()->eq('exec_time', 0)
152
            )
153 10
            ->execute()->fetchAll();
154
    }
155
156
    /**
157
     * Count items which have not been processed yet
158
     * @deprecated Using QueueRepository->countUnprocessedItems() is deprecated since 9.1.5 and will be removed in v11.x, please use count(QueueRepository->getUnprocessedItems()) instead
159
     */
160
    public function countUnprocessedItems(): int
161
    {
162
        trigger_error(
163
            'Using QueueRepository->countUnprocessedItems() is deprecated since 9.1.5 and will be removed in v11.x, please use count(QueueRepository->getUnprocessedItems()) instead',
164
            E_USER_DEPRECATED
165
        );
166
        return count($this->getUnprocessedItems());
167
    }
168
169
    /**
170
     * This method can be used to count all queue entrys which are
171
     * scheduled for now or a earlier date.
172
     */
173 2
    public function countAllPendingItems(): int
174
    {
175 2
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
176
177
        return $queryBuilder
178 2
            ->count('*')
179 2
            ->from(self::TABLE_NAME)
180 2
            ->where(
181 2
                $queryBuilder->expr()->eq('process_scheduled', 0),
182 2
                $queryBuilder->expr()->eq('exec_time', 0),
183 2
                $queryBuilder->expr()->lte('scheduled', time())
184
            )
185 2
            ->execute()
186 2
            ->fetchColumn(0);
187
    }
188
189
    /**
190
     * This method can be used to count all queue entries which are
191
     * scheduled for now or a earlier date and are assigned to a process.
192
     */
193 2
    public function countAllAssignedPendingItems(): int
194
    {
195 2
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
196
197
        return $queryBuilder
198 2
            ->count('*')
199 2
            ->from(self::TABLE_NAME)
200 2
            ->where(
201 2
                $queryBuilder->expr()->neq('process_id', '""'),
202 2
                $queryBuilder->expr()->eq('exec_time', 0),
203 2
                $queryBuilder->expr()->lte('scheduled', time())
204
            )
205 2
            ->execute()
206 2
            ->fetchColumn(0);
207
    }
208
209
    /**
210
     * This method can be used to count all queue entrys which are
211
     * scheduled for now or a earlier date and are not assigned to a process.
212
     */
213 2
    public function countAllUnassignedPendingItems(): int
214
    {
215 2
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
216
217
        return $queryBuilder
218 2
            ->count('*')
219 2
            ->from(self::TABLE_NAME)
220 2
            ->where(
221 2
                $queryBuilder->expr()->eq('process_id', '""'),
222 2
                $queryBuilder->expr()->eq('exec_time', 0),
223 2
                $queryBuilder->expr()->lte('scheduled', time())
224
            )
225 2
            ->execute()
226 2
            ->fetchColumn(0);
227
    }
228
229
    /**
230
     * Count pending queue entries grouped by configuration key
231
     */
232 1
    public function countPendingItemsGroupedByConfigurationKey(): array
233
    {
234 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
235
        $statement = $queryBuilder
236 1
            ->from(self::TABLE_NAME)
237 1
            ->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed')
238 1
            ->addSelect('configuration')
239 1
            ->where(
240 1
                $queryBuilder->expr()->eq('exec_time', 0),
241 1
                $queryBuilder->expr()->lt('scheduled', time())
242
            )
243 1
            ->groupBy('configuration')
244 1
            ->execute();
245
246 1
        return $statement->fetchAll();
247
    }
248
249
    /**
250
     * Get set id with unprocessed entries
251
     *
252
     * @return array array of set ids
253
     */
254 1
    public function getSetIdWithUnprocessedEntries(): array
255
    {
256 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
257
        $statement = $queryBuilder
258 1
            ->select('set_id')
259 1
            ->from(self::TABLE_NAME)
260 1
            ->where(
261 1
                $queryBuilder->expr()->lt('scheduled', time()),
262 1
                $queryBuilder->expr()->eq('exec_time', 0)
263
            )
264 1
            ->addGroupBy('set_id')
265 1
            ->execute();
266
267 1
        $setIds = [];
268 1
        while ($row = $statement->fetch()) {
269 1
            $setIds[] = intval($row['set_id']);
270
        }
271
272 1
        return $setIds;
273
    }
274
275
    /**
276
     * Get total queue entries by configuration
277
     *
278
     * @return array totals by configuration (keys)
279
     */
280 1
    public function getTotalQueueEntriesByConfiguration(array $setIds): array
281
    {
282 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
283 1
        $totals = [];
284 1
        if (! empty($setIds)) {
285
            $statement = $queryBuilder
286 1
                ->from(self::TABLE_NAME)
287 1
                ->selectLiteral('count(*) as c')
288 1
                ->addSelect('configuration')
289 1
                ->where(
290 1
                    $queryBuilder->expr()->in('set_id', implode(',', $setIds)),
291 1
                    $queryBuilder->expr()->lt('scheduled', time())
292
                )
293 1
                ->groupBy('configuration')
294 1
                ->execute();
295
296 1
            while ($row = $statement->fetch()) {
297 1
                $totals[$row['configuration']] = $row['c'];
298
            }
299
        }
300
301 1
        return $totals;
302
    }
303
304
    /**
305
     * Get the timestamps of the last processed entries
306
     *
307
     * @param int $limit
308
     */
309 1
    public function getLastProcessedEntriesTimestamps($limit = 100): array
310
    {
311 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
312
        $statement = $queryBuilder
313 1
            ->select('exec_time')
314 1
            ->from(self::TABLE_NAME)
315 1
            ->addOrderBy('exec_time', 'desc')
316 1
            ->setMaxResults($limit)
317 1
            ->execute();
318
319 1
        $rows = [];
320 1
        while ($row = $statement->fetch()) {
321 1
            $rows[] = $row['exec_time'];
322
        }
323
324 1
        return $rows;
325
    }
326
327
    /**
328
     * Get the last processed entries
329
     *
330
     * @param int $limit
331
     */
332 1
    public function getLastProcessedEntries($limit = 100): array
333
    {
334 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
335
        $statement = $queryBuilder
336 1
            ->from(self::TABLE_NAME)
337 1
            ->select('*')
338 1
            ->orderBy('exec_time', 'desc')
339 1
            ->setMaxResults($limit)
340 1
            ->execute();
341
342 1
        $rows = [];
343 1
        while (($row = $statement->fetch()) !== false) {
344 1
            $rows[] = $row;
345
        }
346
347 1
        return $rows;
348
    }
349
350
    /**
351
     * Get performance statistics data
352
     *
353
     * @param int $start timestamp
354
     * @param int $end timestamp
355
     *
356
     * @return array performance data
357
     */
358 1
    public function getPerformanceData($start, $end): array
359
    {
360 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
361
        $statement = $queryBuilder
362 1
            ->from(self::TABLE_NAME)
363 1
            ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
364 1
            ->addSelect('process_id_completed')
365 1
            ->where(
366 1
                $queryBuilder->expr()->neq('exec_time', 0),
367 1
                $queryBuilder->expr()->gte('exec_time', $queryBuilder->createNamedParameter($start, \PDO::PARAM_INT)),
368 1
                $queryBuilder->expr()->lte('exec_time', $queryBuilder->createNamedParameter($end, \PDO::PARAM_INT))
369
            )
370 1
            ->groupBy('process_id_completed')
371 1
            ->execute();
372
373 1
        $rows = [];
374 1
        while ($row = $statement->fetch()) {
375 1
            $rows[$row['process_id_completed']] = $row;
376
        }
377
378 1
        return $rows;
379
    }
380
381
    /**
382
     * Determines if a page is queued
383
     */
384 5
    public function isPageInQueue(int $uid, bool $unprocessed_only = true, bool $timed_only = false, int $timestamp = 0): bool
385
    {
386 5
        $isPageInQueue = false;
387
388 5
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
389
        $statement = $queryBuilder
390 5
            ->from(self::TABLE_NAME)
391 5
            ->count('*')
392 5
            ->where(
393 5
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($uid, \PDO::PARAM_INT))
394
            );
395
396 5
        if ($unprocessed_only !== false) {
397 2
            $statement->andWhere(
398 2
                $queryBuilder->expr()->eq('exec_time', 0)
399
            );
400
        }
401
402 5
        if ($timed_only !== false) {
403 1
            $statement->andWhere(
404 1
                $queryBuilder->expr()->neq('scheduled', 0)
405
            );
406
        }
407
408 5
        if ($timestamp) {
409 1
            $statement->andWhere(
410 1
                $queryBuilder->expr()->eq('scheduled', $queryBuilder->createNamedParameter($timestamp, \PDO::PARAM_INT))
411
            );
412
        }
413
414
        // TODO: Currently it's not working if page doesn't exists. See tests
415
        $count = $statement
416 5
            ->execute()
417 5
            ->fetchColumn(0);
418
419 5
        if ($count !== false && $count > 0) {
420 4
            $isPageInQueue = true;
421
        }
422
423 5
        return $isPageInQueue;
424
    }
425
426
    /**
427
     * Method to check if a page is in the queue which is timed for a
428
     * date when it should be crawled
429
     */
430 1
    public function isPageInQueueTimed(int $uid, bool $show_unprocessed = true): bool
431
    {
432 1
        return $this->isPageInQueue($uid, $show_unprocessed);
433
    }
434
435 1
    public function getAvailableSets(): array
436
    {
437 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
438
        $statement = $queryBuilder
439 1
            ->selectLiteral('count(*) as count_value')
440 1
            ->addSelect('set_id', 'scheduled')
441 1
            ->from(self::TABLE_NAME)
442 1
            ->orderBy('scheduled', 'desc')
443 1
            ->groupBy('set_id', 'scheduled')
444 1
            ->execute();
445
446 1
        $rows = [];
447 1
        while ($row = $statement->fetch()) {
448 1
            $rows[] = $row;
449
        }
450
451 1
        return $rows;
452
    }
453
454 1
    public function findByQueueId(string $queueId): ?array
455
    {
456 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
457
        $queueRec = $queryBuilder
458 1
            ->select('*')
459 1
            ->from(self::TABLE_NAME)
460 1
            ->where(
461 1
                $queryBuilder->expr()->eq('qid', $queryBuilder->createNamedParameter($queueId))
462
            )
463 1
            ->execute()
464 1
            ->fetch();
465 1
        return is_array($queueRec) ? $queueRec : null;
466
    }
467
468 3
    public function cleanupQueue(): void
469
    {
470 3
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
471 3
        $purgeDays = (int) $extensionSettings['purgeQueueDays'];
472
473 3
        if ($purgeDays > 0) {
474 3
            $purgeDate = time() - 24 * 60 * 60 * $purgeDays;
475
476 3
            $queryBuilderDelete = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
477
            $del = $queryBuilderDelete
478 3
                ->delete(self::TABLE_NAME)
479 3
                ->where(
480 3
                    'exec_time != 0 AND exec_time < ' . $purgeDate
481 3
                )->execute();
482
483 3
            if ($del === false) {
484
                $this->logger->info(
0 ignored issues
show
Bug introduced by
The method info() does not exist on null. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

484
                $this->logger->/** @scrutinizer ignore-call */ 
485
                               info(

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
485
                    'Records could not be deleted.'
486
                );
487
            }
488
        }
489 3
    }
490
491
    /**
492
     * Cleans up entries that stayed for too long in the queue. These are default:
493
     * - processed entries that are over 1.5 days in age
494
     * - scheduled entries that are over 7 days old
495
     */
496 1
    public function cleanUpOldQueueEntries(): void
497
    {
498 1
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
499
        // 24*60*60 Seconds in 24 hours
500 1
        $processedAgeInSeconds = $extensionSettings['cleanUpProcessedAge'] * 86400;
501 1
        $scheduledAgeInSeconds = $extensionSettings['cleanUpScheduledAge'] * 86400;
502
503 1
        $now = time();
504 1
        $condition = '(exec_time<>0 AND exec_time<' . ($now - $processedAgeInSeconds) . ') OR scheduled<=' . ($now - $scheduledAgeInSeconds);
505
506 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
507
        $del = $queryBuilder
508 1
            ->delete(self::TABLE_NAME)
509 1
            ->where(
510 1
                $condition
511 1
            )->execute();
512
513 1
        if ($del === false) {
514
            $this->logger->info(
515
                'Records could not be deleted.'
516
            );
517
        }
518 1
    }
519
520 4
    public function fetchRecordsToBeCrawled(int $countInARun): array
521
    {
522 4
        $queryBuilderSelect = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
523
        return $queryBuilderSelect
524 4
            ->select('qid', 'scheduled', 'page_id', 'sitemap_priority')
525 4
            ->from(self::TABLE_NAME)
526 4
            ->leftJoin(
527 4
                self::TABLE_NAME,
528 4
                'pages',
529 4
                'p',
530 4
                $queryBuilderSelect->expr()->eq('p.uid', $queryBuilderSelect->quoteIdentifier(self::TABLE_NAME . '.page_id'))
531
            )
532 4
            ->where(
533 4
                $queryBuilderSelect->expr()->eq('exec_time', 0),
534 4
                $queryBuilderSelect->expr()->eq('process_scheduled', 0),
535 4
                $queryBuilderSelect->expr()->lte('scheduled', time())
536
            )
537 4
            ->orderBy('sitemap_priority', 'DESC')
538 4
            ->addOrderBy('scheduled')
539 4
            ->addOrderBy('qid')
540 4
            ->setMaxResults($countInARun)
541 4
            ->execute()
542 4
            ->fetchAll();
543
    }
544
545 3
    public function updateProcessIdAndSchedulerForQueueIds(array $quidList, string $processId)
546
    {
547 3
        $queryBuilderUpdate = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
548
        return $queryBuilderUpdate
549 3
            ->update(self::TABLE_NAME)
550 3
            ->where(
551 3
                $queryBuilderUpdate->expr()->in('qid', $quidList)
552
            )
553 3
            ->set('process_scheduled', time())
554 3
            ->set('process_id', $processId)
555 3
            ->execute();
556
    }
557
558 3
    public function unsetProcessScheduledAndProcessIdForQueueEntries(array $processIds): void
559
    {
560 3
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
561
        $queryBuilder
562 3
            ->update(self::TABLE_NAME)
563 3
            ->where(
564 3
                $queryBuilder->expr()->eq('exec_time', 0),
565 3
                $queryBuilder->expr()->in('process_id', $queryBuilder->createNamedParameter($processIds, Connection::PARAM_STR_ARRAY))
566
            )
567 3
            ->set('process_scheduled', 0)
568 3
            ->set('process_id', '')
569 3
            ->execute();
570 3
    }
571
572
    /**
573
     * This method is used to count if there are ANY unprocessed queue entries
574
     * of a given page_id and the configuration which matches a given hash.
575
     * If there if none, we can skip an inner detail check
576
     */
577 7
    public function noUnprocessedQueueEntriesForPageWithConfigurationHashExist(int $uid, string $configurationHash): bool
578
    {
579 7
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
580 7
        $noUnprocessedQueueEntriesFound = true;
581
582
        $result = $queryBuilder
583 7
            ->count('*')
584 7
            ->from(self::TABLE_NAME)
585 7
            ->where(
586 7
                $queryBuilder->expr()->eq('page_id', $uid),
587 7
                $queryBuilder->expr()->eq('configuration_hash', $queryBuilder->createNamedParameter($configurationHash)),
588 7
                $queryBuilder->expr()->eq('exec_time', 0)
589
            )
590 7
            ->execute()
591 7
            ->fetchColumn();
592
593 7
        if ($result) {
594 7
            $noUnprocessedQueueEntriesFound = false;
595
        }
596
597 7
        return $noUnprocessedQueueEntriesFound;
598
    }
599
600
    /**
601
     * Removes queue entries
602
     */
603 8
    public function flushQueue(QueueFilter $queueFilter): void
604
    {
605 8
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
606
607 8
        switch ($queueFilter) {
608 8
            case 'all':
609
                // No where claus needed delete everything
610 4
                break;
611 4
            case 'pending':
612 2
                $queryBuilder->andWhere($queryBuilder->expr()->eq('exec_time', 0));
613 2
                break;
614 2
            case 'finished':
615
            default:
616 2
                $queryBuilder->andWhere($queryBuilder->expr()->gt('exec_time', 0));
617 2
                break;
618
        }
619
620
        $queryBuilder
621 8
            ->delete(self::TABLE_NAME)
622 8
            ->execute();
623 8
    }
624
625
    /**
626
     * @param string $processId
627
     *
628
     * @return bool|string
629
     * @deprecated Using QueueRepository->countAllByProcessId() is deprecated since 9.1.5 and will be removed in v11.x, please use QueueRepository->findByProcessId()->count() instead
630
     */
631
    public function countAllByProcessId($processId)
632
    {
633
        trigger_error(
634
            'Using QueueRepository->countAllByProcessId() is deprecated since 9.1.5 and will be removed in v11.x, please use QueueRepository->findByProcessId()->count() instead',
635
            E_USER_DEPRECATED
636
        );
637
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
638
639
        return $queryBuilder
640
            ->count('*')
641
            ->from(self::TABLE_NAME)
642
            ->where(
643
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId, \PDO::PARAM_STR))
644
            )
645
            ->execute()
646
            ->fetchColumn(0);
647
    }
648
649 12
    public function getDuplicateQueueItemsIfExists(bool $enableTimeslot, int $timestamp, int $currentTime, int $pageId, string $parametersHash): array
650
    {
651 12
        $rows = [];
652
653 12
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
654
        $queryBuilder
655 12
            ->select('qid')
656 12
            ->from(QueueRepository::TABLE_NAME);
657
        //if this entry is scheduled with "now"
658 12
        if ($timestamp <= $currentTime) {
659 4
            if ($enableTimeslot) {
660 3
                $timeBegin = $currentTime - 100;
661 3
                $timeEnd = $currentTime + 100;
662
                $queryBuilder
663 3
                    ->where(
664 3
                        'scheduled BETWEEN ' . $timeBegin . ' AND ' . $timeEnd . ''
665
                    )
666 3
                    ->orWhere(
667 3
                        $queryBuilder->expr()->lte('scheduled', $currentTime)
668
                    );
669
            } else {
670
                $queryBuilder
671 1
                    ->where(
672 4
                        $queryBuilder->expr()->lte('scheduled', $currentTime)
673
                    );
674
            }
675 8
        } elseif ($timestamp > $currentTime) {
676
            //entry with a timestamp in the future need to have the same schedule time
677
            $queryBuilder
678 8
                ->where(
679 8
                    $queryBuilder->expr()->eq('scheduled', $timestamp)
680
                );
681
        }
682
683
        $queryBuilder
684 12
            ->andWhere('NOT exec_time')
685 12
            ->andWhere('NOT process_id')
686 12
            ->andWhere($queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($pageId, \PDO::PARAM_INT)))
687 12
            ->andWhere($queryBuilder->expr()->eq('parameters_hash', $queryBuilder->createNamedParameter($parametersHash, \PDO::PARAM_STR)));
688
689 12
        $statement = $queryBuilder->execute();
690
691 12
        while ($row = $statement->fetch()) {
692 9
            $rows[] = $row['qid'];
693
        }
694
695 12
        return $rows;
696
    }
697
698 4
    public function getQueueEntriesForPageId(int $id, int $itemsPerPage, QueueFilter $queueFilter): array
699
    {
700 4
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
701
        $queryBuilder
702 4
            ->select('*')
703 4
            ->from(self::TABLE_NAME)
704 4
            ->where(
705 4
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($id, PDO::PARAM_INT))
706
            )
707 4
            ->orderBy('scheduled', 'DESC');
708
709 4
        $expressionBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
710 4
            ->getConnectionForTable(self::TABLE_NAME)
711 4
            ->getExpressionBuilder();
712 4
        $query = $expressionBuilder->andX();
0 ignored issues
show
Unused Code introduced by
The assignment to $query is dead and can be removed.
Loading history...
713
        // PHPStorm adds the highlight that the $addWhere is immediately overwritten,
714
        // but the $query = $expressionBuilder->andX() ensures that the $addWhere is written correctly with AND
715
        // between the statements, it's not a mistake in the code.
716 4
        switch ($queueFilter) {
717 4
            case 'pending':
718 1
                $queryBuilder->andWhere($queryBuilder->expr()->eq('exec_time', 0));
719 1
                break;
720 3
            case 'finished':
721 1
                $queryBuilder->andWhere($queryBuilder->expr()->gt('exec_time', 0));
722 1
                break;
723
        }
724
725 4
        if ($itemsPerPage > 0) {
726
            $queryBuilder
727 4
                ->setMaxResults($itemsPerPage);
728
        }
729
730 4
        return $queryBuilder->execute()->fetchAll();
731
    }
732
733
    /**
734
     * This internal helper method is used to create an instance of an entry object
735
     *
736
     * @param Process $process
737
     * @param string $orderByField first matching item will be returned as object
738
     * @param string $orderBySorting sorting direction
739
     */
740 5
    protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC'): array
741
    {
742 5
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable(self::TABLE_NAME);
743
        $first = $queryBuilder
744 5
            ->select('*')
745 5
            ->from(self::TABLE_NAME)
746 5
            ->where(
747 5
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
748 5
                $queryBuilder->expr()->gt('exec_time', 0)
749
            )
750 5
            ->setMaxResults(1)
751 5
            ->addOrderBy($orderByField, $orderBySorting)
752 5
            ->execute()->fetch(0);
753
754 5
        return $first ?: [];
755
    }
756
}
757