Completed
Push — add/sync-action ( bcde85...625fbd )
by
unknown
30:56 queued 22:35
created

Jetpack_Sync_Queue_Buffer::get_items()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 2
Metric Value
cc 1
eloc 2
c 2
b 0
f 2
nc 1
nop 0
dl 0
loc 3
rs 10
1
<?php
2
3
/**
4
 * A buffer of items from the queue that can be checked out
5
 */
6
class Jetpack_Sync_Queue_Buffer {
	/** @var mixed Identifier given to this buffer when it was created. */
	public $id;

	/** @var array Row objects; each one is read through its `id` and `value` properties. */
	public $items_with_ids;

	/**
	 * @param mixed $id             Identifier for this buffer.
	 * @param array $items_with_ids Row objects carrying `id` and `value` properties.
	 */
	public function __construct( $id, $items_with_ids ) {
		$this->items_with_ids = $items_with_ids;
		$this->id             = $id;
	}

	/**
	 * Builds a map of item ID => item value for everything in the buffer.
	 *
	 * @return array|false Result of array_combine() over the IDs and values.
	 */
	public function get_items() {
		$ids    = $this->get_item_ids();
		$values = $this->get_item_values();

		return array_combine( $ids, $values );
	}

	/**
	 * @return array The `value` of every buffered row, in buffer order.
	 */
	public function get_item_values() {
		$rows = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_values( $rows );
	}

	/**
	 * @return array The `id` of every buffered row, in buffer order.
	 */
	public function get_item_ids() {
		$rows = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_ids( $rows );
	}
}
27
28
/**
29
 * A persistent queue that can be flushed in increments of N items,
30
 * and which blocks reads until checked-out buffers are checked in or
31
 * closed. This uses raw SQL for two reasons: speed, and not triggering
32
 * tons of added_option callbacks.
33
 */
