1 | <?php |
||
36 | class QueueRepository extends AbstractRepository |
||
37 | { |
||
38 | /** |
||
39 | * @var string |
||
40 | */ |
||
41 | protected $tableName = 'tx_crawler_queue'; |
||
42 | |||
/**
 * Returns the executed queue entry of a process that comes first when
 * sorting by exec_time in ascending order.
 *
 * "Youngest" therefore means the entry with the smallest exec_time value.
 * The lookup itself is delegated to getFirstOrLastObjectByProcess().
 *
 * @param Process $process process whose entries are inspected
 *
 * @return Queue entry object produced by getFirstOrLastObjectByProcess()
 */
public function findYoungestEntryForProcess(Process $process)
{
    $ordering = 'exec_time ASC';

    return $this->getFirstOrLastObjectByProcess($process, $ordering);
}
||
54 | |||
/**
 * Returns the executed queue entry of a process that comes first when
 * sorting by exec_time in descending order.
 *
 * "Oldest" therefore means the entry with the largest exec_time value.
 * The lookup itself is delegated to getFirstOrLastObjectByProcess().
 *
 * @param Process $process process whose entries are inspected
 *
 * @return Queue entry object produced by getFirstOrLastObjectByProcess()
 */
public function findOldestEntryForProcess(Process $process)
{
    $ordering = 'exec_time DESC';

    return $this->getFirstOrLastObjectByProcess($process, $ordering);
}
||
66 | |||
/**
 * Loads a single queue row of a process and wraps it in a Queue object.
 *
 * Only rows whose process_id_completed equals the id of the given process
 * and whose exec_time is greater than 0 are considered; the first row in
 * the requested order is used.
 *
 * @param Process $process process whose id is matched against process_id_completed
 * @param string $orderby SQL ORDER BY expression; the first matching row is returned as object
 *
 * @return Queue built from the matching row, or from an empty array when nothing matched
 */
protected function getFirstOrLastObjectByProcess($process, $orderby)
{
    $db = $this->getDB();
    $where = 'process_id_completed=' . $db->fullQuoteStr($process->getProcess_id(), $this->tableName)
        . ' AND exec_time > 0 ';

    // Query the repository's own table (the one already used for quoting above)
    // instead of repeating its name as a string literal.
    $rows = $db->exec_SELECTgetRows('*', $this->tableName, $where, '', $orderby, 1);

    // An empty or missing result falls back to an empty record.
    $record = !empty($rows) ? $rows[0] : [];

    return new Queue($record);
}
||
93 | |||
/**
 * Counts the queue rows a process has completed.
 *
 * A row is counted when its exec_time is greater than 0 and its
 * process_id_completed equals the id of the given process.
 *
 * @param Process $process process providing the id via getProcess_id()
 *
 * @return int number of matching rows as reported by countItemsByWhereClause()
 */
public function countExecutedItemsByProcess($process)
{
    $quotedProcessId = $this->getDB()->fullQuoteStr($process->getProcess_id(), $this->tableName);
    $condition = 'exec_time > 0 AND process_id_completed = ' . $quotedProcessId;

    return $this->countItemsByWhereClause($condition);
}
||
108 | |||
/**
 * Counts the queue rows assigned to a process that have not been executed yet.
 *
 * A row is counted when its exec_time equals 0 and its process_id equals
 * the id of the given process.
 *
 * @param Process $process process providing the id via getProcess_id()
 *
 * @return int number of matching rows as reported by countItemsByWhereClause()
 */
public function countNonExecutedItemsByProcess($process)
{
    $quotedProcessId = $this->getDB()->fullQuoteStr($process->getProcess_id(), $this->tableName);
    $condition = 'exec_time = 0 AND process_id = ' . $quotedProcessId;

    return $this->countItemsByWhereClause($condition);
}
||
123 | |||
/**
 * Counts all queue entries that are still unexecuted (exec_time = 0) and
 * whose scheduled time lies strictly before the current time().
 *
 * @return int number of matching rows as reported by countItemsByWhereClause()
 */
public function countAllPendingItems()
{
    $now = time();
    $condition = 'exec_time = 0 AND scheduled < ' . $now;

    return $this->countItemsByWhereClause($condition);
}
||
134 | |||
/**
 * Counts all queue entries that are still unexecuted (exec_time = 0),
 * scheduled strictly before the current time() and already carry a
 * non-empty process_id.
 *
 * @return int number of matching rows as reported by countItemsByWhereClause()
 */
public function countAllAssignedPendingItems()
{
    $now = time();
    $condition = 'exec_time = 0 AND scheduled < ' . $now . " AND process_id != ''";

    return $this->countItemsByWhereClause($condition);
}
||
145 | |||
/**
 * Counts all queue entries that are still unexecuted (exec_time = 0),
 * scheduled strictly before the current time() and have an empty
 * process_id.
 *
 * @return int number of matching rows as reported by countItemsByWhereClause()
 */
public function countAllUnassignedPendingItems()
{
    $now = time();
    $condition = 'exec_time = 0 AND scheduled < ' . $now . " AND process_id = ''";

    return $this->countItemsByWhereClause($condition);
}
||
156 | |||
/**
 * Internal method to count the rows of the repository table that match a
 * given where clause.
 *
 * @param string $where SQL condition, passed to the query verbatim; callers must quote any values themselves
 *
 * @return int number of matching rows; 0 when no result row could be fetched
 */
protected function countItemsByWhereClause($where)
{
    $db = $this->getDB();
    $result = $db->exec_SELECTquery('count(*) as anz', $this->tableName, $where);
    $row = $db->sql_fetch_assoc($result);

    // Without a result row there is nothing to read; report zero instead of
    // indexing into a non-array value.
    if (!is_array($row) || !isset($row['anz'])) {
        return 0;
    }

    // The fetched column may arrive as a numeric string; cast it so callers
    // receive the int their docblocks promise.
    return (int)$row['anz'];
}
||
172 | |||
/**
 * Counts pending queue entries per configuration key.
 *
 * Pending means exec_time = 0 and scheduled strictly before the current
 * time(). Each returned row carries the columns "configuration",
 * "unprocessed" (row count) and "assignedButUnprocessed" (sum over rows
 * with a non-empty process_id).
 *
 * @return array list of result rows, one per configuration
 */
public function countPendingItemsGroupedByConfigurationKey()
{
    $db = $this->getDB();
    $fields = "configuration, count(*) as unprocessed, sum(process_id != '') as assignedButUnprocessed";
    $pendingCondition = 'exec_time = 0 AND scheduled < ' . time();
    $res = $db->exec_SELECTquery($fields, $this->tableName, $pendingCondition, 'configuration');

    $grouped = [];
    $row = $db->sql_fetch_assoc($res);
    while ($row) {
        $grouped[] = $row;
        $row = $db->sql_fetch_assoc($res);
    }

    return $grouped;
}
||
194 | |||
/**
 * Collects the set ids that still have pending entries.
 *
 * Pending means exec_time = 0 and scheduled strictly before the current
 * time(). The query groups by set_id, and every id is converted to an
 * integer before it is returned.
 *
 * @return array array of set ids (integers)
 */
public function getSetIdWithUnprocessedEntries()
{
    $db = $this->getDB();
    $pendingCondition = 'exec_time = 0 AND scheduled < ' . time();
    $res = $db->exec_SELECTquery('set_id', $this->tableName, $pendingCondition, 'set_id');

    $setIds = [];
    $row = $db->sql_fetch_assoc($res);
    while ($row) {
        $setIds[] = (int)$row['set_id'];
        $row = $db->sql_fetch_assoc($res);
    }

    return $setIds;
}
||
218 | |||
/**
 * Get total queue entries by configuration.
 *
 * Counts, per configuration, the rows that belong to one of the given set
 * ids and are scheduled strictly before the current time(). An empty list
 * of set ids yields an empty result without querying the database.
 *
 * @param array $setIds set ids to include; every element is cast to int before use
 *
 * @return array totals by configuration (configuration key => count)
 */
public function getTotalQueueEntriesByConfiguration(array $setIds)
{
    $totals = [];
    if (count($setIds) === 0) {
        return $totals;
    }

    // The ids are spliced into the SQL text, so force each one to an
    // integer first; this keeps arbitrary strings out of the statement.
    $idList = implode(',', array_map('intval', $setIds));

    $db = $this->getDB();
    $res = $db->exec_SELECTquery(
        'configuration, count(*) as c',
        $this->tableName,
        'set_id in (' . $idList . ') AND scheduled < ' . time(),
        'configuration'
    );
    while ($row = $db->sql_fetch_assoc($res)) {
        $totals[$row['configuration']] = $row['c'];
    }

    return $totals;
}
||
244 | |||
/**
 * Returns the exec_time values of the queue rows with the highest exec_time.
 *
 * Rows are ordered by exec_time descending and cut off at $limit; no WHERE
 * condition is applied to the query.
 *
 * @param int $limit maximum number of rows to read
 *
 * @return array list of exec_time values, highest first
 */
public function getLastProcessedEntriesTimestamps($limit = 100)
{
    $db = $this->getDB();
    $res = $db->exec_SELECTquery('exec_time', $this->tableName, '', '', 'exec_time desc', $limit);

    $timestamps = [];
    for ($row = $db->sql_fetch_assoc($res); $row !== false; $row = $db->sql_fetch_assoc($res)) {
        $timestamps[] = $row['exec_time'];
    }

    return $timestamps;
}
||
271 | |||
/**
 * Returns the queue rows with the highest exec_time.
 *
 * Rows are ordered by exec_time descending and cut off at $limit; no WHERE
 * condition is applied to the query.
 *
 * @param string $selectFields SQL field list, passed to the query verbatim
 * @param int $limit maximum number of rows to read
 *
 * @return array list of result rows, highest exec_time first
 */
public function getLastProcessedEntries($selectFields = '*', $limit = 100)
{
    $db = $this->getDB();
    $res = $db->exec_SELECTquery($selectFields, $this->tableName, '', '', 'exec_time desc', $limit);

    $entries = [];
    while (true) {
        $row = $db->sql_fetch_assoc($res);
        if ($row === false) {
            break;
        }
        $entries[] = $row;
    }

    return $entries;
}
||
299 | |||
/**
 * Get performance statistics data.
 *
 * Aggregates, per process_id_completed, the smallest and largest exec_time
 * ("start" / "end") and the number of rows ("urlcount") for all rows whose
 * exec_time is non-zero and lies within [$start, $end] (both inclusive).
 * Both bounds are converted to integers before they enter the SQL.
 *
 * @param int $start timestamp, lower bound
 * @param int $end timestamp, upper bound
 *
 * @return array performance rows keyed by process_id_completed
 */
public function getPerformanceData($start, $end)
{
    $db = $this->getDB();
    $timeWindow = 'exec_time != 0 and exec_time >= ' . (int)$start . ' and exec_time <= ' . (int)$end;
    $res = $db->exec_SELECTquery(
        'process_id_completed, min(exec_time) as start, max(exec_time) as end, count(*) as urlcount',
        $this->tableName,
        $timeWindow,
        'process_id_completed'
    );

    $statsByProcess = [];
    for ($row = $db->sql_fetch_assoc($res); $row !== false; $row = $db->sql_fetch_assoc($res)) {
        $statsByProcess[$row['process_id_completed']] = $row;
    }

    return $statsByProcess;
}
||
325 | |||
326 | /** |
||
327 | * This method is used to count all processes in the process table. |
||
328 | * |
||
329 | * @param string $where Where clause |
||
330 | * |
||
331 | * @return integer |
||
332 | */ |
||
333 | 8 | public function countAll($where = '1 = 1') |
|
337 | } |
||
338 |