Completed
Push — fix/vvv-tests-wp-5-2 ( 6d6775...ec7a8b )
by
unknown
08:39 queued 01:50
created

Abstract_Jetpack_Sync_Queue   A

Complexity

Total Complexity 1

Size/Duplication

Total Lines 6
Duplicated Lines 0 %

Coupling/Cohesion

Components 0
Dependencies 0

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 10
c 0
b 0
f 0
wmc 1
lcom 0
cbo 0

1 Method

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 3 1
1
<?php
2
3
/**
 * A checked-out slice of queue rows.
 *
 * Pairs the checkout id with the rows taken from the queue. Each row is an
 * object exposing an `id` and a `value` property.
 */
class Jetpack_Sync_Queue_Buffer {
	/** @var mixed Identifier of the checkout this buffer belongs to. */
	public $id;

	/** @var array Row objects held by this buffer. */
	public $items_with_ids;

	/**
	 * @param mixed $id             Checkout identifier.
	 * @param array $items_with_ids Row objects carried by the buffer.
	 */
	public function __construct( $id, $items_with_ids ) {
		$this->items_with_ids = $items_with_ids;
		$this->id             = $id;
	}

	/**
	 * Map of row id => row value for everything in the buffer.
	 *
	 * @return array
	 */
	public function get_items() {
		$ids    = $this->get_item_ids();
		$values = $this->get_item_values();

		return array_combine( $ids, $values );
	}

	/**
	 * Values of the buffered rows.
	 *
	 * @return array
	 */
	public function get_item_values() {
		$rows = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_values( $rows );
	}

	/**
	 * Ids of the buffered rows.
	 *
	 * @return array
	 */
	public function get_item_ids() {
		$rows = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_ids( $rows );
	}
}
27
28
/**
 * Common base for queue implementations; it only stores the normalised queue id.
 */
abstract class Abstract_Jetpack_Sync_Queue {
	/** @var string Queue id with every '-' replaced by '_'. */
	public $id;

	/**
	 * @param string $id Requested queue id.
	 */
	public function __construct( $id ) {
		// Hyphens become underscores: necessary to ensure we don't have ID collisions in the SQL.
		$normalised_id = str_replace( '-', '_', $id );
		$this->id      = $normalised_id;
	}
}
37
38
/**
39
 * A persistent queue that can be flushed in increments of N items,
40
 * and which blocks reads until checked-out buffers are checked in or
41
 * closed. This uses raw SQL for two reasons: speed, and not triggering
42
 * tons of added_option callbacks.
43
 */
44
class Jetpack_Sync_Queue extends Abstract_Jetpack_Sync_Queue {
45
	/**
	 * Counter mixed into generated row names so that rows written in quick
	 * succession by one instance do not share a name.
	 *
	 * @var int
	 */
	private $row_iterator;

	/**
	 * Random number picked once per instance and mixed into generated row names.
	 *
	 * It used to be created implicitly as a dynamic property, which made it
	 * public; it is declared public here so that visibility does not change.
	 *
	 * @var int
	 */
	public $random_int;

