Passed
Push — refactor/getDuplicateRowIfExis... ( f76ae0 )
by Tomas Norre
07:43
created

QueueRepository::findByQueueId()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 12
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 9
nc 2
nop 1
dl 0
loc 12
ccs 9
cts 9
cp 1
crap 2
rs 9.9666
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace AOE\Crawler\Domain\Repository;
6
7
/***************************************************************
8
 *  Copyright notice
9
 *
10
 *  (c) 2020 AOE GmbH <[email protected]>
11
 *
12
 *  All rights reserved
13
 *
14
 *  This script is part of the TYPO3 project. The TYPO3 project is
15
 *  free software; you can redistribute it and/or modify
16
 *  it under the terms of the GNU General Public License as published by
17
 *  the Free Software Foundation; either version 3 of the License, or
18
 *  (at your option) any later version.
19
 *
20
 *  The GNU General Public License can be found at
21
 *  http://www.gnu.org/copyleft/gpl.html.
22
 *
23
 *  This script is distributed in the hope that it will be useful,
24
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
25
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26
 *  GNU General Public License for more details.
27
 *
28
 *  This copyright notice MUST APPEAR in all copies of the script!
29
 ***************************************************************/
30
31
use AOE\Crawler\Configuration\ExtensionConfigurationProvider;
32
use AOE\Crawler\Domain\Model\Process;
33
use AOE\Crawler\Value\QueueFilter;
34
use Psr\Log\LoggerAwareInterface;
35
use TYPO3\CMS\Core\Database\Connection;
36
use TYPO3\CMS\Core\Database\ConnectionPool;
37
use TYPO3\CMS\Core\Utility\GeneralUtility;
38
use TYPO3\CMS\Extbase\Object\ObjectManager;
39
use TYPO3\CMS\Extbase\Persistence\Repository;
40
41
/**
42
 * Class QueueRepository
43
 *
44
 * @package AOE\Crawler\Domain\Repository
45
 */
46
class QueueRepository extends Repository implements LoggerAwareInterface
47
{
48
    use \Psr\Log\LoggerAwareTrait;
49
50
    /**
51
     * @var string
52
     */
53
    protected $tableName = 'tx_crawler_queue';
54
55
    /**
56
     * @var array
57
     */
58
    protected $extensionSettings;
59
60 105
    public function __construct()
61
    {
62 105
        $objectManager = GeneralUtility::makeInstance(ObjectManager::class);
63 105
        $this->extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
64
65 105
        parent::__construct($objectManager);
66 105
    }
67
68 3
    public function unsetQueueProcessId(string $processId): void
69
    {
70 3
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
71
        $queryBuilder
72 3
            ->update($this->tableName)
73 3
            ->where(
74 3
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId))
75
            )
76 3
            ->set('process_id', '')
77 3
            ->execute();
78 3
    }
79
80
    /**
81
     * This method is used to find the youngest entry for a given process.
82
     */
83 1
    public function findYoungestEntryForProcess(Process $process): array
84
    {
85 1
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time');
86
    }
87
88
    /**
89
     * This method is used to find the oldest entry for a given process.
90
     */
91 1
    public function findOldestEntryForProcess(Process $process): array
92
    {
93 1
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time', 'DESC');
94
    }
95
96
    /**
97
     * Counts all executed items of a process.
98
     *
99
     * @param Process $process
100
     */
101 1
    public function countExecutedItemsByProcess($process): int
102
    {
103 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
104
105
        return $queryBuilder
106 1
            ->count('*')
107 1
            ->from($this->tableName)
108 1
            ->where(
109 1
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
110 1
                $queryBuilder->expr()->gt('exec_time', 0)
111
            )
112 1
            ->execute()
113 1
            ->fetchColumn(0);
114
    }
115
116
    /**
117
     * Counts items of a process which yet have not been processed/executed
118
     *
119
     * @param Process $process
120
     */
121 1
    public function countNonExecutedItemsByProcess($process): int
