Completed
Push — fix/photon-filter-srcset ( 8990e0...8f0544 )
by
unknown
122:55 queued 111:44
created

sync/class.jetpack-sync-queue.php (1 issue)

Labels
Severity

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
/**
 * A checked-out batch of queue items.
 *
 * Holds the checkout ID together with the raw item rows (objects that
 * expose an `id` and a `value`, as consumed by Jetpack_Sync_Utils).
 */
class Jetpack_Sync_Queue_Buffer {
	/**
	 * Identifier of the checkout this buffer belongs to.
	 */
	public $id;

	/**
	 * The item rows held by this buffer.
	 */
	public $items_with_ids;

	/**
	 * @param mixed $id             Checkout identifier for this buffer.
	 * @param array $items_with_ids Item rows carried by the buffer.
	 */
	public function __construct( $id, $items_with_ids ) {
		$this->items_with_ids = $items_with_ids;
		$this->id             = $id;
	}

	/**
	 * Build a map of item ID => item value for everything in the buffer.
	 *
	 * @return array|false Result of array_combine() over the IDs and values.
	 */
	public function get_items() {
		$ids    = $this->get_item_ids();
		$values = $this->get_item_values();

		return array_combine( $ids, $values );
	}

	/**
	 * Values of the buffered items, in buffer order.
	 *
	 * @return array
	 */
	public function get_item_values() {
		$rows = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_values( $rows );
	}

	/**
	 * IDs of the buffered items, in buffer order.
	 *
	 * @return array
	 */
	public function get_item_ids() {
		$rows = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_ids( $rows );
	}
}
27
28
/**
 * A persistent queue that can be flushed in increments of N items,
 * and which blocks reads until checked-out buffers are checked in or
 * closed. This uses raw SQL for two reasons: speed, and not triggering
 * tons of added_option callbacks.
 *
 * Items are stored as rows of the options table whose names start with
 * "jpsq_{id}-"; the checkout lock is a transient named "jpsq_{id}_checkout".
 */
class Jetpack_Sync_Queue {
	/**
	 * Queue identifier, with dashes replaced by underscores.
	 */
	public $id;

	/**
	 * Per-instance counter appended to generated row names.
	 */
	private $row_iterator;

	/**
	 * Per-instance random number appended to generated row names.
	 *
	 * Declared explicitly: it used to be created dynamically by the
	 * constructor, which static analysis flags and which hides typos.
	 */
	private $random_int;

	/**
	 * @param string $id Queue identifier.
	 */
	public function __construct( $id ) {
		$this->id           = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL
		$this->row_iterator = 0;
		$this->random_int   = random_int( 1, 1000000 );
	}

	/**
	 * Append a single item to the queue and fire the `jpsq_item_added` action.
	 *
	 * @param mixed $item Any serializable value.
	 */
	public function add( $item ) {
		global $wpdb;
		$added = false;
		// this basically tries to add the option until enough time has elapsed that
		// it has a unique (microtime-based) option key
		while ( ! $added ) {
			$rows_added = $wpdb->query( $wpdb->prepare(
				"INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s,%s)",
				$this->get_next_data_row_option_name(),
				serialize( $item ),
				'no'
			) );
			// NOTE(review): only an integer 0 triggers a retry. A failed query
			// (false) also ends the loop - confirm that is intended.
			$added = ( 0 !== $rows_added );
		}

		do_action( 'jpsq_item_added' );
	}

	/**
	 * Attempts to insert all the items in a single SQL query. May be subject to query size limits!
	 *
	 * Fires `jpsq_items_added` with the number of inserted rows on success.
	 *
	 * @param array $items Values to enqueue; keys are ignored.
	 *
	 * @return WP_Error|null WP_Error when the number of inserted rows differs
	 *                       from the number of items, nothing otherwise.
	 */
	public function add_all( $items ) {
		global $wpdb;

		// An empty VALUES list is invalid SQL; there is nothing to insert anyway.
		if ( empty( $items ) ) {
			return;
		}

		$base_option_name = $this->get_next_data_row_option_name();

		$query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";

		$rows = array();

		// Use our own counter so arrays with non-sequential keys work too.
		$i = 0;
		foreach ( $items as $item ) {
			$option_name  = esc_sql( $base_option_name . '-' . $i );
			$option_value = esc_sql( serialize( $item ) );
			$rows[]       = "('$option_name', '$option_value', 'no')";
			$i           += 1;
		}

		$rows_added = $wpdb->query( $query . join( ',', $rows ) );

		// Report an error when the counts DIFFER (the check used to be inverted).
		if ( count( $items ) !== $rows_added ) {
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
		}

		do_action( 'jpsq_items_added', $rows_added );
	}

