Passed
Pull Request — master (#677)
by Tomas Norre
08:27 queued 04:57
created

QueueRepository::getQueueEntriesForPageId()   A

Complexity

Conditions 4
Paths 6

Size

Total Lines 33
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 22
c 0
b 0
f 0
nc 6
nop 3
dl 0
loc 33
ccs 21
cts 21
cp 1
crap 4
rs 9.568
1
<?php
2
3
declare(strict_types=1);
4
5
namespace AOE\Crawler\Domain\Repository;
6
7
/***************************************************************
8
 *  Copyright notice
9
 *
10
 *  (c) 2020 AOE GmbH <[email protected]>
11
 *
12
 *  All rights reserved
13
 *
14
 *  This script is part of the TYPO3 project. The TYPO3 project is
15
 *  free software; you can redistribute it and/or modify
16
 *  it under the terms of the GNU General Public License as published by
17
 *  the Free Software Foundation; either version 3 of the License, or
18
 *  (at your option) any later version.
19
 *
20
 *  The GNU General Public License can be found at
21
 *  http://www.gnu.org/copyleft/gpl.html.
22
 *
23
 *  This script is distributed in the hope that it will be useful,
24
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
25
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26
 *  GNU General Public License for more details.
27
 *
28
 *  This copyright notice MUST APPEAR in all copies of the script!
29
 ***************************************************************/
30
31
use AOE\Crawler\Configuration\ExtensionConfigurationProvider;
32
use AOE\Crawler\Domain\Model\Process;
33
use AOE\Crawler\Value\QueueFilter;
34
use PDO;
35
use Psr\Log\LoggerAwareInterface;
36
use TYPO3\CMS\Core\Database\Connection;
37
use TYPO3\CMS\Core\Database\ConnectionPool;
38
use TYPO3\CMS\Core\Utility\GeneralUtility;
39
use TYPO3\CMS\Extbase\Object\ObjectManager;
40
use TYPO3\CMS\Extbase\Persistence\Repository;
41
42
/**
43
 * Class QueueRepository
44
 *
45
 * @package AOE\Crawler\Domain\Repository
46
 */
47
class QueueRepository extends Repository implements LoggerAwareInterface
48
{
49
    use \Psr\Log\LoggerAwareTrait;
50
51
    /**
52
     * @var string
53
     */
54
    protected $tableName = 'tx_crawler_queue';
55
56
    /**
57
     * @var array
58
     */
59
    protected $extensionSettings;
60
61 109
    public function __construct()
62
    {
63 109
        $objectManager = GeneralUtility::makeInstance(ObjectManager::class);
64 109
        $this->extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
65
66 109
        parent::__construct($objectManager);
67 109
    }
68
69 3
    public function unsetQueueProcessId(string $processId): void
70
    {
71 3
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
72
        $queryBuilder
73 3
            ->update($this->tableName)
74 3
            ->where(
75 3
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId))
76
            )
77 3
            ->set('process_id', '')
78 3
            ->execute();
79 3
    }
80
81
    /**
82
     * This method is used to find the youngest entry for a given process.
83
     */
84 1
    public function findYoungestEntryForProcess(Process $process): array
85
    {
86 1
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time');
87
    }
88
89
    /**
90
     * This method is used to find the oldest entry for a given process.
91
     */
92 1
    public function findOldestEntryForProcess(Process $process): array
93
    {
94 1
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time', 'DESC');
95
    }
96
97
    /**
98
     * Counts all executed items of a process.
99
     *
100
     * @param Process $process
101
     */
102 1
    public function countExecutedItemsByProcess($process): int
103
    {
104 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
105
106
        return $queryBuilder
107 1
            ->count('*')
108 1
            ->from($this->tableName)
109 1
            ->where(
110 1
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
111 1
                $queryBuilder->expr()->gt('exec_time', 0)
112
            )
113 1
            ->execute()
114 1
            ->fetchColumn(0);
115
    }
116
117
    /**
118
     * Counts items of a process which yet have not been processed/executed
119
     *
120
     * @param Process $process
121
     */
122 1
    public function countNonExecutedItemsByProcess($process): int
123
    {
124 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
125
126
        return $queryBuilder
127 1
            ->count('*')
128 1
            ->from($this->tableName)
129 1
            ->where(
130 1
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($process->getProcessId())),
131 1
                $queryBuilder->expr()->eq('exec_time', 0)
132
            )