122
    {
123 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
124
125
        return $queryBuilder
126 1
            ->count('*')
127 1
            ->from($this->tableName)
128 1
            ->where(
129 1
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($process->getProcessId())),
130 1
                $queryBuilder->expr()->eq('exec_time', 0)
131
            )
132 1
            ->execute()
133 1
            ->fetchColumn(0);
134
    }
135
136
    /**
137
     * get items which have not been processed yet
138
     */
139 5
    public function getUnprocessedItems(): array
140
    {
141 5
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
142
143
        return $queryBuilder
144 5
            ->select('*')
145 5
            ->from($this->tableName)
146 5
            ->where(
147 5
                $queryBuilder->expr()->eq('exec_time', 0)
148
            )
149 5
            ->execute()->fetchAll();
150
    }
151
152
    /**
153
     * Count items which have not been processed yet
154
     */
155 5
    public function countUnprocessedItems(): int
156
    {
157 5
        return count($this->getUnprocessedItems());
158
    }
159
160
    /**
161
     * This method can be used to count all queue entrys which are
162
     * scheduled for now or a earlier date.
163
     */
164 2
    public function countAllPendingItems(): int
165
    {
166 2
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
167
168
        return $queryBuilder
169 2
            ->count('*')
170 2
            ->from($this->tableName)
171 2
            ->where(
172 2
                $queryBuilder->expr()->eq('process_scheduled', 0),
173 2
                $queryBuilder->expr()->eq('exec_time', 0),
174 2
                $queryBuilder->expr()->lte('scheduled', time())
175
            )
176 2
            ->execute()
177 2
            ->fetchColumn(0);
178
    }
179
180
    /**
181
     * This method can be used to count all queue entries which are
182
     * scheduled for now or a earlier date and are assigned to a process.
183
     */
184 2
    public function countAllAssignedPendingItems(): int
185
    {
186 2
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
187
188
        return $queryBuilder
189 2
            ->count('*')
190 2
            ->from($this->tableName)
191 2
            ->where(
192 2
                $queryBuilder->expr()->neq('process_id', '""'),
193 2
                $queryBuilder->expr()->eq('exec_time', 0),
194 2
                $queryBuilder->expr()->lte('scheduled', time())
195
            )
196 2
            ->execute()
197 2
            ->fetchColumn(0);
198
    }
199
200
    /**
201
     * This method can be used to count all queue entrys which are
202
     * scheduled for now or a earlier date and are not assigned to a process.
203
     */
204 2
    public function countAllUnassignedPendingItems(): int
205
    {
206 2
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
207
208
        return $queryBuilder
209 2
            ->count('*')
210 2
            ->from($this->tableName)
211 2
            ->where(
212 2
                $queryBuilder->expr()->eq('process_id', '""'),
213 2
                $queryBuilder->expr()->eq('exec_time', 0),
214 2
                $queryBuilder->expr()->lte('scheduled', time())
215
            )
216 2
            ->execute()
217 2
            ->fetchColumn(0);
218
    }
219
220
    /**
221
     * Count pending queue entries grouped by configuration key
222
     */
223 1
    public function countPendingItemsGroupedByConfigurationKey(): array
224
    {
225 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
226
        $statement = $queryBuilder
227 1
            ->from($this->tableName)
228 1
            ->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed')
229 1
            ->addSelect('configuration')
230 1
            ->where(
231 1
                $queryBuilder->expr()->eq('exec_time', 0),
232 1
                $queryBuilder->expr()->lt('scheduled', time())
233
            )
234 1
            ->groupBy('configuration')
235 1
            ->execute();
236
237 1
        return $statement->fetchAll();
238
    }
239
240
    /**
241
     * Get set id with unprocessed entries
242
     *
243
     * @return array array of set ids
244
     */
245 1
    public function getSetIdWithUnprocessedEntries(): array
