Completed
Push — add/use-native-mysql-locks-whe... ( 9e3d6a...2a0ea7 )
by
unknown
216:49 queued 208:02
created

Jetpack_Sync_Queue::flush_all()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nc 1
nop 0
dl 0
loc 6
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
require_once dirname( __FILE__ ) . '/class.jetpack-sync-settings.php';
4
5
/**
6
 * A buffer of items from the queue that can be checked out
7
 */
8
class Jetpack_Sync_Queue_Buffer {
9
	public $id;
10
	public $items_with_ids;
11
12
	public function __construct( $id, $items_with_ids ) {
13
		$this->id             = $id;
14
		$this->items_with_ids = $items_with_ids;
15
	}
16
17
	public function get_items() {
18
		return array_combine( $this->get_item_ids(), $this->get_item_values() );
19
	}
20
21
	public function get_item_values() {
22
		return Jetpack_Sync_Utils::get_item_values( $this->items_with_ids );
23
	}
24
25
	public function get_item_ids() {
26
		return Jetpack_Sync_Utils::get_item_ids( $this->items_with_ids );
27
	}
28
}
29
30
/**
31
 * A persistent queue that can be flushed in increments of N items,
32
 * and which blocks reads until checked-out buffers are checked in or
33
 * closed. This uses raw SQL for two reasons: speed, and not triggering
34
 * tons of added_option callbacks.
35
 */
36
class Jetpack_Sync_Queue {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
37
	const QUEUE_LOCK_TIMEOUT = 300; // 5 minutes
38
	public $id;
39
	private $row_iterator;
40
	private $use_named_lock;
41
	private $named_lock_name;
42
43
	static $in_memory_lock; // in-process lock
0 ignored issues
show
Coding Style introduced by
The visibility should be declared for property $in_memory_lock.

The PSR-2 coding standard requires that all properties in a class have their visibility explicitly declared. If you declare a property using

class A {
    var $property;
}

the property is implicitly global.

To learn more about the PSR-2, please see the PHP-FIG site on the PSR-2.

Loading history...
44
45
	function __construct( $id ) {
46
		$this->id             = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL
47
		$this->row_iterator   = 0;
48
		$this->use_named_lock = (bool) Jetpack_Sync_Settings::get_setting( 'use_mysql_named_lock' );
49
50
		if ( $this->use_named_lock ) {
51
			global $wpdb;
52
			// named lock names are GLOBAL PER SERVER so should be namespaced to this particular
53
			// wp installation, site and queue
54
			$this->named_lock_name = 'jpsq_' . $wpdb->prefix . $this->id;
55
			if ( strlen( $this->named_lock_name ) > 64 ) {
56
				error_log("Warning: Named lock key is > 64 chars: '{$this->named_lock_name}'");
57
			}
58
		}
59
	}
60
61
	function add( $item ) {
62
		global $wpdb;
63
		$added = false;
64
		// this basically tries to add the option until enough time has elapsed that
65
		// it has a unique (microtime-based) option key
66
		while ( ! $added ) {
67
			$rows_added = $wpdb->query( $wpdb->prepare(
68
				"INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s,%s)",
69
				$this->get_next_data_row_option_name(),
70
				serialize( $item ),
71
				'no'
72
			) );
73
			$added      = ( 0 !== $rows_added );
74
		}
75
76
		do_action( 'jpsq_item_added' );
77
	}
78
79
	// Attempts to insert all the items in a single SQL query. May be subject to query size limits!
80
	function add_all( $items ) {
81
		global $wpdb;
82
		$base_option_name = $this->get_next_data_row_option_name();
83
84
		$query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";
85
86
		$rows = array();
87
88
		for ( $i = 0; $i < count( $items ); $i += 1 ) {
0 ignored issues
show
Performance Best Practice introduced by
It seems like you are calling the size function count() as part of the test condition. You might want to compute the size beforehand, and not on each iteration.

If the size of the collection does not change during the iteration, it is generally a good practice to compute it beforehand, and not on each iteration:

for ($i=0; $i<count($array); $i++) { // calls count() on each iteration
}

// Better
for ($i=0, $c=count($array); $i<$c; $i++) { // calls count() just once
}
Loading history...
89
			$option_name  = esc_sql( $base_option_name . '-' . $i );
90
			$option_value = esc_sql( serialize( $items[ $i ] ) );
91
			$rows[]       = "('$option_name', '$option_value', 'no')";
92
		}
93
94
		$rows_added = $wpdb->query( $query . join( ',', $rows ) );
95
96
		if ( count( $items ) === $rows_added ) {
97
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
98
		}
99
100
		do_action( 'jpsq_items_added', $rows_added );
101
	}
102
103
	// Peek at the front-most item on the queue without checking it out
104
	function peek( $count = 1 ) {
105
		$items = $this->fetch_items( $count );
106
		if ( $items ) {
107
			return Jetpack_Sync_Utils::get_item_values( $items );
108
		}
109
110
		return array();
111
	}
112
113
	// lag is the difference in time between the age of the oldest item
114
	// (aka first or frontmost item) and the current time
115
	function lag() {
116
		global $wpdb;
117
118
		$first_item_name = $wpdb->get_var( $wpdb->prepare(
119
			"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
120
			"jpsq_{$this->id}-%"
121
		) );
122
123
		if ( ! $first_item_name ) {
124
			return 0;
125
		}
126
127
		// break apart the item name to get the timestamp
128
		$matches = null;
129
		if ( preg_match( '/^jpsq_' . $this->id . '-(\d+\.\d+)-/', $first_item_name, $matches ) ) {
130
			return microtime( true ) - floatval( $matches[1] );
131
		} else {
132
			return 0;
133
		}
134
	}
135
136
	function reset() {
137
		global $wpdb;
138
		$this->delete_checkout_id();
139
		$wpdb->query( $wpdb->prepare(
140
			"DELETE FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
141
		) );
142
	}
143
144
	function size() {
145
		global $wpdb;
146
147
		return (int) $wpdb->get_var( $wpdb->prepare(
148
			"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
149
		) );
150
	}
151
152
	// we use this peculiar implementation because it's much faster than count(*)
153
	function has_any_items() {
154
		global $wpdb;
155
		$value = $wpdb->get_var( $wpdb->prepare(
156
			"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )", "jpsq_{$this->id}-%"
157
		) );
158
159
		return ( $value === '1' );
160
	}
161
162
	function checkout( $buffer_size ) {
163
		$buffer_id = uniqid();
164
165
		$lock_result = $this->acquire_lock( $buffer_id );
166
167
		if ( is_wp_error( $lock_result ) ) {
168
			return $lock_result;
169
		}
170
171
		$items = $this->fetch_items( $buffer_size );
172
173
		if ( count( $items ) === 0 ) {
174
			return false;
175
		}
176
177
		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );
178
179
		return $buffer;
180
	}