	/**
	 * @param string $id Queue id; normalised by the parent constructor.
	 */
	public function __construct( $id ) {
		parent::__construct( $id );
		$this->row_iterator = 0;
		$this->random_int   = mt_rand( 1, 1000000 );
	}
52
53
	/**
	 * Append a single item to the queue.
	 *
	 * The item is serialized and inserted as a non-autoloaded row in the
	 * options table under a freshly generated option name.
	 *
	 * @param mixed $item Item to enqueue.
	 */
	public function add( $item ) {
		global $wpdb;

		// Keep retrying, each time with a newly generated (microtime-based)
		// option name, for as long as the insert reports zero rows added.
		do {
			$insert_sql = $wpdb->prepare(
				"INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s,%s)",
				$this->get_next_data_row_option_name(),
				serialize( $item ),
				'no'
			);
			$rows_added = $wpdb->query( $insert_sql );
		} while ( 0 === $rows_added );
	}
70
71
	/**
	 * Attempts to insert all the items in a single SQL query. May be subject to query size limits!
	 *
	 * Every row name is the same generated base name followed by '-' and the
	 * item's position in the list.
	 *
	 * @param array $items Items to enqueue; array keys are ignored.
	 * @return WP_Error|null WP_Error when the reported number of inserted rows
	 *                       differs from the number of items, null otherwise.
	 */
	public function add_all( $items ) {
		global $wpdb;

		// Re-index so associative or sparse arrays are handled like plain lists.
		$items       = array_values( $items );
		$items_count = count( $items );

		// An empty VALUES list is invalid SQL, and there is nothing to insert anyway.
		if ( 0 === $items_count ) {
			return null;
		}

		$base_option_name = $this->get_next_data_row_option_name();

		$query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";

		$rows = array();
		foreach ( $items as $i => $item ) {
			$option_name  = esc_sql( $base_option_name . '-' . $i );
			$option_value = esc_sql( serialize( $item ) );
			$rows[]       = "('$option_name', '$option_value', 'no')";
		}

		$rows_added = $wpdb->query( $query . join( ',', $rows ) );

		// Anything other than exactly one inserted row per item (including a
		// non-integer failure result) is reported as an error.
		if ( $items_count !== $rows_added ) {
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
		}

		return null;
	}
92
93
	/**
	 * Peek at the front-most items of the queue without checking them out.
	 *
	 * @param int $count Maximum number of items to read.
	 * @return array Item values, or an empty array when nothing was fetched.
	 */
	public function peek( $count = 1 ) {
		$front_rows = $this->fetch_items( $count );

		if ( ! $front_rows ) {
			return array();
		}

		return Jetpack_Sync_Utils::get_item_values( $front_rows );
	}
102
103
	/**
	 * Lag is the difference in time between the age of the oldest item
	 * (aka first or frontmost item) and the current time.
	 *
	 * The age is read from the timestamp embedded in the first row's option name.
	 *
	 * @param float|null $now Reference time in seconds; defaults to microtime( true ).
	 * @return float|int Seconds of lag, or 0 when the queue has no rows or the
	 *                   first row's name does not carry a timestamp.
	 */
	public function lag( $now = null ) {
		global $wpdb;

		$first_item_name = $wpdb->get_var(
			$wpdb->prepare(
				"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
				"jpsq_{$this->id}-%"
			)
		);

		if ( ! $first_item_name ) {
			return 0;
		}

		if ( null === $now ) {
			$now = microtime( true );
		}

		// Break apart the item name to get the timestamp. The id is quoted so
		// that regex metacharacters (or the '/' delimiter) in it are matched literally.
		$pattern = '/^jpsq_' . preg_quote( $this->id, '/' ) . '-(\d+\.\d+)-/';
		$matches = null;
		if ( preg_match( $pattern, $first_item_name, $matches ) ) {
			return $now - floatval( $matches[1] );
		}

		return 0;
	}
131
132
	/**
	 * Empty the queue: clear the checkout marker, then delete every data row
	 * whose option name matches this queue's prefix.
	 */
	public function reset() {
		global $wpdb;

		$this->delete_checkout_id();

		$delete_sql = $wpdb->prepare(
			"DELETE FROM $wpdb->options WHERE option_name LIKE %s",
			'jpsq_' . $this->id . '-%'
		);
		$wpdb->query( $delete_sql );
	}
142
143
	/**
	 * Count the data rows currently stored for this queue.
	 *
	 * @return int
	 */
	public function size() {
		global $wpdb;

		$count_sql = $wpdb->prepare(
			"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s",
			'jpsq_' . $this->id . '-%'
		);
		$row_count = $wpdb->get_var( $count_sql );

		return (int) $row_count;
	}
153
154
	/**
	 * Whether at least one data row exists for this queue.
	 *
	 * We use this peculiar EXISTS-based implementation because it's much
	 * faster than count(*).
	 *
	 * @return bool True only when the query yields the string '1'.
	 */
	public function has_any_items() {
		global $wpdb;

		$exists_sql = $wpdb->prepare(
			"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )",
			'jpsq_' . $this->id . '-%'
		);
		$exists     = $wpdb->get_var( $exists_sql );

		return '1' === $exists;
	}
166
167
	/**
	 * Check out up to $buffer_size items from the front of the queue.
	 *
	 * A checkout marker is stored first, so further checkouts are refused
	 * until the returned buffer is checked in or closed.
	 *
	 * @param int $buffer_size Maximum number of items to check out.
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|bool|int The buffer on success;
	 *         WP_Error if another buffer is still open; false if the queue is
	 *         empty; the falsy/WP_Error result of storing the marker if that fails.
	 */
	public function checkout( $buffer_size ) {
		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();

		$result = $this->set_checkout_id( $buffer_id );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		$items = $this->fetch_items( $buffer_size );

		if ( empty( $items ) ) {
			// No buffer is handed out, so nobody could ever check this id back
			// in: release the marker now instead of blocking until it expires.
			$this->delete_checkout_id();

			return false;
		}

		return new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );
	}