246
    {
247 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
248
        $statement = $queryBuilder
249 1
            ->select('set_id')
250 1
            ->from($this->tableName)
251 1
            ->where(
252 1
                $queryBuilder->expr()->lt('scheduled', time()),
253 1
                $queryBuilder->expr()->eq('exec_time', 0)
254
            )
255 1
            ->addGroupBy('set_id')
256 1
            ->execute();
257
258 1
        $setIds = [];
259 1
        while ($row = $statement->fetch()) {
260 1
            $setIds[] = intval($row['set_id']);
261
        }
262
263 1
        return $setIds;
264
    }
265
266
    /**
267
     * Get total queue entries by configuration
268
     *
269
     * @return array totals by configuration (keys)
270
     */
271 1
    public function getTotalQueueEntriesByConfiguration(array $setIds): array
272
    {
273 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
274 1
        $totals = [];
275 1
        if (count($setIds) > 0) {
276
            $statement = $queryBuilder
277 1
                ->from($this->tableName)
278 1
                ->selectLiteral('count(*) as c')
279 1
                ->addSelect('configuration')
280 1
                ->where(
281 1
                    $queryBuilder->expr()->in('set_id', implode(',', $setIds)),
282 1
                    $queryBuilder->expr()->lt('scheduled', time())
283
                )
284 1
                ->groupBy('configuration')
285 1
                ->execute();
286
287 1
            while ($row = $statement->fetch()) {
288 1
                $totals[$row['configuration']] = $row['c'];
289
            }
290
        }
291
292 1
        return $totals;
293
    }
294
295
    /**
296
     * Get the timestamps of the last processed entries
297
     *
298
     * @param int $limit
299
     */
300 1
    public function getLastProcessedEntriesTimestamps($limit = 100): array
301
    {
302 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
303
        $statement = $queryBuilder
304 1
            ->select('exec_time')
305 1
            ->from($this->tableName)
306 1
            ->addOrderBy('exec_time', 'desc')
307 1
            ->setMaxResults($limit)
308 1
            ->execute();
309
310 1
        $rows = [];
311 1
        while ($row = $statement->fetch()) {
312 1
            $rows[] = $row['exec_time'];
313
        }
314
315 1
        return $rows;
316
    }
317
318
    /**
319
     * Get the last processed entries
320
     *
321
     * @param int $limit
322
     */
323 1
    public function getLastProcessedEntries($limit = 100): array
324
    {
325 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
326
        $statement = $queryBuilder
327 1
            ->from($this->tableName)
328 1
            ->select('*')
329 1
            ->orderBy('exec_time', 'desc')
330 1
            ->setMaxResults($limit)
331 1
            ->execute();
332
333 1
        $rows = [];
334 1
        while (($row = $statement->fetch()) !== false) {
335 1
            $rows[] = $row;
336
        }
337
338 1
        return $rows;
339
    }
340
341
    /**
342
     * Get performance statistics data
343
     *
344
     * @param int $start timestamp
345
     * @param int $end timestamp
346
     *
347
     * @return array performance data
348
     */
349 1
    public function getPerformanceData($start, $end): array
350
    {
351 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
352
        $statement = $queryBuilder
353 1
            ->from($this->tableName)
354 1
            ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
355 1
            ->addSelect('process_id_completed')
356 1
            ->where(
357 1
                $queryBuilder->expr()->neq('exec_time', 0),
358 1
                $queryBuilder->expr()->gte('exec_time', $queryBuilder->createNamedParameter($start, \PDO::PARAM_INT)),
359 1
                $queryBuilder->expr()->lte('exec_time', $queryBuilder->createNamedParameter($end, \PDO::PARAM_INT))
360
            )
361 1
            ->groupBy('process_id_completed')
362 1
            ->execute();
363
364 1
        $rows = [];
365 1
        while ($row = $statement->fetch()) {
366 1
            $rows[$row['process_id_completed']] = $row;
367
        }
368
369 1
        return $rows;
370
    }
371
372
    /**
373
     * Determines if a page is queued
374
     */
375 5
    public function isPageInQueue(int $uid, bool $unprocessed_only = true, bool $timed_only = false, int $timestamp = 0): bool
