Completed
Push — sync/wpdb-insert ( 85a044 )
by George
11:52
created

Jetpack_Sync_Queue::checkin()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
c 0
b 0
f 0
nc 2
nop 1
dl 0
loc 11
rs 9.4285
1
<?php
2
3
/**
4
 * A buffer of items from the queue that can be checked out
5
 */
6
class Jetpack_Sync_Queue_Buffer {
7
	public $id;
8
	public $items_with_ids;
9
10
	public function __construct( $id, $items_with_ids ) {
11
		$this->id             = $id;
12
		$this->items_with_ids = $items_with_ids;
13
	}
14
15
	public function get_items() {
16
		return array_combine( $this->get_item_ids(), $this->get_item_values() );
17
	}
18
19
	public function get_item_values() {
20
		return Jetpack_Sync_Utils::get_item_values( $this->items_with_ids );
21
	}
22
23
	public function get_item_ids() {
24
		return Jetpack_Sync_Utils::get_item_ids( $this->items_with_ids );
25
	}
26
}
27
28
/**
29
 * A persistent queue that can be flushed in increments of N items,
30
 * and which blocks reads until checked-out buffers are checked in or
31
 * closed. This uses raw SQL for two reasons: speed, and not triggering
32
 * tons of added_option callbacks.
33
 */
34
class Jetpack_Sync_Queue {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
35
	public $id;
36
	private $row_iterator;
37
38
	function __construct( $id ) {
39
		$this->id           = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL
40
		$this->row_iterator = 0;
41
		$this->random_int = mt_rand( 1, 1000000 );
0 ignored issues
show
Bug introduced by
The property random_int does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is a good practice to explictly declare properties to avoid accidental typos and provide IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;
Loading history...
42
	}
43
44
	function add( $item ) {
45
		// this basically tries to add the option until enough time has elapsed that
46
		// it has a unique (microtime-based) option key
47
		while ( ! add_option( $this->get_next_data_row_option_name(), serialize( $item ), '', 'no' ) );
48
	}
49
50
	// Attempts to insert all the items in a single SQL query. May be subject to query size limits!
51
	function add_all( $items ) {
52
		global $wpdb;
53
		$base_option_name = $this->get_next_data_row_option_name();
54
55
		// Building this manually as we can condense many into a single db query.
56
		$query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";
57
58
		$rows = array();
59
60
		for ( $i = 0; $i < count( $items ); $i += 1 ) {
0 ignored issues
show
Performance Best Practice introduced by
It seems like you are calling the size function count() as part of the test condition. You might want to compute the size beforehand, and not on each iteration.

If the size of the collection does not change during the iteration, it is generally a good practice to compute it beforehand, and not on each iteration:

for ($i=0; $i<count($array); $i++) { // calls count() on each iteration
}

// Better
for ($i=0, $c=count($array); $i<$c; $i++) { // calls count() just once
}
Loading history...
61
			$option_name  = esc_sql( $base_option_name . '-' . $i );
62
			$option_value = esc_sql( serialize( $items[ $i ] ) );
63
			$rows[]       = "('$option_name', '$option_value', 'no')";
64
		}
65
66
		$rows_added = $wpdb->query( $query . join( ',', $rows ) );
67
68
		if ( count( $items ) === $rows_added ) {
69
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
70
		}
71
	}
72
73
	// Peek at the front-most item on the queue without checking it out
74
	function peek( $count = 1 ) {
75
		$items = $this->fetch_items( $count );
76
		if ( $items ) {
77
			return Jetpack_Sync_Utils::get_item_values( $items );
78
		}
79
80
		return array();
81
	}
82
83
	// lag is the difference in time between the age of the oldest item
84
	// (aka first or frontmost item) and the current time
85
	function lag() {
86
		global $wpdb;
87
88
		$first_item_name = $wpdb->get_var( $wpdb->prepare(
89
			"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
90
			"jpsq_{$this->id}-%"
91
		) );
92
93
		if ( ! $first_item_name ) {
94
			return 0;
95
		}
96
97
		// break apart the item name to get the timestamp
98
		$matches = null;
99
		if ( preg_match( '/^jpsq_' . $this->id . '-(\d+\.\d+)-/', $first_item_name, $matches ) ) {
100
			return microtime( true ) - floatval( $matches[1] );
101
		} else {
102
			return 0;
103
		}
104
	}
105
106
	function reset() {
107
		global $wpdb;
108
		$this->delete_checkout_id();
109
		// Not using `$wpdb->delete()` as we need the LIKE comparator.
110
		$wpdb->query( $wpdb->prepare(
111
			"DELETE FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
112
		) );
113
	}
114
115
	function size() {
116
		global $wpdb;
117
118
		// Not using `$wpdb->delete()` as we need the LIKE comparator.
119
		return (int) $wpdb->get_var( $wpdb->prepare(
120
			"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
121
		) );
122
	}
123
124
	// we use this peculiar implementation because it's much faster than count(*)
125
	function has_any_items() {
126
		global $wpdb;
127
		$value = $wpdb->get_var( $wpdb->prepare(
128
			"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )", "jpsq_{$this->id}-%"
129
		) );
130
131
		return ( $value === '1' );
132
	}
133
134
	function checkout( $buffer_size ) {
135
		if ( $this->get_checkout_id() ) {
136
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
137
		}
138
139
		$buffer_id = uniqid();
140
141
		$result = $this->set_checkout_id( $buffer_id );
142
143
		if ( ! $result || is_wp_error( $result ) ) {
144
			return $result;
145
		}
146
147
		$items = $this->fetch_items( $buffer_size );
148
149
		if ( count( $items ) === 0 ) {
150
			return false;
151
		}
152
153
		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );
154
155
		return $buffer;
156
	}
