Completed
Push — renovate/mocha-6.x ( f901fe...f3dd78 )
by
unknown
56:50 queued 50:25
created

packages/sync/src/Queue.php (2 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
namespace Automattic\Jetpack\Sync;
4
5
use Automattic\Jetpack\Sync\Defaults;
6
7
/**
8
 * A persistent queue that can be flushed in increments of N items,
9
 * and which blocks reads until checked-out buffers are checked in or
10
 * closed. This uses raw SQL for two reasons: speed, and not triggering
11
 * tons of added_option callbacks.
12
 */
13
class Queue {
14
	public $id;
15
	private $row_iterator;
16
17
	function __construct( $id ) {
18
		$this->id           = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL
19
		$this->row_iterator = 0;
20
		$this->random_int   = mt_rand( 1, 1000000 );
21
	}
22
23
	function add( $item ) {
24
		global $wpdb;
25
		$added = false;
26
		// this basically tries to add the option until enough time has elapsed that
27
		// it has a unique (microtime-based) option key
28
		while ( ! $added ) {
29
			$rows_added = $wpdb->query(
30
				$wpdb->prepare(
31
					"INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s,%s)",
32
					$this->get_next_data_row_option_name(),
33
					serialize( $item ),
34
					'no'
35
				)
36
			);
37
			$added      = ( 0 !== $rows_added );
38
		}
39
	}
40
41
	// Attempts to insert all the items in a single SQL query. May be subject to query size limits!
42
	function add_all( $items ) {
43
		global $wpdb;
44
		$base_option_name = $this->get_next_data_row_option_name();
45
46
		$query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";
47
48
		$rows = array();
49
50
		for ( $i = 0; $i < count( $items ); $i += 1 ) {
51
			$option_name  = esc_sql( $base_option_name . '-' . $i );
52
			$option_value = esc_sql( serialize( $items[ $i ] ) );
53
			$rows[]       = "('$option_name', '$option_value', 'no')";
54
		}
55
56
		$rows_added = $wpdb->query( $query . join( ',', $rows ) );
57
58
		if ( count( $items ) === $rows_added ) {
59
			return new \WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
60
		}
61
	}
62
63
	// Peek at the front-most item on the queue without checking it out
64
	function peek( $count = 1 ) {
65
		$items = $this->fetch_items( $count );
66
		if ( $items ) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $items of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
67
			return Utils::get_item_values( $items );
68
		}
69
70
		return array();
71
	}
72
73
	function peek_by_id( $item_ids ) {
74
		$items = $this->fetch_items_by_id( $item_ids );
75
		if ( $items ) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $items of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
76
			return Utils::get_item_values( $items );
77
		}
78
79
		return array();
80
	}
81
82
	// lag is the difference in time between the age of the oldest item
83
	// (aka first or frontmost item) and the current time
84
	function lag( $now = null ) {
85
		global $wpdb;
86
87
		$first_item_name = $wpdb->get_var(
88
			$wpdb->prepare(
89
				"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
90
				"jpsq_{$this->id}-%"
91
			)
92
		);
93
94
		if ( ! $first_item_name ) {
95
			return 0;
96
		}
97
98
		if ( null === $now ) {
99
			$now = microtime( true );
100
		}
101
102
		// break apart the item name to get the timestamp
103
		$matches = null;
104
		if ( preg_match( '/^jpsq_' . $this->id . '-(\d+\.\d+)-/', $first_item_name, $matches ) ) {
105
			return $now - floatval( $matches[1] );
106
		} else {
107
			return 0;
108
		}
109
	}
110
111
	function reset() {
112
		global $wpdb;
113
		$this->delete_checkout_id();
114
		$wpdb->query(
115
			$wpdb->prepare(
116
				"DELETE FROM $wpdb->options WHERE option_name LIKE %s",
117
				"jpsq_{$this->id}-%"
118
			)
119
		);
120
	}
121
122
	function size() {
123
		global $wpdb;
124
125
		return (int) $wpdb->get_var(
126
			$wpdb->prepare(
127
				"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s",
128
				"jpsq_{$this->id}-%"
129
			)
130
		);
131
	}
132
133
	// we use this peculiar implementation because it's much faster than count(*)
134
	function has_any_items() {
135
		global $wpdb;
136
		$value = $wpdb->get_var(
137
			$wpdb->prepare(
138
				"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )",
139
				"jpsq_{$this->id}-%"
140
			)
141
		);
142
143
		return ( $value === '1' );
144
	}
