Completed
Push — add/sync-action ( cfb5f0...a36f82 )
by
unknown
09:16
created

Jetpack_Sync_Queue   C

Complexity

Total Complexity 60

Size/Duplication

Total Lines 368
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 2

Importance

Changes 11
Bugs 0 Features 3
Metric Value
c 11
b 0
f 3
dl 0
loc 368
rs 6.0975
wmc 60
lcom 1
cbo 2

25 Methods

Rating   Name   Duplication   Size   Complexity  
A add() 0 15 2
A add_all() 0 20 3
A peek() 0 8 2
A reset() 0 7 1
A size() 0 7 1
A has_any_items() 0 8 1
A __construct() 0 4 1
A lag() 0 20 3
B checkout() 0 25 5
B checkout_with_memory_limit() 0 53 8
A checkin() 0 13 2
B close() 0 26 4
A flush_all() 0 6 1
A get_all() 0 3 1
A force_checkin() 0 3 1
C lock() 0 27 7
A unlock() 0 3 1
A get_checkout_id() 0 3 1
A set_checkout_id() 0 3 1
A delete_checkout_id() 0 3 1
A get_checkout_transient_name() 0 3 1
A get_next_data_row_option_name() 0 17 2
A fetch_items() 0 16 3
A fetch_items_by_id() 0 16 3
A validate_checkout() 0 17 4

How to fix   Complexity   

Complex Class

Complex classes like Jetpack_Sync_Queue often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Jetpack_Sync_Queue, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
/**
4
 * A buffer of items from the queue that can be checked out
5
 */
6
class Jetpack_Sync_Queue_Buffer {
	/** Identifier of this buffer, exactly as handed to the constructor. */
	public $id;

	/** Rows held by this buffer; get_items() and get_item_ids() read them through Jetpack_Sync_Utils. */
	public $items_with_ids;

	/**
	 * Remember the buffer identifier and the rows it carries.
	 *
	 * @param mixed $id             Identifier for this buffer.
	 * @param array $items_with_ids Rows carried by the buffer.
	 */
	public function __construct( $id, $items_with_ids ) {
		$this->items_with_ids = $items_with_ids;
		$this->id             = $id;
	}

	/**
	 * List the IDs of the rows in this buffer.
	 *
	 * @return array Result of Jetpack_Sync_Utils::get_item_ids() for the stored rows.
	 */
	public function get_item_ids() {
		$ids = Jetpack_Sync_Utils::get_item_ids( $this->items_with_ids );

		return $ids;
	}

	/**
	 * Build a map of row ID => row value for everything in this buffer.
	 *
	 * @return array|false Whatever array_combine() yields for the ID and value lists.
	 */
	public function get_items() {
		$keys   = $this->get_item_ids();
		$values = Jetpack_Sync_Utils::get_item_values( $this->items_with_ids );

		return array_combine( $keys, $values );
	}
}
23
24
/**
25
 * A persistent queue that can be flushed in increments of N items,
26
 * and which blocks reads until checked-out buffers are checked in or
27
 * closed. This uses raw SQL for two reasons: speed, and not triggering
28
 * tons of added_option callbacks.
29
 */
