Completed
Push — update/grunion-personal-data ( beacb7...22114a )
by
unknown
14:02
created

sync/class.jetpack-sync-queue.php (1 issue)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
/**
 * A checked-out slice of the sync queue.
 *
 * Pairs the identifier of a checkout with the rows that were handed out
 * under it, and exposes those rows as ids, values, or an id => value map.
 */
class Jetpack_Sync_Queue_Buffer {
	/**
	 * Identifier this buffer was created with.
	 *
	 * @var mixed
	 */
	public $id;

	/**
	 * Rows held by this buffer; each element is read through its `id`
	 * and `value` properties by Jetpack_Sync_Utils.
	 *
	 * @var array
	 */
	public $items_with_ids;

	/**
	 * @param mixed $id             Identifier of the buffer.
	 * @param array $items_with_ids Rows to hold.
	 */
	public function __construct( $id, $items_with_ids ) {
		$this->items_with_ids = $items_with_ids;
		$this->id             = $id;
	}

	/**
	 * Build a map of row id => row value for everything in the buffer.
	 *
	 * @return array
	 */
	public function get_items() {
		$ids    = $this->get_item_ids();
		$values = $this->get_item_values();

		return array_combine( $ids, $values );
	}

	/**
	 * List only the values of the buffered rows.
	 *
	 * @return array
	 */
	public function get_item_values() {
		$rows = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_values( $rows );
	}

	/**
	 * List only the ids of the buffered rows.
	 *
	 * @return array
	 */
	public function get_item_ids() {
		$rows = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_ids( $rows );
	}
}
27
28
/**
 * A persistent queue that can be flushed in increments of N items,
 * and which blocks reads until checked-out buffers are checked in or
 * closed. This uses raw SQL for two reasons: speed, and not triggering
 * tons of added_option callbacks.
 *
 * Items are stored as rows of the options table named
 * "jpsq_{id}-{timestamp}-{random}-{counter}". The checkout lock is the
 * "jpsq_{id}_checkout" option, holding "{checkout_id}:{expiry timestamp}".
 */
class Jetpack_Sync_Queue {
	/**
	 * Queue identifier, with dashes replaced by underscores.
	 *
	 * @var string
	 */
	public $id;

	/**
	 * Per-instance counter appended to generated option names.
	 *
	 * @var int
	 */
	private $row_iterator;

	/**
	 * Per-instance random number appended to generated option names.
	 * Declared explicitly (it used to be created dynamically in the constructor).
	 *
	 * @var int
	 */
	public $random_int;

	/**
	 * @param string $id Name of the queue.
	 */
	public function __construct( $id ) {
		$this->id           = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL
		$this->row_iterator = 0;
		$this->random_int   = mt_rand( 1, 1000000 );
	}

	/**
	 * Append a single item to the queue.
	 *
	 * @param mixed $item Value to store; it is serialized before insertion.
	 */
	public function add( $item ) {
		global $wpdb;
		$added = false;
		// this basically tries to add the option until enough time has elapsed that
		// it has a unique (microtime-based) option key
		// NOTE(review): the loop only repeats while the INSERT reports 0 rows; a `false`
		// result from $wpdb->query() also ends it — confirm that is the intended handling.
		while ( ! $added ) {
			$rows_added = $wpdb->query( $wpdb->prepare(
				"INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s,%s)",
				$this->get_next_data_row_option_name(),
				serialize( $item ),
				'no'
			) );
			$added      = ( 0 !== $rows_added );
		}
	}

	/**
	 * Attempts to insert all the items in a single SQL query. May be subject to query size limits!
	 *
	 * @param array $items Values to store, in order.
	 *
	 * @return WP_Error|null WP_Error when the number of inserted rows differs from the
	 *                       number of items; nothing otherwise.
	 */
	public function add_all( $items ) {
		global $wpdb;

		// an INSERT with an empty VALUES list is not valid SQL, so skip the query entirely
		if ( empty( $items ) ) {
			return;
		}

		$base_option_name = $this->get_next_data_row_option_name();

		$query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";

		$rows = array();
		$i    = 0;

		// iterate by value so arrays with string or non-sequential keys are stored as well;
		// the running counter keeps the option names in insertion order
		foreach ( $items as $item ) {
			$option_name  = esc_sql( $base_option_name . '-' . $i );
			$option_value = esc_sql( serialize( $item ) );
			$rows[]       = "('$option_name', '$option_value', 'no')";
			$i           += 1;
		}

		$rows_added = $wpdb->query( $query . join( ',', $rows ) );

		// only a mismatch is an error (the comparison used to be inverted)
		if ( count( $rows ) !== $rows_added ) {
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
		}
	}

