Test Failed
Push — develop ( 33b521...cdbd76 )
by Paul
10:16 queued 21s
created

GeolocateReviews::scheduleNextBatchIfNeeded()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 5
c 1
b 0
f 1
dl 0
loc 7
rs 10
cc 2
nc 2
nop 4
1
<?php
2
3
namespace GeminiLabs\SiteReviews\Commands;
4
5
use GeminiLabs\SiteReviews\Database;
6
use GeminiLabs\SiteReviews\Database\Query;
7
use GeminiLabs\SiteReviews\Defaults\StatDefaults;
8
use GeminiLabs\SiteReviews\Geolocation;
9
use GeminiLabs\SiteReviews\Modules\Notice;
10
use GeminiLabs\SiteReviews\Modules\Queue;
11
use GeminiLabs\SiteReviews\Response;
12
13
class GeolocateReviews extends AbstractCommand
{
    /**
     * IP-API batch requests allow a maximum of 100 IPs per request.
     */
    public const BATCH_SIZE = 100;

    /**
     * Number of rows per insert query.
     */
    public const INSERT_CHUNK_SIZE = 500;

    /**
     * Transient key for processing lock.
     */
    public const LOCK_KEY = 'glsr_geolocation_processing_lock';

    /**
     * Key used for the queued action.
     */
    public const QUEUED_ACTION_KEY = 'queue/geolocation';

    /**
     * Integer number of rows to fetch per database query in generator.
     */
    public const ROW_BATCH_SIZE = 500;

    /**
     * Queue geolocation processing and add an admin notice describing the outcome.
     */
    public function handle(): void
    {
        $this->queue(true);
    }

    /**
     * Process a batch of IPs for geolocation data.
     *
     * Fetches IPs, retrieves geolocation data, inserts stats and post meta, then
     * schedules the next batch (or releases the lock when nothing is left).
     *
     * Geolocated IPs drop out of the "needs geolocation" query, so the offset only
     * advances past IPs that could NOT be geolocated. Advancing by the full batch
     * size would skip one batch of unprocessed IPs for every batch processed.
     *
     * @param int $offset Number of IPs still needing geolocation (ordered by IP) to skip,
     *                    i.e. the IPs that earlier batches of this run failed to geolocate
     */
    public function process(int $offset = 0): void
    {
        $offset = max(0, $offset);
        $ipAddresses = $this->fetchIpsNeedingGeolocation($offset);
        if (empty($ipAddresses)) {
            glsr_log()->info("Geolocation: No IPs to process at offset {$offset}");
            $this->releaseLock();
            return;
        }
        $response = $this->fetchRemoteGeolocationData($ipAddresses);
        $results = $response->body();
        if (empty($results) || !is_array($results)) {
            // The lookup failed (network error, rate limit, invalid request); stop this
            // run and release the lock so processing can be queued again later.
            glsr_log()->warning("Geolocation: No geolocation data retrieved at offset {$offset}");
            $this->releaseLock();
            return;
        }
        $validResults = $this->filterValidGeolocationResults($results);
        if (empty($validResults)) {
            glsr_log()->warning("Geolocation: No valid geolocation results at offset {$offset}");
            // None of these IPs can be geolocated (e.g. private or reserved ranges).
            // Move past them: halting here would stall every future run on this same batch.
            $this->scheduleNextBatchIfNeeded($offset, static::BATCH_SIZE, $ipAddresses);
            return;
        }
        $remainingBefore = $this->countIpsNeedingGeolocation();
        $this->processResults($validResults);
        // Measure progress from the database rather than from the API response, so a
        // failed insert still advances the offset instead of re-queuing the same batch forever.
        $geolocatedCount = max(0, $remainingBefore - $this->countIpsNeedingGeolocation());
        $this->scheduleNextBatchIfNeeded($offset, static::BATCH_SIZE, $ipAddresses, 60, $geolocatedCount);
    }

    /**
     * Start processing via WP-Cron.
     *
     * Refuses to queue when a run is already in progress (lock held while an action is
     * pending) or when there is nothing left to geolocate.
     *
     * @param bool $notify Whether to add an admin notice describing the outcome
     *
     * @return bool True when a new run was queued
     */
    public function queue(bool $notify = false): bool
    {
        if (!glsr(Queue::class)->isPending(static::QUEUED_ACTION_KEY)) {
            // Nothing is queued, so any existing lock is stale.
            $this->releaseLock();
        }
        if (get_transient(static::LOCK_KEY)) { // Prevent concurrent processing
            if ($notify) {
                glsr(Notice::class)->addWarning(
                    _x('Geolocation processing is already in progress.', 'admin-text', 'site-reviews')
                );
            }
            return false;
        }
        if (!$ipsToProcess = $this->countIpsNeedingGeolocation()) {
            if ($notify) {
                glsr(Notice::class)->addInfo(
                    _x('All valid IP addresses already have been geolocated.', 'admin-text', 'site-reviews')
                );
            }
            return false;
        }
        $this->lock();
        glsr(Queue::class)->once(time(), static::QUEUED_ACTION_KEY, ['offset' => 0], true);
        if ($notify) {
            glsr(Notice::class)->addSuccess(sprintf(
                _x('Successfully queued geolocation processing of %d IP addresses.', 'admin-text', 'site-reviews'),
                $ipsToProcess
            ));
        }
        return true;
    }