30
class Jetpack_Sync_Queue {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
31
	public $id;
32
	private $row_iterator;
33
34
	/**
	 * Create a queue handle for the given identifier.
	 *
	 * @param string $id Queue identifier; every '-' is replaced with '_' before it is stored.
	 */
	function __construct( $id ) {
		$this->row_iterator = 0;

		// necessary to ensure we don't have ID collisions in the SQL
		$normalized_id = str_replace( '-', '_', $id );
		$this->id      = $normalized_id;
	}
38
39
	/**
	 * Append one item to the queue as a new row in the options table.
	 *
	 * The item is serialized and inserted with autoload set to 'no' under a
	 * freshly generated option name. The insert is attempted again, with a
	 * new name each time, for as long as the query reports exactly 0 rows.
	 *
	 * NOTE(review): any result other than integer 0 ends the loop, so a query
	 * that reports failure with a non-integer value is treated the same as a
	 * successful insert — confirm that this is intended.
	 *
	 * @param mixed $item Item to enqueue.
	 */
	function add( $item ) {
		global $wpdb;

		do {
			$insert_sql = $wpdb->prepare(
				"INSERT INTO $wpdb->options (option_name, option_value,autoload) VALUES (%s, %s,%s)",
				$this->get_next_data_row_option_name(),
				serialize( $item ),
				'no'
			);
			$rows_added = $wpdb->query( $insert_sql );
		} while ( 0 === $rows_added );
	}
54
55
	/**
	 * Attempts to insert all the items in a single SQL query. May be subject to query size limits!
	 *
	 * Every row shares one generated base option name and is told apart by a
	 * numeric "-<position>" suffix. The array keys of $items are ignored, so
	 * associative or sparse arrays are handled the same way as plain lists.
	 * An empty $items array is a no-op: no query is sent.
	 *
	 * @param array $items Items to enqueue.
	 *
	 * @return WP_Error|null WP_Error ('row_count_mismatch') when the number of rows
	 *                       reported by the query differs from the number of items;
	 *                       null otherwise.
	 */
	function add_all( $items ) {
		global $wpdb;

		// Re-index so positions are always 0..n-1, whatever keys the caller used.
		$items       = array_values( $items );
		$items_count = count( $items );

		// "INSERT ... VALUES" with no row tuples is not valid SQL, so skip the query entirely.
		if ( 0 === $items_count ) {
			return null;
		}

		$base_option_name = $this->get_next_data_row_option_name();

		$rows = array();
		foreach ( $items as $position => $item ) {
			$option_name  = esc_sql( $base_option_name . '-' . $position );
			$option_value = esc_sql( serialize( $item ) );
			$rows[]       = "('$option_name', '$option_value', 'no')";
		}

		$query      = "INSERT INTO $wpdb->options (option_name, option_value,autoload) VALUES " . join( ',', $rows );
		$rows_added = $wpdb->query( $query );

		if ( $rows_added !== $items_count ) {
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
		}

		return null;
	}
76
77
	/**
	 * Peek at the front-most item(s) on the queue without checking them out.
	 *
	 * @param int $count Maximum number of items to look at. Defaults to 1.
	 *
	 * @return array Values of the fetched items, or an empty array when nothing was fetched.
	 */
	function peek( $count = 1 ) {
		$rows = $this->fetch_items( $count );

		if ( ! $rows ) {
			return array();
		}

		return Jetpack_Sync_Utils::get_item_values( $rows );
	}
86
87
	/**
	 * Lag is the difference in time between the age of the oldest item and the current time.
	 *
	 * The oldest item is taken to be the row whose option name sorts first;
	 * the timestamp is then parsed back out of that name.
	 *
	 * @return float|null Seconds between the timestamp embedded in the first row's
	 *                    name and now, or null when the queue has no rows or the
	 *                    name does not carry a "<digits>.<digits>" timestamp.
	 */
	function lag() {
		global $wpdb;

		// Ascending name order with LIMIT 1 yields the front of the queue.
		$oldest_item_name = $wpdb->get_var( $wpdb->prepare(
			"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
			"jpsq_{$this->id}-%"
		) );

		if ( ! $oldest_item_name ) {
			return null;
		}

		// Break apart the item name to get the timestamp. The queue ID is quoted so that
		// regex metacharacters (or the '/' delimiter) in it cannot alter or break the pattern.
		$pattern = '/^jpsq_' . preg_quote( $this->id, '/' ) . '-(\d+\.\d+)-/';
		$matches = null;

		if ( ! preg_match( $pattern, $oldest_item_name, $matches ) ) {
			return null;
		}

		return microtime( true ) - floatval( $matches[1] );
	}
108
109
	/**
	 * Empty the queue: drop the checkout marker, then delete every row
	 * whose option name matches this queue's "jpsq_<id>-" prefix pattern.
	 */
	function reset() {
		global $wpdb;

		$this->delete_checkout_id();

		$like_pattern = "jpsq_{$this->id}-%";
		$delete_sql   = $wpdb->prepare( "DELETE FROM $wpdb->options WHERE option_name LIKE %s", $like_pattern );

		$wpdb->query( $delete_sql );
	}
116
117
	/**
	 * Count the rows currently stored for this queue.
	 *
	 * @return int Number of option rows matching this queue's name pattern.
	 */
	function size() {
		global $wpdb;

		$count_sql = $wpdb->prepare(
			"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
		);
		$row_count = $wpdb->get_var( $count_sql );

		return (int) $row_count;
	}
124
125
	/**
	 * Tell whether the queue holds at least one row.
	 *
	 * We use this peculiar implementation because it's much faster than count(*).
	 *
	 * @return bool True only when the EXISTS query comes back as the string "1".
	 */
	function has_any_items() {
		global $wpdb;

		$exists_sql = $wpdb->prepare(
			"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )", "jpsq_{$this->id}-%"
		);
		$exists = $wpdb->get_var( $exists_sql );

		return "1" === $exists;
	}
134
135
	/**
	 * Check out up to $buffer_size items from the front of the queue.
	 *
	 * A checkout marker (transient) is claimed first; while it exists, further
	 * checkouts are refused. When the queue turns out to be empty the marker is
	 * released again before returning, so an empty checkout does not block the
	 * queue until the transient expires.
	 *
	 * @param int $buffer_size Maximum number of items to put in the buffer.
	 *
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|bool The buffer on success; WP_Error
	 *         ('unclosed_buffer') when a checkout is already active; the value returned
	 *         by set_checkout_id() when the marker could not be stored; false when there
	 *         are no items to check out.
	 */
	function checkout( $buffer_size ) {
		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();

		$result = $this->set_checkout_id( $buffer_id );

		if ( ! $result || is_wp_error( $result ) ) {
			error_log( "badness setting checkout ID (this should not happen)" );

			return $result;
		}

		$items = $this->fetch_items( $buffer_size );

		if ( empty( $items ) ) {
			// Nothing to hand out: release the marker we just claimed, mirroring
			// what checkout_with_memory_limit() does in the same situation.
			$this->delete_checkout_id();

			return false;
		}

		return new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );
	}
