Passed
Push — testing/behaviour-104 ( 344e70...67b3c9 )
by Tomas Norre
36:59 queued 19:28
created

getLastProcessedEntriesTimestamps()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 16
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 11
c 0
b 0
f 0
nc 2
nop 1
dl 0
loc 16
rs 9.9
1
<?php
2
3
declare(strict_types=1);
4
5
namespace AOE\Crawler\Domain\Repository;
6
7
/***************************************************************
8
 *  Copyright notice
9
 *
10
 *  (c) 2020 AOE GmbH <[email protected]>
11
 *
12
 *  All rights reserved
13
 *
14
 *  This script is part of the TYPO3 project. The TYPO3 project is
15
 *  free software; you can redistribute it and/or modify
16
 *  it under the terms of the GNU General Public License as published by
17
 *  the Free Software Foundation; either version 3 of the License, or
18
 *  (at your option) any later version.
19
 *
20
 *  The GNU General Public License can be found at
21
 *  http://www.gnu.org/copyleft/gpl.html.
22
 *
23
 *  This script is distributed in the hope that it will be useful,
24
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
25
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26
 *  GNU General Public License for more details.
27
 *
28
 *  This copyright notice MUST APPEAR in all copies of the script!
29
 ***************************************************************/
30
31
use AOE\Crawler\Configuration\ExtensionConfigurationProvider;
32
use AOE\Crawler\Domain\Model\Process;
33
use Psr\Log\LoggerAwareInterface;
34
use TYPO3\CMS\Core\Database\Connection;
35
use TYPO3\CMS\Core\Database\ConnectionPool;
36
use TYPO3\CMS\Core\Utility\GeneralUtility;
37
use TYPO3\CMS\Extbase\Object\ObjectManager;
38
use TYPO3\CMS\Extbase\Persistence\Repository;
39
40
/**
 * Class QueueRepository
 *
 * Repository for the crawler queue table (tx_crawler_queue). Apart from the
 * inherited Extbase API, the methods query the table directly through the
 * TYPO3 QueryBuilder and return raw rows, row lists or scalar aggregates.
 *
 * @package AOE\Crawler\Domain\Repository
 */
class QueueRepository extends Repository implements LoggerAwareInterface
{
    use \Psr\Log\LoggerAwareTrait;

    /**
     * @var string
     */
    protected $tableName = 'tx_crawler_queue';

    public function __construct()
    {
        $objectManager = GeneralUtility::makeInstance(ObjectManager::class);
        parent::__construct($objectManager);
    }