145
146
	function checkout( $buffer_size ) {
147
		if ( $this->get_checkout_id() ) {
148
			return new \WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
149
		}
150
151
		$buffer_id = uniqid();
152
153
		$result = $this->set_checkout_id( $buffer_id );
154
155
		if ( ! $result || is_wp_error( $result ) ) {
156
			return $result;
157
		}
158
159
		$items = $this->fetch_items( $buffer_size );
160
161
		if ( count( $items ) === 0 ) {
162
			return false;
163
		}
164
165
		$buffer = new Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );
166
167
		return $buffer;
168
	}
169
170
	// this checks out rows until it either empties the queue or hits a certain memory limit
171
	// it loads the sizes from the DB first so that it doesn't accidentally
172
	// load more data into memory than it needs to.
173
	// The only way it will load more items than $max_size is if a single queue item
174
	// exceeds the memory limit, but in that case it will send that item by itself.
175
	function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
176
		if ( $this->get_checkout_id() ) {
177
			return new \WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
178
		}
179
180
		$buffer_id = uniqid();
181
182
		$result = $this->set_checkout_id( $buffer_id );
183
184
		if ( ! $result || is_wp_error( $result ) ) {
185
			return $result;
186
		}
187
188
		// get the map of buffer_id -> memory_size
189
		global $wpdb;
190
191
		$items_with_size = $wpdb->get_results(
192
			$wpdb->prepare(
193
				"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
194
				"jpsq_{$this->id}-%",
195
				$max_buffer_size
196
			),
197
			OBJECT
198
		);
199
200
		if ( count( $items_with_size ) === 0 ) {
201
			return false;
202
		}
203
204
		$total_memory = 0;
205
206
		$min_item_id = $max_item_id = $items_with_size[0]->id;
207
208
		foreach ( $items_with_size as $id => $item_with_size ) {
209
			$total_memory += $item_with_size->value_size;
210
211
			// if this is the first item and it exceeds memory, allow loop to continue
212
			// we will exit on the next iteration instead
213
			if ( $total_memory > $max_memory && $id > 0 ) {
214
				break;
215
			}
216
217
			$max_item_id = $item_with_size->id;
218
		}
219
220
		$query = $wpdb->prepare(
221
			"SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name >= %s and option_name <= %s ORDER BY option_name ASC",
222
			$min_item_id,
223
			$max_item_id
224
		);
225
226
		$items = $wpdb->get_results( $query, OBJECT );
227
		foreach ( $items as $item ) {
228
			$item->value = maybe_unserialize( $item->value );
229
		}
230
231
		if ( count( $items ) === 0 ) {
232
			$this->delete_checkout_id();
233
234
			return false;
235
		}
236
237
		$buffer = new Queue_Buffer( $buffer_id, $items );
238
239
		return $buffer;
240
	}
241
242
	function checkin( $buffer ) {
243
		$is_valid = $this->validate_checkout( $buffer );
244
245
		if ( is_wp_error( $is_valid ) ) {
246
			return $is_valid;
247
		}
248
249
		$this->delete_checkout_id();
250
251
		return true;
252
	}
253
254
	function close( $buffer, $ids_to_remove = null ) {
255
		$is_valid = $this->validate_checkout( $buffer );
256
257
		if ( is_wp_error( $is_valid ) ) {
258
			return $is_valid;
259
		}
260
261
		$this->delete_checkout_id();
262
263
		// by default clear all items in the buffer
264
		if ( is_null( $ids_to_remove ) ) {
265
			$ids_to_remove = $buffer->get_item_ids();
266
		}
267
268
		global $wpdb;
269
270
		if ( count( $ids_to_remove ) > 0 ) {
271
			$sql   = "DELETE FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $ids_to_remove ), '%s' ) ) . ')';
272
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $ids_to_remove ) );
273
			$wpdb->query( $query );
274
		}
275
276
		return true;
277
	}
278
279
	function flush_all() {
280
		$items = Utils::get_item_values( $this->fetch_items() );
281
		$this->reset();
282
283
		return $items;
284
	}
285
286
	function get_all() {
287
		return $this->fetch_items();
288
	}
289
290
	// use with caution, this could allow multiple processes to delete
291
	// and send from the queue at the same time
292
	function force_checkin() {
293
		$this->delete_checkout_id();
294
	}
295
296
	// used to lock checkouts from the queue.
297
	// tries to wait up to $timeout seconds for the queue to be empty
298
	function lock( $timeout = 30 ) {
299
		$tries = 0;
300
301
		while ( $this->has_any_items() && $tries < $timeout ) {
302
			sleep( 1 );
303
			$tries += 1;
304
		}
305
306
		if ( $tries === 30 ) {
307
			return new \WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
308
		}
309
310
		if ( $this->get_checkout_id() ) {
311
			return new \WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
312
		}
313
314
		// hopefully this means we can acquire a checkout?
315
		$result = $this->set_checkout_id( 'lock' );
316
317
		if ( ! $result || is_wp_error( $result ) ) {
318
			return $result;
319
		}
320
321
		return true;
322
	}
