Completed
Push — typo3v9 ( b7e311...b919e2 )
by Tomas Norre
09:03 queued 07:35
created

QueueRepository   A

Complexity

Total Complexity 35

Size/Duplication

Total Lines 482
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 3

Test Coverage

Coverage 74.63%

Importance

Changes 0
Metric Value
dl 0
loc 482
ccs 206
cts 276
cp 0.7463
rs 9.6
c 0
b 0
f 0
wmc 35
lcom 1
cbo 3

21 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A unsetQueueProcessId() 0 10 1
A findYoungestEntryForProcess() 0 4 1
A findOldestEntryForProcess() 0 4 1
A getFirstOrLastObjectByProcess() 0 20 2
A countExecutedItemsByProcess() 0 14 1
A countNonExecutedItemsByProcess() 0 14 1
A getUnprocessedItems() 0 12 1
A countUnprocessedItems() 0 4 1
A countAllPendingItems() 0 15 1
A countAllAssignedPendingItems() 0 15 1
A countAllUnassignedPendingItems() 0 15 1
A countPendingItemsGroupedByConfigurationKey() 0 15 1
A getSetIdWithUnprocessedEntries() 0 19 2
A getTotalQueueEntriesByConfiguration() 0 30 3
A getLastProcessedEntriesTimestamps() 0 16 2
A getLastProcessedEntries() 0 16 2
A getPerformanceData() 0 21 2
B isPageInQueue() 0 44 7
A isPageInQueueTimed() 0 5 1
A getAvailableSets() 0 17 2
1
<?php
2
namespace AOE\Crawler\Domain\Repository;
3
4
/***************************************************************
5
 *  Copyright notice
6
 *
7
 *  (c) 2017 AOE GmbH <[email protected]>
8
 *
9
 *  All rights reserved
10
 *
11
 *  This script is part of the TYPO3 project. The TYPO3 project is
12
 *  free software; you can redistribute it and/or modify
13
 *  it under the terms of the GNU General Public License as published by
14
 *  the Free Software Foundation; either version 3 of the License, or
15
 *  (at your option) any later version.
16
 *
17
 *  The GNU General Public License can be found at
18
 *  http://www.gnu.org/copyleft/gpl.html.
19
 *
20
 *  This script is distributed in the hope that it will be useful,
21
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
22
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23
 *  GNU General Public License for more details.
24
 *
25
 *  This copyright notice MUST APPEAR in all copies of the script!
26
 ***************************************************************/
27
28
use AOE\Crawler\Domain\Model\Process;
29
use AOE\Crawler\Domain\Model\Queue;
30
use TYPO3\CMS\Core\Database\ConnectionPool;
31
use TYPO3\CMS\Core\Database\Query\QueryBuilder;
32
use TYPO3\CMS\Core\Utility\GeneralUtility;
33
use TYPO3\CMS\Core\Utility\MathUtility;
34
35
/**
36
 * Class QueueRepository
37
 *
38
 * @package AOE\Crawler\Domain\Repository
39
 */
40
class QueueRepository extends AbstractRepository
41
{
42
    /**
     * Name of the database table this repository works on.
     *
     * @var string
     */
    protected $tableName = 'tx_crawler_queue';

    /**
     * Query builder for $tableName, assigned in the constructor.
     *
     * Intentionally declared without a default: the former default was the
     * class-name string, which contradicted the documented type.
     *
     * @var QueryBuilder
     */
    protected $queryBuilder;
51
52
    /**
     * Sets up the repository by requesting a query builder for the queue table
     * from the connection pool.
     */
    public function __construct()
    {
        $connectionPool = GeneralUtility::makeInstance(ConnectionPool::class);
        $this->queryBuilder = $connectionPool->getQueryBuilderForTable($this->tableName);
    }
59
60
    /**
     * Releases all queue entries assigned to the given process.
     *
     * Every row whose process_id equals $processId gets its process_id reset
     * to an empty string.
     *
     * @param string|int $processId value of the process_id column to match
     *
     * @return void
     */
    public function unsetQueueProcessId($processId)
    {
        // A QueryBuilder keeps the state of earlier statements, so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        $queryBuilder
            ->update($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter($processId))
            )
            ->set('process_id', '')
            ->execute();
    }
73
74
    /**
     * Finds the "youngest" executed queue entry of a process.
     *
     * Delegates to getFirstOrLastObjectByProcess() ordering by exec_time in the
     * default (ascending) direction, so the entry with the lowest exec_time is
     * returned.
     *
     * @param Process $process
     *
     * @return Queue|\stdClass an empty stdClass is returned when no entry matches
     */
    public function findYoungestEntryForProcess(Process $process)
    {
        $orderByField = 'exec_time';

        return $this->getFirstOrLastObjectByProcess($process, $orderByField);
    }
