Completed
Push — add/stop-sync-filter ( 16f5d3...72a4a0 )
by
unknown
09:47
created

Jetpack_Sync_Queue::checkout()   B

Complexity

Conditions 5
Paths 4

Size

Total Lines 23
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 12
c 0
b 0
f 0
nc 4
nop 1
dl 0
loc 23
rs 8.5906
1
<?php
2
3
/**
4
 * A buffer of items from the queue that can be checked out
5
 */
6
class Jetpack_Sync_Queue_Buffer {
	/**
	 * Identifier of this buffer, as handed to the constructor.
	 *
	 * @var mixed
	 */
	public $id;

	/**
	 * The checked-out rows. Each row is read through its `id` and `value`
	 * properties by Jetpack_Sync_Utils.
	 *
	 * @var array
	 */
	public $items_with_ids;

	/**
	 * @param mixed $id             Identifier of the buffer.
	 * @param array $items_with_ids Rows held by the buffer.
	 */
	public function __construct( $id, $items_with_ids ) {
		$this->items_with_ids = $items_with_ids;
		$this->id             = $id;
	}

	/**
	 * Returns the buffered rows as a map of row ID to row value.
	 *
	 * @return array|false Whatever array_combine() yields for the ID and value lists.
	 */
	public function get_items() {
		$ids    = $this->get_item_ids();
		$values = $this->get_item_values();

		return array_combine( $ids, $values );
	}

	/**
	 * Returns only the values of the buffered rows.
	 *
	 * @return array
	 */
	public function get_item_values() {
		$rows = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_values( $rows );
	}

	/**
	 * Returns only the IDs of the buffered rows.
	 *
	 * @return array
	 */
	public function get_item_ids() {
		$rows = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_ids( $rows );
	}
}
27
28
/**
29
 * A persistent queue that can be flushed in increments of N items,
30
 * and which blocks reads until checked-out buffers are checked in or
31
 * closed. This uses raw SQL for two reasons: speed, and not triggering
32
 * tons of added_option callbacks.
33
 */
34
class Jetpack_Sync_Queue {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
35
	public $id;
36
	private $row_iterator;
37
38
	/**
	 * Option names that hold the skipped post and skipped comment ID lists.
	 *
	 * Declared explicitly instead of being created on the fly in the
	 * constructor; left public so outside access keeps working as before.
	 *
	 * @var array
	 */
	public $blacklist_names;

