Completed
Push — updated/check-for-simple-xml ( 9b6a47 )
by
unknown
10:14
created

Jetpack_Sync_Queue::fetch_items()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 16
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 10
nc 4
nop 1
dl 0
loc 16
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * A buffer of items from the queue that can be checked out
5
 */
6
class Jetpack_Sync_Queue_Buffer {
7
	public $id;
8
	public $items_with_ids;
9
10
	public function __construct( $id, $items_with_ids ) {
11
		$this->id             = $id;
12
		$this->items_with_ids = $items_with_ids;
13
	}
14
15
	public function get_items() {
16
		return array_combine( $this->get_item_ids(), $this->get_item_values() );
17
	}
18
19
	public function get_item_values() {
20
		return Jetpack_Sync_Utils::get_item_values( $this->items_with_ids );
21
	}
22
23
	public function get_item_ids() {
24
		return Jetpack_Sync_Utils::get_item_ids( $this->items_with_ids );
25
	}
26
}
27
28
/**
29
 * A persistent queue that can be flushed in increments of N items,
30
 * and which blocks reads until checked-out buffers are checked in or
31
 * closed. This uses raw SQL for two reasons: speed, and not triggering
32
 * tons of added_option callbacks.
33
 */
34
class Jetpack_Sync_Queue {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
35
	public $id;
36
	private $row_iterator;
37
38
	function __construct( $id ) {
39
		$this->id           = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL
40
		$this->row_iterator = 0;
41
		$this->random_int = mt_rand( 1, 1000000 );
0 ignored issues
show
Bug introduced by
The property random_int does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is a good practice to explictly declare properties to avoid accidental typos and provide IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;
Loading history...
42
	}
43
44
	function add( $item ) {
45
		global $wpdb;
46
		$added = false;
47
		// this basically tries to add the option until enough time has elapsed that
48
		// it has a unique (microtime-based) option key
49
		while ( ! $added ) {
50
			$rows_added = $wpdb->query( $wpdb->prepare(
51
				"INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s,%s)",
52
				$this->get_next_data_row_option_name(),
53
				$this->serialize( $item ),
54
				'no'
55
			) );
56
			$added      = ( 0 !== $rows_added );
57
		}
58
	}
59
60
	function serialize( $item ) {
61
		if ( $item instanceof SimpleXMLElement ) {
62
			$item = $item->asXML();
63
		}
64
		return serialize( $item );
65
	}
66
67
	// Attempts to insert all the items in a single SQL query. May be subject to query size limits!
68
	function add_all( $items ) {
69
		global $wpdb;
70
		$base_option_name = $this->get_next_data_row_option_name();
71
72
		$query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";
73
74
		$rows = array();
75
76
		for ( $i = 0; $i < count( $items ); $i += 1 ) {
0 ignored issues
show
Performance Best Practice introduced by
It seems like you are calling the size function count() as part of the test condition. You might want to compute the size beforehand, and not on each iteration.

If the size of the collection does not change during the iteration, it is generally a good practice to compute it beforehand, and not on each iteration:

for ($i=0; $i<count($array); $i++) { // calls count() on each iteration
}

// Better
for ($i=0, $c=count($array); $i<$c; $i++) { // calls count() just once
}
Loading history...
77
			$option_name  = esc_sql( $base_option_name . '-' . $i );
78
			$option_value = esc_sql( $this->serialize( $items[ $i ] ) );
79
			$rows[]       = "('$option_name', '$option_value', 'no')";
80
		}
81
82
		$rows_added = $wpdb->query( $query . join( ',', $rows ) );
83
84
		if ( count( $items ) === $rows_added ) {
85
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
86
		}
87
	}
88
89
	// Peek at the front-most item on the queue without checking it out
90
	function peek( $count = 1 ) {
91
		$items = $this->fetch_items( $count );
92
		if ( $items ) {
93
			return Jetpack_Sync_Utils::get_item_values( $items );
94
		}
95
96
		return array();
97
	}
98
99
	// lag is the difference in time between the age of the oldest item
100
	// (aka first or frontmost item) and the current time
101
	function lag() {
102
		global $wpdb;
103
104
		$first_item_name = $wpdb->get_var( $wpdb->prepare(
105
			"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
106
			"jpsq_{$this->id}-%"
107
		) );
108
109
		if ( ! $first_item_name ) {
110
			return 0;
111
		}
112
113
		// break apart the item name to get the timestamp
114
		$matches = null;
115
		if ( preg_match( '/^jpsq_' . $this->id . '-(\d+\.\d+)-/', $first_item_name, $matches ) ) {
116
			return microtime( true ) - floatval( $matches[1] );
117
		} else {
118
			return 0;
119
		}
120
	}
121
122
	function reset() {
123
		global $wpdb;
124
		$this->delete_checkout_id();
125
		$wpdb->query( $wpdb->prepare(
126
			"DELETE FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
127
		) );
128
	}
129
130
	function size() {
131
		global $wpdb;
132
133
		return (int) $wpdb->get_var( $wpdb->prepare(
134
			"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
135
		) );
136
	}
137
138
	// we use this peculiar implementation because it's much faster than count(*)
139
	function has_any_items() {
140
		global $wpdb;
141
		$value = $wpdb->get_var( $wpdb->prepare(
142
			"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )", "jpsq_{$this->id}-%"
143
		) );
144
145
		return ( $value === '1' );
146
	}
147
148
	function checkout( $buffer_size ) {
149
		if ( $this->get_checkout_id() ) {
150
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
151
		}
152
153
		$buffer_id = uniqid();
154
155
		$result = $this->set_checkout_id( $buffer_id );
156
157
		if ( ! $result || is_wp_error( $result ) ) {
158
			return $result;
159
		}
160
161
		$items = $this->fetch_items( $buffer_size );
162
163
		if ( count( $items ) === 0 ) {
164
			return false;
165
		}
166
167
		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );
168
169
		return $buffer;
170
	}