	/**
	 * Peek at the front-most item(s) on the queue without checking them out.
	 *
	 * @param int $count Maximum number of items to return.
	 *
	 * @return array Item values, or an empty array when the queue is empty.
	 */
	public function peek( $count = 1 ) {
		$items = $this->fetch_items( $count );
		if ( $items ) {
			return Jetpack_Sync_Utils::get_item_values( $items );
		}

		return array();
	}

	/**
	 * Lag is the difference in time between the age of the oldest item
	 * (aka first or frontmost item) and the current time.
	 *
	 * @return float|int Seconds, or 0 when the queue is empty or the row
	 *                   name does not contain a parsable timestamp.
	 */
	public function lag() {
		global $wpdb;

		$first_item_name = $wpdb->get_var( $wpdb->prepare(
			"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
			"jpsq_{$this->id}-%"
		) );

		if ( ! $first_item_name ) {
			return 0;
		}

		// break apart the item name to get the timestamp; the ID is quoted so
		// that regex metacharacters in it are matched literally
		$matches = null;
		$pattern = '/^jpsq_' . preg_quote( $this->id, '/' ) . '-(\d+\.\d+)-/';
		if ( preg_match( $pattern, $first_item_name, $matches ) ) {
			return microtime( true ) - floatval( $matches[1] );
		}

		return 0;
	}

	/**
	 * Release any checkout and delete every row of this queue.
	 */
	public function reset() {
		global $wpdb;
		$this->delete_checkout_id();
		$wpdb->query( $wpdb->prepare(
			"DELETE FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
		) );
	}

	/**
	 * Number of rows currently stored for this queue.
	 *
	 * @return int
	 */
	public function size() {
		global $wpdb;

		return (int) $wpdb->get_var( $wpdb->prepare(
			"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
		) );
	}

	/**
	 * Whether the queue holds at least one row.
	 *
	 * We use this peculiar implementation because it's much faster than count(*).
	 *
	 * @return bool
	 */
	public function has_any_items() {
		global $wpdb;
		$value = $wpdb->get_var( $wpdb->prepare(
			"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )", "jpsq_{$this->id}-%"
		) );

		return ( $value === '1' );
	}