157
158
	// this checks out rows until it either empties the queue or hits a certain memory limit
159
	// it loads the sizes from the DB first so that it doesn't accidentally
160
	// load more data into memory than it needs to.
161
	// The only way it will load more items than $max_size is if a single queue item
162
	// exceeds the memory limit, but in that case it will send that item by itself.
163
	function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
164
		if ( $this->get_checkout_id() ) {
165
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
166
		}
167
168
		$buffer_id = uniqid();
169
170
		$result = $this->set_checkout_id( $buffer_id );
171
172
		if ( ! $result || is_wp_error( $result ) ) {
173
			return $result;
174
		}
175
176
		// get the map of buffer_id -> memory_size
177
		global $wpdb;
178
179
		$items_with_size = $wpdb->get_results(
180
			$wpdb->prepare(
181
				"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
182
				"jpsq_{$this->id}-%",
183
				$max_buffer_size
184
			),
185
			OBJECT
186
		);
187
188
		if ( count( $items_with_size ) === 0 ) {
189
			return false;
190
		}
191
192
		$total_memory = 0;
193
194
		$min_item_id = $max_item_id = $items_with_size[0]->id;
195
196
		foreach ( $items_with_size as $id => $item_with_size ) {
197
			$total_memory += $item_with_size->value_size;
198
199
			// if this is the first item and it exceeds memory, allow loop to continue
200
			// we will exit on the next iteration instead
201
			if ( $total_memory > $max_memory && $id > 0 ) {
202
				break;
203
			}
204
205
			$max_item_id = $item_with_size->id;
206
		}
207
208
		$query = $wpdb->prepare( 
209
			"SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name >= %s and option_name <= %s ORDER BY option_name ASC",
210
			$min_item_id,
211
			$max_item_id
212
		);
213
214
		$items = $wpdb->get_results( $query, OBJECT );
215
		foreach ( $items as $item ) {
216
			$item->value = maybe_unserialize( $item->value );
217
		}
218
219
		if ( count( $items ) === 0 ) {
220
			$this->delete_checkout_id();
221
222
			return false;
223
		}
224
225
		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
226
227
		return $buffer;
228
	}
229
230
	function checkin( $buffer ) {
231
		$is_valid = $this->validate_checkout( $buffer );
232
233
		if ( is_wp_error( $is_valid ) ) {
234
			return $is_valid;
235
		}
236
237
		$this->delete_checkout_id();
238
239
		return true;
240
	}
241
242
	function close( $buffer, $ids_to_remove = null ) {
243
		$is_valid = $this->validate_checkout( $buffer );
244
245
		if ( is_wp_error( $is_valid ) ) {
246
			return $is_valid;
247
		}
248
249
		$this->delete_checkout_id();
250
251
		// by default clear all items in the buffer
252
		if ( is_null( $ids_to_remove ) ) {
253
			$ids_to_remove = $buffer->get_item_ids();
254
		}
255
256
		global $wpdb;
257
258
		if ( count( $ids_to_remove ) > 0 ) {
259
			$sql   = "DELETE FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $ids_to_remove ), '%s' ) ) . ')';
260
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $ids_to_remove ) );
261
			$wpdb->query( $query );
262
		}
263
264
		return true;
265
	}
266
267
	function flush_all() {
268
		$items = Jetpack_Sync_Utils::get_item_values( $this->fetch_items() );
269
		$this->reset();
270
271
		return $items;
272
	}
273
274
	function get_all() {
275
		return $this->fetch_items();
276
	}
277
278
	// use with caution, this could allow multiple processes to delete
279
	// and send from the queue at the same time
280
	function force_checkin() {
281
		$this->delete_checkout_id();
282
	}
283
284
	// used to lock checkouts from the queue.
285
	// tries to wait up to $timeout seconds for the queue to be empty
