Completed
Push — add/use-native-mysql-locks-whe... ( 2a0ea7...b223ed )
by
unknown
216:20 queued 206:39
created

Jetpack_Sync_Queue::peek()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 5
nc 2
nop 1
dl 0
loc 8
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
require_once dirname( __FILE__ ) . '/class.jetpack-sync-settings.php';
4
5
/**
 * A buffer of items from the queue that can be checked out.
 *
 * Holds the buffer identifier together with the rows it was built from and
 * exposes those rows as ids, values, or an id => value map.
 */
class Jetpack_Sync_Queue_Buffer {
	/** @var mixed Identifier handed to the constructor. */
	public $id;

	/** @var array Rows handed to the constructor; passed to Jetpack_Sync_Utils for id/value extraction. */
	public $items_with_ids;

	/**
	 * @param mixed $id             Identifier for this buffer.
	 * @param array $items_with_ids Rows that make up the buffer.
	 */
	public function __construct( $id, $items_with_ids ) {
		$this->items_with_ids = $items_with_ids;
		$this->id             = $id;
	}

	/**
	 * Builds a map whose keys are the item ids and whose values are the item values.
	 *
	 * @return array Result of array_combine() over the ids and the values.
	 */
	public function get_items() {
		$ids    = $this->get_item_ids();
		$values = $this->get_item_values();

		return array_combine( $ids, $values );
	}

	/**
	 * @return array The value of every item in the buffer.
	 */
	public function get_item_values() {
		$rows = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_values( $rows );
	}

	/**
	 * @return array The id of every item in the buffer.
	 */
	public function get_item_ids() {
		$rows = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_ids( $rows );
	}
}
29
30
/**
 * Contract shared by the lock implementations that guard queue checkouts.
 */
interface Jetpack_Sync_Lock {
	/**
	 * Lifetime of a lock, in seconds (5 minutes).
	 */
	const QUEUE_LOCK_TIMEOUT = 300;

	/**
	 * Try to take the lock.
	 *
	 * @param int $wait Wait hint; the MySQL implementation in this file passes it
	 *                  to GET_LOCK as its timeout, the transient implementation
	 *                  does not read it.
	 *
	 * @return true|WP_Error True when the lock was taken, WP_Error otherwise.
	 */
	public function acquire_lock( $wait = 0 );

	/**
	 * Give the lock back.
	 */
	public function release_lock();

	/**
	 * Clear the lock's state.
	 */
	public function reset();
}
36
37
/**
 * Lock implementation that stores its state in a WordPress transient.
 */
class Jetpack_Sync_Lock_Transient implements Jetpack_Sync_Lock {
	/** @var string Identifier used to build the transient name. */
	private $id;

	/**
	 * @param string $id Identifier of the thing being locked.
	 */
	function __construct( $id ) {
		$this->id = $id;
	}