	/**
	 * Builds a queue for the given ID and registers the hooks that maintain
	 * the skipped-ID lists.
	 *
	 * @param string $id Queue ID; every '-' in it is replaced by '_'.
	 */
	function __construct( $id ) {
		$this->id              = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL
		$this->row_iterator    = 0;
		$this->blacklist_names = array( 'jetpack_skipped_sync_post_ids', 'jetpack_skipped_sync_comment_ids' );

		// When a post or comment is deleted, drop its ID from the matching skipped list.
		add_action( 'deleted_post', array( $this, 'maybe_update_post_blacklist' ) );
		add_action( 'deleted_comment', array( $this, 'maybe_update_comment_blacklist' ) );
	}
46
47
	/**
	 * Adds a single item to the queue unless the skip filter vetoes it.
	 *
	 * @param mixed $item The item to enqueue.
	 *
	 * @return false|null False when the item was skipped, null otherwise.
	 */
	function add( $item ) {
		/**
		 * Filters whether to skip adding an item to the sync queue.
		 *
		 * Passing true to the filter will prevent the item being synced.
		 * We keep a blacklist of post ids and comment ids so that we can
		 * keep an accurate checksum of what is being synced.
		 *
		 * @since 4.1.0
		 *
		 * @param boolean $skip Whether to skip syncing the item. Default false.
		 * @param mixed   $item An array containing array( $action, $args, $user_id, $microtime );
		 *
		 * @see Jetpack_Sync_Queue->add()
		 */
		$skip = apply_filters( 'jetpack_skip_sync_item', false, $item );

		if ( ! $skip ) {
			// The item is being synced, so it must no longer be listed as skipped.
			$this->remove_skipped_item( $item );
			$this->_add( $item );

			return;
		}

		$this->add_skipped_item( $item );

		return false;
	}
69
70
	/**
	 * Writes one serialized item into the options table under a fresh row name.
	 *
	 * The insert is repeated, with a newly generated (microtime-based) option
	 * name each time, for as long as the query reports exactly 0 rows. Any
	 * other result, including a non-integer one, ends the loop.
	 *
	 * @param mixed $item The item to store.
	 */
	private function _add( $item ) {
		global $wpdb;

		do {
			$insert_sql = $wpdb->prepare(
				"INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s,%s)",
				$this->get_next_data_row_option_name(),
				serialize( $item ),
				'no'
			);
			$rows_added = $wpdb->query( $insert_sql );
		} while ( 0 === $rows_added );
	}
85
86
	/**
	 * Records that an item was skipped by the sync filter.
	 *
	 * Post and comment actions add the affected ID to the matching skipped
	 * list. Option actions that touch one of the skipped-list options are
	 * enqueued anyway. Every other action is ignored.
	 *
	 * @param array $item Item whose first two entries are the action name and its arguments.
	 */
	function add_skipped_item( $item ) {
		list( $action, $args ) = $item;

		$comment_actions = array(
			'wp_insert_comment',
			'comment_unapproved_',
			'comment_approved_',
			'comment_unapproved_trackback',
			'comment_approved_trackback',
			'comment_unapproved_pingback',
			'comment_approved_pingback',
		);
		$option_actions  = array( 'added_option', 'updated_option', 'deleted_option' );

		// Comparisons are intentionally loose, matching the semantics of a switch statement.
		if ( 'wp_insert_post' == $action ) {
			$this->add_skipped_id( $args[0], 'post' );
		} elseif ( in_array( $action, $comment_actions ) ) {
			$this->add_skipped_id( $args[0], 'comment' );
		} elseif ( in_array( $action, $option_actions ) ) {
			if ( in_array( $args[0], $this->blacklist_names ) ) {
				// we have to make sure that the blacklist always gets synced
				$this->_add( $item );
			}
		}
	}
113
114
	/**
	 * Clears the skipped marker for the post or comment an item refers to.
	 *
	 * Actions other than the post/comment ones listed below are ignored.
	 *
	 * @param array $item Item whose first two entries are the action name and its arguments.
	 */
	function remove_skipped_item( $item ) {
		list( $action, $args ) = $item;

		$comment_actions = array(
			'wp_insert_comment',
			'comment_unapproved_',
			'comment_approved_',
			'comment_unapproved_trackback',
			'comment_approved_trackback',
			'comment_unapproved_pingback',
			'comment_approved_pingback',
		);

		// Comparisons are intentionally loose, matching the semantics of a switch statement.
		if ( 'wp_insert_post' == $action ) {
			$this->remove_skipped_id( $args[0], 'post' );
		} elseif ( in_array( $action, $comment_actions ) ) {
			$this->remove_skipped_id( $args[0], 'comment' );
		}
	}
132
133
	/**
	 * Appends an object ID to the skipped list for the given type, if missing.
	 *
	 * The list lives in the option 'jetpack_skipped_sync_{type}_ids'.
	 *
	 * @param mixed  $object_id ID to record.
	 * @param string $type      Object type used to build the option name (e.g. 'post', 'comment').
	 */
	function add_skipped_id( $object_id, $type ) {
		$option_name = 'jetpack_skipped_sync_' . $type . '_ids';
		$skipped_ids = get_option( $option_name, array() );

		// Already recorded: nothing to write.
		if ( in_array( $object_id, $skipped_ids ) ) {
			return;
		}

		$skipped_ids[] = $object_id;
		update_option( $option_name, $skipped_ids );
	}
141
142
	/**
	 * Removes an object ID from the skipped list for the given type, if present.
	 *
	 * The list lives in the option 'jetpack_skipped_sync_{type}_ids'. After
	 * removal the list is re-indexed so the stored array has no key gaps.
	 *
	 * @param mixed  $object_id ID to remove.
	 * @param string $type      Object type used to build the option name (e.g. 'post', 'comment').
	 */
	function remove_skipped_id( $object_id, $type ) {
		$option_name = 'jetpack_skipped_sync_' . $type . '_ids';
		$blacklist   = get_option( $option_name, array() );

		// Loose comparison on purpose: matches how add_skipped_id() checks membership.
		if ( in_array( $object_id, $blacklist ) ) {
			// array_diff() preserves keys, so re-index to keep a plain list.
			$blacklist = array_values( array_diff( $blacklist, array( $object_id ) ) );
			update_option( $option_name, $blacklist );
		}
	}
150
151
	/**
	 * Callback registered on 'deleted_post': drops the post from the skipped list.
	 *
	 * @param mixed $post_id ID of the deleted post.
	 */
	function maybe_update_post_blacklist( $post_id ) {
		$type = 'post';

		$this->remove_skipped_id( $post_id, $type );
	}
154
155
	/**
	 * Callback registered on 'deleted_comment': drops the comment from the skipped list.
	 *
	 * @param mixed $comment_id ID of the deleted comment.
	 */
	function maybe_update_comment_blacklist( $comment_id ) {
		$type = 'comment';

		$this->remove_skipped_id( $comment_id, $type );
	}
158
159
	/**
	 * Attempts to insert all the items in a single SQL query. May be subject to query size limits!
	 *
	 * Items for which the 'jetpack_skip_sync_item' filter returns a truthy value
	 * are left out of the query. Unlike add(), the skipped-ID lists are not
	 * touched here.
	 *
	 * @param array $items Items to enqueue; read by integer index from 0 to count - 1.
	 *
	 * @return false|WP_Error|null False when no rows were built, a WP_Error when the
	 *                             query result differs from the number of rows built,
	 *                             null otherwise.
	 */
	function add_all( $items ) {
		global $wpdb;
		$base_option_name = $this->get_next_data_row_option_name();

		$query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";

		$rows       = array();
		$item_count = count( $items ); // computed once instead of on every loop test

		for ( $i = 0; $i < $item_count; $i += 1 ) {
			/** This action is already documented in sync/class.jetpack-sync-queue.php */
			if ( apply_filters( 'jetpack_skip_sync_item', false, $items[ $i ] ) ) {
				continue;
			}

			// The loop index is appended so every row built in this batch gets a distinct name.
			$option_name  = esc_sql( $base_option_name . '-' . $i );
			$option_value = esc_sql( serialize( $items[ $i ] ) );
			$rows[]       = "('$option_name', '$option_value', 'no')";
		}

		if ( empty( $rows ) ) {
			return false;
		}

		$rows_added = $wpdb->query( $query . implode( ',', $rows ) );

		if ( $rows_added !== count( $rows ) ) {
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
		}
	}
188
189
	/**
	 * Peeks at the front-most item(s) on the queue without checking them out.
	 *
	 * @param int $count Maximum number of items to look at.
	 *
	 * @return array Values of the fetched items, or an empty array when nothing was fetched.
	 */
	function peek( $count = 1 ) {
		$rows = $this->fetch_items( $count );

		return $rows ? Jetpack_Sync_Utils::get_item_values( $rows ) : array();
	}
198
199
	/**
	 * Lag is the difference in time between the age of the oldest item and the current time.
	 *
	 * The oldest item is the row whose option name sorts first; its timestamp
	 * is parsed back out of the option name.
	 *
	 * @return float|null Seconds elapsed since the oldest item's timestamp, or null
	 *                    when the queue has no rows or the name cannot be parsed.
	 */
	function lag() {
		global $wpdb;

		$oldest_item_name = $wpdb->get_var( $wpdb->prepare(
			"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
			"jpsq_{$this->id}-%"
		) );

		if ( ! $oldest_item_name ) {
			return null;
		}

		// break apart the item name to get the timestamp;
		// the queue ID is quoted so regex metacharacters in it are matched literally
		$pattern = '/^jpsq_' . preg_quote( $this->id, '/' ) . '-(\d+\.\d+)-/';
		$matches = null;

		if ( ! preg_match( $pattern, $oldest_item_name, $matches ) ) {
			return null;
		}

		return microtime( true ) - floatval( $matches[1] );
	}
220
221
	/**
	 * Empties the queue: releases any checkout and deletes every row of this queue.
	 */
	function reset() {
		global $wpdb;

		$this->delete_checkout_id();

		$row_name_pattern = "jpsq_{$this->id}-%";
		$delete_sql       = $wpdb->prepare(
			"DELETE FROM $wpdb->options WHERE option_name LIKE %s",
			$row_name_pattern
		);

		$wpdb->query( $delete_sql );
	}
228
229
	/**
	 * Counts the rows currently stored for this queue.
	 *
	 * @return int
	 */
	function size() {
		global $wpdb;

		$row_name_pattern = "jpsq_{$this->id}-%";
		$count_sql        = $wpdb->prepare(
			"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s",
			$row_name_pattern
		);

		return (int) $wpdb->get_var( $count_sql );
	}
236
237
	/**
	 * Tells whether the queue holds at least one row.
	 *
	 * We use this peculiar implementation because it's much faster than count(*).
	 *
	 * @return bool True only when the query yields the string "1".
	 */
	function has_any_items() {
		global $wpdb;

		$row_name_pattern = "jpsq_{$this->id}-%";
		$exists_sql       = $wpdb->prepare(
			"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )",
			$row_name_pattern
		);
		$exists           = $wpdb->get_var( $exists_sql );

		return ( "1" === $exists );
	}
246
247
	/**
	 * Checks out up to $buffer_size items from the front of the queue.
	 *
	 * Only one buffer may be checked out at a time; the checkout is tracked
	 * through a transient that close(), checkin() or force_checkin() removes.
	 *
	 * @param int $buffer_size Maximum number of items to put in the buffer.
	 *
	 * @return Jetpack_Sync_Queue_Buffer|false|WP_Error The buffer; WP_Error 'unclosed_buffer'
	 *                                                  if a checkout already exists; the falsy or
	 *                                                  WP_Error result of storing the checkout ID;
	 *                                                  or false when the queue is empty.
	 */
	function checkout( $buffer_size ) {
		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();

		$result = $this->set_checkout_id( $buffer_id );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		$items = $this->fetch_items( $buffer_size );

		if ( count( $items ) === 0 ) {
			// Nothing to hand out: release the checkout so the queue is not
			// left blocked until the transient expires.
			$this->delete_checkout_id();

			return false;
		}

		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );

		return $buffer;
	}
270
271
	/**
	 * Checks out rows until it either runs out of candidates or hits a memory limit.
	 *
	 * The stored sizes are loaded from the DB first so that no more data than
	 * needed is pulled into memory. The only way the buffer exceeds $max_memory
	 * is when the very first item is larger than the limit on its own; in that
	 * case that item is checked out by itself.
	 *
	 * @param int $max_memory      Upper bound on the summed LENGTH(option_value) of the checked-out rows.
	 * @param int $max_buffer_size Maximum number of rows considered.
	 *
	 * @return Jetpack_Sync_Queue_Buffer|false|WP_Error The buffer; WP_Error 'unclosed_buffer'
	 *                                                  if a checkout already exists; the falsy or
	 *                                                  WP_Error result of storing the checkout ID;
	 *                                                  or false when no items were fetched.
	 */
	function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
		global $wpdb;

		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();
		$stored    = $this->set_checkout_id( $buffer_id );

		if ( ! $stored || is_wp_error( $stored ) ) {
			return $stored;
		}

		// Fetch each candidate row's name together with the length of its stored value.
		$size_sql  = $wpdb->prepare(
			"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
			"jpsq_{$this->id}-%",
			$max_buffer_size
		);
		$size_rows = $wpdb->get_results( $size_sql, OBJECT );

		$running_total = 0;
		$selected_ids  = array();

		foreach ( $size_rows as $size_row ) {
			$running_total += $size_row->value_size;
			$over_budget    = ( $running_total > $max_memory );

			// The first row is always accepted, even when it alone is over budget;
			// any later row that pushes the total over the limit ends the selection.
			if ( $over_budget && count( $selected_ids ) > 0 ) {
				break;
			}

			$selected_ids[] = $size_row->id;
		}

		$items = $this->fetch_items_by_id( $selected_ids );

		if ( count( $items ) === 0 ) {
			// Nothing to hand out, so do not keep the queue checked out.
			$this->delete_checkout_id();

			return false;
		}

		return new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
	}
