Completed
Push — add/stop-sync-filter ( 639cc9 )
by
unknown
85:52 queued 75:55
created

Jetpack_Sync_Queue::get_checkout_id()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 0
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
<?php
2
3
/**
 * A buffer of items from the queue that can be checked out.
 *
 * Each entry of the buffer is handed to Jetpack_Sync_Utils, which reads
 * its `id` and `value` properties.
 */
class Jetpack_Sync_Queue_Buffer {
	/**
	 * Identifier of this buffer, exactly as given to the constructor.
	 */
	public $id;

	/**
	 * The buffered entries, exactly as given to the constructor.
	 */
	public $items_with_ids;

	/**
	 * Stores the buffer identifier and its entries.
	 *
	 * @param mixed $id             Identifier of the buffer.
	 * @param array $items_with_ids Entries held by the buffer.
	 */
	public function __construct( $id, $items_with_ids ) {
		$this->items_with_ids = $items_with_ids;
		$this->id             = $id;
	}

	/**
	 * Returns the buffered values keyed by their ids.
	 *
	 * @return array Result of combining get_item_ids() (keys) with get_item_values() (values).
	 */
	public function get_items() {
		$keys   = $this->get_item_ids();
		$values = $this->get_item_values();

		return array_combine( $keys, $values );
	}

	/**
	 * Returns the value of every buffered entry.
	 *
	 * @return array Values extracted by Jetpack_Sync_Utils::get_item_values().
	 */
	public function get_item_values() {
		$entries = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_values( $entries );
	}

	/**
	 * Returns the id of every buffered entry.
	 *
	 * @return array Ids extracted by Jetpack_Sync_Utils::get_item_ids().
	 */
	public function get_item_ids() {
		$entries = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_ids( $entries );
	}
}
27
28
/**
 * A persistent queue that can be flushed in increments of N items,
 * and which blocks reads until checked-out buffers are checked in or
 * closed. This uses raw SQL for two reasons: speed, and not triggering
 * tons of added_option callbacks.
 *
 * Items are stored as rows of the options table whose option_name starts with
 * "jpsq_{id}-". The checkout lock is kept in the "jpsq_{id}_checkout" transient.
 */
class Jetpack_Sync_Queue {
	/**
	 * Queue identifier, with every '-' replaced by '_'.
	 */
	public $id;

	/**
	 * Counter appended to generated row names; incremented for every generated name.
	 */
	private $row_iterator;

	/**
	 * @param string $id Queue identifier.
	 */
	public function __construct( $id ) {
		$this->id           = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL
		$this->row_iterator = 0;
	}

	/**
	 * Adds a single item to the queue.
	 *
	 * @param mixed $item Item to enqueue; it is stored serialized.
	 *
	 * @return bool True when the row was inserted. False when the item was skipped by the
	 *              'jetpack_skip_sync_item' filter or could not be inserted.
	 */
	public function add( $item ) {
		global $wpdb;

		/**
		 * Filters whether to skip adding an item to the sync queue.
		 *
		 * Passing true to the filter will prevent the item from being added to the queue.
		 * $item = array( $action, $args, $user_id, $microtime );
		 *
		 * @since 4.1.0
		 *
		 * @see Jetpack_Sync_Queue->add()
		 */
		if ( apply_filters( 'jetpack_skip_sync_item', false, $item ) ) {
			return false;
		}

		$serialized_item = serialize( $item );
		$max_attempts    = 5;

		// Every attempt generates a fresh (microtime-based) option key, so a failed
		// insert is retried under a new name. $wpdb->query() returns false on error,
		// which must not be mistaken for a successful insert, and the number of
		// attempts is bounded so a persistent database failure cannot loop forever.
		for ( $attempt = 0; $attempt < $max_attempts; $attempt += 1 ) {
			$rows_added = $wpdb->query( $wpdb->prepare(
				"INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s,%s)",
				$this->get_next_data_row_option_name(),
				$serialized_item,
				'no'
			) );

			if ( $rows_added ) {
				return true;
			}
		}

		return false;
	}

