Passed
Push — github-actions ( 1bf7ac...aaecb2 )
by Tomas Norre
14:09
created

QueueRepository   C

Complexity

Total Complexity 56

Size/Duplication

Total Lines 664
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 333
c 1
b 0
f 0
dl 0
loc 664
rs 5.5199
wmc 56

31 Methods

Rating  Duplication  Size (lines)  Complexity  Name
A       0            16            2           getLastProcessedEntriesTimestamps()
A       0            13            1           countNonExecutedItemsByProcess()
A       0            19            2           getSetIdWithUnprocessedEntries()
A       0            22            3           getTotalQueueEntriesByConfiguration()
A       0            14            1           countAllPendingItems()
A       0            21            2           getPerformanceData()
A       0            10            1           unsetQueueProcessId()
A       0            14            1           countAllUnassignedPendingItems()
A       0            12            1           countAllByProcessId()
A       0            47            5           getDuplicateQueueItemsIfExists()
A       0            23            1           fetchRecordsToBeCrawled()
A       0            12            2           findByQueueId()
A       0            15            1           countPendingItemsGroupedByConfigurationKey()
A       0            11            1           getUnprocessedItems()
A       0            14            1           countAllAssignedPendingItems()
A       0            18            3           cleanupQueue()
A       0            20            2           cleanUpOldQueueEntries()
A       0            3             1           findOldestEntryForProcess()
A       0            21            2           noUnprocessedQueueEntriesForPageWithConfigurationHashExist()
A       0            3             1           isPageInQueueTimed()
A       0            12            1           unsetProcessScheduledAndProcessIdForQueueEntries()
A       0            16            2           getLastProcessedEntries()
A       0            20            4           flushQueue()
A       0            3             1           countUnprocessedItems()
A       0            3             1           findYoungestEntryForProcess()
A       0            17            2           getAvailableSets()
B       0            40            6           isPageInQueue()
A       0            11            1           updateProcessIdAndSchedulerForQueueIds()
A       0            15            2           getFirstOrLastObjectByProcess()
A       0            6             1           __construct()
A       0            13            1           countExecutedItemsByProcess()
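As a consistency check, the per-method Complexity values in the table add up to the class-level figure: the 31 methods sum to 56, the number reported above as both Total Complexity and wmc. The snippet below simply re-adds the column; the array is copied by hand from the table, in table order.

```php
<?php

declare(strict_types=1);

// Complexity column of the method table, in table order (copied by hand).
$methodComplexities = [
    2, 1, 2, 3, 1, 2, 1, 1, 1, 5,
    1, 2, 1, 1, 1, 3, 2, 1, 2, 1,
    1, 2, 4, 1, 1, 2, 6, 1, 2, 1,
    1,
];

$methodCount = count($methodComplexities);          // number of methods listed
$totalComplexity = array_sum($methodComplexities);  // sum of per-method complexity

printf("%d methods, total complexity %d\n", $methodCount, $totalComplexity);
// prints "31 methods, total complexity 56"
```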

How to fix   Complexity   

Complex Class

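Complex classes like QueueRepository often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes. In QueueRepository, for example, the count* methods (countAllPendingItems(), countAllAssignedPendingItems(), countAllUnassignedPendingItems() and others) form one such group.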
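One way to apply the prefix heuristic is to bucket method names by their leading verb. The sketch below is illustrative only: `groupByPrefix()` is a hypothetical helper, the regular expression (the leading run of lowercase letters in a camelCase name) is an assumption, and the input is a subset of the method names from the table above.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: groups camelCase method names by their leading
 * lowercase prefix (e.g. "count", "get", "find"). Names without one go under "".
 *
 * @param string[] $methodNames
 * @return array<string, string[]>
 */
function groupByPrefix(array $methodNames): array
{
    $groups = [];
    foreach ($methodNames as $name) {
        // Leading run of lowercase letters, e.g. "count" in "countAllPendingItems".
        $prefix = preg_match('/^[a-z]+/', $name, $m) === 1 ? $m[0] : '';
        $groups[$prefix][] = $name;
    }
    ksort($groups);

    return $groups;
}

$methods = [
    'countAllPendingItems', 'countAllAssignedPendingItems', 'countAllUnassignedPendingItems',
    'countExecutedItemsByProcess', 'countNonExecutedItemsByProcess',
    'getLastProcessedEntries', 'getPerformanceData', 'getAvailableSets',
    'findByQueueId', 'findOldestEntryForProcess', 'findYoungestEntryForProcess',
    'unsetQueueProcessId', 'unsetProcessScheduledAndProcessIdForQueueEntries',
];

$groups = groupByPrefix($methods);
foreach ($groups as $prefix => $names) {
    printf("%-6s %d: %s\n", $prefix, count($names), implode(', ', $names));
}
```

Each resulting group (here count, find, get and unset) is a candidate component; whether it is truly cohesive still has to be judged from what the methods share, such as common query conditions.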

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
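Applied to QueueRepository, Extract Class might move the assigned/unassigned pending-item counters, which share the predicate `exec_time = 0 AND scheduled <= now`, into a class of their own. The sketch below is hypothetical: `PendingItemCounter` is an invented name, and to stay runnable without TYPO3 or a database it filters an in-memory list of rows (using the `tx_crawler_queue` column names from the listing below) instead of issuing QueryBuilder calls.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical class extracted from QueueRepository: it owns the
 * "pending item" counters. Rows are in-memory stand-ins for tx_crawler_queue.
 */
final class PendingItemCounter
{
    /** @var array<int, array{process_id: string, exec_time: int, scheduled: int}> */
    private $rows;

    /** @var int */
    private $now;

    public function __construct(array $rows, int $now)
    {
        $this->rows = $rows;
        $this->now = $now;
    }

    public function countAllAssignedPendingItems(): int
    {
        return $this->countPending(static function (array $row): bool {
            return $row['process_id'] !== '';
        });
    }

    public function countAllUnassignedPendingItems(): int
    {
        return $this->countPending(static function (array $row): bool {
            return $row['process_id'] === '';
        });
    }

    /** Shared predicate: not executed yet and scheduled for now or earlier. */
    private function countPending(callable $extraCondition): int
    {
        $count = 0;
        foreach ($this->rows as $row) {
            if ($row['exec_time'] === 0 && $row['scheduled'] <= $this->now && $extraCondition($row)) {
                $count++;
            }
        }

        return $count;
    }
}

$counter = new PendingItemCounter([
    ['process_id' => 'abc', 'exec_time' => 0, 'scheduled' => 900],   // assigned, pending
    ['process_id' => '', 'exec_time' => 0, 'scheduled' => 1000],      // unassigned, pending
    ['process_id' => '', 'exec_time' => 950, 'scheduled' => 900],     // already executed
    ['process_id' => '', 'exec_time' => 0, 'scheduled' => 2000],      // scheduled in the future
], 1000);

printf("assigned: %d, unassigned: %d\n", $counter->countAllAssignedPendingItems(), $counter->countAllUnassignedPendingItems());
```

The shared predicate now lives in one private method, which is the duplication the extraction is meant to remove; in the repository itself the same idea would apply to the repeated `where()` conditions.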

While breaking up the class, it is a good idea to analyze how other classes use QueueRepository, and based on these observations, apply Extract Interface, too.
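Extract Interface could look like this: a caller that only needs queue statistics depends on a small interface rather than on the whole repository. `QueueStatisticsInterface`, the `ReportWidget` consumer and the in-memory `FixedQueueStatistics` double are all hypothetical names; only the two method names are taken from QueueRepository.

```php
<?php

declare(strict_types=1);

/** Hypothetical interface extracted from the methods a caller actually uses. */
interface QueueStatisticsInterface
{
    public function countAllPendingItems(): int;

    public function countUnprocessedItems(): int;
}

/** Hypothetical consumer that no longer needs the full QueueRepository. */
final class ReportWidget
{
    /** @var QueueStatisticsInterface */
    private $statistics;

    public function __construct(QueueStatisticsInterface $statistics)
    {
        $this->statistics = $statistics;
    }

    public function summary(): string
    {
        return sprintf(
            '%d pending, %d unprocessed',
            $this->statistics->countAllPendingItems(),
            $this->statistics->countUnprocessedItems()
        );
    }
}

/** In-memory implementation, e.g. for unit tests. */
final class FixedQueueStatistics implements QueueStatisticsInterface
{
    /** @var int */
    private $pending;

    /** @var int */
    private $unprocessed;

    public function __construct(int $pending, int $unprocessed)
    {
        $this->pending = $pending;
        $this->unprocessed = $unprocessed;
    }

    public function countAllPendingItems(): int
    {
        return $this->pending;
    }

    public function countUnprocessedItems(): int
    {
        return $this->unprocessed;
    }
}

$widget = new ReportWidget(new FixedQueueStatistics(3, 5));
echo $widget->summary(), "\n"; // prints "3 pending, 5 unprocessed"
```

QueueRepository would declare `implements QueueStatisticsInterface` as well, so production code and tests can hand different implementations to the same consumer.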

<?php

declare(strict_types=1);

namespace AOE\Crawler\Domain\Repository;

/***************************************************************
 *  Copyright notice
 *
 *  (c) 2020 AOE GmbH <[email protected]>
 *
 *  All rights reserved
 *
 *  This script is part of the TYPO3 project. The TYPO3 project is
 *  free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  The GNU General Public License can be found at
 *  http://www.gnu.org/copyleft/gpl.html.
 *
 *  This script is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU General Public License for more details.
 *
 *  This copyright notice MUST APPEAR in all copies of the script!
 ***************************************************************/

use AOE\Crawler\Configuration\ExtensionConfigurationProvider;
use AOE\Crawler\Domain\Model\Process;
use AOE\Crawler\Value\QueueFilter;
use Psr\Log\LoggerAwareInterface;
use TYPO3\CMS\Core\Database\Connection;
use TYPO3\CMS\Core\Database\ConnectionPool;
use TYPO3\CMS\Core\Utility\GeneralUtility;
use TYPO3\CMS\Extbase\Object\ObjectManager;
use TYPO3\CMS\Extbase\Persistence\Repository;

/**
 * Class QueueRepository
 *
 * @package AOE\Crawler\Domain\Repository
 */
class QueueRepository extends Repository implements LoggerAwareInterface
{
    use \Psr\Log\LoggerAwareTrait;

    /**
     * @var string
     */
    protected $tableName = 'tx_crawler_queue';

    /**
     * @var array
     */
    protected $extensionSettings;

    public function __construct()
    {
        $objectManager = GeneralUtility::makeInstance(ObjectManager::class);
        $this->extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();

        parent::__construct($objectManager);
    }

    public function unsetQueueProcessId(string $processId): void
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queryBuilder
            ->update($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId))
            )
            ->set('process_id', '')
            ->execute();
    }

    /**
     * This method is used to find the youngest entry for a given process.
     */
    public function findYoungestEntryForProcess(Process $process): array
    {
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time');
    }

    /**
     * This method is used to find the oldest entry for a given process.
     */
    public function findOldestEntryForProcess(Process $process): array
    {
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time', 'DESC');
    }

    /**
     * Counts all executed items of a process.
     *
     * @param Process $process
     */
    public function countExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        // Cast: fetchColumn() may return a numeric string, which strict_types rejects for an int return type
        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Counts items of a process which have not yet been processed/executed.
     *
     * @param Process $process
     */
    public function countNonExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Get items which have not been processed yet
     */
    public function getUnprocessedItems(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()->fetchAll();
    }

    /**
     * Count items which have not been processed yet
     */
    public function countUnprocessedItems(): int
    {
        return count($this->getUnprocessedItems());
    }

    /**
     * This method can be used to count all queue entries which are
     * scheduled for now or an earlier date.
     */
    public function countAllPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_scheduled', 0),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * This method can be used to count all queue entries which are
     * scheduled for now or an earlier date and are assigned to a process.
     */
    public function countAllAssignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                // Bind the empty string instead of the literal '""', which some DBMS parse as an identifier
                $queryBuilder->expr()->neq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * This method can be used to count all queue entries which are
     * scheduled for now or an earlier date and are not assigned to a process.
     */
    public function countAllUnassignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Count pending queue entries grouped by configuration key
     */
    public function countPendingItemsGroupedByConfigurationKey(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed')
            ->addSelect('configuration')
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute();

        return $statement->fetchAll();
    }

    /**
     * Get the IDs of all sets that still have unprocessed entries
     *
     * @return int[] set IDs
     */
    public function getSetIdWithUnprocessedEntries(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->select('set_id')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->lt('scheduled', time()),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->addGroupBy('set_id')
            ->execute();

        $setIds = [];
        while ($row = $statement->fetch()) {
            $setIds[] = intval($row['set_id']);
        }

        return $setIds;
    }

    /**
     * Get total queue entries by configuration
     *
     * @param int[] $setIds
     * @return array totals by configuration (keys)
     */
    public function getTotalQueueEntriesByConfiguration(array $setIds): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $totals = [];
        if (count($setIds) > 0) {
            $statement = $queryBuilder
                ->from($this->tableName)
                ->selectLiteral('count(*) as c')
                ->addSelect('configuration')
                ->where(
                    // Bind the IDs as an integer list instead of concatenating them into the SQL
                    $queryBuilder->expr()->in('set_id', $queryBuilder->createNamedParameter($setIds, Connection::PARAM_INT_ARRAY)),
                    $queryBuilder->expr()->lt('scheduled', time())
                )
                ->groupBy('configuration')
                ->execute();

            while ($row = $statement->fetch()) {
                $totals[$row['configuration']] = $row['c'];
            }
        }

        return $totals;
    }

    /**
     * Get the timestamps of the last processed entries
     *
     * @param int $limit
     */
    public function getLastProcessedEntriesTimestamps($limit = 100): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->select('exec_time')
            ->from($this->tableName)
            ->addOrderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[] = $row['exec_time'];
        }

        return $rows;
    }

    /**
     * Get the last processed entries
     *
     * @param int $limit
     */
    public function getLastProcessedEntries($limit = 100): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->select('*')
            ->orderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        $rows = [];
        while (($row = $statement->fetch()) !== false) {
            $rows[] = $row;
        }

        return $rows;
    }

    /**
     * Get performance statistics data
     *
     * @param int $start timestamp
     * @param int $end timestamp
     *
     * @return array performance data
     */
    public function getPerformanceData($start, $end): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
            ->addSelect('process_id_completed')
            ->where(
                $queryBuilder->expr()->neq('exec_time', 0),
                $queryBuilder->expr()->gte('exec_time', $queryBuilder->createNamedParameter($start, \PDO::PARAM_INT)),
                $queryBuilder->expr()->lte('exec_time', $queryBuilder->createNamedParameter($end, \PDO::PARAM_INT))
            )
            ->groupBy('process_id_completed')
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[$row['process_id_completed']] = $row;
        }

        return $rows;
    }

    /**
     * Determines if a page is queued
     */
    public function isPageInQueue(int $uid, bool $unprocessed_only = true, bool $timed_only = false, int $timestamp = 0): bool
    {
        $isPageInQueue = false;

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->count('*')
            ->where(
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($uid, \PDO::PARAM_INT))
            );

        if ($unprocessed_only !== false) {
            $statement->andWhere(
                $queryBuilder->expr()->eq('exec_time', 0)
            );
        }

        if ($timed_only !== false) {
            $statement->andWhere(
                $queryBuilder->expr()->neq('scheduled', 0)
            );
        }

        if ($timestamp) {
            $statement->andWhere(
                $queryBuilder->expr()->eq('scheduled', $queryBuilder->createNamedParameter($timestamp, \PDO::PARAM_INT))
            );
        }

        // TODO: Currently it's not working if the page doesn't exist. See tests
        $count = $statement
            ->execute()
            ->fetchColumn(0);

        if ($count !== false && $count > 0) {
            $isPageInQueue = true;
        }

        return $isPageInQueue;
    }

    /**
     * Method to check if a page is in the queue which is timed for a
     * date when it should be crawled
     */
    public function isPageInQueueTimed(int $uid, bool $show_unprocessed = true): bool
    {
        return $this->isPageInQueue($uid, $show_unprocessed);
    }
425
426
    public function getAvailableSets(): array
427
    {
428
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
429
        $statement = $queryBuilder
430
            ->selectLiteral('count(*) as count_value')
431
            ->addSelect('set_id', 'scheduled')
432
            ->from($this->tableName)
433
            ->orderBy('scheduled', 'desc')
434
            ->groupBy('set_id', 'scheduled')
435
            ->execute();
436
437
        $rows = [];
438
        while ($row = $statement->fetch()) {
439
            $rows[] = $row;
440
        }
441
442
        return $rows;
443
    }
444
445
    public function findByQueueId(string $queueId): ?array
446
    {
447
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
448
        $queueRec = $queryBuilder
449
            ->select('*')
450
            ->from($this->tableName)
451
            ->where(
452
                $queryBuilder->expr()->eq('qid', $queryBuilder->createNamedParameter($queueId))
453
            )
454
            ->execute()
455
            ->fetch();
456
        return is_array($queueRec) ? $queueRec : null;
457
    }
458
459
    public function cleanupQueue(): void
460
    {
461
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
462
        $purgeDays = (int) $extensionSettings['purgeQueueDays'];
463
464
        if ($purgeDays > 0) {
465
            $purgeDate = time() - 24 * 60 * 60 * $purgeDays;
466
467
            $queryBuilderDelete = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
468
            $del = $queryBuilderDelete
469
                ->delete($this->tableName)
470
                ->where(
471
                    'exec_time != 0 AND exec_time < ' . $purgeDate
472
                )->execute();
473
474
            if ($del === false) {
475
                $this->logger->info(
476
                    'Records could not be deleted.'
477
                );
478
            }
479
        }
480
    }
481
482
    /**
483
     * Cleans up entries that stayed for too long in the queue. These are default:
484
     * - processed entries that are over 1.5 days in age
485
     * - scheduled entries that are over 7 days old
486
     */
487
    public function cleanUpOldQueueEntries(): void
488
    {
489
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
490
        // 24*60*60 Seconds in 24 hours
491
        $processedAgeInSeconds = $extensionSettings['cleanUpProcessedAge'] * 86400;
492
        $scheduledAgeInSeconds = $extensionSettings['cleanUpScheduledAge'] * 86400;
493
494
        $now = time();
495
        $condition = '(exec_time<>0 AND exec_time<' . ($now - $processedAgeInSeconds) . ') OR scheduled<=' . ($now - $scheduledAgeInSeconds);
496
497
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
498
        $del = $queryBuilder
499
            ->delete($this->tableName)
500
            ->where(
501
                $condition
502
            )->execute();
503
504
        if ($del === false) {
505
            $this->logger->info(
506
                'Records could not be deleted.'
507
            );
508
        }
509
    }
510
511
    public function fetchRecordsToBeCrawled(int $countInARun)
512
    {
513
        $queryBuilderSelect = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
514
        return $queryBuilderSelect
515
            ->select('qid', 'scheduled', 'page_id', 'sitemap_priority')
516
            ->from($this->tableName)
517
            ->leftJoin(
518
                $this->tableName,
519
                'pages',
520
                'p',
521
                $queryBuilderSelect->expr()->eq('p.uid', $queryBuilderSelect->quoteIdentifier($this->tableName . '.page_id'))
522
            )
523
            ->where(
524
                $queryBuilderSelect->expr()->eq('exec_time', 0),
525
                $queryBuilderSelect->expr()->eq('process_scheduled', 0),
526
                $queryBuilderSelect->expr()->lte('scheduled', time())
527
            )
528
            ->orderBy('sitemap_priority', 'DESC')
529
            ->addOrderBy('scheduled')
530
            ->addOrderBy('qid')
531
            ->setMaxResults($countInARun)
532
            ->execute()
533
            ->fetchAll();
534
    }
535
536
    public function updateProcessIdAndSchedulerForQueueIds(array $quidList, string $processId)
537
    {
538
        $queryBuilderUpdate = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
539
        return $queryBuilderUpdate
540
            ->update($this->tableName)
541
            ->where(
542
                $queryBuilderUpdate->expr()->in('qid', $quidList)
543
            )
544
            ->set('process_scheduled', time())
545
            ->set('process_id', $processId)
546
            ->execute();
547
    }
548
549
    public function unsetProcessScheduledAndProcessIdForQueueEntries(array $processIds): void
550
    {
551
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
552
        $queryBuilder
553
            ->update($this->tableName)
554
            ->where(
555
                $queryBuilder->expr()->eq('exec_time', 0),
556
                $queryBuilder->expr()->in('process_id', $queryBuilder->createNamedParameter($processIds, Connection::PARAM_STR_ARRAY))
557
            )
558
            ->set('process_scheduled', 0)
559
            ->set('process_id', '')
560
            ->execute();
561
    }
562
563
    /**
564
     * This method is used to count if there are ANY unprocessed queue entries
565
     * of a given page_id and the configuration which matches a given hash.
566
     * If there if none, we can skip an inner detail check
567
     *
568
     * @param int $uid
569
     * @param string $configurationHash
570
     * @return boolean
571
     */
572
    public function noUnprocessedQueueEntriesForPageWithConfigurationHashExist($uid, $configurationHash): bool
573
    {
574
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
575
        $noUnprocessedQueueEntriesFound = true;
576
577
        $result = $queryBuilder
578
            ->count('*')
579
            ->from($this->tableName)
580
            ->where(
581
                $queryBuilder->expr()->eq('page_id', (int) $uid),
582
                $queryBuilder->expr()->eq('configuration_hash', $queryBuilder->createNamedParameter($configurationHash)),
583
                $queryBuilder->expr()->eq('exec_time', 0)
584
            )
585
            ->execute()
586
            ->fetchColumn();
587
588
        if ($result) {
589
            $noUnprocessedQueueEntriesFound = false;
590
        }
591
592
        return $noUnprocessedQueueEntriesFound;
593
    }
594
595
    /**
596
     * Removes queue entries
597
     */
598
    public function flushQueue(QueueFilter $queueFilter): void
599
    {
600
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
601
602
        switch ($queueFilter) {
603
            case 'all':
604
                // No where claus needed delete everything
605
                break;
606
            case 'pending':
607
                $queryBuilder->andWhere($queryBuilder->expr()->eq('exec_time', 0));
608
                break;
609
            case 'finished':
610
            default:
611
                $queryBuilder->andWhere($queryBuilder->expr()->gt('exec_time', 0));
612
                break;
613
        }
614
615
        $queryBuilder
616
            ->delete($this->tableName)
617
            ->execute();
618
    }
619
620
    /**
621
     * @param string $processId
622
     *
623
     * @return bool|string
624
     */
625
    public function countAllByProcessId($processId)
626
    {
627
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
628
629
        return $queryBuilder
630
            ->count('*')
631
            ->from($this->tableName)
632
            ->where(
633
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId, \PDO::PARAM_STR))
634
            )
635
            ->execute()
636
            ->fetchColumn(0);
637
    }
638
639
    public function getDuplicateQueueItemsIfExists(bool $enableTimeslot, int $timestamp, int $currentTime, int $pageId, string $parametersHash): array
640
    {
641
        $rows = [];
642
643
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
644
        $queryBuilder
645
            ->select('qid')
646
            ->from('tx_crawler_queue');
647
        //if this entry is scheduled with "now"
648
        if ($timestamp <= $currentTime) {
649
            if ($enableTimeslot) {
650
                $timeBegin = $currentTime - 100;
651
                $timeEnd = $currentTime + 100;
652
                $queryBuilder
653
                    ->where(
654
                        'scheduled BETWEEN ' . $timeBegin . ' AND ' . $timeEnd . ''
655
                    )
656
                    ->orWhere(
657
                        $queryBuilder->expr()->lte('scheduled', $currentTime)
658
                    );
659
            } else {
660
                $queryBuilder
661
                    ->where(
662
                        $queryBuilder->expr()->lte('scheduled', $currentTime)
663
                    );
664
            }
665
        } elseif ($timestamp > $currentTime) {
666
            //entry with a timestamp in the future need to have the same schedule time
667
            $queryBuilder
668
                ->where(
669
                    $queryBuilder->expr()->eq('scheduled', $timestamp)
670
                );
671
        }
672
673
        $queryBuilder
674
            ->andWhere('NOT exec_time')
675
            ->andWhere('NOT process_id')
676
            ->andWhere($queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($pageId, \PDO::PARAM_INT)))
677
            ->andWhere($queryBuilder->expr()->eq('parameters_hash', $queryBuilder->createNamedParameter($parametersHash, \PDO::PARAM_STR)));
678
679
        $statement = $queryBuilder->execute();
680
681
        while ($row = $statement->fetch()) {
682
            $rows[] = $row['qid'];
683
        }
684
685
        return $rows;
686
    }
687
688
    /**
689
     * This internal helper method is used to create an instance of an entry object
690
     *
691
     * @param Process $process
692
     * @param string $orderByField first matching item will be returned as object
693
     * @param string $orderBySorting sorting direction
694
     */
695
    protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC'): array
696
    {
697
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
698
        $first = $queryBuilder
699
            ->select('*')
700
            ->from($this->tableName)
701
            ->where(
702
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
703
                $queryBuilder->expr()->gt('exec_time', 0)
704
            )
705
            ->setMaxResults(1)
706
            ->addOrderBy($orderByField, $orderBySorting)
707
            ->execute()->fetch(0);
708
709
        return $first ?: [];
710
    }
711
}
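The delete condition in cleanUpOldQueueEntries() boils down to two cut-off timestamps. The sketch below restates that arithmetic as a plain function so it can be checked in isolation. `cleanupCutoffs()` is a hypothetical helper; the values 1.5 and 7 are the defaults named in the method's docblock, passed in as if they were the `cleanUpProcessedAge` and `cleanUpScheduledAge` settings, and the `(int)` truncation is an assumption of this sketch (the repository concatenates the raw result into SQL).

```php
<?php

declare(strict_types=1);

const SECONDS_PER_DAY = 86400; // 24 * 60 * 60

/**
 * Hypothetical helper mirroring cleanUpOldQueueEntries():
 * executed entries with exec_time < processedBefore are deleted,
 * as are entries with scheduled <= scheduledUpTo.
 *
 * @return array{processedBefore: int, scheduledUpTo: int}
 */
function cleanupCutoffs(int $now, float $processedAgeDays, float $scheduledAgeDays): array
{
    return [
        'processedBefore' => $now - (int) ($processedAgeDays * SECONDS_PER_DAY),
        'scheduledUpTo' => $now - (int) ($scheduledAgeDays * SECONDS_PER_DAY),
    ];
}

$cutoffs = cleanupCutoffs(1000000, 1.5, 7);
printf("processed before %d, scheduled up to %d\n", $cutoffs['processedBefore'], $cutoffs['scheduledUpTo']);
// prints "processed before 870400, scheduled up to 395200"
```

With the defaults, processed entries older than 129600 seconds (1.5 days) and anything scheduled 604800 seconds (7 days) or more in the past are removed.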
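The scheduling part of the WHERE clause in getDuplicateQueueItemsIfExists() is easier to reason about as a predicate on a single `scheduled` value. The sketch below is a hypothetical pure-PHP restatement (`scheduledMatches()` is an invented name; the 100-second slot comes from the method above). It models only the schedule conditions, not the exec_time, process_id, page_id and parameters_hash filters. Note that with the time slot enabled, `scheduled BETWEEN now-100 AND now+100 OR scheduled <= now` is equivalent to `scheduled <= now + 100`.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical restatement of the "scheduled" conditions in
 * getDuplicateQueueItemsIfExists(): would an existing row scheduled at
 * $scheduled count as a duplicate of a new entry with $timestamp?
 */
function scheduledMatches(bool $enableTimeslot, int $timestamp, int $currentTime, int $scheduled): bool
{
    if ($timestamp <= $currentTime) {
        if ($enableTimeslot) {
            // BETWEEN (now - 100) AND (now + 100), OR scheduled <= now
            $inSlot = $scheduled >= $currentTime - 100 && $scheduled <= $currentTime + 100;
            return $inSlot || $scheduled <= $currentTime;
        }
        return $scheduled <= $currentTime;
    }
    // Entries scheduled in the future must have exactly the same scheduled time.
    return $scheduled === $timestamp;
}

var_dump(scheduledMatches(true, 1000, 1000, 1100));  // bool(true): inside the +100 s slot
var_dump(scheduledMatches(true, 1000, 1000, 1101));  // bool(false): just past the slot
var_dump(scheduledMatches(true, 2000, 1000, 2000));  // bool(true): same future timestamp
```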