327
328
	/**
	 * Returns a checked-out buffer without deleting its items from the queue.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer The buffer being checked in.
	 *
	 * @return true|WP_Error True on success, or the validation error.
	 */
	function checkin( $buffer ) {
		$validation = $this->validate_checkout( $buffer );

		if ( ! is_wp_error( $validation ) ) {
			$this->delete_checkout_id();

			return true;
		}

		return $validation;
	}
339
340
	/**
	 * Closes a checked-out buffer and deletes its items from the queue.
	 *
	 * The checkout is released before the rows are deleted.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer        The buffer being closed.
	 * @param array|null                $ids_to_remove Row names to delete; null means every item in the buffer.
	 *
	 * @return true|WP_Error True on success, or the validation error.
	 */
	function close( $buffer, $ids_to_remove = null ) {
		global $wpdb;

		$validation = $this->validate_checkout( $buffer );

		if ( is_wp_error( $validation ) ) {
			return $validation;
		}

		$this->delete_checkout_id();

		// by default clear all items in the buffer
		if ( is_null( $ids_to_remove ) ) {
			$ids_to_remove = $buffer->get_item_ids();
		}

		$id_count = count( $ids_to_remove );

		if ( $id_count > 0 ) {
			// One %s placeholder per row name, filled in by $wpdb->prepare().
			$placeholders = implode( ', ', array_fill( 0, $id_count, '%s' ) );
			$sql          = "DELETE FROM $wpdb->options WHERE option_name IN (" . $placeholders . ')';
			$prepare_args = array_merge( array( $sql ), $ids_to_remove );

			$wpdb->query( call_user_func_array( array( $wpdb, 'prepare' ), $prepare_args ) );
		}

		return true;
	}
