Completed
Push — add/stop-sync-filter ( 639cc9...16f5d3 )
by
unknown
17:45 queued 08:00
created

Jetpack_Sync_Queue::validate_checkout()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 17
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 9
nc 4
nop 1
dl 0
loc 17
rs 9.2
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * A buffer of items from the queue that can be checked out
5
 */
6
class Jetpack_Sync_Queue_Buffer {
7
	public $id;
8
	public $items_with_ids;
9
10
	public function __construct( $id, $items_with_ids ) {
11
		$this->id             = $id;
12
		$this->items_with_ids = $items_with_ids;
13
	}
14
15
	public function get_items() {
16
		return array_combine( $this->get_item_ids(), $this->get_item_values() );
17
	}
18
19
	public function get_item_values() {
20
		return Jetpack_Sync_Utils::get_item_values( $this->items_with_ids );
21
	}
22
23
	public function get_item_ids() {
24
		return Jetpack_Sync_Utils::get_item_ids( $this->items_with_ids );
25
	}
26
}
27
28
/**
29
 * A persistent queue that can be flushed in increments of N items,
30
 * and which blocks reads until checked-out buffers are checked in or
31
 * closed. This uses raw SQL for two reasons: speed, and not triggering
32
 * tons of added_option callbacks.
33
 */