    /**
     * Clears process_id on every queue entry currently assigned to the given process id.
     */
    public function unsetQueueProcessId(string $processId): void
    {
        $queryBuilder = $this->getQueryBuilder();
        $queryBuilder
            ->update($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId))
            )
            ->set('process_id', '')
            ->execute();
    }

    /**
     * This method is used to find the youngest entry for a given process.
     *
     * Returns the executed entry (exec_time > 0) completed by the process with the
     * smallest exec_time, or an empty array if there is none.
     */
    public function findYoungestEntryForProcess(Process $process): array
    {
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time');
    }

    /**
     * This method is used to find the oldest entry for a given process.
     *
     * Returns the executed entry (exec_time > 0) completed by the process with the
     * largest exec_time, or an empty array if there is none.
     */
    public function findOldestEntryForProcess(Process $process): array
    {
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time', 'DESC');
    }

    /**
     * Counts all executed items of a process
     * (process_id_completed matches the process and exec_time > 0).
     *
     * @param Process $process
     */
    public function countExecutedItemsByProcess($process): int
    {
        $queryBuilder = $this->getQueryBuilder();

        // COUNT(*) may be fetched as a numeric string; under strict_types the int
        // return type would reject it, so cast explicitly.
        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Counts items of a process which have not been processed/executed yet
     * (process_id matches the process and exec_time = 0).
     *
     * @param Process $process
     */
    public function countNonExecutedItemsByProcess($process): int
    {
        $queryBuilder = $this->getQueryBuilder();

        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Returns all rows which have not been processed yet (exec_time = 0).
     */
    public function getUnprocessedItems(): array
    {
        $queryBuilder = $this->getQueryBuilder();

        return $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()->fetchAll();
    }

    /**
     * Counts items which have not been processed yet (exec_time = 0).
     *
     * Uses COUNT(*) in the database instead of loading every unprocessed row.
     */
    public function countUnprocessedItems(): int
    {
        $queryBuilder = $this->getQueryBuilder();

        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Counts all queue entries which are scheduled for now or an earlier date,
     * are not yet executed and have not been scheduled into a process.
     */
    public function countAllPendingItems(): int
    {
        $queryBuilder = $this->getQueryBuilder();

        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_scheduled', 0),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Counts all queue entries which are scheduled for now or an earlier date,
     * are not yet executed and are assigned to a process (non-empty process_id).
     */
    public function countAllAssignedPendingItems(): int
    {
        $queryBuilder = $this->getQueryBuilder();

        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                // Bind '' as a parameter: a raw "" literal is an identifier in standard SQL.
                $queryBuilder->expr()->neq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Counts all queue entries which are scheduled for now or an earlier date,
     * are not yet executed and are not assigned to a process (empty process_id).
     */
    public function countAllUnassignedPendingItems(): int
    {
        $queryBuilder = $this->getQueryBuilder();

        return (int) $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                // Bind '' as a parameter: a raw "" literal is an identifier in standard SQL.
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Counts pending queue entries grouped by configuration key.
     *
     * Each row holds 'configuration', 'unprocessed' (count of entries) and
     * 'assignedButUnprocessed' (entries with a non-empty process_id).
     * NOTE(review): uses scheduled < now (strict), unlike the count* methods above
     * which use <=; the SUM over a comparison relies on the DBMS treating it as 0/1.
     */
    public function countPendingItemsGroupedByConfigurationKey(): array
    {
        $queryBuilder = $this->getQueryBuilder();
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed')
            ->addSelect('configuration')
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute();

        return $statement->fetchAll();
    }

    /**
     * Get set ids having unprocessed entries scheduled before now.
     *
     * @return array array of set ids (int)
     */
    public function getSetIdWithUnprocessedEntries(): array
    {
        $queryBuilder = $this->getQueryBuilder();
        $statement = $queryBuilder
            ->select('set_id')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->lt('scheduled', time()),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->addGroupBy('set_id')
            ->execute();

        $setIds = [];
        while ($row = $statement->fetch()) {
            $setIds[] = (int) $row['set_id'];
        }

        return $setIds;
    }

    /**
     * Get total queue entries (scheduled before now) per configuration for the given sets.
     *
     * @param array $setIds set ids to include; an empty list yields an empty result
     * @return array totals by configuration (keys)
     */
    public function getTotalQueueEntriesByConfiguration(array $setIds): array
    {
        $totals = [];
        if (count($setIds) === 0) {
            return $totals;
        }

        $queryBuilder = $this->getQueryBuilder();
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('count(*) as c')
            ->addSelect('configuration')
            ->where(
                // Bind the ids as an integer list instead of concatenating
                // caller-supplied values into the SQL string.
                $queryBuilder->expr()->in(
                    'set_id',
                    $queryBuilder->createNamedParameter(array_map('intval', $setIds), Connection::PARAM_INT_ARRAY)
                ),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute();

        while ($row = $statement->fetch()) {
            $totals[$row['configuration']] = $row['c'];
        }

        return $totals;
    }

    /**
     * Get the exec_time values of the last processed entries, newest first.
     *
     * NOTE(review): rows are not filtered on exec_time > 0, so unprocessed entries
     * (exec_time = 0) appear at the end when fewer than $limit processed entries exist.
     *
     * @param int $limit maximum number of timestamps returned
     */
    public function getLastProcessedEntriesTimestamps($limit = 100): array
    {
        $queryBuilder = $this->getQueryBuilder();
        $statement = $queryBuilder
            ->select('exec_time')
            ->from($this->tableName)
            ->addOrderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[] = $row['exec_time'];
        }

        return $rows;
    }

    /**
     * Get the last processed entries (full rows), ordered by exec_time descending.
     *
     * NOTE(review): like getLastProcessedEntriesTimestamps(), this does not filter on
     * exec_time > 0.
     *
     * @param int $limit maximum number of rows returned
     */
    public function getLastProcessedEntries($limit = 100): array
    {
        $queryBuilder = $this->getQueryBuilder();
        $statement = $queryBuilder
            ->from($this->tableName)
            ->select('*')
            ->orderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        $rows = [];
        while (($row = $statement->fetch()) !== false) {
            $rows[] = $row;
        }

        return $rows;
    }

    /**
     * Get performance statistics data for entries executed within [$start, $end].
     *
     * @param int $start timestamp
     * @param int $end timestamp
     *
     * @return array performance data (start, end, urlcount, process_id_completed),
     *               keyed by process_id_completed
     */
    public function getPerformanceData($start, $end): array
    {
        $queryBuilder = $this->getQueryBuilder();
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
            ->addSelect('process_id_completed')
            ->where(
                $queryBuilder->expr()->neq('exec_time', 0),
                $queryBuilder->expr()->gte('exec_time', $queryBuilder->createNamedParameter($start, \PDO::PARAM_INT)),
                $queryBuilder->expr()->lte('exec_time', $queryBuilder->createNamedParameter($end, \PDO::PARAM_INT))
            )
            ->groupBy('process_id_completed')
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[$row['process_id_completed']] = $row;
        }

        return $rows;
    }

    /**
     * Determines if a page is queued.
     *
     * @param int $uid page uid (page_id)
     * @param bool $unprocessed_only only count entries with exec_time = 0
     * @param bool $timed_only only count entries with a non-zero scheduled time
     * @param int $timestamp when non-zero, only count entries scheduled exactly at this time
     */
    public function isPageInQueue(int $uid, bool $unprocessed_only = true, bool $timed_only = false, int $timestamp = 0): bool
    {
        $queryBuilder = $this->getQueryBuilder();
        $statement = $queryBuilder
            ->from($this->tableName)
            ->count('*')
            ->where(
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($uid, \PDO::PARAM_INT))
            );

        if ($unprocessed_only !== false) {
            $statement->andWhere(
                $queryBuilder->expr()->eq('exec_time', 0)
            );
        }

        if ($timed_only !== false) {
            $statement->andWhere(
                $queryBuilder->expr()->neq('scheduled', 0)
            );
        }

        if ($timestamp) {
            $statement->andWhere(
                $queryBuilder->expr()->eq('scheduled', $queryBuilder->createNamedParameter($timestamp, \PDO::PARAM_INT))
            );
        }

        // TODO: Currently it's not working if page doesn't exists. See tests
        $count = $statement
            ->execute()
            ->fetchColumn(0);

        return $count !== false && (int) $count > 0;
    }

    /**
     * Method to check if a page is in the queue which is timed for a
     * date when it should be crawled.
     *
     * NOTE(review): $timed_only is not forwarded (it stays false), so this behaves
     * exactly like isPageInQueue($uid, $show_unprocessed). Kept as-is to preserve
     * existing behaviour; confirm the intent.
     */
    public function isPageInQueueTimed(int $uid, bool $show_unprocessed = true): bool
    {
        return $this->isPageInQueue($uid, $show_unprocessed);
    }

    /**
     * Returns one row per (set_id, scheduled) pair with its entry count
     * (count_value), ordered by scheduled descending.
     */
    public function getAvailableSets(): array
    {
        $queryBuilder = $this->getQueryBuilder();
        $statement = $queryBuilder
            ->selectLiteral('count(*) as count_value')
            ->addSelect('set_id', 'scheduled')
            ->from($this->tableName)
            ->orderBy('scheduled', 'desc')
            ->groupBy('set_id', 'scheduled')
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[] = $row;
        }

        return $rows;
    }

    /**
     * Returns the queue row with the given qid, or null if none exists.
     */
    public function findByQueueId(string $queueId): ?array
    {
        $queryBuilder = $this->getQueryBuilder();
        $queueRec = $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('qid', $queryBuilder->createNamedParameter($queueId))
            )
            ->execute()
            ->fetch();
        return is_array($queueRec) ? $queueRec : null;
    }

    /**
     * Deletes executed entries older than the configured 'purgeQueueDays'.
     * Does nothing when the setting is not a positive number.
     */
    public function cleanupQueue(): void
    {
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
        $purgeDays = (int) $extensionSettings['purgeQueueDays'];

        if ($purgeDays <= 0) {
            return;
        }

        $purgeDate = time() - 24 * 60 * 60 * $purgeDays;

        $queryBuilderDelete = $this->getQueryBuilder();
        $del = $queryBuilderDelete
            ->delete($this->tableName)
            ->where(
                $queryBuilderDelete->expr()->neq('exec_time', 0),
                $queryBuilderDelete->expr()->lt('exec_time', $purgeDate)
            )->execute();

        if ($del === false) {
            $this->logFailedDeletion();
        }
    }

    /**
     * Cleans up entries that stayed for too long in the queue. These are default:
     * - processed entries that are over 1.5 days in age
     * - scheduled entries that are over 7 days old
     *
     * The ages come from the 'cleanUpProcessedAge' and 'cleanUpScheduledAge'
     * extension settings, expressed in days.
     */
    public function cleanUpOldQueueEntries(): void
    {
        $extensionSettings = GeneralUtility::makeInstance(ExtensionConfigurationProvider::class)->getExtensionConfiguration();
        $processedAgeInSeconds = $extensionSettings['cleanUpProcessedAge'] * 86400; // 24*60*60 Seconds in 24 hours
        $scheduledAgeInSeconds = $extensionSettings['cleanUpScheduledAge'] * 86400;

        $now = time();
        // Only numeric values are concatenated here, so the raw condition is safe.
        $condition = '(exec_time<>0 AND exec_time<' . ($now - $processedAgeInSeconds) . ') OR scheduled<=' . ($now - $scheduledAgeInSeconds);

        $queryBuilder = $this->getQueryBuilder();
        $del = $queryBuilder
            ->delete($this->tableName)
            ->where(
                $condition
            )->execute();

        if ($del === false) {
            $this->logFailedDeletion();
        }
    }

    /**
     * Fetches up to $countInARun entries that are due, not executed and not yet
     * scheduled into a process, ordered by sitemap_priority (desc), scheduled, qid.
     */
    public function fetchRecordsToBeCrawled(int $countInARun)
    {
        $queryBuilderSelect = $this->getQueryBuilder();
        return $queryBuilderSelect
            ->select('qid', 'scheduled', 'page_id', 'sitemap_priority')
            ->from($this->tableName)
            ->leftJoin(
                $this->tableName,
                'pages',
                'p',
                $queryBuilderSelect->expr()->eq('p.uid', $queryBuilderSelect->quoteIdentifier($this->tableName . '.page_id'))
            )
            ->where(
                $queryBuilderSelect->expr()->eq('exec_time', 0),
                $queryBuilderSelect->expr()->eq('process_scheduled', 0),
                $queryBuilderSelect->expr()->lte('scheduled', time())
            )
            ->orderBy('sitemap_priority', 'DESC')
            ->addOrderBy('scheduled')
            ->addOrderBy('qid')
            ->setMaxResults($countInARun)
            ->execute()
            ->fetchAll();
    }

    /**
     * Assigns the given queue entries to a process and stamps process_scheduled
     * with the current time.
     *
     * @param array $quidList queue entry ids (qid)
     * @param string $processId
     * @return mixed result of executing the UPDATE, or 0 without querying when $quidList is empty
     */
    public function updateProcessIdAndSchedulerForQueueIds(array $quidList, string $processId)
    {
        // An empty "IN ()" list is invalid SQL, and there is nothing to update anyway.
        if (count($quidList) === 0) {
            return 0;
        }

        $queryBuilderUpdate = $this->getQueryBuilder();
        return $queryBuilderUpdate
            ->update($this->tableName)
            ->where(
                // Bind the ids as an integer list instead of interpolating raw values.
                $queryBuilderUpdate->expr()->in(
                    'qid',
                    $queryBuilderUpdate->createNamedParameter(array_map('intval', $quidList), Connection::PARAM_INT_ARRAY)
                )
            )
            ->set('process_scheduled', time())
            ->set('process_id', $processId)
            ->execute();
    }

    /**
     * Releases not-yet-executed entries of the given processes: resets
     * process_scheduled to 0 and clears process_id.
     */
    public function unsetProcessScheduledAndProcessIdForQueueEntries(array $processIds): void
    {
        $queryBuilder = $this->getQueryBuilder();
        $queryBuilder
            ->update($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->in('process_id', $queryBuilder->createNamedParameter($processIds, Connection::PARAM_STR_ARRAY))
            )
            ->set('process_scheduled', 0)
            ->set('process_id', '')
            ->execute();
    }

    /**
     * This method is used to count if there are ANY unprocessed queue entries
     * of a given page_id and the configuration which matches a given hash.
     * If there is none, we can skip an inner detail check.
     *
     * @param int $uid
     * @param string $configurationHash
     * @return boolean true when no unprocessed entry matches
     */
    public function noUnprocessedQueueEntriesForPageWithConfigurationHashExist($uid, $configurationHash): bool
    {
        $queryBuilder = $this->getQueryBuilder();

        $result = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('page_id', (int) $uid),
                $queryBuilder->expr()->eq('configuration_hash', $queryBuilder->createNamedParameter($configurationHash)),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn();

        return !$result;
    }

    /**
     * Removes queue entries.
     *
     * @param string $filter all, pending, finished (any other value is treated as finished)
     */
    public function flushQueue(string $filter): void
    {
        $queryBuilder = $this->getQueryBuilder();

        switch (strtolower($filter)) {
            case 'all':
                // No WHERE clause needed: delete everything
                break;
            case 'pending':
                $queryBuilder->andWhere($queryBuilder->expr()->eq('exec_time', 0));
                break;
            case 'finished':
            default:
                $queryBuilder->andWhere($queryBuilder->expr()->gt('exec_time', 0));
                break;
        }

        $queryBuilder
            ->delete($this->tableName)
            ->execute();
    }

    /**
     * Counts all queue entries assigned to the given process id.
     *
     * @param string $processId
     *
     * @return bool|string raw fetchColumn() result
     */
    public function countAllByProcessId($processId)
    {
        $queryBuilder = $this->getQueryBuilder();

        return $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId, \PDO::PARAM_STR))
            )
            ->execute()
            ->fetchColumn(0);
    }

    /**
     * Returns the first executed entry (exec_time > 0) completed by the process,
     * ordered by the given field/direction, or an empty array if there is none.
     *
     * @param Process $process
     * @param string $orderByField field to order by; the first matching row is returned
     * @param string $orderBySorting sorting direction (ASC or DESC)
     */
    protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC'): array
    {
        $queryBuilder = $this->getQueryBuilder();
        $first = $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->setMaxResults(1)
            ->addOrderBy($orderByField, $orderBySorting)
            ->execute()
            ->fetch();

        return $first ?: [];
    }

    /**
     * Creates a fresh QueryBuilder for the queue table.
     */
    private function getQueryBuilder(): \TYPO3\CMS\Core\Database\Query\QueryBuilder
    {
        return GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
    }

    /**
     * Logs a failed deletion; the logger injected via LoggerAwareTrait may be unset.
     */
    private function logFailedDeletion(): void
    {
        if ($this->logger !== null) {
            $this->logger->info(
                'Records could not be deleted.'
            );
        }
    }
}
656