Completed
Push — add/use-native-mysql-locks-whe... ( be4675...9e3d6a )
by
unknown
160:01 queued 150:59
created

Jetpack_Sync_Queue::acquire_lock()   B

Complexity

Conditions 5
Paths 5

Size

Total Lines 19
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
eloc 11
nc 5
nop 1
dl 0
loc 19
rs 8.8571
c 1
b 0
f 0
1
<?php
2
3
require_once dirname( __FILE__ ) . '/class.jetpack-sync-settings.php';
4
5
/**
6
 * A buffer of items from the queue that can be checked out
7
 */
8
class Jetpack_Sync_Queue_Buffer {
9
	public $id;
10
	public $items_with_ids;
11
12
	public function __construct( $id, $items_with_ids ) {
13
		$this->id             = $id;
14
		$this->items_with_ids = $items_with_ids;
15
	}
16
17
	public function get_items() {
18
		return array_combine( $this->get_item_ids(), $this->get_item_values() );
19
	}
20
21
	public function get_item_values() {
22
		return Jetpack_Sync_Utils::get_item_values( $this->items_with_ids );
23
	}
24
25
	public function get_item_ids() {
26
		return Jetpack_Sync_Utils::get_item_ids( $this->items_with_ids );
27
	}
28
}
29
30
/**
31
 * A persistent queue that can be flushed in increments of N items,
32
 * and which blocks reads until checked-out buffers are checked in or
33
 * closed. This uses raw SQL for two reasons: speed, and not triggering
34
 * tons of added_option callbacks.
35
 */
36
class Jetpack_Sync_Queue {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
37
	public $id;
38
	private $row_iterator;
39
	private $use_named_lock;
40
41
	function __construct( $id ) {
42
		$this->id             = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL
43
		$this->row_iterator   = 0;
44
		$this->use_named_lock = (bool) Jetpack_Sync_Settings::get_setting( 'use_mysql_named_lock' );
45
	}
46
47
	function add( $item ) {
48
		global $wpdb;
49
		$added = false;
50
		// this basically tries to add the option until enough time has elapsed that
51
		// it has a unique (microtime-based) option key
52
		while ( ! $added ) {
53
			$rows_added = $wpdb->query( $wpdb->prepare(
54
				"INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s,%s)",
55
				$this->get_next_data_row_option_name(),
56
				serialize( $item ),
57
				'no'
58
			) );
59
			$added      = ( 0 !== $rows_added );
60
		}
61
62
		do_action( 'jpsq_item_added' );
63
	}
64
65
	// Attempts to insert all the items in a single SQL query. May be subject to query size limits!
66
	function add_all( $items ) {
67
		global $wpdb;
68
		$base_option_name = $this->get_next_data_row_option_name();
69
70
		$query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";
71
72
		$rows = array();
73
74
		for ( $i = 0; $i < count( $items ); $i += 1 ) {
0 ignored issues
show
Performance Best Practice introduced by
It seems like you are calling the size function count() as part of the test condition. You might want to compute the size beforehand, and not on each iteration.

If the size of the collection does not change during the iteration, it is generally a good practice to compute it beforehand, and not on each iteration:

for ($i=0; $i<count($array); $i++) { // calls count() on each iteration
}

// Better
for ($i=0, $c=count($array); $i<$c; $i++) { // calls count() just once
}
Loading history...
75
			$option_name  = esc_sql( $base_option_name . '-' . $i );
76
			$option_value = esc_sql( serialize( $items[ $i ] ) );
77
			$rows[]       = "('$option_name', '$option_value', 'no')";
78
		}
79
80
		$rows_added = $wpdb->query( $query . join( ',', $rows ) );
81
82
		if ( count( $items ) === $rows_added ) {
83
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
84
		}
85
86
		do_action( 'jpsq_items_added', $rows_added );
87
	}
88
89
	// Peek at the front-most item on the queue without checking it out
90
	function peek( $count = 1 ) {
91
		$items = $this->fetch_items( $count );
92
		if ( $items ) {
93
			return Jetpack_Sync_Utils::get_item_values( $items );
94
		}
95
96
		return array();
97
	}
98
99
	// lag is the difference in time between the age of the oldest item
100
	// (aka first or frontmost item) and the current time
101
	function lag() {
102
		global $wpdb;
103
104
		$first_item_name = $wpdb->get_var( $wpdb->prepare(
105
			"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
106
			"jpsq_{$this->id}-%"
107
		) );
108
109
		if ( ! $first_item_name ) {
110
			return 0;
111
		}
112
113
		// break apart the item name to get the timestamp
114
		$matches = null;
115
		if ( preg_match( '/^jpsq_' . $this->id . '-(\d+\.\d+)-/', $first_item_name, $matches ) ) {
116
			return microtime( true ) - floatval( $matches[1] );
117
		} else {
118
			return 0;
119
		}
120
	}
121
122
	function reset() {
123
		global $wpdb;
124
		$this->delete_checkout_id();
125
		$wpdb->query( $wpdb->prepare(
126
			"DELETE FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
127
		) );
128
	}
129
130
	function size() {
131
		global $wpdb;
132
133
		return (int) $wpdb->get_var( $wpdb->prepare(
134
			"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
135
		) );
136
	}
137
138
	// we use this peculiar implementation because it's much faster than count(*)
139
	function has_any_items() {
140
		global $wpdb;
141
		$value = $wpdb->get_var( $wpdb->prepare(
142
			"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )", "jpsq_{$this->id}-%"
143
		) );
144
145
		return ( $value === '1' );
146
	}
147
148
	function checkout( $buffer_size ) {
149
		$buffer_id = uniqid();
150
151
		$lock_result = $this->acquire_lock( $buffer_id );
152
153
		if ( is_wp_error( $lock_result ) ) {
154
			return $lock_result;
155
		}
156
157
		$items = $this->fetch_items( $buffer_size );
158
159
		if ( count( $items ) === 0 ) {
160
			return false;
161
		}
162
163
		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );
164
165
		return $buffer;
166
	}