    /**
     * Build the command response payload (the collected admin notices).
     */
    public function response(): array
    {
        return [
            'notices' => glsr(Notice::class)->get(),
        ];
    }

    /**
     * Count the distinct IP addresses that still need to be geolocated.
     *
     * Counts distinct IPs rather than rating rows because several reviews can share an
     * IP and the admin notice reports this number as "IP addresses".
     */
    protected function countIpsNeedingGeolocation(): int
    {
        $sql = "
            SELECT COUNT(DISTINCT r.ip_address)
            {$this->ipsNeedingGeolocationClause()}
        ";
        $query = glsr(Query::class)->sql($sql);
        return (int) glsr(Database::class)->dbGetVar($query);
    }

    /**
     * Fetch one batch of distinct IP addresses that still need to be geolocated.
     *
     * Results are ordered by IP so that offset pagination is deterministic.
     *
     * @param int $offset Offset for pagination
     *
     * @return string[]
     */
    protected function fetchIpsNeedingGeolocation(int $offset): array
    {
        $sql = "
            SELECT DISTINCT r.ip_address
            {$this->ipsNeedingGeolocationClause()}
            ORDER BY r.ip_address ASC
            LIMIT %d OFFSET %d
        ";
        $query = glsr(Query::class)->sql($sql, static::BATCH_SIZE, $offset);
        return glsr(Database::class)->dbGetCol($query);
    }

    /**
     * Look up a batch of IPs and honour the API's rate-limit signals.
     *
     * Sleeps until the rate-limit window resets (minimum 60 seconds) when the API reports
     * zero remaining requests or responds with 429. The rate-limit branch only fires when
     * the `x-rl` header is actually present, so a response without headers (e.g. a
     * transport error) no longer triggers a spurious sleep.
     *
     * @param string[] $ipAddresses IPs to fetch data for
     */
    protected function fetchRemoteGeolocationData(array $ipAddresses): Response
    {
        $response = glsr(Geolocation::class)->batchLookup($ipAddresses);
        $remainingRequests = $response->headers['x-rl'] ?? null;
        $resetTime = max((int) ($response->headers['x-ttl'] ?? 0), 60); // Min 60 seconds
        if (is_numeric($remainingRequests) && 0 === (int) $remainingRequests) {
            glsr_log()->warning("Geolocation: Rate limit reached, waiting {$resetTime} seconds");
            sleep($resetTime);
        } elseif (422 === $response->code) {
            glsr_log()->error('Geolocation: 422 Unprocessable Entity, invalid batch request');
        } elseif (429 === $response->code) {
            glsr_log()->warning("Geolocation: 429 Too Many Requests, waiting {$resetTime} seconds");
            sleep($resetTime);
        }
        return $response;
    }

    /**
     * Keep only successful results that identify the IP they belong to.
     *
     * @param array $results Geolocation API results
     */
    protected function filterValidGeolocationResults(array $results): array
    {
        return array_filter($results, static function ($result) {
            $query = $result['query'] ?? '';
            $status = $result['status'] ?? '';
            return 'success' === $status && !empty($query);
        });
    }

    /**
     * Shared FROM/WHERE clause selecting ratings with a usable IP and no stats row yet.
     *
     * Loopback addresses and the literal 'unknown' are excluded because they can never
     * be geolocated. Used by both the count and the fetch query so they always agree.
     */
    protected function ipsNeedingGeolocationClause(): string
    {
        return "
            FROM table|ratings AS r
            LEFT JOIN table|stats AS s ON (r.ID = s.rating_id)
            WHERE 1=1
            AND r.ip_address IS NOT NULL
            AND r.ip_address != ''
            AND r.ip_address != '127.0.0.1'
            AND r.ip_address != '::1'
            AND r.ip_address != 'unknown'
            AND s.rating_id IS NULL
        ";
    }

    /**
     * Bulk insert one chunk of stats rows together with their review post meta.
     *
     * @param array    $stats         Stats rows
     * @param array    $postmeta      Matching post meta rows
     * @param string[] $statsColumns  Column names for the stats table
     */
    protected function insertGeolocationRows(array $stats, array $postmeta, array $statsColumns): void
    {
        if (empty($stats)) {
            return;
        }
        glsr(Database::class)->insertBulk('stats', $stats, $statsColumns);
        glsr(Database::class)->insertBulk('postmeta', $postmeta, ['post_id', 'meta_key', 'meta_value']);
    }