	/**
	 * Takes the lock unless the transient already holds a truthy value.
	 *
	 * @param int $wait Not read by this implementation.
	 *
	 * @return true|WP_Error
	 */
	public function acquire_lock( $wait = 0 ) {
		$existing = $this->get_lock_value();
		if ( $existing ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$stored = $this->set_lock();

		// A falsy result from set_transient() is reported the same way as an existing lock.
		if ( ! $stored ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		if ( is_wp_error( $stored ) ) {
			return $stored;
		}

		return true;
	}

	/**
	 * Deletes the transient when it currently holds a truthy value.
	 *
	 * @return true|WP_Error WP_Error when there was nothing to release.
	 */
	public function release_lock() {
		if ( $this->get_lock_value() ) {
			$this->delete_lock_value();

			return true;
		}

		return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
	}

	/**
	 * Deletes the transient unconditionally.
	 */
	public function reset() {
		$this->delete_lock_value();
	}

	/**
	 * @return mixed Whatever get_transient() returns for the lock's transient name.
	 */
	private function get_lock_value() {
		$transient_name = $this->get_checkout_transient_name();

		return get_transient( $transient_name );
	}

	/**
	 * Stores the current microtime under the lock's transient name.
	 *
	 * @return mixed Result of set_transient().
	 */
	private function set_lock() {
		$transient_name = $this->get_checkout_transient_name();
		$locked_at      = microtime( true );

		return set_transient( $transient_name, $locked_at, self::QUEUE_LOCK_TIMEOUT ); // 5 minute timeout
	}

	private function delete_lock_value() {
		$transient_name = $this->get_checkout_transient_name();
		delete_transient( $transient_name );
	}

	/**
	 * @return string Transient name derived from the lock id.
	 */
	private function get_checkout_transient_name() {
		return "jpsq_{$this->id}_checkout";
	}
}
92
93
/**
 * Lock implementation built on MySQL named locks (GET_LOCK / RELEASE_LOCK),
 * combined with a timestamp kept in a static property for the current process.
 *
 * Implements Jetpack_Sync_Lock so that self::QUEUE_LOCK_TIMEOUT resolves and the
 * class honours the same contract as the transient-based lock.
 */
class Jetpack_Sync_Lock_MySQL implements Jetpack_Sync_Lock {
	/** @var string Identifier passed to the constructor. */
	private $id;

	/** @var string Name handed to GET_LOCK / RELEASE_LOCK. */
	private $named_lock_name;

	/**
	 * In-process lock: microtime at which the lock was taken, or null when free.
	 *
	 * NOTE(review): this static is shared by every instance regardless of $id, so
	 * holding one lock makes acquire_lock() fail for all others in the same
	 * process until it is released or times out — confirm that is intended.
	 */
	public static $in_memory_lock;

	/**
	 * @param string $id Identifier of the thing being locked.
	 */
	function __construct( $id ) {
		$this->id = $id;
		global $wpdb;
		// named lock names are GLOBAL PER SERVER so should be namespaced to this particular
		// wp installation, site and queue
		$this->named_lock_name = 'jpsq_' . $wpdb->prefix . $this->id;
		if ( strlen( $this->named_lock_name ) > 64 ) {
			error_log("Warning: Named lock key is > 64 chars: '{$this->named_lock_name}'");
		}
	}

	/**
	 * Takes the in-process lock and the MySQL named lock.
	 *
	 * @param int $wait Timeout passed to GET_LOCK.
	 *
	 * @return true|WP_Error 'unclosed_buffer' when this process still holds an
	 *                       unexpired in-memory lock, 'lock_failed' when GET_LOCK
	 *                       did not grant the lock.
	 */
	public function acquire_lock( $wait = 0 ) {
		// use mysql to prevent cross-process concurrent access,
		// and in-memory lock to prevent in-process concurrent access
		if ( self::$in_memory_lock && ( microtime( true ) < self::$in_memory_lock + self::QUEUE_LOCK_TIMEOUT ) ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		global $wpdb;

		$acquired_lock = $wpdb->get_var( $wpdb->prepare( "SELECT GET_LOCK( %s, %d )", $this->named_lock_name, $wait ) );

		// anything other than 1 (including a null result) means the lock was not granted
		if ( 1 !== (int) $acquired_lock ) {
			return new WP_Error( 'lock_failed', 'Another process has the lock' );
		}

		// lock memory and DB
		self::$in_memory_lock = microtime( true );

		return true;
	}

	/**
	 * Releases the MySQL named lock and clears the in-process lock.
	 *
	 * @param mixed $buffer_id Unused. Optional so the method matches
	 *                         Jetpack_Sync_Lock::release_lock() and can be called
	 *                         without arguments; kept for callers that still pass it.
	 *
	 * @return true Always true; a release of a lock that was not held is only logged.
	 */
	public function release_lock( $buffer_id = null ) {
		global $wpdb;
		$result = $wpdb->get_var( $wpdb->prepare( "SELECT RELEASE_LOCK( %s )", $this->named_lock_name ) );

		if ( $result != '1' ) {
			error_log("Warning: released lock that wasn't locked: {$this->named_lock_name}");
		}

		self::$in_memory_lock = null;

		return true;
	}

	/**
	 * Releases the lock without regard to who holds it in this process.
	 */
	public function reset() {
		$this->release_lock();
	}
}
147
148
/**
 * A persistent queue that can be flushed in increments of N items,
 * and which blocks reads until checked-out buffers are checked in or
 * closed. This uses raw SQL for two reasons: speed, and not triggering
 * tons of added_option callbacks.
 *
 * Items are stored as rows of the options table whose option_name starts
 * with "jpsq_{id}-".
 */
class Jetpack_Sync_Queue {
	/** @var string Queue id with '-' replaced by '_'. */
	public $id;

	/** @var int Per-instance counter appended to generated row names. */
	private $row_iterator;

	/** @var Jetpack_Sync_Lock_MySQL|Jetpack_Sync_Lock_Transient Lock guarding checkouts. */
	private $lock;

	/**
	 * @param string $id Queue identifier.
	 */
	function __construct( $id ) {
		$this->id           = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL
		$this->row_iterator = 0;

		if ( (bool) Jetpack_Sync_Settings::get_setting( 'use_mysql_named_lock' ) ) {
			$this->lock = new Jetpack_Sync_Lock_MySQL( $this->id );
		} else {
			$this->lock = new Jetpack_Sync_Lock_Transient( $this->id );
		}
	}

	/**
	 * Appends a single item to the queue and fires 'jpsq_item_added'.
	 *
	 * @param mixed $item Item to store; it is serialized before insertion.
	 */
	function add( $item ) {
		global $wpdb;
		$added = false;
		// this basically tries to add the option until enough time has elapsed that
		// it has a unique (microtime-based) option key
		while ( ! $added ) {
			$rows_added = $wpdb->query( $wpdb->prepare(
				"INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s,%s)",
				$this->get_next_data_row_option_name(),
				serialize( $item ),
				'no'
			) );
			$added      = ( 0 !== $rows_added );
		}

		do_action( 'jpsq_item_added' );
	}

	/**
	 * Attempts to insert all the items in a single SQL query. May be subject to query size limits!
	 *
	 * Fires 'jpsq_items_added' with the number of inserted rows on success.
	 *
	 * @param array $items Items to store; each is serialized before insertion.
	 *
	 * @return WP_Error|null WP_Error 'row_count_mismatch' when the number of
	 *                       inserted rows differs from the number of items;
	 *                       nothing otherwise.
	 */
	function add_all( $items ) {
		global $wpdb;

		// re-index so non-list arrays are handled and the count is computed once
		$items      = array_values( $items );
		$item_count = count( $items );

		// nothing to insert: an INSERT with an empty VALUES list would be malformed SQL
		if ( 0 === $item_count ) {
			return;
		}

		$base_option_name = $this->get_next_data_row_option_name();

		$query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";

		$rows = array();

		foreach ( $items as $i => $item ) {
			$option_name  = esc_sql( $base_option_name . '-' . $i );
			$option_value = esc_sql( serialize( $item ) );
			$rows[]       = "('$option_name', '$option_value', 'no')";
		}

		$rows_added = $wpdb->query( $query . implode( ',', $rows ) );

		// a failed query (false) or a partial insert are both mismatches
		if ( $item_count !== $rows_added ) {
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
		}

		do_action( 'jpsq_items_added', $rows_added );
	}

	/**
	 * Peek at the front-most item(s) on the queue without checking them out.
	 *
	 * @param int $count Maximum number of items to look at.
	 *
	 * @return array Values of the front-most items; empty when the queue is empty.
	 */
	function peek( $count = 1 ) {
		$items = $this->fetch_items( $count );
		if ( $items ) {
			return Jetpack_Sync_Utils::get_item_values( $items );
		}

		return array();
	}

	/**
	 * lag is the difference in time between the age of the oldest item
	 * (aka first or frontmost item) and the current time
	 *
	 * @return float|int Seconds of lag, or 0 when the queue is empty or the
	 *                   first row name does not carry a timestamp.
	 */
	function lag() {
		global $wpdb;

		$first_item_name = $wpdb->get_var( $wpdb->prepare(
			"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
			"jpsq_{$this->id}-%"
		) );

		if ( ! $first_item_name ) {
			return 0;
		}

		// break apart the item name to get the timestamp;
		// the id is quoted so regex metacharacters in it are matched literally
		$matches = null;
		if ( preg_match( '/^jpsq_' . preg_quote( $this->id, '/' ) . '-(\d+\.\d+)-/', $first_item_name, $matches ) ) {
			return microtime( true ) - floatval( $matches[1] );
		}

		return 0;
	}

	/**
	 * Resets the lock and deletes every row belonging to this queue.
	 */
	function reset() {
		global $wpdb;
		$this->lock->reset();
		$wpdb->query( $wpdb->prepare(
			"DELETE FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
		) );
	}

	/**
	 * @return int Number of rows belonging to this queue.
	 */
	function size() {
		global $wpdb;

		return (int) $wpdb->get_var( $wpdb->prepare(
			"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
		) );
	}

	/**
	 * we use this peculiar implementation because it's much faster than count(*)
	 *
	 * @return bool True when at least one row belongs to this queue.
	 */
	function has_any_items() {
		global $wpdb;
		$value = $wpdb->get_var( $wpdb->prepare(
			"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )", "jpsq_{$this->id}-%"
		) );

		return ( $value === '1' );
	}

	/**
	 * Takes the lock and returns a buffer holding up to $buffer_size front-most items.
	 *
	 * @param int $buffer_size Maximum number of items in the buffer.
	 *
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|false WP_Error when the lock could
	 *         not be taken, false when the queue is empty (the lock is released
	 *         again in that case).
	 */
	function checkout( $buffer_size ) {
		$buffer_id = uniqid();

		$lock_result = $this->acquire_lock();

		if ( is_wp_error( $lock_result ) ) {
			return $lock_result;
		}

		$items = $this->fetch_items( $buffer_size );

		if ( empty( $items ) ) {
			// no buffer is handed out, so nobody would ever release the lock for us
			$this->release_lock();

			return false;
		}

		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );

		return $buffer;
	}

	/**
	 * this checks out rows until it either empties the queue or hits a certain memory limit
	 * it loads the sizes from the DB first so that it doesn't accidentally
	 * load more data into memory than it needs to.
	 * The only way it will load more items than $max_size is if a single queue item
	 * exceeds the memory limit, but in that case it will send that item by itself.
	 *
	 * @param int $max_memory      Upper bound on the summed LENGTH(option_value) of the buffer.
	 * @param int $max_buffer_size Maximum number of rows considered.
	 *
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|false WP_Error when the lock could
	 *         not be taken, false when no items were fetched.
	 */
	function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
		$lock_result = $this->acquire_lock();

		if ( is_wp_error( $lock_result ) ) {
			return $lock_result;
		}

		// get the map of buffer_id -> memory_size
		global $wpdb;

		$items_with_size = $wpdb->get_results(
			$wpdb->prepare(
				"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
				"jpsq_{$this->id}-%",
				$max_buffer_size
			),
			OBJECT
		);

		$total_memory = 0;
		$item_ids     = array();

		foreach ( $items_with_size as $item_with_size ) {
			$total_memory += $item_with_size->value_size;

			// if this is the first item and it exceeds memory, allow loop to continue
			// we will exit on the next iteration instead
			if ( $total_memory > $max_memory && count( $item_ids ) > 0 ) {
				break;
			}
			$item_ids[] = $item_with_size->id;
		}

		$items = $this->fetch_items_by_id( $item_ids );

		if ( count( $items ) === 0 ) {
			$this->release_lock();

			return false;
		}

		$buffer = new Jetpack_Sync_Queue_Buffer( uniqid(), $items );

		return $buffer;
	}

	/**
	 * Returns a checked-out buffer without removing its items; releases the lock.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer Not read by this method.
	 *
	 * @return mixed Result of releasing the lock.
	 */
	function checkin( $buffer ) {
		return $this->release_lock();
	}

	/**
	 * Releases the lock and deletes the buffer's items (or the given subset) from the queue.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer        Buffer being closed.
	 * @param array|null                $ids_to_remove Row names to delete; defaults to every id in $buffer.
	 *
	 * @return true|WP_Error WP_Error when releasing the lock failed.
	 */
	function close( $buffer, $ids_to_remove = null ) {
		$released = $this->release_lock();

		if ( is_wp_error( $released ) ) {
			return $released;
		}

		// by default clear all items in the buffer
		if ( is_null( $ids_to_remove ) ) {
			$ids_to_remove = $buffer->get_item_ids();
		}

		global $wpdb;

		if ( count( $ids_to_remove ) > 0 ) {
			$sql   = "DELETE FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $ids_to_remove ), '%s' ) ) . ')';
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $ids_to_remove ) );
			$wpdb->query( $query );
		}