34
class Jetpack_Sync_Queue {
0 ignored issues
show
Coding Style Compatibility introduced by
PSR1 recommends that each class should be in its own file to aid autoloaders.

Having each class in a dedicated file usually plays nice with PSR autoloaders and is therefore a well established practice. If you use other autoloaders, you might not want to follow this rule.

Loading history...
35
	public $id;
36
	private $row_iterator;
37
38
	/**
	 * @param string $id Queue name; hyphens are converted to underscores.
	 */
	function __construct( $id ) {
		// Counter appended to generated option names within this instance.
		$this->row_iterator = 0;

		// necessary to ensure we don't have ID collisions in the SQL:
		// a hyphen separates the ID from the rest of each option name.
		$normalized_id = str_replace( '-', '_', $id );
		$this->id      = $normalized_id;
	}
42
43
	/**
	 * Appends a single item to the queue as a non-autoloaded option row.
	 *
	 * @param mixed $item Value to enqueue; stored with serialize().
	 */
	function add( $item ) {
		global $wpdb;

		// Insert under a freshly generated (microtime-based) option name and
		// repeat for as long as the query reports exactly zero affected rows.
		// Any other result, including a non-integer one, ends the loop.
		do {
			$insert_sql = $wpdb->prepare(
				"INSERT INTO $wpdb->options (option_name, option_value,autoload) VALUES (%s, %s,%s)",
				$this->get_next_data_row_option_name(),
				serialize( $item ),
				'no'
			);
			$affected   = $wpdb->query( $insert_sql );
		} while ( 0 === $affected );
	}
58
59
	// Attempts to insert all the items in a single SQL query. May be subject to query size limits!
60
	/**
	 * Inserts every item with one multi-row INSERT statement.
	 *
	 * Each row is named after a shared base option name plus the item's
	 * position, so the rows keep the order of $items.
	 *
	 * @param array $items Items to enqueue. Keys are ignored; order is kept.
	 * @return null|WP_Error WP_Error 'row_count_mismatch' when the number of
	 *                       inserted rows differs from the number of items;
	 *                       nothing otherwise (including for an empty array).
	 */
	function add_all( $items ) {
		global $wpdb;

		// Re-index so sparse or string-keyed arrays cannot hit undefined offsets.
		$items       = array_values( $items );
		$total_items = count( $items );

		// Nothing to do: a VALUES clause without rows would be malformed SQL.
		if ( 0 === $total_items ) {
			return;
		}

		$base_option_name = $this->get_next_data_row_option_name();

		$query = "INSERT INTO $wpdb->options (option_name, option_value,autoload) VALUES ";

		$rows = array();

		for ( $i = 0; $i < $total_items; $i += 1 ) {
			$option_name  = esc_sql( $base_option_name . '-' . $i );
			$option_value = esc_sql( serialize( $items[ $i ] ) );
			$rows[]       = "('$option_name', '$option_value', 'no')";
		}

		$rows_added = $wpdb->query( $query . join( ',', $rows ) );

		if ( $rows_added !== $total_items ) {
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
		}
	}
80
81
	// Peek at the front-most item on the queue without checking it out
82
	/**
	 * Reads the values of the first $count items without checking them out.
	 *
	 * @param int $count Maximum number of items to read.
	 * @return array Item values, or an empty array when nothing was fetched.
	 */
	function peek( $count = 1 ) {
		$rows = $this->fetch_items( $count );

		return $rows ? Jetpack_Sync_Utils::get_item_values( $rows ) : array();
	}
90
91
	// lag is the difference in time between the age of the oldest item and the current time
92
	/**
	 * Computes how long the front-most item has been waiting in the queue.
	 *
	 * The enqueue time is parsed from the item's option name, which embeds a
	 * microtime timestamp right after the queue ID.
	 *
	 * @return float|null Seconds between that timestamp and now, or null when
	 *                    the queue is empty or the name cannot be parsed.
	 */
	function lag() {
		global $wpdb;

		// Names sort by timestamp, so the first row in ascending order is the oldest.
		$oldest_item_name = $wpdb->get_var( $wpdb->prepare(
			"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
			"jpsq_{$this->id}-%"
		) );

		if ( ! $oldest_item_name ) {
			return null;
		}

		// The ID is caller-supplied text; quote it so characters that are
		// special in a regex (e.g. '.') are matched literally.
		$pattern = '/^jpsq_' . preg_quote( $this->id, '/' ) . '-(\d+\.\d+)-/';
		$matches = null;

		if ( preg_match( $pattern, $oldest_item_name, $matches ) ) {
			return microtime( true ) - floatval( $matches[1] );
		}

		return null;
	}
112
113
	/**
	 * Clears the checkout marker and deletes every row belonging to this queue.
	 */
	function reset() {
		global $wpdb;

		$this->delete_checkout_id();

		$delete_sql = $wpdb->prepare(
			"DELETE FROM $wpdb->options WHERE option_name LIKE %s",
			"jpsq_{$this->id}-%"
		);
		$wpdb->query( $delete_sql );
	}
120
121
	/**
	 * Counts the rows currently stored for this queue.
	 *
	 * @return int Number of matching option rows.
	 */
	function size() {
		global $wpdb;

		$count_sql = $wpdb->prepare(
			"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s",
			"jpsq_{$this->id}-%"
		);
		$row_count = $wpdb->get_var( $count_sql );

		return (int) $row_count;
	}
128
129
	// we use this peculiar implementation because it's much faster than count(*)
130
	/**
	 * Tells whether at least one row exists for this queue.
	 *
	 * @return bool True only when the EXISTS query yields the string "1".
	 */
	function has_any_items() {
		global $wpdb;

		$exists_sql = $wpdb->prepare(
			"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )",
			"jpsq_{$this->id}-%"
		);
		$exists     = $wpdb->get_var( $exists_sql );

		return "1" === $exists;
	}
138
139
	/**
	 * Checks out up to $buffer_size items from the front of the queue.
	 *
	 * Only one checkout may be open at a time; it stays open until the
	 * buffer is passed to checkin() or close(), or the checkout marker
	 * expires.
	 *
	 * @param int $buffer_size Maximum number of items to place in the buffer.
	 * @return Jetpack_Sync_Queue_Buffer|false|WP_Error The buffer; false when
	 *         the queue is empty; WP_Error 'unclosed_buffer' when a checkout
	 *         is already open; or the failing result of set_checkout_id().
	 */
	function checkout( $buffer_size ) {
		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();

		$result = $this->set_checkout_id( $buffer_id );

		if ( ! $result || is_wp_error( $result ) ) {
			error_log( "badness setting checkout ID (this should not happen)" );

			return $result;
		}

		$items = $this->fetch_items( $buffer_size );

		if ( count( $items ) === 0 ) {
			// No buffer is handed out, so nobody could ever check it in:
			// release the marker now instead of blocking until it expires
			// (same as checkout_with_memory_limit()).
			$this->delete_checkout_id();

			return false;
		}

		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );

		return $buffer;
	}
164
165
	// this checks out rows until it either empties the queue or hits a certain memory limit
166
	// it loads the sizes from the DB first so that it doesn't accidentally
167
	// load more data into memory than it needs to.
168
	// The only way it will load more data than $max_memory is if a single queue item
169
	// exceeds the memory limit, but in that case it will send that item by itself.
170
	/**
	 * Checks out items from the front of the queue, bounded by total size.
	 *
	 * @param int $max_memory      Limit on the summed LENGTH(option_value) of the selected rows.
	 * @param int $max_buffer_size Maximum number of rows considered.
	 * @return Jetpack_Sync_Queue_Buffer|false|WP_Error The buffer; false when
	 *         nothing was fetched; WP_Error 'unclosed_buffer' when a checkout
	 *         is already open; or the failing result of set_checkout_id().
	 */
	function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
		global $wpdb;

		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();
		$claimed   = $this->set_checkout_id( $buffer_id );

		if ( is_wp_error( $claimed ) || ! $claimed ) {
			error_log( "badness setting checkout ID (this should not happen)" );

			return $claimed;
		}

		// Look only at names and value lengths first, so the values
		// themselves are loaded just for the rows that get selected.
		$size_sql   = $wpdb->prepare(
			"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
			"jpsq_{$this->id}-%",
			$max_buffer_size
		);
		$candidates = $wpdb->get_results( $size_sql, OBJECT );

		$selected_ids = array();
		$running_size = 0;

		foreach ( $candidates as $candidate ) {
			$running_size += $candidate->value_size;

			// Stop once the limit is passed, but never before one row has
			// been taken: an oversized first row is checked out by itself.
			if ( $running_size > $max_memory && count( $selected_ids ) > 0 ) {
				break;
			}

			$selected_ids[] = $candidate->id;
		}

		$items = $this->fetch_items_by_id( $selected_ids );

		if ( 0 === count( $items ) ) {
			$this->delete_checkout_id();

			return false;
		}

		return new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
	}
223
224
	/**
	 * Returns a checked-out buffer, leaving its items in the queue.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer Buffer obtained from a checkout.
	 * @return true|WP_Error True on success, or the validation error.
	 */
	function checkin( $buffer ) {
		$validation = $this->validate_checkout( $buffer );

		if ( ! is_wp_error( $validation ) ) {
			$this->delete_checkout_id();

			return true;
		}

		error_log( "Invalid checkin: " . $validation->get_error_message() );

		return $validation;
	}
237
238
	/**
	 * Closes a checked-out buffer and deletes its items from the queue.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer        Buffer obtained from a checkout.
	 * @param array|null                $ids_to_remove Option names to delete; null means every item in the buffer.
	 * @return true|WP_Error True on success, or the validation error.
	 */
	function close( $buffer, $ids_to_remove = null ) {
		global $wpdb;

		$validation = $this->validate_checkout( $buffer );

		if ( is_wp_error( $validation ) ) {
			error_log( "Invalid close: " . $validation->get_error_message() );

			return $validation;
		}

		$this->delete_checkout_id();

		// by default clear all items in the buffer
		if ( null === $ids_to_remove ) {
			$ids_to_remove = $buffer->get_item_ids();
		}

		$id_count = count( $ids_to_remove );

		if ( $id_count > 0 ) {
			// One %s placeholder per ID, filled in through $wpdb->prepare().
			$placeholders = implode( ', ', array_fill( 0, $id_count, '%s' ) );
			$sql          = "DELETE FROM $wpdb->options WHERE option_name IN (" . $placeholders . ')';
			$prepare_args = array_merge( array( $sql ), $ids_to_remove );

			$wpdb->query( call_user_func_array( array( $wpdb, 'prepare' ), $prepare_args ) );
		}

		return true;
	}