181
182
	// this checks out rows until it either empties the queue or hits a certain memory limit
183
	// it loads the sizes from the DB first so that it doesn't accidentally
184
	// load more data into memory than it needs to.
185
	// The only way it will load more items than $max_size is if a single queue item
186
	// exceeds the memory limit, but in that case it will send that item by itself.
187
	function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
188
		$buffer_id = uniqid();
189
190
		$lock_result = $this->acquire_lock( $buffer_id );
191
		
192
		if ( is_wp_error( $lock_result ) ) {
193
			return $lock_result;
194
		}
195
196
		// get the map of buffer_id -> memory_size
197
		global $wpdb;
198
199
		$items_with_size = $wpdb->get_results(
200
			$wpdb->prepare(
201
				"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
202
				"jpsq_{$this->id}-%",
203
				$max_buffer_size
204
			),
205
			OBJECT
206
		);
207
208
		$total_memory = 0;
209
		$item_ids     = array();
210
211
		foreach ( $items_with_size as $item_with_size ) {
212
			$total_memory += $item_with_size->value_size;
213
214
			// if this is the first item and it exceeds memory, allow loop to continue
215
			// we will exit on the next iteration instead
216
			if ( $total_memory > $max_memory && count( $item_ids ) > 0 ) {
217
				break;
218
			}
219
			$item_ids[] = $item_with_size->id;
220
		}
221
222
		$items = $this->fetch_items_by_id( $item_ids );