167
168
	// this checks out rows until it either empties the queue or hits a certain memory limit
169
	// it loads the sizes from the DB first so that it doesn't accidentally
170
	// load more data into memory than it needs to.
171
	// The only way it will load more items than $max_size is if a single queue item
172
	// exceeds the memory limit, but in that case it will send that item by itself.
173
	function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
174
		$buffer_id = uniqid();
175
176
		$lock_result = $this->acquire_lock( $buffer_id );
177
		
178
		if ( is_wp_error( $lock_result ) ) {
179
			return $lock_result;
180
		}
181
182
		// get the map of buffer_id -> memory_size
183
		global $wpdb;
184
185
		$items_with_size = $wpdb->get_results(
186
			$wpdb->prepare(
187
				"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
188
				"jpsq_{$this->id}-%",
189
				$max_buffer_size
190
			),
191
			OBJECT
192
		);
193
194
		$total_memory = 0;
195
		$item_ids     = array();
196
197
		foreach ( $items_with_size as $item_with_size ) {
198
			$total_memory += $item_with_size->value_size;
199
200
			// if this is the first item and it exceeds memory, allow loop to continue
201
			// we will exit on the next iteration instead
202
			if ( $total_memory > $max_memory && count( $item_ids ) > 0 ) {
203
				break;
204
			}
205
			$item_ids[] = $item_with_size->id;
206
		}
207
208
		$items = $this->fetch_items_by_id( $item_ids );
209
210
		if ( count( $items ) === 0 ) {
211
			$this->delete_checkout_id();
212
213
			return false;
214
		}
215
216
		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
217
218
		return $buffer;
219
	}
220
221
	function checkin( $buffer ) {
222
		$released = $this->release_lock( $buffer );
223
224
		if ( is_wp_error( $released ) ) {
225
			return $released;
226
		}
227
228
		return true;
229
	}