	/**
	 * Attempts to insert all the items in a single SQL query. May be subject to query size limits!
	 *
	 * Items rejected by the 'jetpack_skip_sync_item' filter are left out of the query.
	 *
	 * @param array $items Items to enqueue. Keys are ignored; only the order matters.
	 *
	 * @return bool|WP_Error True when every remaining item was inserted, false when there was
	 *                       nothing to insert, WP_Error when the number of inserted rows does
	 *                       not match the number of rows sent.
	 */
	public function add_all( $items ) {
		global $wpdb;
		$base_option_name = $this->get_next_data_row_option_name();

		// Re-index so associative or sparse arrays work: the position becomes part of the row name.
		$items = array_values( $items );

		// Rows are read back with ORDER BY option_name, which compares names as strings.
		// Zero-padding the position keeps e.g. "-10" after "-02" so insertion order is preserved.
		$index_width = strlen( (string) max( count( $items ) - 1, 0 ) );

		$rows = array();

		foreach ( $items as $i => $item ) {
			/** This filter is documented in Jetpack_Sync_Queue::add() */
			if ( apply_filters( 'jetpack_skip_sync_item', false, $item ) ) {
				continue;
			}
			$position     = str_pad( (string) $i, $index_width, '0', STR_PAD_LEFT );
			$option_name  = esc_sql( $base_option_name . '-' . $position );
			$option_value = esc_sql( serialize( $item ) );
			$rows[]       = "('$option_name', '$option_value', 'no')";
		}

		if ( empty( $rows ) ) {
			return false;
		}

		$query      = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";
		$rows_added = $wpdb->query( $query . join( ',', $rows ) );

		if ( $rows_added !== count( $rows ) ) {
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
		}

		return true;
	}

	/**
	 * Peek at the front-most items on the queue without checking them out.
	 *
	 * @param int $count Maximum number of items to return.
	 *
	 * @return array Values of the fetched items; empty array when nothing was fetched.
	 */
	public function peek( $count = 1 ) {
		$items = $this->fetch_items( $count );
		if ( $items ) {
			return Jetpack_Sync_Utils::get_item_values( $items );
		}

		return array();
	}

	/**
	 * Lag is the difference in time between the age of the oldest item and the current time.
	 *
	 * @return float|null Seconds elapsed since the timestamp embedded in the first row name
	 *                    (ascending order), or null when there is no row or its name does
	 *                    not contain a timestamp.
	 */
	public function lag() {
		global $wpdb;

		// Row names start with the creation timestamp, so the first name in ascending order is the oldest item.
		$oldest_item_name = $wpdb->get_var( $wpdb->prepare(
			"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
			"jpsq_{$this->id}-%"
		) );

		if ( ! $oldest_item_name ) {
			return null;
		}

		// break apart the item name to get the timestamp;
		// the id is quoted so that any regex metacharacter in it is matched literally
		$matches = null;
		if ( preg_match( '/^jpsq_' . preg_quote( $this->id, '/' ) . '-(\d+\.\d+)-/', $oldest_item_name, $matches ) ) {
			return microtime( true ) - floatval( $matches[1] );
		}

		return null;
	}

	/**
	 * Releases the checkout lock and deletes every row of this queue.
	 */
	public function reset() {
		global $wpdb;
		$this->delete_checkout_id();
		$wpdb->query( $wpdb->prepare(
			"DELETE FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
		) );
	}

	/**
	 * Counts the rows of this queue.
	 *
	 * @return int Number of rows whose name matches this queue's prefix.
	 */
	public function size() {
		global $wpdb;

		return (int) $wpdb->get_var( $wpdb->prepare(
			"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
		) );
	}

	/**
	 * Tells whether the queue holds at least one row.
	 *
	 * We use this peculiar implementation because it's much faster than count(*).
	 *
	 * @return bool True when the EXISTS query yields the string "1".
	 */
	public function has_any_items() {
		global $wpdb;
		$value = $wpdb->get_var( $wpdb->prepare(
			"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )", "jpsq_{$this->id}-%"
		) );

		return ( $value === "1" );
	}