	/**
	 * Peek at the front-most item(s) on the queue without checking them out.
	 *
	 * @param int $count Maximum number of items to read.
	 *
	 * @return array Item values; empty when the queue has no items.
	 */
	public function peek( $count = 1 ) {
		$items = $this->fetch_items( $count );
		if ( $items ) {
			return Jetpack_Sync_Utils::get_item_values( $items );
		}

		return array();
	}

	/**
	 * lag is the difference in time between the age of the oldest item
	 * (aka first or frontmost item) and the current time
	 *
	 * @return float|int Seconds, or 0 when the queue is empty or the first
	 *                   option name carries no parsable timestamp.
	 */
	public function lag() {
		global $wpdb;

		$first_item_name = $wpdb->get_var( $wpdb->prepare(
			"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
			"jpsq_{$this->id}-%"
		) );

		if ( ! $first_item_name ) {
			return 0;
		}

		// break apart the item name to get the timestamp; the id is quoted so that
		// any regex metacharacters in it are matched literally
		$matches = null;
		if ( preg_match( '/^jpsq_' . preg_quote( $this->id, '/' ) . '-(\d+\.\d+)-/', $first_item_name, $matches ) ) {
			return microtime( true ) - floatval( $matches[1] );
		}

		return 0;
	}

	/**
	 * Release the checkout lock and delete every item row of this queue.
	 */
	public function reset() {
		global $wpdb;
		$this->delete_checkout_id();
		$wpdb->query( $wpdb->prepare(
			"DELETE FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
		) );
	}

	/**
	 * Count the item rows of this queue.
	 *
	 * @return int
	 */
	public function size() {
		global $wpdb;

		return (int) $wpdb->get_var( $wpdb->prepare(
			"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
		) );
	}

	/**
	 * Whether the queue holds at least one item.
	 * we use this peculiar implementation because it's much faster than count(*)
	 *
	 * @return bool
	 */
	public function has_any_items() {
		global $wpdb;
		$value = $wpdb->get_var( $wpdb->prepare(
			"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )", "jpsq_{$this->id}-%"
		) );

		return ( $value === '1' );
	}

	/**
	 * Lock the queue and hand out up to $buffer_size items from its front.
	 *
	 * @param int $buffer_size Maximum number of items to check out.
	 *
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|false|int Buffer on success; WP_Error when a
	 *         checkout is already open; false when the queue is empty; the falsy result of
	 *         the lock write when the lock could not be stored.
	 */
	public function checkout( $buffer_size ) {
		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();

		$result = $this->set_checkout_id( $buffer_id );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		$items = $this->fetch_items( $buffer_size );

		if ( empty( $items ) ) {
			// no buffer is handed out, so nobody could ever check it back in:
			// release the lock instead of leaving it held until it expires
			$this->delete_checkout_id();

			return false;
		}

		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );

