Completed
Push — fix/vvv-tests-wp-5-2 ( ec7a8b...e4b178 )
by
unknown
12:05 queued 05:22
created

Abstract_Jetpack_Sync_Queue   A

Complexity

Total Complexity 1

Size/Duplication

Total Lines 6
Duplicated Lines 0 %

Coupling/Cohesion

Components 0
Dependencies 0

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 10
c 0
b 0
f 0
wmc 1
lcom 0
cbo 0
1
<?php
2
3
/**
4
 * A buffer of items from the queue that can be checked out
5
 */
6
class Jetpack_Sync_Queue_Buffer {
7
	public $id;
8
	public $items_with_ids;
9
10
	public function __construct( $id, $items_with_ids ) {
11
		$this->id             = $id;
12
		$this->items_with_ids = $items_with_ids;
13
	}
14
15
	public function get_items() {
16
		return array_combine( $this->get_item_ids(), $this->get_item_values() );
17
	}
18
19
	public function get_item_values() {
20
		return Jetpack_Sync_Utils::get_item_values( $this->items_with_ids );
21
	}
22
23
	public function get_item_ids() {
24
		return Jetpack_Sync_Utils::get_item_ids( $this->items_with_ids );
25
	}
26
}
27
28
/**
29
 * A persistent queue that can be flushed in increments of N items,
30
 * and which blocks reads until checked-out buffers are checked in or
31
 * closed. This uses raw SQL for two reasons: speed, and not triggering
32
 * tons of added_option callbacks.
33
 */
34
class Jetpack_Sync_Queue {
35
	public $id;
36
	private $row_iterator;
37
38
	function __construct( $id ) {
39
		$this->id           = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL
40
		$this->row_iterator = 0;
41
		$this->random_int   = mt_rand( 1, 1000000 );
0 ignored issues
show
Bug introduced by
The property random_int does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is a good practice to explictly declare properties to avoid accidental typos and provide IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;
Loading history...
42
	}
43
44
	function add( $item ) {
45
		global $wpdb;
46
		$added = false;
47
		// this basically tries to add the option until enough time has elapsed that
48
		// it has a unique (microtime-based) option key
49
		while ( ! $added ) {
50
			$rows_added = $wpdb->query(
51
				$wpdb->prepare(
52
					"INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s,%s)",
53
					$this->get_next_data_row_option_name(),
54
					serialize( $item ),
55
					'no'
56
				)
57
			);
58
			$added      = ( 0 !== $rows_added );
59
		}
60
	}
61
62
	// Attempts to insert all the items in a single SQL query. May be subject to query size limits!
63
	function add_all( $items ) {
64
		global $wpdb;
65
		$base_option_name = $this->get_next_data_row_option_name();
66
67
		$query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";
68
69
		$rows = array();
70
71
		for ( $i = 0; $i < count( $items ); $i += 1 ) {
0 ignored issues
show
Performance Best Practice introduced by
It seems like you are calling the size function count() as part of the test condition. You might want to compute the size beforehand, and not on each iteration.

If the size of the collection does not change during the iteration, it is generally a good practice to compute it beforehand, and not on each iteration:

for ($i=0; $i<count($array); $i++) { // calls count() on each iteration
}

// Better
for ($i=0, $c=count($array); $i<$c; $i++) { // calls count() just once
}
Loading history...
72
			$option_name  = esc_sql( $base_option_name . '-' . $i );
73
			$option_value = esc_sql( serialize( $items[ $i ] ) );
74
			$rows[]       = "('$option_name', '$option_value', 'no')";
75
		}
76
77
		$rows_added = $wpdb->query( $query . join( ',', $rows ) );
78
79
		if ( count( $items ) === $rows_added ) {
80
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
81
		}
82
	}
83
84
	// Peek at the front-most item on the queue without checking it out
85
	function peek( $count = 1 ) {
86
		$items = $this->fetch_items( $count );
87
		if ( $items ) {
88
			return Jetpack_Sync_Utils::get_item_values( $items );
89
		}
90
91
		return array();
92
	}
93
94
	// lag is the difference in time between the age of the oldest item
95
	// (aka first or frontmost item) and the current time
96
	function lag( $now = null ) {
97
		global $wpdb;
98
99
		$first_item_name = $wpdb->get_var(
100
			$wpdb->prepare(
101
				"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
102
				"jpsq_{$this->id}-%"
103
			)
104
		);
105
106
		if ( ! $first_item_name ) {
107
			return 0;
108
		}
109
110
		if ( null === $now ) {
111
			$now = microtime( true );
112
		}
113
114
		// break apart the item name to get the timestamp
115
		$matches = null;
116
		if ( preg_match( '/^jpsq_' . $this->id . '-(\d+\.\d+)-/', $first_item_name, $matches ) ) {
117
			return $now - floatval( $matches[1] );
118
		} else {
119
			return 0;
120
		}
121
	}
122
123
	function reset() {
124
		global $wpdb;
125
		$this->delete_checkout_id();
126
		$wpdb->query(
127
			$wpdb->prepare(
128
				"DELETE FROM $wpdb->options WHERE option_name LIKE %s",
129
				"jpsq_{$this->id}-%"
130
			)
131
		);
132
	}
133
134
	function size() {
135
		global $wpdb;
136
137
		return (int) $wpdb->get_var(
138
			$wpdb->prepare(
139
				"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s",
140
				"jpsq_{$this->id}-%"
141
			)
142
		);
143
	}
144
145
	// we use this peculiar implementation because it's much faster than count(*)
146
	function has_any_items() {
147
		global $wpdb;
148
		$value = $wpdb->get_var(
149
			$wpdb->prepare(
150
				"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )",
151
				"jpsq_{$this->id}-%"
152
			)
153
		);
154
155
		return ( $value === '1' );
156
	}
157
158
	function checkout( $buffer_size ) {
159
		if ( $this->get_checkout_id() ) {
160
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
161
		}
162
163
		$buffer_id = uniqid();
164
165
		$result = $this->set_checkout_id( $buffer_id );
166
167
		if ( ! $result || is_wp_error( $result ) ) {
168
			return $result;
169
		}
170
171
		$items = $this->fetch_items( $buffer_size );
172
173
		if ( count( $items ) === 0 ) {
174
			return false;
175
		}
176
177
		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );
178
179
		return $buffer;
180
	}