323
324
	function unlock() {
325
		return $this->delete_checkout_id();
326
	}
327
328
	/**
329
	 * This option is specifically chosen to, as much as possible, preserve time order
330
	 * and minimise the possibility of collisions between multiple processes working
331
	 * at the same time.
332
	 *
333
	 * @return string
334
	 */
335
	protected function generate_option_name_timestamp() {
336
		return sprintf( '%.6f', microtime( true ) );
337
	}
338
339
	private function get_checkout_id() {
340
		global $wpdb;
341
		$checkout_value = $wpdb->get_var(
342
			$wpdb->prepare(
343
				"SELECT option_value FROM $wpdb->options WHERE option_name = %s",
344
				$this->get_lock_option_name()
345
			)
346
		);
347
348
		if ( $checkout_value ) {
349
			list( $checkout_id, $timestamp ) = explode( ':', $checkout_value );
350
			if ( intval( $timestamp ) > time() ) {
351
				return $checkout_id;
352
			}
353
		}
354
355
		return false;
356
	}
357
358
	private function set_checkout_id( $checkout_id ) {
359
		global $wpdb;
360
361
		$expires     = time() + Defaults::$default_sync_queue_lock_timeout;
362
		$updated_num = $wpdb->query(
363
			$wpdb->prepare(
364
				"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
365
				"$checkout_id:$expires",
366
				$this->get_lock_option_name()
367
			)
368
		);
369
370
		if ( ! $updated_num ) {
371
			$updated_num = $wpdb->query(
372
				$wpdb->prepare(
373
					"INSERT INTO $wpdb->options ( option_name, option_value, autoload ) VALUES ( %s, %s, 'no' )",
374
					$this->get_lock_option_name(),
375
					"$checkout_id:$expires"
376
				)
377
			);
378
		}
379
380
		return $updated_num;
381
	}
382
383
	private function delete_checkout_id() {
384
		global $wpdb;
385
		// rather than delete, which causes fragmentation, we update in place
386
		return $wpdb->query(
387
			$wpdb->prepare(
388
				"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
389
				'0:0',
390
				$this->get_lock_option_name()
391
			)
392
		);
393
394
	}
395
396
	private function get_lock_option_name() {
397
		return "jpsq_{$this->id}_checkout";
398
	}
399
400
	private function get_next_data_row_option_name() {
401
		$timestamp = $this->generate_option_name_timestamp();
402
403
		// row iterator is used to avoid collisions where we're writing data waaay fast in a single process
404
		if ( $this->row_iterator === PHP_INT_MAX ) {
405
			$this->row_iterator = 0;
406
		} else {
407
			$this->row_iterator += 1;
408
		}
409
410
		return 'jpsq_' . $this->id . '-' . $timestamp . '-' . $this->random_int . '-' . $this->row_iterator;
411
	}
412
413
	private function fetch_items( $limit = null ) {
414
		global $wpdb;
415
416
		if ( $limit ) {
417
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", "jpsq_{$this->id}-%", $limit );
418
		} else {
419
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", "jpsq_{$this->id}-%" );
420
		}
421
422
		return $this->query_for_items( $query_sql );
423
	}
424
425
	private function fetch_items_by_id( $items_ids ) {
426
		global $wpdb;
427
		$ids_placeholders = implode( ', ', array_fill( 0, count( $items_ids ), '%s' ) );
428
429
		$query_sql = $wpdb->prepare(
430
			"
431
			SELECT option_name AS id, option_value AS value
432
			FROM $wpdb->options
433
			WHERE option_name IN ( $ids_placeholders )",
434
			$items_ids
435
		);
436
437
		return $this->query_for_items( $query_sql );
438
	}
439
440
	private function query_for_items( $query_sql ) {
441
		global $wpdb;
442
443
		$items = $wpdb->get_results( $query_sql, OBJECT );
444
		array_walk(
445
			$items,
446
			function( $item ) {
447
				$item->value = maybe_unserialize( $item->value );
448
			}
449
		);
450
		return $items;
451
	}
452
453
	private function validate_checkout( $buffer ) {
454
		if ( ! $buffer instanceof Queue_Buffer ) {
455
			return new \WP_Error( 'not_a_buffer', 'You must checkin an instance of Automattic\\Jetpack\\Sync\\Queue_Buffer' );
456
		}
457
458
		$checkout_id = $this->get_checkout_id();
459
460
		if ( ! $checkout_id ) {
461
			return new \WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
462
		}
463
464
		if ( $checkout_id != $buffer->id ) {
465
			return new \WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
466
		}
467
468
		return true;
469
	}
470
}
471