171
172
	// this checks out rows until it either empties the queue or hits a certain memory limit
173
	// it loads the sizes from the DB first so that it doesn't accidentally
174
	// load more data into memory than it needs to.
175
	// The only way it will load more items than $max_size is if a single queue item
176
	// exceeds the memory limit, but in that case it will send that item by itself.
177
	function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
178
		if ( $this->get_checkout_id() ) {
179
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
180
		}
181
182
		$buffer_id = uniqid();
183
184
		$result = $this->set_checkout_id( $buffer_id );
185
186
		if ( ! $result || is_wp_error( $result ) ) {
187
			return $result;
188
		}
189
190
		// get the map of buffer_id -> memory_size
191
		global $wpdb;
192
193
		$items_with_size = $wpdb->get_results(
194
			$wpdb->prepare(
195
				"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
196
				"jpsq_{$this->id}-%",
197
				$max_buffer_size
198
			),
199
			OBJECT
200
		);
201
202
		if ( count( $items_with_size ) === 0 ) {
203
			return false;
204
		}
205
206
		$total_memory = 0;
207
208
		$min_item_id = $max_item_id = $items_with_size[0]->id;
209
210
		foreach ( $items_with_size as $id => $item_with_size ) {
211
			$total_memory += $item_with_size->value_size;
212
213
			// if this is the first item and it exceeds memory, allow loop to continue
214
			// we will exit on the next iteration instead
215
			if ( $total_memory > $max_memory && $id > 0 ) {
216
				break;
217
			}
218
219
			$max_item_id = $item_with_size->id;
220
		}
221
222
		$query = $wpdb->prepare( 
223
			"SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name >= %s and option_name <= %s ORDER BY option_name ASC",
224
			$min_item_id,
225
			$max_item_id
226
		);