181
182
	// this checks out rows until it either empties the queue or hits a certain memory limit
183
	// it loads the sizes from the DB first so that it doesn't accidentally
184
	// load more data into memory than it needs to.
185
	// The only way it will load more items than $max_size is if a single queue item
186
	// exceeds the memory limit, but in that case it will send that item by itself.
187
	function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
188
		if ( $this->get_checkout_id() ) {
189
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
190
		}
191
192
		$buffer_id = uniqid();
193
194
		$result = $this->set_checkout_id( $buffer_id );
195
196
		if ( ! $result || is_wp_error( $result ) ) {
197
			return $result;
198
		}
199
200
		// get the map of buffer_id -> memory_size
201
		global $wpdb;
202
203
		$items_with_size = $wpdb->get_results(
204
			$wpdb->prepare(
205
				"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
206
				"jpsq_{$this->id}-%",
207
				$max_buffer_size
208
			),
209
			OBJECT
210
		);
211
212
		if ( count( $items_with_size ) === 0 ) {
213
			return false;
214
		}
215
216
		$total_memory = 0;
217
218
		$min_item_id = $max_item_id = $items_with_size[0]->id;
219
220
		foreach ( $items_with_size as $id => $item_with_size ) {
221
			$total_memory += $item_with_size->value_size;
222
223
			// if this is the first item and it exceeds memory, allow loop to continue
224
			// we will exit on the next iteration instead
225
			if ( $total_memory > $max_memory && $id > 0 ) {
226
				break;
227
			}
228
229
			$max_item_id = $item_with_size->id;
230
		}
231
232
		$query = $wpdb->prepare(
233
			"SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name >= %s and option_name <= %s ORDER BY option_name ASC",
234
			$min_item_id,
235
			$max_item_id
236
		);
237
238
		$items = $wpdb->get_results( $query, OBJECT );
239
		foreach ( $items as $item ) {
240
			$item->value = maybe_unserialize( $item->value );
241
		}
242
243
		if ( count( $items ) === 0 ) {
244
			$this->delete_checkout_id();
245
246
			return false;
247
		}
248
249
		$buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
250
251
		return $buffer;
252
	}
253
254
	function checkin( $buffer ) {
255
		$is_valid = $this->validate_checkout( $buffer );
256
257
		if ( is_wp_error( $is_valid ) ) {
258
			return $is_valid;
259
		}
260
261
		$this->delete_checkout_id();
262
263
		return true;
264
	}
265
266
	function close( $buffer, $ids_to_remove = null ) {
267
		$is_valid = $this->validate_checkout( $buffer );
268
269
		if ( is_wp_error( $is_valid ) ) {
270
			return $is_valid;
271
		}
272
273
		$this->delete_checkout_id();
274
275
		// by default clear all items in the buffer
276
		if ( is_null( $ids_to_remove ) ) {
277
			$ids_to_remove = $buffer->get_item_ids();
278
		}
279
280
		global $wpdb;
281
282
		if ( count( $ids_to_remove ) > 0 ) {
283
			$sql   = "DELETE FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $ids_to_remove ), '%s' ) ) . ')';
284
			$query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $ids_to_remove ) );
285
			$wpdb->query( $query );
286
		}
287
288
		return true;
289
	}
290
291
	function flush_all() {
292
		$items = Jetpack_Sync_Utils::get_item_values( $this->fetch_items() );
293
		$this->reset();
294
295
		return $items;
296
	}
297
298
	function get_all() {
299
		return $this->fetch_items();
300
	}
301
302
	// use with caution, this could allow multiple processes to delete
303
	// and send from the queue at the same time
304
	function force_checkin() {
305
		$this->delete_checkout_id();
306
	}
307
308
	// used to lock checkouts from the queue.
309
	// tries to wait up to $timeout seconds for the queue to be empty