	/**
	 * Checks out up to $buffer_size items from the front of the queue.
	 *
	 * @param int $buffer_size Maximum number of items in the buffer.
	 *
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|bool The checked-out buffer; WP_Error when a buffer
	 *                                                 is already checked out; the result of storing
	 *                                                 the checkout id when that fails; false when
	 *                                                 the queue is empty.
	 */
	public function checkout( $buffer_size ) {
		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();

		$result = $this->set_checkout_id( $buffer_id );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		$items = $this->fetch_items( $buffer_size );

		if ( count( $items ) === 0 ) {
			// No buffer is handed out, so nobody could ever check it in: release the lock
			// right away (as checkout_with_memory_limit() does) instead of blocking
			// further checkouts until the transient expires.
			$this->delete_checkout_id();

			return false;
		}

		return new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );
	}

	/**
	 * Checks out rows until it either empties the queue or hits a certain memory limit.
	 *
	 * It loads the sizes from the DB first so that it doesn't accidentally
	 * load more data into memory than it needs to.
	 * The only way it will load more than $max_memory is if the first queue item
	 * exceeds the limit on its own, but in that case it will send that item by itself.
	 *
	 * @param int $max_memory      Limit compared against the summed LENGTH() of the stored values.
	 * @param int $max_buffer_size Maximum number of rows considered.
	 *
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|bool The checked-out buffer; WP_Error when a buffer
	 *                                                 is already checked out; the result of storing
	 *                                                 the checkout id when that fails; false when
	 *                                                 no item could be fetched.
	 */
	public function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();

		$result = $this->set_checkout_id( $buffer_id );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		// get the list of item id -> stored value size
		global $wpdb;

		$items_with_size = $wpdb->get_results(
			$wpdb->prepare(
				"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
				"jpsq_{$this->id}-%",
				$max_buffer_size
			),
			OBJECT
		);

		$total_memory = 0;
		$item_ids     = array();

		foreach ( $items_with_size as $item_with_size ) {
			$total_memory += $item_with_size->value_size;

			// if this is the first item and it exceeds memory, allow loop to continue
			// we will exit on the next iteration instead
			if ( $total_memory > $max_memory && count( $item_ids ) > 0 ) {
				break;
			}
			$item_ids[] = $item_with_size->id;
		}

		$items = $this->fetch_items_by_id( $item_ids );

		if ( count( $items ) === 0 ) {
			$this->delete_checkout_id();

			return false;
		}

		return new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
	}

	/**
	 * Returns a checked-out buffer without removing its items from the queue.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer The buffer that was checked out.
	 *
	 * @return bool|WP_Error True on success, WP_Error when the buffer fails validation.
	 */
	public function checkin( $buffer ) {
		$is_valid = $this->validate_checkout( $buffer );

		if ( is_wp_error( $is_valid ) ) {
			return $is_valid;
		}

		$this->delete_checkout_id();

		return true;
	}

	/**
	 * Closes a checked-out buffer, deleting its items (or a subset of them) from the queue.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer        The buffer that was checked out.
	 * @param array|null                $ids_to_remove Row names to delete; null deletes every item of the buffer.
	 *
	 * @return bool|WP_Error True on success, WP_Error when the buffer fails validation.
	 */
	public function close( $buffer, $ids_to_remove = null ) {
		$is_valid = $this->validate_checkout( $buffer );

		if ( is_wp_error( $is_valid ) ) {
			return $is_valid;
		}

		// by default clear all items in the buffer
		if ( is_null( $ids_to_remove ) ) {
			$ids_to_remove = $buffer->get_item_ids();
		}

		global $wpdb;

		if ( count( $ids_to_remove ) > 0 ) {
			$sql   = "DELETE FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $ids_to_remove ), '%s' ) ) . ')';
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $ids_to_remove ) );
			$wpdb->query( $query );
		}

		// Release the lock only after the rows are gone, so that a checkout starting
		// in between cannot pick up items that are about to be deleted.
		$this->delete_checkout_id();

		return true;
	}

	/**
	 * Returns the values of all items and then empties the queue via reset().
	 *
	 * @return array Values of the items that were in the queue.
	 */
	public function flush_all() {
		$items = Jetpack_Sync_Utils::get_item_values( $this->fetch_items() );
		$this->reset();

		return $items;
	}

	/**
	 * Returns every row of the queue.
	 *
	 * @return array Row objects with `id` and (unserialized) `value` properties.
	 */
	public function get_all() {
		return $this->fetch_items();
	}

	/**
	 * Releases the checkout lock without validating any buffer.
	 *
	 * Use with caution, this could allow multiple processes to delete
	 * and send from the queue at the same time.
	 */
	public function force_checkin() {
		$this->delete_checkout_id();
	}

	/**
	 * Used to lock checkouts from the queue.
	 * Tries to wait up to $timeout seconds for the queue to be empty.
	 *
	 * @param int $timeout Maximum number of one-second waits for the queue to empty.
	 *
	 * @return bool|WP_Error True when the lock was acquired; WP_Error when the queue still has
	 *                       items after waiting, or when a buffer is checked out; the result of
	 *                       storing the checkout id when that fails.
	 */
	public function lock( $timeout = 30 ) {
		$tries = 0;

		// Give up only while the queue is still non-empty after $timeout waits. The limit
		// is the caller's $timeout rather than a fixed number, and a queue that empties
		// on the final check is not reported as a timeout.
		while ( $this->has_any_items() ) {
			if ( $tries >= $timeout ) {
				return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
			}

			sleep( 1 );
			$tries += 1;
		}

		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		// hopefully this means we can acquire a checkout?
		$result = $this->set_checkout_id( 'lock' );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		return true;
	}

	/**
	 * Releases the checkout lock.
	 */
	public function unlock() {
		$this->delete_checkout_id();
	}

	/**
	 * Reads the id of the current checkout from its transient.
	 *
	 * @return mixed Whatever get_transient() returns for the checkout transient.
	 */
	private function get_checkout_id() {
		return get_transient( $this->get_checkout_transient_name() );
	}

	/**
	 * Stores the id of the current checkout in its transient.
	 *
	 * @param string $checkout_id Id to store.
	 *
	 * @return mixed Whatever set_transient() returns.
	 */
	private function set_checkout_id( $checkout_id ) {
		return set_transient( $this->get_checkout_transient_name(), $checkout_id, 5 * 60 ); // 5 minute timeout
	}

	/**
	 * Deletes the checkout transient.
	 */
	private function delete_checkout_id() {
		delete_transient( $this->get_checkout_transient_name() );
	}

	/**
	 * @return string Name of the transient that holds the checkout id.
	 */
	private function get_checkout_transient_name() {
		return "jpsq_{$this->id}_checkout";
	}

	/**
	 * Generates the option name for the next row.
	 *
	 * @return string "jpsq_{id}-{timestamp}-{pid}-{row iterator}".
	 */
	private function get_next_data_row_option_name() {
		// this option is specifically chosen to, as much as possible, preserve time order
		// and minimise the possibility of collisions between multiple processes working
		// at the same time
		// TODO: confirm we only need to support PHP 5.05+ (otherwise we'll need to emulate microtime as float, and avoid PHP_INT_MAX)
		// @see: http://php.net/manual/en/function.microtime.php
		//
		// %F (unlike %f) ignores the current locale, so the decimal separator is always
		// '.', which is what the timestamp pattern in lag() expects.
		$timestamp = sprintf( '%.6F', microtime( true ) );

		// row iterator is used to avoid collisions where we're writing data waaay fast in a single process
		if ( $this->row_iterator === PHP_INT_MAX ) {
			$this->row_iterator = 0;
		} else {
			$this->row_iterator += 1;
		}

		return 'jpsq_' . $this->id . '-' . $timestamp . '-' . getmypid() . '-' . $this->row_iterator;
	}

	/**
	 * Fetches rows from the front of the queue, in ascending name order.
	 *
	 * @param int|null $limit Maximum number of rows; any falsy value fetches all rows.
	 *
	 * @return array Row objects with `id` and (unserialized) `value` properties.
	 */
	private function fetch_items( $limit = null ) {
		global $wpdb;

		if ( $limit ) {
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", "jpsq_{$this->id}-%", $limit );
		} else {
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", "jpsq_{$this->id}-%" );
		}

		return $this->query_items( $query_sql );
	}

	/**
	 * Fetches the rows with the given names, in ascending name order.
	 *
	 * @param array $item_ids Row names to fetch.
	 *
	 * @return array Row objects with `id` and (unserialized) `value` properties; empty array
	 *               when no names were given.
	 */
	private function fetch_items_by_id( $item_ids ) {
		global $wpdb;

		if ( count( $item_ids ) === 0 ) {
			return array();
		}

		$sql   = "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $item_ids ), '%s' ) ) . ') ORDER BY option_name ASC';
		$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $item_ids ) );

		return $this->query_items( $query );
	}

	/**
	 * Runs a prepared SELECT returning `id` and `value` columns and unserializes each value.
	 *
	 * @param string $query_sql Prepared SQL query.
	 *
	 * @return array Row objects with `id` and (unserialized) `value` properties.
	 */
	private function query_items( $query_sql ) {
		global $wpdb;

		$items = $wpdb->get_results( $query_sql, OBJECT );
		foreach ( $items as $item ) {
			$item->value = maybe_unserialize( $item->value );
		}

		return $items;
	}

	/**
	 * Verifies that $buffer is the buffer currently checked out of this queue.
	 *
	 * @param mixed $buffer The buffer being checked in or closed.
	 *
	 * @return bool|WP_Error True when valid; WP_Error when $buffer is not a buffer, when nothing
	 *                       is checked out, or when the ids differ.
	 */
	private function validate_checkout( $buffer ) {
		if ( ! $buffer instanceof Jetpack_Sync_Queue_Buffer ) {
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
		}

		$checkout_id = $this->get_checkout_id();

		if ( ! $checkout_id ) {
			return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
		}

		// Compare as strings: with a loose comparison two different numeric-looking
		// ids (for example '1e3' and '1000') would be treated as the same buffer.
		if ( (string) $checkout_id !== (string) $buffer->id ) {
			return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
		}

		return true;
	}
}
414
415
/**
 * Helpers for pulling the `value` and `id` properties out of lists of items.
 */
class Jetpack_Sync_Utils {

	/**
	 * Maps every item to its `value` property.
	 *
	 * @param array $items Items exposing a `value` property.
	 *
	 * @return array The values, in the order (and under the keys) array_map() yields for $items.
	 */
	public static function get_item_values( $items ) {
		$read_value = array( __CLASS__, 'get_item_value' );

		return array_map( $read_value, $items );
	}

	/**
	 * Maps every item to its `id` property.
	 *
	 * @param array $items Items exposing an `id` property.
	 *
	 * @return array The ids, in the order (and under the keys) array_map() yields for $items.
	 */
	public static function get_item_ids( $items ) {
		$read_id = array( __CLASS__, 'get_item_id' );

		return array_map( $read_id, $items );
	}

	/**
	 * array_map() callback: reads the `value` property of one item.
	 *
	 * @param object $item Item to read.
	 *
	 * @return mixed The item's value.
	 */
	private static function get_item_value( $item ) {
		return $item->value;
	}

	/**
	 * array_map() callback: reads the `id` property of one item.
	 *
	 * @param object $item Item to read.
	 *
	 * @return mixed The item's id.
	 */
	private static function get_item_id( $item ) {
		return $item->id;
	}
}
433