	/**
	 * Check out up to $buffer_size items from the front of the queue.
	 *
	 * @param int $buffer_size Maximum number of items to check out.
	 *
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|false A buffer on success,
	 *         WP_Error when another buffer is still checked out, the failing
	 *         result of storing the checkout ID, or false when the queue is empty.
	 */
	public function checkout( $buffer_size ) {
		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();

		$result = $this->set_checkout_id( $buffer_id );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		$items = $this->fetch_items( $buffer_size );

		if ( count( $items ) === 0 ) {
			// Nothing was handed out, so release the checkout right away, as
			// checkout_with_memory_limit() does. Otherwise the queue would stay
			// blocked until the transient expires.
			$this->delete_checkout_id();

			return false;
		}

		return new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );
	}

	/**
	 * Check out rows until the queue is empty or a memory limit is hit.
	 *
	 * It loads the sizes from the DB first so that it doesn't accidentally
	 * load more data into memory than it needs to. The only way it will load
	 * more than $max_memory is if a single queue item exceeds the memory
	 * limit, but in that case it will send that item by itself.
	 *
	 * @param int $max_memory      Maximum summed LENGTH(option_value) to check out.
	 * @param int $max_buffer_size Maximum number of rows to consider.
	 *
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|false Same contract as checkout().
	 */
	public function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
		global $wpdb;

		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();

		$result = $this->set_checkout_id( $buffer_id );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		// get the list of row name => stored value size
		$items_with_size = $wpdb->get_results(
			$wpdb->prepare(
				"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
				"jpsq_{$this->id}-%",
				$max_buffer_size
			),
			OBJECT
		);

		$total_memory = 0;
		$item_ids     = array();

		foreach ( $items_with_size as $item_with_size ) {
			$total_memory += $item_with_size->value_size;

			// if this is the first item and it exceeds memory, allow loop to continue
			// we will exit on the next iteration instead
			if ( $total_memory > $max_memory && count( $item_ids ) > 0 ) {
				break;
			}
			$item_ids[] = $item_with_size->id;
		}

		$items = $this->fetch_items_by_id( $item_ids );

		if ( count( $items ) === 0 ) {
			$this->delete_checkout_id();

			return false;
		}

		return new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
	}

	/**
	 * Return a buffer to the queue without removing its items.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer The buffer being returned.
	 *
	 * @return true|WP_Error
	 */
	public function checkin( $buffer ) {
		$is_valid = $this->validate_checkout( $buffer );

		if ( is_wp_error( $is_valid ) ) {
			return $is_valid;
		}

		$this->delete_checkout_id();

		return true;
	}

	/**
	 * Release a buffer and delete its items (or a chosen subset) from the queue.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer        The buffer being closed.
	 * @param array|null                $ids_to_remove Row names to delete; defaults
	 *                                                 to every item in the buffer.
	 *
	 * @return true|WP_Error
	 */
	public function close( $buffer, $ids_to_remove = null ) {
		global $wpdb;

		$is_valid = $this->validate_checkout( $buffer );

		if ( is_wp_error( $is_valid ) ) {
			return $is_valid;
		}

		$this->delete_checkout_id();

		// by default clear all items in the buffer
		if ( is_null( $ids_to_remove ) ) {
			$ids_to_remove = $buffer->get_item_ids();
		}

		if ( count( $ids_to_remove ) > 0 ) {
			$sql = "DELETE FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $ids_to_remove ), '%s' ) ) . ')';
			// array_values() keeps the arguments positional even if the caller
			// passed string keys.
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), array_values( $ids_to_remove ) ) );
			$wpdb->query( $query );
		}

		return true;
	}

	/**
	 * Return the values of every queued item and then empty the queue.
	 *
	 * @return array
	 */
	public function flush_all() {
		$items = Jetpack_Sync_Utils::get_item_values( $this->fetch_items() );
		$this->reset();

		return $items;
	}

	/**
	 * Return every queued row (objects with `id` and unserialized `value`).
	 *
	 * @return array
	 */
	public function get_all() {
		return $this->fetch_items();
	}

	/**
	 * Drop the current checkout regardless of who holds it.
	 *
	 * Use with caution, this could allow multiple processes to delete
	 * and send from the queue at the same time.
	 */
	public function force_checkin() {
		$this->delete_checkout_id();
	}

	/**
	 * Used to lock checkouts from the queue.
	 * Tries to wait up to $timeout seconds for the queue to be empty.
	 *
	 * @param int $timeout Maximum number of one-second waits.
	 *
	 * @return true|WP_Error|false True when the lock was taken, WP_Error on
	 *         timeout or when a buffer is still checked out, or the failing
	 *         result of storing the checkout ID.
	 */
	public function lock( $timeout = 30 ) {
		$tries = 0;

		// Time out only while items remain, measured against $timeout rather
		// than a hard-coded 30.
		while ( $this->has_any_items() ) {
			if ( $tries >= $timeout ) {
				return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
			}

			sleep( 1 );
			$tries += 1;
		}

		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		// hopefully this means we can acquire a checkout?
		$result = $this->set_checkout_id( 'lock' );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		return true;
	}

	/**
	 * Release the lock taken by lock().
	 */
	public function unlock() {
		$this->delete_checkout_id();
	}

	/**
	 * Read the current checkout ID from its transient.
	 */
	private function get_checkout_id() {
		return get_transient( $this->get_checkout_transient_name() );
	}

	/**
	 * Store the checkout ID in its transient.
	 */
	private function set_checkout_id( $checkout_id ) {
		return set_transient( $this->get_checkout_transient_name(), $checkout_id, 5 * 60 ); // 5 minute timeout
	}

	/**
	 * Remove the checkout transient.
	 */
	private function delete_checkout_id() {
		delete_transient( $this->get_checkout_transient_name() );
	}

	/**
	 * Name of the transient that holds the checkout ID for this queue.
	 */
	private function get_checkout_transient_name() {
		return "jpsq_{$this->id}_checkout";
	}

	/**
	 * Generate the option name for the next row to insert.
	 *
	 * @return string "jpsq_{id}-{microtime}-{random_int}-{row_iterator}".
	 */
	private function get_next_data_row_option_name() {
		// this option is specifically chosen to, as much as possible, preserve time order
		// and minimise the possibility of collisions between multiple processes working
		// at the same time
		// TODO: confirm we only need to support PHP 5.05+ (otherwise we'll need to emulate microtime as float, and avoid PHP_INT_MAX)
		// @see: http://php.net/manual/en/function.microtime.php
		$timestamp = sprintf( '%.6f', microtime( true ) );

		// row iterator is used to avoid collisions where we're writing data waaay fast in a single process
		if ( $this->row_iterator === PHP_INT_MAX ) {
			$this->row_iterator = 0;
		} else {
			$this->row_iterator += 1;
		}

		return 'jpsq_' . $this->id . '-' . $timestamp . '-' . $this->random_int . '-' . $this->row_iterator;
	}

	/**
	 * Fetch queue rows ordered by name, optionally limited.
	 *
	 * @param int|null $limit Maximum number of rows; a falsy value fetches all.
	 *
	 * @return array Row objects with `id` and unserialized `value`.
	 */
	private function fetch_items( $limit = null ) {
		global $wpdb;

		if ( $limit ) {
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", "jpsq_{$this->id}-%", $limit );
		} else {
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", "jpsq_{$this->id}-%" );
		}

		return $this->unserialize_item_values( $wpdb->get_results( $query_sql, OBJECT ) );
	}

	/**
	 * Fetch specific queue rows by option name, ordered by name.
	 *
	 * @param array $item_ids Option names to load.
	 *
	 * @return array Row objects with `id` and unserialized `value`.
	 */
	private function fetch_items_by_id( $item_ids ) {
		global $wpdb;

		if ( count( $item_ids ) === 0 ) {
			return array();
		}

		$sql = "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $item_ids ), '%s' ) ) . ') ORDER BY option_name ASC';
		// array_values() keeps the arguments positional.
		$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), array_values( $item_ids ) ) );

		return $this->unserialize_item_values( $wpdb->get_results( $query, OBJECT ) );
	}

	/**
	 * Replace each row's stored `value` with its unserialized form.
	 *
	 * @param array $items Row objects as returned by $wpdb->get_results().
	 *
	 * @return array The same rows, modified in place.
	 */
	private function unserialize_item_values( $items ) {
		foreach ( $items as $item ) {
			$item->value = maybe_unserialize( $item->value );
		}

		return $items;
	}

	/**
	 * Verify that $buffer is the buffer currently checked out of this queue.
	 *
	 * @param mixed $buffer Candidate buffer.
	 *
	 * @return true|WP_Error
	 */
	private function validate_checkout( $buffer ) {
		if ( ! $buffer instanceof Jetpack_Sync_Queue_Buffer ) {
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
		}

		$checkout_id = $this->get_checkout_id();

		if ( ! $checkout_id ) {
			return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
		}

		// Compare as strings: a loose != can treat two different numeric-looking
		// IDs as equal.
		if ( (string) $checkout_id !== (string) $buffer->id ) {
			return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
		}

		return true;
	}
}
398
399
/**
 * Helpers for pulling the `id` and `value` members out of lists of queue rows.
 */
class Jetpack_Sync_Utils {

	/**
	 * Map each item to its `value` member, preserving array keys.
	 *
	 * @param array $items Objects exposing a `value` property.
	 *
	 * @return array
	 */
	public static function get_item_values( $items ) {
		$extract_value = array( __CLASS__, 'get_item_value' );

		return array_map( $extract_value, $items );
	}

	/**
	 * Map each item to its `id` member, preserving array keys.
	 *
	 * @param array $items Objects exposing an `id` property.
	 *
	 * @return array
	 */
	public static function get_item_ids( $items ) {
		$extract_id = array( __CLASS__, 'get_item_id' );

		return array_map( $extract_id, $items );
	}

	/**
	 * array_map() callback: read `value` from a single item.
	 */
	private static function get_item_value( $item ) {
		return $item->value;
	}

	/**
	 * array_map() callback: read `id` from a single item.
	 */
	private static function get_item_id( $item ) {
		return $item->id;
	}
}
417