Completed
Push — add/sync-action ( 819ed1...faa3ed )
by
unknown
10:46
created

Jetpack_Sync_Queue::checkout()   B

Complexity

Conditions 5
Paths 4

Size

Total Lines 24
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 3
Metric Value
cc 5
eloc 13
c 3
b 0
f 3
nc 4
nop 1
dl 0
loc 24
rs 8.5125
1
<?php
2
3
/**
4
 * A buffer of items from the queue that can be checked out
5
 */
6
class Jetpack_Sync_Queue_Buffer {
	/**
	 * Identifier given to this buffer when it was created.
	 */
	public $id;

	/**
	 * The rows held by this buffer, exactly as passed to the constructor.
	 */
	public $items_with_ids;

	/**
	 * Remember the buffer identifier and the rows that belong to it.
	 *
	 * @param mixed $id             Identifier of this buffer.
	 * @param mixed $items_with_ids Rows handed to the Jetpack_Sync_Utils helpers by the getters below.
	 */
	public function __construct( $id, $items_with_ids ) {
		$this->items_with_ids = $items_with_ids;
		$this->id             = $id;
	}

	/**
	 * Get the values of the buffered rows.
	 *
	 * @return mixed Whatever Jetpack_Sync_Utils::get_item_values() returns for the stored rows.
	 */
	public function get_items() {
		$values = Jetpack_Sync_Utils::get_item_values( $this->items_with_ids );

		return $values;
	}

	/**
	 * Get the IDs of the buffered rows.
	 *
	 * @return mixed Whatever Jetpack_Sync_Utils::get_item_ids() returns for the stored rows.
	 */
	public function get_item_ids() {
		$ids = Jetpack_Sync_Utils::get_item_ids( $this->items_with_ids );

		return $ids;
	}
}
23
24
/**
25
 * A persistent queue that can be flushed in increments of N items,
26
 * and which blocks reads until checked-out buffers are checked in or
27
 * closed. This uses raw SQL for two reasons: speed, and not triggering
28
 * tons of added_option callbacks.
29
 */