		return $buffer;
	}

	/**
	 * this checks out rows until it either empties the queue or hits a certain memory limit
	 * it loads the sizes from the DB first so that it doesn't accidentally
	 * load more data into memory than it needs to.
	 * The only way it will load more items than $max_size is if a single queue item
	 * exceeds the memory limit, but in that case it will send that item by itself.
	 *
	 * @param int $max_memory      Upper bound for the summed LENGTH() of the checked-out values.
	 * @param int $max_buffer_size Maximum number of rows to consider.
	 *
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|false|int Same contract as checkout().
	 */
	public function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();

		$result = $this->set_checkout_id( $buffer_id );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		// get the map of buffer_id -> memory_size
		global $wpdb;

		$items_with_size = $wpdb->get_results(
			$wpdb->prepare(
				"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
				"jpsq_{$this->id}-%",
				$max_buffer_size
			),
			OBJECT
		);

		if ( empty( $items_with_size ) ) {
			// release the lock, consistently with the empty-result branch further down
			$this->delete_checkout_id();

			return false;
		}

		$total_memory = 0;

		$min_item_id = $max_item_id = $items_with_size[0]->id;

		foreach ( $items_with_size as $id => $item_with_size ) {
			$total_memory += $item_with_size->value_size;

			// if this is the first item and it exceeds memory, allow loop to continue
			// we will exit on the next iteration instead
			if ( $total_memory > $max_memory && $id > 0 ) {
				break;
			}

			$max_item_id = $item_with_size->id;
		}

		// option names sort in insertion order, so the selected rows form one contiguous name range
		$query = $wpdb->prepare(
			"SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name >= %s and option_name <= %s ORDER BY option_name ASC",
			$min_item_id,
			$max_item_id
		);

		$items = $wpdb->get_results( $query, OBJECT );

		if ( empty( $items ) ) {
			$this->delete_checkout_id();

			return false;
		}

		foreach ( $items as $item ) {
			$item->value = maybe_unserialize( $item->value );
		}

		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );

		return $buffer;
	}

	/**
	 * Return a checked-out buffer without removing its items from the queue.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer Buffer obtained from a checkout.
	 *
	 * @return true|WP_Error
	 */
	public function checkin( $buffer ) {
		$is_valid = $this->validate_checkout( $buffer );

		if ( is_wp_error( $is_valid ) ) {
			return $is_valid;
		}

		$this->delete_checkout_id();

		return true;
	}

	/**
	 * Release a checked-out buffer and delete its items from the queue.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer        Buffer obtained from a checkout.
	 * @param array|null                $ids_to_remove Option names to delete; defaults to every
	 *                                                 item id in the buffer.
	 *
	 * @return true|WP_Error
	 */
	public function close( $buffer, $ids_to_remove = null ) {
		$is_valid = $this->validate_checkout( $buffer );

		if ( is_wp_error( $is_valid ) ) {
			return $is_valid;
		}

		$this->delete_checkout_id();

		// by default clear all items in the buffer
		if ( is_null( $ids_to_remove ) ) {
			$ids_to_remove = $buffer->get_item_ids();
		}

		global $wpdb;

		if ( count( $ids_to_remove ) > 0 ) {
			// one %s placeholder per id; prepare() accepts the values as a single array
			$placeholders = implode( ', ', array_fill( 0, count( $ids_to_remove ), '%s' ) );
			$sql          = "DELETE FROM $wpdb->options WHERE option_name IN (" . $placeholders . ')';
			$wpdb->query( $wpdb->prepare( $sql, array_values( $ids_to_remove ) ) );
		}

		return true;
	}

	/**
	 * Read every item value, then empty the queue and release its lock.
	 *
	 * @return array Item values that were in the queue.
	 */
	public function flush_all() {
		$items = Jetpack_Sync_Utils::get_item_values( $this->fetch_items() );
		$this->reset();

		return $items;
	}

	/**
	 * Read every item row (objects with `id` and `value`) without removing anything.
	 *
	 * @return array
	 */
	public function get_all() {
		return $this->fetch_items();
	}

	/**
	 * Release the checkout lock without validating who holds it.
	 * use with caution, this could allow multiple processes to delete
	 * and send from the queue at the same time
	 */
	public function force_checkin() {
		$this->delete_checkout_id();
	}

	/**
	 * used to lock checkouts from the queue.
	 * tries to wait up to $timeout seconds for the queue to be empty
	 *
	 * @param int $timeout Maximum number of one-second waits.
	 *
	 * @return true|WP_Error|false|int true when the lock was taken; WP_Error on timeout or
	 *         when a checkout is open; the falsy result of the lock write otherwise.
	 */
	public function lock( $timeout = 30 ) {
		$tries = 0;

		while ( $this->has_any_items() ) {
			// compare against the caller's timeout (this used to be a hard-coded 30) and only
			// give up while the queue still has items after the last wait
			if ( $tries >= $timeout ) {
				return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
			}

			sleep( 1 );
			$tries += 1;
		}

		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		// hopefully this means we can acquire a checkout?
		$result = $this->set_checkout_id( 'lock' );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		return true;
	}

	/**
	 * Release the lock taken by lock().
	 *
	 * @return int|false Result of the underlying UPDATE query.
	 */
	public function unlock() {
		return $this->delete_checkout_id();
	}

	/**
	 * Read the id of the current, unexpired checkout.
	 *
	 * @return string|false Checkout id, or false when there is none or it has expired.
	 */
	private function get_checkout_id() {
		global $wpdb;
		$checkout_value = $wpdb->get_var(
			$wpdb->prepare(
				"SELECT option_value FROM $wpdb->options WHERE option_name = %s",
				$this->get_lock_option_name()
			)
		);

		if ( $checkout_value ) {
			// pad so that a value without ':' reads as expired instead of raising a notice
			list( $checkout_id, $timestamp ) = array_pad( explode( ':', $checkout_value ), 2, 0 );
			if ( intval( $timestamp ) > time() ) {
				return $checkout_id;
			}
		}

		return false;
	}

	/**
	 * Store "$checkout_id:$expires" in the lock option, updating the row when it
	 * exists and inserting it otherwise.
	 *
	 * @param string $checkout_id Id to record as the lock holder.
	 *
	 * @return int|false Result of the last query that was run.
	 */
	private function set_checkout_id( $checkout_id ) {
		global $wpdb;

		$expires     = time() + Jetpack_Sync_Defaults::$default_sync_queue_lock_timeout;
		$updated_num = $wpdb->query(
			$wpdb->prepare(
				"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
				"$checkout_id:$expires",
				$this->get_lock_option_name()
			)
		);

		if ( ! $updated_num ) {
			$updated_num = $wpdb->query(
				$wpdb->prepare(
					"INSERT INTO $wpdb->options ( option_name, option_value, autoload ) VALUES ( %s, %s, 'no' )",
					$this->get_lock_option_name(),
					"$checkout_id:$expires"
				)
			);
		}

		return $updated_num;
	}

	/**
	 * Clear the checkout lock by overwriting it with "0:0".
	 *
	 * @return int|false Result of the UPDATE query.
	 */
	private function delete_checkout_id() {
		global $wpdb;
		// rather than delete, which causes fragmentation, we update in place
		return $wpdb->query(
			$wpdb->prepare(
				"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
				"0:0",
				$this->get_lock_option_name()
			)
		);
	}

	/**
	 * Name of the option that stores the checkout lock.
	 *
	 * @return string
	 */
	private function get_lock_option_name() {
		return "jpsq_{$this->id}_checkout";
	}

	/**
	 * Generate the option name for the next item row and advance the row counter.
	 *
	 * @return string
	 */
	private function get_next_data_row_option_name() {
		// this option is specifically chosen to, as much as possible, preserve time order
		// and minimise the possibility of collisions between multiple processes working
		// at the same time
		// TODO: confirm we only need to support PHP 5.05+ (otherwise we'll need to emulate microtime as float, and avoid PHP_INT_MAX)
		// @see: http://php.net/manual/en/function.microtime.php
		$timestamp = sprintf( '%.6f', microtime( true ) );

		// row iterator is used to avoid collisions where we're writing data waaay fast in a single process
		if ( $this->row_iterator === PHP_INT_MAX ) {
			$this->row_iterator = 0;
		} else {
			$this->row_iterator += 1;
		}

		return 'jpsq_' . $this->id . '-' . $timestamp . '-' . $this->random_int . '-' . $this->row_iterator;
	}

	/**
	 * Load item rows from the front of the queue, unserializing their values.
	 *
	 * @param int|null $limit Maximum number of rows; any falsy value loads them all.
	 *
	 * @return array Row objects with `id` and `value` properties.
	 */
	private function fetch_items( $limit = null ) {
		global $wpdb;

		if ( $limit ) {
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", "jpsq_{$this->id}-%", $limit );
		} else {
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", "jpsq_{$this->id}-%" );
		}

		$items = $wpdb->get_results( $query_sql, OBJECT );

		// a non-array result cannot be iterated; report it as "no items"
		if ( ! is_array( $items ) ) {
			return array();
		}

		foreach ( $items as $item ) {
			$item->value = maybe_unserialize( $item->value );
		}

		return $items;
	}

	/**
	 * Check that $buffer is the buffer currently checked out of this queue.
	 *
	 * @param mixed $buffer Value passed to checkin() or close().
	 *
	 * @return true|WP_Error
	 */
	private function validate_checkout( $buffer ) {
		if ( ! $buffer instanceof Jetpack_Sync_Queue_Buffer ) {
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
		}

		$checkout_id = $this->get_checkout_id();

		if ( ! $checkout_id ) {
			return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
		}

		// compare as strings: a loose comparison treats numeric-looking ids
		// (e.g. "1e3" and "1000") as equal
		if ( (string) $checkout_id !== (string) $buffer->id ) {
			return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
		}

		return true;
	}
}
437
438
/**
 * Helpers for pulling a single property out of every row in a list of
 * queue rows (objects exposing `id` and `value`).
 */
class Jetpack_Sync_Utils {

	/**
	 * Map each row to its `value` property, keeping the input keys.
	 *
	 * @param array $items Rows to read.
	 *
	 * @return array
	 */
	public static function get_item_values( $items ) {
		$reader = array( __CLASS__, 'read_value' );

		return array_map( $reader, $items );
	}

	/**
	 * Map each row to its `id` property, keeping the input keys.
	 *
	 * @param array $items Rows to read.
	 *
	 * @return array
	 */
	public static function get_item_ids( $items ) {
		$reader = array( __CLASS__, 'read_id' );

		return array_map( $reader, $items );
	}

	/**
	 * array_map callback: the row's value.
	 *
	 * @param object $row
	 *
	 * @return mixed
	 */
	private static function read_value( $row ) {
		return $row->value;
	}

	/**
	 * array_map callback: the row's id.
	 *
	 * @param object $row
	 *
	 * @return mixed
	 */
	private static function read_id( $row ) {
		return $row->id;
	}
}
456