34
class Jetpack_Sync_Queue {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
35
	public $id;
36
	private $row_iterator;
37
38
	function __construct( $id ) {
39
		$this->id           = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL
40
		$this->row_iterator = 0;
41
		$this->blacklist_names = array( 'jetpack_skipped_sync_post_ids', 'jetpack_skipped_sync_comment_ids' );
0 ignored issues
show
Bug introduced by
The property blacklist_names does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is a good practice to explictly declare properties to avoid accidental typos and provide IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;
Loading history...
42
	}
43
44
	function add( $item ) {
45
		/**
46
		 * Filters whether to skip adding an item to the sync que.
47
		 *
48
		 * Passing true to the filter will prevent the item to not be added to the que.
49
		 * $item = array( $action, $args, $user_id, $microtime );
50
		 *
51
		 * @since 4.1.0
52
		 *
53
		 * @see Jetpack_Sync_Queue->add()
54
		 */
55
		if ( apply_filters( 'jetpack_skip_sync_item', false, $item ) ) {
56
			$this->store_skipped_item( $item );
57
			return false;
58
		}
59
60
		$this->_add( $item );
61
	}
62
63
	private function _add( $item ) {
64
		global $wpdb;
65
		$added = false;
66
		// this basically tries to add the option until enough time has elapsed that
67
		// it has a unique (microtime-based) option key
68
		while ( ! $added ) {
69
			$rows_added = $wpdb->query( $wpdb->prepare(
70
				"INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s,%s)",
71
				$this->get_next_data_row_option_name(),
72
				serialize( $item ),
73
				'no'
74
			) );
75
			$added      = ( $rows_added !== 0 );
76
		}
77
	}
78
79
	function store_skipped_item( $item ) {
80
		list( $action, $args ) = $item;
81
		switch( $action ) {
82
			case 'wp_insert_post':
83
				$this->store_skipped_id( $args[0], 'post' );
84
				break;
85
86
			case 'wp_insert_comment':
87
				$this->store_skipped_id( $args[0], 'comment' );
88
				break;
89
90
			case 'added_option':
91
			case 'updated_option':
92
			case 'deleted_option':
93
				if ( in_array( $args[0], $this->blacklist_names ) ) {
94
					// we have to make sure that the blacklist always gets synced
95
					$this->_add( $item );
96
				}
97
				break;
98
99
		}
100
	}
101
102
	function store_skipped_id( $post_id, $type = 'post' ) {
103
		$option_name = 'jetpack_skipped_sync_' . $type . '_ids';
104
		$blacklist = get_option( $option_name, array() );
105
		if ( ! in_array( $post_id, $blacklist ) ) {
106
			$new_blacklist = $blacklist;
107
			$new_blacklist[] = $post_id;
108
			update_option( $option_name, $new_blacklist );
109
		}
110
	}
111
112
	// Attempts to insert all the items in a single SQL query. May be subject to query size limits!
113
	function add_all( $items ) {
114
		global $wpdb;
115
		$base_option_name = $this->get_next_data_row_option_name();
116
117
		$query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";
118
119
		$rows = array();
120
121
		for ( $i = 0; $i < count( $items ); $i += 1 ) {
0 ignored issues
show
Performance Best Practice introduced by
It seems like you are calling the size function count() as part of the test condition. You might want to compute the size beforehand, and not on each iteration.

If the size of the collection does not change during the iteration, it is generally a good practice to compute it beforehand, and not on each iteration:

for ($i=0; $i<count($array); $i++) { // calls count() on each iteration
}

// Better
for ($i=0, $c=count($array); $i<$c; $i++) { // calls count() just once
}
Loading history...
122
			if ( apply_filters( 'jetpack_skip_sync_item', false, $items[ $i ]) ) {
123
				continue;
124
			}
125
			$option_name  = esc_sql( $base_option_name . '-' . $i );
126
			$option_value = esc_sql( serialize( $items[ $i ] ) );
127
			$rows[]       = "('$option_name', '$option_value', 'no')";
128
		}
129
130
		if ( empty( $rows ) ) {
131
			return false;
132
		}
133
134
		$rows_added = $wpdb->query( $query . join( ',', $rows ) );
135
136
		if ( $rows_added !== count( $rows ) ) {
137
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
138
		}
139
	}
140
141
	// Peek at the front-most item on the queue without checking it out
142
	function peek( $count = 1 ) {
143
		$items = $this->fetch_items( $count );
144
		if ( $items ) {
145
			return Jetpack_Sync_Utils::get_item_values( $items );
146
		}
147
148
		return array();
149
	}
150
151
	// lag is the difference in time between the age of the oldest item and the current time
152
	function lag() {
153
		global $wpdb;
154
155
		$last_item_name = $wpdb->get_var( $wpdb->prepare(
156
			"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
157
			"jpsq_{$this->id}-%"
158
		) );
159
160
		if ( ! $last_item_name ) {
161
			return null;
162
		}
163
164
		// break apart the item name to get the timestamp
165
		$matches = null;
166
		if ( preg_match( '/^jpsq_' . $this->id . '-(\d+\.\d+)-/', $last_item_name, $matches ) ) {
167
			return microtime( true ) - floatval( $matches[1] );
168
		} else {
169
			return null;
170
		}
171
	}
172
173
	function reset() {
174
		global $wpdb;
175
		$this->delete_checkout_id();
176
		$wpdb->query( $wpdb->prepare(
177
			"DELETE FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
178
		) );
179
	}
180
181
	function size() {
182
		global $wpdb;
183
184
		return (int) $wpdb->get_var( $wpdb->prepare(
185
			"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
186
		) );
187
	}
188
189
	// we use this peculiar implementation because it's much faster than count(*)
190
	function has_any_items() {
191
		global $wpdb;
192
		$value = $wpdb->get_var( $wpdb->prepare(
193
			"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )", "jpsq_{$this->id}-%"
194
		) );
195
196
		return ( $value === "1" );
197
	}
198
199
	function checkout( $buffer_size ) {
200
		if ( $this->get_checkout_id() ) {
201
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
202
		}
203
204
		$buffer_id = uniqid();
205
206
		$result = $this->set_checkout_id( $buffer_id );
207
208
		if ( ! $result || is_wp_error( $result ) ) {
209
			return $result;
210
		}
211
212
		$items = $this->fetch_items( $buffer_size );
213
214
		if ( count( $items ) === 0 ) {
215
			return false;
216
		}
217
218
		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );
219
220
		return $buffer;
221
	}
222
223
	// this checks out rows until it either empties the queue or hits a certain memory limit
