Completed
Push — typo3v9 ( 6762b6...ea38b1 )
by Tomas Norre
05:52
created

QueueRepository::countAllUnassignedPendingItems()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 1.0189

Importance

Changes 0
Metric Value
cc 1
nc 1
nop 0
dl 0
loc 16
ccs 11
cts 15
cp 0.7332
crap 1.0189
rs 9.7333
c 0
b 0
f 0
1
<?php
2
namespace AOE\Crawler\Domain\Repository;
3
4
/***************************************************************
5
 *  Copyright notice
6
 *
7
 *  (c) 2019 AOE GmbH <[email protected]>
8
 *
9
 *  All rights reserved
10
 *
11
 *  This script is part of the TYPO3 project. The TYPO3 project is
12
 *  free software; you can redistribute it and/or modify
13
 *  it under the terms of the GNU General Public License as published by
14
 *  the Free Software Foundation; either version 3 of the License, or
15
 *  (at your option) any later version.
16
 *
17
 *  The GNU General Public License can be found at
18
 *  http://www.gnu.org/copyleft/gpl.html.
19
 *
20
 *  This script is distributed in the hope that it will be useful,
21
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
22
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23
 *  GNU General Public License for more details.
24
 *
25
 *  This copyright notice MUST APPEAR in all copies of the script!
26
 ***************************************************************/
27
28
use AOE\Crawler\Domain\Model\Process;
29
use AOE\Crawler\Domain\Model\Queue;
30
use TYPO3\CMS\Core\Database\ConnectionPool;
31
use TYPO3\CMS\Core\Database\Query\QueryBuilder;
32
use TYPO3\CMS\Core\Utility\GeneralUtility;
33
use TYPO3\CMS\Core\Utility\MathUtility;
34
35
/**
 * Class QueueRepository
 *
 * Read/write access to the crawler queue table (tx_crawler_queue). Every query
 * is built with a QueryBuilder obtained from the ConnectionPool for that table.
 *
 * @package AOE\Crawler\Domain\Repository
 */
class QueueRepository extends AbstractRepository
{
    /**
     * Name of the table this repository operates on.
     *
     * @var string
     */
    protected $tableName = 'tx_crawler_queue';

    /**
     * Intentionally empty: the parent constructor is deliberately not invoked.
     */
    public function __construct()
    {
        // Left empty intentionally
    }

