Completed
Push — add/notice-if-sync-large-sync-... ( 4cc037 )
by
unknown
15:07 queued 05:11
created

Jetpack_Sync_Queue::get_lag()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 20
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 3
eloc 12
c 1
b 0
f 1
nc 3
nop 1
dl 0
loc 20
rs 9.4285
1
<?php
2
3
/**
4
 * A buffer of items from the queue that can be checked out
5
 */
6
class Jetpack_Sync_Queue_Buffer {
7
	public $id;
8
	public $items_with_ids;
9
10
	public function __construct( $id, $items_with_ids ) {
11
		$this->id             = $id;
12
		$this->items_with_ids = $items_with_ids;
13
	}
14
15
	public function get_items() {
16
		return array_combine( $this->get_item_ids(), $this->get_item_values() );
17
	}
18
19
	public function get_item_values() {
20
		return Jetpack_Sync_Utils::get_item_values( $this->items_with_ids );
21
	}
22
23
	public function get_item_ids() {
24
		return Jetpack_Sync_Utils::get_item_ids( $this->items_with_ids );
25
	}
26
}
27
28
/**
29
 * A persistent queue that can be flushed in increments of N items,
30
 * and which blocks reads until checked-out buffers are checked in or
31
 * closed. This uses raw SQL for two reasons: speed, and not triggering
32
 * tons of added_option callbacks.
33
 */
34
class Jetpack_Sync_Queue {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
35
	public $id;
36
	private $row_iterator;
37
38
	function __construct( $id ) {
39
		$this->id           = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL
40
		$this->row_iterator = 0;
41
	}
42
43
	function add( $item ) {
44
		global $wpdb;
45
		$added = false;
46
		// this basically tries to add the option until enough time has elapsed that
47
		// it has a unique (microtime-based) option key
48
		while ( ! $added ) {
49
			$rows_added = $wpdb->query( $wpdb->prepare(
50
				"INSERT INTO $wpdb->options (option_name, option_value,autoload) VALUES (%s, %s,%s)",
51
				$this->get_next_data_row_option_name(),
52
				serialize( $item ),
53
				'no'
54
			) );
55
			$added      = ( 0 !== $rows_added );
56
		}
57
	}
58
59
	// Attempts to insert all the items in a single SQL query. May be subject to query size limits!
60
	function add_all( $items ) {
61
		global $wpdb;
62
		$base_option_name = $this->get_next_data_row_option_name();
63
64
		$query = "INSERT INTO $wpdb->options (option_name, option_value,autoload) VALUES ";
65
66
		$rows = array();
67
68
		for ( $i = 0; $i < count( $items ); $i += 1 ) {
0 ignored issues
show
Performance Best Practice introduced by
It seems like you are calling the size function count() as part of the test condition. You might want to compute the size beforehand, and not on each iteration.

If the size of the collection does not change during the iteration, it is generally a good practice to compute it beforehand, and not on each iteration:

for ($i=0; $i<count($array); $i++) { // calls count() on each iteration
}

// Better
for ($i=0, $c=count($array); $i<$c; $i++) { // calls count() just once
}
Loading history...
69
			$option_name  = esc_sql( $base_option_name . '-' . $i );
70
			$option_value = esc_sql( serialize( $items[ $i ] ) );
71
			$rows[]       = "('$option_name', '$option_value', 'no')";
72
		}
73
74
		$rows_added = $wpdb->query( $query . join( ',', $rows ) );
75
76
		if ( count( $items ) === $rows_added ) {
77
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
78
		}
79
	}
80
81
	// Peek at the front-most item on the queue without checking it out
82
	function peek( $count = 1 ) {
83
		$items = $this->fetch_items( $count );
84
		if ( $items ) {
85
			return Jetpack_Sync_Utils::get_item_values( $items );
86
		}
87
88
		return array();
89
	}
90
91
	// lag is the difference in time between the age of the oldest item
92
	// (aka first or frontmost item) and the current time
93
	function lag() {
94
		return self::get_lag( $this->id );
95
	}
96
97
	static function get_lag( $id ) {
98
		global $wpdb;
99
100
		$first_item_name = $wpdb->get_var( $wpdb->prepare(
101
			"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
102
			"jpsq_{$id}-%"
103
		) );
104
105
		if ( ! $first_item_name ) {
106
			return 0;
107
		}
108
109
		// break apart the item name to get the timestamp
110
		$matches = null;
111
		if ( preg_match( '/^jpsq_' . $id . '-(\d+\.\d+)-/', $first_item_name, $matches ) ) {
112
			return microtime( true ) - floatval( $matches[1] );
113
		} else {
114
			return 0;
115
		}
116
	}
117
118
	function reset() {
119
		global $wpdb;
120
		$this->delete_checkout_id();
121
		$wpdb->query( $wpdb->prepare(
122
			"DELETE FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
123
		) );
124
	}
125
126
	function size() {
127
		global $wpdb;
128
129
		return (int) $wpdb->get_var( $wpdb->prepare(
130
			"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
131
		) );
132
	}
133
134
	// we use this peculiar implementation because it's much faster than count(*)
135
	function has_any_items() {
136
		global $wpdb;
137
		$value = $wpdb->get_var( $wpdb->prepare(
138
			"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )", "jpsq_{$this->id}-%"
139
		) );
140
141
		return ( $value === '1' );
142
	}
143
144
	function checkout( $buffer_size ) {
145
		if ( $this->get_checkout_id() ) {
146
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
147
		}
148
149
		$buffer_id = uniqid();
150
151
		$result = $this->set_checkout_id( $buffer_id );
152
153
		if ( ! $result || is_wp_error( $result ) ) {
154
			return $result;
155
		}
156
157
		$items = $this->fetch_items( $buffer_size );
158
159
		if ( count( $items ) === 0 ) {
160
			return false;
161
		}
162
163
		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );
164
165
		return $buffer;
166
	}