85
86
    /**
     * Finds the "oldest" executed queue entry of a process.
     *
     * Delegates to getFirstOrLastObjectByProcess() ordering by exec_time in
     * descending direction, so the entry with the highest exec_time is returned.
     *
     * @param Process $process
     *
     * @return Queue|\stdClass an empty stdClass is returned when no entry matches
     */
    public function findOldestEntryForProcess(Process $process)
    {
        $orderByField = 'exec_time';
        $orderBySorting = 'DESC';

        return $this->getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting);
    }
97
98
    /**
     * Internal helper that loads a single executed queue entry of a process.
     *
     * Only rows with process_id_completed equal to the process id and an
     * exec_time greater than 0 are considered; the first row in the requested
     * order is wrapped in a Queue object.
     *
     * @param Process $process
     * @param string $orderByField field to order by; the first matching item is returned as object
     * @param string $orderBySorting sorting direction
     *
     * @return Queue|\stdClass an empty stdClass is returned when no row matches
     */
    protected function getFirstOrLastObjectByProcess($process, $orderByField, $orderBySorting = 'ASC')
    {
        // A QueryBuilder keeps the state of earlier statements (ordering, limits, grouping),
        // so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        $row = $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq(
                    'process_id_completed',
                    $queryBuilder->createNamedParameter($process->getProcess_id())
                ),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->orderBy($orderByField, $orderBySorting)
            ->setMaxResults(1)
            ->execute()
            ->fetch();

        if (!$row) {
            // Kept for backward compatibility: callers receive an empty object instead of null.
            return new \stdClass();
        }

        return new Queue($row);
    }
127
128
    /**
     * Counts all executed items of a process.
     *
     * An item counts as executed when its process_id_completed equals the
     * process id and its exec_time is greater than 0.
     *
     * @param Process $process
     *
     * @return int
     */
    public function countExecutedItemsByProcess($process)
    {
        // A QueryBuilder keeps the state of earlier statements, so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq(
                    'process_id_completed',
                    $queryBuilder->createNamedParameter($process->getProcess_id())
                ),
                $queryBuilder->expr()->gt('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        // The driver hands the count back as a string; honour the documented int return type.
        return (int)$count;
    }
149
150
    /**
     * Counts the items of a process which have not been processed/executed yet.
     *
     * An item counts as not executed when its process_id equals the process id
     * and its exec_time is 0.
     *
     * @param Process $process
     *
     * @return int
     */
    public function countNonExecutedItemsByProcess($process)
    {
        // A QueryBuilder keeps the state of earlier statements, so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq(
                    'process_id',
                    $queryBuilder->createNamedParameter($process->getProcess_id())
                ),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        // The driver hands the count back as a string; honour the documented int return type.
        return (int)$count;
    }
171
172
    /**
     * Returns all queue rows which have not been processed yet (exec_time = 0).
     *
     * @return array list of complete queue rows
     */
    public function getUnprocessedItems()
    {
        // A QueryBuilder keeps the state of earlier statements, so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        return $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchAll();
    }
189
190
    /**
     * Counts the queue rows which have not been processed yet (exec_time = 0).
     *
     * The database does the counting, so the rows themselves are never loaded
     * into PHP.
     *
     * @return int
     */
    public function countUnprocessedItems()
    {
        // A QueryBuilder keeps the state of earlier statements, so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->execute()
            ->fetchColumn(0);

        return (int)$count;
    }
199
200
    /**
     * Counts all queue entries which are scheduled for now or an earlier date,
     * have not been executed (exec_time = 0) and have process_scheduled = 0.
     *
     * @return int
     */
    public function countAllPendingItems()
    {
        // A QueryBuilder keeps the state of earlier statements, so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('process_scheduled', 0),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        // The driver hands the count back as a string; honour the documented int return type.
        return (int)$count;
    }
221
222
    /**
     * Counts all queue entries which are scheduled for now or an earlier date,
     * have not been executed (exec_time = 0) and are assigned to a process
     * (process_id is not the empty string).
     *
     * @return int
     */
    public function countAllAssignedPendingItems()
    {
        // A QueryBuilder keeps the state of earlier statements, so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                // Bind the empty string as a parameter: a raw "" literal is only a string in
                // MySQL without ANSI_QUOTES and is parsed as an identifier elsewhere.
                $queryBuilder->expr()->neq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        // The driver hands the count back as a string; honour the documented int return type.
        return (int)$count;
    }