190
191
	/**
	 * Check out rows until the queue is empty, $max_buffer_size rows are
	 * taken, or the summed stored size of the rows would exceed $max_memory.
	 *
	 * Row sizes are loaded from the DB first so that no more data is pulled
	 * into memory than needed. The only way the limit is exceeded is when the
	 * first row alone is larger than $max_memory; that row is then returned
	 * by itself.
	 *
	 * @param int $max_memory      Upper bound on the summed LENGTH(option_value) of the rows.
	 * @param int $max_buffer_size Maximum number of rows to consider.
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|bool|int The buffer on success;
	 *         WP_Error if another buffer is still open; false if no rows were
	 *         found; the falsy/WP_Error result of storing the marker if that fails.
	 */
	public function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();

		$result = $this->set_checkout_id( $buffer_id );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		global $wpdb;

		// Get the list of row id + stored size, oldest first.
		$items_with_size = $wpdb->get_results(
			$wpdb->prepare(
				"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
				"jpsq_{$this->id}-%",
				$max_buffer_size
			),
			OBJECT
		);

		if ( empty( $items_with_size ) ) {
			// No buffer is handed out, so release the marker right away
			// (the later empty-result branch below does the same).
			$this->delete_checkout_id();

			return false;
		}

		$total_memory = 0;
		$min_item_id  = $items_with_size[0]->id;
		$max_item_id  = $min_item_id;

		foreach ( $items_with_size as $index => $item_with_size ) {
			$total_memory += $item_with_size->value_size;

			// If this is the first item and it exceeds memory, allow the loop
			// to continue; we will exit on the next iteration instead.
			if ( $total_memory > $max_memory && $index > 0 ) {
				break;
			}

			$max_item_id = $item_with_size->id;
		}

		// Both bounds are row names of this queue, selected in name order.
		$query = $wpdb->prepare(
			"SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name >= %s and option_name <= %s ORDER BY option_name ASC",
			$min_item_id,
			$max_item_id
		);

		$items = $wpdb->get_results( $query, OBJECT );

		if ( empty( $items ) ) {
			$this->delete_checkout_id();

			return false;
		}

		foreach ( $items as $item ) {
			$item->value = maybe_unserialize( $item->value );
		}

		return new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
	}
262
263
	/**
	 * Return a checked-out buffer without removing its items from the queue.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer Buffer obtained from a checkout.
	 * @return true|WP_Error True once the checkout marker is cleared, or the
	 *                       validation error for the buffer.
	 */
	public function checkin( $buffer ) {
		$validation = $this->validate_checkout( $buffer );

		if ( ! is_wp_error( $validation ) ) {
			$this->delete_checkout_id();

			return true;
		}

		return $validation;
	}
274
275
	/**
	 * Close a checked-out buffer and delete its rows from the queue.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer        Buffer obtained from a checkout.
	 * @param array|null                $ids_to_remove Row ids to delete; by default
	 *                                                 all items in the buffer are cleared.
	 * @return true|WP_Error True on success, or the validation error for the buffer.
	 */
	public function close( $buffer, $ids_to_remove = null ) {
		$is_valid = $this->validate_checkout( $buffer );

		if ( is_wp_error( $is_valid ) ) {
			return $is_valid;
		}

		$this->delete_checkout_id();

		// By default clear all items in the buffer.
		if ( is_null( $ids_to_remove ) ) {
			$ids_to_remove = $buffer->get_item_ids();
		}

		global $wpdb;

		// Drop the keys: only the id values matter, and string keys must not
		// leak into how the arguments are passed to prepare().
		$ids_to_remove = array_values( (array) $ids_to_remove );
		$ids_count     = count( $ids_to_remove );

		if ( $ids_count > 0 ) {
			$placeholders = implode( ', ', array_fill( 0, $ids_count, '%s' ) );
			$sql          = "DELETE FROM $wpdb->options WHERE option_name IN (" . $placeholders . ')';

			// prepare() accepts the placeholder values as a single array, so
			// there is no need to spread them with call_user_func_array().
			$wpdb->query( $wpdb->prepare( $sql, $ids_to_remove ) );
		}

		return true;
	}
