Passed
Push — typo3v9 ( 45c992...e095e3 )
by Tomas Norre
06:12
created

QueueRepository   A

Complexity

Total Complexity 37

Size/Duplication

Total Lines 462
Duplicated Lines 0 %

Test Coverage

Coverage 62.33%

Importance

Changes 0
Metric Value
eloc 200
c 0
b 0
f 0
dl 0
loc 462
ccs 182
cts 292
cp 0.6233
rs 9.44
wmc 37

22 Methods

Rating   Name   Duplication   Size   Complexity  
A getLastProcessedEntriesTimestamps() 0 16 2
A getSetIdWithUnprocessedEntries() 0 19 2
A getTotalQueueEntriesByConfiguration() 0 22 3
A countNonExecutedItemsByProcess() 0 13 1
A countAllPendingItems() 0 14 1
A getPerformanceData() 0 21 2
A unsetQueueProcessId() 0 10 1
A countAllUnassignedPendingItems() 0 14 1
A findByQueueId() 0 12 2
A countPendingItemsGroupedByConfigurationKey() 0 15 1
A getUnprocessedItems() 0 11 1
A countAllAssignedPendingItems() 0 14 1
A findOldestEntryForProcess() 0 3 1
A isPageInQueueTimed() 0 3 1
A getLastProcessedEntries() 0 16 2
A countUnprocessedItems() 0 3 1
A findYoungestEntryForProcess() 0 3 1
A getAvailableSets() 0 17 2
B isPageInQueue() 0 45 7
A getFirstOrLastObjectByProcess() 0 15 2
A __construct() 0 2 1
A countExecutedItemsByProcess() 0 13 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace AOE\Crawler\Domain\Repository;
6
7
/***************************************************************
8
 *  Copyright notice
9
 *
10
 *  (c) 2019 AOE GmbH <[email protected]>
11
 *
12
 *  All rights reserved
13
 *
14
 *  This script is part of the TYPO3 project. The TYPO3 project is
15
 *  free software; you can redistribute it and/or modify
16
 *  it under the terms of the GNU General Public License as published by
17
 *  the Free Software Foundation; either version 3 of the License, or
18
 *  (at your option) any later version.
19
 *
20
 *  The GNU General Public License can be found at
21
 *  http://www.gnu.org/copyleft/gpl.html.
22
 *
23
 *  This script is distributed in the hope that it will be useful,
24
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
25
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26
 *  GNU General Public License for more details.
27
 *
28
 *  This copyright notice MUST APPEAR in all copies of the script!
29
 ***************************************************************/
30
31
use AOE\Crawler\Domain\Model\Process;
32
use TYPO3\CMS\Core\Database\ConnectionPool;
33
use TYPO3\CMS\Core\Utility\GeneralUtility;
34
use TYPO3\CMS\Core\Utility\MathUtility;
35
36
/**
37
 * Class QueueRepository
38
 *
39
 * @package AOE\Crawler\Domain\Repository
40
 */
41
class QueueRepository extends AbstractRepository
42
{
43
    /**
44
     * @var string
45
     */
46
    protected $tableName = 'tx_crawler_queue';
47
48 76
    /**
     * Intentionally empty constructor.
     */
    public function __construct()
    {
        // Deliberately blank: there is nothing to initialise here.
    }
52
53 3
    /**
     * Releases all queue entries that are assigned to the given process.
     *
     * Every row of the queue table whose `process_id` equals $processId gets
     * its `process_id` reset to an empty string.
     *
     * @param string $processId id of the process whose entries are released
     */
    public function unsetQueueProcessId(string $processId): void
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $update = $connectionPool->getQueryBuilderForTable($this->tableName);

        $belongsToProcess = $update->expr()->eq(
            'process_id',
            $update->createNamedParameter($processId)
        );

        $update->update($this->tableName);
        $update->where($belongsToProcess);
        $update->set('process_id', '');
        $update->execute();
    }
64
65
    /**
     * Returns the completed queue entry of the given process that comes first
     * when the entries are ordered by `exec_time` ascending.
     *
     * @param Process $process
     *
     * @return array the matching row, or an empty array if there is none
     */
    public function findYoungestEntryForProcess(Process $process): array
    {
        $sortField = 'exec_time';

        return $this->getFirstOrLastObjectByProcess($process, $sortField, 'ASC');
    }