230
231
	function close( $buffer, $ids_to_remove = null ) {
232
		$released = $this->release_lock( $buffer );
233
234
		if ( is_wp_error( $released ) ) {
235
			return $released;
236
		}
237
238
		// by default clear all items in the buffer
239
		if ( is_null( $ids_to_remove ) ) {
240
			$ids_to_remove = $buffer->get_item_ids();
241
		}
242
243
		global $wpdb;
244
245
		if ( count( $ids_to_remove ) > 0 ) {
246
			$sql   = "DELETE FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $ids_to_remove ), '%s' ) ) . ')';
247
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $ids_to_remove ) );
248
			$wpdb->query( $query );
249
		}
250
251
		return true;
252
	}
253
254
	function flush_all() {
255
		$items = Jetpack_Sync_Utils::get_item_values( $this->fetch_items() );
256
		$this->reset();
257
258
		return $items;
259
	}
260
261
	function get_all() {
262
		return $this->fetch_items();
263
	}
264
265
	// use with caution, this could allow multiple processes to delete
266
	// and send from the queue at the same time
267
	function force_checkin() {
268
		$this->delete_checkout_id();
269
	}
270
271
	// used to lock checkouts from the queue.
272
	// tries to wait up to $timeout seconds for the queue to be empty
273
	function lock( $timeout = 30 ) {
274
		$tries = 0;
275
276
		while ( $this->has_any_items() && $tries < $timeout ) {
277
			sleep( 1 );
278
			$tries += 1;
279
		}
280
281
		if ( $tries === 30 ) {
282
			return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
283
		}
284
285
		$lock_result = $this->acquire_lock( 'lock' );
286
287
		if ( is_wp_error( $lock_result ) ) {
288
			return $lock_result;
289
		}
290
291
		return true;
292
	}
293
294
	function unlock() {
295
		$this->delete_checkout_id();
296
	}
297
298
	private function acquire_lock( $buffer_id ) {
299
		if ( $this->use_named_lock ) {
0 ignored issues
show
Unused Code introduced by
This if statement is empty and can be removed.

This check looks for the bodies of if statements that have no statements or where all statements have been commented out. This may be the result of changes for debugging or the code may simply be obsolete.

These if bodies can be removed. If you have an empty if but statements in the else branch, consider inverting the condition.

if (rand(1, 6) > 3) {
//print "Check failed";
} else {
    print "Check succeeded";
}

could be turned into

if (rand(1, 6) <= 3) {
    print "Check succeeded";
}

This is much more concise to read.

Loading history...
300
			// use mysql
301
		} else {
302
			if ( $this->get_checkout_id() ) {
303
				return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
304
			}
305
306
			$result = $this->set_checkout_id( $buffer_id );
307
308
			if ( ! $result ) {
309
				return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
310
			} elseif ( is_wp_error( $result ) ) {
311
				return $result;
312
			}
313
314
			return true;
315
		}
316
	}
317
318
	private function get_checkout_id() {
319
		return get_transient( $this->get_checkout_transient_name() );
320
	}
321
322
	private function set_checkout_id( $checkout_id ) {
323
		return set_transient( $this->get_checkout_transient_name(), $checkout_id, 5 * 60 ); // 5 minute timeout
324
	}
325
326
	private function delete_checkout_id() {
327
		delete_transient( $this->get_checkout_transient_name() );
328
	}
329
330
	private function get_checkout_transient_name() {
331
		return "jpsq_{$this->id}_checkout";
332
	}