133 1
            ->execute()
134 1
            ->fetchColumn(0);
135
    }
136
137
    /**
138
     * get items which have not been processed yet
139
     */
140 5
    public function getUnprocessedItems(): array
141
    {
142 5
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
143
144
        return $queryBuilder
145 5
            ->select('*')
146 5
            ->from($this->tableName)
147 5
            ->where(
148 5
                $queryBuilder->expr()->eq('exec_time', 0)
149
            )
150 5
            ->execute()->fetchAll();
151
    }
152
153
    /**
154
     * Count items which have not been processed yet
155
     */
156 5
    public function countUnprocessedItems(): int
157
    {
158 5
        return count($this->getUnprocessedItems());
159
    }
160
161
    /**
162
     * This method can be used to count all queue entrys which are
163
     * scheduled for now or a earlier date.
164
     */
165 2
    public function countAllPendingItems(): int
166
    {
167 2
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
168
169
        return $queryBuilder
170 2
            ->count('*')
171 2
            ->from($this->tableName)
172 2
            ->where(
173 2
                $queryBuilder->expr()->eq('process_scheduled', 0),
174 2
                $queryBuilder->expr()->eq('exec_time', 0),
175 2
                $queryBuilder->expr()->lte('scheduled', time())
176
            )
177 2
            ->execute()
178 2
            ->fetchColumn(0);
179
    }
180
181
    /**
182
     * This method can be used to count all queue entries which are
183
     * scheduled for now or a earlier date and are assigned to a process.
184
     */
185 2
    public function countAllAssignedPendingItems(): int
186
    {
187 2
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
188
189
        return $queryBuilder
190 2
            ->count('*')
191 2
            ->from($this->tableName)
192 2
            ->where(
193 2
                $queryBuilder->expr()->neq('process_id', '""'),
194 2
                $queryBuilder->expr()->eq('exec_time', 0),
195 2
                $queryBuilder->expr()->lte('scheduled', time())
196
            )
197 2
            ->execute()
198 2
            ->fetchColumn(0);
199
    }
200
201
    /**
202
     * This method can be used to count all queue entrys which are
203
     * scheduled for now or a earlier date and are not assigned to a process.
204
     */
205 2
    public function countAllUnassignedPendingItems(): int
206
    {
207 2
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
208
209
        return $queryBuilder
210 2
            ->count('*')
211 2
            ->from($this->tableName)
212 2
            ->where(
213 2
                $queryBuilder->expr()->eq('process_id', '""'),
214 2
                $queryBuilder->expr()->eq('exec_time', 0),
215 2
                $queryBuilder->expr()->lte('scheduled', time())
216
            )
217 2
            ->execute()
218 2
            ->fetchColumn(0);
219
    }
220
221
    /**
222
     * Count pending queue entries grouped by configuration key
223
     */
224 1
    public function countPendingItemsGroupedByConfigurationKey(): array
225
    {
226 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
227
        $statement = $queryBuilder
228 1
            ->from($this->tableName)
229 1
            ->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed')
230 1
            ->addSelect('configuration')
231 1
            ->where(
232 1
                $queryBuilder->expr()->eq('exec_time', 0),
233 1
                $queryBuilder->expr()->lt('scheduled', time())
234
            )
235 1
            ->groupBy('configuration')
236 1
            ->execute();
237
238 1
        return $statement->fetchAll();
239
    }
240
241
    /**
242
     * Get set id with unprocessed entries
243
     *
244
     * @return array array of set ids
245
     */
246 1
    public function getSetIdWithUnprocessedEntries(): array
247
    {
248 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
249
        $statement = $queryBuilder
250 1
            ->select('set_id')
251 1
            ->from($this->tableName)
252 1
            ->where(
253 1
                $queryBuilder->expr()->lt('scheduled', time()),
254 1
                $queryBuilder->expr()->eq('exec_time', 0)
255
            )
256 1
            ->addGroupBy('set_id')
257 1
            ->execute();
258
259 1
        $setIds = [];
260 1
        while ($row = $statement->fetch()) {
261 1
            $setIds[] = intval($row['set_id']);
262
        }
263
264 1
        return $setIds;
265
    }