		return true;
	}

	/**
	 * Returns the values of every queued item and then resets the queue.
	 *
	 * @return array
	 */
	function flush_all() {
		$items = Jetpack_Sync_Utils::get_item_values( $this->fetch_items() );
		$this->reset();

		return $items;
	}

	/**
	 * @return array Every queued row (objects with ->id and ->value).
	 */
	function get_all() {
		return $this->fetch_items();
	}

	/**
	 * use with caution, this could allow multiple processes to delete
	 * and send from the queue at the same time
	 */
	function force_checkin() {
		$this->release_lock();
	}

	/**
	 * used to lock checkouts from the queue.
	 * tries to wait up to $timeout seconds for the queue to be empty
	 *
	 * @param int $timeout Maximum number of one-second waits.
	 *
	 * @return true|WP_Error 'lock_timeout' when the queue still has items after
	 *                       $timeout waits; otherwise the result of taking the lock.
	 */
	function lock( $timeout = 30 ) {
		$tries = 0;

		while ( $this->has_any_items() ) {
			// only give up while the queue is still non-empty, and honour the caller's timeout
			if ( $tries >= $timeout ) {
				return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
			}

			sleep( 1 );
			$tries += 1;
		}

		return $this->acquire_lock();
	}

	/**
	 * Releases the lock taken by lock().
	 */
	function unlock() {
		$this->release_lock();
	}

	private function acquire_lock() {
		return $this->lock->acquire_lock();
	}

	private function release_lock() {
		return $this->lock->release_lock();
	}

	/**
	 * Builds the option_name for the next row: prefix, microtime, pid and a per-instance counter.
	 *
	 * @return string
	 */
	private function get_next_data_row_option_name() {
		// this option is specifically chosen to, as much as possible, preserve time order
		// and minimise the possibility of collisions between multiple processes working
		// at the same time
		// TODO: confirm we only need to support PHP 5.05+ (otherwise we'll need to emulate microtime as float, and avoid PHP_INT_MAX)
		// @see: http://php.net/manual/en/function.microtime.php
		$timestamp = sprintf( '%.6f', microtime( true ) );

		// row iterator is used to avoid collisions where we're writing data waaay fast in a single process
		if ( $this->row_iterator === PHP_INT_MAX ) {
			$this->row_iterator = 0;
		} else {
			$this->row_iterator += 1;
		}

		return 'jpsq_' . $this->id . '-' . $timestamp . '-' . getmypid() . '-' . $this->row_iterator;
	}

	/**
	 * Fetches queued rows ordered by option_name, unserializing each value.
	 *
	 * @param int|null $limit Maximum number of rows; a falsy value fetches all rows.
	 *
	 * @return array Rows as objects with ->id and ->value.
	 */
	private function fetch_items( $limit = null ) {
		global $wpdb;

		if ( $limit ) {
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", "jpsq_{$this->id}-%", $limit );
		} else {
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", "jpsq_{$this->id}-%" );
		}

		$items = $wpdb->get_results( $query_sql, OBJECT );
		foreach ( $items as $item ) {
			$item->value = maybe_unserialize( $item->value );
		}

		return $items;
	}

	/**
	 * Fetches the rows with the given option names, unserializing each value.
	 *
	 * @param array $item_ids Option names to load.
	 *
	 * @return array Rows as objects with ->id and ->value; empty when no ids are given.
	 */
	private function fetch_items_by_id( $item_ids ) {
		global $wpdb;

		if ( count( $item_ids ) > 0 ) {
			$sql   = "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $item_ids ), '%s' ) ) . ') ORDER BY option_name ASC';
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $item_ids ) );
			$items = $wpdb->get_results( $query, OBJECT );
			foreach ( $items as $item ) {
				$item->value = maybe_unserialize( $item->value );
			}

			return $items;
		} else {
			return array();
		}
	}
}
466
467
/**
 * Static helpers that pull the ->id or ->value property out of a list of rows.
 */
class Jetpack_Sync_Utils {

	/**
	 * @param array $items Rows exposing a ->value property.
	 *
	 * @return array The ->value of each row, under the row's original key.
	 */
	static function get_item_values( $items ) {
		$values = array();

		foreach ( $items as $key => $row ) {
			$values[ $key ] = self::get_item_value( $row );
		}

		return $values;
	}

	/**
	 * @param array $items Rows exposing an ->id property.
	 *
	 * @return array The ->id of each row, under the row's original key.
	 */
	static function get_item_ids( $items ) {
		$ids = array();

		foreach ( $items as $key => $row ) {
			$ids[ $key ] = self::get_item_id( $row );
		}

		return $ids;
	}

	/**
	 * @param object $item Row to read.
	 *
	 * @return mixed The row's ->value.
	 */
	static private function get_item_value( $item ) {
		return $item->value;
	}

	/**
	 * @param object $item Row to read.
	 *
	 * @return mixed The row's ->id.
	 */
	static private function get_item_id( $item ) {
		return $item->id;
	}
}
485