76
77
    /**
     * Returns the completed queue entry of the given process that comes first
     * when the entries are ordered by `exec_time` descending.
     *
     * @param Process $process
     *
     * @return array the matching row, or an empty array if there is none
     */
    public function findOldestEntryForProcess(Process $process): array
    {
        $sortField = 'exec_time';
        $direction = 'DESC';

        return $this->getFirstOrLastObjectByProcess($process, $sortField, $direction);
    }
88
89
    /**
     * Internal helper that fetches a single completed queue entry of a process.
     *
     * Only rows whose `process_id_completed` equals the id of the process and
     * whose `exec_time` is greater than zero are considered. Of those, the
     * first row in the requested order is returned.
     *
     * @param Process $process
     * @param string $orderByField field the entries are ordered by
     * @param string $orderBySorting sorting direction
     *
     * @return array the row, or an empty array when nothing matched
     */
    protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC'): array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $expr = $query->expr();

        $query
            ->select('*')
            ->from($this->tableName)
            ->where(
                $expr->eq('process_id_completed', $query->createNamedParameter($process->getProcessId())),
                $expr->gt('exec_time', 0)
            )
            ->addOrderBy($orderByField, $orderBySorting)
            ->setMaxResults(1);

        $row = $query->execute()->fetch(0);
        if (!$row) {
            return [];
        }

        return $row;
    }
114
115
    /**
     * Counts all executed items of a process.
     *
     * A row counts as executed when its `process_id_completed` equals the id
     * of the process and its `exec_time` is greater than zero.
     *
     * @param Process $process
     *
     * @return int
     */
    public function countExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        // Depending on the database driver the aggregate arrives as a numeric
        // string (or false when no row is available). With strict_types that
        // would violate the int return type, so normalise it explicitly.
        return (int) $count;
    }
136
137
    /**
     * Counts items of a process which have not been processed/executed yet.
     *
     * A row counts as not executed when its `process_id` equals the id of the
     * process and its `exec_time` is zero.
     *
     * @param Process $process
     *
     * @return int
     */
    public function countNonExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        // Depending on the database driver the aggregate arrives as a numeric
        // string (or false when no row is available). With strict_types that
        // would violate the int return type, so normalise it explicitly.
        return (int) $count;
    }
158
159
    /**
     * Returns all queue entries that have not been processed yet,
     * i.e. every row whose `exec_time` is zero.
     *
     * @return array list of queue rows
     */
    public function getUnprocessedItems(): array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $notExecutedYet = $query->expr()->eq('exec_time', 0);

        $statement = $query
            ->select('*')
            ->from($this->tableName)
            ->where($notExecutedYet)
            ->execute();

        return $statement->fetchAll();
    }
176
177
    /**
     * Counts the queue entries which have not been processed yet,
     * i.e. the rows whose `exec_time` is zero.
     *
     * The count is computed by the database instead of loading every
     * unprocessed row into PHP just to count them.
     *
     * @return int
     */
    public function countUnprocessedItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        // Drivers may return the aggregate as a numeric string; the return
        // type is int and strict_types is enabled, so cast explicitly.
        return (int) $count;
    }
186
187
    /**
     * Counts all queue entries which are scheduled for now or an earlier date.
     *
     * An entry is counted when `process_scheduled` is zero, `exec_time` is
     * zero and `scheduled` is less than or equal to the current time.
     *
     * @return int
     */
    public function countAllPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_scheduled', 0),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        // Depending on the database driver the aggregate arrives as a numeric
        // string (or false when no row is available). With strict_types that
        // would violate the int return type, so normalise it explicitly.
        return (int) $count;
    }
208
209
    /**
     * Counts all queue entries which are scheduled for now or an earlier date
     * and are assigned to a process.
     *
     * An entry is counted when `process_id` is not the empty string,
     * `exec_time` is zero and `scheduled` is less than or equal to the
     * current time.
     *
     * @return int
     */
    public function countAllAssignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                // Bind the empty string as a parameter. A raw "" in the SQL is
                // treated as a quoted identifier by databases that follow the
                // ANSI quoting rules, which breaks the comparison there.
                $queryBuilder->expr()->neq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        // Drivers may return the aggregate as a numeric string; the return
        // type is int and strict_types is enabled, so cast explicitly.
        return (int) $count;
    }