333
334
	private function get_next_data_row_option_name() {
335
		// this option is specifically chosen to, as much as possible, preserve time order
336
		// and minimise the possibility of collisions between multiple processes working
337
		// at the same time
338
		// TODO: confirm we only need to support PHP 5.05+ (otherwise we'll need to emulate microtime as float, and avoid PHP_INT_MAX)
0 ignored issues
show
Coding Style Best Practice introduced by
Comments for TODO tasks are often forgotten in the code; it might be better to use a dedicated issue tracker.
Loading history...
339
		// @see: http://php.net/manual/en/function.microtime.php
340
		$timestamp = sprintf( '%.6f', microtime( true ) );
341
342
		// row iterator is used to avoid collisions where we're writing data waaay fast in a single process
343
		if ( $this->row_iterator === PHP_INT_MAX ) {
344
			$this->row_iterator = 0;
345
		} else {
346
			$this->row_iterator += 1;
347
		}
348
349
		return 'jpsq_' . $this->id . '-' . $timestamp . '-' . getmypid() . '-' . $this->row_iterator;
350
	}
351
352
	private function fetch_items( $limit = null ) {
353
		global $wpdb;
354
355
		if ( $limit ) {
356
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", "jpsq_{$this->id}-%", $limit );
357
		} else {
358
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", "jpsq_{$this->id}-%" );
359
		}
360
361
		$items = $wpdb->get_results( $query_sql, OBJECT );
362
		foreach ( $items as $item ) {
363
			$item->value = maybe_unserialize( $item->value );
364
		}
365
366
		return $items;
367
	}
368
369
	private function fetch_items_by_id( $item_ids ) {
370
		global $wpdb;
371
372
		if ( count( $item_ids ) > 0 ) {
373
			$sql   = "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $item_ids ), '%s' ) ) . ') ORDER BY option_name ASC';
374
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $item_ids ) );
375
			$items = $wpdb->get_results( $query, OBJECT );
376
			foreach ( $items as $item ) {
377
				$item->value = maybe_unserialize( $item->value );
378
			}
379
380
			return $items;
381
		} else {
382
			return array();
383
		}
384
	}
385
386
	private function release_lock( $buffer ) {
387
		if ( ! $buffer instanceof Jetpack_Sync_Queue_Buffer ) {
388
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
389
		}
390
391
		if ( $this->use_named_lock ) {
0 ignored issues
show
Unused Code introduced by
This if statement is empty and can be removed.

This check looks for the bodies of if statements that have no statements or where all statements have been commented out. This may be the result of changes for debugging or the code may simply be obsolete.

These if bodies can be removed. If you have an empty if but statements in the else branch, consider inverting the condition.

if (rand(1, 6) > 3) {
//print "Check failed";
} else {
    print "Check succeeded";
}

could be turned into

if (rand(1, 6) <= 3) {
    print "Check succeeded";
}

This is much more concise to read.

Loading history...
392
			// TODO
0 ignored issues
show
Coding Style introduced by
Comment refers to a TODO task

This check looks TODO comments that have been left in the code.

``TODO``s show that something is left unfinished and should be attended to.

Loading history...
393
		} else {
394
			$checkout_id = $this->get_checkout_id();
395
396
			if ( ! $checkout_id ) {
397
				return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
398
			}
399
400
			if ( $checkout_id != $buffer->id ) {
401
				return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
402
			}
403
404
			$this->delete_checkout_id();
405
		}
406
407
		return true;
408
	}
409
}
410
411
class Jetpack_Sync_Utils {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
412
413
	static function get_item_values( $items ) {
414
		return array_map( array( __CLASS__, 'get_item_value' ), $items );
415
	}
416
417
	static function get_item_ids( $items ) {
418
		return array_map( array( __CLASS__, 'get_item_id' ), $items );
419
	}
420
421
	static private function get_item_value( $item ) {
0 ignored issues
show
Coding Style introduced by
As per PSR2, the static declaration should come after the visibility declaration.
Loading history...
422
		return $item->value;
423
	}
424
425
	static private function get_item_id( $item ) {
0 ignored issues
show
Coding Style introduced by
As per PSR2, the static declaration should come after the visibility declaration.
Loading history...
426
		return $item->id;
427
	}
428
}
429