376
    {
377 5
        $isPageInQueue = false;
378
379 5
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
380
        $statement = $queryBuilder
381 5
            ->from($this->tableName)
382 5
            ->count('*')
383 5
            ->where(
384 5
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($uid, \PDO::PARAM_INT))
385
            );
386
387 5
        if ($unprocessed_only !== false) {
388 2
            $statement->andWhere(
389 2
                $queryBuilder->expr()->eq('exec_time', 0)
390
            );
391
        }
392
393 5
        if ($timed_only !== false) {
394 1
            $statement->andWhere(
395 1
                $queryBuilder->expr()->neq('scheduled', 0)
396
            );
397
        }
398
399 5
        if ($timestamp) {
400 1
            $statement->andWhere(
401 1
                $queryBuilder->expr()->eq('scheduled', $queryBuilder->createNamedParameter($timestamp, \PDO::PARAM_INT))
402
            );
403
        }
404
405
        // TODO: Currently it's not working if page doesn't exists. See tests
406
        $count = $statement
407 5
            ->execute()
408 5
            ->fetchColumn(0);
409
410 5
        if ($count !== false && $count > 0) {
411 4
            $isPageInQueue = true;
412
        }
413
414 5
        return $isPageInQueue;
415
    }
416
417
    /**
418
     * Method to check if a page is in the queue which is timed for a
419
     * date when it should be crawled
420
     */
421 1
    public function isPageInQueueTimed(int $uid, bool $show_unprocessed = true): bool
422
    {
423 1
        return $this->isPageInQueue($uid, $show_unprocessed);
424
    }
425
426 1
    public function getAvailableSets(): array
427
    {
428 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
429
        $statement = $queryBuilder
430 1
            ->selectLiteral('count(*) as count_value')
431 1
            ->addSelect('set_id', 'scheduled')
432 1
            ->from($this->tableName)
433 1
            ->orderBy('scheduled', 'desc')
434 1
            ->groupBy('set_id', 'scheduled')
435 1
            ->execute();
436
437 1
        $rows = [];
438 1
        while ($row = $statement->fetch()) {
439 1
            $rows[] = $row;
440
        }
441
442 1
        return $rows;
443
    }
444
445 1
    public function findByQueueId(string $queueId): ?array
446
    {
447 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
448
        $queueRec = $queryBuilder
449 1
            ->select('*')
450 1
            ->from($this->tableName)
451 1
            ->where(
452 1
                $queryBuilder->expr()->eq('qid', $queryBuilder->createNamedParameter($queueId))
453
            )
454 1
            ->execute()
455 1
            ->fetch();
456 1
        return is_array($queueRec) ? $queueRec : null;
457
    }
458
459 1
    public function cleanupQueue(): void
460
    {
461 1
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
462 1
        $purgeDays = (int) $extensionSettings['purgeQueueDays'];
463
464 1
        if ($purgeDays > 0) {
465 1
            $purgeDate = time() - 24 * 60 * 60 * $purgeDays;
466
467 1
            $queryBuilderDelete = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
468
            $del = $queryBuilderDelete
469 1
                ->delete($this->tableName)
470 1
                ->where(
471 1
                    'exec_time != 0 AND exec_time < ' . $purgeDate
472 1
                )->execute();
473
474 1
            if ($del === false) {
475
                $this->logger->info(
476
                    'Records could not be deleted.'
477
                );
478
            }
479
        }
480 1
    }
481
482
    /**
483
     * Cleans up entries that stayed for too long in the queue. These are default:
484
     * - processed entries that are over 1.5 days in age
485
     * - scheduled entries that are over 7 days old
486
     */
487 1
    public function cleanUpOldQueueEntries(): void