364
365
	/**
	 * Reads every item value off the queue and then resets the queue.
	 *
	 * @return array Values of all items that were on the queue.
	 */
	function flush_all() {
		$rows   = $this->fetch_items();
		$values = Jetpack_Sync_Utils::get_item_values( $rows );

		$this->reset();

		return $values;
	}
371
372
	/**
	 * Returns every row of the queue (objects with `id` and `value`), without a limit.
	 *
	 * @return array
	 */
	function get_all() {
		$rows = $this->fetch_items();

		return $rows;
	}
375
376
	/**
	 * Releases the current checkout without validating any buffer.
	 *
	 * Use with caution, this could allow multiple processes to delete
	 * and send from the queue at the same time.
	 */
	function force_checkin() {
		$this->delete_checkout_id();

		return;
	}
381
382
	/**
	 * Used to lock checkouts from the queue.
	 *
	 * Polls once per second until the queue is empty, giving up after $timeout
	 * one-second waits, then claims the checkout transient with the ID 'lock'.
	 *
	 * @param int $timeout Maximum number of one-second waits for the queue to empty.
	 *
	 * @return true|false|WP_Error True once locked; WP_Error 'lock_timeout' if the queue still
	 *                             has items after the wait; WP_Error 'unclosed_buffer' if a
	 *                             checkout already exists; otherwise the falsy or WP_Error
	 *                             result of storing the checkout ID.
	 */
	function lock( $timeout = 30 ) {
		$tries = 0;

		while ( $this->has_any_items() ) {
			// Compare against the caller's timeout (not a fixed 30), and only
			// report a timeout when the queue is still non-empty.
			if ( $tries >= $timeout ) {
				return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
			}

			sleep( 1 );
			$tries += 1;
		}

		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		// hopefully this means we can acquire a checkout?
		$result = $this->set_checkout_id( 'lock' );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		return true;
	}
409
410
	/**
	 * Releases the checkout, whatever currently holds it.
	 */
	function unlock() {
		$this->delete_checkout_id();

		return;
	}
413
414
	/**
	 * Reads the current checkout ID from its transient.
	 *
	 * @return mixed Whatever get_transient() returns for the checkout transient.
	 */
	private function get_checkout_id() {
		$transient_name = $this->get_checkout_transient_name();

		return get_transient( $transient_name );
	}
417
418
	/**
	 * Stores the checkout ID in its transient with a 5 minute timeout.
	 *
	 * @param mixed $checkout_id ID to store.
	 *
	 * @return mixed Whatever set_transient() returns.
	 */
	private function set_checkout_id( $checkout_id ) {
		$transient_name = $this->get_checkout_transient_name();
		$expiration     = 5 * 60; // 5 minute timeout

		return set_transient( $transient_name, $checkout_id, $expiration );
	}
421
422
	/**
	 * Deletes the checkout transient, releasing any checkout or lock.
	 */
	private function delete_checkout_id() {
		$transient_name = $this->get_checkout_transient_name();

		delete_transient( $transient_name );
	}
425
426
	/**
	 * Builds the name of the transient that tracks this queue's checkout.
	 *
	 * @return string 'jpsq_' followed by the queue ID and '_checkout'.
	 */
	private function get_checkout_transient_name() {
		return 'jpsq_' . $this->id . '_checkout';
	}
429
430
	/**
	 * Generates the option name for the next queue row.
	 *
	 * The name is 'jpsq_{id}-{microtime}-{pid}-{row iterator}'. It is chosen to
	 * preserve time order as much as possible and to minimise the chance of
	 * collisions between multiple processes working at the same time.
	 *
	 * TODO: confirm we only need to support PHP 5.05+ (otherwise we'll need to emulate microtime as float, and avoid PHP_INT_MAX)
	 *
	 * @see http://php.net/manual/en/function.microtime.php
	 *
	 * @return string
	 */
	private function get_next_data_row_option_name() {
		$timestamp = sprintf( '%.6f', microtime( true ) );

		// The row iterator avoids collisions when a single process writes very
		// fast; it wraps back to 0 once it reaches PHP_INT_MAX.
		$this->row_iterator = ( PHP_INT_MAX === $this->row_iterator ) ? 0 : $this->row_iterator + 1;

		$name_parts = array(
			'jpsq_' . $this->id,
			$timestamp,
			getmypid(),
			$this->row_iterator,
		);

		return implode( '-', $name_parts );
	}