224
	// it loads the sizes from the DB first so that it doesn't accidentally
225
	// load more data into memory than it needs to.
226
	// The only way it will load more items than $max_size is if a single queue item 
227
	// exceeds the memory limit, but in that case it will send that item by itself.
228
	function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
229
		if ( $this->get_checkout_id() ) {
230
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
231
		}
232
233
		$buffer_id = uniqid();
234
235
		$result = $this->set_checkout_id( $buffer_id );
236
237
		if ( ! $result || is_wp_error( $result ) ) {
238
			return $result;
239
		}
240
241
		// get the map of buffer_id -> memory_size
242
		global $wpdb;
243
244
		$items_with_size = $wpdb->get_results(
245
			$wpdb->prepare(
246
				"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
247
				"jpsq_{$this->id}-%",
248
				$max_buffer_size
249
			),
250
			OBJECT
251
		);
252
253
		$total_memory = 0;
254
		$item_ids     = array();
255
256
		foreach ( $items_with_size as $item_with_size ) {
257
			$total_memory += $item_with_size->value_size;
258
259
			// if this is the first item and it exceeds memory, allow loop to continue
260
			// we will exit on the next iteration instead
261
			if ( $total_memory > $max_memory && count( $item_ids ) > 0 ) {
262
				break;
263
			}
264
			$item_ids[] = $item_with_size->id;
265
		}
266
267
		$items = $this->fetch_items_by_id( $item_ids );
268
269
		if ( count( $items ) === 0 ) {
270
			$this->delete_checkout_id();
271
272
			return false;
273
		}
274
275
		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
276
277
		return $buffer;
278
	}
279
280
	function checkin( $buffer ) {
281
		$is_valid = $this->validate_checkout( $buffer );
282
283
		if ( is_wp_error( $is_valid ) ) {
284
			return $is_valid;
285
		}
286
287
		$this->delete_checkout_id();
288
289
		return true;
290
	}
291
292
	function close( $buffer, $ids_to_remove = null ) {
293
		$is_valid = $this->validate_checkout( $buffer );
294
295
		if ( is_wp_error( $is_valid ) ) {
296
			return $is_valid;
297
		}
298
299
		$this->delete_checkout_id();
300
301
		// by default clear all items in the buffer
302
		if ( is_null( $ids_to_remove ) ) {
303
			$ids_to_remove = $buffer->get_item_ids();
304
		}
305
306
		global $wpdb;
307
308
		if ( count( $ids_to_remove ) > 0 ) {
309
			$sql   = "DELETE FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $ids_to_remove ), '%s' ) ) . ')';
310
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $ids_to_remove ) );
311
			$wpdb->query( $query );
312
		}
313
314
		return true;
315
	}
316
317
	function flush_all() {
318
		$items = Jetpack_Sync_Utils::get_item_values( $this->fetch_items() );
319
		$this->reset();
320
321
		return $items;
322
	}
323
324
	function get_all() {
325
		return $this->fetch_items();
326
	}
327
328
	// use with caution, this could allow multiple processes to delete
329
	// and send from the queue at the same time
330
	function force_checkin() {
331
		$this->delete_checkout_id();
332
	}
333
334
	// used to lock checkouts from the queue.
335
	// tries to wait up to $timeout seconds for the queue to be empty
336
	function lock( $timeout = 30 ) {
337
		$tries = 0;
338
339
		while ( $this->has_any_items() && $tries < $timeout ) {
340
			sleep( 1 );
341
			$tries += 1;
342
		}
343
344
		if ( $tries === 30 ) {
345
			return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
346
		}
347
348
		if ( $this->get_checkout_id() ) {
349
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
350
		}
351
352
		// hopefully this means we can acquire a checkout?
353
		$result = $this->set_checkout_id( 'lock' );
354
355
		if ( ! $result || is_wp_error( $result ) ) {
356
			return $result;
357
		}
358
359
		return true;
360
	}
361
362
	function unlock() {
363
		$this->delete_checkout_id();
364
	}