488
    {
489 1
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
490
        // 24*60*60 Seconds in 24 hours
491 1
        $processedAgeInSeconds = $extensionSettings['cleanUpProcessedAge'] * 86400;
492 1
        $scheduledAgeInSeconds = $extensionSettings['cleanUpScheduledAge'] * 86400;
493
494 1
        $now = time();
495 1
        $condition = '(exec_time<>0 AND exec_time<' . ($now - $processedAgeInSeconds) . ') OR scheduled<=' . ($now - $scheduledAgeInSeconds);
496
497 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
498
        $del = $queryBuilder
499 1
            ->delete($this->tableName)
500 1
            ->where(
501 1
                $condition
502 1
            )->execute();
503
504 1
        if ($del === false) {
505
            $this->logger->info(
506
                'Records could not be deleted.'
507
            );
508
        }
509 1
    }
510
511 2
    public function fetchRecordsToBeCrawled(int $countInARun)
512
    {
513 2
        $queryBuilderSelect = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
514
        return $queryBuilderSelect
515 2
            ->select('qid', 'scheduled', 'page_id', 'sitemap_priority')
516 2
            ->from($this->tableName)
517 2
            ->leftJoin(
518 2
                $this->tableName,
519 2
                'pages',
520 2
                'p',
521 2
                $queryBuilderSelect->expr()->eq('p.uid', $queryBuilderSelect->quoteIdentifier($this->tableName . '.page_id'))
522
            )
523 2
            ->where(
524 2
                $queryBuilderSelect->expr()->eq('exec_time', 0),
525 2
                $queryBuilderSelect->expr()->eq('process_scheduled', 0),
526 2
                $queryBuilderSelect->expr()->lte('scheduled', time())
527
            )
528 2
            ->orderBy('sitemap_priority', 'DESC')
529 2
            ->addOrderBy('scheduled')
530 2
            ->addOrderBy('qid')
531 2
            ->setMaxResults($countInARun)
532 2
            ->execute()
533 2
            ->fetchAll();
534
    }
535
536 1
    public function updateProcessIdAndSchedulerForQueueIds(array $quidList, string $processId)
537
    {
538 1
        $queryBuilderUpdate = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
539
        return $queryBuilderUpdate
540 1
            ->update($this->tableName)
541 1
            ->where(
542 1
                $queryBuilderUpdate->expr()->in('qid', $quidList)
543
            )
544 1
            ->set('process_scheduled', time())
545 1
            ->set('process_id', $processId)
546 1
            ->execute();
547
    }
548
549 1
    public function unsetProcessScheduledAndProcessIdForQueueEntries(array $processIds): void
550
    {
551 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
552
        $queryBuilder
553 1
            ->update($this->tableName)
554 1
            ->where(
555 1
                $queryBuilder->expr()->eq('exec_time', 0),
556 1
                $queryBuilder->expr()->in('process_id', $queryBuilder->createNamedParameter($processIds, Connection::PARAM_STR_ARRAY))
557
            )
558 1
            ->set('process_scheduled', 0)
559 1
            ->set('process_id', '')
560 1
            ->execute();
561 1
    }
562
563
    /**
564
     * This method is used to count if there are ANY unprocessed queue entries
565
     * of a given page_id and the configuration which matches a given hash.
566
     * If there if none, we can skip an inner detail check
567
     *
568
     * @param int $uid
569
     * @param string $configurationHash
570
     * @return boolean
571
     */
572 4
    public function noUnprocessedQueueEntriesForPageWithConfigurationHashExist($uid, $configurationHash): bool
573
    {
574 4
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
575 4
        $noUnprocessedQueueEntriesFound = true;
576
577
        $result = $queryBuilder
578 4
            ->count('*')
579 4
            ->from($this->tableName)
580 4
            ->where(
581 4
                $queryBuilder->expr()->eq('page_id', (int) $uid),
582 4
                $queryBuilder->expr()->eq('configuration_hash', $queryBuilder->createNamedParameter($configurationHash)),
583 4
                $queryBuilder->expr()->eq('exec_time', 0)
584
            )
585 4
            ->execute()
586 4
            ->fetchColumn();
587
588 4
        if ($result) {
589 4
            $noUnprocessedQueueEntriesFound = false;
590
        }
591
592 4
        return $noUnprocessedQueueEntriesFound;
593
    }