243
244
    /**
     * Counts all queue entries which are scheduled for now or an earlier date,
     * have not been executed (exec_time = 0) and are not assigned to a process
     * (process_id is the empty string).
     *
     * @return int
     */
    public function countAllUnassignedPendingItems()
    {
        // A QueryBuilder keeps the state of earlier statements, so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        $count = $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                // Bind the empty string as a parameter: a raw "" literal is only a string in
                // MySQL without ANSI_QUOTES and is parsed as an identifier elsewhere.
                $queryBuilder->expr()->eq('process_id', $queryBuilder->createNamedParameter('')),
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lte('scheduled', time())
            )
            ->execute()
            ->fetchColumn(0);

        // The driver hands the count back as a string; honour the documented int return type.
        return (int)$count;
    }
265
266
    /**
     * Counts pending queue entries grouped by configuration key.
     *
     * Considers rows with exec_time = 0 and a scheduled time before now. Each
     * result row carries the keys "unprocessed" (number of rows),
     * "assignedButUnprocessed" (rows with a non-empty process_id) and
     * "configuration".
     *
     * @return array one row per configuration
     */
    public function countPendingItemsGroupedByConfigurationKey()
    {
        // A QueryBuilder keeps the state of earlier statements, so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        return $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('count(*) as unprocessed', 'sum(process_id != \'\') as assignedButUnprocessed')
            ->addSelect('configuration')
            ->where(
                $queryBuilder->expr()->eq('exec_time', 0),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute()
            ->fetchAll();
    }
286
287
    /**
     * Returns the set ids that still have unprocessed entries.
     *
     * Considers rows with exec_time = 0 and a scheduled time before now.
     *
     * @return array list of set ids (integers)
     */
    public function getSetIdWithUnprocessedEntries()
    {
        // A QueryBuilder keeps the state of earlier statements (including grouping),
        // so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        $statement = $queryBuilder
            ->select('set_id')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->lt('scheduled', time()),
                $queryBuilder->expr()->eq('exec_time', 0)
            )
            ->groupBy('set_id')
            ->execute();

        $setIds = [];
        while ($row = $statement->fetch()) {
            $setIds[] = (int)$row['set_id'];
        }

        return $setIds;
    }
313
314
    /**
     * Returns the total number of queue entries per configuration for the given sets.
     *
     * Only rows with a scheduled time before now are counted. When $setIds is
     * empty no query is issued and an empty array is returned.
     *
     * @param array $setIds set ids to take into account
     *
     * @return array totals by configuration (keys)
     */
    public function getTotalQueueEntriesByConfiguration(array $setIds)
    {
        if (count($setIds) === 0) {
            return [];
        }

        // A QueryBuilder keeps the state of earlier statements, so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        // The ids end up verbatim in the IN() list, so force them to integers first.
        $setIdList = implode(',', array_map('intval', $setIds));

        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('count(*) as c')
            ->addSelect('configuration')
            ->where(
                $queryBuilder->expr()->in('set_id', $setIdList),
                $queryBuilder->expr()->lt('scheduled', time())
            )
            ->groupBy('configuration')
            ->execute();

        $totals = [];
        while ($row = $statement->fetch()) {
            $totals[$row['configuration']] = $row['c'];
        }

        return $totals;
    }
351
352
    /**
     * Returns the exec_time values of the most recently processed entries,
     * ordered by exec_time descending.
     *
     * @param int $limit maximum number of timestamps to return
     *
     * @return array list of exec_time values
     */
    public function getLastProcessedEntriesTimestamps($limit = 100)
    {
        // A QueryBuilder keeps the state of earlier statements (including ordering),
        // so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        $statement = $queryBuilder
            ->select('exec_time')
            ->from($this->tableName)
            ->orderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        $timestamps = [];
        while ($row = $statement->fetch()) {
            $timestamps[] = $row['exec_time'];
        }

        return $timestamps;
    }
375
376
    /**
     * Returns the most recently processed entries as complete rows, ordered by
     * exec_time descending.
     *
     * @param int $limit maximum number of rows to return
     *
     * @return array list of queue rows
     */
    public function getLastProcessedEntries($limit = 100)
    {
        // A QueryBuilder keeps the state of earlier statements, so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        $statement = $queryBuilder
            ->from($this->tableName)
            ->select('*')
            ->orderBy('exec_time', 'desc')
            ->setMaxResults($limit)
            ->execute();

        $rows = [];
        while (($row = $statement->fetch()) !== false) {
            $rows[] = $row;
        }

        return $rows;
    }