266
267
    /**
268
     * Get total queue entries by configuration
269
     *
270
     * @return array totals by configuration (keys)
271
     */
272 1
    public function getTotalQueueEntriesByConfiguration(array $setIds): array
273
    {
274 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
275 1
        $totals = [];
276 1
        if (count($setIds) > 0) {
277
            $statement = $queryBuilder
278 1
                ->from($this->tableName)
279 1
                ->selectLiteral('count(*) as c')
280 1
                ->addSelect('configuration')
281 1
                ->where(
282 1
                    $queryBuilder->expr()->in('set_id', implode(',', $setIds)),
283 1
                    $queryBuilder->expr()->lt('scheduled', time())
284
                )
285 1
                ->groupBy('configuration')
286 1
                ->execute();
287
288 1
            while ($row = $statement->fetch()) {
289 1
                $totals[$row['configuration']] = $row['c'];
290
            }
291
        }
292
293 1
        return $totals;
294
    }
295
296
    /**
297
     * Get the timestamps of the last processed entries
298
     *
299
     * @param int $limit
300
     */
301 1
    public function getLastProcessedEntriesTimestamps($limit = 100): array
302
    {
303 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
304
        $statement = $queryBuilder
305 1
            ->select('exec_time')
306 1
            ->from($this->tableName)
307 1
            ->addOrderBy('exec_time', 'desc')
308 1
            ->setMaxResults($limit)
309 1
            ->execute();
310
311 1
        $rows = [];
312 1
        while ($row = $statement->fetch()) {
313 1
            $rows[] = $row['exec_time'];
314
        }
315
316 1
        return $rows;
317
    }
318
319
    /**
320
     * Get the last processed entries
321
     *
322
     * @param int $limit
323
     */
324 1
    public function getLastProcessedEntries($limit = 100): array
325
    {
326 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
327
        $statement = $queryBuilder
328 1
            ->from($this->tableName)
329 1
            ->select('*')
330 1
            ->orderBy('exec_time', 'desc')
331 1
            ->setMaxResults($limit)
332 1
            ->execute();
333
334 1
        $rows = [];
335 1
        while (($row = $statement->fetch()) !== false) {
336 1
            $rows[] = $row;
337
        }
338
339 1
        return $rows;
340
    }
341
342
    /**
343
     * Get performance statistics data
344
     *
345
     * @param int $start timestamp
346
     * @param int $end timestamp
347
     *
348
     * @return array performance data
349
     */
350 1
    public function getPerformanceData($start, $end): array
351
    {
352 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
353
        $statement = $queryBuilder
354 1
            ->from($this->tableName)
355 1
            ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
356 1
            ->addSelect('process_id_completed')
357 1
            ->where(
358 1
                $queryBuilder->expr()->neq('exec_time', 0),
359 1
                $queryBuilder->expr()->gte('exec_time', $queryBuilder->createNamedParameter($start, \PDO::PARAM_INT)),
360 1
                $queryBuilder->expr()->lte('exec_time', $queryBuilder->createNamedParameter($end, \PDO::PARAM_INT))
361
            )
362 1
            ->groupBy('process_id_completed')
363 1
            ->execute();
364
365 1
        $rows = [];
366 1
        while ($row = $statement->fetch()) {
367 1
            $rows[$row['process_id_completed']] = $row;
368
        }
369
370 1
        return $rows;
371
    }
372
373
    /**
374
     * Determines if a page is queued
375
     */
376 5
    public function isPageInQueue(int $uid, bool $unprocessed_only = true, bool $timed_only = false, int $timestamp = 0): bool
377
    {
378 5
        $isPageInQueue = false;
379
380 5
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
381
        $statement = $queryBuilder
382 5
            ->from($this->tableName)
383 5
            ->count('*')
384 5
            ->where(
385 5
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($uid, \PDO::PARAM_INT))
386
            );
387
388 5
        if ($unprocessed_only !== false) {
389 2
            $statement->andWhere(
390 2
                $queryBuilder->expr()->eq('exec_time', 0)
391
            );
392
        }
393
394 5
        if ($timed_only !== false) {
395 1
            $statement->andWhere(
396 1
                $queryBuilder->expr()->neq('scheduled', 0)
397
            );
398
        }