447
448
	/**
	 * Loads queue rows in option-name order, optionally capped at $limit rows.
	 *
	 * @param int|null $limit Maximum number of rows; any falsy value means no limit.
	 *
	 * @return array Row objects with `id` (option name) and `value` (passed through maybe_unserialize()).
	 */
	private function fetch_items( $limit = null ) {
		global $wpdb;

		$row_name_pattern = "jpsq_{$this->id}-%";
		$select_sql       = "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC";

		$query_sql = $limit
			? $wpdb->prepare( $select_sql . ' LIMIT %d', $row_name_pattern, $limit )
			: $wpdb->prepare( $select_sql, $row_name_pattern );

		$rows = $wpdb->get_results( $query_sql, OBJECT );

		foreach ( $rows as $row ) {
			$row->value = maybe_unserialize( $row->value );
		}

		return $rows;
	}
464
465
	/**
	 * Loads the queue rows whose option names are in $item_ids, in option-name order.
	 *
	 * @param array $item_ids Option names of the rows to load.
	 *
	 * @return array Row objects with `id` and `value` (passed through maybe_unserialize());
	 *               an empty array when $item_ids is empty.
	 */
	private function fetch_items_by_id( $item_ids ) {
		global $wpdb;

		$id_count = count( $item_ids );

		if ( ! ( $id_count > 0 ) ) {
			return array();
		}

		// One %s placeholder per row name, filled in by $wpdb->prepare().
		$placeholders = implode( ', ', array_fill( 0, $id_count, '%s' ) );
		$sql          = "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name IN (" . $placeholders . ') ORDER BY option_name ASC';
		$prepare_args = array_merge( array( $sql ), $item_ids );
		$query        = call_user_func_array( array( $wpdb, 'prepare' ), $prepare_args );

		$rows = $wpdb->get_results( $query, OBJECT );

		foreach ( $rows as $row ) {
			$row->value = maybe_unserialize( $row->value );
		}

		return $rows;
	}
481
482
	/**
	 * Verifies that $buffer is the buffer currently checked out of this queue.
	 *
	 * @param mixed $buffer Candidate buffer.
	 *
	 * @return true|WP_Error True when valid; otherwise WP_Error 'not_a_buffer',
	 *                       'buffer_not_checked_out' or 'buffer_mismatch'.
	 */
	private function validate_checkout( $buffer ) {
		$is_buffer = ( $buffer instanceof Jetpack_Sync_Queue_Buffer );

		if ( ! $is_buffer ) {
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
		}

		$active_checkout_id = $this->get_checkout_id();

		if ( ! $active_checkout_id ) {
			return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
		}

		// Loose comparison, as in the stored-versus-buffer ID check this replaces.
		if ( $buffer->id != $active_checkout_id ) {
			return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
		}

		return true;
	}
499
}
500
501
/**
 * Helpers for pulling the `id` and `value` properties out of lists of queue rows.
 */
class Jetpack_Sync_Utils {

	/**
	 * Maps each row to its `value` property; array keys are kept by array_map().
	 *
	 * @param array $items Row objects.
	 *
	 * @return array
	 */
	static function get_item_values( $items ) {
		$extract_value = array( __CLASS__, 'get_item_value' );

		return array_map( $extract_value, $items );
	}

	/**
	 * Maps each row to its `id` property; array keys are kept by array_map().
	 *
	 * @param array $items Row objects.
	 *
	 * @return array
	 */
	static function get_item_ids( $items ) {
		$extract_id = array( __CLASS__, 'get_item_id' );

		return array_map( $extract_id, $items );
	}

	/**
	 * @param object $item Row object.
	 *
	 * @return mixed The row's `value` property.
	 */
	private static function get_item_value( $item ) {
		return $item->value;
	}

	/**
	 * @param object $item Row object.
	 *
	 * @return mixed The row's `id` property.
	 */
	private static function get_item_id( $item ) {
		return $item->id;
	}
}
519