160
161
	/**
	 * Check out rows until the queue is exhausted, $max_buffer_size rows have
	 * been considered, or the accumulated value size passes $max_memory.
	 *
	 * Row sizes are read from the DB first so that no more data than needed is
	 * loaded into memory. The only way the selection can exceed $max_memory is
	 * when the very first row is larger than the limit on its own; in that case
	 * that row is checked out by itself.
	 *
	 * @param int $max_memory      Limit on the summed LENGTH(option_value) of the selected rows.
	 * @param int $max_buffer_size Maximum number of rows to consider. Defaults to 500.
	 *
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|bool The buffer on success; WP_Error
	 *         ('unclosed_buffer') when a checkout is already active; the value returned
	 *         by set_checkout_id() when the marker could not be stored; false (after
	 *         releasing the marker) when no items were fetched.
	 */
	function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();
		$claimed   = $this->set_checkout_id( $buffer_id );

		if ( ! $claimed || is_wp_error( $claimed ) ) {
			error_log( "badness setting checkout ID (this should not happen)" );

			return $claimed;
		}

		global $wpdb;

		// get the list of item id + stored value size, front of the queue first
		$sizes_sql = $wpdb->prepare(
			"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
			"jpsq_{$this->id}-%",
			$max_buffer_size
		);
		$size_rows = $wpdb->get_results( $sizes_sql, OBJECT );

		$selected_ids = array();
		$bytes_so_far = 0;

		foreach ( $size_rows as $size_row ) {
			$bytes_so_far += $size_row->value_size;

			// The first row is always accepted, even when it alone exceeds the limit;
			// the loop then stops on the following row.
			if ( $bytes_so_far > $max_memory && array() !== $selected_ids ) {
				break;
			}

			$selected_ids[] = $size_row->id;
		}

		$items = $this->fetch_items_by_id( $selected_ids );

		if ( 0 === count( $items ) ) {
			$this->delete_checkout_id();

			return false;
		}

		return new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
	}