264
265
	/**
	 * Reads every item value, then empties the queue via reset().
	 *
	 * @return array Values of all items that were in the queue.
	 */
	function flush_all() {
		$rows   = $this->fetch_items();
		$values = Jetpack_Sync_Utils::get_item_values( $rows );

		$this->reset();

		return $values;
	}
271
272
	/**
	 * Fetches every row in the queue without a limit.
	 *
	 * @return array Row objects with `id` and `value` properties.
	 */
	function get_all() {
		$rows = $this->fetch_items();

		return $rows;
	}
275
276
	// use with caution, this could allow multiple processes to delete
277
	// and send from the queue at the same time
278
	/**
	 * Drops the checkout marker without validating any buffer.
	 */
	function force_checkin() {
		$this->delete_checkout_id();
	}
281
282
	// used to lock checkouts from the queue.
283
	// tries to wait up to $timeout seconds for the queue to be empty
284
	/**
	 * Locks the queue against checkouts once it has drained.
	 *
	 * Polls once per second while items remain, for at most $timeout
	 * seconds, then claims the checkout marker with the ID 'lock'.
	 *
	 * @param int $timeout Maximum number of seconds to wait for the queue to empty.
	 * @return true|false|WP_Error True when locked; WP_Error 'lock_timeout'
	 *         when items are still present after waiting; WP_Error
	 *         'unclosed_buffer' when a checkout is already open; or the
	 *         failing result of set_checkout_id().
	 */
	function lock( $timeout = 30 ) {
		$tries = 0;

		while ( $this->has_any_items() ) {
			// Compare against the caller's $timeout rather than a fixed 30,
			// and only fail while items are really still in the queue.
			if ( $tries >= $timeout ) {
				return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
			}

			sleep( 1 );
			$tries += 1;
		}

		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		// hopefully this means we can acquire a checkout?
		$result = $this->set_checkout_id( 'lock' );

		if ( ! $result || is_wp_error( $result ) ) {
			error_log( "badness setting checkout ID (this should not happen)" );

			return $result;
		}

		return true;
	}
311
312
	/**
	 * Releases the queue by deleting the checkout marker.
	 */
	function unlock() {
		$this->delete_checkout_id();
	}
315
316
	/**
	 * @return mixed Value stored in this queue's checkout transient.
	 */
	private function get_checkout_id() {
		$transient_name = $this->get_checkout_transient_name();

		return get_transient( $transient_name );
	}
319
320
	/**
	 * Stores the checkout ID in this queue's transient.
	 *
	 * @param string $checkout_id ID to record as the open checkout.
	 * @return mixed Result of set_transient().
	 */
	private function set_checkout_id( $checkout_id ) {
		$expiration     = 5 * 60; // 5 minute timeout
		$transient_name = $this->get_checkout_transient_name();

		return set_transient( $transient_name, $checkout_id, $expiration );
	}
323
324
	/**
	 * Removes this queue's checkout transient.
	 */
	private function delete_checkout_id() {
		$transient_name = $this->get_checkout_transient_name();

		delete_transient( $transient_name );
	}
327
328
	/**
	 * @return string Name of the transient holding this queue's checkout ID.
	 */
	private function get_checkout_transient_name() {
		$transient_name = "jpsq_{$this->id}_checkout";

		return $transient_name;
	}
331
332
	/**
	 * Generates the option name for the next row written to the queue.
	 *
	 * The name is built from the queue ID, a microsecond timestamp, the
	 * process ID and a per-instance counter, in that order.
	 *
	 * @return string Option name of the form jpsq_{id}-{timestamp}-{pid}-{counter}.
	 */
	private function get_next_data_row_option_name() {
		// this option is specifically chosen to, as much as possible, preserve time order
		// and minimise the possibility of collisions between multiple processes working
		// at the same time
		// TODO: confirm we only need to support PHP 5.05+ (otherwise we'll need to emulate microtime as float, and avoid PHP_INT_MAX)
		// @see: http://php.net/manual/en/function.microtime.php
		$timestamp = sprintf( '%.6f', microtime( true ) );

		// row iterator is used to avoid collisions where we're writing data waaay fast in a single process;
		// it wraps back to zero instead of going past PHP_INT_MAX
		$this->row_iterator = ( PHP_INT_MAX === $this->row_iterator ) ? 0 : $this->row_iterator + 1;

		$name_parts = array( $timestamp, getmypid(), $this->row_iterator );

		return 'jpsq_' . $this->id . '-' . implode( '-', $name_parts );
	}