30
class Jetpack_Sync_Queue {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
31
	public $id;
32
	private $row_iterator;
33
34
	/**
	 * Create a queue handle for the given queue ID.
	 *
	 * @param string $id Queue identifier; every '-' in it is replaced with '_'.
	 */
	public function __construct( $id ) {
		// Counter that get_next_data_row_option_name() bumps for every generated row name.
		$this->row_iterator = 0;

		// necessary to ensure we don't have ID collisions in the SQL
		$normalized_id = str_replace( '-', '_', $id );
		$this->id      = $normalized_id;
	}
38
39
	/**
	 * Append one item to the queue as a new, non-autoloaded row in the options table.
	 *
	 * The item is serialized and stored under a freshly generated option name. The
	 * insert is repeated (with a new generated name each time) for as long as the
	 * query reports exactly 0 affected rows.
	 *
	 * NOTE(review): any other result ends the loop - presumably including a `false`
	 * error result from $wpdb->query(); confirm that this is intended.
	 *
	 * @param mixed $item Value to enqueue.
	 */
	public function add( $item ) {
		global $wpdb;

		do {
			$insert_sql = $wpdb->prepare(
				"INSERT INTO $wpdb->options (option_name, option_value,autoload) VALUES (%s, %s,%s)",
				$this->get_next_data_row_option_name(),
				serialize( $item ),
				'no'
			);

			$rows_added = $wpdb->query( $insert_sql );
		} while ( 0 === $rows_added );
	}
54
55
	// Attempts to insert all the items in a single SQL query. May be subject to query size limits!
56
	/**
	 * Insert several items with a single multi-row INSERT statement.
	 *
	 * Every row name is the same generated base option name followed by '-' and the
	 * item's position, and every row is stored with autoload set to 'no'.
	 * Because everything goes into one statement, very large inputs may run into
	 * query size limits.
	 *
	 * @param array $items Values to enqueue. Keys are ignored; insertion follows array order.
	 *
	 * @return WP_Error|null WP_Error ('row_count_mismatch') when the number of inserted rows
	 *                       differs from the number of items, null otherwise (including for
	 *                       an empty input, which is a no-op).
	 */
	public function add_all( $items ) {
		global $wpdb;

		// Re-index so associative or sparse arrays are handled as well as plain lists.
		$items      = array_values( $items );
		$item_count = count( $items );

		// "INSERT ... VALUES" without any tuples is not valid SQL, so there is nothing to run.
		if ( 0 === $item_count ) {
			return;
		}

		$base_option_name = $this->get_next_data_row_option_name();

		$query = "INSERT INTO $wpdb->options (option_name, option_value,autoload) VALUES ";

		$rows = array();

		foreach ( $items as $index => $item ) {
			$option_name  = esc_sql( $base_option_name . '-' . $index );
			$option_value = esc_sql( serialize( $item ) );
			$rows[]       = "('$option_name', '$option_value', 'no')";
		}

		$rows_added = $wpdb->query( $query . join( ',', $rows ) );

		if ( $rows_added !== $item_count ) {
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
		}
	}
76
77
	// Peek at the front-most item on the queue without checking it out
78
	/**
	 * Read the values of the first rows in the queue without checking them out.
	 *
	 * @param int $count Maximum number of rows to read; passed to fetch_items() as its limit.
	 *
	 * @return array Values of the fetched rows, or an empty array when nothing was fetched.
	 */
	public function peek( $count = 1 ) {
		$rows = $this->fetch_items( $count );

		return $rows ? Jetpack_Sync_Utils::get_item_values( $rows ) : array();
	}
86
87
	// lag is the difference in time between the age of the oldest item and the current time
88
	/**
	 * Compute how long the oldest queued item has been waiting.
	 *
	 * The row with the smallest option name is looked up and the timestamp embedded in
	 * that name (the part between the queue prefix and the next '-') is subtracted from
	 * the current microtime.
	 *
	 * @return float|null Age in seconds of the oldest row, or null when the queue has no
	 *                    rows or the row name does not contain a parsable timestamp.
	 */
	public function lag() {
		global $wpdb;

		// Names sort by their embedded timestamp, so ascending order yields the oldest row first.
		$oldest_item_name = $wpdb->get_var( $wpdb->prepare(
			"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
			"jpsq_{$this->id}-%"
		) );

		if ( ! $oldest_item_name ) {
			return null;
		}

		// break apart the item name to get the timestamp; the ID is quoted so that any
		// regex metacharacters in it are matched literally
		$matches = null;
		$pattern = '/^jpsq_' . preg_quote( $this->id, '/' ) . '-(\d+\.\d+)-/';

		if ( ! preg_match( $pattern, $oldest_item_name, $matches ) ) {
			return null;
		}

		return microtime( true ) - floatval( $matches[1] );
	}
108
109
	/**
	 * Empty the queue: drop the checkout marker, then delete every row whose option
	 * name matches this queue's prefix.
	 */
	public function reset() {
		global $wpdb;

		$this->delete_checkout_id();

		$delete_sql = $wpdb->prepare(
			"DELETE FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
		);

		$wpdb->query( $delete_sql );
	}
116
117
	/**
	 * Count the rows currently stored for this queue.
	 *
	 * @return mixed The count(*) result exactly as returned by $wpdb->get_var().
	 */
	public function size() {
		global $wpdb;

		$count_sql = $wpdb->prepare(
			"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
		);

		return $wpdb->get_var( $count_sql );
	}
124
125
	// we use this peculiar implementation because it's much faster than count(*)
126
	/**
	 * Tell whether at least one row exists for this queue.
	 *
	 * Uses an EXISTS sub-select rather than counting rows.
	 *
	 * @return bool True only when the query result is the string "1".
	 */
	public function has_any_items() {
		global $wpdb;

		$exists_sql = $wpdb->prepare(
			"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )", "jpsq_{$this->id}-%"
		);

		$value = $wpdb->get_var( $exists_sql );

		return "1" === $value;
	}
134
135
	/**
	 * Check out up to $buffer_size items from the front of the queue.
	 *
	 * A checkout marker is stored first, so further checkouts are refused until the
	 * returned buffer is passed to checkin() or close() (or the marker expires).
	 *
	 * @param int $buffer_size Maximum number of items to put in the buffer.
	 *
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|false The buffer on success; WP_Error
	 *         ('unclosed_buffer') when another checkout is active; the failed result of
	 *         set_checkout_id() when the marker could not be stored; false when the
	 *         queue has no items.
	 */
	public function checkout( $buffer_size ) {
		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();

		$result = $this->set_checkout_id( $buffer_id );

		if ( ! $result || is_wp_error( $result ) ) {
			error_log( "badness setting checkout ID (this should not happen)" );
			return $result;
		}

		$items = $this->fetch_items( $buffer_size );

		if ( empty( $items ) ) {
			// No buffer is handed out, so nobody could ever check it in or close it:
			// release the marker now instead of blocking the queue until it expires.
			$this->delete_checkout_id();
			return false;
		}

		return new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );
	}