219
220
	/**
	 * Return a checked-out buffer without removing its items from the queue.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer Buffer obtained from a checkout.
	 *
	 * @return true|WP_Error True once the checkout marker is released, or the
	 *                       validation error (which is also logged).
	 */
	function checkin( $buffer ) {
		$validation = $this->validate_checkout( $buffer );

		if ( ! is_wp_error( $validation ) ) {
			$this->delete_checkout_id();

			return true;
		}

		error_log( "Invalid checkin: " . $validation->get_error_message() );

		return $validation;
	}
233
234
	/**
	 * Close a checked-out buffer and delete its items from the queue.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer        Buffer obtained from a checkout.
	 * @param array|null                $ids_to_remove Option names to delete; when null,
	 *                                                 every item ID in the buffer is used.
	 *
	 * @return true|WP_Error True after the marker is released and the rows (if any)
	 *                       are deleted, or the validation error (which is also logged).
	 */
	function close( $buffer, $ids_to_remove = null ) {
		$validation = $this->validate_checkout( $buffer );

		if ( is_wp_error( $validation ) ) {
			error_log( "Invalid close: " . $validation->get_error_message() );

			return $validation;
		}

		$this->delete_checkout_id();

		// by default clear all items in the buffer
		if ( null === $ids_to_remove ) {
			$ids_to_remove = $buffer->get_item_ids();
		}

		global $wpdb;

		$id_count = count( $ids_to_remove );

		if ( $id_count > 0 ) {
			// one %s placeholder per ID, filled in through $wpdb->prepare()
			$placeholders = implode( ', ', array_fill( 0, $id_count, '%s' ) );
			$sql          = "DELETE FROM $wpdb->options WHERE option_name IN (" . $placeholders . ')';
			$prepare_args = array_merge( array( $sql ), $ids_to_remove );

			$wpdb->query( call_user_func_array( array( $wpdb, 'prepare' ), $prepare_args ) );
		}

		return true;
	}
260
261
	/**
	 * Read the value of every queued item, then reset the queue.
	 *
	 * @return array Values of all items that were fetched before the reset.
	 */
	function flush_all() {
		$rows   = $this->fetch_items();
		$values = Jetpack_Sync_Utils::get_item_values( $rows );

		$this->reset();

		return $values;
	}
267
268
	/**
	 * Fetch every row in the queue without removing anything.
	 *
	 * @return array Rows as returned by fetch_items() with no limit (objects with `id` and `value`).
	 */
	function get_all() {
		$rows = $this->fetch_items();

		return $rows;
	}
271
272
	/**
	 * Release the checkout marker without validating any buffer.
	 *
	 * Use with caution: this could allow multiple processes to delete
	 * and send from the queue at the same time.
	 */
	function force_checkin() {
		$this->delete_checkout_id();
	}
