Completed
Push — dependencies/composer ( 599eac...16ed26 )
by Christian
16:22
created

QueueRepository::countUnprocessedItems()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
nc 1
nop 0
dl 0
loc 4
ccs 0
cts 4
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
<?php
2
namespace AOE\Crawler\Domain\Repository;
3
4
/***************************************************************
5
 *  Copyright notice
6
 *
7
 *  (c) 2017 AOE GmbH <[email protected]>
8
 *
9
 *  All rights reserved
10
 *
11
 *  This script is part of the TYPO3 project. The TYPO3 project is
12
 *  free software; you can redistribute it and/or modify
13
 *  it under the terms of the GNU General Public License as published by
14
 *  the Free Software Foundation; either version 3 of the License, or
15
 *  (at your option) any later version.
16
 *
17
 *  The GNU General Public License can be found at
18
 *  http://www.gnu.org/copyleft/gpl.html.
19
 *
20
 *  This script is distributed in the hope that it will be useful,
21
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
22
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23
 *  GNU General Public License for more details.
24
 *
25
 *  This copyright notice MUST APPEAR in all copies of the script!
26
 ***************************************************************/
27
28
use AOE\Crawler\Domain\Model\Process;
29
use AOE\Crawler\Domain\Model\Queue;
30
31
/**
32
 * Class QueueRepository
33
 *
34
 * @package AOE\Crawler\Domain\Repository
35
 */