223
224
		if ( count( $items ) === 0 ) {
225
			$this->delete_checkout_id();
226
227
			return false;
228
		}
229
230
		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
231
232
		return $buffer;
233
	}
234
235
	function checkin( $buffer ) {
236
		$released = $this->release_lock( $buffer );
237
238
		if ( is_wp_error( $released ) ) {
239
			return $released;
240
		}
241
242
		return true;
243
	}
244
245
	function close( $buffer, $ids_to_remove = null ) {
246
		$released = $this->release_lock( $buffer );
247
248
		if ( is_wp_error( $released ) ) {
249
			return $released;
250
		}
251
252
		// by default clear all items in the buffer
253
		if ( is_null( $ids_to_remove ) ) {
254
			$ids_to_remove = $buffer->get_item_ids();
255
		}
256
257
		global $wpdb;
258
259
		if ( count( $ids_to_remove ) > 0 ) {
260
			$sql   = "DELETE FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $ids_to_remove ), '%s' ) ) . ')';
261
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $ids_to_remove ) );
262
			$wpdb->query( $query );
263
		}
264
265
		return true;
266
	}
267
268
	function flush_all() {
269
		$items = Jetpack_Sync_Utils::get_item_values( $this->fetch_items() );
270
		$this->reset();
271
272
		return $items;
273
	}
274
275
	function get_all() {
276
		return $this->fetch_items();
277
	}
278
279
	// use with caution, this could allow multiple processes to delete
280
	// and send from the queue at the same time
281
	function force_checkin() {
282
		$this->delete_checkout_id();
283
	}
284
285
	// used to lock checkouts from the queue.
286
	// tries to wait up to $timeout seconds for the queue to be empty
287
	function lock( $timeout = 30 ) {
288
		$tries = 0;
289
290
		while ( $this->has_any_items() && $tries < $timeout ) {
291
			sleep( 1 );
292
			$tries += 1;
293
		}
294
295
		if ( $tries === 30 ) {
296
			return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
297
		}
298
299
		$lock_result = $this->acquire_lock( 'lock' );
300
301
		if ( is_wp_error( $lock_result ) ) {
302
			return $lock_result;
303
		}
304
305
		return true;
306
	}
307
308
	function unlock() {
309
		$this->delete_checkout_id();
310
	}
311
312
	private function acquire_lock( $buffer_id ) {
313
		if ( $this->use_named_lock ) {
314
			$lockval = self::$in_memory_lock;
0 ignored issues
show
Unused Code introduced by
$lockval is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
315
			
316
			// use mysql to prevent cross-process concurrent access, 
317
			// and in-memory lock to prevent in-process concurrent access
318
			if ( self::$in_memory_lock && ( microtime( true ) < self::$in_memory_lock + self::QUEUE_LOCK_TIMEOUT ) ) {
319
				return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
320
			}
321
322
			global $wpdb;
323
324
			$acquired_lock = $wpdb->get_var( $wpdb->prepare( "SELECT GET_LOCK( %s, 0 )", $this->named_lock_name ) );
325
326
			if ( $acquired_lock == 0 ) {
327
				return new WP_Error( 'lock_failed', 'Another process has the lock' );
328
			}
329
330
			// lock memory and DB
331
			self::$in_memory_lock = microtime( true );
332
333
			return true;
334
		} else {
335
			if ( $this->get_checkout_id() ) {
336
				return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
337
			}
338
339
			$result = $this->set_checkout_id( $buffer_id );
340
341
			if ( ! $result ) {
342
				return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
343
			} elseif ( is_wp_error( $result ) ) {
344
				return $result;
345
			}
346
347
			return true;
348
		}
349
	}