277
278
	/**
	 * Lock checkouts from the queue.
	 *
	 * Waits, in one-second steps, up to $timeout seconds for the queue to be
	 * empty, then claims the checkout marker with the ID 'lock'. The timeout
	 * error is raised only when items are still present after waiting, so it
	 * honours any $timeout value and is not raised for a queue that emptied
	 * during the final wait. With $timeout = 0 no waiting is done and a
	 * non-empty queue is reported as a timeout straight away.
	 *
	 * @param int $timeout Maximum number of seconds to wait. Defaults to 30.
	 *
	 * @return true|WP_Error|bool True when the lock was taken; WP_Error ('lock_timeout')
	 *         when the queue still has items after waiting; WP_Error ('unclosed_buffer')
	 *         when a checkout is already active; the value returned by set_checkout_id()
	 *         when the marker could not be stored.
	 */
	function lock( $timeout = 30 ) {
		$tries     = 0;
		$has_items = $this->has_any_items();

		while ( $has_items && $tries < $timeout ) {
			sleep( 1 );
			$tries    += 1;
			$has_items = $this->has_any_items();
		}

		// Decide on the queue state itself rather than on the try counter.
		if ( $has_items ) {
			return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
		}

		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		// hopefully this means we can acquire a checkout?
		$result = $this->set_checkout_id( 'lock' );

		if ( ! $result || is_wp_error( $result ) ) {
			error_log( "badness setting checkout ID (this should not happen)" );

			return $result;
		}

		return true;
	}
307
308
	/**
	 * Release the checkout marker, undoing lock().
	 *
	 * NOTE(review): the marker is removed whatever ID it currently holds,
	 * i.e. this does not check that it was set by lock().
	 */
	function unlock() {
		$this->delete_checkout_id();
	}
311
312
	/**
	 * Read the ID of the currently active checkout.
	 *
	 * @return mixed Whatever get_transient() returns for this queue's checkout transient.
	 */
	private function get_checkout_id() {
		$transient_name = $this->get_checkout_transient_name();

		return get_transient( $transient_name );
	}
315
316
	/**
	 * Record $checkout_id as the active checkout for this queue.
	 *
	 * @param string $checkout_id ID to store in the checkout transient.
	 *
	 * @return mixed Whatever set_transient() returns.
	 */
	private function set_checkout_id( $checkout_id ) {
		$expiration = 5 * 60; // 5 minute timeout

		return set_transient( $this->get_checkout_transient_name(), $checkout_id, $expiration );
	}
319
320
	/**
	 * Remove this queue's checkout transient, whatever ID it holds.
	 */
	private function delete_checkout_id() {
		$transient_name = $this->get_checkout_transient_name();

		delete_transient( $transient_name );
	}
323
324
	/**
	 * Name of the transient that holds this queue's checkout ID.
	 *
	 * @return string "jpsq_<queue id>_checkout".
	 */
	private function get_checkout_transient_name() {
		return 'jpsq_' . $this->id . '_checkout';
	}
327
328
	/**
	 * Generate the option name for the next row written to this queue.
	 *
	 * The name is "jpsq_<queue id>-<microtime, 6 decimals>-<pid>-<row iterator>".
	 * This format is specifically chosen to, as much as possible, preserve time
	 * order and minimise the possibility of collisions between multiple
	 * processes working at the same time. Each call also advances the
	 * per-instance row iterator.
	 *
	 * TODO: confirm we only need to support PHP 5.05+ (otherwise we'll need to
	 * emulate microtime as float, and avoid PHP_INT_MAX)
	 *
	 * @see http://php.net/manual/en/function.microtime.php
	 *
	 * @return string Option name for the next row.
	 */
	private function get_next_data_row_option_name() {
		$timestamp = sprintf( '%.6f', microtime( true ) );

		// row iterator is used to avoid collisions where we're writing data waaay fast
		// in a single process; it wraps back to 0 once it reaches PHP_INT_MAX
		$this->row_iterator = ( PHP_INT_MAX === $this->row_iterator ) ? 0 : $this->row_iterator + 1;

		$name_parts = array(
			'jpsq_' . $this->id,
			$timestamp,
			getmypid(),
			$this->row_iterator,
		);

		return implode( '-', $name_parts );
	}