299
300
	/**
	 * Read the values of every queued item, then reset the queue.
	 *
	 * @return array Values of the items that were in the queue.
	 */
	public function flush_all() {
		$all_rows   = $this->fetch_items();
		$all_values = Jetpack_Sync_Utils::get_item_values( $all_rows );

		$this->reset();

		return $all_values;
	}
306
307
	/**
	 * Fetch every queued row (objects with `id` and `value`) without a limit.
	 *
	 * @return array
	 */
	public function get_all() {
		$all_rows = $this->fetch_items();

		return $all_rows;
	}
310
311
	/**
	 * Clear the checkout marker without validating any buffer.
	 *
	 * Use with caution: this could allow multiple processes to delete and
	 * send from the queue at the same time.
	 */
	public function force_checkin() {
		$this->delete_checkout_id();
	}
316
317
	/**
	 * Used to lock checkouts from the queue.
	 *
	 * Waits, in one-second steps, up to $timeout seconds for the queue to be
	 * empty and then stores 'lock' as the checkout id.
	 *
	 * @param int $timeout Maximum number of one-second waits.
	 * @return true|WP_Error|bool|int True when the lock was stored; WP_Error
	 *         when the queue is still not empty after $timeout waits or a
	 *         buffer is still checked out; the falsy/WP_Error result of storing
	 *         the marker if that fails.
	 */
	public function lock( $timeout = 30 ) {
		$tries = 0;

		// Give up only if the queue still has items once the caller's timeout
		// (not a hard-coded 30) has been used up.
		while ( $this->has_any_items() ) {
			if ( $tries >= $timeout ) {
				return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
			}

			sleep( 1 );
			$tries += 1;
		}

		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		// Hopefully this means we can acquire a checkout?
		$result = $this->set_checkout_id( 'lock' );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		return true;
	}
344
345
	/**
	 * Release the lock by clearing the checkout marker.
	 *
	 * @return mixed Whatever clearing the marker returns.
	 */
	public function unlock() {
		$cleared = $this->delete_checkout_id();

		return $cleared;
	}
348
349
	/**
	 * This option is specifically chosen to, as much as possible, preserve time order
	 * and minimise the possibility of collisions between multiple processes working
	 * at the same time.
	 *
	 * The current time is formatted with six decimals. The non-locale-aware
	 * 'F' conversion is used so the decimal separator is always '.', which is
	 * what the timestamp pattern in lag() expects.
	 *
	 * @return string
	 */
	protected function generate_option_name_timestamp() {
		return sprintf( '%.6F', microtime( true ) );
	}
359
360
	/**
	 * Read the current checkout id from the lock option.
	 *
	 * The stored value has the form "id:expiry"; the id only counts while the
	 * expiry timestamp is still in the future.
	 *
	 * @return string|false The checkout id, or false when none is stored or it has expired.
	 */
	private function get_checkout_id() {
		global $wpdb;

		$lookup_sql   = $wpdb->prepare(
			"SELECT option_value FROM $wpdb->options WHERE option_name = %s",
			$this->get_lock_option_name()
		);
		$stored_value = $wpdb->get_var( $lookup_sql );

		if ( ! $stored_value ) {
			return false;
		}

		list( $stored_id, $expires_at ) = explode( ':', $stored_value );

		return ( intval( $expires_at ) > time() ) ? $stored_id : false;
	}
378
379
	/**
	 * Store "$checkout_id:expiry" in the lock option.
	 *
	 * An UPDATE of the existing row is tried first; if it reports nothing
	 * updated, the row is INSERTed instead (non-autoloaded).
	 *
	 * @param string $checkout_id Id to record as the current checkout.
	 * @return mixed Result of the last query that was run.
	 */
	private function set_checkout_id( $checkout_id ) {
		global $wpdb;

		// NOTE(review): static analysis reports this property as private in
		// Jetpack_Sync_Defaults and therefore inaccessible from here — confirm
		// its visibility in that class.
		$expires    = time() + Jetpack_Sync_Defaults::$default_sync_queue_lock_timeout;
		$lock_name  = $this->get_lock_option_name();
		$lock_value = "$checkout_id:$expires";

		$update_sql  = $wpdb->prepare(
			"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
			$lock_value,
			$lock_name
		);
		$updated_num = $wpdb->query( $update_sql );

		if ( $updated_num ) {
			return $updated_num;
		}

		$insert_sql = $wpdb->prepare(
			"INSERT INTO $wpdb->options ( option_name, option_value, autoload ) VALUES ( %s, %s, 'no' )",
			$lock_name,
			$lock_value
		);

		return $wpdb->query( $insert_sql );
	}