594
595
    /**
596
     * Removes queue entries
597
     */
598 5
    public function flushQueue(QueueFilter $queueFilter): void
599
    {
600 5
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
601
602 5
        switch ($queueFilter) {
603 5
            case 'all':
604
                // No where claus needed delete everything
605 3
                break;
606 2
            case 'pending':
607 1
                $queryBuilder->andWhere($queryBuilder->expr()->eq('exec_time', 0));
608 1
                break;
609 1
            case 'finished':
610
            default:
611 1
                $queryBuilder->andWhere($queryBuilder->expr()->gt('exec_time', 0));
612 1
                break;
613
        }
614
615
        $queryBuilder
616 5
            ->delete($this->tableName)
617 5
            ->execute();
618 5
    }
619
620
    /**
621
     * @param string $processId
622
     *
623
     * @return bool|string
624
     */
625 1
    public function countAllByProcessId($processId)
626
    {
627 1
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
628
629
        return $queryBuilder
630 1
            ->count('*')
631 1
            ->from($this->tableName)
632 1
            ->where(
633 1
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId, \PDO::PARAM_STR))
634
            )
635 1
            ->execute()
636 1
            ->fetchColumn(0);
637
    }
638
639 9
    public function getDuplicateQueueItemsIfExists(bool $enableTimeslot, int $timestamp, int $currentTime, int $pageId, string $parametersHash): array
640
    {
641 9
        $rows = [];
642
643 9
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
644
        $queryBuilder
645 9
            ->select('qid')
646 9
            ->from('tx_crawler_queue');
647
        //if this entry is scheduled with "now"
648 9
        if ($timestamp <= $currentTime) {
649 3
            if ($enableTimeslot) {
650 2
                $timeBegin = $currentTime - 100;
651 2
                $timeEnd = $currentTime + 100;
652
                $queryBuilder
653 2
                    ->where(
654 2
                        'scheduled BETWEEN ' . $timeBegin . ' AND ' . $timeEnd . ''
655
                    )
656 2
                    ->orWhere(
657 2
                        $queryBuilder->expr()->lte('scheduled', $currentTime)
658
                    );
659
            } else {
660
                $queryBuilder
661 1
                    ->where(
662 3
                        $queryBuilder->expr()->lte('scheduled', $currentTime)
663
                    );
664
            }
665 6
        } elseif ($timestamp > $currentTime) {
666
            //entry with a timestamp in the future need to have the same schedule time
667
            $queryBuilder
668 6
                ->where(
669 6
                    $queryBuilder->expr()->eq('scheduled', $timestamp)
670
                );
671
        }
672
673
        $queryBuilder
674 9
            ->andWhere('NOT exec_time')
675 9
            ->andWhere('NOT process_id')
676 9
            ->andWhere($queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($pageId, \PDO::PARAM_INT)))
677 9
            ->andWhere($queryBuilder->expr()->eq('parameters_hash', $queryBuilder->createNamedParameter($parametersHash, \PDO::PARAM_STR)));
678
679 9
        $statement = $queryBuilder->execute();
680
681 9
        while ($row = $statement->fetch()) {
682 7
            $rows[] = $row['qid'];
683
        }
684
685 9
        return $rows;
686
    }
687
688
    /**
689
     * This internal helper method is used to create an instance of an entry object
690
     *
691
     * @param Process $process
692
     * @param string $orderByField first matching item will be returned as object
693
     * @param string $orderBySorting sorting direction
694
     */
695 5
    protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC'): array
696
    {
697 5
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
698
        $first = $queryBuilder
699 5
            ->select('*')
700 5
            ->from($this->tableName)
701 5
            ->where(
702 5
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
703 5
                $queryBuilder->expr()->gt('exec_time', 0)
704
            )
705 5
            ->setMaxResults(1)
706 5
            ->addOrderBy($orderByField, $orderBySorting)
707 5
            ->execute()->fetch(0);
708
709 5
        return $first ?: [];
710
    }
711
}
712