345
346
	/**
	 * Load rows from the front of the queue, ordered by option name.
	 *
	 * @param int|null $limit Maximum number of rows; any falsy value means no limit.
	 *
	 * @return array Row objects with `id` (the option name) and `value`
	 *               (the option value passed through maybe_unserialize()).
	 */
	private function fetch_items( $limit = null ) {
		global $wpdb;

		$select_sql   = "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC";
		$like_pattern = "jpsq_{$this->id}-%";

		// The LIMIT clause is only appended when a limit was actually requested.
		$query_sql = $limit
			? $wpdb->prepare( $select_sql . ' LIMIT %d', $like_pattern, $limit )
			: $wpdb->prepare( $select_sql, $like_pattern );

		$rows = $wpdb->get_results( $query_sql, OBJECT );

		foreach ( $rows as $row ) {
			$row->value = maybe_unserialize( $row->value );
		}

		return $rows;
	}
362
363
	/**
	 * Load the rows whose option names are listed in $item_ids, ordered by option name.
	 *
	 * @param array $item_ids Option names to fetch.
	 *
	 * @return array Row objects with `id` and `value` (value passed through
	 *               maybe_unserialize()); an empty array when $item_ids is empty.
	 */
	private function fetch_items_by_id( $item_ids ) {
		global $wpdb;

		$id_count = count( $item_ids );

		if ( $id_count <= 0 ) {
			return array();
		}

		// one %s placeholder per ID, filled in through $wpdb->prepare()
		$placeholders = implode( ', ', array_fill( 0, $id_count, '%s' ) );
		$sql          = "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name IN (" . $placeholders . ') ORDER BY option_name ASC';
		$prepare_args = array_merge( array( $sql ), $item_ids );

		$rows = $wpdb->get_results( call_user_func_array( array( $wpdb, 'prepare' ), $prepare_args ), OBJECT );

		foreach ( $rows as $row ) {
			$row->value = maybe_unserialize( $row->value );
		}

		return $rows;
	}
379
380
	/**
	 * Check that $buffer is the buffer currently checked out of this queue.
	 *
	 * @param mixed $buffer Value handed to checkin() or close().
	 *
	 * @return true|WP_Error True when the buffer's ID equals the active checkout ID;
	 *         otherwise WP_Error 'not_a_buffer', 'buffer_not_checked_out' or
	 *         'buffer_mismatch', checked in that order.
	 */
	private function validate_checkout( $buffer ) {
		if ( ! ( $buffer instanceof Jetpack_Sync_Queue_Buffer ) ) {
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
		}

		$active_id = $this->get_checkout_id();

		if ( ! $active_id ) {
			return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
		}

		// NOTE(review): this is a loose (==) comparison — confirm whether strict matching is wanted.
		if ( $active_id == $buffer->id ) {
			return true;
		}

		return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
	}
397
}
398
399
class Jetpack_Sync_Utils {

	/**
	 * Collect the `id` property of every item.
	 *
	 * @param array $items Objects exposing an `id` property.
	 *
	 * @return array The IDs, in the same order and under the same keys as $items.
	 */
	static function get_item_ids( $items ) {
		$read_id = array( __CLASS__, 'get_item_id' );

		return array_map( $read_id, $items );
	}

	/**
	 * Collect the `value` property of every item.
	 *
	 * @param array $items Objects exposing a `value` property.
	 *
	 * @return array The values, in the same order and under the same keys as $items.
	 */
	static function get_item_values( $items ) {
		$read_value = array( __CLASS__, 'get_item_value' );

		return array_map( $read_value, $items );
	}

	/**
	 * array_map() callback: read one item's ID.
	 *
	 * @param object $item Item to read.
	 *
	 * @return mixed The item's `id` property.
	 */
	private static function get_item_id( $item ) {
		return $item->id;
	}

	/**
	 * array_map() callback: read one item's value.
	 *
	 * @param object $item Item to read.
	 *
	 * @return mixed The item's `value` property.
	 */
	private static function get_item_value( $item ) {
		return $item->value;
	}
}
417