349
350
	/**
	 * Loads queue rows ordered by option name, unserializing each value.
	 *
	 * @param int|null $limit Maximum number of rows; any falsy value means no limit.
	 * @return array Row objects with `id` and `value` properties.
	 */
	private function fetch_items( $limit = null ) {
		global $wpdb;

		$name_pattern = "jpsq_{$this->id}-%";

		if ( ! $limit ) {
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", $name_pattern );
		} else {
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", $name_pattern, $limit );
		}

		$rows = $wpdb->get_results( $query_sql, OBJECT );

		foreach ( $rows as $row ) {
			$row->value = maybe_unserialize( $row->value );
		}

		return $rows;
	}
366
367
	/**
	 * Loads the rows whose option names are listed, ordered by option name.
	 *
	 * @param array $item_ids Option names to fetch.
	 * @return array Row objects with `id` and unserialized `value`; empty when no IDs are given.
	 */
	private function fetch_items_by_id( $item_ids ) {
		global $wpdb;

		$id_count = count( $item_ids );

		if ( $id_count <= 0 ) {
			return array();
		}

		// One %s placeholder per ID, filled in through $wpdb->prepare().
		$placeholders = implode( ', ', array_fill( 0, $id_count, '%s' ) );
		$sql          = "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name IN (" . $placeholders . ') ORDER BY option_name ASC';
		$prepare_args = array_merge( array( $sql ), $item_ids );
		$query        = call_user_func_array( array( $wpdb, 'prepare' ), $prepare_args );

		$rows = $wpdb->get_results( $query, OBJECT );

		foreach ( $rows as $row ) {
			$row->value = maybe_unserialize( $row->value );
		}

		return $rows;
	}
383
384
	/**
	 * Verifies that $buffer is the buffer currently checked out of this queue.
	 *
	 * @param mixed $buffer Value handed to checkin() or close().
	 * @return true|WP_Error True when valid; otherwise WP_Error with code
	 *         'not_a_buffer', 'buffer_not_checked_out' or 'buffer_mismatch'.
	 */
	private function validate_checkout( $buffer ) {
		if ( ! $buffer instanceof Jetpack_Sync_Queue_Buffer ) {
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
		}

		$checkout_id = $this->get_checkout_id();

		if ( ! $checkout_id ) {
			return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
		}

		// Compare as strings: a loose != would apply numeric-string juggling
		// to IDs that happen to look like numbers (uniqid() output is hex,
		// which can resemble exponent notation).
		if ( (string) $checkout_id !== (string) $buffer->id ) {
			return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
		}

		return true;
	}
401
}
402
403
class Jetpack_Sync_Utils {

	/**
	 * Collects the `value` property of every item, preserving array keys.
	 *
	 * @param array $items Objects exposing a `value` property.
	 * @return array Values keyed like $items.
	 */
	static function get_item_values( $items ) {
		$extractor = array( __CLASS__, 'extract_value' );

		return array_map( $extractor, $items );
	}

	/**
	 * Collects the `id` property of every item, preserving array keys.
	 *
	 * @param array $items Objects exposing an `id` property.
	 * @return array IDs keyed like $items.
	 */
	static function get_item_ids( $items ) {
		$extractor = array( __CLASS__, 'extract_id' );

		return array_map( $extractor, $items );
	}

	/**
	 * array_map() callback: reads one item's value.
	 */
	private static function extract_value( $item ) {
		return $item->value;
	}

	/**
	 * array_map() callback: reads one item's ID.
	 */
	private static function extract_id( $item ) {
		return $item->id;
	}
}
421