    /**
     * Set the processing lock transient.
     *
     * @param int $duration Lock lifetime in seconds
     */
    protected function lock(int $duration = \HOUR_IN_SECONDS): void
    {
        set_transient(static::LOCK_KEY, true, $duration);
    }

    /**
     * Merge each rating row with its geolocation result and insert in chunks.
     *
     * @param \Generator $generator Generator yielding ratings data
     * @param array      $results   Valid geolocation results
     */
    protected function prepareAndInsert(\Generator $generator, array $results): void
    {
        $statsColumns = array_keys(glsr(StatDefaults::class)->defaults());
        // Index results by IP once instead of scanning every result for every row.
        // The first result for an IP wins, matching the previous lookup.
        $resultsByIp = [];
        foreach ($results as $result) {
            $resultsByIp[$result['query']] ??= $result;
        }
        $stats = [];
        $postmeta = [];
        foreach ($generator as $item) {
            $ip = $item['ip_address'];
            if (!isset($resultsByIp[$ip])) {
                continue; // No geolocation data for this row; don't record an empty stat.
            }
            $stat = glsr(StatDefaults::class)->restrict(
                wp_parse_args($item, $resultsByIp[$ip])
            );
            $stats[] = $stat;
            $postmeta[] = [
                'post_id' => $item['review_id'],
                'meta_key' => '_geolocation',
                'meta_value' => maybe_serialize(array_diff_key($stat, ['rating_id' => 0])),
            ];
            if (count($stats) >= static::INSERT_CHUNK_SIZE) {
                $this->insertGeolocationRows($stats, $postmeta, $statsColumns);
                $stats = [];
                $postmeta = [];
            }
        }
        $this->insertGeolocationRows($stats, $postmeta, $statsColumns);
    }

    /**
     * Insert stats and post meta for every rating whose IP has a valid result.
     *
     * @param array $results Valid geolocation results
     */
    protected function processResults(array $results): void
    {
        $validIps = wp_list_pluck($results, 'query');
        $generator = $this->resultsGenerator($validIps);
        $this->prepareAndInsert($generator, $results);
    }

    /**
     * Release the processing lock.
     */
    protected function releaseLock(): void
    {
        delete_transient(static::LOCK_KEY);
    }

    /**
     * Generator to yield ratings data for a list of IP addresses.
     *
     * Only ratings without a stats row are yielded, so ratings that were geolocated
     * earlier are never duplicated. Keyset pagination on the rating ID is used instead
     * of OFFSET because the consumer inserts stats rows while iterating, which would
     * shift OFFSET-based pages.
     *
     * @param string[] $ipAddresses List of IPs to query
     */
    protected function resultsGenerator(array $ipAddresses): \Generator
    {
        foreach (array_chunk($ipAddresses, static::BATCH_SIZE) as $chunk) {
            $placeholders = implode(',', array_fill(0, count($chunk), '%s'));
            $lastId = 0;
            do {
                $sql = "
                    SELECT r.ip_address, r.ID AS rating_id, r.review_id
                    FROM table|ratings AS r
                    LEFT JOIN table|stats AS s ON (r.ID = s.rating_id)
                    WHERE r.ip_address IN ($placeholders)
                    AND s.rating_id IS NULL
                    AND r.ID > %d
                    ORDER BY r.ID ASC
                    LIMIT %d
                ";
                $query = glsr(Query::class)->sql($sql, array_merge($chunk, [$lastId, static::ROW_BATCH_SIZE]));
                $rows = (array) glsr(Database::class)->dbGetResults($query, \ARRAY_A);
                foreach ($rows as $row) {
                    $lastId = (int) $row['rating_id'];
                    yield $row;
                }
            } while (count($rows) === static::ROW_BATCH_SIZE);
        }
    }

    /**
     * Schedule the next batch of IPs or release the lock if no more IPs remain.
     *
     * A short batch means the end of the result set was reached. Otherwise the next
     * offset skips only the IPs of this batch that were NOT geolocated. Geolocated IPs
     * no longer match the "needs geolocation" query, so skipping them would skip others.
     *
     * @param int   $offset      Current offset
     * @param int   $batchSize   Size of the current batch
     * @param array $ipAddresses Current batch of IPs
     * @param int   $delay       Seconds to wait before running the next batch
     * @param int   $processed   Number of IPs from this batch that were geolocated
     */
    protected function scheduleNextBatchIfNeeded(int $offset, int $batchSize, array $ipAddresses, int $delay = 60, int $processed = 0): void
    {
        if (count($ipAddresses) !== $batchSize) {
            $this->releaseLock();
            return;
        }
        $nextOffset = $offset + $batchSize - min(max(0, $processed), $batchSize);
        $timestamp = time() + max(0, $delay);
        $this->lock(); // Keep the lock alive for long multi-batch runs.
        glsr(Queue::class)->once($timestamp, static::QUEUED_ACTION_KEY, ['offset' => $nextOffset], true);
    }
}
294