399
400 5
        if ($timestamp) {
401 1
            $statement->andWhere(
402 1
                $queryBuilder->expr()->eq('scheduled', $queryBuilder->createNamedParameter($timestamp, \PDO::PARAM_INT))
403
            );
404
        }
405
406
        // TODO: Currently it's not working if page doesn't exists. See tests
407
        $count = $statement
408 5
            ->execute()
409 5
            ->fetchColumn(0);
410
411 5
        if ($count !== false && $count > 0) {
412 4
            $isPageInQueue = true;
413
        }
414
415 5
        return $isPageInQueue;
416
    }
417
418
    /**
419
     * Method to check if a page is in the queue which is timed for a
420
     * date when it should be crawled
421
     */
422 1
    public function isPageInQueueTimed(int $uid, bool $show_unprocessed = true): bool
423
    {
424 1
        return $this->isPageInQueue($uid, $show_unprocessed);
425
    }
426
427 1
    public function getAvailableSets(): array
428
    {
429 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
430
        $statement = $queryBuilder
431 1
            ->selectLiteral('count(*) as count_value')
432 1
            ->addSelect('set_id', 'scheduled')
433 1
            ->from($this->tableName)
434 1
            ->orderBy('scheduled', 'desc')
435 1
            ->groupBy('set_id', 'scheduled')
436 1
            ->execute();
437
438 1
        $rows = [];
439 1
        while ($row = $statement->fetch()) {
440 1
            $rows[] = $row;
441
        }
442
443 1
        return $rows;
444
    }
445
446 1
    public function findByQueueId(string $queueId): ?array
447
    {
448 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
449
        $queueRec = $queryBuilder
450 1
            ->select('*')
451 1
            ->from($this->tableName)
452 1
            ->where(
453 1
                $queryBuilder->expr()->eq('qid', $queryBuilder->createNamedParameter($queueId))
454
            )
455 1
            ->execute()
456 1
            ->fetch();
457 1
        return is_array($queueRec) ? $queueRec : null;
458
    }
459
460 1
    public function cleanupQueue(): void
461
    {
462 1
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
463 1
        $purgeDays = (int) $extensionSettings['purgeQueueDays'];
464
465 1
        if ($purgeDays > 0) {
466 1
            $purgeDate = time() - 24 * 60 * 60 * $purgeDays;
467
468 1
            $queryBuilderDelete = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
469
            $del = $queryBuilderDelete
470 1
                ->delete($this->tableName)
471 1
                ->where(
472 1
                    'exec_time != 0 AND exec_time < ' . $purgeDate
473 1
                )->execute();
474
475 1
            if ($del === false) {
476
                $this->logger->info(
477
                    'Records could not be deleted.'
478
                );
479
            }
480
        }
481 1
    }
482
483
    /**
484
     * Cleans up entries that stayed for too long in the queue. These are default:
485
     * - processed entries that are over 1.5 days in age
486
     * - scheduled entries that are over 7 days old
487
     */
488 1
    public function cleanUpOldQueueEntries(): void
489
    {
490 1
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
491
        // 24*60*60 Seconds in 24 hours
492 1
        $processedAgeInSeconds = $extensionSettings['cleanUpProcessedAge'] * 86400;
493 1
        $scheduledAgeInSeconds = $extensionSettings['cleanUpScheduledAge'] * 86400;
494
495 1
        $now = time();
496 1
        $condition = '(exec_time<>0 AND exec_time<' . ($now - $processedAgeInSeconds) . ') OR scheduled<=' . ($now - $scheduledAgeInSeconds);
497
498 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
499
        $del = $queryBuilder
500 1
            ->delete($this->tableName)
501 1
            ->where(
502 1
                $condition
503 1
            )->execute();
504
505 1
        if ($del === false) {
506
            $this->logger->info(
507
                'Records could not be deleted.'
508
            );
509
        }
510 1
    }
511
512 2
    public function fetchRecordsToBeCrawled(int $countInARun)