403
404
	/**
	 * Clear the checkout marker.
	 *
	 * Rather than delete the row, which causes fragmentation, the value is
	 * overwritten in place with '0:0'.
	 *
	 * @return mixed Result of the UPDATE query.
	 */
	private function delete_checkout_id() {
		global $wpdb;

		$clear_sql = $wpdb->prepare(
			"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
			'0:0',
			$this->get_lock_option_name()
		);

		return $wpdb->query( $clear_sql );
	}
416
417
	/**
	 * Name of the option row that holds this queue's checkout marker.
	 *
	 * @return string
	 */
	private function get_lock_option_name() {
		return 'jpsq_' . $this->id . '_checkout';
	}
420
421
	/**
	 * Build the option name for the next data row:
	 * "jpsq_<id>-<timestamp>-<random_int>-<row_iterator>".
	 *
	 * @return string
	 */
	private function get_next_data_row_option_name() {
		$timestamp = $this->generate_option_name_timestamp();

		// The row iterator avoids collisions when a single process writes rows
		// very quickly; it wraps back to 0 instead of overflowing.
		$this->row_iterator = ( PHP_INT_MAX === $this->row_iterator ) ? 0 : $this->row_iterator + 1;

		$name_parts = array(
			'jpsq_' . $this->id,
			$timestamp,
			$this->random_int,
			$this->row_iterator,
		);

		return implode( '-', $name_parts );
	}
433
434
	/**
	 * Load queue rows ordered by option name, with values unserialized.
	 *
	 * @param int|null $limit Maximum number of rows; any falsy value means no limit.
	 * @return array Row objects with `id` (option name) and `value`.
	 */
	private function fetch_items( $limit = null ) {
		global $wpdb;

		$name_pattern = 'jpsq_' . $this->id . '-%';
		$select_sql   = "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC";

		$query_sql = $limit
			? $wpdb->prepare( $select_sql . ' LIMIT %d', $name_pattern, $limit )
			: $wpdb->prepare( $select_sql, $name_pattern );

		$rows = $wpdb->get_results( $query_sql, OBJECT );
		foreach ( $rows as $row ) {
			$row->value = maybe_unserialize( $row->value );
		}

		return $rows;
	}
450
451
	/**
	 * Check that $buffer is the buffer that is currently checked out.
	 *
	 * @param mixed $buffer Value handed to checkin() or close().
	 * @return true|WP_Error True when the buffer's id equals the stored checkout
	 *                       id; otherwise a WP_Error naming the problem.
	 */
	private function validate_checkout( $buffer ) {
		if ( ! ( $buffer instanceof Jetpack_Sync_Queue_Buffer ) ) {
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
		}

		$active_checkout_id = $this->get_checkout_id();

		if ( ! $active_checkout_id ) {
			return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
		}

		if ( $buffer->id == $active_checkout_id ) {
			return true;
		}

		return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
	}
468
}
469
470
/**
 * Static helpers for pulling the `id` / `value` properties out of lists of row objects.
 */
class Jetpack_Sync_Utils {

	/**
	 * Collect the `value` property of every item; array keys are preserved.
	 *
	 * @param array $items Objects exposing a `value` property.
	 * @return array
	 */
	public static function get_item_values( $items ) {
		$extractor = array( __CLASS__, 'get_item_value' );

		return array_map( $extractor, $items );
	}

	/**
	 * Collect the `id` property of every item; array keys are preserved.
	 *
	 * @param array $items Objects exposing an `id` property.
	 * @return array
	 */
	public static function get_item_ids( $items ) {
		$extractor = array( __CLASS__, 'get_item_id' );

		return array_map( $extractor, $items );
	}

	/**
	 * @param object $item Row object.
	 * @return mixed The row's `value` property.
	 */
	private static function get_item_value( $item ) {
		return $item->value;
	}

	/**
	 * @param object $item Row object.
	 * @return mixed The row's `id` property.
	 */
	private static function get_item_id( $item ) {
		return $item->id;
	}
}
488