365
366
	private function get_checkout_id() {
367
		return get_transient( $this->get_checkout_transient_name() );
368
	}
369
370
	private function set_checkout_id( $checkout_id ) {
371
		return set_transient( $this->get_checkout_transient_name(), $checkout_id, 5 * 60 ); // 5 minute timeout
372
	}
373
374
	private function delete_checkout_id() {
375
		delete_transient( $this->get_checkout_transient_name() );
376
	}
377
378
	private function get_checkout_transient_name() {
379
		return "jpsq_{$this->id}_checkout";
380
	}
381
382
	private function get_next_data_row_option_name() {
383
		// this option is specifically chosen to, as much as possible, preserve time order
384
		// and minimise the possibility of collisions between multiple processes working 
385
		// at the same time
386
		// TODO: confirm we only need to support PHP 5.05+ (otherwise we'll need to emulate microtime as float, and avoid PHP_INT_MAX)
0 ignored issues
show
Coding Style Best Practice introduced by
Comments for TODO tasks are often forgotten in the code; it might be better to use a dedicated issue tracker.
Loading history...
387
		// @see: http://php.net/manual/en/function.microtime.php
388
		$timestamp = sprintf( '%.6f', microtime( true ) );
389
390
		// row iterator is used to avoid collisions where we're writing data waaay fast in a single process
391
		if ( $this->row_iterator === PHP_INT_MAX ) {
392
			$this->row_iterator = 0;
393
		} else {
394
			$this->row_iterator += 1;
395
		}
396
397
		return 'jpsq_' . $this->id . '-' . $timestamp . '-' . getmypid() . '-' . $this->row_iterator;
398
	}
399
400
	private function fetch_items( $limit = null ) {
401
		global $wpdb;
402
403
		if ( $limit ) {
404
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", "jpsq_{$this->id}-%", $limit );
405
		} else {
406
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", "jpsq_{$this->id}-%" );
407
		}
408
409
		$items = $wpdb->get_results( $query_sql, OBJECT );
410
		foreach ( $items as $item ) {
411
			$item->value = maybe_unserialize( $item->value );
412
		}
413
414
		return $items;
415
	}
416
417
	private function fetch_items_by_id( $item_ids ) {
418
		global $wpdb;
419
420
		if ( count( $item_ids ) > 0 ) {
421
			$sql   = "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $item_ids ), '%s' ) ) . ') ORDER BY option_name ASC';
422
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $item_ids ) );
423
			$items = $wpdb->get_results( $query, OBJECT );
424
			foreach ( $items as $item ) {
425
				$item->value = maybe_unserialize( $item->value );
426
			}
427
428
			return $items;
429
		} else {
430
			return array();
431
		}
432
	}
433
434
	private function validate_checkout( $buffer ) {
435
		if ( ! $buffer instanceof Jetpack_Sync_Queue_Buffer ) {
436
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
437
		}
438
439
		$checkout_id = $this->get_checkout_id();
440
441
		if ( ! $checkout_id ) {
442
			return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
443
		}
444
445
		if ( $checkout_id != $buffer->id ) {
446
			return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
447
		}
448
449
		return true;
450
	}
451
}
452
453
class Jetpack_Sync_Utils {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
454
455
	static function get_item_values( $items ) {
456
		return array_map( array( __CLASS__, 'get_item_value' ), $items );
457
	}
458
459
	static function get_item_ids( $items ) {
460
		return array_map( array( __CLASS__, 'get_item_id' ), $items );
461
	}
462
463
	static private function get_item_value( $item ) {
0 ignored issues
show
Coding Style introduced by
As per PSR2, the static declaration should come after the visibility declaration.
Loading history...
464
		return $item->value;
465
	}
466
467
	static private function get_item_id( $item ) {
0 ignored issues
show
Coding Style introduced by
As per PSR2, the static declaration should come after the visibility declaration.
Loading history...
468
		return $item->id;
469
	}
470
}
471