513
    {
514 2
        $queryBuilderSelect = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
515
        return $queryBuilderSelect
516 2
            ->select('qid', 'scheduled', 'page_id', 'sitemap_priority')
517 2
            ->from($this->tableName)
518 2
            ->leftJoin(
519 2
                $this->tableName,
520 2
                'pages',
521 2
                'p',
522 2
                $queryBuilderSelect->expr()->eq('p.uid', $queryBuilderSelect->quoteIdentifier($this->tableName . '.page_id'))
523
            )
524 2
            ->where(
525 2
                $queryBuilderSelect->expr()->eq('exec_time', 0),
526 2
                $queryBuilderSelect->expr()->eq('process_scheduled', 0),
527 2
                $queryBuilderSelect->expr()->lte('scheduled', time())
528
            )
529 2
            ->orderBy('sitemap_priority', 'DESC')
530 2
            ->addOrderBy('scheduled')
531 2
            ->addOrderBy('qid')
532 2
            ->setMaxResults($countInARun)
533 2
            ->execute()
534 2
            ->fetchAll();
535
    }
536
537 1
    public function updateProcessIdAndSchedulerForQueueIds(array $quidList, string $processId)
538
    {
539 1
        $queryBuilderUpdate = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
540
        return $queryBuilderUpdate
541 1
            ->update($this->tableName)
542 1
            ->where(
543 1
                $queryBuilderUpdate->expr()->in('qid', $quidList)
544
            )
545 1
            ->set('process_scheduled', time())
546 1
            ->set('process_id', $processId)
547 1
            ->execute();
548
    }
549
550 1
    public function unsetProcessScheduledAndProcessIdForQueueEntries(array $processIds): void
551
    {
552 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
553
        $queryBuilder
554 1
            ->update($this->tableName)
555 1
            ->where(
556 1
                $queryBuilder->expr()->eq('exec_time', 0),
557 1
                $queryBuilder->expr()->in('process_id', $queryBuilder->createNamedParameter($processIds, Connection::PARAM_STR_ARRAY))
558
            )
559 1
            ->set('process_scheduled', 0)
560 1
            ->set('process_id', '')
561 1
            ->execute();
562 1
    }
563
564
    /**
565
     * This method is used to count if there are ANY unprocessed queue entries
566
     * of a given page_id and the configuration which matches a given hash.
567
     * If there if none, we can skip an inner detail check
568
     *
569
     * @param int $uid
570
     * @param string $configurationHash
571
     * @return boolean
572
     */
573 4
    public function noUnprocessedQueueEntriesForPageWithConfigurationHashExist($uid, $configurationHash): bool
574
    {
575 4
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
576 4
        $noUnprocessedQueueEntriesFound = true;
577
578
        $result = $queryBuilder
579 4
            ->count('*')
580 4
            ->from($this->tableName)
581 4
            ->where(
582 4
                $queryBuilder->expr()->eq('page_id', (int) $uid),
583 4
                $queryBuilder->expr()->eq('configuration_hash', $queryBuilder->createNamedParameter($configurationHash)),
584 4
                $queryBuilder->expr()->eq('exec_time', 0)
585
            )
586 4
            ->execute()
587 4
            ->fetchColumn();
588
589 4
        if ($result) {
590 4
            $noUnprocessedQueueEntriesFound = false;
591
        }
592
593 4
        return $noUnprocessedQueueEntriesFound;
594
    }
595
596
    /**
597
     * Removes queue entries
598
     */
599 5
    public function flushQueue(QueueFilter $queueFilter): void
600
    {
601 5
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
602
603 5
        switch ($queueFilter) {
604 5
            case 'all':
605
                // No where claus needed delete everything
606 3
                break;
607 2
            case 'pending':
608 1
                $queryBuilder->andWhere($queryBuilder->expr()->eq('exec_time', 0));
609 1
                break;
610 1
            case 'finished':
611
            default:
612 1
                $queryBuilder->andWhere($queryBuilder->expr()->gt('exec_time', 0));
613 1
                break;
614
        }
615
616
        $queryBuilder
617 5
            ->delete($this->tableName)
618 5
            ->execute();
619 5
    }
620
621
    /**
622
     * @param string $processId
623
     *
624
     * @return bool|string
625
     */
626 1
    public function countAllByProcessId($processId)
627
    {
628 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
629
630
        return $queryBuilder
631 1
            ->count('*')
632 1
            ->from($this->tableName)
633 1
            ->where(
634 1
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId, \PDO::PARAM_STR))
635
            )
636 1
            ->execute()
637 1
            ->fetchColumn(0);
638
    }