286
	function lock( $timeout = 30 ) {
287
		$tries = 0;
288
289
		while ( $this->has_any_items() && $tries < $timeout ) {
290
			sleep( 1 );
291
			$tries += 1;
292
		}
293
294
		if ( $tries === 30 ) {
295
			return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
296
		}
297
298
		if ( $this->get_checkout_id() ) {
299
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
300
		}
301
302
		// hopefully this means we can acquire a checkout?
303
		$result = $this->set_checkout_id( 'lock' );
304
305
		if ( ! $result || is_wp_error( $result ) ) {
306
			return $result;
307
		}
308
309
		return true;
310
	}
311
312
	function unlock() {
313
		return $this->delete_checkout_id();
314
	}
315
316
	private function get_checkout_id() {
317
		global $wpdb;
318
		$checkout_value = $wpdb->get_var( 
319
			$wpdb->prepare(
320
				"SELECT option_value FROM $wpdb->options WHERE option_name = %s", 
321
				$this->get_lock_option_name()
322
			)
323
		);
324
325
		if ( $checkout_value ) {
326
			list( $checkout_id, $timestamp ) = explode( ':', $checkout_value );
327
			if ( intval( $timestamp ) > time() ) {
328
				return $checkout_id;
329
			}
330
		}
331
332
		return false;
333
	}
334
335
	private function set_checkout_id( $checkout_id ) {
336
		$expires = time() + Jetpack_Sync_Defaults::$default_sync_queue_lock_timeout;
0 ignored issues
show
Bug introduced by
The property default_sync_queue_lock_timeout cannot be accessed from this context as it is declared private in class Jetpack_Sync_Defaults.

This check looks for access to properties that are not accessible from the current context.

If you need to make a property accessible to another context you can either raise its visibility level or provide an accessible getter in the defining class.

Loading history...
337
		return update_option( $this->get_lock_option_name(), "$checkout_id:$expires", 'no' );
338
	}
339
340
	private function delete_checkout_id() {
341
		global $wpdb;
342
		// rather than delete, which causes fragmentation, we update in place
343
		return $wpdb->update( $wpdb->options, array(
344
				'option_value' => '0:0',
345
			), array(
346
				'option_name' => $this->get_lock_option_name(),
347
			), '%s', '%s' );
348
	}
349
350
	private function get_lock_option_name() {
351
		return "jpsq_{$this->id}_checkout";
352
	}
353
354
	private function get_next_data_row_option_name() {
355
		// this option is specifically chosen to, as much as possible, preserve time order
356
		// and minimise the possibility of collisions between multiple processes working
357
		// at the same time
358
		// TODO: confirm we only need to support PHP 5.05+ (otherwise we'll need to emulate microtime as float, and avoid PHP_INT_MAX)
0 ignored issues
show
Coding Style Best Practice introduced by
Comments for TODO tasks are often forgotten in the code; it might be better to use a dedicated issue tracker.
Loading history...
359
		// @see: http://php.net/manual/en/function.microtime.php
360
		$timestamp = sprintf( '%.6f', microtime( true ) );
361
362
		// row iterator is used to avoid collisions where we're writing data waaay fast in a single process
363
		if ( $this->row_iterator === PHP_INT_MAX ) {
364
			$this->row_iterator = 0;
365
		} else {
366
			$this->row_iterator += 1;
367
		}
368
369
		return 'jpsq_' . $this->id . '-' . $timestamp . '-' . $this->random_int . '-' . $this->row_iterator;
370
	}
371
372
	private function fetch_items( $limit = null ) {
373
		global $wpdb;
374
375
		if ( $limit ) {
376
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", "jpsq_{$this->id}-%", $limit );
377
		} else {
378
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", "jpsq_{$this->id}-%" );
379
		}
380
381
		$items = $wpdb->get_results( $query_sql, OBJECT );
382
		foreach ( $items as $item ) {
383
			$item->value = maybe_unserialize( $item->value );
384
		}
385
386
		return $items;
387
	}
388
389
	private function validate_checkout( $buffer ) {
390
		if ( ! $buffer instanceof Jetpack_Sync_Queue_Buffer ) {
391
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
392
		}
393
394
		$checkout_id = $this->get_checkout_id();
395
396
		if ( ! $checkout_id ) {
397
			return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
398
		}
399
400
		if ( $checkout_id != $buffer->id ) {
401
			return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
402
		}
403
404
		return true;
405
	}
406
}
407
408
class Jetpack_Sync_Utils {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
409
410
	static function get_item_values( $items ) {
411
		return array_map( array( __CLASS__, 'get_item_value' ), $items );
412
	}
413
414
	static function get_item_ids( $items ) {
415
		return array_map( array( __CLASS__, 'get_item_id' ), $items );
416
	}
417
418
	static private function get_item_value( $item ) {
0 ignored issues
show
Coding Style introduced by
As per PSR2, the static declaration should come after the visibility declaration.
Loading history...
419
		return $item->value;
420
	}
421
422
	static private function get_item_id( $item ) {
0 ignored issues
show
Coding Style introduced by
As per PSR2, the static declaration should come after the visibility declaration.
Loading history...
423
		return $item->id;
424
	}
425
}
426