230
231
    /**
     * Counts all queue entries which are scheduled for now or an earlier date
     * and are not assigned to a process.
     *
     * An entry is counted when `process_id` is the empty string, `exec_time`
     * is zero and `scheduled` is less than or equal to the current time.
     *
     * @return int
     */
    public function countAllUnassignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                // Bind the empty string as a parameter. A raw "" in the SQL is
                // treated as a quoted identifier by databases that follow the
                // ANSI quoting rules, which breaks the comparison there.
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        // Drivers may return the aggregate as a numeric string; the return
        // type is int and strict_types is enabled, so cast explicitly.
        return (int) $count;
    }
252
253
    /**
     * Counts pending queue entries grouped by configuration key.
     *
     * Considers rows with `exec_time` = 0 and `scheduled` earlier than the
     * current time. Each result row carries the `configuration`, the number
     * of such rows (`unprocessed`) and how many of them have a non-empty
     * `process_id` (`assignedButUnprocessed`).
     *
     * @return array one row per configuration
     */
    public function countPendingItemsGroupedByConfigurationKey(): array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $expr = $query->expr();

        $query
            ->from($this->tableName)
            ->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed')
            ->addSelect('configuration')
            ->where(
                $expr->eq('exec_time', 0),
                $expr->lt('scheduled', time())
            )
            ->groupBy('configuration');

        return $query->execute()->fetchAll();
    }
274
275
    /**
     * Collects the set ids that still have unprocessed entries.
     *
     * Looks at rows with `exec_time` = 0 whose `scheduled` is earlier than
     * the current time and returns their distinct `set_id` values.
     *
     * @return array array of set ids (integers)
     */
    public function getSetIdWithUnprocessedEntries(): array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $expr = $query->expr();

        $rows = $query
            ->select('set_id')
            ->from($this->tableName)
            ->where(
                $expr->lt('scheduled', time()),
                $expr->eq('exec_time', 0)
            )
            ->addGroupBy('set_id')
            ->execute()
            ->fetchAll();

        $ids = [];
        foreach ($rows as $record) {
            $ids[] = (int) $record['set_id'];
        }

        return $ids;
    }
300
301
    /**
     * Gets the total number of queue entries per configuration.
     *
     * Only rows that belong to one of the given sets and whose `scheduled`
     * is earlier than the current time are counted.
     *
     * @param array $setIds set ids to consider; each value is forced to int
     *
     * @return array totals by configuration (keys); empty when $setIds is empty
     */
    public function getTotalQueueEntriesByConfiguration(array $setIds): array
    {
        // Nothing to look up, so no query is needed at all.
        if ($setIds === []) {
            return [];
        }

        // The ids end up verbatim inside the IN (...) list. Forcing every
        // value to an integer keeps arbitrary input out of the SQL statement.
        $setIdList = implode(',', array_map('intval', $setIds));

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('count(*) as c')
            ->addSelect('configuration')
            ->where(
                $queryBuilder->expr()->in('set_id', $setIdList),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute();

        $totals = [];
        while ($row = $statement->fetch()) {
            $totals[$row['configuration']] = $row['c'];
        }

        return $totals;
    }
331
332
    /**
     * Gets the `exec_time` values of the last processed entries.
     *
     * Rows are ordered by `exec_time` descending and capped at $limit.
     *
     * @param int $limit maximum number of timestamps to return
     *
     * @return array list of `exec_time` values
     */
    public function getLastProcessedEntriesTimestamps($limit = 100): array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $query
            ->select('exec_time')
            ->from($this->tableName)
            ->addOrderBy('exec_time', 'desc')
            ->setMaxResults($limit);

        $records = $query->execute()->fetchAll();

        return array_column($records, 'exec_time');
    }
356
357
    /**
     * Gets the last processed entries.
     *
     * Rows are ordered by `exec_time` descending and capped at $limit.
     *
     * @param int $limit maximum number of rows to return
     *
     * @return array list of queue rows
     */
    public function getLastProcessedEntries($limit = 100): array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $query
            ->from($this->tableName)
            ->select('*')
            ->orderBy('exec_time', 'desc')
            ->setMaxResults($limit);

        $result = $query->execute();

        return $result->fetchAll();
    }