167
168
	// this checks out rows until it either empties the queue or hits a certain memory limit
169
	// it loads the sizes from the DB first so that it doesn't accidentally
170
	// load more data into memory than it needs to.
171
	// The only way it will load more items than $max_size is if a single queue item
172
	// exceeds the memory limit, but in that case it will send that item by itself.
173
	function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
174
		if ( $this->get_checkout_id() ) {
175
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
176
		}
177
178
		$buffer_id = uniqid();
179
180
		$result = $this->set_checkout_id( $buffer_id );
181
182
		if ( ! $result || is_wp_error( $result ) ) {
183
			return $result;
184
		}
185
186
		// get the map of buffer_id -> memory_size
187
		global $wpdb;
188
189
		$items_with_size = $wpdb->get_results(
190
			$wpdb->prepare(
191
				"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
192
				"jpsq_{$this->id}-%",
193
				$max_buffer_size
194
			),
195
			OBJECT
196
		);
197
198
		$total_memory = 0;
199
		$item_ids     = array();
200
201
		foreach ( $items_with_size as $item_with_size ) {
202
			$total_memory += $item_with_size->value_size;
203
204
			// if this is the first item and it exceeds memory, allow loop to continue
205
			// we will exit on the next iteration instead
206
			if ( $total_memory > $max_memory && count( $item_ids ) > 0 ) {
207
				break;
208
			}
209
			$item_ids[] = $item_with_size->id;
210
		}
211
212
		$items = $this->fetch_items_by_id( $item_ids );
213
214
		if ( count( $items ) === 0 ) {
215
			$this->delete_checkout_id();
216
217
			return false;
218
		}
219
220
		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
221
222
		return $buffer;
223
	}
224
225
	function checkin( $buffer ) {
226
		$is_valid = $this->validate_checkout( $buffer );
227
228
		if ( is_wp_error( $is_valid ) ) {
229
			return $is_valid;
230
		}
231
232
		$this->delete_checkout_id();
233
234
		return true;
235
	}
236
237
	function close( $buffer, $ids_to_remove = null ) {
238
		$is_valid = $this->validate_checkout( $buffer );
239
240
		if ( is_wp_error( $is_valid ) ) {
241
			return $is_valid;
242
		}
243
244
		$this->delete_checkout_id();
245
246
		// by default clear all items in the buffer
247
		if ( is_null( $ids_to_remove ) ) {
248
			$ids_to_remove = $buffer->get_item_ids();
249
		}
250
251
		global $wpdb;
252
253
		if ( count( $ids_to_remove ) > 0 ) {
254
			$sql   = "DELETE FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $ids_to_remove ), '%s' ) ) . ')';
255
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $ids_to_remove ) );
256
			$wpdb->query( $query );
257
		}
258
259
		return true;
260
	}
261
262
	function flush_all() {
263
		$items = Jetpack_Sync_Utils::get_item_values( $this->fetch_items() );
264
		$this->reset();
265
266
		return $items;
267
	}
268
269
	function get_all() {
270
		return $this->fetch_items();
271
	}
272
273
	// use with caution, this could allow multiple processes to delete
274
	// and send from the queue at the same time
275
	function force_checkin() {
276
		$this->delete_checkout_id();
277
	}
278
279
	// used to lock checkouts from the queue.
280
	// tries to wait up to $timeout seconds for the queue to be empty