399
400
    /**
     * Returns performance statistics per completed process.
     *
     * For all rows with a non-zero exec_time between $start and $end
     * (both inclusive) the result holds, per process_id_completed, the keys
     * "start" (lowest exec_time), "end" (highest exec_time), "urlcount"
     * (number of rows) and "process_id_completed".
     *
     * @param int $start timestamp
     * @param int $end timestamp
     *
     * @return array performance data, keyed by process_id_completed
     */
    public function getPerformanceData($start, $end)
    {
        // A QueryBuilder keeps the state of earlier statements, so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        $statement = $queryBuilder
            ->from($this->tableName)
            ->selectLiteral('min(exec_time) as start', 'max(exec_time) as end', 'count(*) as urlcount')
            ->addSelect('process_id_completed')
            ->where(
                $queryBuilder->expr()->neq('exec_time', 0),
                $queryBuilder->expr()->gte('exec_time', $queryBuilder->createNamedParameter($start)),
                $queryBuilder->expr()->lte('exec_time', $queryBuilder->createNamedParameter($end))
            )
            ->groupBy('process_id_completed')
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[$row['process_id_completed']] = $row;
        }

        return $rows;
    }
429
430
    /**
     * Determines if a page is queued.
     *
     * Counts the queue rows for the page and reports whether at least one
     * exists. Each optional filter is applied unless it is exactly false.
     *
     * @param int|string $uid uid of the page, must be interpretable as integer
     * @param bool $unprocessed_only only consider rows with exec_time = 0
     * @param bool $timed_only only consider rows with a scheduled time other than 0
     * @param int|bool $timestamp only consider rows scheduled exactly at this timestamp; false disables the filter
     *
     * @return bool
     *
     * @throws \InvalidArgumentException if $uid cannot be interpreted as integer
     */
    public function isPageInQueue($uid, $unprocessed_only = true, $timed_only = false, $timestamp = false)
    {
        if (!MathUtility::canBeInterpretedAsInteger($uid)) {
            throw new \InvalidArgumentException('Invalid parameter type', 1468931945);
        }

        // A QueryBuilder keeps the state of earlier statements, so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        $queryBuilder
            ->count('*')
            ->from($this->tableName)
            ->where(
                $queryBuilder->expr()->eq('page_id', $queryBuilder->createNamedParameter($uid))
            );

        if (false !== $unprocessed_only) {
            $queryBuilder->andWhere(
                $queryBuilder->expr()->eq('exec_time', 0)
            );
        }

        if (false !== $timed_only) {
            $queryBuilder->andWhere(
                $queryBuilder->expr()->neq('scheduled', 0)
            );
        }

        if (false !== $timestamp) {
            $queryBuilder->andWhere(
                $queryBuilder->expr()->eq('scheduled', $queryBuilder->createNamedParameter($timestamp))
            );
        }

        // Evaluate the fetched count; comparing the query builder object itself
        // would report every page as queued.
        $count = $queryBuilder
            ->execute()
            ->fetchColumn(0);

        return false !== $count && (int)$count > 0;
    }
484
485
    /**
     * Checks whether a page is in the queue, intended for entries that are
     * timed for a date when they should be crawled.
     *
     * The uid is cast to integer and the check is delegated to isPageInQueue().
     *
     * NOTE(review): $timed_only is not passed on, so isPageInQueue() runs with
     * its default (false) and untimed entries match as well - confirm whether
     * that is intended.
     *
     * @param int $uid uid of the page
     * @param boolean $show_unprocessed only respect unprocessed pages
     *
     * @return boolean
     */
    public function isPageInQueueTimed($uid, $show_unprocessed = true)
    {
        return $this->isPageInQueue((int)$uid, $show_unprocessed);
    }
500
501
    /**
     * Returns all combinations of set_id and scheduled time together with the
     * number of queue entries they contain, newest scheduled time first.
     *
     * Each row carries the keys "count_value", "set_id" and "scheduled".
     *
     * @return array
     */
    public function getAvailableSets()
    {
        // A QueryBuilder keeps the state of earlier statements, so every query gets its own instance.
        $queryBuilder = GeneralUtility::makeInstance(ConnectionPool::class)
            ->getQueryBuilderForTable($this->tableName);

        $statement = $queryBuilder
            ->selectLiteral('count(*) as count_value')
            ->addSelect('set_id', 'scheduled')
            ->from($this->tableName)
            ->orderBy('scheduled', 'desc')
            ->groupBy('set_id', 'scheduled')
            ->execute();

        $rows = [];
        while ($row = $statement->fetch()) {
            $rows[] = $row;
        }

        return $rows;
    }
521
}
522