381
382
    /**
     * Gets performance statistics per completed process.
     *
     * For all rows with a non-zero `exec_time` between $start and $end
     * (both inclusive) the smallest and largest `exec_time` and the number
     * of rows are aggregated per `process_id_completed`.
     *
     * @param int $start timestamp
     * @param int $end timestamp
     *
     * @return array performance data, keyed by `process_id_completed`; each
     *               row holds `start`, `end`, `urlcount` and `process_id_completed`
     */
    public function getPerformanceData($start, $end): array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $expr = $query->expr();

        $query
            ->from($this->tableName)
            ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
            ->addSelect('process_id_completed')
            ->where(
                $expr->neq('exec_time', 0),
                $expr->gte('exec_time', $query->createNamedParameter($start, \PDO::PARAM_INT)),
                $expr->lte('exec_time', $query->createNamedParameter($end, \PDO::PARAM_INT))
            )
            ->groupBy('process_id_completed');

        $byProcess = [];
        foreach ($query->execute()->fetchAll() as $record) {
            $byProcess[$record['process_id_completed']] = $record;
        }

        return $byProcess;
    }
412
413
    /**
     * Determines if a page is queued.
     *
     * @param int $uid page id, matched against `page_id`
     * @param bool $unprocessed_only when true, only entries with `exec_time` = 0 are considered
     * @param bool $timed_only when true, only entries with `scheduled` != 0 are considered
     * @param int $timestamp when non-zero, only entries whose `scheduled` equals this value are considered
     *
     * @return bool true if at least one matching queue entry exists
     */
    public function isPageInQueue(int $uid, bool $unprocessed_only = true, bool $timed_only = false, int $timestamp = 0): bool
    {
        // The former MathUtility::canBeInterpretedAsInteger() guard was dropped:
        // $uid is declared as int, so that branch could never be reached.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queryBuilder
            ->from($this->tableName)
            ->count('*')
            ->where(
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($uid, \PDO::PARAM_INT))
            );

        if ($unprocessed_only) {
            $queryBuilder->andWhere(
                $queryBuilder->expr()->eq('exec_time', 0)
            );
        }

        if ($timed_only) {
            $queryBuilder->andWhere(
                $queryBuilder->expr()->neq('scheduled', 0)
            );
        }

        if ($timestamp !== 0) {
            $queryBuilder->andWhere(
                $queryBuilder->expr()->eq('scheduled', $queryBuilder->createNamedParameter($timestamp, \PDO::PARAM_INT))
            );
        }

        // TODO: Currently it's not working if page doesn't exists. See tests
        $count = $queryBuilder
            ->execute()
            ->fetchColumn(0);

        // fetchColumn() yields false when no row is available and may return
        // the count as a numeric string; the cast covers both cases.
        return (int) $count > 0;
    }
462
463
    /**
     * Method to check if a page is in the queue which is timed for a
     * date when it should be crawled.
     *
     * NOTE(review): only $uid and $show_unprocessed are forwarded to
     * isPageInQueue(); its $timed_only flag keeps the default (false), so
     * entries without a schedule match as well. Confirm whether that is
     * intended before relying on the "timed" part of the name.
     */
    public function isPageInQueueTimed(int $uid, bool $show_unprocessed = true): bool
    {
        $isQueued = $this->isPageInQueue($uid, $show_unprocessed);

        return $isQueued;
    }
471
472
    /**
     * Lists the sets found in the queue.
     *
     * Rows are grouped by `set_id` and `scheduled` and ordered by `scheduled`
     * descending; each result row holds `count_value`, `set_id` and `scheduled`.
     *
     * @return array one row per set_id/scheduled combination
     */
    public function getAvailableSets(): array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $query
            ->selectLiteral('count(*) as count_value')
            ->addSelect('set_id', 'scheduled')
            ->from($this->tableName)
            ->orderBy('scheduled', 'desc')
            ->groupBy('set_id', 'scheduled');

        $result = $query->execute();

        return $result->fetchAll();
    }
490
491
    /**
     * Looks up a single queue entry by its `qid`.
     *
     * @param string $queueId value matched against the `qid` column
     *
     * @return array|null the queue row, or null when no row was found
     */
    public function findByQueueId(string $queueId): ?array
    {
        $query = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);

        $matchesQueueId = $query->expr()->eq('qid', $query->createNamedParameter($queueId));

        $record = $query
            ->select('*')
            ->from($this->tableName)
            ->where($matchesQueueId)
            ->execute()
            ->fetch();

        if (!is_array($record)) {
            return null;
        }

        return $record;
    }
504
}
505