310
	function lock( $timeout = 30 ) {
311
		$tries = 0;
312
313
		while ( $this->has_any_items() && $tries < $timeout ) {
314
			sleep( 1 );
315
			$tries += 1;
316
		}
317
318
		if ( $tries === 30 ) {
319
			return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
320
		}
321
322
		if ( $this->get_checkout_id() ) {
323
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
324
		}
325
326
		// hopefully this means we can acquire a checkout?
327
		$result = $this->set_checkout_id( 'lock' );
328
329
		if ( ! $result || is_wp_error( $result ) ) {
330
			return $result;
331
		}
332
333
		return true;
334
	}
335
336
	function unlock() {
337
		return $this->delete_checkout_id();
338
	}
339
340
	/**
341
	 * This option is specifically chosen to, as much as possible, preserve time order
342
	 * and minimise the possibility of collisions between multiple processes working
343
	 * at the same time.
344
	 *
345
	 * @return string
346
	 */
347
	protected function generate_option_name_timestamp() {
348
		return sprintf( '%.6f', microtime( true ) );
349
	}
350
351
	private function get_checkout_id() {
352
		global $wpdb;
353
		$checkout_value = $wpdb->get_var(
354
			$wpdb->prepare(
355
				"SELECT option_value FROM $wpdb->options WHERE option_name = %s",
356
				$this->get_lock_option_name()
357
			)
358
		);
359
360
		if ( $checkout_value ) {
361
			list( $checkout_id, $timestamp ) = explode( ':', $checkout_value );
362
			if ( intval( $timestamp ) > time() ) {
363
				return $checkout_id;
364
			}
365
		}
366
367
		return false;
368
	}
369
370
	private function set_checkout_id( $checkout_id ) {
371
		global $wpdb;
372
373
		$expires     = time() + Jetpack_Sync_Defaults::$default_sync_queue_lock_timeout;
0 ignored issues
show
Bug introduced by
The property default_sync_queue_lock_timeout cannot be accessed from this context as it is declared private in class Jetpack_Sync_Defaults.

This check looks for access to properties that are not accessible from the current context.

If you need to make a property accessible to another context you can either raise its visibility level or provide an accessible getter in the defining class.

Loading history...
374
		$updated_num = $wpdb->query(
375
			$wpdb->prepare(
376
				"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
377
				"$checkout_id:$expires",
378
				$this->get_lock_option_name()
379
			)
380
		);
381
382
		if ( ! $updated_num ) {
383
			$updated_num = $wpdb->query(
384
				$wpdb->prepare(
385
					"INSERT INTO $wpdb->options ( option_name, option_value, autoload ) VALUES ( %s, %s, 'no' )",
386
					$this->get_lock_option_name(),
387
					"$checkout_id:$expires"
388
				)
389
			);
390
		}
391
392
		return $updated_num;
393
	}
394
395
	private function delete_checkout_id() {
396
		global $wpdb;
397
		// rather than delete, which causes fragmentation, we update in place
398
		return $wpdb->query(
399
			$wpdb->prepare(
400
				"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
401
				'0:0',
402
				$this->get_lock_option_name()
403
			)
404
		);
405
406
	}
407
408
	private function get_lock_option_name() {
409
		return "jpsq_{$this->id}_checkout";
410
	}
411
412
	private function get_next_data_row_option_name() {
413
		$timestamp = $this->generate_option_name_timestamp();
414
415
		// row iterator is used to avoid collisions where we're writing data waaay fast in a single process
416
		if ( $this->row_iterator === PHP_INT_MAX ) {
417
			$this->row_iterator = 0;
418
		} else {
419
			$this->row_iterator += 1;
420
		}
421
422
		return 'jpsq_' . $this->id . '-' . $timestamp . '-' . $this->random_int . '-' . $this->row_iterator;
423
	}
424
425
	private function fetch_items( $limit = null ) {
426
		global $wpdb;
427
428
		if ( $limit ) {
429
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", "jpsq_{$this->id}-%", $limit );
430
		} else {
431
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", "jpsq_{$this->id}-%" );
432
		}
433
434
		$items = $wpdb->get_results( $query_sql, OBJECT );
435
		foreach ( $items as $item ) {
436
			$item->value = maybe_unserialize( $item->value );
437
		}
438
439
		return $items;
440
	}
441
442
	private function validate_checkout( $buffer ) {
443
		if ( ! $buffer instanceof Jetpack_Sync_Queue_Buffer ) {
444
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
445
		}
446
447
		$checkout_id = $this->get_checkout_id();
448
449
		if ( ! $checkout_id ) {
450
			return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
451
		}
452
453
		if ( $checkout_id != $buffer->id ) {
454
			return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
455
		}
456
457
		return true;
458
	}
459
}
460
461
class Jetpack_Sync_Utils {
462
463
	static function get_item_values( $items ) {
464
		return array_map( array( __CLASS__, 'get_item_value' ), $items );
465
	}
466
467
	static function get_item_ids( $items ) {
468
		return array_map( array( __CLASS__, 'get_item_id' ), $items );
469
	}
470
471
	private static function get_item_value( $item ) {
472
		return $item->value;
473
	}
474
475
	private static function get_item_id( $item ) {
476
		return $item->id;
477
	}
478
}
479