    /**
     * Detaches all queue entries from the given process by resetting their
     * process_id to an empty string.
     *
     * @param string $processId id of the process whose entries are released
     */
    public function unsetQueueProcessId($processId): void
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $queryBuilder
            ->update($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId))
            )
            ->set('process_id', '')
            ->execute();
    }

    /**
     * This method is used to find the youngest entry for a given process.
     *
     * NOTE(review): this sorts exec_time ASC, i.e. it returns the entry with the
     * smallest (earliest) exec_time. The naming is kept as-is because callers
     * may depend on the current ordering.
     *
     * @param Process $process
     *
     * @return array the matching row, or an empty array if none exists
     */
    public function findYoungestEntryForProcess(Process $process): array
    {
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time');
    }

    /**
     * This method is used to find the oldest entry for a given process.
     *
     * NOTE(review): this sorts exec_time DESC, i.e. it returns the entry with the
     * largest (latest) exec_time. See findYoungestEntryForProcess().
     *
     * @param Process $process
     *
     * @return array the matching row, or an empty array if none exists
     */
    public function findOldestEntryForProcess(Process $process): array
    {
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time', 'DESC');
    }

    /**
     * Returns the first executed entry (exec_time > 0) completed by the given
     * process, ordered by the given field and direction.
     *
     * @param Process $process
     * @param string $orderByField field used to pick the first matching row
     * @param string $orderBySorting sorting direction ('ASC' or 'DESC')
     *
     * @return array the row as an associative array, or [] if nothing matches
     */
    protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC'): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $first = $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->setMaxResults(1)
            ->addOrderBy($orderByField, $orderBySorting)
            ->execute()
            // fetch() with the connection's default fetch mode; the previous
            // fetch(0) passed fetch *mode* 0, not a column index.
            ->fetch();

        return $first ?: [];
    }

    /**
     * Counts all executed items (exec_time > 0) completed by a process.
     *
     * @param Process $process
     *
     * @return int
     */
    public function countExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id_completed', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        return (int)$count;
    }

    /**
     * Counts items assigned to a process which have not been executed yet
     * (exec_time = 0).
     *
     * @param Process $process
     *
     * @return int
     */
    public function countNonExecutedItemsByProcess($process): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($process->getProcessId())),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        return (int)$count;
    }

    /**
     * Returns all queue rows which have not been processed yet (exec_time = 0).
     *
     * @return array list of rows
     */
    public function getUnprocessedItems(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $unprocessedItems = $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()->fetchAll();

        return $unprocessedItems;
    }

    /**
     * Counts the items which have not been processed yet (exec_time = 0).
     *
     * Uses a COUNT(*) query with the same condition as getUnprocessedItems()
     * instead of loading every row into memory.
     *
     * @return int
     */
    public function countUnprocessedItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        return (int)$count;
    }

    /**
     * Counts all queue entries which are scheduled for now or an earlier date,
     * have not been executed and are not marked as process_scheduled.
     *
     * @return int
     */
    public function countAllPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_scheduled', 0),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        return (int)$count;
    }

    /**
     * Counts all queue entries which are scheduled for now or an earlier date,
     * have not been executed and are assigned to a process.
     *
     * @return int
     */
    public function countAllAssignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                // Bind the empty string instead of the raw token "", which is an
                // empty identifier (not a string) on ANSI-quoting databases.
                $queryBuilder->expr()->neq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        return (int)$count;
    }

    /**
     * Counts all queue entries which are scheduled for now or an earlier date,
     * have not been executed and are not assigned to any process.
     *
     * @return int
     */
    public function countAllUnassignedPendingItems(): int
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                // Unassigned entries carry an empty process_id (see unsetQueueProcessId()).
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        return (int)$count;
    }

    /**
     * Counts pending (unexecuted, scheduled in the past) queue entries grouped
     * by configuration key.
     *
     * NOTE(review): uses `scheduled < now` whereas the countAll*PendingItems()
     * methods use `<=`; kept unchanged.
     *
     * @return array list of rows with the keys 'unprocessed',
     *               'assignedButUnprocessed' and 'configuration'
     */
    public function countPendingItemsGroupedByConfigurationKey(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed')
            ->addSelect('configuration')
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute();

        return $statement->fetchAll();
    }

    /**
     * Returns the distinct set ids which still have unprocessed entries
     * scheduled in the past.
     *
     * @return int[] list of set ids
     */
    public function getSetIdWithUnprocessedEntries(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->select('set_id')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->lt('scheduled', time()),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->addGroupBy('set_id')
            ->execute();

        $setIds = [];
        while ($row = $statement->fetch()) {
            $setIds[] = (int)$row['set_id'];
        }

        return $setIds;
    }

    /**
     * Counts the queue entries of the given sets (scheduled in the past),
     * grouped by configuration.
     *
     * @param array $setIds set ids; every value is cast to int before use
     *
     * @return array totals keyed by configuration; empty if $setIds is empty
     */
    public function getTotalQueueEntriesByConfiguration(array $setIds): array
    {
        $totals = [];
        if (count($setIds) === 0) {
            return $totals;
        }

        // The ids are inlined into the IN() list, so force them to integers to
        // keep caller-supplied values from injecting SQL.
        $setIds = array_map('intval', $setIds);

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('count(*) as c')
            ->addSelect('configuration')
            ->where(
                $queryBuilder->expr()->in('set_id', implode(',', $setIds)),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute();

        while ($row = $statement->fetch()) {
            $totals[$row['configuration']] = $row['c'];
        }

        return $totals;
    }

    /**
     * Returns the exec_time values of the most recent entries, newest first.
     *
     * @param int $limit maximum number of timestamps to return
     *
     * @return array list of exec_time values
     */
    public function getLastProcessedEntriesTimestamps($limit = 100): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->select('exec_time')
            ->from($this->tableName)
            ->addOrderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[] = $row['exec_time'];
        }

        return $rows;
    }

    /**
     * Returns the most recent queue rows ordered by exec_time, newest first.
     *
     * @param int $limit maximum number of rows to return
     *
     * @return array list of rows
     */
    public function getLastProcessedEntries($limit = 100): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->select('*')
            ->orderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        $rows = [];
        while (($row = $statement->fetch()) !== false) {
            $rows[] = $row;
        }

        return $rows;
    }

    /**
     * Get performance statistics per completing process for entries executed
     * within [$start, $end].
     *
     * @param int $start timestamp (inclusive)
     * @param int $end timestamp (inclusive)
     *
     * @return array rows keyed by process_id_completed, each holding 'start'
     *               (min exec_time), 'end' (max exec_time), 'urlcount' and
     *               'process_id_completed'
     */
    public function getPerformanceData($start, $end): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
            ->addSelect('process_id_completed')
            ->where(
                $queryBuilder->expr()->neq('exec_time', 0),
                $queryBuilder->expr()->gte('exec_time', $queryBuilder->createNamedParameter($start, \PDO::PARAM_INT)),
                $queryBuilder->expr()->lte('exec_time', $queryBuilder->createNamedParameter($end, \PDO::PARAM_INT))
            )
            ->groupBy('process_id_completed')
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[$row['process_id_completed']] = $row;
        }

        return $rows;
    }

    /**
     * Determines if a page is queued.
     *
     * @param int|string $uid page uid; must be interpretable as an integer
     * @param bool $unprocessed_only only consider entries with exec_time = 0
     * @param bool $timed_only only consider entries with a non-zero scheduled time
     * @param int|false $timestamp if not false, only consider entries scheduled exactly at this timestamp
     *
     * @return bool
     *
     * @throws \InvalidArgumentException if $uid is not an integer value
     */
    public function isPageInQueue($uid, $unprocessed_only = true, $timed_only = false, $timestamp = false): bool
    {
        if (!MathUtility::canBeInterpretedAsInteger($uid)) {
            throw new \InvalidArgumentException('Invalid parameter type', 1468931945);
        }

        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->from($this->tableName)
            ->count('*')
            ->where(
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($uid, \PDO::PARAM_INT))
            );

        if (false !== $unprocessed_only) {
            $statement->andWhere(
                $queryBuilder->expr()->eq('exec_time', 0)
            );
        }

        if (false !== $timed_only) {
            $statement->andWhere(
                $queryBuilder->expr()->neq('scheduled', 0)
            );
        }

        if (false !== $timestamp) {
            $statement->andWhere(
                $queryBuilder->expr()->eq('scheduled', $queryBuilder->createNamedParameter($timestamp, \PDO::PARAM_INT))
            );
        }

        // TODO: Currently it's not working if page doesn't exists. See tests
        $count = $statement
            ->execute()
            ->fetchColumn(0);

        // (int) false === 0, so a missing result row also yields false here.
        return (int)$count > 0;
    }

    /**
     * Method to check if a page is in the queue which is timed for a
     * date when it should be crawled.
     *
     * NOTE(review): $timed_only is not passed to isPageInQueue(), so entries
     * without a scheduled time also match. Kept as-is because callers may
     * rely on it; confirm whether `true` should be passed as third argument.
     *
     * @param int $uid uid of the page
     * @param bool $show_unprocessed only respect unprocessed pages
     *
     * @return bool
     */
    public function isPageInQueueTimed($uid, $show_unprocessed = true): bool
    {
        $uid = intval($uid);
        return $this->isPageInQueue($uid, $show_unprocessed);
    }

    /**
     * Returns all (set_id, scheduled) combinations with the number of entries
     * in each, newest scheduled first.
     *
     * @return array list of rows with the keys 'count_value', 'set_id' and 'scheduled'
     */
    public function getAvailableSets(): array
    {
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)->getQueryBuilderForTable($this->tableName);
        $statement = $queryBuilder
            ->selectLiteral('count(*) as count_value')
            ->addSelect('set_id', 'scheduled')
            ->from($this->tableName)
            ->orderBy('scheduled', 'desc')
            ->groupBy('set_id', 'scheduled')
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[] = $row;
        }

        return $rows;
    }
}
515