36
class QueueRepository extends AbstractRepository
37
{
38
    /**
39
     * @var string
40
     */
41
    protected $tableName = 'tx_crawler_queue';
42
43
    /**
44
     * This method is used to find the youngest entry for a given process.
45
     *
46
     * @param Process $process
47
     *
48
     * @return Queue $entry
49
     */
50
    public function findYoungestEntryForProcess(Process $process)
51
    {
52
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time ASC');
53
    }
54
55
    /**
56
     * This method is used to find the oldest entry for a given process.
57
     *
58
     * @param Process $process
59
     *
60
     * @return Queue
61
     */
62
    public function findOldestEntryForProcess(Process $process)
63
    {
64
        return $this->getFirstOrLastObjectByProcess($process, 'exec_time DESC');
65
    }
66
67
    /**
68
     * This internal helper method is used to create an instance of an entry object
69
     *
70
     * @param Process $process
71
     * @param string $orderby first matching item will be returned as object
72
     *
73
     * @return Queue
74
     */
75
    protected function getFirstOrLastObjectByProcess($process, $orderby)
76
    {
77
        $db = $this->getDB();
78
        $where = 'process_id_completed=' . $db->fullQuoteStr($process->getProcess_id(), $this->tableName) .
79
                   ' AND exec_time > 0 ';
80
        $limit = 1;
81
        $groupby = '';
82
83
        $res = $db->exec_SELECTgetRows('*', 'tx_crawler_queue', $where, $groupby, $orderby, $limit);
84
        if ($res) {
85
            $first = $res[0];
86
        } else {
87
            $first = [];
88
        }
89
        $resultObject = new Queue($first);
90
91
        return $resultObject;
92
    }
93
94
    /**
95
     * Counts all executed items of a process.
96
     *
97
     * @param Process $process
98
     *
99
     * @return int
100
     */
101
    public function countExecutedItemsByProcess($process)
102
    {
103
        return $this->countItemsByWhereClause('exec_time > 0 AND process_id_completed = ' . $this->getDB()->fullQuoteStr(
104
            $process->getProcess_id(),
105
                $this->tableName
106
        ));
107
    }
108
109
    /**
110
     * Counts items of a process which yet have not been processed/executed
111
     *
112
     * @param Process $process
113
     *
114
     * @return int
115
     */
116
    public function countNonExecutedItemsByProcess($process)
117
    {
118
        return $this->countItemsByWhereClause('exec_time = 0 AND process_id = ' . $this->getDB()->fullQuoteStr(
119
            $process->getProcess_id(),
120
                $this->tableName
121
        ));
122
    }
123
124
    /**
125
     * Count items which have not been processed yet
126
     * 
127
     * @return int
128
     */
129
    public function countUnprocessedItems()
130
    {
131
        return $this->countItemsByWhereClause("exec_time=0 AND process_scheduled=0 AND scheduled<=" . time());
132
    }
133
134
    /**
135
     * This method can be used to count all queue entrys which are
136
     * scheduled for now or a earlier date.
137
     *
138
     * @return int
139
     */
140
    public function countAllPendingItems()
141
    {
142
        return $this->countItemsByWhereClause('exec_time = 0 AND scheduled < ' . time());
143
    }
144
145
    /**
146
     * This method can be used to count all queue entrys which are
147
     * scheduled for now or a earlier date and are assigned to a process.
148
     *
149
     * @return int
150
     */
151
    public function countAllAssignedPendingItems()
152
    {
153
        return $this->countItemsByWhereClause("exec_time = 0 AND scheduled < " . time() . " AND process_id != ''");
154
    }
155
156
    /**
157
     * This method can be used to count all queue entrys which are
158
     * scheduled for now or a earlier date and are not assigned to a process.
159
     *
160
     * @return int
161
     */
162
    public function countAllUnassignedPendingItems()
163
    {
164
        return $this->countItemsByWhereClause("exec_time = 0 AND scheduled < " . time() . " AND process_id = ''");
165
    }
166
167
    /**
168
     * Internal method to count items by a given where clause
169
     *
170
     * @param string $where
171
     *
172
     * @return mixed
173
     */
174
    protected function countItemsByWhereClause($where)
175
    {
176
        $db = $this->getDB();
177
        $rs = $db->exec_SELECTquery('count(*) as anz', $this->tableName, $where);
178
        $res = $db->sql_fetch_assoc($rs);
179
180
        return $res['anz'];
181
    }
182
183
    /**
184
     * Count pending queue entries grouped by configuration key
185
     *
186
     * @return array
187
     */
188
    public function countPendingItemsGroupedByConfigurationKey()
189
    {
190
        $db = $this->getDB();
191
        $res = $db->exec_SELECTquery(
192
            "configuration, count(*) as unprocessed, sum(process_id != '') as assignedButUnprocessed",
193
            $this->tableName,
194
            'exec_time = 0 AND scheduled < ' . time(),
195
            'configuration'
196
        );
197
        $rows = [];
198
        while ($row = $db->sql_fetch_assoc($res)) {
199
            $rows[] = $row;
200
        }
201
202
        return $rows;
203
    }
204
205
    /**
206
     * Get set id with unprocessed entries
207
     *
208
     * @param void
209
     *
210
     * @return array array of set ids
211
     */
212
    public function getSetIdWithUnprocessedEntries()
213
    {
214
        $db = $this->getDB();
215
        $res = $db->exec_SELECTquery(
216
            'set_id',
217
            $this->tableName,
218
            'exec_time = 0 AND scheduled < ' . time(),
219
            'set_id'
220
        );
221
        $setIds = [];
222
        while ($row = $db->sql_fetch_assoc($res)) {
223
            $setIds[] = intval($row['set_id']);
224
        }
225
226
        return $setIds;
227
    }
228
229
    /**
230
     * Get total queue entries by configuration
231
     *
232
     * @param array $setIds
233
     *
234
     * @return array totals by configuration (keys)
235
     */
236
    public function getTotalQueueEntriesByConfiguration(array $setIds)
237
    {
238
        $totals = [];
239
        if (count($setIds) > 0) {
240
            $db = $this->getDB();
241
            $res = $db->exec_SELECTquery(
242
                'configuration, count(*) as c',
243
                $this->tableName,
244
                'set_id in (' . implode(',', $setIds) . ') AND scheduled < ' . time(),
245
                'configuration'
246
            );
247
            while ($row = $db->sql_fetch_assoc($res)) {
248
                $totals[$row['configuration']] = $row['c'];
249
            }
250
        }
251
252
        return $totals;
253
    }
254
255
    /**
256
     * Get the timestamps of the last processed entries
257
     *
258
     * @param int $limit
259
     *
260
     * @return array
261
     */
262
    public function getLastProcessedEntriesTimestamps($limit = 100)
263
    {
264
        $db = $this->getDB();
265
        $res = $db->exec_SELECTquery(
266
            'exec_time',
267
            $this->tableName,
268
            '',
269
            '',
270
            'exec_time desc',
271
            $limit
272
        );
273
274
        $rows = [];
275
        while (($row = $db->sql_fetch_assoc($res)) !== false) {
276
            $rows[] = $row['exec_time'];
277
        }
278
279
        return $rows;
280
    }
281
282
    /**
283
     * Get the last processed entries
284
     *
285
     * @param string $selectFields
286
     * @param int $limit
287
     *
288
     * @return array
289
     */
290
    public function getLastProcessedEntries($selectFields = '*', $limit = 100)
291
    {
292
        $db = $this->getDB();
293
        $res = $db->exec_SELECTquery(
294
            $selectFields,
295
            $this->tableName,
296
            '',
297
            '',
298
            'exec_time desc',
299
            $limit
300
        );
301
302
        $rows = [];
303
        while (($row = $db->sql_fetch_assoc($res)) !== false) {
304
            $rows[] = $row;
305
        }
306
307
        return $rows;
308
    }
309
310
    /**
311
     * Get performance statistics data
312
     *
313
     * @param int $start timestamp
314
     * @param int $end timestamp
315
     *
316
     * @return array performance data
317
     */
318
    public function getPerformanceData($start, $end)
319
    {
320
        $db = $this->getDB();
321
        $res = $db->exec_SELECTquery(
322
            'process_id_completed, min(exec_time) as start, max(exec_time) as end, count(*) as urlcount',
323
            $this->tableName,
324
            'exec_time != 0 and exec_time >= ' . intval($start) . ' and exec_time <= ' . intval($end),
325
            'process_id_completed'
326
        );
327
328
        $rows = [];
329
        while (($row = $db->sql_fetch_assoc($res)) !== false) {
330
            $rows[$row['process_id_completed']] = $row;
331
        }
332
333
        return $rows;
334
    }
335
336
    /**
337
     * This method is used to count all processes in the process table.
338
     *
339
     * @param  string $where Where clause
340
     *
341
     * @return integer
342
     */
343
    public function countAll($where = '1 = 1')
344
    {
345
        return $this->countByWhere($where);
346
    }
347
}
348