159
160
	// this checks out rows until it either empties the queue or hits a certain memory limit
161
	// it loads the sizes from the DB first so that it doesn't accidentally
162
	// load more data into memory than it needs to.
163
	// The only way it will load more items than $max_size is if a single queue item 
164
	// exceeds the memory limit, but in that case it will send that item by itself.
165
	/**
	 * Check out items from the front of the queue, bounded by total stored size.
	 *
	 * Only the row names and the byte length of each stored value are read first; rows
	 * are then selected in order until adding the next one would exceed $max_memory.
	 * The first row is always selected, even if it alone exceeds the limit. The selected
	 * rows are loaded afterwards and returned as a buffer.
	 *
	 * @param int $max_memory      Upper bound for the summed LENGTH(option_value) of the rows.
	 * @param int $max_buffer_size Maximum number of rows to consider.
	 *
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|false The buffer on success; WP_Error
	 *         ('unclosed_buffer') when another checkout is active; the failed result of
	 *         set_checkout_id() when the marker could not be stored; false when no items
	 *         could be loaded.
	 */
	public function checkout_with_memory_limit( $max_memory, $max_buffer_size = 100 ) {
		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();

		$result = $this->set_checkout_id( $buffer_id );

		if ( ! $result || is_wp_error( $result ) ) {
			error_log( "badness setting checkout ID (this should not happen)" );
			return $result;
		}

		// get the map of buffer_id -> memory_size
		global $wpdb;

		$items_with_size = $wpdb->get_results(
			$wpdb->prepare(
				"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
				"jpsq_{$this->id}-%",
				$max_buffer_size
			),
			OBJECT
		);

		$total_memory = 0;
		$item_ids     = array();

		foreach ( $items_with_size as $item_with_size ) {
			$total_memory += $item_with_size->value_size;

			// if this is the first item and it exceeds memory, allow loop to continue
			// we will exit on the next iteration instead
			if ( $total_memory > $max_memory && count( $item_ids ) > 0 ) {
				break;
			}

			$item_ids[] = $item_with_size->id;
		}

		$items = $this->fetch_items_by_id( $item_ids );

		if ( empty( $items ) ) {
			// No buffer is handed out, so nobody could ever check it in or close it:
			// release the marker now instead of blocking the queue until it expires.
			$this->delete_checkout_id();
			return false;
		}

		return new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
	}
215
216
	/**
	 * Return a checked-out buffer to the queue without removing its items.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer The buffer obtained from a checkout.
	 *
	 * @return true|WP_Error True once the checkout marker is removed, or the validation
	 *                       error when the buffer is not the one currently checked out.
	 */
	public function checkin( $buffer ) {
		$validation = $this->validate_checkout( $buffer );

		if ( ! is_wp_error( $validation ) ) {
			$this->delete_checkout_id();

			return true;
		}

		error_log("Invalid checkin: ".$validation->get_error_message());

		return $validation;
	}
228
229
	/**
	 * Finish a checkout: remove the checkout marker and delete the buffer's rows.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer The buffer obtained from a checkout.
	 *
	 * @return true|WP_Error True when done, or the validation error when the buffer is
	 *                       not the one currently checked out.
	 */
	public function close( $buffer ) {
		$validation = $this->validate_checkout( $buffer );

		if ( is_wp_error( $validation ) ) {
			error_log("Invalid close: ".$validation->get_error_message());

			return $validation;
		}

		$this->delete_checkout_id();

		global $wpdb;

		$ids_to_remove = $buffer->get_item_ids();
		$id_count      = count( $ids_to_remove );

		if ( $id_count < 1 ) {
			return true;
		}

		// Build one '%s' placeholder per ID so that the IN ( ... ) list can be prepared.
		$placeholders = implode( ', ', array_fill( 0, $id_count, '%s' ) );
		$sql          = "DELETE FROM $wpdb->options WHERE option_name IN (" . $placeholders . ')';
		$prepare_args = array_merge( array( $sql ), $ids_to_remove );
		$query        = call_user_func_array( array( $wpdb, 'prepare' ), $prepare_args );

		$wpdb->query( $query );

		return true;
	}