639
640 9
    public function getDuplicateQueueItemsIfExists(bool $enableTimeslot, int $timestamp, int $currentTime, int $pageId, string $parametersHash): array
641
    {
642 9
        $rows = [];
643
644 9
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
645
        $queryBuilder
646 9
            ->select('qid')
647 9
            ->from('tx_crawler_queue');
648
        //if this entry is scheduled with "now"
649 9
        if ($timestamp <= $currentTime) {
650 3
            if ($enableTimeslot) {
651 2
                $timeBegin = $currentTime - 100;
652 2
                $timeEnd = $currentTime + 100;
653
                $queryBuilder
654 2
                    ->where(
655 2
                        'scheduled BETWEEN ' . $timeBegin . ' AND ' . $timeEnd . ''
656
                    )
657 2
                    ->orWhere(
658 2
                        $queryBuilder->expr()->lte('scheduled', $currentTime)
659
                    );
660
            } else {
661
                $queryBuilder
662 1
                    ->where(
663 3
                        $queryBuilder->expr()->lte('scheduled', $currentTime)
664
                    );
665
            }
666 6
        } elseif ($timestamp > $currentTime) {
667
            //entry with a timestamp in the future need to have the same schedule time
668
            $queryBuilder
669 6
                ->where(
670 6
                    $queryBuilder->expr()->eq('scheduled', $timestamp)
671
                );
672
        }
673
674
        $queryBuilder
675 9
            ->andWhere('NOT exec_time')
676 9
            ->andWhere('NOT process_id')
677 9
            ->andWhere($queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($pageId, \PDO::PARAM_INT)))
678 9
            ->andWhere($queryBuilder->expr()->eq('parameters_hash', $queryBuilder->createNamedParameter($parametersHash, \PDO::PARAM_STR)));
679
680 9
        $statement = $queryBuilder->execute();
681
682 9
        while ($row = $statement->fetch()) {
683 7
            $rows[] = $row['qid'];
684
        }
685
686 9
        return $rows;
687
    }
688
689 4
    public function getQueueEntriesForPageId(int $id, int $itemsPerPage, QueueFilter $queueFilter): array
690
    {
691 4
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
692
        $queryBuilder
693 4
            ->select('*')
694 4
            ->from($this->tableName)
695 4
            ->where(
696 4
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($id, PDO::PARAM_INT))
697
            )
698 4
            ->orderBy('scheduled', 'DESC');
699
700 4
        $expressionBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
701 4
            ->getConnectionForTable($this->tableName)
702 4
            ->getExpressionBuilder();
703 4
        $query = $expressionBuilder->andX();
0 ignored issues
show
Unused Code introduced by
The assignment to $query is dead and can be removed.
Loading history...
704
        // PHPStorm adds the highlight that the $addWhere is immediately overwritten,
705
        // but the $query = $expressionBuilder->andX() ensures that the $addWhere is written correctly with AND
706
        // between the statements, it's not a mistake in the code.
707 4
        switch ($queueFilter) {
708 4
            case 'pending':
709 1
                $queryBuilder->andWhere($queryBuilder->expr()->eq('exec_time', 0));
710 1
                break;
711 3
            case 'finished':
712 1
                $queryBuilder->andWhere($queryBuilder->expr()->gt('exec_time', 0));
713 1
                break;
714
        }
715
716 4
        if ($itemsPerPage > 0) {
717
            $queryBuilder
718 4
                ->setMaxResults($itemsPerPage);
719
        }
720
721 4
        return $queryBuilder->execute()->fetchAll();
722
    }
723
724
    /**
725
     * This internal helper method is used to create an instance of an entry object
726
     *
727
     * @param Process $process
728
     * @param string $orderByField first matching item will be returned as object
729
     * @param string $orderBySorting sorting direction
730
     */
731 5
    protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC'): array
732
    {
733 5
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
734
        $first = $queryBuilder
735 5
            ->select('*')
736 5
            ->from($this->tableName)
737 5
            ->where(
738 5
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
739 5
                $queryBuilder->expr()->gt('exec_time', 0)
740
            )
741 5
            ->setMaxResults(1)
742 5
            ->addOrderBy($orderByField, $orderBySorting)
743 5
            ->execute()->fetch(0);
744
745 5
        return $first ?: [];
746
    }
747
}
748