281
	function lock( $timeout = 30 ) {
282
		$tries = 0;
283
284
		while ( $this->has_any_items() && $tries < $timeout ) {
285
			sleep( 1 );
286
			$tries += 1;
287
		}
288
289
		if ( $tries === 30 ) {
290
			return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
291
		}
292
293
		if ( $this->get_checkout_id() ) {
294
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
295
		}
296
297
		// hopefully this means we can acquire a checkout?
298
		$result = $this->set_checkout_id( 'lock' );
299
300
		if ( ! $result || is_wp_error( $result ) ) {
301
			return $result;
302
		}
303
304
		return true;
305
	}
306
307
	function unlock() {
308
		$this->delete_checkout_id();
309
	}
310
311
	private function get_checkout_id() {
312
		return get_transient( $this->get_checkout_transient_name() );
313
	}
314
315
	private function set_checkout_id( $checkout_id ) {
316
		return set_transient( $this->get_checkout_transient_name(), $checkout_id, 5 * 60 ); // 5 minute timeout
317
	}
318
319
	private function delete_checkout_id() {
320
		delete_transient( $this->get_checkout_transient_name() );
321
	}
322
323
	private function get_checkout_transient_name() {
324
		return "jpsq_{$this->id}_checkout";
325
	}
326
327
	private function get_next_data_row_option_name() {
328
		// this option is specifically chosen to, as much as possible, preserve time order
329
		// and minimise the possibility of collisions between multiple processes working
330
		// at the same time
331
		// TODO: confirm we only need to support PHP 5.05+ (otherwise we'll need to emulate microtime as float, and avoid PHP_INT_MAX)
0 ignored issues
show
Coding Style Best Practice introduced by
Comments for TODO tasks are often forgotten in the code; it might be better to use a dedicated issue tracker.
Loading history...
332
		// @see: http://php.net/manual/en/function.microtime.php
333
		$timestamp = sprintf( '%.6f', microtime( true ) );
334
335
		// row iterator is used to avoid collisions where we're writing data waaay fast in a single process
336
		if ( $this->row_iterator === PHP_INT_MAX ) {
337
			$this->row_iterator = 0;
338
		} else {
339
			$this->row_iterator += 1;
340
		}
341
342
		return 'jpsq_' . $this->id . '-' . $timestamp . '-' . getmypid() . '-' . $this->row_iterator;
343
	}
344
345
	private function fetch_items( $limit = null ) {
346
		global $wpdb;
347
348
		if ( $limit ) {
349
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", "jpsq_{$this->id}-%", $limit );
350
		} else {
351
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", "jpsq_{$this->id}-%" );
352
		}
353
354
		$items = $wpdb->get_results( $query_sql, OBJECT );
355
		foreach ( $items as $item ) {
356
			$item->value = maybe_unserialize( $item->value );
357
		}
358
359
		return $items;
360
	}
361
362
	private function fetch_items_by_id( $item_ids ) {
363
		global $wpdb;
364
365
		if ( count( $item_ids ) > 0 ) {
366
			$sql   = "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $item_ids ), '%s' ) ) . ') ORDER BY option_name ASC';
367
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $item_ids ) );
368
			$items = $wpdb->get_results( $query, OBJECT );
369
			foreach ( $items as $item ) {
370
				$item->value = maybe_unserialize( $item->value );
371
			}
372
373
			return $items;
374
		} else {
375
			return array();
376
		}
377
	}
378
379
	private function validate_checkout( $buffer ) {
380
		if ( ! $buffer instanceof Jetpack_Sync_Queue_Buffer ) {
381
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
382
		}
383
384
		$checkout_id = $this->get_checkout_id();
385
386
		if ( ! $checkout_id ) {
387
			return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
388
		}
389
390
		if ( $checkout_id != $buffer->id ) {
391
			return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
392
		}
393
394
		return true;
395
	}
396
}
397
398
class Jetpack_Sync_Utils {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
399
400
	static function get_item_values( $items ) {
401
		return array_map( array( __CLASS__, 'get_item_value' ), $items );
402
	}
403
404
	static function get_item_ids( $items ) {
405
		return array_map( array( __CLASS__, 'get_item_id' ), $items );
406
	}
407
408
	static private function get_item_value( $item ) {
0 ignored issues
show
Coding Style introduced by
As per PSR2, the static declaration should come after the visibility declaration.
Loading history...
409
		return $item->value;
410
	}
411
412
	static private function get_item_id( $item ) {
0 ignored issues
show
Coding Style introduced by
As per PSR2, the static declaration should come after the visibility declaration.
Loading history...
413
		return $item->id;
414
	}
415
}
416