350
351
	private function release_lock( $buffer ) {
352
		if ( ! $buffer instanceof Jetpack_Sync_Queue_Buffer ) {
353
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
354
		}
355
356
		if ( $this->use_named_lock ) {
357
			// TODO
0 ignored issues
show
Coding Style introduced by
Comment refers to a TODO task

This check looks TODO comments that have been left in the code.

``TODO``s show that something is left unfinished and should be attended to.

Loading history...
358
			global $wpdb;
359
			$result = $wpdb->get_var( $wpdb->prepare( "SELECT RELEASE_LOCK( %s )", $this->named_lock_name ) );
360
361
			if ( $result != '1' ) {
362
				error_log("Warning: released lock that wasn't locked: {$this->named_lock_name}");
363
			}
364
365
			self::$in_memory_lock = null;
366
		} else {
367
			$checkout_id = $this->get_checkout_id();
368
369
			if ( ! $checkout_id ) {
370
				return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
371
			}
372
373
			if ( $checkout_id != $buffer->id ) {
374
				return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
375
			}
376
377
			$this->delete_checkout_id();
378
		}
379
380
		return true;
381
	}
382
383
	private function get_checkout_id() {
384
		return get_transient( $this->get_checkout_transient_name() );
385
	}
386
387
	private function set_checkout_id( $checkout_id ) {
388
		return set_transient( $this->get_checkout_transient_name(), $checkout_id, self::QUEUE_LOCK_TIMEOUT ); // 5 minute timeout
389
	}
390
391
	private function delete_checkout_id() {
392
		delete_transient( $this->get_checkout_transient_name() );
393
	}
394
395
	private function get_checkout_transient_name() {
396
		return "jpsq_{$this->id}_checkout";
397
	}
398
399
	private function get_next_data_row_option_name() {
400
		// this option is specifically chosen to, as much as possible, preserve time order
401
		// and minimise the possibility of collisions between multiple processes working
402
		// at the same time
403
		// TODO: confirm we only need to support PHP 5.05+ (otherwise we'll need to emulate microtime as float, and avoid PHP_INT_MAX)
0 ignored issues
show
Coding Style Best Practice introduced by
Comments for TODO tasks are often forgotten in the code; it might be better to use a dedicated issue tracker.
Loading history...
404
		// @see: http://php.net/manual/en/function.microtime.php
405
		$timestamp = sprintf( '%.6f', microtime( true ) );
406
407
		// row iterator is used to avoid collisions where we're writing data waaay fast in a single process
408
		if ( $this->row_iterator === PHP_INT_MAX ) {
409
			$this->row_iterator = 0;
410
		} else {
411
			$this->row_iterator += 1;
412
		}
413
414
		return 'jpsq_' . $this->id . '-' . $timestamp . '-' . getmypid() . '-' . $this->row_iterator;
415
	}
416
417
	private function fetch_items( $limit = null ) {
418
		global $wpdb;
419
420
		if ( $limit ) {
421
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", "jpsq_{$this->id}-%", $limit );
422
		} else {
423
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", "jpsq_{$this->id}-%" );
424
		}
425
426
		$items = $wpdb->get_results( $query_sql, OBJECT );
427
		foreach ( $items as $item ) {
428
			$item->value = maybe_unserialize( $item->value );
429
		}
430
431
		return $items;
432
	}
433
434
	private function fetch_items_by_id( $item_ids ) {
435
		global $wpdb;
436
437
		if ( count( $item_ids ) > 0 ) {
438
			$sql   = "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $item_ids ), '%s' ) ) . ') ORDER BY option_name ASC';
439
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $item_ids ) );
440
			$items = $wpdb->get_results( $query, OBJECT );
441
			foreach ( $items as $item ) {
442
				$item->value = maybe_unserialize( $item->value );
443
			}
444
445
			return $items;
446
		} else {
447
			return array();
448
		}
449
	}
450
}
451
452
class Jetpack_Sync_Utils {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
453
454
	static function get_item_values( $items ) {
455
		return array_map( array( __CLASS__, 'get_item_value' ), $items );
456
	}
457
458
	static function get_item_ids( $items ) {
459
		return array_map( array( __CLASS__, 'get_item_id' ), $items );
460
	}
461
462
	static private function get_item_value( $item ) {
0 ignored issues
show
Coding Style introduced by
As per PSR2, the static declaration should come after the visibility declaration.
Loading history...
463
		return $item->value;
464
	}
465
466
	static private function get_item_id( $item ) {
0 ignored issues
show
Coding Style introduced by
As per PSR2, the static declaration should come after the visibility declaration.
Loading history...
467
		return $item->id;
468
	}
469
}
470