251
252
	/**
	 * Read every item in the queue, then reset the queue.
	 *
	 * @return array Values of all rows that were in the queue before the reset.
	 */
	public function flush_all() {
		$rows   = $this->fetch_items();
		$values = Jetpack_Sync_Utils::get_item_values( $rows );

		$this->reset();

		return $values;
	}
258
259
	/**
	 * Fetch every row of the queue (no limit), leaving the queue untouched.
	 *
	 * @return array Rows as returned by fetch_items(): objects carrying `id` and `value`.
	 */
	public function get_all() {
		$all_rows = $this->fetch_items();

		return $all_rows;
	}
262
263
	// use with caution, this could allow multiple processes to delete
264
	// and send from the queue at the same time
265
	/**
	 * Remove the checkout marker unconditionally, without validating any buffer.
	 */
	public function force_checkin() {
		$this->delete_checkout_id();
	}
268
269
	// used to lock checkouts from the queue.
270
	// tries to wait up to $timeout seconds for the queue to be empty
271
	/**
	 * Lock the queue against checkouts once it has drained.
	 *
	 * Polls once per second, for at most $timeout polls, until the queue has no items,
	 * then stores the checkout marker with the special ID 'lock'.
	 *
	 * @param int $timeout Maximum number of one-second waits for the queue to empty.
	 *
	 * @return true|WP_Error|mixed True when the lock was acquired; WP_Error ('lock_timeout')
	 *         when the queue still has items after waiting; WP_Error ('unclosed_buffer')
	 *         when a checkout is active; otherwise the failed result of set_checkout_id().
	 */
	public function lock( $timeout = 30 ) {
		$tries = 0;

		while ( $this->has_any_items() && $tries < $timeout ) {
			sleep( 1 );
			$tries += 1;
		}

		// Compare against the caller's $timeout rather than a fixed number, and only
		// report a timeout if the queue really is still non-empty.
		if ( $tries >= $timeout && $this->has_any_items() ) {
			return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
		}

		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		// hopefully this means we can acquire a checkout?
		$result = $this->set_checkout_id( 'lock' );

		if ( ! $result || is_wp_error( $result ) ) {
			error_log( "badness setting checkout ID (this should not happen)" );
			return $result;
		}

		return true;
	}
297
298
	/**
	 * Release the queue by removing the checkout marker.
	 */
	public function unlock() {
		$this->delete_checkout_id();
	}
301
302
	/**
	 * Read the current checkout marker.
	 *
	 * @return mixed Whatever get_transient() returns for this queue's checkout transient.
	 */
	private function get_checkout_id() {
		$transient_name = $this->get_checkout_transient_name();

		return get_transient( $transient_name );
	}
305
306
	/**
	 * Store the checkout marker for this queue.
	 *
	 * @param string $checkout_id Identifier to store as the current checkout.
	 *
	 * @return mixed The result of set_transient().
	 */
	private function set_checkout_id( $checkout_id ) {
		$transient_name  = $this->get_checkout_transient_name();
		$expiry_seconds  = 5 * 60; // 5 minute timeout

		return set_transient( $transient_name, $checkout_id, $expiry_seconds );
	}
309
310
	/**
	 * Remove the checkout marker for this queue.
	 */
	private function delete_checkout_id() {
		$transient_name = $this->get_checkout_transient_name();

		delete_transient( $transient_name );
	}
313
314
	/**
	 * Build the name of the transient that holds this queue's checkout marker.
	 *
	 * @return string 'jpsq_' followed by the queue ID and '_checkout'.
	 */
	private function get_checkout_transient_name() {
		$transient_name = "jpsq_{$this->id}_checkout";

		return $transient_name;
	}
