Passed
Push — wip/remove-deprecations-for-v1... ( f97f5b )
by Tomas Norre
05:15
created

QueueRepository (rating: D)

Complexity

Total Complexity 58

Size/Duplication

Total Lines 673
Duplicated Lines 0 %

Test Coverage

Coverage 70.69%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 346
c 1
b 0
f 0
dl 0
loc 673
ccs 340
cts 481
cp 0.7069
rs 4.5599
wmc 58
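
The cp value looks like the ratio of ccs to cts, and the Coverage figure like the same ratio as a percentage. The sketch below checks that arithmetic. It assumes ccs and cts stand for covered and total statements, which the report does not spell out, and coverageRatio() is a hypothetical helper name.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: ratio of covered to total statements.
 * Assumption: ccs = covered statements, cts = total statements.
 */
function coverageRatio(int $covered, int $total): float
{
    if ($total <= 0) {
        throw new InvalidArgumentException('Total must be positive.');
    }

    return $covered / $total;
}

$ratio = coverageRatio(340, 481); // ccs and cts from the metric table

echo number_format($ratio, 4), PHP_EOL;            // prints 0.7069, the reported cp
echo number_format($ratio * 100, 2), '%', PHP_EOL; // prints 70.69%, the reported Coverage
```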

30 Methods

Rating   Name   Duplication   Size   Complexity  
A findOldestEntryForProcess() 0 3 1
A noUnprocessedQueueEntriesForPageWithConfigurationHashExist() 0 21 2
A unsetProcessScheduledAndProcessIdForQueueEntries() 0 12 1
A getLastProcessedEntriesTimestamps() 0 16 2
A getSetIdWithUnprocessedEntries() 0 19 2
A getTotalQueueEntriesByConfiguration() 0 22 3
A countAllPendingItems() 0 14 1
A getPerformanceData() 0 21 2
A countAllUnassignedPendingItems() 0 14 1
A countPendingItemsGroupedByConfigurationKey() 0 15 1
A countAllAssignedPendingItems() 0 14 1
A getLastProcessedEntries() 0 16 2
A countNonExecutedItemsByProcess() 0 13 1
A getQueueEntriesForPageId() 0 33 4
A unsetQueueProcessId() 0 10 1
A getDuplicateQueueItemsIfExists() 0 47 5
A fetchRecordsToBeCrawled() 0 23 1
A findByQueueId() 0 12 2
A getUnprocessedItems() 0 11 1
A cleanupQueue() 0 18 3
A cleanUpOldQueueEntries() 0 20 2
A isPageInQueueTimed() 0 3 1
A flushQueue() 0 20 4
A findYoungestEntryForProcess() 0 3 1
A getAvailableSets() 0 17 2
B isPageInQueue() 0 40 6
A updateProcessIdAndSchedulerForQueueIds() 0 11 1
A getFirstOrLastObjectByProcess() 0 15 2
A __construct() 0 6 1
A countExecutedItemsByProcess() 0 13 1

How to fix: Complexity

Complex Class

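Complex classes like QueueRepository often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes. In QueueRepository, for example, several methods share the count prefix (countAllPendingItems(), countAllAssignedPendingItems(), countAllUnassignedPendingItems(), and others).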

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
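
As a rough illustration of Extract Class, the sketch below moves two of the count*() methods into their own class and lets a slimmed-down repository delegate to it. QueueItemCounter and SlimQueueRepository are hypothetical names, the rows are plain arrays instead of database records, and the current time is passed in as $now (the original methods call time() directly), so the example runs without TYPO3.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical extracted component: it owns only the count*() behaviour.
 * Rows are plain arrays here so the sketch runs without a database.
 */
final class QueueItemCounter
{
    /** @var array<int, array<string, mixed>> */
    private $rows;

    public function __construct(array $rows)
    {
        $this->rows = $rows;
    }

    public function countAllPendingItems(int $now): int
    {
        return $this->countWhere(static function (array $row) use ($now): bool {
            return $row['process_scheduled'] === 0
                && $row['exec_time'] === 0
                && $row['scheduled'] <= $now;
        });
    }

    public function countAllAssignedPendingItems(int $now): int
    {
        return $this->countWhere(static function (array $row) use ($now): bool {
            return $row['process_id'] !== ''
                && $row['exec_time'] === 0
                && $row['scheduled'] <= $now;
        });
    }

    private function countWhere(callable $predicate): int
    {
        return count(array_filter($this->rows, $predicate));
    }
}

/**
 * Hypothetical slimmed-down repository: counting is delegated to the
 * extracted class, so callers keep using the same method names.
 */
final class SlimQueueRepository
{
    /** @var QueueItemCounter */
    private $counter;

    public function __construct(QueueItemCounter $counter)
    {
        $this->counter = $counter;
    }

    public function countAllPendingItems(int $now): int
    {
        return $this->counter->countAllPendingItems($now);
    }

    public function countAllAssignedPendingItems(int $now): int
    {
        return $this->counter->countAllAssignedPendingItems($now);
    }
}

// Made-up sample rows, for illustration only.
$rows = [
    ['process_id' => '',    'process_scheduled' => 0,   'exec_time' => 0,   'scheduled' => 100],
    ['process_id' => 'abc', 'process_scheduled' => 150, 'exec_time' => 0,   'scheduled' => 100],
    ['process_id' => 'abc', 'process_scheduled' => 150, 'exec_time' => 180, 'scheduled' => 100],
    ['process_id' => 'def', 'process_scheduled' => 160, 'exec_time' => 0,   'scheduled' => 120],
];

$repository = new SlimQueueRepository(new QueueItemCounter($rows));
echo $repository->countAllPendingItems(200), PHP_EOL;
echo $repository->countAllAssignedPendingItems(200), PHP_EOL;
```

Delegating from the repository keeps existing callers working while the counting logic is moved. Once callers use the extracted class directly, the delegating methods could be removed.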

While breaking up the class, it is a good idea to analyze how other classes use QueueRepository, and based on these observations, apply Extract Interface, too.
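
A minimal sketch of Extract Interface under the same caveats: it assumes a caller that only needs two counts. PendingItemCounterInterface, FixedPendingItemCounter and QueueStatusSummary are hypothetical names that do not appear in the analyzed code.

```php
<?php

declare(strict_types=1);

/** Hypothetical interface: the narrow slice of the repository a caller needs. */
interface PendingItemCounterInterface
{
    public function countAllPendingItems(): int;

    public function countAllAssignedPendingItems(): int;
}

/** Hypothetical stand-in that returns fixed numbers instead of querying a table. */
final class FixedPendingItemCounter implements PendingItemCounterInterface
{
    /** @var int */
    private $pending;

    /** @var int */
    private $assigned;

    public function __construct(int $pending, int $assigned)
    {
        $this->pending = $pending;
        $this->assigned = $assigned;
    }

    public function countAllPendingItems(): int
    {
        return $this->pending;
    }

    public function countAllAssignedPendingItems(): int
    {
        return $this->assigned;
    }
}

/** Hypothetical consumer: it depends on the interface, not on a concrete repository. */
final class QueueStatusSummary
{
    /** @var PendingItemCounterInterface */
    private $counter;

    public function __construct(PendingItemCounterInterface $counter)
    {
        $this->counter = $counter;
    }

    public function describe(): string
    {
        return sprintf(
            '%d pending, %d assigned',
            $this->counter->countAllPendingItems(),
            $this->counter->countAllAssignedPendingItems()
        );
    }
}

$summary = new QueueStatusSummary(new FixedPendingItemCounter(5, 2));
echo $summary->describe(), PHP_EOL; // prints "5 pending, 2 assigned"
```

A consumer typed against the interface can be tested with a stand-in like the one above, and the concrete repository can later be split further without touching that consumer.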

<?php

declare(strict_types=1);

namespace AOE\Crawler\Domain\Repository;

/***************************************************************
 *  Copyright notice
 *
 *  (c) 2020 AOE GmbH <[email protected]>
 *
 *  All rights reserved
 *
 *  This script is part of the TYPO3 project. The TYPO3 project is
 *  free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  The GNU General Public License can be found at
 *  http://www.gnu.org/copyleft/gpl.html.
 *
 *  This script is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU General Public License for more details.
 *
 *  This copyright notice MUST APPEAR in all copies of the script!
 ***************************************************************/

use AOE\Crawler\Configuration\ExtensionConfigurationProvider;
use AOE\Crawler\Domain\Model\Process;
use AOE\Crawler\Value\QueueFilter;
use PDO;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use TYPO3\CMS\Core\Database\Connection;
use TYPO3\CMS\Core\Database\ConnectionPool;
use TYPO3\CMS\Core\Utility\GeneralUtility;
use TYPO3\CMS\Extbase\Object\ObjectManager;
use TYPO3\CMS\Extbase\Persistence\Repository;

class QueueRepository extends Repository implements LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * @var string
     */
    protected $tableName = 'tx_crawler_queue';

    /**
     * @var array
     */
    protected $extensionSettings;

    public function __construct()
    {
        $objectManager = GeneralUtility::makeInstance(ObjectManager::class);
        $this->extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();

        parent::__construct($objectManager);
    }

    // TODO: Should be a property on the QueueObject
    public function unsetQueueProcessId(string $processId): void
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queryBuilder
            ->update($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId))
            )
            ->set('process_id', '')
            ->execute();
    }

    /**
     * This method is used to find the youngest entry for a given process.
     */
    public function findYoungestEntryForProcess(Process $process): array
    {
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time');
    }

    /**
     * This method is used to find the oldest entry for a given process.
     */
    public function findOldestEntryForProcess(Process $process): array
    {
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time', 'DESC');
    }

    /**
     * Counts all executed items of a process.
     *
     * @param Process $process
     */
    public function countExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Counts items of a process which have not yet been processed/executed.
     *
     * @param Process $process
     */
    public function countNonExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Get items which have not been processed yet
     */
    public function getUnprocessedItems(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()->fetchAll();
    }

    /**
     * This method can be used to count all queue entries which are
     * scheduled for now or an earlier date.
     */
    public function countAllPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_scheduled', 0),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * This method can be used to count all queue entries which are
     * scheduled for now or an earlier date and are assigned to a process.
     */
    public function countAllAssignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->neq('process_id', '""'),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * This method can be used to count all queue entries which are
     * scheduled for now or an earlier date and are not assigned to a process.
     */
    public function countAllUnassignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        return $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', '""'),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Count pending queue entries grouped by configuration key
     */
    public function countPendingItemsGroupedByConfigurationKey(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed')
            ->addSelect('configuration')
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute();

        return $statement->fetchAll();
    }

    /**
     * Get set id with unprocessed entries
     *
     * @return array array of set ids
     */
    public function getSetIdWithUnprocessedEntries(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->select('set_id')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->lt('scheduled', time()),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->addGroupBy('set_id')
            ->execute();

        $setIds = [];
        while ($row = $statement->fetch()) {
            $setIds[] = intval($row['set_id']);
        }

        return $setIds;
    }

    /**
     * Get total queue entries by configuration
     *
     * @return array totals by configuration (keys)
     */
    public function getTotalQueueEntriesByConfiguration(array $setIds): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $totals = [];
        if (! empty($setIds)) {
            $statement = $queryBuilder
                ->from($this->tableName)
                ->selectLiteral('count(*) as c')
                ->addSelect('configuration')
                ->where(
                    $queryBuilder->expr()->in('set_id', implode(',', $setIds)),
                    $queryBuilder->expr()->lt('scheduled', time())
                )
                ->groupBy('configuration')
                ->execute();

            while ($row = $statement->fetch()) {
                $totals[$row['configuration']] = $row['c'];
            }
        }

        return $totals;
    }

    /**
     * Get the timestamps of the last processed entries
     *
     * @param int $limit
     */
    public function getLastProcessedEntriesTimestamps($limit = 100): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->select('exec_time')
            ->from($this->tableName)
            ->addOrderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[] = $row['exec_time'];
        }

        return $rows;
    }

    /**
     * Get the last processed entries
     *
     * @param int $limit
     */
    public function getLastProcessedEntries($limit = 100): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->select('*')
            ->orderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        $rows = [];
        while (($row = $statement->fetch()) !== false) {
            $rows[] = $row;
        }

        return $rows;
    }

    /**
     * Get performance statistics data
     *
     * @param int $start timestamp
     * @param int $end timestamp
     *
     * @return array performance data
     */
    public function getPerformanceData($start, $end): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
            ->addSelect('process_id_completed')
            ->where(
                $queryBuilder->expr()->neq('exec_time', 0),
                $queryBuilder->expr()->gte('exec_time', $queryBuilder->createNamedParameter($start, PDO::PARAM_INT)),
                $queryBuilder->expr()->lte('exec_time', $queryBuilder->createNamedParameter($end, PDO::PARAM_INT))
            )
            ->groupBy('process_id_completed')
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[$row['process_id_completed']] = $row;
        }

        return $rows;
    }

    /**
     * Determines if a page is queued
     */
    public function isPageInQueue(int $uid, bool $unprocessed_only = true, bool $timed_only = false, int $timestamp = 0): bool
    {
        $isPageInQueue = false;

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->count('*')
            ->where(
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($uid, PDO::PARAM_INT))
            );

        if ($unprocessed_only !== false) {
            $statement->andWhere(
                $queryBuilder->expr()->eq('exec_time', 0)
            );
        }

        if ($timed_only !== false) {
            $statement->andWhere(
                $queryBuilder->expr()->neq('scheduled', 0)
            );
        }

        if ($timestamp) {
            $statement->andWhere(
                $queryBuilder->expr()->eq('scheduled', $queryBuilder->createNamedParameter($timestamp, PDO::PARAM_INT))
            );
        }

        // TODO: Currently it's not working if the page doesn't exist. See tests
        $count = $statement
            ->execute()
            ->fetchColumn(0);

        if ($count !== false && $count > 0) {
            $isPageInQueue = true;
        }

        return $isPageInQueue;
    }

    /**
     * Method to check if a page is in the queue which is timed for a
     * date when it should be crawled
     */
    public function isPageInQueueTimed(int $uid, bool $show_unprocessed = true): bool
    {
        return $this->isPageInQueue($uid, $show_unprocessed);
    }

    public function getAvailableSets(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->selectLiteral('count(*) as count_value')
            ->addSelect('set_id', 'scheduled')
            ->from($this->tableName)
            ->orderBy('scheduled', 'desc')
            ->groupBy('set_id', 'scheduled')
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[] = $row;
        }

        return $rows;
    }

    public function findByQueueId(string $queueId): ?array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queueRec = $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('qid', $queryBuilder->createNamedParameter($queueId))
            )
            ->execute()
            ->fetch();
        return is_array($queueRec) ? $queueRec : null;
    }

    public function cleanupQueue(): void
    {
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
        $purgeDays = (int) $extensionSettings['purgeQueueDays'];

        if ($purgeDays > 0) {
            $purgeDate = time() - 24 * 60 * 60 * $purgeDays;

            $queryBuilderDelete = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
            $del = $queryBuilderDelete
                ->delete($this->tableName)
                ->where(
                    'exec_time != 0 AND exec_time < ' . $purgeDate
                )->execute();

            if ($del === false) {
                $this->logger->info(
                    'Records could not be deleted.'
                );
            }
        }
    }

    /**
     * Cleans up entries that stayed for too long in the queue. By default these are:
     * - processed entries that are over 1.5 days in age
     * - scheduled entries that are over 7 days old
     */
    public function cleanUpOldQueueEntries(): void
    {
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
        // 24*60*60 Seconds in 24 hours
        $processedAgeInSeconds = $extensionSettings['cleanUpProcessedAge'] * 86400;
        $scheduledAgeInSeconds = $extensionSettings['cleanUpScheduledAge'] * 86400;

        $now = time();
        $condition = '(exec_time<>0 AND exec_time<' . ($now - $processedAgeInSeconds) . ') OR scheduled<=' . ($now - $scheduledAgeInSeconds);

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $del = $queryBuilder
            ->delete($this->tableName)
            ->where(
                $condition
            )->execute();

        if ($del === false) {
            $this->logger->info(
                'Records could not be deleted.'
            );
        }
    }

    public function fetchRecordsToBeCrawled(int $countInARun)
    {
        $queryBuilderSelect = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        return $queryBuilderSelect
            ->select('qid', 'scheduled', 'page_id', 'sitemap_priority')
            ->from($this->tableName)
            ->leftJoin(
                $this->tableName,
                'pages',
                'p',
                $queryBuilderSelect->expr()->eq('p.uid', $queryBuilderSelect->quoteIdentifier($this->tableName . '.page_id'))
            )
            ->where(
                $queryBuilderSelect->expr()->eq('exec_time', 0),
                $queryBuilderSelect->expr()->eq('process_scheduled', 0),
                $queryBuilderSelect->expr()->lte('scheduled', time())
            )
            ->orderBy('sitemap_priority', 'DESC')
            ->addOrderBy('scheduled')
            ->addOrderBy('qid')
            ->setMaxResults($countInARun)
            ->execute()
            ->fetchAll();
    }

    public function updateProcessIdAndSchedulerForQueueIds(array $quidList, string $processId)
    {
        $queryBuilderUpdate = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        return $queryBuilderUpdate
            ->update($this->tableName)
            ->where(
                $queryBuilderUpdate->expr()->in('qid', $quidList)
            )
            ->set('process_scheduled', time())
            ->set('process_id', $processId)
            ->execute();
    }

    public function unsetProcessScheduledAndProcessIdForQueueEntries(array $processIds): void
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queryBuilder
            ->update($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->in('process_id', $queryBuilder->createNamedParameter($processIds, Connection::PARAM_STR_ARRAY))
            )
            ->set('process_scheduled', 0)
            ->set('process_id', '')
            ->execute();
    }

    /**
     * This method is used to check whether there are ANY unprocessed queue entries
     * for a given page_id whose configuration matches a given hash.
     * If there are none, we can skip an inner detail check.
     *
     * @param int $uid
     * @param string $configurationHash
     * @return boolean
     */
    public function noUnprocessedQueueEntriesForPageWithConfigurationHashExist($uid, $configurationHash): bool
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $noUnprocessedQueueEntriesFound = true;

        $result = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('page_id', (int) $uid),
                $queryBuilder->expr()->eq('configuration_hash', $queryBuilder->createNamedParameter($configurationHash)),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn();

        if ($result) {
            $noUnprocessedQueueEntriesFound = false;
        }

        return $noUnprocessedQueueEntriesFound;
    }

    /**
     * Removes queue entries
     */
    public function flushQueue(QueueFilter $queueFilter): void
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        switch ($queueFilter) {
            case 'all':
                // No where clause needed, delete everything
                break;
            case 'pending':
                $queryBuilder->andWhere($queryBuilder->expr()->eq('exec_time', 0));
                break;
            case 'finished':
            default:
                $queryBuilder->andWhere($queryBuilder->expr()->gt('exec_time', 0));
                break;
        }

        $queryBuilder
            ->delete($this->tableName)
            ->execute();
    }

    public function getDuplicateQueueItemsIfExists(bool $enableTimeslot, int $timestamp, int $currentTime, int $pageId, string $parametersHash): array
    {
        $rows = [];

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queryBuilder
            ->select('qid')
            ->from('tx_crawler_queue');
        // if this entry is scheduled with "now"
        if ($timestamp <= $currentTime) {
            if ($enableTimeslot) {
                $timeBegin = $currentTime - 100;
                $timeEnd = $currentTime + 100;
                $queryBuilder
                    ->where(
                        'scheduled BETWEEN ' . $timeBegin . ' AND ' . $timeEnd . ''
                    )
                    ->orWhere(
                        $queryBuilder->expr()->lte('scheduled', $currentTime)
                    );
            } else {
                $queryBuilder
                    ->where(
                        $queryBuilder->expr()->lte('scheduled', $currentTime)
                    );
            }
        } elseif ($timestamp > $currentTime) {
            // entries with a timestamp in the future need to have the same schedule time
            $queryBuilder
                ->where(
                    $queryBuilder->expr()->eq('scheduled', $timestamp)
                );
        }

        $queryBuilder
            ->andWhere('NOT exec_time')
            ->andWhere('NOT process_id')
            ->andWhere($queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($pageId, PDO::PARAM_INT)))
            ->andWhere($queryBuilder->expr()->eq('parameters_hash', $queryBuilder->createNamedParameter($parametersHash, PDO::PARAM_STR)));

        $statement = $queryBuilder->execute();

        while ($row = $statement->fetch()) {
            $rows[] = $row['qid'];
        }

        return $rows;
    }

    public function getQueueEntriesForPageId(int $id, int $itemsPerPage, QueueFilter $queueFilter): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($id, PDO::PARAM_INT))
            )
            ->orderBy('scheduled', 'DESC');

        $expressionBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getConnectionForTable($this->tableName)
            ->getExpressionBuilder();
        $query = $expressionBuilder->andX();
        // Reported issue (Unused Code): The assignment to $query is dead and can be removed.
        // PHPStorm adds the highlight that the $addWhere is immediately overwritten,
        // but the $query = $expressionBuilder->andX() ensures that the $addWhere is written correctly with AND
        // between the statements, it's not a mistake in the code.
        switch ($queueFilter) {
            case 'pending':
                $queryBuilder->andWhere($queryBuilder->expr()->eq('exec_time', 0));
                break;
            case 'finished':
                $queryBuilder->andWhere($queryBuilder->expr()->gt('exec_time', 0));
                break;
        }

        if ($itemsPerPage > 0) {
            $queryBuilder
                ->setMaxResults($itemsPerPage);
        }

        return $queryBuilder->execute()->fetchAll();
    }

    /**
     * This internal helper method is used to create an instance of an entry object
     *
     * @param Process $process
     * @param string $orderByField first matching item will be returned as object
     * @param string $orderBySorting sorting direction
     */
    protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC'): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $first = $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->setMaxResults(1)
            ->addOrderBy($orderByField, $orderBySorting)
            ->execute()->fetch(0);

        return $first ?: [];
    }
}
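
Both flushQueue() and getQueueEntriesForPageId() pass a QueueFilter object to switch and compare it against string cases. The sketch below shows how such a comparison can work in PHP. DemoQueueFilter is a hypothetical stand-in, and the assumption that the real AOE\Crawler\Value\QueueFilter converts to a string through __toString() is not confirmed by the listing above. describeFlushCondition() is likewise an illustrative helper that returns a textual condition instead of building a query.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for the QueueFilter value object.
 * Assumption: the real class can be converted to a string.
 */
final class DemoQueueFilter
{
    /** @var string */
    private $value;

    public function __construct(string $value)
    {
        $this->value = $value;
    }

    public function __toString(): string
    {
        return $this->value;
    }
}

/**
 * Mirrors the branch structure of flushQueue(), but returns a textual
 * condition instead of adding it to a query builder.
 */
function describeFlushCondition(DemoQueueFilter $queueFilter): string
{
    // switch compares loosely (==), so the object is converted via __toString().
    switch ($queueFilter) {
        case 'all':
            return '';
        case 'pending':
            return 'exec_time = 0';
        case 'finished':
        default:
            return 'exec_time > 0';
    }
}

echo describeFlushCondition(new DemoQueueFilter('pending')), PHP_EOL;
```

Writing switch ((string) $queueFilter) would make the conversion explicit. It would also make visible that any unrecognized value falls through to the default branch, which in flushQueue() is the same as 'finished'.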