227
228
		$items = $wpdb->get_results( $query, OBJECT );
229
		foreach ( $items as $item ) {
230
			$item->value = maybe_unserialize( $item->value );
231
		}
232
233
		if ( count( $items ) === 0 ) {
234
			$this->delete_checkout_id();
235
236
			return false;
237
		}
238
239
		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
240
241
		return $buffer;
242
	}
243
244
	function checkin( $buffer ) {
245
		$is_valid = $this->validate_checkout( $buffer );
246
247
		if ( is_wp_error( $is_valid ) ) {
248
			return $is_valid;
249
		}
250
251
		$this->delete_checkout_id();
252
253
		return true;
254
	}
255
256
	function close( $buffer, $ids_to_remove = null ) {
257
		$is_valid = $this->validate_checkout( $buffer );
258
259
		if ( is_wp_error( $is_valid ) ) {
260
			return $is_valid;
261
		}
262
263
		$this->delete_checkout_id();
264
265
		// by default clear all items in the buffer
266
		if ( is_null( $ids_to_remove ) ) {
267
			$ids_to_remove = $buffer->get_item_ids();
268
		}
269
270
		global $wpdb;
271
272
		if ( count( $ids_to_remove ) > 0 ) {
273
			$sql   = "DELETE FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $ids_to_remove ), '%s' ) ) . ')';
274
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $ids_to_remove ) );
275
			$wpdb->query( $query );
276
		}
277
278
		return true;
279
	}
280
281
	function flush_all() {
282
		$items = Jetpack_Sync_Utils::get_item_values( $this->fetch_items() );
283
		$this->reset();
284
285
		return $items;
286
	}
287
288
	function get_all() {
289
		return $this->fetch_items();
290
	}
291
292
	// use with caution, this could allow multiple processes to delete
293
	// and send from the queue at the same time
294
	function force_checkin() {
295
		$this->delete_checkout_id();
296
	}
297
298
	// used to lock checkouts from the queue.
299
	// tries to wait up to $timeout seconds for the queue to be empty
300
	function lock( $timeout = 30 ) {
301
		$tries = 0;
302
303
		while ( $this->has_any_items() && $tries < $timeout ) {
304
			sleep( 1 );
305
			$tries += 1;
306
		}
307
308
		if ( $tries === 30 ) {
309
			return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
310
		}
311
312
		if ( $this->get_checkout_id() ) {
313
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
314
		}
315
316
		// hopefully this means we can acquire a checkout?
317
		$result = $this->set_checkout_id( 'lock' );
318
319
		if ( ! $result || is_wp_error( $result ) ) {
320
			return $result;
321
		}
322
323
		return true;
324
	}
325
326
	function unlock() {
327
		return $this->delete_checkout_id();
328
	}
329
330
	private function get_checkout_id() {
331
		global $wpdb;
332
		$checkout_value = $wpdb->get_var( 
333
			$wpdb->prepare(
334
				"SELECT option_value FROM $wpdb->options WHERE option_name = %s", 
335
				$this->get_lock_option_name()
336
			)
337
		);
338
339
		if ( $checkout_value ) {
340
			list( $checkout_id, $timestamp ) = explode( ':', $checkout_value );
341
			if ( intval( $timestamp ) > time() ) {
342
				return $checkout_id;
343
			}
344
		}
345
346
		return false;
347
	}