317
318
	/**
	 * Generate the option name for the next row of this queue.
	 *
	 * The name is 'jpsq_<id>-<timestamp>-<pid>-<counter>': the microsecond timestamp
	 * preserves time order as far as possible, while the process ID and a per-instance
	 * counter minimise the chance of collisions between processes and between rapid
	 * writes within one process.
	 *
	 * @see http://php.net/manual/en/function.microtime.php
	 *
	 * @return string The generated option name.
	 */
	private function get_next_data_row_option_name() {
		// TODO: confirm we only need to support PHP 5.05+ (otherwise we'll need to emulate microtime as float, and avoid PHP_INT_MAX)
		// %F (unlike %f) ignores the locale, so the decimal separator is always '.',
		// which is what lag() expects when it parses the timestamp back out of the name.
		$timestamp = sprintf( '%.6F', microtime( true ) );

		// row iterator is used to avoid collisions where we're writing data waaay fast in a single process
		if ( $this->row_iterator === PHP_INT_MAX ) {
			$this->row_iterator = 0;
		} else {
			$this->row_iterator += 1;
		}

		return 'jpsq_' . $this->id . '-' . $timestamp . '-' . getmypid() . '-' . $this->row_iterator;
	}
335
336
	/**
	 * Load rows of this queue in ascending option-name order and unserialize their values.
	 *
	 * @param int|null $limit Maximum number of rows; any falsy value means no limit.
	 *
	 * @return array Row objects with `id` (the option name) and `value` (passed through
	 *               maybe_unserialize()).
	 */
	private function fetch_items( $limit = null ) {
		global $wpdb;

		if ( ! $limit ) {
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", "jpsq_{$this->id}-%" );
		} else {
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", "jpsq_{$this->id}-%", $limit );
		}

		$rows = $wpdb->get_results( $query_sql, OBJECT );

		foreach ( $rows as $row ) {
			$row->value = maybe_unserialize( $row->value );
		}

		return $rows;
	}
352
353
	/**
	 * Load the rows with the given option names, in ascending option-name order, and
	 * unserialize their values.
	 *
	 * @param array $item_ids Option names of the rows to load.
	 *
	 * @return array Row objects with `id` and `value`; an empty array when no IDs are given.
	 */
	private function fetch_items_by_id( $item_ids ) {
		global $wpdb;

		$id_count = count( $item_ids );

		if ( $id_count < 1 ) {
			return array();
		}

		// One '%s' placeholder per ID, so that the IN ( ... ) list can be prepared.
		$placeholders = implode( ', ', array_fill( 0, $id_count, '%s' ) );
		$sql          = "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name IN (" . $placeholders . ') ORDER BY option_name ASC';
		$prepare_args = array_merge( array( $sql ), $item_ids );
		$query        = call_user_func_array( array( $wpdb, 'prepare' ), $prepare_args );

		$rows = $wpdb->get_results( $query, OBJECT );

		foreach ( $rows as $row ) {
			$row->value = maybe_unserialize( $row->value );
		}

		return $rows;
	}
368
369
	/**
	 * Verify that $buffer is the buffer that is currently checked out.
	 *
	 * @param mixed $buffer Value to validate; expected to be a Jetpack_Sync_Queue_Buffer.
	 *
	 * @return true|WP_Error True when valid; otherwise WP_Error with code 'not_a_buffer',
	 *                       'buffer_not_checked_out' or 'buffer_mismatch'.
	 */
	private function validate_checkout( $buffer ) {
		if ( ! $buffer instanceof Jetpack_Sync_Queue_Buffer ) {
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
		}

		$checkout_id = $this->get_checkout_id();

		if ( ! $checkout_id ) {
			return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
		}

		// Compare as strings: a loose comparison would apply numeric juggling to IDs that
		// happen to look like numbers (e.g. '1e3' == '1000') and could accept a wrong buffer.
		if ( (string) $checkout_id !== (string) $buffer->id ) {
			return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
		}

		return true;
	}
386
}
387
388
class Jetpack_Sync_Utils {

	/**
	 * Collect the `value` property of every item.
	 *
	 * @param array $items Objects exposing a `value` property.
	 *
	 * @return array The values, keyed like $items.
	 */
	public static function get_item_values( $items ) {
		$extract_value = array( __CLASS__, 'get_item_value' );

		return array_map( $extract_value, $items );
	}

	/**
	 * Collect the `id` property of every item.
	 *
	 * @param array $items Objects exposing an `id` property.
	 *
	 * @return array The IDs, keyed like $items.
	 */
	public static function get_item_ids( $items ) {
		$extract_id = array( __CLASS__, 'get_item_id' );

		return array_map( $extract_id, $items );
	}

	/**
	 * array_map() callback: read one item's `value`.
	 */
	private static function get_item_value( $item ) {
		return $item->value;
	}

	/**
	 * array_map() callback: read one item's `id`.
	 */
	private static function get_item_id( $item ) {
		return $item->id;
	}
}
406