348
349
	private function set_checkout_id( $checkout_id ) {
350
		global $wpdb;
351
352
		$expires = time() + Jetpack_Sync_Defaults::$default_sync_queue_lock_timeout;
0 ignored issues
show
Bug introduced by
The property default_sync_queue_lock_timeout cannot be accessed from this context as it is declared private in class Jetpack_Sync_Defaults.

This check looks for access to properties that are not accessible from the current context.

If you need to make a property accessible to another context you can either raise its visibility level or provide an accessible getter in the defining class.

Loading history...
353
		$updated_num = $wpdb->query(
354
			$wpdb->prepare(
355
				"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s", 
356
				"$checkout_id:$expires",
357
				$this->get_lock_option_name()
358
			)
359
		);
360
361
		if ( ! $updated_num ) {
362
			$updated_num = $wpdb->query(
363
				$wpdb->prepare(
364
					"INSERT INTO $wpdb->options ( option_name, option_value, autoload ) VALUES ( %s, %s, 'no' )", 
365
					$this->get_lock_option_name(),
366
					"$checkout_id:$expires"
367
				)
368
			);
369
		}
370
371
		return $updated_num;
372
	}
373
374
	private function delete_checkout_id() {
375
		global $wpdb;
376
		// rather than delete, which causes fragmentation, we update in place
377
		return $wpdb->query(
378
			$wpdb->prepare( 
379
				"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s", 
380
				"0:0",
381
				$this->get_lock_option_name() 
382
			) 
383
		);
384
385
	}
386
387
	private function get_lock_option_name() {
388
		return "jpsq_{$this->id}_checkout";
389
	}
390
391
	private function get_next_data_row_option_name() {
392
		// this option is specifically chosen to, as much as possible, preserve time order
393
		// and minimise the possibility of collisions between multiple processes working
394
		// at the same time
395
		// TODO: confirm we only need to support PHP 5.05+ (otherwise we'll need to emulate microtime as float, and avoid PHP_INT_MAX)
0 ignored issues
show
Coding Style Best Practice introduced by
Comments for TODO tasks are often forgotten in the code; it might be better to use a dedicated issue tracker.
Loading history...
396
		// @see: http://php.net/manual/en/function.microtime.php
397
		$timestamp = sprintf( '%.6f', microtime( true ) );
398
399
		// row iterator is used to avoid collisions where we're writing data waaay fast in a single process
400
		if ( $this->row_iterator === PHP_INT_MAX ) {
401
			$this->row_iterator = 0;
402
		} else {
403
			$this->row_iterator += 1;
404
		}
405
406
		return 'jpsq_' . $this->id . '-' . $timestamp . '-' . $this->random_int . '-' . $this->row_iterator;
407
	}
408
409
	private function fetch_items( $limit = null ) {
410
		global $wpdb;
411
412
		if ( $limit ) {
413
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", "jpsq_{$this->id}-%", $limit );
414
		} else {
415
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", "jpsq_{$this->id}-%" );
416
		}
417
418
		$items = $wpdb->get_results( $query_sql, OBJECT );
419
		foreach ( $items as $item ) {
420
			$item->value = maybe_unserialize( $item->value );
421
		}
422
423
		return $items;
424
	}
425
426
	private function validate_checkout( $buffer ) {
427
		if ( ! $buffer instanceof Jetpack_Sync_Queue_Buffer ) {
428
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
429
		}
430
431
		$checkout_id = $this->get_checkout_id();
432
433
		if ( ! $checkout_id ) {
434
			return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
435
		}
436
437
		if ( $checkout_id != $buffer->id ) {
438
			return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
439
		}
440
441
		return true;
442
	}
443
}
444
445
class Jetpack_Sync_Utils {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
446
447
	static function get_item_values( $items ) {
448
		return array_map( array( __CLASS__, 'get_item_value' ), $items );
449
	}
450
451
	static function get_item_ids( $items ) {
452
		return array_map( array( __CLASS__, 'get_item_id' ), $items );
453
	}
454
455
	static private function get_item_value( $item ) {
0 ignored issues
show
Coding Style introduced by
As per PSR2, the static declaration should come after the visibility declaration.
Loading history...
456
		return $item->value;
457
	}
458
459
	static private function get_item_id( $item ) {
0 ignored issues
show
Coding Style introduced by
As per PSR2, the static declaration should come after the visibility declaration.
Loading history...
460
		return $item->id;
461
	}
462
}
463